Skip to content

Commit abd8ac8

Browse files
committed
Reconnecting sockets patch. Fix clippy lints.
1 parent 15b840b commit abd8ac8

File tree

8 files changed

+291
-301
lines changed

8 files changed

+291
-301
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,4 @@ tokio = { version = "=1", features = ["full"] }
2525
uuid = { version = "=1", features = ["v4"] }
2626

2727
tcrypt = { git = "https://github.com/thatonetoast/tcrypt.git", tag = "0.1.1-1" }
28+
scopeguard = "1.2.0"

src/asynch/client.rs

Lines changed: 148 additions & 213 deletions
Large diffs are not rendered by default.

src/asynch/client_ext.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::Arc;
22

33
use tokio::sync::RwLock;
44

5-
use crate::packet;
5+
use crate::{errors::Error, packet};
66

77
use super::client::AsyncClient;
88

@@ -22,4 +22,24 @@ impl<P: packet::Packet> AsyncClientRef<P> {
2222
pub async fn read(&self) -> tokio::sync::RwLockReadGuard<'_, AsyncClient<P>> {
2323
self.0.read().await
2424
}
25+
26+
pub async fn signal_reconnect(&self) -> Result<(), Error> {
27+
let client = self.0.read().await;
28+
29+
if let Some(tx) = &client.keepalive_reconnect_tx {
30+
// Clone the channel to avoid holding the read lock during send
31+
let tx_clone = tx.clone();
32+
drop(client);
33+
34+
// Send the reconnect signal
35+
match tx_clone.send(()).await {
36+
Ok(()) => Ok(()),
37+
Err(_) => Err(Error::Other("Failed to signal reconnection".to_string())),
38+
}
39+
} else {
40+
Err(Error::Other(
41+
"Keepalive reconnection channel not available".to_string(),
42+
))
43+
}
44+
}
2545
}

