Skip to content

Commit a3a7dcd

Browse files
committed
fix(pb): handle going away signals gracefully (#4104)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent 72e5bbd commit a3a7dcd

File tree

2 files changed

+35
-17
lines changed

2 files changed

+35
-17
lines changed

engine/packages/pegboard/src/workflows/runner.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -402,17 +402,26 @@ async fn handle_stopping(
402402
})
403403
.await?;
404404

405-
// Set all remaining actors to lost immediately
405+
// Set all remaining actors as going away immediately
406406
if !actors.is_empty() {
407407
for (actor_id, generation) in &actors {
408-
ctx.signal(crate::workflows::actor::GoingAway {
409-
generation: *generation,
410-
reset_rescheduling: reset_actor_rescheduling,
411-
})
412-
.to_workflow::<crate::workflows::actor::Workflow>()
413-
.tag("actor_id", actor_id)
414-
.send()
415-
.await?;
408+
let res = ctx
409+
.signal(crate::workflows::actor::GoingAway {
410+
generation: *generation,
411+
reset_rescheduling: reset_actor_rescheduling,
412+
})
413+
.to_workflow::<crate::workflows::actor::Workflow>()
414+
.tag("actor_id", actor_id)
415+
.graceful_not_found()
416+
.send()
417+
.await?;
418+
419+
if res.is_none() {
420+
tracing::warn!(
421+
?actor_id,
422+
"actor workflow not found, likely already stopped"
423+
);
424+
}
416425
}
417426
}
418427
}

engine/packages/pegboard/src/workflows/runner2.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -237,14 +237,23 @@ async fn handle_stopping(
237237
// Set all remaining actors as going away immediately
238238
if !actors.is_empty() {
239239
for (actor_id, generation) in &actors {
240-
ctx.signal(crate::workflows::actor::GoingAway {
241-
generation: *generation,
242-
reset_rescheduling: reset_actor_rescheduling,
243-
})
244-
.to_workflow::<crate::workflows::actor::Workflow>()
245-
.tag("actor_id", actor_id)
246-
.send()
247-
.await?;
240+
let res = ctx
241+
.signal(crate::workflows::actor::GoingAway {
242+
generation: *generation,
243+
reset_rescheduling: reset_actor_rescheduling,
244+
})
245+
.to_workflow::<crate::workflows::actor::Workflow>()
246+
.tag("actor_id", actor_id)
247+
.graceful_not_found()
248+
.send()
249+
.await?;
250+
251+
if res.is_none() {
252+
tracing::warn!(
253+
?actor_id,
254+
"actor workflow not found, likely already stopped"
255+
);
256+
}
248257
}
249258
}
250259
}

0 commit comments

Comments
 (0)