Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 200 additions & 116 deletions src/storage/mysql_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2421,6 +2421,7 @@ impl MySqlWatcherExt for MySqlStorage {

// Single query to fetch all groups and watchers with JOIN
// Note: watcher_ids is built dynamically from JOIN results, not from DB column
// Note: conditions are fetched separately from watcher_conditions table
let rows = sqlx::query(
r#"SELECT
g.id as server_group_id,
Expand All @@ -2432,11 +2433,10 @@ impl MySqlWatcherExt for MySqlStorage {
w.id as watcher_server_id,
w.watcher_id as client_watcher_id,
w.folder,
w.preset_id,
w.conditions,
w.updated_at as watcher_updated_at
w.is_recursive,
w.extra_json
FROM watcher_groups g
LEFT JOIN watchers w ON g.id = w.server_group_id AND w.is_deleted = FALSE
LEFT JOIN watchers w ON g.id = w.group_id AND w.is_deleted = FALSE
WHERE g.account_hash = ?
AND g.is_active = TRUE
AND g.is_deleted = FALSE
Expand All @@ -2449,7 +2449,8 @@ impl MySqlWatcherExt for MySqlStorage {
StorageError::Database(format!("Failed to fetch groups with watchers: {}", e))
})?;

let mut result: HashMap<i32, (WatcherGroup, Vec<WatcherData>)> = HashMap::new();
// Intermediate result with watcher server IDs for condition fetching
let mut result: HashMap<i32, (WatcherGroup, Vec<(i32, WatcherData)>)> = HashMap::new();

for row in rows {
let client_group_id: i32 = row.get("client_group_id");
Expand Down Expand Up @@ -2477,71 +2478,109 @@ impl MySqlWatcherExt for MySqlStorage {

// Add watcher if present (LEFT JOIN may have NULL watchers)
let watcher_server_id: Option<i32> = row.get("watcher_server_id");
if let Some(_watcher_server_id) = watcher_server_id {
if let Some(watcher_server_id) = watcher_server_id {
let client_watcher_id: i32 = row.get("client_watcher_id");
let folder: String = row.get("folder");
let _preset_id: Option<String> = row.get("preset_id");
let conditions_json: Option<String> = row.get("conditions");

// Parse conditions JSON to union/subtracting vectors
// conditions JSON format: [{"condition_type": 0, "patterns": "..."}]
let (union_conditions, subtracting_conditions) = if let Some(json) = conditions_json
{
#[derive(serde::Deserialize)]
struct ConditionEntry {
condition_type: Option<i32>,
#[serde(default)]
patterns: String,
#[serde(default)]
key: String,
#[serde(default)]
value: Vec<String>,
}
let entries: Vec<ConditionEntry> =
serde_json::from_str(&json).unwrap_or_default();
let mut union = Vec::new();
let mut subtract = Vec::new();
for entry in entries {
let cond_data = crate::sync::ConditionData {
key: entry.key.clone(),
value: if entry.value.is_empty() {
entry.patterns.split(',').map(|s| s.to_string()).collect()
} else {
entry.value
},
};
// condition_type >= 100 is subtracting, otherwise union
if entry.condition_type.unwrap_or(0) >= 100 {
subtract.push(cond_data);
} else {
union.push(cond_data);
}
}
(union, subtract)
} else {
(Vec::new(), Vec::new())
};
let is_recursive: bool = row.get("is_recursive");
let extra_json: Option<String> = row.get("extra_json");

// Conditions will be fetched separately in batch after this loop
let watcher_data = WatcherData {
watcher_id: client_watcher_id,
folder,
union_conditions,
subtracting_conditions,
recursive_path: false,
union_conditions: Vec::new(), // Will be populated later
subtracting_conditions: Vec::new(), // Will be populated later
recursive_path: is_recursive,
preset: false,
custom_type: String::new(),
update_mode: String::new(),
is_active: true,
extra_json: String::new(),
extra_json: extra_json.unwrap_or_default(),
updated_at: None,
};

// Add watcher_id to group's watcher_ids list
entry.0.watcher_ids.push(client_watcher_id);
entry.1.push(watcher_data);
entry.1.push((watcher_server_id, watcher_data));
}
}

// Batch fetch conditions for all watchers
// Collect all watcher server IDs
let all_watcher_ids: Vec<i32> = result
.values()
.flat_map(|(_, watchers)| watchers.iter().map(|(id, _)| *id))
.collect();

if !all_watcher_ids.is_empty() {
// Fetch all conditions in one query
let placeholders: Vec<&str> = all_watcher_ids.iter().map(|_| "?").collect();
let in_clause = placeholders.join(", ");
let conditions_query = format!(
r#"SELECT watcher_id, condition_type, `key`, value
FROM watcher_conditions
WHERE account_hash = ? AND watcher_id IN ({})"#,
in_clause
);

let mut query = sqlx::query(&conditions_query);
query = query.bind(account_hash);
for id in &all_watcher_ids {
query = query.bind(id);
}

let condition_rows = query
.fetch_all(self.get_sqlx_pool())
.await
.unwrap_or_default();

// Build a map of watcher_id -> conditions
let mut conditions_map: std::collections::HashMap<
i32,
(
Vec<crate::sync::ConditionData>,
Vec<crate::sync::ConditionData>,
),
> = std::collections::HashMap::new();

for cond_row in condition_rows {
let watcher_id: i32 = cond_row.get("watcher_id");
let condition_type: String = cond_row.get("condition_type");
let key: String = cond_row.get("key");
let value_json: String = cond_row.get("value");
let value: Vec<String> = serde_json::from_str(&value_json).unwrap_or_default();

let cond_data = crate::sync::ConditionData { key, value };
let entry = conditions_map
.entry(watcher_id)
.or_insert((Vec::new(), Vec::new()));

if condition_type == "union" {
entry.0.push(cond_data);
} else {
entry.1.push(cond_data);
}
}

// Apply conditions to watchers
for (_, watchers) in result.values_mut() {
for (watcher_server_id, watcher_data) in watchers.iter_mut() {
if let Some((union, subtract)) = conditions_map.get(watcher_server_id) {
watcher_data.union_conditions = union.clone();
watcher_data.subtracting_conditions = subtract.clone();
}
}
}
}

// Convert from (i32, WatcherData) to WatcherData
let result: HashMap<i32, (WatcherGroup, Vec<WatcherData>)> = result
.into_iter()
.map(|(k, (group, watchers))| {
(k, (group, watchers.into_iter().map(|(_, w)| w).collect()))
})
.collect();

debug!(
"Bulk fetched {} groups with watchers for account {}",
result.len(),
Expand Down Expand Up @@ -2627,83 +2666,127 @@ impl MySqlWatcherExt for MySqlStorage {
return Ok(0);
}

// Process in chunks to avoid MySQL packet size limits
const CHUNK_SIZE: usize = 100;
// Process each watcher individually (safer for complex upsert with conditions)
let mut total_upserted = 0;

for chunk in watchers.chunks(CHUNK_SIZE) {
// Build dynamic VALUES clause
let placeholders: Vec<String> = chunk
.iter()
.map(|_| "(?, ?, ?, ?, ?, ?, NOW(), NOW(), FALSE)".to_string())
.collect();

let query = format!(
let timestamp = chrono::Utc::now().timestamp();

for watcher in watchers {
// Generate title from folder path
let title = watcher
.folder
.split('/')
.last()
.unwrap_or(&watcher.folder)
.to_string();

// Upsert watcher
let result = sqlx::query(
r#"INSERT INTO watchers
(account_hash, server_group_id, watcher_id, folder, preset_id, conditions, created_at, updated_at, is_deleted)
VALUES {}
(watcher_id, account_hash, group_id, local_group_id, folder, title,
is_recursive, created_at, updated_at, is_active, extra_json)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
folder = VALUES(folder),
preset_id = VALUES(preset_id),
conditions = VALUES(conditions),
updated_at = NOW(),
is_deleted = FALSE"#,
placeholders.join(", ")
);
title = VALUES(title),
is_recursive = VALUES(is_recursive),
updated_at = VALUES(updated_at),
is_active = VALUES(is_active),
extra_json = VALUES(extra_json),
group_id = VALUES(group_id),
is_deleted = FALSE,
deleted_at = NULL"#,
)
.bind(watcher.watcher_id)
.bind(account_hash)
.bind(server_group_id)
.bind(client_group_id)
.bind(&watcher.folder)
.bind(&title)
.bind(watcher.recursive_path)
.bind(timestamp)
.bind(timestamp)
.bind(watcher.is_active)
.bind(&watcher.extra_json)
.execute(self.get_sqlx_pool())
.await
.map_err(|e| StorageError::Database(format!("Failed to upsert watcher: {}", e)))?;

let mut query_builder = sqlx::query(&query);
total_upserted += result.rows_affected() as usize;

for watcher in chunk {
// Convert union_conditions and subtracting_conditions to JSON format
// for database storage
#[derive(serde::Serialize)]
struct ConditionEntry {
condition_type: i32,
key: String,
value: Vec<String>,
}
let mut conditions = Vec::new();
// Get the watcher's server ID for conditions
let watcher_server_id: Option<i32> = sqlx::query_scalar(
r#"SELECT id FROM watchers
WHERE watcher_id = ? AND account_hash = ? AND local_group_id = ?"#,
)
.bind(watcher.watcher_id)
.bind(account_hash)
.bind(client_group_id)
.fetch_optional(self.get_sqlx_pool())
.await
.ok()
.flatten();

if let Some(server_id) = watcher_server_id {
// Delete existing conditions
sqlx::query(
r#"DELETE FROM watcher_conditions
WHERE account_hash = ? AND watcher_id = ?"#,
)
.bind(account_hash)
.bind(server_id)
.execute(self.get_sqlx_pool())
.await
.ok();

// Insert union conditions
for cond in &watcher.union_conditions {
conditions.push(ConditionEntry {
condition_type: 0, // union
key: cond.key.clone(),
value: cond.value.clone(),
});
}
for cond in &watcher.subtracting_conditions {
conditions.push(ConditionEntry {
condition_type: 100, // subtracting
key: cond.key.clone(),
value: cond.value.clone(),
});
let value_json =
serde_json::to_string(&cond.value).unwrap_or_else(|_| "[]".to_string());
sqlx::query(
r#"INSERT INTO watcher_conditions
(account_hash, watcher_id, local_watcher_id, local_group_id,
condition_type, `key`, value, operator, created_at, updated_at)
VALUES (?, ?, ?, ?, 'union', ?, ?, 'equals', ?, ?)"#,
)
.bind(account_hash)
.bind(server_id)
.bind(watcher.watcher_id)
.bind(client_group_id)
.bind(&cond.key)
.bind(&value_json)
.bind(timestamp)
.bind(timestamp)
.execute(self.get_sqlx_pool())
.await
.ok();
}
let conditions_json =
serde_json::to_string(&conditions).unwrap_or_else(|_| "[]".to_string());

// preset field is a bool in proto, convert to empty string for DB
let preset_str = if watcher.preset { "1" } else { "" };

query_builder = query_builder
// Insert subtracting conditions
for cond in &watcher.subtracting_conditions {
let value_json =
serde_json::to_string(&cond.value).unwrap_or_else(|_| "[]".to_string());
sqlx::query(
r#"INSERT INTO watcher_conditions
(account_hash, watcher_id, local_watcher_id, local_group_id,
condition_type, `key`, value, operator, created_at, updated_at)
VALUES (?, ?, ?, ?, 'subtract', ?, ?, 'equals', ?, ?)"#,
)
.bind(account_hash)
.bind(server_group_id)
.bind(server_id)
.bind(watcher.watcher_id)
.bind(&watcher.folder)
.bind(preset_str)
.bind(conditions_json);
.bind(client_group_id)
.bind(&cond.key)
.bind(&value_json)
.bind(timestamp)
.bind(timestamp)
.execute(self.get_sqlx_pool())
.await
.ok();
}
}

let result = query_builder
.execute(self.get_sqlx_pool())
.await
.map_err(|e| {
StorageError::Database(format!("Failed to batch upsert watchers: {}", e))
})?;

total_upserted += result.rows_affected() as usize;
}

// Note: watcher_ids column is no longer used - watchers are derived from JOIN
// Just update the group's updated_at timestamp
// Update the group's updated_at timestamp
sqlx::query(
r#"UPDATE watcher_groups
SET updated_at = NOW()
Expand Down Expand Up @@ -2827,10 +2910,11 @@ impl MySqlWatcherExt for MySqlStorage {
})?;

// 2. Soft delete watchers in these groups
// Note: watchers table uses 'group_id' column (not 'server_group_id')
let watchers_query = format!(
r#"UPDATE watchers
SET is_deleted = TRUE, deleted_at = NOW()
WHERE server_group_id IN ({}) AND account_hash = ? AND is_deleted = FALSE"#,
WHERE group_id IN ({}) AND account_hash = ? AND is_deleted = FALSE"#,
in_clause
);

Expand Down