Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions engine/packages/gasoline/src/db/kv/debug.rs
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ impl DatabaseDebug for DatabaseKv {
{
ensure!(
current_event.indexed_signal_ids.len() == key.index,
"corrupt history, index doesn't exist yet or is out of order"
"corrupt history, indexed signal doesn't exist yet or is out of order"
);

let signal_id = key.deserialize(entry.value())?;
Expand All @@ -933,7 +933,7 @@ impl DatabaseDebug for DatabaseKv {
{
ensure!(
current_event.indexed_names.len() == key.index,
"corrupt history, index doesn't exist yet or is out of order"
"corrupt history, indexed name doesn't exist yet or is out of order"
);

let name = key.deserialize(entry.value())?;
Expand All @@ -944,7 +944,7 @@ impl DatabaseDebug for DatabaseKv {
{
ensure!(
current_event.indexed_input_chunks.len() == key.index,
"corrupt history, index doesn't exist yet or is out of order"
"corrupt history, indexed chunk doesn't exist yet or is out of order"
);

if let Some(input_chunks) =
Expand Down
34 changes: 23 additions & 11 deletions engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
time::{Duration, Instant},
};

use anyhow::{Context, Result, ensure};
use anyhow::{Context, Result};
use futures_util::{StreamExt, TryStreamExt, future::try_join_all, stream::BoxStream};
use rivet_util::Id;
use rivet_util::future::CustomInstrumentExt;
Expand Down Expand Up @@ -1632,10 +1632,13 @@
.unpack::<keys::history::IndexedNameKey>(
entry.key(),
) {
ensure!(
current_event.indexed_names.len() == key.index,
"corrupt history, index doesn't exist yet or is out of order"
);
if current_event.indexed_names.len() != key.index {
tracing::error!(
?wf,
"corrupt history, indexed name doesn't exist yet or is out of order"
);
return Ok(None);
}

let name = key.deserialize(entry.value())?;
current_event.indexed_names.insert(key.index, name);
Expand All @@ -1644,11 +1647,16 @@
.unpack::<keys::history::IndexedInputChunkKey>(
entry.key(),
) {
ensure!(
current_event.indexed_input_chunks.len()
== key.index,
"corrupt history, index doesn't exist yet or is out of order"
);
if current_event.indexed_input_chunks.len()
!= key.index
{
tracing::error!(

Check failure on line 1653 in engine/packages/gasoline/src/db/kv/mod.rs

View workflow job for this annotation

GitHub Actions / Check

`kv::WorkflowHistoryEventBuilder` doesn't implement `std::fmt::Debug`

Check failure on line 1653 in engine/packages/gasoline/src/db/kv/mod.rs

View workflow job for this annotation

GitHub Actions / Check

`kv::WorkflowHistoryEventBuilder` doesn't implement `std::fmt::Debug`

Check failure on line 1653 in engine/packages/gasoline/src/db/kv/mod.rs

View workflow job for this annotation

GitHub Actions / Check

`kv::WorkflowHistoryEventBuilder` doesn't implement `std::fmt::Debug`
?wf,
?current_event,
"corrupt history, indexed chunk doesn't exist yet or is out of order"
);
return Ok(None);
}

if let Some(input_chunks) = current_event
.indexed_input_chunks
Expand Down Expand Up @@ -1682,7 +1690,7 @@
)?);
}

Ok(events_by_location)
Ok(Some(events_by_location))
}
)?;

Expand All @@ -1699,6 +1707,10 @@
return Ok(None);
}

let Some(events) = events else {
return Ok(None);
};

let create_ts = create_ts_key
.deserialize(&create_ts_entry.context("key should exist")?)?;
let ray_id = ray_id_key
Expand Down
8 changes: 8 additions & 0 deletions engine/packages/pegboard/src/workflows/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,14 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
runtime::LifecycleState::new_sleeping()
}
runtime::SpawnActorOutput::Destroy => {
ctx.v(2)
.signal(metrics::Destroy {
ts: util::timestamp::now(),
})
.to_workflow_id(metrics_workflow_id)
.send()
.await?;

// Destroyed early
ctx.workflow(destroy::Input {
namespace_id: input.namespace_id,
Expand Down
10 changes: 8 additions & 2 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ async fn allocate_actor_v2(
pending_allocation_ts,
..
} => {
tracing::warn!(
tracing::debug!(
actor_id=?input.actor_id,
"failed to allocate (no availability), waiting for allocation",
);
Expand Down Expand Up @@ -1112,6 +1112,9 @@ pub async fn clear_pending_allocation(
if exists {
tx.clear(&pending_alloc_key);

// If the pending actor key still exists, we must clear its desired slot because after this
// activity the actor will go to sleep or be destroyed. We don't clear the slot if the key
// doesn't exist because the actor may either be allocated or destroyed.
if allocated_serverless_slot {
tx.atomic_op(
&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new(
Expand All @@ -1129,7 +1132,10 @@ pub async fn clear_pending_allocation(
.custom_instrument(tracing::info_span!("actor_clear_pending_alloc_tx"))
.await?;

state.allocated_serverless_slot = false;
// Only mark allocated_serverless_slot as false if it was allocated before and cleared now
if allocated_serverless_slot && cleared {
state.allocated_serverless_slot = false;
}

Ok(cleared)
}
Expand Down
17 changes: 9 additions & 8 deletions engine/sdks/typescript/test-runner/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ const RIVET_RUNNER_KEY = process.env.RIVET_RUNNER_KEY;
const RIVET_RUNNER_VERSION = process.env.RIVET_RUNNER_VERSION
? Number(process.env.RIVET_RUNNER_VERSION)
: 1;
const RIVET_RUNNER_TOTAL_SLOTS = process.env.RIVET_RUNNER_TOTAL_SLOTS
? Number(process.env.RIVET_RUNNER_TOTAL_SLOTS)
: 10000;
const RIVET_RUNNER_TOTAL_SLOTS = parseInt(process.env.RIVET_RUNNER_TOTAL_SLOTS ?? "1");
const RIVET_ENDPOINT = process.env.RIVET_ENDPOINT ?? "http://127.0.0.1:6420";
const RIVET_TOKEN = process.env.RIVET_TOKEN ?? "dev";
const AUTOSTART_SERVER = process.env.DISABLE_SERVER === undefined;
const AUTOSTART_RUNNER = process.env.AUTOSTART_RUNNER !== undefined;
const AUTOSTART_SERVER = (process.env.AUTOSTART_SERVER ?? "1") == "1";
const AUTOSTART_RUNNER = (process.env.AUTOSTART_RUNNER ?? "0") == "1";
const AUTOCONFIGURE_SERVERLESS = (process.env.AUTOCONFIGURE_SERVERLESS ?? "1") == "1";

const runnerStarted = Promise.withResolvers<Runner>();
const runnerStopped = Promise.withResolvers<Runner>();
Expand Down Expand Up @@ -76,7 +75,7 @@ app.get("/shutdown", async (c) => {
return c.text("ok");
});

app.get("/start", async (c) => {
app.get("/api/rivet/start", async (c) => {
return streamSSE(c, async (stream) => {
const runnerStarted = Promise.withResolvers<Runner>();
const runnerStopped = Promise.withResolvers<Runner>();
Expand All @@ -95,7 +94,7 @@ app.get("/start", async (c) => {
});
});

app.get("/metadata", async (c) => {
app.get("/api/rivet/metadata", async (c) => {
return c.json({
runtime: "test-runner",
version: "1",
Expand All @@ -114,7 +113,9 @@ if (AUTOSTART_SERVER) {

if (AUTOSTART_RUNNER) {
runner = await startRunner(runnerStarted, runnerStopped);
} else await autoConfigureServerless();
} else if (AUTOCONFIGURE_SERVERLESS) {
await autoConfigureServerless();
}

process.on("SIGTERM", async () => {
getLogger().debug("received SIGTERM, force exiting in 3s");
Expand Down
Loading