Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ maintenance = { status = "actively-developed" }

[dependencies]
qcow2-rs = "0.1"
libublk = "0.4"
libublk = "0.4.1"
clap = { version = "4.3", features = ["derive"] }
clap_derive = "4.3"
libc = "0.2"
Expand Down
15 changes: 13 additions & 2 deletions src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ pub(crate) struct GenAddArgs {
#[clap(long, short = 'z', default_value_t = false)]
pub zero_copy: bool,

/// Use multi-cpus affinity instead of single CPU affinity (default is single CPU)
#[clap(long, default_value_t = false)]
pub multi_cpus_affinity: bool,

/// Used to resolve relative paths for backing files.
/// `RefCell` is used to allow deferred initialization of this field
/// from an immutable `GenAddArgs` reference, which is necessary
Expand Down Expand Up @@ -155,7 +159,7 @@ impl GenAddArgs {
}

if self.zero_copy {
if name != "loop" {
if name != "loop" && name != "null" {
anyhow::bail!("Target {} doesn't support zero copy", name);
}
ctrl_flags |=
Expand Down Expand Up @@ -199,14 +203,21 @@ impl GenAddArgs {
anyhow::bail!("invalid io buf size {}", buf_size);
}

// Apply single CPU affinity by default unless multi_cpus_affinity is enabled
let final_dev_flags = if self.multi_cpus_affinity {
dev_flags
} else {
dev_flags | UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY
};

Ok(libublk::ctrl::UblkCtrlBuilder::default()
.name(name)
.depth(self.depth.try_into()?)
.nr_queues(self.queue.try_into()?)
.id(self.number)
.ctrl_flags(ctrl_flags.into())
.ctrl_target_flags(gen_flags)
.dev_flags(dev_flags)
.dev_flags(final_dev_flags)
.io_buf_bytes(buf_size as u32)
.build()?)
}
Expand Down
54 changes: 51 additions & 3 deletions src/loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,32 @@ fn lo_init_tgt(dev: &mut UblkDev, lo: &LoopTgt, opt: Option<LoopArgs>) -> Result
Ok(())
}

fn lo_handle_io_cmd_sync_zc(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx, auto_buf_reg: &libublk::sys::ublk_auto_buf_reg) {
let iod = q.get_iod(tag);
let op = iod.op_flags & 0xff;
let data = UblkIOCtx::build_user_data(tag, op, 0, true);
if i.is_tgt_io() {
let user_data = i.user_data();
let res = i.result();
let cqe_tag = UblkIOCtx::user_data_to_tag(user_data);

assert!(cqe_tag == tag as u32);

if res != -(libc::EAGAIN) {
q.complete_io_cmd_with_auto_buf_reg(tag, auto_buf_reg, Ok(UblkIORes::Result(res)));
return;
}
}

let res = __lo_prep_submit_io_cmd(iod);
if res < 0 {
q.complete_io_cmd_with_auto_buf_reg(tag, auto_buf_reg, Ok(UblkIORes::Result(res)));
} else {
let sqe = __lo_make_io_sqe_zc(iod, tag).user_data(data);
q.ublk_submit_sqe_sync(sqe).unwrap();
}
}

fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx, buf_addr: *mut u8) {
let iod = q.get_iod(tag);
let op = iod.op_flags & 0xff;
Expand All @@ -251,6 +277,28 @@ fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx, buf_addr: *
}
}

fn q_zc_fn(qid: u16, dev: &UblkDev) {
let auto_buf_reg_list_rc = Rc::new(
(0..dev.dev_info.queue_depth)
.map(|tag| libublk::sys::ublk_auto_buf_reg {
index: tag,
flags: libublk::sys::UBLK_AUTO_BUF_REG_FALLBACK as u8,
..Default::default()
})
.collect::<Vec<_>>(),
);

let auto_buf_reg_list = auto_buf_reg_list_rc.clone();
let lo_io_handler = move |q: &UblkQueue, tag: u16, io: &UblkIOCtx| {
lo_handle_io_cmd_sync_zc(q, tag, io, &auto_buf_reg_list[tag as usize]);
};

UblkQueue::new(qid, dev)
.unwrap()
.submit_fetch_commands_with_auto_buf_reg(&auto_buf_reg_list_rc)
.wait_and_handle_io(lo_io_handler);
}

