diff --git a/engine/packages/gasoline/src/db/kv/debug.rs b/engine/packages/gasoline/src/db/kv/debug.rs index 07bf981ff6..3df6ce40f2 100644 --- a/engine/packages/gasoline/src/db/kv/debug.rs +++ b/engine/packages/gasoline/src/db/kv/debug.rs @@ -920,7 +920,7 @@ impl DatabaseDebug for DatabaseKv { { ensure!( current_event.indexed_signal_ids.len() == key.index, - "corrupt history, index doesn't exist yet or is out of order" + "corrupt history, indexed signal doesn't exist yet or is out of order" ); let signal_id = key.deserialize(entry.value())?; @@ -933,7 +933,7 @@ impl DatabaseDebug for DatabaseKv { { ensure!( current_event.indexed_names.len() == key.index, - "corrupt history, index doesn't exist yet or is out of order" + "corrupt history, indexed name doesn't exist yet or is out of order" ); let name = key.deserialize(entry.value())?; @@ -944,7 +944,7 @@ impl DatabaseDebug for DatabaseKv { { ensure!( current_event.indexed_input_chunks.len() == key.index, - "corrupt history, index doesn't exist yet or is out of order" + "corrupt history, indexed chunk doesn't exist yet or is out of order" ); if let Some(input_chunks) = diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 5314a70f71..62870f5b3c 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -8,7 +8,7 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::{Context, Result, ensure}; +use anyhow::{Context, Result}; use futures_util::{StreamExt, TryStreamExt, future::try_join_all, stream::BoxStream}; use rivet_util::Id; use rivet_util::future::CustomInstrumentExt; @@ -1632,10 +1632,13 @@ impl Database for DatabaseKv { .unpack::( entry.key(), ) { - ensure!( - current_event.indexed_names.len() == key.index, - "corrupt history, index doesn't exist yet or is out of order" - ); + if current_event.indexed_names.len() != key.index { + tracing::error!( + ?wf, + "corrupt history, indexed name doesn't exist yet or is out of order" + ); + return Ok(None); + } let name = key.deserialize(entry.value())?; current_event.indexed_names.insert(key.index, name); @@ -1644,11 +1647,16 @@ impl Database for DatabaseKv { .unpack::( entry.key(), ) { - ensure!( - current_event.indexed_input_chunks.len() - == key.index, - "corrupt history, index doesn't exist yet or is out of order" - ); + if current_event.indexed_input_chunks.len() + != key.index + { + tracing::error!( + ?wf, + ?current_event, + "corrupt history, indexed chunk doesn't exist yet or is out of order" + ); + return Ok(None); + } if let Some(input_chunks) = current_event .indexed_input_chunks @@ -1682,7 +1690,7 @@ impl Database for DatabaseKv { )?); } - Ok(events_by_location) + Ok(Some(events_by_location)) } )?; @@ -1699,6 +1707,10 @@ impl Database for DatabaseKv { return Ok(None); } + let Some(events) = events else { + return Ok(None); + }; + let create_ts = create_ts_key .deserialize(&create_ts_entry.context("key should exist")?)?; let ray_id = ray_id_key diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index a1b7083633..a9c8fafba9 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -326,6 +326,14 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> runtime::LifecycleState::new_sleeping() } runtime::SpawnActorOutput::Destroy => { + ctx.v(2) + .signal(metrics::Destroy { + ts: util::timestamp::now(), + }) + .to_workflow_id(metrics_workflow_id) + .send() + .await?; + // Destroyed early ctx.workflow(destroy::Input { namespace_id: input.namespace_id, diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index c0f2523a3f..aa8f093e90 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -516,7 +516,7 @@ async fn allocate_actor_v2( pending_allocation_ts, .. } => { - tracing::warn!( + tracing::debug!( actor_id=?input.actor_id, "failed to allocate (no availability), waiting for allocation", ); @@ -1112,6 +1112,9 @@ pub async fn clear_pending_allocation( if exists { tx.clear(&pending_alloc_key); + // If the pending actor key still exists, we must clear its desired slot because after this + // activity the actor will go to sleep or be destroyed. We don't clear the slot if the key + // doesn't exist because the actor may either be allocated or destroyed. if allocated_serverless_slot { tx.atomic_op( &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new( @@ -1129,7 +1132,10 @@ pub async fn clear_pending_allocation( .custom_instrument(tracing::info_span!("actor_clear_pending_alloc_tx")) .await?; - state.allocated_serverless_slot = false; + // Only mark allocated_serverless_slot as false if it was allocated before and cleared now + if allocated_serverless_slot && cleared { + state.allocated_serverless_slot = false; + } Ok(cleared) } diff --git a/engine/sdks/typescript/test-runner/src/index.ts b/engine/sdks/typescript/test-runner/src/index.ts index 5835491db5..97ed4980b7 100644 --- a/engine/sdks/typescript/test-runner/src/index.ts +++ b/engine/sdks/typescript/test-runner/src/index.ts @@ -16,13 +16,12 @@ const RIVET_RUNNER_KEY = process.env.RIVET_RUNNER_KEY; const RIVET_RUNNER_VERSION = process.env.RIVET_RUNNER_VERSION ? Number(process.env.RIVET_RUNNER_VERSION) : 1; -const RIVET_RUNNER_TOTAL_SLOTS = process.env.RIVET_RUNNER_TOTAL_SLOTS - ? Number(process.env.RIVET_RUNNER_TOTAL_SLOTS) - : 10000; +const RIVET_RUNNER_TOTAL_SLOTS = parseInt(process.env.RIVET_RUNNER_TOTAL_SLOTS ?? "1"); const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://127.0.0.1:6420"; const RIVET_TOKEN = process.env.RIVET_TOKEN ?? "dev"; -const AUTOSTART_SERVER = process.env.DISABLE_SERVER === undefined; -const AUTOSTART_RUNNER = process.env.AUTOSTART_RUNNER !== undefined; +const AUTOSTART_SERVER = (process.env.AUTOSTART_SERVER ?? "1") == "1"; +const AUTOSTART_RUNNER = (process.env.AUTOSTART_RUNNER ?? "0") == "1"; +const AUTOCONFIGURE_SERVERLESS = (process.env.AUTOCONFIGURE_SERVERLESS ?? "1") == "1"; const runnerStarted = Promise.withResolvers(); const runnerStopped = Promise.withResolvers(); @@ -76,7 +75,7 @@ app.get("/shutdown", async (c) => { return c.text("ok"); }); -app.get("/start", async (c) => { +app.get("/api/rivet/start", async (c) => { return streamSSE(c, async (stream) => { const runnerStarted = Promise.withResolvers(); const runnerStopped = Promise.withResolvers(); @@ -95,7 +94,7 @@ app.get("/start", async (c) => { }); }); -app.get("/metadata", async (c) => { +app.get("/api/rivet/metadata", async (c) => { return c.json({ runtime: "test-runner", version: "1", @@ -114,7 +113,9 @@ if (AUTOSTART_SERVER) { if (AUTOSTART_RUNNER) { runner = await startRunner(runnerStarted, runnerStopped); -} else await autoConfigureServerless(); +} else if (AUTOCONFIGURE_SERVERLESS) { + await autoConfigureServerless(); +} process.on("SIGTERM", async () => { getLogger().debug("received SIGTERM, force exiting in 3s");