Skip to content

Commit 2b5a3f5

Browse files
committed
feat(server): support control plane
1 parent 436c301 commit 2b5a3f5

File tree

10 files changed

+339
-19
lines changed

10 files changed

+339
-19
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ listen_addr = "0.0.0.0:8080"
204204
chacha20poly1305 = "your-secret-key-here"
205205

206206
# AES-256-GCM (Hardware accelerated on modern CPUs)
207-
# aes256gcm = "your-secret-key-here"
207+
# aes256 = "your-secret-key-here"
208208

209209
# XOR (Lightweight, for testing only)
210210
# xor = "test-key"

etc/server.toml.example

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,38 @@
11
# Example server configuration file for Rustun VPN
22

3-
[server_config]
4-
# Address and port to listen on
5-
# 0.0.0.0 listens on all interfaces
6-
# Use 127.0.0.1 for local only
7-
listen_addr = "0.0.0.0:8080"
8-
93
[crypto_config]
104
# Encryption method (choose one):
115
# - chacha20poly1305: High security, good performance (recommended)
12-
# - aes256gcm: High security, hardware acceleration on supported CPUs
6+
# - aes256: High security, hardware acceleration on supported CPUs
137
# - xor: Fast but low security (for testing only)
148
# - plain: No encryption (for debugging only)
159

1610
chacha20poly1305 = "your-secret-key-here"
1711

1812
# Alternative encryption options (uncomment to use):
19-
# aes256gcm = "your-secret-key-here"
13+
# aes256 = "your-secret-key-here"
2014
# xor = "your-secret-key-here"
21-
# plain = true
15+
# crypto_config="plain"
16+
17+
[server_config]
18+
# Address and port to listen on
19+
# 0.0.0.0 listens on all interfaces
20+
# Use 127.0.0.1 for local only
21+
listen_addr = "0.0.0.0:8080"
22+
23+
# Optional: Conf-agent configuration for syncing with control plane
24+
# If configured, conf-agent will periodically fetch routes and report connection status
25+
# [conf_agent]
26+
# # Control plane API URL
27+
# control_plane_url = "http://localhost:8080"
28+
# # API token for authentication (optional)
29+
# api_token = "your-api-token-here"
30+
# # Routes file path to update (should match route_config.routes_file)
31+
# routes_file = "/etc/rustun/routes.json"
32+
# # Poll interval in seconds for fetching routes (default: 60)
33+
# poll_interval = 60
34+
# # Connection reporting interval in seconds (default: 30)
35+
# report_interval = 30
2236

2337
[route_config]
2438
# Path to the routes configuration file

src/network/connection_manager.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,17 @@ impl ConnectionManager {
164164

165165
None
166166
}
167+
168+
pub fn dump_connection_info(&self) -> Vec<ConnectionMeta> {
169+
let mut result = Vec::new();
170+
let guard = self.cluster_connections.read().unwrap_or_else(|e| e.into_inner());
171+
for (_, connections) in guard.iter() {
172+
for conn in connections {
173+
result.push(conn.clone());
174+
}
175+
}
176+
result
177+
}
167178
}
168179

