Skip to content

Commit 6d76467

Browse files
committed
fix: add max param to prune signals cmd
1 parent 293011a commit 6d76467

File tree

5 files changed

+35
-17
lines changed

5 files changed

+35
-17
lines changed

engine/docker/template/grafana-dashboards/gasoline.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@
232232
"uid": "prometheus"
233233
},
234234
"editorMode": "code",
235-
"expr": "sum by (workflow_name) (\n max by(workflow_name, rivet_project, rivet_datacenter) (\n rivet_gasoline_workflow_total{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\",workflow_name=~\"$workflow_name\"}\n * on(k8s_pod_name) group_left(rivet_project, rivet_datacenter)\n (\n (rivet_gasoline_worker_last_metrics_publish{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\"} == bool\n on(rivet_project, rivet_datacenter) group_left()\n max by (rivet_project, rivet_datacenter) (rivet_gasoline_worker_last_metrics_publish{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\"}))\n )\n )\n)",
235+
"expr": "sum by (workflow_name) (\n max by(workflow_name, rivet_project, rivet_datacenter) (\n rivet_gasoline_workflow_sleeping{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\",workflow_name=~\"$workflow_name\"}\n * on(k8s_pod_name) group_left(rivet_project, rivet_datacenter)\n (\n (rivet_gasoline_worker_last_metrics_publish{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\"} == bool\n on(rivet_project, rivet_datacenter) group_left()\n max by (rivet_project, rivet_datacenter) (rivet_gasoline_worker_last_metrics_publish{rivet_project=~\"$project\",rivet_datacenter=~\"$datacenter\"}))\n )\n )\n)",
236236
"instant": false,
237237
"legendFormat": "{{workflow_name}}",
238238
"range": true,

