@@ -115,14 +115,15 @@ impl GClient {
115115
116116 /// Set the encryption key for the client. Only applicable to v5 protocol.
117117 pub async fn set_codec ( & self , key : u8 ) -> Result < ( ) , ClientError > {
118- Ok ( match & * self . protocol {
118+ match & * self . protocol {
119119 GProtocolEnum :: V5 ( proto) => {
120120 proto. set_encryption_key ( key) ?;
121121 }
122122 _ => {
123123 return Err ( ClientError :: UnsupportedProtocolVersion ) ;
124124 }
125- } )
125+ } ;
126+ Ok ( ( ) )
126127 }
127128
128129 /// Register an event handler for a specific PacketId.
@@ -155,64 +156,62 @@ impl GClient {
155156 }
156157
157158 /// Receiving packets loop.
158- fn read_loop_fut (
159+ async fn read_loop_fut (
159160 protocol : Arc < GProtocolEnum < BufReader < OwnedReadHalf > , OwnedWriteHalf > > ,
160161 pending_requests : PendingRequests ,
161162 event_handlers : Arc < Mutex < HashMap < PacketId , Vec < Arc < EventHandlerFn > > > > > ,
162163 shutdown : Arc < Notify > ,
163164 join_set : Arc < Mutex < JoinSet < ( ) > > > ,
164- ) -> impl Future < Output = ( ) > {
165- async move {
166- loop {
167- tokio:: select! {
168- packet_result = async {
169- protocol. read( ) . await
170- } => {
171- match packet_result {
172- Ok ( packet) => {
173- let packet_id = packet. id( ) ;
174- let mut pending = pending_requests. lock( ) . await ;
175- if let Some ( sender) = pending. remove( & packet_id) {
176- log:: debug!( "Received response packet: {:?}" , packet) ;
177- let _ = sender. send( packet. clone( ) ) ;
178- } else {
179- log:: debug!( "Received unsolicited packet: {:?}" , packet) ;
180- let event = PacketEvent { packet: packet. clone( ) } ;
181- let handlers = {
182- let map = event_handlers. lock( ) . await ;
183- map. get( & packet_id) . cloned( )
184- } ;
185- if let Some ( handler_vec) = handlers {
186- for handler in handler_vec {
187- let event_clone = event. clone( ) ;
188- let shutdown_clone = Arc :: clone( & shutdown) ;
189- let task = async move {
190- tokio:: select! {
191- _ = shutdown_clone. notified( ) => {
192- log:: debug!( "Callback task received shutdown signal, exiting early." ) ;
193- }
194- _ = ( handler) ( event_clone) => {
195- // Callback completed normally.
196- }
165+ ) {
166+ loop {
167+ tokio:: select! {
168+ packet_result = async {
169+ protocol. read( ) . await
170+ } => {
171+ match packet_result {
172+ Ok ( packet) => {
173+ let packet_id = packet. id( ) ;
174+ let mut pending = pending_requests. lock( ) . await ;
175+ if let Some ( sender) = pending. remove( & packet_id) {
176+ log:: debug!( "Received response packet: {:?}" , packet) ;
177+ let _ = sender. send( packet. clone( ) ) ;
178+ } else {
179+ log:: debug!( "Received unsolicited packet: {:?}" , packet) ;
180+ let event = PacketEvent { packet: packet. clone( ) } ;
181+ let handlers = {
182+ let map = event_handlers. lock( ) . await ;
183+ map. get( & packet_id) . cloned( )
184+ } ;
185+ if let Some ( handler_vec) = handlers {
186+ for handler in handler_vec {
187+ let event_clone = event. clone( ) ;
188+ let shutdown_clone = Arc :: clone( & shutdown) ;
189+ let task = async move {
190+ tokio:: select! {
191+ _ = shutdown_clone. notified( ) => {
192+ log:: debug!( "Callback task received shutdown signal, exiting early." ) ;
197193 }
198- } ;
199- let mut join_set_mut = join_set. lock( ) . await ;
200- join_set_mut. spawn( task) ;
201- }
194+ _ = ( handler) ( event_clone) => {
195+ // Callback completed normally.
196+ }
197+ }
198+ } ;
199+ let mut join_set_mut = join_set. lock( ) . await ;
200+ join_set_mut. spawn( task) ;
202201 }
203202 }
204203 }
205- Err ( e ) => {
206- log :: error! ( "Error reading packet: {:?}" , e ) ;
207- shutdown . notify_waiters ( ) ;
208- break ;
209- }
204+ }
205+ Err ( e ) => {
206+ log :: error! ( "Error reading packet: {:?}" , e ) ;
207+ shutdown . notify_waiters ( ) ;
208+ break ;
210209 }
211210 }
212- _ = shutdown . notified ( ) => {
213- log :: debug! ( "Shutdown signal received in read loop." ) ;
214- break ;
215- }
211+ }
212+ _ = shutdown . notified ( ) => {
213+ log :: debug! ( "Shutdown signal received in read loop." ) ;
214+ break ;
216215 }
217216 }
218217 }
0 commit comments