src/asynch/listener.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,7 @@ where
317317
/// # Panics
318318
///
319319
/// * Panics if the specified pool doesn't exist
320+
#[allow(clippy::significant_drop_tightening)]
320321
pub async fn add_socket_to_pool(&mut self, pool_name: &str, socket: &TSocket<S>) {
321322
let mut pool = self.pools.write().await;
322323
let pool = pool.get_mut(pool_name).expect("Unknown Pool");
@@ -401,6 +402,7 @@ where
401402
/// # Returns
402403
///
403404
/// * `Result<Option<Encryptor>, Error>` - The encryption configuration or an error
405+
#[allow(clippy::significant_drop_in_scrutinee)]
404406
async fn handle_authentication(
405407
&mut self,
406408
tsocket: &mut TSocket<S>,
@@ -507,6 +509,7 @@ where
507509
/// listener.broadcast(packet).await.expect("Broadcast failed");
508510
/// }
509511
/// ```
512+
#[allow(clippy::significant_drop_in_scrutinee)]
510513
pub async fn broadcast(&self, packet: P) -> Result<(), Error> {
511514
let pool = self.keep_alive_pool.clone().sockets;
512515
for socket in pool.write().await.iter_mut() {

src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ pub mod errors;
2222
pub mod macros;
2323
pub mod packet;
2424
pub mod phantom;
25-
pub mod reconnect;
2625
pub mod resources;
2726
pub mod session;
2827

src/reconnect.rs

Lines changed: 0 additions & 73 deletions
This file was deleted.

src/reconnection_tests.rs

Lines changed: 117 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ impl ImplSession for TestSession {
9595

9696
// Define test resource
9797
#[derive(Debug, Clone)]
98+
#[allow(dead_code)]
9899
struct TestResource {
99100
data: Vec<String>,
100101
}
@@ -530,20 +531,8 @@ async fn test_max_retries_exceeded() {
530531
// For this test, we still need a client struct to configure reconnection parameters
531532
// but the initial connection attempt will fail
532533

533-
// Create reconnection config with shorter timeouts for testing
534-
let config = ReconnectionConfig {
535-
endpoints: vec![],
536-
auto_reconnect: true,
537-
max_attempts: Some(2),
538-
initial_retry_delay: 0.1,
539-
max_retry_delay: 0.2,
540-
backoff_factor: 1.5,
541-
jitter: 0.0,
542-
reinitialize: true,
543-
};
544534

545-
// Use the port simulator approach to test the behavior
546-
let reconnect_config = config.clone();
535+
547536

548537
// This test is mainly to ensure the client handles max retry limits gracefully
549538
// We'll make an attempt to connect to a non-existent server
@@ -564,3 +553,118 @@ async fn test_max_retries_exceeded() {
564553
// The actual logic for max retries is handled within the client.rs implementation
565554
// and is exercised by the other tests in a more realistic way
566555
}
556+
557+
// Test 6: Reconnection after server downtime
558+
#[tokio::test]
559+
async fn test_reconnection_after_downtime() {
560+
let port = 9096;
561+
562+
// Start a server
563+
let (server_stop_tx, server_stop_rx) = oneshot::channel();
564+
start_test_server(port, server_stop_rx).await;
565+
566+
// Give the server time to start
567+
sleep(Duration::from_millis(500)).await;
568+
569+
// Create a client with reconnection enabled
570+
let client_result = AsyncClient::<TestPacket>::new("127.0.0.1", port).await;
571+
if client_result.is_err() {
572+
println!("Skipping test_reconnection_after_downtime as we can't create initial client");
573+
let _ = server_stop_tx.send(());
574+
return;
575+
}
576+
577+
let mut client = client_result
578+
.unwrap()
579+
.with_reconnection(ReconnectionConfig {
580+
endpoints: vec![],
581+
auto_reconnect: true,
582+
max_attempts: Some(10),
583+
initial_retry_delay: 0.1, // Fast retries for testing
584+
max_retry_delay: 1.0,
585+
backoff_factor: 1.5,
586+
jitter: 0.1,
587+
reinitialize: true,
588+
});
589+
590+
// Initialize the connection
591+
client.finalize().await;
592+
593+
// Establish a session by sending an initial request
594+
let initial_packet = TestPacket::ok();
595+
let initial_response = match client.send_recv(initial_packet).await {
596+
Ok(response) => response,
597+
Err(e) => {
598+
println!("Skipping test as we could not establish initial session: {:?}", e);
599+
let _ = server_stop_tx.send(());
600+
return;
601+
}
602+
};
603+
604+
// Verify we have a session
605+
let initial_session_id = initial_response.body().session_id.clone();
606+
assert!(initial_session_id.is_some(), "No session ID in initial response");
607+
println!("Initial session ID: {:?}", initial_session_id);
608+
609+
// Stop the server
610+
server_stop_tx.send(()).unwrap();
611+
println!("Server stopped, waiting for 5 seconds...");
612+
613+
// Wait for 5 seconds to simulate extended downtime
614+
sleep(Duration::from_secs(5)).await;
615+
616+
// Start a new server
617+
let (new_server_stop_tx, new_server_stop_rx) = oneshot::channel();
618+
let new_server_handle = start_test_server(port, new_server_stop_rx).await;
619+
620+
// Give the new server time to start
621+
sleep(Duration::from_millis(500)).await;
622+
println!("New server started");
623+
624+
// Prepare for reconnection attempts
625+
let reconnect_start = Instant::now();
626+
let mut reconnected = false;
627+
let max_reconnect_time = Duration::from_secs(10);
628+
629+
// Send a packet which should trigger reconnection
630+
while reconnect_start.elapsed() < max_reconnect_time {
631+
let test_packet = TestPacket {
632+
header: "TEST".to_string(),
633+
body: PacketBody::default(),
634+
data: Some("reconnect after downtime".to_string()),
635+
};
636+
637+
match client.send_recv(test_packet).await {
638+
Ok(response) => {
639+
// Successfully reconnected
640+
assert_eq!(response.header(), "OK");
641+
println!("Successfully reconnected after 5 seconds of downtime");
642+
643+
// Verify the response contains data
644+
if let Some(data) = &response.data {
645+
assert!(data.contains("reconnect after downtime"));
646+
}
647+
648+
// Check if we got a new session
649+
let new_session_id = response.body().session_id;
650+
println!("New session ID after reconnection: {:?}", new_session_id);
651+
652+
reconnected = true;
653+
break;
654+
}
655+
Err(e) => {
656+
println!("Reconnection attempt failed: {:?}, retrying...", e);
657+
sleep(Duration::from_millis(500)).await;
658+
}
659+
}
660+
}
661+
662+
// Assert that we were able to reconnect
663+
assert!(reconnected, "Failed to reconnect after server downtime");
664+
665+
// Clean up
666+
new_server_stop_tx.send(()).unwrap();
667+
tokio::time::timeout(Duration::from_secs(2), new_server_handle)
668+
.await
669+
.ok();
670+
}

0 commit comments

Comments
 (0)