-
Notifications
You must be signed in to change notification settings - Fork 7
Open
Labels
Description
Hi,
I have this function:
pub fn calc_partial_transpose(
snps: &mut Vec<f64>,
partial_matrix: &mut Vec<f64>,
ids_num: usize,
) -> () {
extern crate cblas;
extern crate openblas_src;
let n = ids_num;
let k = snps.len() / n;
println!(
"N = {:?}\nK = {:?}\nRES_MATR_SIZE = {}\nSNPS_SIZE = {}",
n,
k,
partial_matrix.len(),
snps.len()
);
// Only the upper triangular part of matrix will be calculated.
unsafe {
cblas::dsyrk(
cblas::Layout::RowMajor,
cblas::Part::Upper,
cblas::Transpose::Ordinary,
n as i32,
k as i32,
1.0,
&snps[..],
k as i32,
0.0,
&mut partial_matrix[..],
n as i32,
);
}
}On run it prints:
N = 17
K = 1000
RES_MATR_SIZE = 289
SNPS_SIZE = 17000
And fails with (signal: 11, SIGSEGV: invalid memory reference).
It is called in parallel from different threads created by this function:
pub fn calc_transpose(&mut self, batch_size: usize) -> std::io::Result<Vec<f64>> {
if batch_size < 1 {
panic!("Batch size can't be less than 1.");
}
let ids_num = self.width_of_matrix;
let common_matrix: Arc<Mutex<Vec<f64>>> =
Arc::new(Mutex::new(vec![0.0; ids_num * ids_num]));
let buf_size = ids_num * batch_size;
// For each physical thread a buffer will be created.
let buf_num = num_cpus::get();
use std::sync::{Arc, Mutex};
let mut read_bufs = Vec::<Arc<Mutex<Vec<f64>>>>::new();
let mut part_res_bufs = Vec::<Arc<Mutex<Vec<f64>>>>::new();
for _ in 0..buf_num {
read_bufs.push(Arc::new(Mutex::new(vec![0.0; buf_size])));
part_res_bufs.push(Arc::new(Mutex::new(vec![0.0; ids_num * ids_num])));
}
use std::sync::mpsc::channel;
use std::thread;
let (processor, buffer_filler) = channel::<usize>();
// Fill the concurrent queue with the buffers numbers, so the buffer
// filler can start parsing into them.
for buf_idx in 0..buf_num {
processor.send(buf_idx).unwrap();
}
let mut threads = Vec::<thread::JoinHandle<()>>::new();
let file_reader = self.file_reader.get_mut();
let mut line_iter = BufReader::new(file_reader).lines();
let mut total_snps_read: usize = 0;
loop {
let freed_buffer_idx = buffer_filler.recv().unwrap();
let read_buf = read_bufs[freed_buffer_idx].clone();
let part_buf = part_res_bufs[freed_buffer_idx].clone();
let read_line_amount = match Self::fill_buffer(
&mut *read_buf.lock().unwrap(),
&mut line_iter,
self.markers.len(),
self.hab_mapper.clone(),
)? {
0 => break,
n => {
total_snps_read += n;
n
}
};
// Reached EOF (there is not enough records to form a full batch.)
if read_line_amount < batch_size {
let buf = &mut *read_buf.lock().unwrap();
buf.resize(read_line_amount * ids_num, 0.0);
}
// CBlas demands first dimension of snps matrix (ids_num) to be no less
// than a row count, thus it is increased in cases when the read batch
// happens to be smaller than it (ids_num is 20, but only 19 records
// were left to read from file(last batch)).
if read_line_amount < ids_num {
let buf = &mut *read_buf.lock().unwrap();
buf.resize(ids_num * ids_num, 0.0);
}
{
let (read_buf_arc, part_buf_arc, res_matrix_arc, proc_sender, buf_idx) = (
read_buf.clone(),
part_buf.clone(),
common_matrix.clone(),
processor.clone(),
freed_buffer_idx.clone(),
);
threads.push(std::thread::spawn(move || {
let mut threads_read_buf = read_buf_arc.lock().unwrap();
let mut threads_part_buf = part_buf_arc.lock().unwrap();
calc_partial_transpose(&mut threads_read_buf, &mut threads_part_buf, ids_num);
let mut res_matrix = res_matrix_arc.lock().unwrap();
for (buf_elem, common_matrix_elem) in
threads_part_buf.iter_mut().zip(res_matrix.iter_mut())
{
*common_matrix_elem += *buf_elem;
}
proc_sender.send(buf_idx).unwrap();
}));
}
}
threads.into_iter().for_each(|thread| {
thread
.join()
.expect("The thread creating or execution failed !")
});
self.file_reader.seek(SeekFrom::Start(self.snp_pos_start))?;
let mut res = Arc::try_unwrap(common_matrix)
.expect("Arc uwrapping failed.")
.into_inner()
.expect("Mutex uwrapping failed.");
// Mirror matrix, since only the upper part was calculated.
for i in 0..ids_num {
let row_length = ids_num;
for j in 0..i {
res[i * row_length + j] = res[j * row_length + i];
}
}
Ok(res)
}Reactions are currently unavailable