fn q_fn(qid: u16, dev: &UblkDev) {
let bufs_rc = Rc::new(dev.alloc_queue_io_bufs());
let bufs = bufs_rc.clone();
Expand Down Expand Up @@ -379,16 +427,16 @@ pub(crate) fn ublk_add_loop(
return Err(anyhow::anyhow!("loop doesn't support user copy"));
}

if ((ctrl.dev_info().flags & (libublk::sys::UBLK_F_AUTO_BUF_REG as u64)) != 0) && !aa {
return Err(anyhow::anyhow!("loop zero copy requires --async-wait"));
}

let flags = ctrl.dev_info().flags;
let comm = comm_rc.clone();
ctrl.run_target(
|dev: &mut UblkDev| lo_init_tgt(dev, &lo, opt),
move |qid, dev: &_| {
if lo.json.async_await {
q_a_fn(qid, dev)
} else if (flags & libublk::sys::UBLK_F_AUTO_BUF_REG as u64) != 0 {
q_zc_fn(qid, dev)
} else {
q_fn(qid, dev)
}
Expand Down
123 changes: 91 additions & 32 deletions src/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,33 @@ fn handle_io_cmd(q: &UblkQueue, tag: u16, buf_addr: *mut u8) {
q.complete_io_cmd(tag, buf_addr, Ok(UblkIORes::Result(bytes)));
}

fn q_sync_zc_fn(qid: u16, dev: &UblkDev) {
let auto_buf_reg_list_rc = Rc::new(
(0..dev.dev_info.queue_depth)
.map(|tag| libublk::sys::ublk_auto_buf_reg {
index: tag,
flags: libublk::sys::UBLK_AUTO_BUF_REG_FALLBACK as u8,
..Default::default()
})
.collect::<Vec<_>>(),
);

let auto_buf_reg_list = auto_buf_reg_list_rc.clone();
let io_handler = move |q: &UblkQueue, tag: u16, _io: &UblkIOCtx| {
let bytes = get_io_cmd_result(q, tag);
q.complete_io_cmd_with_auto_buf_reg(
tag,
&auto_buf_reg_list[tag as usize],
Ok(UblkIORes::Result(bytes)),
);
};

UblkQueue::new(qid, dev)
.unwrap()
.submit_fetch_commands_with_auto_buf_reg(&auto_buf_reg_list_rc)
.wait_and_handle_io(io_handler);
}

fn q_sync_fn(qid: u16, dev: &UblkDev, user_copy: bool) {
let bufs_rc = Rc::new(dev.alloc_queue_io_bufs());
let bufs = bufs_rc.clone();
Expand All @@ -59,37 +86,65 @@ fn q_sync_fn(qid: u16, dev: &UblkDev, user_copy: bool) {
.wait_and_handle_io(io_handler);
}

async fn handle_queue_tag_async_null_zc(q: Rc<UblkQueue<'_>>, tag: u16) {
let mut cmd_op = libublk::sys::UBLK_U_IO_FETCH_REQ;
let mut res = 0;
let auto_buf_reg = libublk::sys::ublk_auto_buf_reg {
index: tag,
flags: libublk::sys::UBLK_AUTO_BUF_REG_FALLBACK as u8,
..Default::default()
};

loop {
let cmd_res = q
.submit_io_cmd_with_auto_buf_reg(tag, cmd_op, &auto_buf_reg, res)
.await;
if cmd_res == libublk::sys::UBLK_IO_RES_ABORT {
break;
}

res = get_io_cmd_result(&q, tag);
cmd_op = libublk::sys::UBLK_U_IO_COMMIT_AND_FETCH_REQ;
}
}

async fn handle_queue_tag_async_null(q: Rc<UblkQueue<'_>>, tag: u16, user_copy: bool) {
let mut cmd_op = libublk::sys::UBLK_U_IO_FETCH_REQ;
let mut res = 0;
let (_buf, buf_addr) = if user_copy {
(None, std::ptr::null_mut())
} else {
let buf = IoBuf::<u8>::new(q.dev.dev_info.max_io_buf_bytes as usize);

q.register_io_buf(tag, &buf);
let addr = buf.as_mut_ptr();
(Some(buf), addr)
};

loop {
let cmd_res = q.submit_io_cmd(tag, cmd_op, buf_addr, res).await;
if cmd_res == libublk::sys::UBLK_IO_RES_ABORT {
break;
}

res = get_io_cmd_result(&q, tag);
cmd_op = libublk::sys::UBLK_U_IO_COMMIT_AND_FETCH_REQ;
}
}

fn q_async_fn(qid: u16, dev: &UblkDev, user_copy: bool) {
let depth = dev.dev_info.queue_depth;
let q_rc = Rc::new(UblkQueue::new(qid, dev).unwrap());
let exe = smol::LocalExecutor::new();
let mut f_vec = Vec::new();

for tag in 0..dev.dev_info.queue_depth {
for tag in 0..depth {
let q = q_rc.clone();

f_vec.push(exe.spawn(async move {
let mut cmd_op = libublk::sys::UBLK_U_IO_FETCH_REQ;
let mut res = 0;
let (_buf, buf_addr) = if user_copy {
(None, std::ptr::null_mut())
} else {
let buf = IoBuf::<u8>::new(q.dev.dev_info.max_io_buf_bytes as usize);

q.register_io_buf(tag, &buf);
let addr = buf.as_mut_ptr();
(Some(buf), addr)
};

loop {
let cmd_res = q.submit_io_cmd(tag, cmd_op, buf_addr, res).await;
if cmd_res == libublk::sys::UBLK_IO_RES_ABORT {
break;
}

res = get_io_cmd_result(&q, tag);
cmd_op = libublk::sys::UBLK_U_IO_COMMIT_AND_FETCH_REQ;
}
}));
if q.support_auto_buf_zc() {
f_vec.push(exe.spawn(handle_queue_tag_async_null_zc(q, tag)));
} else {
f_vec.push(exe.spawn(handle_queue_tag_async_null(q, tag, user_copy)));
}
}
ublk_wait_and_handle_ios(&exe, &q_rc);
smol::block_on(async { futures::future::join_all(f_vec).await });
Expand All @@ -108,6 +163,12 @@ pub(crate) fn ublk_add_null(
return Err(anyhow::anyhow!("null doesn't support unprivileged"));
}

let aa = if let Some(ref o) = opt {
o.async_await
} else {
false
};

let tgt_init = |dev: &mut UblkDev| {
dev.set_default_params(size);
let p = &mut dev.tgt.params;
Expand All @@ -124,17 +185,15 @@ pub(crate) fn ublk_add_null(
Ok(())
};

let aa = if let Some(ref o) = opt {
o.async_await
} else {
false
};

let q_handler = move |qid, dev: &_| {
if aa {
q_async_fn(qid, dev, user_copy)
} else {
q_sync_fn(qid, dev, user_copy)
if (flags & libublk::sys::UBLK_F_AUTO_BUF_REG as u64) != 0 {
q_sync_zc_fn(qid, dev)
} else {
q_sync_fn(qid, dev, user_copy)
}
}
};

Expand Down
8 changes: 8 additions & 0 deletions src/qcow2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,14 @@ fn ublk_qcow2_start<'a, T: Qcow2IoOps + 'a>(
let task = exe.spawn(async move { tgt.qdev.qcow2_prep_io().await.unwrap() });
ublk_run_io_task(exe, &task, q, 1)?;

//setup single cpu affinity
if dev_clone
.flags
.intersects(libublk::UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY)
{
ctrl_clone.set_queue_single_affinity(0, None)?;
}

// Start device in one dedicated io task
let task = exe.spawn(async move {
let r = ctrl_clone.configure_queue(&dev_clone, 0, unsafe { libc::gettid() });
Expand Down
3 changes: 3 additions & 0 deletions tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ mod integration {
__test_ublk_add_del_loop(4096, true, false, true, |ctrl, _bs, _file_size, _path| {
ext4_format_and_mount(ctrl);
});
__test_ublk_add_del_loop(4096, false, false, true, |ctrl, _bs, _file_size, _path| {
ext4_format_and_mount(ctrl);
});
}

#[test]
Expand Down