Skip to content

Commit 3492170

Browse files
committed
fix(pb, gas): fix overreporting slots bug, metrics wf not stopping bug, corrupt history preventing wf pull (#4148)
# Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context.

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected)
- [ ] This change requires a documentation update

## How Has This Been Tested?

Please describe the tests that you ran to verify your changes.

## Checklist:

- [ ] My code follows the style guidelines of this project
- [ ] I have performed a self-review of my code
- [ ] I have commented my code, particularly in hard-to-understand areas
- [ ] I have made corresponding changes to the documentation
- [ ] My changes generate no new warnings
- [ ] I have added tests that prove my fix is effective or that my feature works
- [ ] New and existing unit tests pass locally with my changes
1 parent 774c38e commit 3492170

File tree

5 files changed

+51
-24
lines changed

5 files changed

+51
-24
lines changed

engine/packages/gasoline/src/db/kv/debug.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -920,7 +920,7 @@ impl DatabaseDebug for DatabaseKv {
920920
{
921921
ensure!(
922922
current_event.indexed_signal_ids.len() == key.index,
923-
"corrupt history, index doesn't exist yet or is out of order"
923+
"corrupt history, indexed signal doesn't exist yet or is out of order"
924924
);
925925

926926
let signal_id = key.deserialize(entry.value())?;
@@ -933,7 +933,7 @@ impl DatabaseDebug for DatabaseKv {
933933
{
934934
ensure!(
935935
current_event.indexed_names.len() == key.index,
936-
"corrupt history, index doesn't exist yet or is out of order"
936+
"corrupt history, indexed name doesn't exist yet or is out of order"
937937
);
938938

939939
let name = key.deserialize(entry.value())?;
@@ -944,7 +944,7 @@ impl DatabaseDebug for DatabaseKv {
944944
{
945945
ensure!(
946946
current_event.indexed_input_chunks.len() == key.index,
947-
"corrupt history, index doesn't exist yet or is out of order"
947+
"corrupt history, indexed chunk doesn't exist yet or is out of order"
948948
);
949949

950950
if let Some(input_chunks) =

engine/packages/gasoline/src/db/kv/mod.rs

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use std::{
88
time::{Duration, Instant},
99
};
1010

11-
use anyhow::{Context, Result, ensure};
11+
use anyhow::{Context, Result};
1212
use futures_util::{StreamExt, TryStreamExt, future::try_join_all, stream::BoxStream};
1313
use rivet_util::Id;
1414
use rivet_util::future::CustomInstrumentExt;
@@ -1632,10 +1632,13 @@ impl Database for DatabaseKv {
16321632
.unpack::<keys::history::IndexedNameKey>(
16331633
entry.key(),
16341634
) {
1635-
ensure!(
1636-
current_event.indexed_names.len() == key.index,
1637-
"corrupt history, index doesn't exist yet or is out of order"
1638-
);
1635+
if current_event.indexed_names.len() != key.index {
1636+
tracing::error!(
1637+
?wf,
1638+
"corrupt history, indexed name doesn't exist yet or is out of order"
1639+
);
1640+
return Ok(None);
1641+
}
16391642

16401643
let name = key.deserialize(entry.value())?;
16411644
current_event.indexed_names.insert(key.index, name);
@@ -1644,11 +1647,16 @@ impl Database for DatabaseKv {
16441647
.unpack::<keys::history::IndexedInputChunkKey>(
16451648
entry.key(),
16461649
) {
1647-
ensure!(
1648-
current_event.indexed_input_chunks.len()
1649-
== key.index,
1650-
"corrupt history, index doesn't exist yet or is out of order"
1651-
);
1650+
if current_event.indexed_input_chunks.len()
1651+
!= key.index
1652+
{
1653+
tracing::error!(
1654+
?wf,
1655+
?current_event,
1656+
"corrupt history, indexed chunk doesn't exist yet or is out of order"
1657+
);
1658+
return Ok(None);
1659+
}
16521660

16531661
if let Some(input_chunks) = current_event
16541662
.indexed_input_chunks
@@ -1682,7 +1690,7 @@ impl Database for DatabaseKv {
16821690
)?);
16831691
}
16841692

1685-
Ok(events_by_location)
1693+
Ok(Some(events_by_location))
16861694
}
16871695
)?;
16881696

@@ -1699,6 +1707,10 @@ impl Database for DatabaseKv {
16991707
return Ok(None);
17001708
}
17011709

1710+
let Some(events) = events else {
1711+
return Ok(None);
1712+
};
1713+
17021714
let create_ts = create_ts_key
17031715
.deserialize(&create_ts_entry.context("key should exist")?)?;
17041716
let ray_id = ray_id_key

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,14 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
326326
runtime::LifecycleState::new_sleeping()
327327
}
328328
runtime::SpawnActorOutput::Destroy => {
329+
ctx.v(2)
330+
.signal(metrics::Destroy {
331+
ts: util::timestamp::now(),
332+
})
333+
.to_workflow_id(metrics_workflow_id)
334+
.send()
335+
.await?;
336+
329337
// Destroyed early
330338
ctx.workflow(destroy::Input {
331339
namespace_id: input.namespace_id,

engine/packages/pegboard/src/workflows/actor/runtime.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -516,7 +516,7 @@ async fn allocate_actor_v2(
516516
pending_allocation_ts,
517517
..
518518
} => {
519-
tracing::warn!(
519+
tracing::debug!(
520520
actor_id=?input.actor_id,
521521
"failed to allocate (no availability), waiting for allocation",
522522
);
@@ -1112,6 +1112,9 @@ pub async fn clear_pending_allocation(
11121112
if exists {
11131113
tx.clear(&pending_alloc_key);
11141114

1115+
// If the pending actor key still exists, we must clear its desired slot because after this
1116+
// activity the actor will go to sleep or be destroyed. We don't clear the slot if the key
1117+
// doesn't exist because the actor may already have been either allocated or destroyed.
11151118
if allocated_serverless_slot {
11161119
tx.atomic_op(
11171120
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
@@ -1129,7 +1132,10 @@ pub async fn clear_pending_allocation(
11291132
.custom_instrument(tracing::info_span!("actor_clear_pending_alloc_tx"))
11301133
.await?;
11311134

1132-
state.allocated_serverless_slot = false;
1135+
// Only mark allocated_serverless_slot as false if it was allocated before and cleared now
1136+
if allocated_serverless_slot && cleared {
1137+
state.allocated_serverless_slot = false;
1138+
}
11331139

11341140
Ok(cleared)
11351141
}

engine/sdks/typescript/test-runner/src/index.ts

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@ const RIVET_RUNNER_KEY = process.env.RIVET_RUNNER_KEY;
1616
const RIVET_RUNNER_VERSION = process.env.RIVET_RUNNER_VERSION
1717
? Number(process.env.RIVET_RUNNER_VERSION)
1818
: 1;
19-
const RIVET_RUNNER_TOTAL_SLOTS = process.env.RIVET_RUNNER_TOTAL_SLOTS
20-
? Number(process.env.RIVET_RUNNER_TOTAL_SLOTS)
21-
: 10000;
19+
const RIVET_RUNNER_TOTAL_SLOTS = parseInt(process.env.RIVET_RUNNER_TOTAL_SLOTS ?? "1");
2220
const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://127.0.0.1:6420";
2321
const RIVET_TOKEN = process.env.RIVET_TOKEN ?? "dev";
24-
const AUTOSTART_SERVER = process.env.DISABLE_SERVER === undefined;
25-
const AUTOSTART_RUNNER = process.env.AUTOSTART_RUNNER !== undefined;
22+
const AUTOSTART_SERVER = (process.env.AUTOSTART_SERVER ?? "1") == "1";
23+
const AUTOSTART_RUNNER = (process.env.AUTOSTART_RUNNER ?? "0") == "1";
24+
const AUTOCONFIGURE_SERVERLESS = (process.env.AUTOCONFIGURE_SERVERLESS ?? "1") == "1";
2625

2726
const runnerStarted = Promise.withResolvers<Runner>();
2827
const runnerStopped = Promise.withResolvers<Runner>();
@@ -76,7 +75,7 @@ app.get("/shutdown", async (c) => {
7675
return c.text("ok");
7776
});
7877

79-
app.get("/start", async (c) => {
78+
app.get("/api/rivet/start", async (c) => {
8079
return streamSSE(c, async (stream) => {
8180
const runnerStarted = Promise.withResolvers<Runner>();
8281
const runnerStopped = Promise.withResolvers<Runner>();
@@ -95,7 +94,7 @@ app.get("/start", async (c) => {
9594
});
9695
});
9796

98-
app.get("/metadata", async (c) => {
97+
app.get("/api/rivet/metadata", async (c) => {
9998
return c.json({
10099
runtime: "test-runner",
101100
version: "1",
@@ -114,7 +113,9 @@ if (AUTOSTART_SERVER) {
114113

115114
if (AUTOSTART_RUNNER) {
116115
runner = await startRunner(runnerStarted, runnerStopped);
117-
} else await autoConfigureServerless();
116+
} else if (AUTOCONFIGURE_SERVERLESS) {
117+
await autoConfigureServerless();
118+
}
118119

119120
process.on("SIGTERM", async () => {
120121
getLogger().debug("received SIGTERM, force exiting in 3s");

0 commit comments

Comments
 (0)