From 3fa16d266193f029eb3231a49ecb0e95bcdfefef Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Thu, 11 Sep 2025 07:49:34 +0000 Subject: [PATCH 01/19] refactor: remove OnceCell wrapper from CTRL_URING Remove redundant OnceCell wrapper from thread-local CTRL_URING since thread_local! is already lazy. Simplify the structure while keeping the ublk_init_ctrl_task_ring() API for backward compatibility. Changes: - Change CTRL_URING from OnceCell> to RefCell> - Update with_ctrl_ring_internal and with_ctrl_ring_mut_internal macros - Preserve ublk_init_ctrl_task_ring() function with updated signature - Update documentation examples to reflect simplified API Signed-off-by: Ming Lei --- src/ctrl.rs | 62 ++++++++++++++++++++++++++--------------------------- 1 file changed, 30 insertions(+), 32 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index a28f2f0..de30eb7 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -6,7 +6,7 @@ use derive_setters::*; use io_uring::{opcode, squeue, types, IoUring}; use log::{error, trace}; use serde::Deserialize; -use std::cell::{OnceCell, RefCell}; +use std::cell::RefCell; use std::os::unix::io::AsRawFd; use std::sync::{Arc, RwLock}; use std::{ @@ -19,11 +19,11 @@ const CTRL_PATH: &str = "/dev/ublk-control"; const MAX_BUF_SZ: u32 = 32_u32 << 20; -// per-thread control uring using OnceCell for conditional initialization +// per-thread control uring - thread_local! is already lazy // std::thread_local! { - pub(crate) static CTRL_URING: OnceCell>> = - OnceCell::new(); + pub(crate) static CTRL_URING: RefCell>> = + RefCell::new(None); } // Internal macro versions for backwards compatibility within the crate @@ -31,9 +31,9 @@ std::thread_local! { macro_rules! with_ctrl_ring_internal { ($closure:expr) => { $crate::ctrl::CTRL_URING.with(|cell| { - if let Some(ring_cell) = cell.get() { - let ring = ring_cell.borrow(); - $closure(&*ring) + let ring_ref = cell.borrow(); + if let Some(ref ring) = *ring_ref { + $closure(ring) } else { panic!("Control ring not initialized. Call ublk_init_ctrl_task_ring() first or use UblkCtrl constructor.") } @@ -45,9 +45,9 @@ macro_rules! with_ctrl_ring_internal { macro_rules! with_ctrl_ring_mut_internal { ($closure:expr) => { $crate::ctrl::CTRL_URING.with(|cell| { - if let Some(ring_cell) = cell.get() { - let mut ring = ring_cell.borrow_mut(); - $closure(&mut *ring) + let mut ring_ref = cell.borrow_mut(); + if let Some(ref mut ring) = *ring_ref { + $closure(ring) } else { panic!("Control ring not initialized. Call ublk_init_ctrl_task_ring() first or use UblkCtrl constructor.") } @@ -62,12 +62,13 @@ pub(crate) use with_ctrl_ring_mut_internal; /// Initialize the thread-local control ring using a custom closure /// /// This API allows users to customize the io_uring initialization for control operations. -/// The closure receives the OnceCell and can conditionally initialize it if not already set. -/// If the thread-local variable is already initialized, the closure does nothing. +/// The closure receives a mutable reference to the Option and can conditionally initialize +/// it if not already set. If the thread-local variable is already initialized, the closure +/// does nothing. /// /// # Arguments -/// * `init_fn` - Closure that receives OnceCell>> and returns -/// Result<(), UblkError>. Should call `cell.set()` to initialize if needed. +/// * `init_fn` - Closure that receives &mut Option> and returns +/// Result<(), UblkError>. Should set the Option to Some(ring) to initialize. /// /// # Examples /// @@ -75,18 +76,16 @@ pub(crate) use with_ctrl_ring_mut_internal; /// ```no_run /// use libublk::ublk_init_ctrl_task_ring; /// use io_uring::IoUring; -/// use std::cell::RefCell; /// /// fn example() -> Result<(), Box> { /// // Custom initialization before creating UblkCtrl -/// ublk_init_ctrl_task_ring(|cell| { -/// if cell.get().is_none() { +/// ublk_init_ctrl_task_ring(|ring_opt| { +/// if ring_opt.is_none() { /// let ring = IoUring::builder() /// .setup_cqsize(256) // Custom completion queue size /// .setup_coop_taskrun() // Enable cooperative task running /// .build(128)?; // Custom submission queue size -/// cell.set(RefCell::new(ring)) -/// .map_err(|_| libublk::UblkError::OtherError(-libc::EEXIST))?; +/// *ring_opt = Some(ring); /// } /// Ok(()) /// })?; @@ -101,18 +100,16 @@ pub(crate) use with_ctrl_ring_mut_internal; /// ```no_run /// use libublk::ublk_init_ctrl_task_ring; /// use io_uring::IoUring; -/// use std::cell::RefCell; /// /// fn advanced_example() -> Result<(), Box> { -/// ublk_init_ctrl_task_ring(|cell| { -/// if cell.get().is_none() { +/// ublk_init_ctrl_task_ring(|ring_opt| { +/// if ring_opt.is_none() { /// let ring = IoUring::builder() /// .setup_cqsize(512) /// .setup_sqpoll(1000) // Enable SQPOLL mode /// .setup_iopoll() // Enable IOPOLL for high performance /// .build(256)?; -/// cell.set(RefCell::new(ring)) -/// .map_err(|_| libublk::UblkError::OtherError(-libc::EEXIST))?; +/// *ring_opt = Some(ring); /// } /// Ok(()) /// })?; @@ -122,24 +119,25 @@ pub(crate) use with_ctrl_ring_mut_internal; /// ``` pub fn ublk_init_ctrl_task_ring(init_fn: F) -> Result<(), UblkError> where - F: FnOnce(&OnceCell>>) -> Result<(), UblkError>, + F: FnOnce(&mut Option>) -> Result<(), UblkError>, { - CTRL_URING.with(|cell| init_fn(cell)) + CTRL_URING.with(|cell| { + let mut ring_ref = cell.borrow_mut(); + init_fn(&mut *ring_ref) + }) } /// Internal function to initialize the control ring with default parameters /// /// This is called by UblkCtrlInner::new()/new_async() when the ring hasn't been -/// initialized yet. Uses default values similar to the original LazyCell approach. +/// initialized yet. Uses default values similar to the original approach. fn init_ctrl_task_ring_default(depth: u32) -> Result<(), UblkError> { - ublk_init_ctrl_task_ring(|cell| { - if cell.get().is_none() { + ublk_init_ctrl_task_ring(|ring_opt| { + if ring_opt.is_none() { let ring = IoUring::::builder() .build(depth) .map_err(UblkError::IOError)?; - - cell.set(RefCell::new(ring)) - .map_err(|_| UblkError::OtherError(-libc::EEXIST))?; + *ring_opt = Some(ring); } Ok(()) }) From 00c144fbdb9e966dc2978982d0b327c6902e029b Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Sat, 6 Sep 2025 00:29:51 +0000 Subject: [PATCH 02/19] docs: consolidate and streamline io.rs documentation Implement Phase 2 of io.rs cleanup by consolidating documentation to reduce redundancy and improve maintainability. Changes: - Add comprehensive module-level documentation with complete examples for ring initialization (basic/advanced) and unified buffer APIs - Simplify individual method documentation by removing redundant examples and referencing module-level docs instead: * ublk_init_task_ring(): Remove 48 lines of duplicate examples * submit_io_cmd_unified(): Remove verbose usage examples * submit_fetch_commands_unified(): Remove redundant buffer examples * complete_io_cmd_unified(): Streamline validation/performance docs - Clean up outdated comments: * Remove historical "Previously had separate TASK_URING" comment * Remove outdated "todo: apply io_uring flags" comment Benefits: - Single source of truth for documentation examples - Better discoverability with comprehensive module-level docs - Reduced maintenance burden from duplicate examples - Improved readability with focused method documentation - All functionality preserved with better organization The documentation now follows Rust best practices with comprehensive module docs and concise method docs that reference the examples. Signed-off-by: Ming Lei --- src/io.rs | 241 ++++++++++++++++++++++++++++-------------------------- 1 file changed, 126 insertions(+), 115 deletions(-) diff --git a/src/io.rs b/src/io.rs index f7f3157..0e3f4d7 100644 --- a/src/io.rs +++ b/src/io.rs @@ -1,3 +1,126 @@ +//! # Ublk I/O Operations Module +//! +//! This module provides the core I/O functionality for ublk devices, including queue management, +//! buffer handling, and unified APIs for both traditional copy-based and zero-copy operations. +//! +//! ## Key Components +//! +//! - **Queue Management**: `UblkQueue` provides per-queue I/O handling with io_uring integration +//! - **Buffer Descriptors**: `BufDesc` and `BufDescList` provide unified buffer management +//! - **Device Abstraction**: `UblkDev` represents ublk device instances +//! - **I/O Context**: `UblkIOCtx` provides context for handling I/O operations +//! +//! ## Ring Initialization Examples +//! +//! ### Basic Custom Initialization +//! ```no_run +//! use libublk::ublk_init_task_ring; +//! use io_uring::IoUring; +//! use std::cell::RefCell; +//! +//! fn example() -> Result<(), Box> { +//! // Custom initialization before creating UblkQueue +//! ublk_init_task_ring(|cell| { +//! if cell.get().is_none() { +//! let ring = IoUring::builder() +//! .setup_cqsize(256) // Custom completion queue size +//! .setup_coop_taskrun() // Enable cooperative task running +//! .build(128)?; // Custom submission queue size +//! cell.set(RefCell::new(ring)) +//! .map_err(|_| libublk::UblkError::OtherError(-libc::EEXIST))?; +//! } +//! Ok(()) +//! })?; +//! +//! // Now create UblkQueue - it will use the pre-initialized ring +//! println!("Ring initialized! Create UblkQueue to use it."); +//! Ok(()) +//! } +//! ``` +//! +//! ### Advanced Initialization with Custom Flags +//! ```no_run +//! use libublk::ublk_init_task_ring; +//! use io_uring::IoUring; +//! use std::cell::RefCell; +//! +//! fn advanced_example() -> Result<(), Box> { +//! ublk_init_task_ring(|cell| { +//! if cell.get().is_none() { +//! let ring = IoUring::builder() +//! .setup_cqsize(512) +//! .setup_sqpoll(1000) // Enable SQPOLL mode +//! .setup_iopoll() // Enable IOPOLL for high performance +//! .build(256)?; +//! cell.set(RefCell::new(ring)) +//! .map_err(|_| libublk::UblkError::OtherError(-libc::EEXIST))?; +//! } +//! Ok(()) +//! })?; +//! println!("Advanced ring initialized!"); +//! Ok(()) +//! } +//! ``` +//! +//! ## Unified Buffer API Examples +//! +//! ### Traditional Buffer Operations +//! ```no_run +//! use libublk::io::{BufDesc, UblkQueue}; +//! use libublk::sys; +//! +//! fn example(queue: &UblkQueue) -> Result<(), libublk::UblkError> { +//! let buffer = [0u8; 4096]; +//! let slice_desc = BufDesc::Slice(&buffer); +//! let future = queue.submit_io_cmd_unified(0, sys::UBLK_U_IO_FETCH_REQ, slice_desc, -1)?; +//! // ... handle future +//! Ok(()) +//! } +//! ``` +//! +//! ### Zero-Copy Operations +//! ```no_run +//! use libublk::io::{BufDesc, UblkQueue}; +//! use libublk::sys; +//! +//! fn example(queue: &UblkQueue) -> Result<(), libublk::UblkError> { +//! let auto_reg = sys::ublk_auto_buf_reg { +//! index: 0, flags: 0, reserved0: 0, reserved1: 0 +//! }; +//! let auto_desc = BufDesc::AutoReg(auto_reg); +//! let future = queue.submit_io_cmd_unified(1, sys::UBLK_U_IO_FETCH_REQ, auto_desc, -1)?; +//! // ... handle future +//! Ok(()) +//! } +//! ``` +//! +//! ### Buffer List Operations +//! ```no_run +//! use libublk::io::{BufDescList, UblkQueue}; +//! use libublk::helpers::IoBuf; +//! use libublk::sys; +//! +//! fn example(queue: UblkQueue) -> Result { +//! // For traditional buffer operations +//! let mut bufs = Vec::new(); +//! for _ in 0..queue.get_depth() { +//! bufs.push(IoBuf::::new(4096)); +//! } +//! let slice_list = BufDescList::Slices(Some(&bufs)); +//! let queue = queue.submit_fetch_commands_unified(slice_list)?; +//! +//! // For zero-copy operations +//! let auto_regs: Vec = (0..queue.get_depth()) +//! .map(|i| sys::ublk_auto_buf_reg { +//! index: i as u16, flags: 0, reserved0: 0, reserved1: 0, +//! }) +//! .collect(); +//! let auto_list = BufDescList::AutoRegs(&auto_regs); +//! let queue = queue.submit_fetch_commands_unified(auto_list)?; +//! Ok(queue) +//! } +//! ``` + use super::uring_async::UblkUringOpFuture; #[cfg(feature = "fat_complete")] use super::UblkFatRes; @@ -11,7 +134,6 @@ use std::fs; use std::os::unix::io::{AsRawFd, RawFd}; // Unified thread-local io_uring for all queue operations -// Previously had separate TASK_URING, but now consolidated into QUEUE_RING // Thread-local queue ring using OnceCell for conditional initialization std::thread_local! { @@ -108,58 +230,7 @@ where /// * `init_fn` - Closure that receives OnceCell>> and returns /// Result<(), UblkError>. Should call `cell.set()` to initialize if needed. /// -/// # Examples -/// -/// ## Basic custom initialization: -/// ```no_run -/// use libublk::ublk_init_task_ring; -/// use io_uring::IoUring; -/// use std::cell::RefCell; -/// -/// fn example() -> Result<(), Box> { -/// // Custom initialization before creating UblkQueue -/// ublk_init_task_ring(|cell| { -/// if cell.get().is_none() { -/// let ring = IoUring::builder() -/// .setup_cqsize(256) // Custom completion queue size -/// .setup_coop_taskrun() // Enable cooperative task running -/// .build(128)?; // Custom submission queue size -/// cell.set(RefCell::new(ring)) -/// .map_err(|_| libublk::UblkError::OtherError(-libc::EEXIST))?; -/// } -/// Ok(()) -/// })?; -/// -/// // Now create UblkQueue - it will use the pre-initialized ring -/// // External users should use UblkQueue APIs rather than direct ring access -/// println!("Ring initialized! Create UblkQueue to use it."); -/// Ok(()) -/// } -/// ``` -/// -/// ## Advanced initialization with custom flags: -/// ```no_run -/// use libublk::ublk_init_task_ring; -/// use io_uring::IoUring; -/// use std::cell::RefCell; -/// -/// fn advanced_example() -> Result<(), Box> { -/// ublk_init_task_ring(|cell| { -/// if cell.get().is_none() { -/// let ring = IoUring::builder() -/// .setup_cqsize(512) -/// .setup_sqpoll(1000) // Enable SQPOLL mode -/// .setup_iopoll() // Enable IOPOLL for high performance -/// .build(256)?; -/// cell.set(RefCell::new(ring)) -/// .map_err(|_| libublk::UblkError::OtherError(-libc::EEXIST))?; -/// } -/// Ok(()) -/// })?; -/// println!("Advanced ring initialized!"); -/// Ok(()) -/// } -/// ``` +/// For detailed examples of basic and advanced initialization patterns, see the module-level documentation. pub fn ublk_init_task_ring(init_fn: F) -> Result<(), UblkError> where F: FnOnce(&OnceCell>>) -> Result<(), UblkError>, @@ -919,7 +990,6 @@ impl UblkQueue<'_> { // Users can call init_task_ring() before UblkQueue::new() to customize initialization init_task_ring_default(sq_depth as u32, cq_depth as u32)?; - //todo: apply io_uring flags from tgt.ring_flags let depth = dev.dev_info.queue_depth as u32; let cdev_fd = dev.cdev_file.as_raw_fd(); @@ -1288,34 +1358,10 @@ impl UblkQueue<'_> { /// * `BufDesc::AutoReg` - Requires `UBLK_F_AUTO_BUF_REG` to be enabled /// * `BufDesc::RawAddress` - Compatible with all device configurations (unsafe) /// - /// # Validation: - /// /// The method validates buffer descriptor compatibility with device capabilities /// before dispatching to ensure type safety and prevent runtime errors. /// - /// # Performance: - /// - /// This method has zero runtime overhead compared to calling the existing methods - /// directly, as it uses compile-time dispatch based on the buffer descriptor variant. - /// - /// # Usage Example: - /// - /// ```no_run - /// # use libublk::io::{BufDesc, UblkQueue}; - /// # use libublk::sys; - /// # fn example(queue: &UblkQueue) -> Result<(), libublk::UblkError> { - /// // For traditional buffer operations - /// let buffer = [0u8; 4096]; - /// let slice_desc = BufDesc::Slice(&buffer); - /// let future1 = queue.submit_io_cmd_unified(0, sys::UBLK_U_IO_FETCH_REQ, slice_desc, -1)?; - /// - /// // For zero-copy operations - /// let auto_reg = sys::ublk_auto_buf_reg { index: 0, flags: 0, reserved0: 0, reserved1: 0 }; - /// let auto_desc = BufDesc::AutoReg(auto_reg); - /// let future2 = queue.submit_io_cmd_unified(1, sys::UBLK_U_IO_FETCH_REQ, auto_desc, -1)?; - /// # Ok(()) - /// # } - /// ``` + /// For usage examples, see the module-level documentation. #[inline] pub fn submit_io_cmd_unified( &self, @@ -1563,35 +1609,7 @@ impl UblkQueue<'_> { /// COMMIT_AND_FETCH_REQ command is used for both committing io command /// result and fetching new incoming IO. /// - /// # Usage Example: - /// - /// ```no_run - /// # use libublk::io::{BufDescList, UblkQueue}; - /// # use libublk::helpers::IoBuf; - /// # use libublk::sys; - /// # fn example(queue: UblkQueue) -> Result { - /// // For traditional buffer operations - /// let mut bufs = Vec::new(); - /// for _ in 0..queue.get_depth() { - /// bufs.push(IoBuf::::new(4096)); - /// } - /// let slice_list = BufDescList::Slices(Some(&bufs)); - /// let queue = queue.submit_fetch_commands_unified(slice_list)?; - /// - /// // For zero-copy operations - /// let auto_regs: Vec = (0..queue.get_depth()) - /// .map(|i| sys::ublk_auto_buf_reg { - /// index: i as u16, - /// flags: 0, - /// reserved0: 0, - /// reserved1: 0, - /// }) - /// .collect(); - /// let auto_list = BufDescList::AutoRegs(&auto_regs); - /// let queue = queue.submit_fetch_commands_unified(auto_list)?; - /// # Ok(queue) - /// # } - /// ``` + /// For usage examples, see the module-level documentation. #[inline] pub fn submit_fetch_commands_unified( self, @@ -1767,16 +1785,9 @@ impl UblkQueue<'_> { /// * `BufDesc::AutoReg` - Requires `UBLK_F_AUTO_BUF_REG` to be enabled /// * `BufDesc::RawAddress` - Compatible with all device configurations (unsafe) /// - /// # Validation: - /// /// The method validates buffer descriptor compatibility with device capabilities /// before dispatching to ensure type safety and prevent runtime errors. /// - /// # Performance: - /// - /// This method has zero runtime overhead compared to calling the existing methods - /// directly, as it uses compile-time dispatch based on the buffer descriptor variant. - /// /// When calling this API, target code has to make sure that q_ring won't be borrowed. #[inline] pub fn complete_io_cmd_unified( From a9636e152cb07ac7f793b52ce011d5aa7302358d Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Sat, 6 Sep 2025 00:36:01 +0000 Subject: [PATCH 03/19] docs: fix Traditional Buffer Operations example to use IoBuf Update the module-level documentation example for traditional buffer operations to use IoBuf instead of raw arrays, which is more consistent with the library's established patterns and best practices. Changes: - Replace raw array `[0u8; 4096]` with `IoBuf::::new(4096)` - Use `BufDesc::from_io_buf()` helper method for proper conversion - Add missing `use libublk::helpers::IoBuf;` import This example now demonstrates the recommended approach for traditional buffer operations and aligns with how IoBuf is used throughout the codebase and other examples. Signed-off-by: Ming Lei --- src/io.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/io.rs b/src/io.rs index 0e3f4d7..bb6ca7b 100644 --- a/src/io.rs +++ b/src/io.rs @@ -67,11 +67,12 @@ //! ### Traditional Buffer Operations //! ```no_run //! use libublk::io::{BufDesc, UblkQueue}; +//! use libublk::helpers::IoBuf; //! use libublk::sys; //! //! fn example(queue: &UblkQueue) -> Result<(), libublk::UblkError> { -//! let buffer = [0u8; 4096]; -//! let slice_desc = BufDesc::Slice(&buffer); +//! let io_buf = IoBuf::::new(4096); +//! let slice_desc = BufDesc::from_io_buf(&io_buf); //! let future = queue.submit_io_cmd_unified(0, sys::UBLK_U_IO_FETCH_REQ, slice_desc, -1)?; //! // ... handle future //! Ok(()) From c93b4e2773cf7847c6339795003f7fd217bfbe76 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 10:56:18 +0000 Subject: [PATCH 04/19] feat: ctrl: add UblkCtrl::set_thread_affinity() API Add UblkCtrl::set_thread_affinity() API for setting queue thread affinity. Signed-off-by: Ming Lei --- src/ctrl.rs | 37 ++++++++++++++++++------------------- 1 file changed, 18 insertions(+), 19 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index de30eb7..accb1ce 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -2530,27 +2530,29 @@ impl UblkCtrl { }) } - /// Set thread affinity using pthread handle + /// Set queue thread affinity using thread ID /// - /// This function sets CPU affinity for the specified pthread handle. + /// This function sets CPU affinity for the specified thread ID. /// It should be called from the main thread context after receiving - /// the pthread handle from the queue thread. - fn set_thread_affinity(pthread_handle: libc::pthread_t, affinity: &UblkQueueAffinity) { + /// the thread ID from the queue thread. + pub fn set_thread_affinity(&self, qid: u16, tid: libc::pid_t) { + // Calculate and set affinity using the thread ID + let affinity = self.calculate_queue_affinity(qid); + unsafe { - libc::pthread_setaffinity_np( - pthread_handle, + libc::sched_setaffinity( + tid, affinity.buf_len(), affinity.addr() as *const libc::cpu_set_t, ); } } - /// Initialize queue thread and return pthread handle and tid + /// Initialize queue thread and return tid /// /// This function sets up the basic thread properties and returns - /// the pthread handle and thread ID for external affinity configuration. - fn init_queue_thread() -> (libc::pthread_t, libc::pid_t) { - let pthread_handle = unsafe { libc::pthread_self() }; + /// the thread ID for external affinity configuration. + fn init_queue_thread() -> libc::pid_t { let tid = unsafe { libc::gettid() }; // Set IO flusher property for the queue thread @@ -2559,7 +2561,7 @@ impl UblkCtrl { libc::prctl(PR_SET_IO_FLUSHER, 0, 0, 0, 0); } - (pthread_handle, tid) + tid } fn create_queue_handlers( @@ -2583,8 +2585,8 @@ impl UblkCtrl { let mut _q_fn = q_fn.clone(); q_threads.push(std::thread::spawn(move || { - let (pthread_handle, tid) = Self::init_queue_thread(); - if let Err(e) = _tx.send((q, pthread_handle, tid)) { + let tid = Self::init_queue_thread(); + if let Err(e) = _tx.send((q, tid)) { eprintln!("Warning: Failed to send queue thread info: {}", e); return; } @@ -2592,9 +2594,9 @@ impl UblkCtrl { })); } - // Set affinity from main thread context using pthread handles + // Set affinity from main thread context using thread IDs for _q in 0..nr_queues { - let (qid, pthread_handle, tid) = match rx.recv() { + let (qid, tid) = match rx.recv() { Ok(data) => data, Err(e) => { eprintln!("Warning: Failed to receive queue thread info: {}", e); @@ -2602,10 +2604,7 @@ impl UblkCtrl { } }; - // Calculate and set affinity using the pthread handle - let affinity = self.calculate_queue_affinity(qid); - Self::set_thread_affinity(pthread_handle, &affinity); - + self.set_thread_affinity(qid, tid); if let Err(e) = self.configure_queue(dev, qid, tid) { eprintln!( "Warning: configure queue failed for {}-{}: {:?}", From 0fda8ed630dc798c0269ddcc3c6ff1bc96a0da1e Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 11:00:25 +0000 Subject: [PATCH 05/19] feat: add UblkCtrl::init_queue_thread() API Export it, which is still helpful for target code. Signed-off-by: Ming Lei --- src/ctrl.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index accb1ce..6ce7d1e 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -2552,7 +2552,7 @@ impl UblkCtrl { /// /// This function sets up the basic thread properties and returns /// the thread ID for external affinity configuration. - fn init_queue_thread() -> libc::pid_t { + pub fn init_queue_thread() -> libc::pid_t { let tid = unsafe { libc::gettid() }; // Set IO flusher property for the queue thread From e8592626e802cca83ab445e213da9d145e95b8b8 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 11:04:40 +0000 Subject: [PATCH 06/19] feat: add more async/.await APIs Signed-off-by: Ming Lei --- src/ctrl.rs | 169 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 169 insertions(+) diff --git a/src/ctrl.rs b/src/ctrl.rs index 6ce7d1e..af5a0ac 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -763,6 +763,25 @@ struct UblkCtrlInner { /// Affinity management helpers impl UblkCtrlInner { + async fn get_queue_affinity_effective_async( + &mut self, + qid: u16, + ) -> Result { + let mut kernel_affinity = UblkQueueAffinity::new(); + self.get_queue_affinity_async(qid as u32, &mut kernel_affinity) + .await?; + + if self + .dev_flags + .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY) + { + // Select single CPU from available CPUs + let selected_cpu = self.queue_selected_cpus[qid as usize]; + Ok(UblkQueueAffinity::from_single_cpu(selected_cpu)) + } else { + Ok(kernel_affinity) + } + } /// Get queue affinity from kernel and optionally transform for single CPU mode fn get_queue_affinity_effective(&mut self, qid: u16) -> Result { let mut kernel_affinity = UblkQueueAffinity::new(); @@ -780,6 +799,37 @@ impl UblkCtrlInner { } } + /// Select and store single CPU for queue (used during device setup) + async fn select_single_cpu_for_queue_async( + &mut self, + qid: u16, + cpu: Option, + ) -> Result { + let mut kernel_affinity = UblkQueueAffinity::new(); + self.get_queue_affinity_async(qid as u32, &mut kernel_affinity) + .await?; + + let selected_cpu = if let Some(cpu) = cpu { + // Validate that the specified CPU is in the affinity mask + let available_cpus = kernel_affinity.to_bits_vec(); + if available_cpus.contains(&cpu) { + cpu + } else { + return Err(UblkError::OtherError(-libc::EINVAL)); + } + } else { + // Select a random CPU from the affinity mask + kernel_affinity.get_random_cpu().unwrap_or(0) + }; + + // Store the selected CPU + if (qid as usize) < self.queue_selected_cpus.len() { + self.queue_selected_cpus[qid as usize] = selected_cpu; + Ok(selected_cpu) + } else { + Err(UblkError::OtherError(-libc::EINVAL)) + } + } /// Select and store single CPU for queue (used during device setup) fn select_single_cpu_for_queue( &mut self, @@ -827,6 +877,27 @@ impl UblkCtrlInner { Ok(kernel_affinity) } } + + /// Create appropriate affinity for queue thread setup + async fn create_thread_affinity_async( + &mut self, + qid: u16, + ) -> Result { + if self + .dev_flags + .contains(UblkFlags::UBLK_DEV_F_SINGLE_CPU_AFFINITY) + { + // For single CPU mode, select and store the CPU first + let selected_cpu = self.select_single_cpu_for_queue_async(qid, None).await?; + Ok(UblkQueueAffinity::from_single_cpu(selected_cpu)) + } else { + // For multi-CPU mode, use kernel's full affinity + let mut kernel_affinity = UblkQueueAffinity::new(); + self.get_queue_affinity_async(qid as u32, &mut kernel_affinity) + .await?; + Ok(kernel_affinity) + } + } } impl Drop for UblkCtrlInner { @@ -1726,6 +1797,56 @@ impl UblkCtrlInner { Ok(0) } + async fn build_json_async(&mut self, dev: &UblkDev) -> Result { + // Update queue thread IDs if they exist and JSON already has content + if !self.json_manager.get_json().is_null() + && self.json_manager.get_json().is_object() + && !self.json_manager.get_json().as_object().unwrap().is_empty() + { + if let Some(queues) = self.json_manager.get_json_mut().get_mut("queues") { + for qid in 0..dev.dev_info.nr_hw_queues { + if let Some(queue) = queues.get_mut(&qid.to_string()) { + if let Some(tid) = queue.get_mut("tid") { + *tid = serde_json::json!(self.queue_tids[qid as usize]); + } + } + } + } + return Ok(0); + } + + let tgt_data = dev.get_target_json(); + let mut map: serde_json::Map = serde_json::Map::new(); + + for qid in 0..dev.dev_info.nr_hw_queues { + let affinity = self.get_queue_affinity_effective_async(qid).await?; + + map.insert( + format!("{}", qid), + serde_json::json!({ + "qid": qid, + "tid": self.queue_tids[qid as usize], + "affinity": affinity.to_bits_vec(), + }), + ); + } + + let mut json = serde_json::json!({ + "dev_info": dev.dev_info, + "target": dev.tgt, + "target_flags": dev.flags.bits(), + }); + + if let Some(val) = tgt_data { + json["target_data"] = val.clone() + } + + json["queues"] = serde_json::Value::Object(map); + + self.json_manager.set_json(json); + Ok(0) + } + /// Reload json info for this device /// fn reload_json(&mut self) -> Result { @@ -2064,6 +2185,25 @@ impl UblkCtrl { Ok(0) } + pub async fn configure_queue_async( + &self, + dev: &UblkDev, + qid: u16, + tid: i32, + ) -> Result { + let mut ctrl = self.get_inner_mut(); + + ctrl.store_queue_tid(qid, tid); + + ctrl.nr_queues_configured += 1; + + if ctrl.nr_queues_configured == ctrl.dev_info.nr_hw_queues { + ctrl.build_json_async(dev).await?; + } + + Ok(0) + } + /// Dump this device info /// /// The 1st part is from UblkCtrl.dev_info, and the 2nd part is @@ -2530,6 +2670,22 @@ impl UblkCtrl { }) } + async fn calculate_queue_affinity_async(&self, queue_id: u16) -> UblkQueueAffinity { + let affi = self + .get_inner_mut() + .create_thread_affinity_async(queue_id) + .await + .unwrap_or_else(|_| { + // Fallback to kernel affinity if thread affinity creation fails + let mut affinity = UblkQueueAffinity::new(); + self.get_queue_affinity(queue_id as u32, &mut affinity) + .unwrap_or_default(); + affinity + }); + log::info!("calculate queue affinity...done\n"); + affi + } + /// Set queue thread affinity using thread ID /// /// This function sets CPU affinity for the specified thread ID. @@ -2548,6 +2704,19 @@ impl UblkCtrl { } } + pub async fn set_thread_affinity_async(&self, qid: u16, tid: libc::pid_t) { + // Calculate and set affinity using the thread ID + let affinity = self.calculate_queue_affinity_async(qid).await; + + unsafe { + libc::sched_setaffinity( + tid, + affinity.buf_len(), + affinity.addr() as *const libc::cpu_set_t, + ); + } + } + /// Initialize queue thread and return tid /// /// This function sets up the basic thread properties and returns From e019811752317480b1c6a970cb6faca027974379 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 12:43:04 +0000 Subject: [PATCH 07/19] refactor: unify add/add_async command preparation Extract common command data preparation logic for ADD_DEV into prepare_add_cmd() method. Both sync and async variants now use the same preparation logic, reducing code duplication while maintaining identical behavior. Signed-off-by: Ming Lei --- src/ctrl.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index af5a0ac..085671e 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1360,27 +1360,25 @@ impl UblkCtrlInner { Self::ublk_err_to_result(res) } - fn add(&mut self) -> Result { - let data = UblkCtrlCmdData::new_write_buffer_cmd( + /// Prepare ADD_DEV command data + fn prepare_add_cmd(&self) -> UblkCtrlCmdData { + UblkCtrlCmdData::new_write_buffer_cmd( sys::UBLK_U_CMD_ADD_DEV, std::ptr::addr_of!(self.dev_info) as u64, core::mem::size_of::() as u32, true, // no_dev_path - ); + ) + } + fn add(&mut self) -> Result { + let data = self.prepare_add_cmd(); self.ublk_ctrl_cmd(&data) } /// Add this device asynchronously /// async fn add_async(&mut self) -> Result { - let data = UblkCtrlCmdData::new_write_buffer_cmd( - sys::UBLK_U_CMD_ADD_DEV, - std::ptr::addr_of!(self.dev_info) as u64, - core::mem::size_of::() as u32, - true, // no_dev_path - ); - + let data = self.prepare_add_cmd(); self.ublk_ctrl_cmd_async(&data).await } From a7c499cfe6aa663b8ee41f16f99458afaa1ddeb5 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 12:44:03 +0000 Subject: [PATCH 08/19] refactor: unify del/del_async/del_async_await command preparation Extract common command data preparation logic for DEL_DEV into prepare_del_cmd() method. All three deletion variants now use the same preparation logic with a force_async parameter, reducing code duplication while maintaining identical behavior. Signed-off-by: Ming Lei --- src/ctrl.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 085671e..0c4d854 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1382,14 +1382,9 @@ impl UblkCtrlInner { self.ublk_ctrl_cmd_async(&data).await } - /// Remove this device - /// - fn del(&mut self) -> Result { - if self.is_deleted() { - return Ok(0); - } - - let cmd_op = if self + /// Prepare DEL_DEV command data + fn prepare_del_cmd(&self, force_async: bool) -> UblkCtrlCmdData { + let cmd_op = if force_async || self .dev_flags .intersects(UblkFlags::UBLK_DEV_F_DEL_DEV_ASYNC) { @@ -1397,11 +1392,19 @@ impl UblkCtrlInner { } else { sys::UBLK_U_CMD_DEL_DEV }; - let data = UblkCtrlCmdData::new_simple_cmd(cmd_op); + UblkCtrlCmdData::new_simple_cmd(cmd_op) + } + + /// Remove this device + /// + fn del(&mut self) -> Result { + if self.is_deleted() { + return Ok(0); + } + let data = self.prepare_del_cmd(false); let res = self.ublk_ctrl_cmd(&data)?; self.mark_deleted(); - Ok(res) } @@ -1411,8 +1414,8 @@ impl UblkCtrlInner { if self.is_deleted() { return Ok(0); } - let data = UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_DEL_DEV_ASYNC); - + + let data = self.prepare_del_cmd(true); let res = self.ublk_ctrl_cmd(&data)?; self.mark_deleted(); Ok(res) @@ -1429,8 +1432,8 @@ impl UblkCtrlInner { if self.is_deleted() { return Ok(0); } - let data = UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_DEL_DEV_ASYNC); - + + let data = self.prepare_del_cmd(true); let res = self.ublk_ctrl_cmd_async(&data).await?; self.mark_deleted(); Ok(res) From aa1f2a7b157d33768e66f87a8da424bf9c838a55 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 12:45:19 +0000 Subject: [PATCH 09/19] refactor: unify features and read_dev_info command preparation Extract common command data preparation logic for: - GET_FEATURES: prepare_get_features_cmd() - GET_DEV_INFO/GET_DEV_INFO2: prepare_read_dev_info_cmd() Both sync and async variants now use the same preparation logic, reducing code duplication while maintaining identical behavior. Signed-off-by: Ming Lei --- src/ctrl.rs | 60 +++++++++++++++++++---------------------------------- 1 file changed, 21 insertions(+), 39 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 0c4d854..30e3d97 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1439,53 +1439,47 @@ impl UblkCtrlInner { Ok(res) } - fn __get_features(&mut self) -> Result { - let features = 0_u64; - let data = UblkCtrlCmdData::new_read_buffer_cmd( + /// Prepare GET_FEATURES command data + fn prepare_get_features_cmd(features_ptr: u64) -> UblkCtrlCmdData { + UblkCtrlCmdData::new_read_buffer_cmd( sys::UBLK_U_CMD_GET_FEATURES, - std::ptr::addr_of!(features) as u64, + features_ptr, core::mem::size_of::() as u32, true, // no_dev_path - ); + ) + } + fn __get_features(&mut self) -> Result { + let features = 0_u64; + let data = Self::prepare_get_features_cmd(std::ptr::addr_of!(features) as u64); self.ublk_ctrl_cmd(&data)?; - Ok(features) } async fn __get_features_async(&mut self) -> Result { let features = 0_u64; - let data = UblkCtrlCmdData::new_read_buffer_cmd( - sys::UBLK_U_CMD_GET_FEATURES, - std::ptr::addr_of!(features) as u64, - core::mem::size_of::() as u32, - true, // no_dev_path - ); - + let data = Self::prepare_get_features_cmd(std::ptr::addr_of!(features) as u64); self.ublk_ctrl_cmd_async(&data).await?; - Ok(features) } - fn __read_dev_info(&mut self) -> Result { - let data = UblkCtrlCmdData::new_read_buffer_cmd( - sys::UBLK_U_CMD_GET_DEV_INFO, + /// Prepare read device info command data + fn prepare_read_dev_info_cmd(&self, cmd_op: u32) -> UblkCtrlCmdData { + UblkCtrlCmdData::new_read_buffer_cmd( + cmd_op, std::ptr::addr_of!(self.dev_info) as u64, core::mem::size_of::() as u32, false, // need dev_path - ); + ) + } + fn __read_dev_info(&mut self) -> Result { + let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO); self.ublk_ctrl_cmd(&data) } fn __read_dev_info2(&mut self) -> Result { - let data = UblkCtrlCmdData::new_read_buffer_cmd( - sys::UBLK_U_CMD_GET_DEV_INFO2, - std::ptr::addr_of!(self.dev_info) as u64, - core::mem::size_of::() as u32, - false, // need dev_path - ); - + let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO2); self.ublk_ctrl_cmd(&data) } @@ -1503,24 +1497,12 @@ impl UblkCtrlInner { } async fn __read_dev_info_async(&mut self) -> Result { - let data = UblkCtrlCmdData::new_read_buffer_cmd( - sys::UBLK_U_CMD_GET_DEV_INFO, - std::ptr::addr_of!(self.dev_info) as u64, - core::mem::size_of::() as u32, - false, // need dev_path - ); - + let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO); self.ublk_ctrl_cmd_async(&data).await } async fn __read_dev_info2_async(&mut self) -> Result { - let data = UblkCtrlCmdData::new_read_buffer_cmd( - sys::UBLK_U_CMD_GET_DEV_INFO2, - std::ptr::addr_of!(self.dev_info) as u64, - core::mem::size_of::() as u32, - false, // need dev_path - ); - + let data = self.prepare_read_dev_info_cmd(sys::UBLK_U_CMD_GET_DEV_INFO2); self.ublk_ctrl_cmd_async(&data).await } From 866baafc6505451f9db09e9751a179f637acfc86 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 12:49:36 +0000 Subject: [PATCH 10/19] refactor: unify start/stop/params command preparation Extract common command data preparation logic for: - START_DEV: prepare_start_cmd() - STOP_DEV: prepare_stop_cmd() - GET_PARAMS: prepare_get_params_cmd() - SET_PARAMS: prepare_set_params_cmd() Both sync and async variants now use the same preparation logic, reducing code duplication while maintaining identical behavior. Signed-off-by: Ming Lei --- src/ctrl.rs | 61 ++++++++++++++++++++++++++++------------------------- 1 file changed, 32 insertions(+), 29 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 30e3d97..242c215 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1384,9 +1384,10 @@ impl UblkCtrlInner { /// Prepare DEL_DEV command data fn prepare_del_cmd(&self, force_async: bool) -> UblkCtrlCmdData { - let cmd_op = if force_async || self - .dev_flags - .intersects(UblkFlags::UBLK_DEV_F_DEL_DEV_ASYNC) + let cmd_op = if force_async + || self + .dev_flags + .intersects(UblkFlags::UBLK_DEV_F_DEL_DEV_ASYNC) { sys::UBLK_U_CMD_DEL_DEV_ASYNC } else { @@ -1414,7 +1415,7 @@ impl UblkCtrlInner { if self.is_deleted() { return Ok(0); } - + let data = self.prepare_del_cmd(true); let res = self.ublk_ctrl_cmd(&data)?; self.mark_deleted(); @@ -1432,7 +1433,7 @@ impl UblkCtrlInner { if self.is_deleted() { return Ok(0); } - + let data = self.prepare_del_cmd(true); let res = self.ublk_ctrl_cmd_async(&data).await?; self.mark_deleted(); @@ -1506,65 +1507,68 @@ impl UblkCtrlInner { self.ublk_ctrl_cmd_async(&data).await } + /// Prepare START_DEV command data + fn prepare_start_cmd(pid: i32) -> UblkCtrlCmdData { + UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_START_DEV, pid as u64) + } + /// Start this device by sending command to ublk driver /// fn start(&mut self, pid: i32) -> Result { - let data = UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_START_DEV, pid as u64); - + let data = Self::prepare_start_cmd(pid); self.ublk_ctrl_cmd(&data) } /// Start this device by sending command to ublk driver /// async fn start_async(&mut self, pid: i32) -> Result { - let data = UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_START_DEV, pid as u64); - + let data = Self::prepare_start_cmd(pid); self.ublk_ctrl_cmd_async(&data).await } + /// Prepare STOP_DEV command data + fn prepare_stop_cmd() -> UblkCtrlCmdData { + UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_STOP_DEV) + } + /// Stop this device by sending command to ublk driver /// fn stop(&mut self) -> Result { - let data = UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_STOP_DEV); - + let data = Self::prepare_stop_cmd(); self.ublk_ctrl_cmd(&data) } /// Stop this device by sending command to ublk driver asynchronously /// async fn stop_async(&mut self) -> Result { - let data = UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_STOP_DEV); - + let data = Self::prepare_stop_cmd(); self.ublk_ctrl_cmd_async(&data).await } - /// Retrieve this device's parameter from ublk driver by - /// sending command - /// - /// Can't pass params by reference(&mut), why? - fn get_params(&mut self, params: &mut sys::ublk_params) -> Result { + /// Prepare GET_PARAMS command data + fn prepare_get_params_cmd(params: &mut sys::ublk_params) -> UblkCtrlCmdData { params.len = core::mem::size_of::() as u32; - let data = UblkCtrlCmdData::new_read_buffer_cmd( + UblkCtrlCmdData::new_read_buffer_cmd( sys::UBLK_U_CMD_GET_PARAMS, params as *const sys::ublk_params as u64, params.len, false, // need dev_path - ); + ) + } + /// Retrieve this device's parameter from ublk driver by + /// sending command + /// + /// Can't pass params by reference(&mut), why? + fn get_params(&mut self, params: &mut sys::ublk_params) -> Result { + let data = Self::prepare_get_params_cmd(params); self.ublk_ctrl_cmd(&data) } /// Retrieve this device's parameter from ublk driver by /// sending command in async/.await async fn get_params_async(&mut self, params: &mut sys::ublk_params) -> Result { - params.len = core::mem::size_of::() as u32; - let data = UblkCtrlCmdData::new_read_buffer_cmd( - sys::UBLK_U_CMD_GET_PARAMS, - params as *const sys::ublk_params as u64, - params.len, - false, // need dev_path - ); - + let data = Self::prepare_get_params_cmd(params); self.ublk_ctrl_cmd_async(&data).await } @@ -1600,7 +1604,6 @@ impl UblkCtrlInner { p.len, false, // need dev_path ); - self.ublk_ctrl_cmd_async(&data).await } From dd811e87e06ea90045d7173a11c3e2d4fd384a7d Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 12:53:53 +0000 Subject: [PATCH 11/19] refactor: unify queue affinity and recovery command preparation Extract common command data preparation logic for: - GET_QUEUE_AFFINITY: prepare_get_queue_affinity_cmd() - START_USER_RECOVERY: prepare_start_user_recover_cmd() - END_USER_RECOVERY: prepare_end_user_recover_cmd() Both sync and async variants now use the same preparation logic, reducing code duplication while maintaining identical behavior. Signed-off-by: Ming Lei --- src/ctrl.rs | 41 +++++++++++++++++++++++------------------ 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 242c215..4578e6b 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1607,14 +1607,19 @@ impl UblkCtrlInner { self.ublk_ctrl_cmd_async(&data).await } - fn get_queue_affinity(&mut self, q: u32, bm: &mut UblkQueueAffinity) -> Result { - let data = UblkCtrlCmdData::new_data_buffer_cmd( + /// Prepare GET_QUEUE_AFFINITY command data + fn prepare_get_queue_affinity_cmd(q: u32, bm: &mut UblkQueueAffinity) -> UblkCtrlCmdData { + UblkCtrlCmdData::new_data_buffer_cmd( sys::UBLK_U_CMD_GET_QUEUE_AFFINITY, q as u64, bm.addr() as u64, bm.buf_len() as u32, true, // read_buffer - ); + ) + } + + fn get_queue_affinity(&mut self, q: u32, bm: &mut UblkQueueAffinity) -> Result { + let data = Self::prepare_get_queue_affinity_cmd(q, bm); self.ublk_ctrl_cmd(&data) } @@ -1625,41 +1630,41 @@ impl UblkCtrlInner { q: u32, bm: &mut UblkQueueAffinity, ) -> Result { - let data = UblkCtrlCmdData::new_data_buffer_cmd( - sys::UBLK_U_CMD_GET_QUEUE_AFFINITY, - q as u64, - bm.addr() as u64, - bm.buf_len() as u32, - true, // read_buffer - ); + let data = Self::prepare_get_queue_affinity_cmd(q, bm); self.ublk_ctrl_cmd_async(&data).await } - fn __start_user_recover(&mut self) -> Result { - let data = UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_START_USER_RECOVERY); + /// Prepare START_USER_RECOVERY command data + fn prepare_start_user_recover_cmd() -> UblkCtrlCmdData { + UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_START_USER_RECOVERY) + } + fn __start_user_recover(&mut self) -> Result { + let data = Self::prepare_start_user_recover_cmd(); self.ublk_ctrl_cmd(&data) } async fn __start_user_recover_async(&mut self) -> Result { - let data = UblkCtrlCmdData::new_simple_cmd(sys::UBLK_U_CMD_START_USER_RECOVERY); - + let data = Self::prepare_start_user_recover_cmd(); self.ublk_ctrl_cmd_async(&data).await } + /// Prepare END_USER_RECOVERY command data + fn prepare_end_user_recover_cmd(pid: i32) -> UblkCtrlCmdData { + UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_END_USER_RECOVERY, pid as u64) + } + /// End user recover for this device, do similar thing done in start_dev() /// fn end_user_recover(&mut self, pid: i32) -> Result { - let data = UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_END_USER_RECOVERY, pid as u64); - + let data = Self::prepare_end_user_recover_cmd(pid); self.ublk_ctrl_cmd(&data) } /// End user recover for this device, do similar thing done in start_dev() /// async fn end_user_recover_async(&mut self, pid: i32) -> Result { - let data = UblkCtrlCmdData::new_data_cmd(sys::UBLK_U_CMD_END_USER_RECOVERY, pid as u64); - + let data = Self::prepare_end_user_recover_cmd(pid); self.ublk_ctrl_cmd_async(&data).await } From 6997f62ee49552bafebe7c8eabf4bbfc6b4d0901 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 13:33:46 +0000 Subject: [PATCH 12/19] refactor: unify build_json/build_json_async methods Extract common JSON building logic into helper methods: - build_json_internal() and build_json_internal_async() for data generation - update_json_queue_tids() for shared TID update logic Both sync and async versions now share validation and data preparation logic while maintaining separate execution paths. Signed-off-by: Ming Lei --- src/ctrl.rs | 97 +++++++++++++++++++++++++++-------------------------- 1 file changed, 50 insertions(+), 47 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 4578e6b..5bf3b45 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1738,29 +1738,17 @@ impl UblkCtrlInner { /// * `tids`: queue pthread tid vector, in which each item stores the queue's /// pthread tid /// - fn build_json(&mut self, dev: &UblkDev) -> Result { - // Update queue thread IDs if they exist and JSON already has content - if !self.json_manager.get_json().is_null() - && self.json_manager.get_json().is_object() - && !self.json_manager.get_json().as_object().unwrap().is_empty() - { - if let Some(queues) = self.json_manager.get_json_mut().get_mut("queues") { - for qid in 0..dev.dev_info.nr_hw_queues { - if let Some(queue) = queues.get_mut(&qid.to_string()) { - if let Some(tid) = queue.get_mut("tid") { - *tid = serde_json::json!(self.queue_tids[qid as usize]); - } - } - } - } - return Ok(0); - } - + /// Build JSON data with provided queue affinities + fn build_json_with_affinities( + &self, + dev: &UblkDev, + queue_affinities: &[UblkQueueAffinity], + ) -> serde_json::Value { let tgt_data = dev.get_target_json(); let mut map: serde_json::Map = serde_json::Map::new(); for qid in 0..dev.dev_info.nr_hw_queues { - let affinity = self.get_queue_affinity_effective(qid)?; + let affinity = &queue_affinities[qid as usize]; map.insert( format!("{}", qid), @@ -1783,13 +1771,35 @@ impl UblkCtrlInner { } json["queues"] = serde_json::Value::Object(map); + json + } - self.json_manager.set_json(json); - Ok(0) + /// Build JSON data for device + fn build_json_internal(&mut self, dev: &UblkDev) -> Result { + let mut queue_affinities = Vec::with_capacity(dev.dev_info.nr_hw_queues as usize); + + for qid in 0..dev.dev_info.nr_hw_queues { + let affinity = self.get_queue_affinity_effective(qid)?; + queue_affinities.push(affinity); + } + + Ok(self.build_json_with_affinities(dev, &queue_affinities)) } - async fn build_json_async(&mut self, dev: &UblkDev) -> Result { - // Update queue thread IDs if they exist and JSON already has content + /// Build JSON data for device (async) + async fn build_json_internal_async(&mut self, dev: &UblkDev) -> Result { + let mut queue_affinities = Vec::with_capacity(dev.dev_info.nr_hw_queues as usize); + + for qid in 0..dev.dev_info.nr_hw_queues { + let affinity = self.get_queue_affinity_effective_async(qid).await?; + queue_affinities.push(affinity); + } + + Ok(self.build_json_with_affinities(dev, &queue_affinities)) + } + + /// Update existing JSON with queue thread IDs + fn update_json_queue_tids(&mut self, dev: &UblkDev) -> bool { if !self.json_manager.get_json().is_null() && self.json_manager.get_json().is_object() && !self.json_manager.get_json().as_object().unwrap().is_empty() @@ -1803,37 +1813,30 @@ impl UblkCtrlInner { } } } - return Ok(0); + true + } else { + false } + } - let tgt_data = dev.get_target_json(); - let mut map: serde_json::Map = serde_json::Map::new(); - - for qid in 0..dev.dev_info.nr_hw_queues { - let affinity = self.get_queue_affinity_effective_async(qid).await?; - - map.insert( - format!("{}", qid), - serde_json::json!({ - "qid": qid, - "tid": self.queue_tids[qid as usize], - "affinity": affinity.to_bits_vec(), - }), - ); + fn build_json(&mut self, dev: &UblkDev) -> Result { + // Update queue thread IDs if JSON already exists + if self.update_json_queue_tids(dev) { + return Ok(0); } - let mut json = serde_json::json!({ - "dev_info": dev.dev_info, - "target": dev.tgt, - "target_flags": dev.flags.bits(), - }); + let json = self.build_json_internal(dev)?; + self.json_manager.set_json(json); + Ok(0) + } - if let Some(val) = tgt_data { - json["target_data"] = val.clone() + async fn build_json_async(&mut self, dev: &UblkDev) -> Result { + // Update queue thread IDs if JSON already exists + if self.update_json_queue_tids(dev) { + return Ok(0); } - json["queues"] = serde_json::Value::Object(map); - + let json = self.build_json_internal_async(dev).await?; self.json_manager.set_json(json); Ok(0) } From ec071044fc489bce130fbf388f554a1eba0e8b47 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 13:35:50 +0000 Subject: [PATCH 13/19] refactor: unify new/new_async parameter validation Extract common parameter validation logic into validate_new_params() method. Both sync and async constructors now share identical validation logic, ensuring consistent parameter checking while maintaining separate initialization paths. Signed-off-by: Ming Lei --- src/ctrl.rs | 52 +++++++++++++++++++++------------------------------- 1 file changed, 21 insertions(+), 31 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 5bf3b45..89e7378 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1947,16 +1947,15 @@ impl UblkCtrl { /// device exported json file, dump, or any misc management task. /// #[allow(clippy::too_many_arguments)] - pub fn new( - name: Option, + /// Validate parameters for new UblkCtrl creation + fn validate_new_params( + flags: u64, + dev_flags: UblkFlags, id: i32, nr_queues: u32, depth: u32, io_buf_bytes: u32, - flags: u64, - tgt_flags: u64, - dev_flags: UblkFlags, - ) -> Result { + ) -> Result<(), UblkError> { Self::validate_param((flags & !Self::UBLK_DRV_F_ALL) == 0)?; if !Path::new(CTRL_PATH).exists() { @@ -1983,6 +1982,21 @@ impl UblkCtrl { let page_sz = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u32; Self::validate_param(io_buf_bytes <= MAX_BUF_SZ && (io_buf_bytes & (page_sz - 1)) == 0)?; + Ok(()) + } + + pub fn new( + name: Option, + id: i32, + nr_queues: u32, + depth: u32, + io_buf_bytes: u32, + flags: u64, + tgt_flags: u64, + dev_flags: UblkFlags, + ) -> Result { + Self::validate_new_params(flags, dev_flags, id, nr_queues, depth, io_buf_bytes)?; + let inner = RwLock::new(UblkCtrlInner::new_with_params( name, id, @@ -2032,31 +2046,7 @@ impl UblkCtrl { tgt_flags: u64, dev_flags: UblkFlags, ) -> Result { - Self::validate_param((flags & !Self::UBLK_DRV_F_ALL) == 0)?; - - if !Path::new(CTRL_PATH).exists() { - eprintln!("Please run `modprobe ublk_drv` first"); - return Err(UblkError::OtherError(-libc::ENOENT)); - } - - Self::validate_param(!dev_flags.intersects(UblkFlags::UBLK_DEV_F_INTERNAL_0))?; - - // Check mlock feature compatibility - if dev_flags.intersects(UblkFlags::UBLK_DEV_F_MLOCK_IO_BUFFER) { - // mlock feature is incompatible with certain other features - Self::validate_param( - (flags & sys::UBLK_F_USER_COPY as u64) == 0 - && (flags & sys::UBLK_F_AUTO_BUF_REG as u64) == 0 - && (flags & sys::UBLK_F_SUPPORT_ZERO_COPY as u64) == 0, - )?; - } - - Self::validate_param(id >= -1)?; - Self::validate_param(nr_queues <= sys::UBLK_MAX_NR_QUEUES)?; - Self::validate_param(depth <= sys::UBLK_MAX_QUEUE_DEPTH)?; - - let page_sz = unsafe { libc::sysconf(libc::_SC_PAGESIZE) } as u32; - Self::validate_param(io_buf_bytes <= MAX_BUF_SZ && (io_buf_bytes & (page_sz - 1)) == 0)?; + Self::validate_new_params(flags, dev_flags, id, nr_queues, depth, io_buf_bytes)?; let inner = RwLock::new( UblkCtrlInner::new_with_params_async( From d218875a62ce164ccceceec0ca6586ff789db6d2 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 13:37:54 +0000 Subject: [PATCH 14/19] refactor: unify dump/dump_async printing logic Extract common device info printing logic into dump_device_info() helper method. Both sync and async dump methods now share identical formatting and display logic while maintaining separate data collection paths. Signed-off-by: Ming Lei --- src/ctrl.rs | 63 ++++++++++++++++++----------------------------------- 1 file changed, 21 insertions(+), 42 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 89e7378..ac5f9b1 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -2192,22 +2192,8 @@ impl UblkCtrl { /// /// The 1st part is from UblkCtrl.dev_info, and the 2nd part is /// retrieved from device's exported json file - pub fn dump(&self) { - let mut ctrl = self.get_inner_mut(); - let mut p = sys::ublk_params { - ..Default::default() - }; - - if ctrl.read_dev_info().is_err() { - error!("Dump dev {} failed\n", ctrl.dev_info.dev_id); - return; - } - - if ctrl.get_params(&mut p).is_err() { - error!("Dump dev {} failed\n", ctrl.dev_info.dev_id); - return; - } - + /// Print device info after data has been collected + fn dump_device_info(ctrl: &UblkCtrlInner, p: &sys::ublk_params) { let info = &ctrl.dev_info; println!( "\ndev id {}: nr_hw_queues {} queue_depth {} block size {} dev_capacity {}", @@ -2233,7 +2219,25 @@ impl UblkCtrl { info.owner_uid, info.owner_gid ); + } + + pub fn dump(&self) { + let mut ctrl = self.get_inner_mut(); + let mut p = sys::ublk_params { + ..Default::default() + }; + if ctrl.read_dev_info().is_err() { + error!("Dump dev {} failed\n", ctrl.dev_info.dev_id); + return; + } + + if ctrl.get_params(&mut p).is_err() { + error!("Dump dev {} failed\n", ctrl.dev_info.dev_id); + return; + } + + Self::dump_device_info(&ctrl, &p); ctrl.dump_from_json(); } @@ -2264,32 +2268,7 @@ impl UblkCtrl { e })?; - let info = &ctrl.dev_info; - println!( - "\ndev id {}: nr_hw_queues {} queue_depth {} block size {} dev_capacity {}", - info.dev_id, - info.nr_hw_queues, - info.queue_depth, - 1 << p.basic.logical_bs_shift, - p.basic.dev_sectors - ); - println!( - "\tmax rq size {} daemon pid {} flags 0x{:x} state {}", - info.max_io_buf_bytes, - info.ublksrv_pid, - info.flags, - ctrl.dev_state_desc() - ); - println!( - "\tublkc: {}:{} ublkb: {}:{} owner: {}:{}", - p.devt.char_major, - p.devt.char_minor, - p.devt.disk_major, - p.devt.disk_minor, - info.owner_uid, - info.owner_gid - ); - + Self::dump_device_info(&ctrl, &p); ctrl.dump_from_json(); Ok(()) } From db1770414cc3615f06f7b32f5ec09f117c23b6fa Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 14:12:14 +0000 Subject: [PATCH 15/19] feat: warn if control command type doesn't match with the context - async command should run in async environment with UBLK_CTRL_ASYNC_AWAIT set. - sync command should run in sync environment without UBLK_CTRL_ASYNC_AWAIT Signed-off-by: Ming Lei --- src/ctrl.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index ac5f9b1..2b03229 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -1316,6 +1316,7 @@ impl UblkCtrlInner { async fn ublk_ctrl_cmd_async(&mut self, data: &UblkCtrlCmdData) -> Result { // Enforce async/await API usage: async methods can only be used when UBLK_CTRL_ASYNC_AWAIT is set if !self.force_async && !self.dev_flags.contains(UblkFlags::UBLK_CTRL_ASYNC_AWAIT) { + log::warn!("Warn: async cmd {:x} is run from sync context", data.cmd_op); return Err(UblkError::OtherError(-libc::EPERM)); } @@ -1339,6 +1340,7 @@ impl UblkCtrlInner { fn ublk_ctrl_cmd(&mut self, data: &UblkCtrlCmdData) -> Result { // Enforce non-async API usage: sync methods can only be used when UBLK_CTRL_ASYNC_AWAIT is NOT set if self.dev_flags.contains(UblkFlags::UBLK_CTRL_ASYNC_AWAIT) { + log::warn!("Warn: sync cmd {:x} is run from async context", data.cmd_op); return Err(UblkError::OtherError(-libc::EPERM)); } @@ -1777,7 +1779,7 @@ impl UblkCtrlInner { /// Build JSON data for device fn build_json_internal(&mut self, dev: &UblkDev) -> Result { let mut queue_affinities = Vec::with_capacity(dev.dev_info.nr_hw_queues as usize); - + for qid in 0..dev.dev_info.nr_hw_queues { let affinity = self.get_queue_affinity_effective(qid)?; queue_affinities.push(affinity); @@ -1787,9 +1789,12 @@ impl UblkCtrlInner { } /// Build JSON data for device (async) - async fn build_json_internal_async(&mut self, dev: &UblkDev) -> Result { + async fn build_json_internal_async( + &mut self, + dev: &UblkDev, + ) -> Result { let mut queue_affinities = Vec::with_capacity(dev.dev_info.nr_hw_queues as usize); - + for qid in 0..dev.dev_info.nr_hw_queues { let affinity = self.get_queue_affinity_effective_async(qid).await?; queue_affinities.push(affinity); From 8a1554753b81bdda0744419906e4792f35735501 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Tue, 9 Sep 2025 00:33:22 +0000 Subject: [PATCH 16/19] fix: apply consistent format `cargo fmt` Signed-off-by: Ming Lei --- .rustfmt.toml | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 .rustfmt.toml diff --git a/.rustfmt.toml b/.rustfmt.toml new file mode 100644 index 0000000..77ab7fe --- /dev/null +++ b/.rustfmt.toml @@ -0,0 +1,12 @@ +edition = "2021" +max_width = 100 +use_small_heuristics = "Default" +tab_spaces = 4 +newline_style = "Unix" +use_field_init_shorthand = true +reorder_imports = true +reorder_modules = true +remove_nested_parens = true +merge_derives = true +use_try_shorthand = true +force_explicit_abi = true From 6056bedcc692dbcc198ef8719406be5714f5629d Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Thu, 11 Sep 2025 08:40:13 +0000 Subject: [PATCH 17/19] uring_async: move ublk_join_tasks() into test code smol doesn't belong to libublk, we shouldn't add more such mess. Signed-off-by: Ming Lei --- src/ctrl.rs | 40 ++++++++++++++++++++++++++++++++-------- src/uring_async.rs | 24 ------------------------ 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/src/ctrl.rs b/src/ctrl.rs index 2b03229..d01888d 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -62,8 +62,8 @@ pub(crate) use with_ctrl_ring_mut_internal; /// Initialize the thread-local control ring using a custom closure /// /// This API allows users to customize the io_uring initialization for control operations. -/// The closure receives a mutable reference to the Option and can conditionally initialize -/// it if not already set. If the thread-local variable is already initialized, the closure +/// The closure receives a mutable reference to the Option and can conditionally initialize +/// it if not already set. If the thread-local variable is already initialized, the closure /// does nothing. /// /// # Arguments @@ -2819,12 +2819,42 @@ impl UblkCtrl { mod tests { use crate::ctrl::{UblkCtrlBuilder, UblkQueueAffinity}; use crate::io::{UblkDev, UblkIOCtx, UblkQueue}; + use crate::uring_async::ublk_wake_task; use crate::UblkError; use crate::{ctrl::UblkCtrl, UblkFlags, UblkIORes}; use std::cell::Cell; use std::path::Path; use std::rc::Rc; + /// Block on all tasks in the executor until they are finished + fn ublk_join_tasks( + exe: &smol::LocalExecutor, + tasks: Vec>, + ) -> Result<(), UblkError> { + use io_uring::{squeue, IoUring}; + loop { + // Check if all tasks are finished + if tasks.iter().all(|task| task.is_finished()) { + break; + } + + // Drive the executor to make progress on tasks + while exe.try_tick() {} + + // Handle control uring events + let entry = + crate::ctrl::with_ctrl_ring_mut_internal!(|r: &mut IoUring| { + match r.submit_and_wait(0) { + Err(_) => None, + _ => r.completion().next(), + } + }); + if let Some(cqe) = entry { + ublk_wake_task(cqe.user_data(), &cqe); + } + } + Ok(()) + } #[test] fn test_ublk_get_features() { match UblkCtrl::get_features() { @@ -2930,8 +2960,6 @@ mod tests { #[test] fn test_get_queue_affinity_async() { - use crate::uring_async::ublk_join_tasks; - let exe_rc = Rc::new(smol::LocalExecutor::new()); let exe = exe_rc.clone(); @@ -3200,8 +3228,6 @@ mod tests { /// Test async APIs #[test] fn test_async_apis() { - use crate::uring_async::ublk_join_tasks; - let _ = env_logger::builder() .format_target(false) .format_timestamp(None) @@ -3390,7 +3416,6 @@ mod tests { /// Test async APIs for building ublk device #[test] fn test_create_ublk_async() { - use crate::uring_async::ublk_join_tasks; let _ = env_logger::builder() .format_target(false) .format_timestamp(None) @@ -3417,7 +3442,6 @@ mod tests { #[test] fn test_ctrl_async_await_flag_enforcement() { // Test with async flag support using a sync runtime context - use crate::uring_async::ublk_join_tasks; let exe_rc = std::rc::Rc::new(smol::LocalExecutor::new()); let exe = exe_rc.clone(); diff --git a/src/uring_async.rs b/src/uring_async.rs index cf2080c..10097f1 100644 --- a/src/uring_async.rs +++ b/src/uring_async.rs @@ -244,27 +244,3 @@ pub fn ublk_wait_and_handle_ios(exe: &smol::LocalExecutor, q: &UblkQueue) { } q.unregister_io_bufs(); } - -/// Block on all tasks in the executor until they are finished -#[cfg(test)] -pub(crate) fn ublk_join_tasks( - exe: &smol::LocalExecutor, - tasks: Vec>, -) -> Result<(), UblkError> { - loop { - // Check if all tasks are finished - if tasks.iter().all(|task| task.is_finished()) { - break; - } - - // Drive the executor to make progress on tasks - while exe.try_tick() {} - - // Handle control uring events - let entry = crate::ctrl::with_ctrl_ring_mut_internal!(|ring: &mut IoUring| ublk_try_reap_cqe(ring, 0)); - if let Some(cqe) = entry { - ublk_wake_task(cqe.user_data(), &cqe); - } - } - Ok(()) -} From 72bbd30e5286e37b54965467a09b64c4ba232fc4 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Thu, 11 Sep 2025 11:45:50 +0000 Subject: [PATCH 18/19] test: basic: show totally async style Signed-off-by: Ming Lei --- Cargo.toml | 1 + src/io.rs | 5 ++--- tests/basic.rs | 27 +++++++++++++++++++++++++-- 3 files changed, 28 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 581469e..81105be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,6 +48,7 @@ bitflags = "2.4.1" futures-timer = "3.0" [dev-dependencies] +ctor = "0.5" block-utils = "0.11.0" tempfile = "3.6.0" regex = "1.8.4" diff --git a/src/io.rs b/src/io.rs index bb6ca7b..8bc69d9 100644 --- a/src/io.rs +++ b/src/io.rs @@ -85,8 +85,8 @@ //! use libublk::sys; //! //! fn example(queue: &UblkQueue) -> Result<(), libublk::UblkError> { -//! let auto_reg = sys::ublk_auto_buf_reg { -//! index: 0, flags: 0, reserved0: 0, reserved1: 0 +//! let auto_reg = sys::ublk_auto_buf_reg { +//! index: 0, flags: 0, reserved0: 0, reserved1: 0 //! }; //! let auto_desc = BufDesc::AutoReg(auto_reg); //! let future = queue.submit_io_cmd_unified(1, sys::UBLK_U_IO_FETCH_REQ, auto_desc, -1)?; @@ -991,7 +991,6 @@ impl UblkQueue<'_> { // Users can call init_task_ring() before UblkQueue::new() to customize initialization init_task_ring_default(sq_depth as u32, cq_depth as u32)?; - let depth = dev.dev_info.queue_depth as u32; let cdev_fd = dev.cdev_file.as_raw_fd(); let cmd_buf_sz = UblkQueue::cmd_buf_sz(depth) as usize; diff --git a/tests/basic.rs b/tests/basic.rs index e45300f..41df67d 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -5,6 +5,7 @@ mod integration { use libublk::io::{BufDescList, UblkDev, UblkIOCtx, UblkQueue}; use libublk::override_sqe; use libublk::uring_async::ublk_wait_and_handle_ios; + use libublk::uring_async::ublk_wake_task; use libublk::{ctrl::UblkCtrl, ctrl::UblkCtrlBuilder, sys, BufDesc, UblkFlags, UblkIORes}; use std::env; use std::io::{BufRead, BufReader}; @@ -13,6 +14,15 @@ mod integration { use std::rc::Rc; use std::sync::{Arc, Mutex}; + #[ctor::ctor] + fn init_logger() { + let _ = env_logger::builder() + .format_target(false) + .format_timestamp(None) + .is_test(true) + .try_init(); + } + fn run_ublk_disk_sanity_test(ctrl: &UblkCtrl, dev_flags: UblkFlags) { use std::os::unix::fs::PermissionsExt; let dev_path = ctrl.get_cdev_path(); @@ -512,8 +522,21 @@ mod integration { })); } - ublk_wait_and_handle_ios(&exe, &q_rc); - smol::block_on(async { futures::future::join_all(f_vec).await }); + // Show standard async way, however, yield_now() does hurt perf, which is + // obviously slower than try_tick() + let q = q_rc.clone(); + f_vec.push(exe.spawn(async move { + loop { + if q.flush_and_wake_io_tasks(|data, cqe, _| ublk_wake_task(data, cqe), 1) + .is_err() + { + break; + } + //yield for handling incoming command + smol::future::yield_now().await; + } + })); + smol::block_on(exe.run(futures::future::join_all(f_vec))); }; ctrl.run_target(tgt_init, q_fn, move |ctrl: &UblkCtrl| { From a85c23cd978827d00278ec3257c2055d87799f49 Mon Sep 17 00:00:00 2001 From: Ming Lei Date: Fri, 12 Sep 2025 13:21:36 +0000 Subject: [PATCH 19/19] refactor: replace smol::block_on async block with exe.run() calls Update all smol executor usage to use exe.run(async { ... }) instead of smol::block_on(async { ... }) for proper executor integration. Signed-off-by: Ming Lei --- README.md | 2 +- examples/loop.rs | 28 +++++++++++++--------------- examples/null.rs | 2 +- examples/ramdisk.rs | 35 ++++++++++++++++++----------------- src/ctrl.rs | 2 +- tests/basic.rs | 4 ++-- 6 files changed, 36 insertions(+), 37 deletions(-) diff --git a/README.md b/README.md index 9351924..a574fb7 100644 --- a/README.md +++ b/README.md @@ -70,7 +70,7 @@ fn q_fn(qid: u16, dev: &UblkDev) { // Drive smol executor, won't exit until queue is dead libublk::uring_async::ublk_wait_and_handle_ios(&exe, &q_rc); - smol::block_on(async { futures::future::join_all(f_vec).await }); + smol::block_on(exe.run(async { futures::future::join_all(f_vec).await })); } fn main() { diff --git a/examples/loop.rs b/examples/loop.rs index bc4a55d..49e6577 100644 --- a/examples/loop.rs +++ b/examples/loop.rs @@ -4,9 +4,9 @@ use clap::{Arg, ArgAction, Command}; use ilog::IntLog; use io_uring::{opcode, squeue, types}; use libublk::helpers::IoBuf; -use libublk::io::{UblkDev, UblkIOCtx, UblkQueue, BufDescList}; +use libublk::io::{BufDescList, UblkDev, UblkIOCtx, UblkQueue}; use libublk::uring_async::ublk_wait_and_handle_ios; -use libublk::{ctrl::UblkCtrl, sys, UblkError, UblkFlags, UblkIORes, BufDesc}; +use libublk::{ctrl::UblkCtrl, sys, BufDesc, UblkError, UblkFlags, UblkIORes}; use serde::Serialize; use std::os::unix::fs::FileTypeExt; use std::os::unix::io::AsRawFd; @@ -198,22 +198,16 @@ fn lo_handle_io_cmd_sync(q: &UblkQueue<'_>, tag: u16, i: &UblkIOCtx, io_slice: & assert!(cqe_tag == tag as u32); if res != -(libc::EAGAIN) { - q.complete_io_cmd_unified( - tag, - BufDesc::Slice(io_slice), - Ok(UblkIORes::Result(res)), - ).unwrap(); + q.complete_io_cmd_unified(tag, BufDesc::Slice(io_slice), Ok(UblkIORes::Result(res))) + .unwrap(); return; } } let res = __lo_prep_submit_io_cmd(iod); if res < 0 { - q.complete_io_cmd_unified( - tag, - BufDesc::Slice(io_slice), - Ok(UblkIORes::Result(res)), - ).unwrap(); + q.complete_io_cmd_unified(tag, BufDesc::Slice(io_slice), Ok(UblkIORes::Result(res))) + .unwrap(); } else { let op = iod.op_flags & 0xff; // either start to handle or retry @@ -245,7 +239,8 @@ fn q_fn(qid: u16, dev: &UblkDev) { UblkQueue::new(qid, dev) .unwrap() .regiser_io_bufs(Some(&bufs)) - .submit_fetch_commands_unified(BufDescList::Slices(Some(&bufs))).unwrap() + .submit_fetch_commands_unified(BufDescList::Slices(Some(&bufs))) + .unwrap() .wait_and_handle_io(lo_io_handler); } @@ -267,7 +262,10 @@ fn q_a_fn(qid: u16, dev: &UblkDev, depth: u16) { q.register_io_buf(tag, &buf); loop { - let cmd_res = q.submit_io_cmd_unified(tag, cmd_op, BufDesc::Slice(buf.as_slice()), res).unwrap().await; + let cmd_res = q + .submit_io_cmd_unified(tag, cmd_op, BufDesc::Slice(buf.as_slice()), res) + .unwrap() + .await; if cmd_res == sys::UBLK_IO_RES_ABORT { break; } @@ -282,7 +280,7 @@ fn q_a_fn(qid: u16, dev: &UblkDev, depth: u16) { })); } ublk_wait_and_handle_ios(&exe, &q_rc); - smol::block_on(async { futures::future::join_all(f_vec).await }); + smol::block_on(exe.run(async { futures::future::join_all(f_vec).await })); } fn __loop_add( diff --git a/examples/null.rs b/examples/null.rs index a36dcd3..69afe17 100755 --- a/examples/null.rs +++ b/examples/null.rs @@ -109,7 +109,7 @@ fn q_async_fn(qid: u16, dev: &UblkDev, user_copy: bool) { })); } ublk_wait_and_handle_ios(&exe, &q_rc); - smol::block_on(async { futures::future::join_all(f_vec).await }); + smol::block_on(exe.run(async { futures::future::join_all(f_vec).await })); } fn __null_add( diff --git a/examples/ramdisk.rs b/examples/ramdisk.rs index ce6bfde..9c32a2b 100644 --- a/examples/ramdisk.rs +++ b/examples/ramdisk.rs @@ -9,13 +9,13 @@ use libublk::ctrl::UblkCtrl; use libublk::helpers::IoBuf; use libublk::io::{UblkDev, UblkQueue}; use libublk::uring_async::ublk_run_ctrl_task; -use libublk::{UblkError, UblkFlags, BufDesc}; +use libublk::{BufDesc, UblkError, UblkFlags}; use std::io::{Error, ErrorKind}; use std::rc::Rc; use std::sync::Arc; /// Handle I/O operations using safe slice-based memory operations. -/// +/// /// This function demonstrates how slice operations provide memory safety /// benefits over raw pointer manipulation: /// - Automatic bounds checking prevents buffer overflows @@ -34,7 +34,7 @@ fn handle_io(q: &UblkQueue, tag: u16, io_buf: &mut [u8], ramdisk_storage: &mut [ if off.saturating_add(bytes) > ramdisk_storage.len() { return -libc::EINVAL; } - + // Ensure I/O buffer has sufficient capacity for the operation // Slice bounds checking prevents reading/writing beyond buffer limits if bytes > io_buf.len() { @@ -51,7 +51,7 @@ fn handle_io(q: &UblkQueue, tag: u16, io_buf: &mut [u8], ramdisk_storage: &mut [ let src = &ramdisk_storage[off..off + bytes]; let dst = &mut io_buf[..bytes]; dst.copy_from_slice(src); - }, + } libublk::sys::UBLK_IO_OP_WRITE => { // Safe slice-to-slice copy operation replaces unsafe libc::memcpy // This approach eliminates common memory safety issues: @@ -61,7 +61,7 @@ fn handle_io(q: &UblkQueue, tag: u16, io_buf: &mut [u8], ramdisk_storage: &mut [ let src = &io_buf[..bytes]; let dst = &mut ramdisk_storage[off..off + bytes]; dst.copy_from_slice(src); - }, + } libublk::sys::UBLK_IO_OP_FLUSH => { // Flush operation requires no memory copying } @@ -75,17 +75,20 @@ fn handle_io(q: &UblkQueue, tag: u16, io_buf: &mut [u8], ramdisk_storage: &mut [ async fn io_task(q: &UblkQueue<'_>, tag: u16, ramdisk_storage: &mut [u8]) { let buf_size = q.dev.dev_info.max_io_buf_bytes as usize; - + // Use IoBuf for safe I/O buffer management with automatic memory alignment // IoBuf provides slice-based access through Deref/DerefMut traits let mut buffer = IoBuf::::new(buf_size); - + // No longer need raw pointer since we use the unified API with slices let mut cmd_op = libublk::sys::UBLK_U_IO_FETCH_REQ; let mut res = 0; loop { - let cmd_res = q.submit_io_cmd_unified(tag, cmd_op, BufDesc::Slice(buffer.as_slice()), res).unwrap().await; + let cmd_res = q + .submit_io_cmd_unified(tag, cmd_op, BufDesc::Slice(buffer.as_slice()), res) + .unwrap() + .await; if cmd_res == libublk::sys::UBLK_IO_RES_ABORT { break; } @@ -120,7 +123,7 @@ fn start_dev_fn( } }); ublk_run_ctrl_task(exe, q, &task)?; - smol::block_on(task) + smol::block_on(exe.run(task)) } fn write_dev_id(ctrl: &UblkCtrl, efd: i32) -> Result { @@ -174,12 +177,12 @@ fn rd_add_dev(dev_id: i32, ramdisk_storage: &mut [u8], size: u64, for_add: bool, // spawn async io tasks let mut f_vec = Vec::new(); - + // Extract raw pointer and length for sharing across async tasks // This is the minimal unsafe code needed for async context sharing let storage_ptr = ramdisk_storage.as_mut_ptr(); let storage_len = ramdisk_storage.len(); - + for tag in 0..ctrl.dev_info().queue_depth as u16 { let q_clone = q_rc.clone(); @@ -189,9 +192,7 @@ fn rd_add_dev(dev_id: i32, ramdisk_storage: &mut [u8], size: u64, for_add: bool, // 1. The original ramdisk_storage buffer outlives all async tasks // 2. Each task operates on different regions controlled by I/O offset bounds // 3. The slice provides bounds checking for all operations within io_task - let storage_slice = unsafe { - std::slice::from_raw_parts_mut(storage_ptr, storage_len) - }; + let storage_slice = unsafe { std::slice::from_raw_parts_mut(storage_ptr, storage_len) }; io_task(&q_clone, tag, storage_slice).await; })); } @@ -206,7 +207,7 @@ fn rd_add_dev(dev_id: i32, ramdisk_storage: &mut [u8], size: u64, for_add: bool, } _ => eprintln!("device can't be started"), } - smol::block_on(async { futures::future::join_all(f_vec).await }); + smol::block_on(exec.run(async { futures::future::join_all(f_vec).await })); } fn rd_get_device_size(ctrl: &UblkCtrl) -> u64 { @@ -245,11 +246,11 @@ fn test_add(recover: usize) { // Create ramdisk storage using IoBuf for proper alignment and memory management // IoBuf provides safe slice access while maintaining required memory alignment let mut ramdisk_buf = libublk::helpers::IoBuf::::new(size as usize); - + // Zero-initialize the ramdisk storage for consistent behavior // Using safe slice operations instead of unsafe memory manipulation ramdisk_buf.zero_buf(); - + // Get mutable slice for safe operations within rd_add_dev let storage_slice = ramdisk_buf.as_mut_slice(); rd_add_dev(dev_id, storage_slice, size, recover == 0, efd); diff --git a/src/ctrl.rs b/src/ctrl.rs index d01888d..8f582b1 100644 --- a/src/ctrl.rs +++ b/src/ctrl.rs @@ -3395,7 +3395,7 @@ mod tests { q_async_fn(&exe, &q, dev.dev_info.queue_depth as u16, &mut f_vec); crate::uring_async::ublk_wait_and_handle_ios(&exe, &q_rc); - smol::block_on(async { futures::future::join_all(f_vec).await }); + smol::block_on(exe.run(async { futures::future::join_all(f_vec).await })); }); ctrl.start_dev_async(dev_arc).await?; diff --git a/tests/basic.rs b/tests/basic.rs index 41df67d..a728121 100644 --- a/tests/basic.rs +++ b/tests/basic.rs @@ -247,7 +247,7 @@ mod integration { } ublk_wait_and_handle_ios(&exe, &q_rc); - smol::block_on(async { futures::future::join_all(f_vec).await }); + smol::block_on(exe.run(async { futures::future::join_all(f_vec).await })); }; // kick off our targets @@ -348,7 +348,7 @@ mod integration { } ublk_wait_and_handle_ios(&exe, &q_rc); - smol::block_on(async { futures::future::join_all(f_vec).await }); + smol::block_on(exe.run(async { futures::future::join_all(f_vec).await })); }; // kick off our targets