engine/packages/engine/src/commands/wf/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ pub enum SubCommand {
4242
#[clap(short = 'd', long)]
4343
dry_run: bool,
4444
#[clap(short = 'p', long)]
45-
parallelization: Option<u128>,
45+
parallelization: Option<u16>,
4646
},
4747
/// Deletes the history for completed workflows that match the name and before filter.
4848
PruneHistory {
@@ -53,7 +53,7 @@ pub enum SubCommand {
5353
#[clap(short = 'd', long)]
5454
dry_run: bool,
5555
#[clap(short = 'p', long)]
56-
parallelization: Option<u128>,
56+
parallelization: Option<u16>,
5757
},
5858
/// Lists the entire event history of a workflow.
5959
History {

engine/packages/engine/src/commands/wf/signal.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@ pub enum SubCommand {
3636
#[clap(short = 'd', long)]
3737
dry_run: bool,
3838
#[clap(short = 'p', long)]
39-
parallelization: Option<u128>,
39+
parallelization: Option<u16>,
40+
#[clap(short = 'm', long)]
41+
max_per_txn: Option<usize>,
4042
},
4143
}
4244

@@ -73,13 +75,15 @@ impl SubCommand {
7375
before,
7476
dry_run,
7577
parallelization,
78+
max_per_txn,
7679
} => {
7780
let total = db
7881
.prune_acked_signals(
7982
&name.iter().map(|x| x.as_str()).collect::<Vec<_>>(),
8083
before.timestamp_millis(),
8184
dry_run,
8285
parallelization.unwrap_or(1),
86+
max_per_txn,
8387
)
8488
.await?;
8589

engine/packages/gasoline/src/db/debug.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ pub trait DatabaseDebug: Database {
4545
names: &[&str],
4646
error_like: &[&str],
4747
dry_run: bool,
48-
parallelization: u128,
48+
parallelization: u16,
4949
) -> Result<usize>;
5050

5151
/// Used by pruner workflow for automatic pruning.
@@ -57,7 +57,7 @@ pub trait DatabaseDebug: Database {
5757
names: &[&str],
5858
before_ts: i64,
5959
dry_run: bool,
60-
parallelization: u128,
60+
parallelization: u16,
6161
) -> Result<usize>;
6262

6363
/// Used for manual pruning.
@@ -66,7 +66,8 @@ pub trait DatabaseDebug: Database {
6666
names: &[&str],
6767
before_ts: i64,
6868
dry_run: bool,
69-
parallelization: u128,
69+
parallelization: u16,
70+
max_per_txn: Option<usize>,
7071
) -> Result<usize>;
7172
}
7273

engine/packages/gasoline/src/db/kv/debug.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1221,15 +1221,15 @@ impl DatabaseDebug for DatabaseKv {
12211221
names: &[&str],
12221222
error_like: &[&str],
12231223
dry_run: bool,
1224-
parallelization: u128,
1224+
parallelization: u16,
12251225
) -> Result<usize> {
12261226
ensure!(parallelization > 0);
12271227
ensure!(parallelization < 1024);
12281228

1229-
let chunk_size = u128::MAX / parallelization;
1229+
let chunk_size = u128::MAX / parallelization as u128;
12301230
let mut futs = FuturesUnordered::new();
12311231

1232-
for i in 0..parallelization {
1232+
for i in 0..parallelization as u128 {
12331233
let start = i * chunk_size;
12341234
futs.push(self.revive_workflows_inner(
12351235
names,
@@ -1407,15 +1407,15 @@ impl DatabaseDebug for DatabaseKv {
14071407
names: &[&str],
14081408
before_ts: i64,
14091409
dry_run: bool,
1410-
parallelization: u128,
1410+
parallelization: u16,
14111411
) -> Result<usize> {
14121412
ensure!(parallelization > 0);
14131413
ensure!(parallelization < 1024);
14141414

1415-
let chunk_size = u128::MAX / parallelization;
1415+
let chunk_size = u128::MAX / parallelization as u128;
14161416
let mut futs = FuturesUnordered::new();
14171417

1418-
for i in 0..parallelization {
1418+
for i in 0..parallelization as u128 {
14191419
let start = i * chunk_size;
14201420
futs.push(self.prune_workflow_history_inner(
14211421
names,
@@ -1442,22 +1442,24 @@ impl DatabaseDebug for DatabaseKv {
14421442
names: &[&str],
14431443
before_ts: i64,
14441444
dry_run: bool,
1445-
parallelization: u128,
1445+
parallelization: u16,
1446+
max_per_txn: Option<usize>,
14461447
) -> Result<usize> {
14471448
ensure!(parallelization > 0);
14481449
ensure!(parallelization < 1024);
14491450

1450-
let chunk_size = u128::MAX / parallelization;
1451+
let chunk_size = u128::MAX / parallelization as u128;
14511452
let mut futs = FuturesUnordered::new();
14521453

1453-
for i in 0..parallelization {
1454+
for i in 0..parallelization as u128 {
14541455
let start = i * chunk_size;
14551456
futs.push(self.prune_signals_inner(
14561457
names,
14571458
before_ts,
14581459
dry_run,
14591460
start,
14601461
start + chunk_size,
1462+
max_per_txn,
14611463
));
14621464
}
14631465

@@ -1813,6 +1815,7 @@ impl DatabaseKv {
18131815
dry_run: bool,
18141816
start: u128,
18151817
end: u128,
1818+
max_per_txn: Option<usize>,
18161819
) -> Result<usize> {
18171820
let mut total = 0;
18181821
let mut current_signal_id = Some(Id::v1(Uuid::from_u128(start), self.config.dc_label()));
@@ -1858,14 +1861,24 @@ impl DatabaseKv {
18581861
let mut ts_matches = false;
18591862

18601863
let fut = async {
1861-
while let Some(entry) = stream.try_next().await? {
1864+
loop {
1865+
let Some(entry) = stream.try_next().await? else {
1866+
break;
1867+
};
1868+
18621869
let signal_id = *self.subspace.unpack::<JustId>(entry.key())?;
18631870

18641871
if let Some(curr) = current_signal_id {
18651872
if signal_id != curr {
18661873
// Save if matches query
18671874
if name_matches && state_matches && ts_matches {
18681875
signal_ids.push(curr);
1876+
1877+
if let Some(max_per_txn) = max_per_txn {
1878+
if signal_ids.len() >= max_per_txn {
1879+
return anyhow::Ok(());
1880+
}
1881+
}
18691882
}
18701883

18711884
signals_processed += 1;

0 commit comments

Comments
 (0)