1+ //! Connection management module
2+ //!
3+ //! This module provides a connection management layer to abstract
4+ //! connection handling from client implementations.
5+
6+ use std::sync::Arc;
7+ use std::time::Duration;
8+ use tokio::io::{AsyncRead, AsyncWrite, BufReader};
9+ use tokio::net::TcpStream;
10+ use tokio::sync::{Mutex, RwLock};
11+ use tokio::time::timeout;
12+
13+ use crate::config::ClientConfig;
14+ use crate::net::client::ClientError;
15+ use crate::net::protocol::{GProtocolEnum, Protocol};
16+
17+ use super::gclient::GClient;
18+
19+ /// Connection state
20+ #[derive(Debug, Clone, Copy, PartialEq, Eq)]
21+ pub enum ConnectionState {
22+ /// The connection is disconnected
23+ Disconnected,
24+ /// The connection is connecting
25+ Connecting,
26+ /// The connection is connected
27+ Connected,
28+ /// The connection is reconnecting
29+ Reconnecting,
30+ /// The connection is in an error state
31+ Error,
32+ }
33+
34+ /// Represents a connection to a server
35+ pub struct Connection {
36+ /// The protocol used by this connection, None when disconnected
37+ client: RwLock<Option<Arc<GClient>>>,
38+ /// The current state of the connection
39+ state: RwLock<ConnectionState>,
40+ /// The configuration for this connection
41+ config: ClientConfig,
42+ /// The last error encountered by this connection
43+ last_error: RwLock<Option<ClientError>>,
44+ /// Lock used when establishing a connection to prevent multiple simultaneous connect attempts
45+ connect_lock: Mutex<()>,
46+ }
47+
48+ impl Connection {
49+ /// Create a new disconnected connection
50+ pub fn new(config: ClientConfig) -> Self {
51+ Self {
52+ client: RwLock::new(None),
53+ state: RwLock::new(ConnectionState::Disconnected),
54+ config,
55+ last_error: RwLock::new(None),
56+ connect_lock: Mutex::new(()),
57+ }
58+ }
59+
60+ /// Set the client for this connection
61+ pub async fn set_client(&self, client: Arc<GClient>) {
62+ let mut client_guard = self.client.write().await;
63+ *client_guard = Some(client);
64+ }
65+
66+ /// Check if the connection has an active protocol
67+ pub async fn has_client(&self) -> bool {
68+ self.client.read().await.is_some()
69+ }
70+
71+ /// Get the current state of the connection
72+ pub async fn state(&self) -> ConnectionState {
73+ *self.state.read().await
74+ }
75+
76+ /// Set the last error encountered by this connection
77+ pub async fn set_error(&self, error: ClientError) {
78+ let mut last_error = self.last_error.write().await;
79+ *last_error = Some(error);
80+ }
81+
82+ /// Set the state of the connection
83+ async fn set_state(&self, new_state: ConnectionState) {
84+ let mut state = self.state.write().await;
85+ *state = new_state;
86+ }
87+
88+ /// Get the configuration for this connection
89+ pub fn config(&self) -> &ClientConfig {
90+ &self.config
91+ }
92+
93+ /// Shut down this connection
94+ pub async fn shutdown(&self) -> Result<(), ClientError> {
95+ log::debug!("Shutting down connection");
96+ // Set the state to disconnected first to prevent new operations
97+ self.set_state(ConnectionState::Disconnected).await;
98+
99+ // Shutdown the protocol if it exists
100+ let mut client_guard = self.client.write().await;
101+ if let Some(client) = &*client_guard {
102+ let result = client.shutdown().await;
103+ // Clear protocol regardless of shutdown result
104+ *client_guard = None;
105+ result?;
106+ }
107+
108+ Ok(())
109+ }
110+ }
111+
112+ /// A specialized implementation of Connection for TcpStream connections
113+ pub struct TcpConnection {
114+ /// The wrapped connection
115+ inner: Connection<BufReader<tokio::net::tcp::OwnedReadHalf>, tokio::net::tcp::OwnedWriteHalf>,
116+ /// Connection settings
117+ settings: ConnectionSettings,
118+ }
119+
120+ /// Settings for connection attempts
121+ pub struct ConnectionSettings {
122+ /// Connection timeout
123+ pub connect_timeout: Duration,
124+ /// Reconnection timeout
125+ pub reconnect_timeout: Duration,
126+ /// Maximum reconnection attempts before giving up
127+ pub max_reconnect_attempts: usize,
128+ /// Base delay between reconnection attempts
129+ pub reconnect_base_delay: Duration,
130+ /// Maximum delay between reconnection attempts (for exponential backoff)
131+ pub reconnect_max_delay: Duration,
132+ }
133+
134+ impl Default for ConnectionSettings {
135+ fn default() -> Self {
136+ Self {
137+ connect_timeout: Duration::from_secs(10),
138+ reconnect_timeout: Duration::from_secs(10),
139+ max_reconnect_attempts: 5,
140+ reconnect_base_delay: Duration::from_secs(1),
141+ reconnect_max_delay: Duration::from_secs(30),
142+ }
143+ }
144+ }
145+
146+ impl TcpConnection {
147+ /// Create a new TcpConnection with default settings
148+ pub fn new(config: ClientConfig, settings: Option<ConnectionSettings>) -> Self {
149+ Self {
150+ inner: Connection::new(config),
151+ settings: settings.unwrap_or_default(),
152+ }
153+ }
154+
155+ /// Connect to the server
156+ pub async fn connect(&self) -> Result<(), ClientError> {
157+ // Acquire connect lock to prevent multiple concurrent connection attempts
158+ let _lock = self.inner.connect_lock.lock().await;
159+
160+ // Check if already connected
161+ let current_state = self.inner.state().await;
162+ if current_state == ConnectionState::Connected {
163+ return Ok(());
164+ }
165+
166+ // Set state to connecting
167+ self.inner.set_state(ConnectionState::Connecting).await;
168+
169+ // Get connection info
170+ let host = &self.inner.config().host;
171+ let port = &self.inner.config().port;
172+ let addr = format!("{}:{}", host, port);
173+
174+ log::debug!("Connecting to {}:{}", host, port);
175+
176+ // Connect with timeout
177+ let connect_result = match timeout(
178+ self.settings.connect_timeout,
179+ TcpStream::connect(&addr)
180+ ).await {
181+ Ok(result) => result,
182+ Err(_) => {
183+ log::error!("Connection timeout after {:?}", self.settings.connect_timeout);
184+ self.inner.set_state(ConnectionState::Error).await;
185+ self.inner.set_error(ClientError::Other("Connection timeout".to_string())).await;
186+ return Err(ClientError::Other("Connection timeout".to_string()));
187+ }
188+ };
189+
190+ // Process stream result
191+ match connect_result {
192+ Ok(stream) => {
193+ log::debug!("Connected to server");
194+ let (read_half, write_half) = stream.into_split();
195+ let reader = BufReader::new(read_half);
196+
197+ // Create protocol based on configuration
198+ let protocol = self.create_protocol_from_config(reader, write_half);
199+
200+ // Set the protocol and update state
201+ self.inner.set_protocol(protocol).await;
202+ self.inner.set_state(ConnectionState::Connected).await;
203+
204+ Ok(())
205+ },
206+ Err(e) => {
207+ log::error!("Failed to connect: {}", e);
208+ self.inner.set_state(ConnectionState::Error).await;
209+ Err(ClientError::Tcp(e))
210+ }
211+ }
212+ }
213+
214+ /// Create protocol instance based on configuration
215+ fn create_protocol_from_config(
216+ &self,
217+ reader: BufReader<tokio::net::tcp::OwnedReadHalf>,
218+ writer: tokio::net::tcp::OwnedWriteHalf,
219+ ) -> GProtocolEnum<BufReader<tokio::net::tcp::OwnedReadHalf>, tokio::net::tcp::OwnedWriteHalf> {
220+ // Determine protocol version from client config
221+ use crate::config::ClientType;
222+ let protocol = match &self.inner.config().client_type {
223+ ClientType::Game(g) => {
224+ GProtocolEnum::V6(crate::net::protocol::proto_v6::GProtocolV6::new(reader, writer, &g.header_format, &g.encryption_keys))
225+ },
226+ ClientType::RemoteControl(_) => {
227+ GProtocolEnum::V5(crate::net::protocol::proto_v5::GProtocolV5::new(reader, writer))
228+ },
229+ ClientType::NpcControl(_) => {
230+ GProtocolEnum::V4(crate::net::protocol::proto_v4::GProtocolV4::new(reader, writer))
231+ },
232+ };
233+
234+ protocol
235+ }
236+
237+ /// Attempt to reconnect
238+ pub async fn reconnect(&self) -> Result<(), ClientError> {
239+ // Acquire connect lock to prevent multiple concurrent connection attempts
240+ let _lock = self.inner.connect_lock.lock().await;
241+
242+ // Update state
243+ self.inner.set_state(ConnectionState::Reconnecting).await;
244+
245+ // Ensure old connection is shut down
246+ let _ = self.inner.shutdown().await;
247+
248+ // Implement exponential backoff reconnection
249+ let mut attempts = 0;
250+ let max_attempts = self.settings.max_reconnect_attempts;
251+ let mut delay = self.settings.reconnect_base_delay;
252+
253+ while attempts < max_attempts {
254+ log::info!("Reconnection attempt {}/{}", attempts + 1, max_attempts);
255+
256+ // Wait before retrying
257+ if attempts > 0 {
258+ tokio::time::sleep(delay).await;
259+ // Apply exponential backoff with capping
260+ delay = std::cmp::min(
261+ delay.mul_f32(1.5),
262+ self.settings.reconnect_max_delay
263+ );
264+ }
265+
266+ // Try to connect with timeout
267+ let connect_result = timeout(
268+ self.settings.reconnect_timeout,
269+ self.connect()
270+ ).await;
271+
272+ match connect_result {
273+ Ok(Ok(_)) => {
274+ log::info!("Successfully reconnected");
275+ return Ok(());
276+ },
277+ Ok(Err(e)) => {
278+ log::warn!("Reconnection attempt failed: {:?}", e);
279+ attempts += 1;
280+ },
281+ Err(_) => {
282+ log::warn!("Reconnection attempt timed out");
283+ attempts += 1;
284+ },
285+ }
286+ }
287+
288+ // All attempts failed
289+ log::error!("Failed to reconnect after {} attempts", max_attempts);
290+ self.inner.set_state(ConnectionState::Error).await;
291+ self.inner.set_error(ClientError::Other(format!("Failed to reconnect after {} attempts", max_attempts))).await;
292+
293+ Err(ClientError::Other(format!("Failed to reconnect after {} attempts", max_attempts)))
294+ }
295+
296+ /// Get the underlying connection
297+ pub fn inner(&self) -> &Connection<BufReader<tokio::net::tcp::OwnedReadHalf>, tokio::net::tcp::OwnedWriteHalf> {
298+ &self.inner
299+ }
300+
301+ /// Shutdown the connection
302+ pub async fn shutdown(&self) -> Result<(), ClientError> {
303+ self.inner.shutdown().await
304+ }
305+ }
306+
307+ /// A connection manager that handles connection pooling and reconnection
308+ pub struct ConnectionManager {
309+ /// Managed connections
310+ connections: RwLock<Vec<Arc<TcpConnection>>>,
311+ }
312+
313+ impl ConnectionManager {
314+ /// Create a new connection manager
315+ pub fn new() -> Self {
316+ Self {
317+ connections: RwLock::new(Vec::new()),
318+ }
319+ }
320+
321+ /// Create a new connection
322+ pub async fn create_connection(
323+ &self,
324+ config: ClientConfig,
325+ settings: Option<ConnectionSettings>,
326+ ) -> Arc<TcpConnection> {
327+ let connection = Arc::new(TcpConnection::new(config, settings));
328+
329+ // Add to managed connections
330+ let mut connections = self.connections.write().await;
331+ connections.push(Arc::clone(&connection));
332+
333+ connection
334+ }
335+
336+ /// Get all managed connections
337+ pub async fn get_connections(&self) -> Vec<Arc<TcpConnection>> {
338+ let connections = self.connections.read().await;
339+ connections.clone()
340+ }
341+
342+ /// Shutdown all connections
343+ pub async fn shutdown_all(&self) -> Result<(), ClientError> {
344+ let connections = self.connections.read().await;
345+ for connection in connections.iter() {
346+ if let Err(e) = connection.shutdown().await {
347+ log::error!("Error shutting down connection: {:?}", e);
348+ }
349+ }
350+ Ok(())
351+ }
352+ }
353+
354+ impl Default for ConnectionManager {
355+ fn default() -> Self {
356+ Self::new()
357+ }
358+ }
0 commit comments