
signal: 11, SIGSEGV: invalid memory reference #3

@NickRoz1

Hi,

I have this function:

pub fn calc_partial_transpose(
    snps: &mut Vec<f64>,
    partial_matrix: &mut Vec<f64>,
    ids_num: usize,
  ) -> () {
    extern crate cblas;
    extern crate openblas_src;

    let n = ids_num;
    let k = snps.len() / n;
    println!(
      "N = {:?}\nK = {:?}\nRES_MATR_SIZE = {}\nSNPS_SIZE = {}",
      n,
      k,
      partial_matrix.len(),
      snps.len()
    );
    // Only the upper triangular part of matrix will be calculated.
    unsafe {
      cblas::dsyrk(
        cblas::Layout::RowMajor,
        cblas::Part::Upper,
        cblas::Transpose::Ordinary,
        n as i32,
        k as i32,
        1.0,
        &snps[..],
        k as i32,
        0.0,
        &mut partial_matrix[..],
        n as i32,
      );
    }
  }

When run, it prints:

N = 17
K = 1000
RES_MATR_SIZE = 289
SNPS_SIZE = 17000

It then fails with (signal: 11, SIGSEGV: invalid memory reference).
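For reference, here is a minimal single-threaded sketch with the same dimensions (n = 17, k = 1000). It is only an isolation test, not the actual code path: it assumes the same cblas / openblas-src setup as above and uses an all-ones input matrix, so it can show whether the dsyrk call itself faults outside the threaded code below.

extern crate cblas;
extern crate openblas_src;

fn main() {
  let n: usize = 17;
  let k: usize = 1000;
  // All-ones 17 x 1000 row-major matrix, just to exercise the call.
  let snps = vec![1.0_f64; n * k];
  // 17 x 17 result buffer, matching RES_MATR_SIZE = 289 above.
  let mut partial_matrix = vec![0.0_f64; n * n];

  unsafe {
    cblas::dsyrk(
      cblas::Layout::RowMajor,
      cblas::Part::Upper,
      cblas::Transpose::Ordinary,
      n as i32,
      k as i32,
      1.0,
      &snps[..],
      k as i32, // lda: row-major, no transpose => lda >= k
      0.0,
      &mut partial_matrix[..],
      n as i32, // ldc >= n
    );
  }
  // For an all-ones A, every entry of the upper triangle should be k = 1000.
  println!("C[0] = {}", partial_matrix[0]);
}

If this standalone call also crashes, the problem is likely in the dsyrk binding or the OpenBLAS linkage rather than in the threading code.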

It is called in parallel from several threads spawned by this function:

pub fn calc_transpose(&mut self, batch_size: usize) -> std::io::Result<Vec<f64>> {
  if batch_size < 1 {
    panic!("Batch size can't be less than 1.");
  }
  let ids_num = self.width_of_matrix;

  use std::sync::{Arc, Mutex};

  let common_matrix: Arc<Mutex<Vec<f64>>> =
    Arc::new(Mutex::new(vec![0.0; ids_num * ids_num]));

  let buf_size = ids_num * batch_size;
  // For each physical thread a buffer will be created.
  let buf_num = num_cpus::get();

  let mut read_bufs = Vec::<Arc<Mutex<Vec<f64>>>>::new();
  let mut part_res_bufs = Vec::<Arc<Mutex<Vec<f64>>>>::new();
  for _ in 0..buf_num {
    read_bufs.push(Arc::new(Mutex::new(vec![0.0; buf_size])));
    part_res_bufs.push(Arc::new(Mutex::new(vec![0.0; ids_num * ids_num])));
  }

  use std::sync::mpsc::channel;
  use std::thread;
  let (processor, buffer_filler) = channel::<usize>();
  // Seed the channel with all buffer indices so the buffer
  // filler can start parsing into them.
  for buf_idx in 0..buf_num {
    processor.send(buf_idx).unwrap();
  }
  let mut threads = Vec::<thread::JoinHandle<()>>::new();
  let file_reader = self.file_reader.get_mut();
  let mut line_iter = BufReader::new(file_reader).lines();
  let mut total_snps_read: usize = 0;
  loop {
    let freed_buffer_idx = buffer_filler.recv().unwrap();
    let read_buf = read_bufs[freed_buffer_idx].clone();
    let part_buf = part_res_bufs[freed_buffer_idx].clone();

    let read_line_amount = match Self::fill_buffer(
      &mut *read_buf.lock().unwrap(),
      &mut line_iter,
      self.markers.len(),
      self.hab_mapper.clone(),
    )? {
      0 => break,
      n => {
        total_snps_read += n;
        n
      }
    };

    // Reached EOF (there are not enough records left to form a full batch).
    if read_line_amount < batch_size {
      let buf = &mut *read_buf.lock().unwrap();
      buf.resize(read_line_amount * ids_num, 0.0);
    }
    // CBLAS requires the first dimension of the snps matrix (ids_num) to be
    // no less than the row count, so the buffer is padded whenever the batch
    // that was read is smaller than it (e.g. ids_num is 20, but only 19
    // records were left to read from the file in the last batch).
    if read_line_amount < ids_num {
      let buf = &mut *read_buf.lock().unwrap();
      buf.resize(ids_num * ids_num, 0.0);
    }
    {
      let (read_buf_arc, part_buf_arc, res_matrix_arc, proc_sender, buf_idx) = (
        read_buf.clone(),
        part_buf.clone(),
        common_matrix.clone(),
        processor.clone(),
        freed_buffer_idx.clone(),
      );
      threads.push(std::thread::spawn(move || {
        let mut threads_read_buf = read_buf_arc.lock().unwrap();
        let mut threads_part_buf = part_buf_arc.lock().unwrap();
        calc_partial_transpose(&mut threads_read_buf, &mut threads_part_buf, ids_num);
        let mut res_matrix = res_matrix_arc.lock().unwrap();
        for (buf_elem, common_matrix_elem) in
          threads_part_buf.iter_mut().zip(res_matrix.iter_mut())
        {
          *common_matrix_elem += *buf_elem;
        }
        proc_sender.send(buf_idx).unwrap();
      }));
    }
  }

  threads.into_iter().for_each(|thread| {
    thread
      .join()
      .expect("The thread creating or execution failed !")
  });

  self.file_reader.seek(SeekFrom::Start(self.snp_pos_start))?;

  let mut res = Arc::try_unwrap(common_matrix)
    .expect("Arc unwrapping failed.")
    .into_inner()
    .expect("Mutex unwrapping failed.");

  // Mirror matrix, since only the upper part was calculated.
  for i in 0..ids_num {
    let row_length = ids_num;
    for j in 0..i {
      res[i * row_length + j] = res[j * row_length + i];
    }
  }
  Ok(res)
}
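As a side note, the mirroring loop at the end only copies the upper triangle into the lower one. A small self-contained check of that logic on a 3 x 3 row-major matrix (the values are made up purely for illustration):

fn main() {
  let ids_num: usize = 3;
  // Row-major 3 x 3 matrix with only the upper triangle (and diagonal) filled,
  // as it would be after dsyrk with Part::Upper.
  let mut res = vec![
    1.0, 2.0, 3.0,
    0.0, 4.0, 5.0,
    0.0, 0.0, 6.0,
  ];
  for i in 0..ids_num {
    let row_length = ids_num;
    for j in 0..i {
      res[i * row_length + j] = res[j * row_length + i];
    }
  }
  assert_eq!(
    res,
    vec![
      1.0, 2.0, 3.0,
      2.0, 4.0, 5.0,
      3.0, 5.0, 6.0,
    ]
  );
}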