169180
impl Default for ConnectionManager {

src/network/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ impl PartialEq<ConnectionMeta> for &ConnectionMeta {
134134
}
135135

136136
impl ConnectionMeta {
137+
pub fn dump(&self) -> String {
138+
format!("{},{},{},{}", self.cluster, self.identity, self.private_ip, self.last_active)
139+
}
137140
/// Check if a destination IP matches this connection's routing rules
138141
///
139142
/// Returns true if the destination matches the private IP or falls

src/server/client_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
use serde::Deserialize;
1+
use serde::{Deserialize, Serialize};
22
use std::collections::HashMap;
33
use std::sync::RwLock;
44

5-
#[derive(Debug, Clone, Deserialize)]
5+
#[derive(Debug, Clone, Deserialize, Serialize)]
66
pub struct ClientConfig {
77
pub cluster: String,
88
pub identity: String,

src/server/conf_agent.rs

Lines changed: 234 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
use crate::server::client_manager::{ClientConfig, ClientManager};
2+
use crate::server::config::ConfAgentConfig;
3+
use crate::network::connection_manager::ConnectionManager;
4+
use serde::{Deserialize, Serialize};
5+
use std::sync::Arc;
6+
use tokio::fs;
7+
use tokio::time::{interval, Duration};
8+
9+
/// Connection update request for backend API
10+
#[derive(Serialize, Debug)]
11+
struct ConnectionUpdateRequest {
12+
cluster_id: u64,
13+
identity: String,
14+
last_active: Option<u64>,
15+
}
16+
17+
/// Client config response from control plane API
18+
#[derive(Deserialize, Debug)]
19+
struct ClientConfigResponse {
20+
cluster: String, // Cluster ID as string
21+
identity: String,
22+
private_ip: String,
23+
mask: String,
24+
gateway: String,
25+
ciders: Vec<String>,
26+
}
27+
28+
pub struct ConfAgent {
29+
config: ConfAgentConfig,
30+
client_manager: Arc<ClientManager>,
31+
connection_manager: Arc<ConnectionManager>,
32+
routes_file: String,
33+
}
34+
35+
36+
impl ConfAgent {
37+
pub fn new(
38+
config: ConfAgentConfig,
39+
client_manager: Arc<ClientManager>,
40+
connection_manager: Arc<ConnectionManager>,
41+
routes_file: String,
42+
) -> Self {
43+
Self {
44+
config,
45+
client_manager,
46+
connection_manager,
47+
routes_file,
48+
}
49+
}
50+
51+
/// Start the conf-agent service
52+
pub async fn start(self: Arc<Self>) -> crate::Result<()> {
53+
tracing::info!("Starting conf-agent");
54+
tracing::info!("Control plane URL: {}", self.config.control_plane_url);
55+
tracing::info!("Routes file: {}", self.config.routes_file);
56+
tracing::info!("Poll interval: {}s", self.config.poll_interval);
57+
58+
// Initial fetch and report
59+
if let Err(e) = self.fetch_and_update_routes().await {
60+
tracing::error!("Failed to fetch routes: {:?}", e);
61+
}
62+
if let Err(e) = self.report_connections().await {
63+
tracing::error!("Failed to report connections: {:?}", e);
64+
}
65+
66+
// Periodic tasks: route fetching and connection reporting
67+
let mut route_ticker = interval(Duration::from_secs(self.config.poll_interval));
68+
let mut report_ticker = interval(Duration::from_secs(self.config.report_interval));
69+
70+
loop {
71+
tokio::select! {
72+
_ = route_ticker.tick() => {
73+
if let Err(e) = self.fetch_and_update_routes().await {
74+
tracing::error!("Failed to fetch routes: {:?}", e);
75+
}
76+
}
77+
_ = report_ticker.tick() => {
78+
if let Err(e) = self.report_connections().await {
79+
tracing::error!("Failed to report connections: {:?}", e);
80+
}
81+
}
82+
}
83+
}
84+
}
85+
86+
/// Report connections from connection manager
87+
async fn report_connections(&self) -> crate::Result<()> {
88+
// Get connections from connection manager
89+
let connections = self.connection_manager.dump_connection_info();
90+
91+
if connections.is_empty() {
92+
return Ok(());
93+
}
94+
95+
// Convert ConnectionMeta to ConnectionUpdateRequest
96+
let mut updates = Vec::new();
97+
for meta in &connections {
98+
// Parse cluster ID from string to u64
99+
let cluster_id: u64 = match meta.cluster.parse() {
100+
Ok(id) => id,
101+
Err(_) => {
102+
tracing::warn!(
103+
"Invalid cluster ID '{}' for identity {}, skipping",
104+
meta.cluster,
105+
meta.identity
106+
);
107+
continue;
108+
}
109+
};
110+
111+
updates.push(ConnectionUpdateRequest {
112+
cluster_id,
113+
identity: meta.identity.clone(),
114+
last_active: Some(meta.last_active),
115+
});
116+
}
117+
118+
if updates.is_empty() {
119+
return Ok(());
120+
}
121+
122+
// Send batch update to backend
123+
let url = format!("{}/api/sync/connections", self.config.control_plane_url);
124+
Self::send_connection_updates(&url, self.config.api_token.as_deref(), &updates).await?;
125+
126+
tracing::debug!("Reported {} connection updates", updates.len());
127+
Ok(())
128+
}
129+
130+
/// Fetch routes from control plane and update local routes file
131+
async fn fetch_and_update_routes(&self) -> crate::Result<()> {
132+
tracing::debug!("Fetching routes from control plane...");
133+
134+
let url = format!("{}/api/sync/clients", self.config.control_plane_url);
135+
let routes = Self::fetch_routes(&url, self.config.api_token.as_deref()).await?;
136+
137+
tracing::info!("Fetched {} routes", routes.len());
138+
139+
// Update client manager
140+
// self.client_manager.add_clients_config(routes.clone());
141+
self.client_manager.rewrite_clients_config(routes.clone());
142+
143+
// Write to routes file
144+
Self::write_routes(&self.routes_file, &routes).await?;
145+
146+
tracing::info!("Routes file updated successfully");
147+
Ok(())
148+
}
149+
150+
/// Fetch routes from control plane API
151+
async fn fetch_routes(
152+
url: &str,
153+
token: Option<&str>,
154+
) -> crate::Result<Vec<ClientConfig>> {
155+
let mut request = ureq::get(url).timeout(Duration::from_secs(30));
156+
157+
if let Some(token) = token {
158+
request = request.set("Authorization", &format!("Bearer {}", token));
159+
}
160+
161+
let response = request.call()?;
162+
163+
let status = response.status();
164+
let body = response.into_string()?;
165+
166+
if status != 200 {
167+
return Err(format!("Control plane returned error: {} - {}", status, body).into());
168+
}
169+
170+
let routes: Vec<ClientConfigResponse> = serde_json::from_str(&body)?;
171+
172+
// Convert to ClientConfig format
173+
let client_configs: Vec<ClientConfig> = routes
174+
.into_iter()
175+
.map(|r| ClientConfig {
176+
cluster: r.cluster,
177+
identity: r.identity,
178+
private_ip: r.private_ip,
179+
mask: r.mask,
180+
gateway: r.gateway,
181+
ciders: r.ciders,
182+
})
183+
.collect();
184+
185+
Ok(client_configs)
186+
}
187+
188+
/// Write routes to file atomically
189+
async fn write_routes(file_path: &str, routes: &[ClientConfig]) -> crate::Result<()> {
190+
// Ensure parent directory exists
191+
if let Some(parent) = std::path::Path::new(file_path).parent() {
192+
fs::create_dir_all(parent).await?;
193+
}
194+
195+
// Serialize to JSON with pretty formatting
196+
let json = serde_json::to_string_pretty(routes)?;
197+
198+
// Write to temp file first, then rename (atomic write)
199+
let temp_path = format!("{}.tmp", file_path);
200+
fs::write(&temp_path, json).await?;
201+
fs::rename(&temp_path, file_path).await?;
202+
203+
Ok(())
204+
}
205+
206+
207+
/// Send connection updates to control plane API
208+
async fn send_connection_updates(
209+
url: &str,
210+
token: Option<&str>,
211+
updates: &[ConnectionUpdateRequest],
212+
) -> crate::Result<()> {
213+
let json_data = serde_json::to_string(updates)?;
214+
215+
let mut request = ureq::post(url)
216+
.set("Content-Type", "application/json")
217+
.timeout(Duration::from_secs(30));
218+
219+
if let Some(token) = token {
220+
request = request.set("Authorization", &format!("Bearer {}", token));
221+
}
222+
223+
let response = request.send_string(&json_data)?;
224+
let status = response.status();
225+
let body = response.into_string()?;
226+
227+
if status != 200 {
228+
return Err(format!("Backend returned error: {} - {}", status, body).into());
229+
}
230+
231+
Ok(())
232+
}
233+
}
234+

src/server/config.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,40 @@ pub struct Config {
88
pub server_config: ServerConfig,
99
pub crypto_config: CryptoConfig,
1010
pub route_config: RouteConfig,
11+
#[serde(default)]
12+
pub conf_agent: Option<ConfAgentConfig>,
1113
}
1214

1315
#[derive(Debug, Deserialize, Clone)]
1416
pub struct ServerConfig {
1517
pub listen_addr: String,
1618
}
1719

20+
#[derive(Debug, Deserialize, Clone)]
21+
pub struct ConfAgentConfig {
22+
/// Control plane API URL
23+
pub control_plane_url: String,
24+
/// API token for authentication
25+
#[serde(default)]
26+
pub api_token: Option<String>,
27+
/// Routes file path to update
28+
pub routes_file: String,
29+
/// Poll interval in seconds for fetching routes
30+
#[serde(default = "default_poll_interval")]
31+
pub poll_interval: u64,
32+
/// Connection reporting interval in seconds (default: 30)
33+
#[serde(default = "default_report_interval")]
34+
pub report_interval: u64,
35+
}
36+
37+
fn default_poll_interval() -> u64 {
38+
60
39+
}
40+
41+
fn default_report_interval() -> u64 {
42+
30
43+
}
44+
1845
#[derive(Debug, Deserialize)]
1946
pub struct RouteConfig {
2047
pub routes_file: String,

0 commit comments

Comments
 (0)