@@ -22,12 +22,18 @@ use crate::binary::command::{
2222use crate :: binary:: handlers:: utils:: receive_and_validate;
2323use crate :: shard:: IggyShard ;
2424use crate :: shard:: system:: messages:: PollingArgs ;
25+ use crate :: shard:: transmission:: message:: {
26+ ShardMessage , ShardRequest , ShardRequestPayload , SocketTransferPayload ,
27+ } ;
28+ use crate :: streaming:: segments:: IggyMessagesBatchSet ;
2529use crate :: streaming:: session:: Session ;
30+ use crate :: streaming:: { streams, topics} ;
2631use anyhow:: Result ;
27- use iggy_common:: SenderKind ;
32+ use iggy_common:: sharding:: IggyNamespace ;
33+ use iggy_common:: { ConsumerKind , SenderKind } ;
2834use iggy_common:: { IggyError , PollMessages , PooledBuffer } ;
2935use std:: rc:: Rc ;
30- use tracing:: { debug, trace} ;
36+ use tracing:: { debug, error , trace} ;
3137
3238#[ derive( Debug ) ]
3339pub struct IggyPollMetadata {
@@ -57,6 +63,7 @@ impl ServerCommandHandler for PollMessages {
5763 shard : & Rc < IggyShard > ,
5864 ) -> Result < HandlerResult , IggyError > {
5965 debug ! ( "session: {session}, command: {self}" ) ;
66+
6067 let PollMessages {
6168 consumer,
6269 partition_id,
@@ -66,10 +73,95 @@ impl ServerCommandHandler for PollMessages {
6673 stream_id,
6774 topic_id,
6875 } = self ;
69- let args = PollingArgs :: new ( strategy, count, auto_commit) ;
76+
77+ shard. ensure_topic_exists ( & stream_id, & topic_id) ?;
7078
7179 let user_id = session. get_user_id ( ) ;
7280 let client_id = session. client_id ;
81+
82+ let args = PollingArgs :: new ( strategy, count, auto_commit) ;
83+
84+ if consumer. kind == ConsumerKind :: Consumer
85+ && partition_id. is_some ( )
86+ && let Some ( ( polling_consumer, resolved_partition_id) ) = shard
87+ . resolve_consumer_with_partition_id (
88+ & stream_id,
89+ & topic_id,
90+ & consumer,
91+ client_id,
92+ partition_id,
93+ true ,
94+ ) ?
95+ {
96+ let numeric_stream_id = shard
97+ . streams
98+ . with_stream_by_id ( & stream_id, streams:: helpers:: get_stream_id ( ) ) ;
99+ let numeric_topic_id = shard. streams . with_topic_by_id (
100+ & stream_id,
101+ & topic_id,
102+ topics:: helpers:: get_topic_id ( ) ,
103+ ) ;
104+ let namespace =
105+ IggyNamespace :: new ( numeric_stream_id, numeric_topic_id, resolved_partition_id) ;
106+
107+ let enabled_socket_migration = shard. config . tcp . socket_migration ;
108+
109+ if enabled_socket_migration
110+ && !session. is_migrated ( )
111+ && let Some ( target_shard) = shard. find_shard ( & namespace)
112+ && target_shard. id != shard. id
113+ {
114+ debug ! (
115+ "TCP wrong shard detected: migrating from_shard {}, to_shard {}" ,
116+ shard. id, target_shard. id
117+ ) ;
118+
119+ shard. ensure_partition_exists ( & stream_id, & topic_id, resolved_partition_id) ?;
120+
121+ if let Some ( fd) = sender. take_and_migrate_tcp ( ) {
122+ let data = SocketTransferPayload :: PollMessages {
123+ consumer : polling_consumer,
124+ args : args. clone ( ) ,
125+ } ;
126+
127+ let payload = ShardRequestPayload :: SocketTransfer {
128+ fd,
129+ from_shard : shard. id ,
130+ client_id : session. client_id ,
131+ user_id,
132+ address : session. ip_address ,
133+ initial_data : data,
134+ } ;
135+
136+ let request = ShardRequest :: new (
137+ stream_id. clone ( ) ,
138+ topic_id. clone ( ) ,
139+ resolved_partition_id,
140+ payload,
141+ ) ;
142+
143+ let socket_transfer_msg = ShardMessage :: Request ( request) ;
144+
145+ debug ! (
146+ "Sending migrate socket to another shard {:?}" ,
147+ socket_transfer_msg
148+ ) ;
149+
150+ if let Err ( e) = shard
151+ . send_request_to_shard_or_recoil ( Some ( & namespace) , socket_transfer_msg)
152+ . await
153+ {
154+ error ! ( "tranfer socket to another shard failed, drop connection. {e:?}" ) ;
155+ return Ok ( HandlerResult :: Finished ) ;
156+ }
157+
158+ return Ok ( HandlerResult :: Migrated {
159+ to_shard : target_shard. id ,
160+ } ) ;
161+ }
162+ }
163+ }
164+
73165 let ( metadata, mut batch) = shard
74166 . poll_messages (
75167 client_id,
@@ -82,41 +174,52 @@ impl ServerCommandHandler for PollMessages {
82174 )
83175 . await ?;
84176
85- // Collect all chunks first into a Vec to extend their lifetimes.
86- // This ensures the Bytes (in reality Arc<[u8]>) references from each IggyMessagesBatch stay alive
87- // throughout the async vectored I/O operation, preventing "borrowed value does not live
88- // long enough" errors while optimizing transmission by using larger chunks.
89-
90- // 4 bytes for partition_id + 8 bytes for current_offset + 4 bytes for messages_count + size of all batches.
91- let response_length = 4 + 8 + 4 + batch. size ( ) ;
92- let response_length_bytes = response_length. to_le_bytes ( ) ;
93-
94- let mut bufs = Vec :: with_capacity ( batch. containers_count ( ) + 5 ) ;
95- let mut partition_id_buf = PooledBuffer :: with_capacity ( 4 ) ;
96- let mut current_offset_buf = PooledBuffer :: with_capacity ( 8 ) ;
97- let mut count_buf = PooledBuffer :: with_capacity ( 4 ) ;
98- partition_id_buf. put_u32_le ( metadata. partition_id ) ;
99- current_offset_buf. put_u64_le ( metadata. current_offset ) ;
100- count_buf. put_u32_le ( batch. count ( ) ) ;
101-
102- bufs. push ( partition_id_buf) ;
103- bufs. push ( current_offset_buf) ;
104- bufs. push ( count_buf) ;
105-
106- batch. iter_mut ( ) . for_each ( |m| bufs. push ( m. take_messages ( ) ) ) ;
107- trace ! (
108- "Sending {} messages to client ({} bytes) to client" ,
109- batch. count( ) ,
110- response_length
111- ) ;
177+ let ( response_length_bytes, bufs) = prepare_message_batch_buffers ( & metadata, & mut batch) ;
112178
113179 sender
114180 . send_ok_response_vectored ( & response_length_bytes, bufs)
115181 . await ?;
182+
116183 Ok ( HandlerResult :: Finished )
117184 }
118185}
119186
187+ pub fn prepare_message_batch_buffers (
188+ metadata : & IggyPollMetadata ,
189+ batch : & mut IggyMessagesBatchSet ,
190+ ) -> ( [ u8 ; 4 ] , Vec < PooledBuffer > ) {
191+ // Collect all chunks first into a Vec to extend their lifetimes.
192+ // This ensures the Bytes (in reality Arc<[u8]>) references from each IggyMessagesBatch stay alive
193+ // throughout the async vectored I/O operation, preventing "borrowed value does not live
194+ // long enough" errors while optimizing transmission by using larger chunks.
195+
196+ // 4 bytes for partition_id + 8 bytes for current_offset + 4 bytes for messages_count + size of all batches.
197+ let response_length = 4 + 8 + 4 + batch. size ( ) ;
198+ let response_length_bytes = response_length. to_le_bytes ( ) ;
199+
200+ let mut bufs = Vec :: with_capacity ( batch. containers_count ( ) + 5 ) ;
201+ let mut partition_id_buf = PooledBuffer :: with_capacity ( 4 ) ;
202+ let mut current_offset_buf = PooledBuffer :: with_capacity ( 8 ) ;
203+ let mut count_buf = PooledBuffer :: with_capacity ( 4 ) ;
204+ partition_id_buf. put_u32_le ( metadata. partition_id ) ;
205+ current_offset_buf. put_u64_le ( metadata. current_offset ) ;
206+ count_buf. put_u32_le ( batch. count ( ) ) ;
207+
208+ bufs. push ( partition_id_buf) ;
209+ bufs. push ( current_offset_buf) ;
210+ bufs. push ( count_buf) ;
211+
212+ batch. iter_mut ( ) . for_each ( |m| bufs. push ( m. take_messages ( ) ) ) ;
213+
214+ trace ! (
215+ "Sending {} messages to client ({} bytes) to client" ,
216+ batch. count( ) ,
217+ response_length
218+ ) ;
219+
220+ ( response_length_bytes, bufs)
221+ }
222+
120223impl BinaryServerCommand for PollMessages {
121224 async fn from_sender (
122225 sender : & mut SenderKind ,
0 commit comments