Skip to content

Commit 72e5bbd

Browse files
committed
fix(pb): fix metrics wf inconsistent calculation (#4103)
# Description Please include a summary of the changes and the related issue. Please also include relevant motivation and context. ## Type of change - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] This change requires a documentation update ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. ## Checklist: - [ ] My code follows the style guidelines of this project - [ ] I have performed a self-review of my code - [ ] I have commented my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [ ] My changes generate no new warnings - [ ] I have added tests that prove my fix is effective or that my feature works - [ ] New and existing unit tests pass locally with my changes
1 parent f366184 commit 72e5bbd

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

engine/packages/pegboard/src/workflows/actor/metrics.rs

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub(crate) async fn pegboard_actor_metrics(ctx: &mut WorkflowCtx, input: &Input)
9393

9494
// Timeout was reached, record duration up till now
9595
if sigs.is_empty() {
96-
let now = util::timestamp::now();
96+
let now = ctx.v(2).activity(GetTsInput {}).await?;
9797
if let Some(last_recorded_awake_ts) = state.last_recorded_awake_ts {
9898
new_awake_duration += now - last_recorded_awake_ts;
9999
}
@@ -124,14 +124,14 @@ pub(crate) async fn pegboard_actor_metrics(ctx: &mut WorkflowCtx, input: &Input)
124124
}
125125

126126
#[derive(Debug, Serialize, Deserialize, Hash)]
127-
pub struct InitStateInput {
127+
struct InitStateInput {
128128
actor_id: Id,
129129
namespace_id: Id,
130130
name: String,
131131
}
132132

133133
#[activity(InitState)]
134-
pub async fn init_state(ctx: &ActivityCtx, input: &InitStateInput) -> Result<bool> {
134+
async fn init_state(ctx: &ActivityCtx, input: &InitStateInput) -> Result<bool> {
135135
let mut state = ctx.state::<Option<State>>()?;
136136

137137
*state = Some(State {
@@ -155,13 +155,21 @@ pub async fn init_state(ctx: &ActivityCtx, input: &InitStateInput) -> Result<boo
155155
}
156156

157157
#[derive(Debug, Serialize, Deserialize, Hash)]
158-
pub struct RecordMetricsInput {
158+
struct GetTsInput {}
159+
160+
#[activity(GetTs)]
161+
async fn get_ts(ctx: &ActivityCtx, input: &GetTsInput) -> Result<i64> {
162+
Ok(util::timestamp::now())
163+
}
164+
165+
#[derive(Debug, Serialize, Deserialize, Hash)]
166+
struct RecordMetricsInput {
159167
/// Milliseconds.
160168
awake_duration: i64,
161169
}
162170

163171
#[activity(RecordMetrics)]
164-
pub async fn record_metrics(ctx: &ActivityCtx, input: &RecordMetricsInput) -> Result<()> {
172+
async fn record_metrics(ctx: &ActivityCtx, input: &RecordMetricsInput) -> Result<()> {
165173
let state = ctx.state::<State>()?;
166174

167175
// Seconds (rounded up)
@@ -193,10 +201,10 @@ enum KvStorageQueryResult {
193201
}
194202

195203
#[derive(Debug, Serialize, Deserialize, Hash)]
196-
pub struct RecordKvMetricsInput {}
204+
struct RecordKvMetricsInput {}
197205

198206
#[activity(RecordKvMetrics)]
199-
pub async fn record_kv_metrics(ctx: &ActivityCtx, input: &RecordKvMetricsInput) -> Result<()> {
207+
async fn record_kv_metrics(ctx: &ActivityCtx, input: &RecordKvMetricsInput) -> Result<()> {
200208
let mut state = ctx.state::<State>()?;
201209

202210
let actor_id = state.actor_id;

engine/packages/pegboard/src/workflows/actor/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
291291
namespace_id: input.namespace_id,
292292
name: input.name.clone(),
293293
})
294+
.tag("actor_id", input.actor_id)
294295
.dispatch()
295296
.await?;
296297

0 commit comments

Comments
 (0)