Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions treeherder/perf/auto_perf_sheriffing/backfill_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,8 @@ def _provide_records(self, backfill_report: BackfillReport, alert_context_map: l
alert=alert,
report=backfill_report,
context=json.dumps(retrigger_context, default=default_serializer),
last_detected_push_id=alert.summary.push_id,
anchor_push_id=alert.summary.push_id,
)

def __fetch_summaries_to_retrigger(
Expand Down
198 changes: 196 additions & 2 deletions treeherder/perf/auto_perf_sheriffing/secretary.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
import logging
import time
from datetime import datetime, timedelta
from typing import Any

import simplejson as json
from django.conf import settings as django_settings
from django.db.models import Q

from treeherder.perf.auto_perf_sheriffing.outcome_checker import (
OutcomeChecker,
OutcomeStatus,
)
from treeherder.perf.models import BackfillRecord, BackfillReport, PerformanceSettings
from treeherder.perf.models import (
BackfillRecord,
BackfillReport,
PerformanceDatum,
PerformanceSettings,
Push,
)
from treeherder.perfalert.perfalert import RevisionDatum, detect_changes
from treeherder.utils import default_serializer

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -126,12 +136,113 @@ def check_outcome(self):
# if outcome is IN_PROGRESS the BackfillRecord state will remain BACKFILLED to be checked again later
if outcome == OutcomeStatus.SUCCESSFUL:
record.status = BackfillRecord.SUCCESSFUL
record.save()
self.verify_and_iterate(record)
elif outcome == OutcomeStatus.FAILED:
record.status = BackfillRecord.FAILED
record.save()
record.save()
except ValueError as ex:
logger.error(ex)

def verify_and_iterate(self, record: BackfillRecord, max_iterations: int = 5):
    """Re-run change detection on a successfully backfilled record and decide
    whether another backfill iteration is needed.

    Compares the freshly detected culprit push against the record's
    ``last_detected_push_id``:

    * same push        -> culprit has stabilized; log and stop.
    * push moved       -> record the movement direction ("left"/"right"),
                          update ``last_detected_push_id`` / ``anchor_push_id``
                          and set the record back to READY_FOR_PROCESSING so
                          the next backfill pass picks it up.

    Iteration stops after ``max_iterations`` rounds (tracked via
    ``record.iteration_count``) or when no change is detected any more.
    All failures are logged and swallowed so outcome checking continues.
    """
    if record.iteration_count >= max_iterations:
        logger.info(
            f"Record {record.alert.id} reached max iterations ({max_iterations}), stopping verification."
        )
        return

    if record.last_detected_push_id is None:
        # Legacy record, no need to compare direction
        logger.warning(
            f"Record {record.alert.id}: last_detected_push_id is None; cannot compare direction."
        )
        return

    try:
        detected_push_id, detected_t_value, candidates = self.re_run_detect_changes(record)

        if detected_push_id is None:
            logger.warning(
                f"Record {record.alert.id}: No change detected in verification, stopping iteration."
            )
            return

        # Audit entry appended to the record's backfill log for each iteration.
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12;
        # prefer datetime.now(timezone.utc) — confirm project's Python floor.
        log_entry = {
            "iteration": record.iteration_count,
            "detected_push_id": detected_push_id,
            "detected_t_value": detected_t_value,
            "candidates": candidates,
            "timestamp": datetime.utcnow().isoformat(),
            "previous_push_id": record.last_detected_push_id,
        }

        if detected_push_id == record.last_detected_push_id:
            # Same culprit as last round: the detection has converged.
            log_entry["direction"] = "stabilized"
            log_entry["notes"] = "Detected push same as previous, culprit stabilized"
            record.append_to_backfill_logs(log_entry)
            record.save()
            logger.info(
                f"Backfill Record {record.alert.id}: Detected push {detected_push_id} stabilized, "
                f"stopping iteration."
            )
            return

        try:
            detected_push = Push.objects.get(id=detected_push_id, repository=record.repository)
            previous_push = Push.objects.get(
                id=record.last_detected_push_id, repository=record.repository
            )
        except Push.DoesNotExist as ex:
            logger.error(f"Record {record.alert.id}: Could not find push for comparison: {ex}")
            return

        # Determine which way the detected culprit moved relative to the
        # previous detection. Push time is the primary ordering; ids break
        # ties (and guard against equal timestamps).
        if (
            detected_push.time < previous_push.time
            or detected_push_id < record.last_detected_push_id
        ):
            direction = "left"
            # Moving left: anchor directly on the newly detected push.
            anchor_push_id = detected_push_id
        else:
            # NOTE(review): 24 looks like the forward-anchor window size in
            # pushes — confirm against the backfill window used elsewhere.
            step = 24
            direction = "right"
            # All pushes strictly after detected_push, ordered by (time, id);
            # the (time==, id>) clause keeps same-timestamp pushes ordered.
            future_qs = (
                Push.objects.filter(repository=record.repository)
                .filter(
                    Q(time__gt=detected_push.time)
                    | (Q(time=detected_push.time) & Q(id__gt=detected_push.id))
                )
                .order_by("time", "id")
                .values_list("id", flat=True)
            )
            # Anchor `step` pushes ahead; fall back to the newest future push,
            # or to the detected push itself when no future pushes exist.
            rows = list(future_qs[step : step + 1])
            if rows:
                anchor_push_id = rows[0]
            else:
                last_id = future_qs.last()
                anchor_push_id = last_id if last_id is not None else detected_push_id

        log_entry["direction"] = direction
        log_entry["notes"] = (
            f"Detected push moved {direction} (from {record.last_detected_push_id} to {detected_push_id})"
        )

        logger.info(
            f"Backfill Record {record.alert.id}: Detected push moved {direction} from {record.last_detected_push_id} to {detected_push_id}, "
            f"iteration {record.iteration_count}/{max_iterations}, "
            f"triggering next backfill."
        )
        # Persist the new detection state and requeue the record for the
        # next backfill pass.
        record.last_detected_push_id = detected_push_id
        record.anchor_push_id = anchor_push_id
        record.append_to_backfill_logs(log_entry)
        record.status = BackfillRecord.READY_FOR_PROCESSING
        record.save()

    except Exception as ex:
        # Deliberate best-effort: verification must not break outcome checking.
        logger.error(
            f"Record {record.alert.id}: Error during verification/iteration: {ex}",
            exc_info=True,
        )

def __assert_platform_is_supported(self, on_platform: str):
    """Raise ``ValueError`` when *on_platform* is not a supported platform."""
    if on_platform in self.supported_platforms:
        return
    raise ValueError(f"Unsupported platform: {on_platform}.")
Expand All @@ -148,3 +259,86 @@ def _get_default_settings(cls, as_json=True):
if as_json
else default_settings
)

def re_run_detect_changes(
    self, record: BackfillRecord
) -> tuple[int | None, float | None, list[dict[str, Any]]]:
    """
    Re-run detect_changes on the alert's signature and return the detected push_id.

    The analysed window spans from ``PERFHERDER_ALERTS_MAX_AGE`` before the
    record's last detected push up to 7 days after it. Among all data points
    flagged by ``detect_changes``, the candidate closest in time to the last
    detected push is chosen as the culprit.

    Returns:
        tuple: (detected_push_id, detected_t_value, candidates) —
        ``(None, None, [])`` when the anchor push is missing, no data exists
        in the window, or no change is detected.
    """
    signature = record.alert.series_signature
    repository = record.repository

    try:
        last_detected_push = Push.objects.get(
            id=record.last_detected_push_id, repository=repository
        )
    except Push.DoesNotExist as ex:
        logger.warning(f"Record {record.alert.id}: Could not find anchor push: {ex}")
        return None, None, []

    start_time = last_detected_push.time - django_settings.PERFHERDER_ALERTS_MAX_AGE
    end_time = last_detected_push.time + timedelta(days=7)
    series = (
        PerformanceDatum.objects.filter(
            signature=signature,
            repository=repository,
            push_timestamp__gte=start_time,
            push_timestamp__lte=end_time,
        )
        .values_list("push_id", "push_timestamp", "value")
        .order_by("push_timestamp")
    )

    # Group all values by push so each push becomes one RevisionDatum.
    # Fix: test key membership explicitly instead of truthiness of the
    # stored RevisionDatum (`revision_data.get(push_id)`).
    # NOTE(review): time.mktime interprets the timetuple in local time —
    # confirm this matches how alert generation builds its timestamps.
    revision_data = {}
    for push_id, push_timestamp, value in series:
        if push_id not in revision_data:
            revision_data[push_id] = RevisionDatum(
                int(time.mktime(push_timestamp.timetuple())), push_id, [], []
            )
        revision_data[push_id].values.append(value)

    if not revision_data:
        logger.warning(
            f"No performance data found for signature {signature.id} in verification range."
        )
        return None, None, []

    # Per-signature window overrides, falling back to the global settings.
    min_back_window = signature.min_back_window
    if min_back_window is None:
        min_back_window = django_settings.PERFHERDER_ALERTS_MIN_BACK_WINDOW
    max_back_window = signature.max_back_window
    if max_back_window is None:
        max_back_window = django_settings.PERFHERDER_ALERTS_MAX_BACK_WINDOW
    fore_window = signature.fore_window
    if fore_window is None:
        fore_window = django_settings.PERFHERDER_ALERTS_FORE_WINDOW

    data = list(revision_data.values())
    analyzed_series = detect_changes(
        data,
        min_back_window=min_back_window,
        max_back_window=max_back_window,
        fore_window=fore_window,
    )

    # The first datum can never be flagged relative to a predecessor, so
    # start from the second element (fix: the previous zip() pairing bound
    # an unused `prev` variable).
    candidates: list[dict[str, Any]] = []
    for cur in analyzed_series[1:]:
        if cur.change_detected:
            candidates.append(
                {
                    "push_id": int(cur.push_id),
                    "t_value": float(cur.t),
                    "push_timestamp": int(cur.push_timestamp),
                }
            )

    if not candidates:
        return None, None, []

    # Pick the flagged push nearest in time to the last detected push.
    push_timestamp = int(last_detected_push.time.timestamp())
    culprit = min(candidates, key=lambda c: abs(c["push_timestamp"] - push_timestamp))
    return culprit["push_id"], culprit["t_value"], candidates
71 changes: 34 additions & 37 deletions treeherder/perf/auto_perf_sheriffing/sherlock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import traceback
from datetime import datetime, time, timedelta, timezone
from json import JSONDecodeError, loads
from json import loads
from logging import INFO, WARNING

import requests
Expand Down Expand Up @@ -30,6 +30,7 @@
BackfillNotificationRecord,
BackfillRecord,
BackfillReport,
PerformanceDatum,
PerformanceFramework,
PerformanceTelemetryAlert,
PerformanceTelemetryAlertSummary,
Expand Down Expand Up @@ -142,7 +143,7 @@ def __backfill_on(self, platform: str, frameworks: list[str], repositories: list
left, consumed = self._backfill_record(record, left)
logger.info(f"Sherlock: Backfilled record with id {record.alert.id}.")
# Model used for reporting backfill outcome
BackfillNotificationRecord.objects.create(record=record)
BackfillNotificationRecord.objects.get_or_create(record=record)
total_consumed += consumed

self.secretary.consume_backfills(platform, total_consumed)
Expand All @@ -169,32 +170,27 @@ def __fetch_records_requiring_backfills_on(

def _backfill_record(self, record: BackfillRecord, left: int) -> tuple[int, int]:
consumed = 0
data_points_to_backfill = self.__get_data_points_to_backfill(record)
for data_point in data_points_to_backfill:
if left <= 0 or self.runtime_exceeded():
break
try:
using_job_id = data_point.job_id
if not using_job_id:
logger.info(f"Failed to backfill record {record.alert.id}: invalid job id.")
continue
self.backfill_tool.backfill_job(using_job_id)
left, consumed = left - 1, consumed + 1
except (KeyError, CannotBackfillError, Exception) as ex:
logger.debug(f"Failed to backfill record {record.alert.id}: {ex}")
else:
record.try_remembering_job_properties(using_job_id)

try:
context = record.get_context()
except JSONDecodeError:
logger.warning(f"Failed to backfill record {record.alert.id}: invalid JSON context.")
record.status = BackfillRecord.FAILED
record.save()
else:
data_points_to_backfill = self.__get_data_points_to_backfill(context)
for data_point in data_points_to_backfill:
if left <= 0 or self.runtime_exceeded():
break
try:
using_job_id = data_point["job_id"]
self.backfill_tool.backfill_job(using_job_id)
left, consumed = left - 1, consumed + 1
except (KeyError, CannotBackfillError, Exception) as ex:
logger.debug(f"Failed to backfill record {record.alert.id}: {ex}")
else:
record.try_remembering_job_properties(using_job_id)

success, outcome = self._note_backfill_outcome(
record, len(data_points_to_backfill), consumed
)
log_level = INFO if success else WARNING
logger.log(log_level, f"{outcome} (for backfill record {record.alert.id})")
success, outcome = self._note_backfill_outcome(
record, len(data_points_to_backfill), consumed
)
log_level = INFO if success else WARNING
logger.log(log_level, f"{outcome} (for backfill record {record.alert.id})")

return left, consumed

Expand All @@ -205,6 +201,7 @@ def _note_backfill_outcome(
success = False

record.total_actions_triggered = actually_backfilled
record.iteration_count += 1

if actually_backfilled == to_backfill:
record.status = BackfillRecord.BACKFILLED
Expand Down Expand Up @@ -241,16 +238,16 @@ def _is_queue_overloaded(provisioner_id: str, worker_type: str, acceptable_limit
return pending_tasks_count > acceptable_limit

@staticmethod
def __get_data_points_to_backfill(record: BackfillRecord) -> list[PerformanceDatum]:
    """Return the data point anchored at the record's ``anchor_push_id``.

    Fix: the previous return annotation claimed ``list[dict]``, but the
    function returns ``PerformanceDatum`` model instances (the caller reads
    ``data_point.job_id`` as an attribute, not a dict key).

    Returns a single-element list with the first matching datum, or an
    empty list when none exists (including when ``anchor_push_id`` is None,
    which matches no row).
    """
    signature = record.alert.series_signature
    repository = signature.repository
    anchor_push_id = record.anchor_push_id
    data_point = PerformanceDatum.objects.filter(
        repository=repository,
        signature=signature,
        push_id=anchor_push_id,
    ).first()
    return [data_point] if data_point else []

def telemetry_alert(self):
if not self._can_run_telemetry():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Generated by Django 5.1.15 on 2026-02-16 07:01

from django.db import migrations, models


class Migration(migrations.Migration):
    """Add iterative-backfill bookkeeping columns to BackfillRecord.

    Auto-generated migration; introduces the fields the backfill
    verification/iteration flow reads and writes:
    anchor_push_id, backfill_logs, iteration_count, last_detected_push_id.
    """

    dependencies = [
        ("perf", "0063_performancebugtemplate_no_action_required_text"),
    ]

    operations = [
        # Push to anchor the next backfill around; nullable for legacy records.
        migrations.AddField(
            model_name="backfillrecord",
            name="anchor_push_id",
            field=models.IntegerField(blank=True, null=True),
        ),
        # JSON-encoded list of per-iteration log entries; default is an
        # empty JSON array stored as text.
        migrations.AddField(
            model_name="backfillrecord",
            name="backfill_logs",
            field=models.TextField(default="[]"),
        ),
        # Number of backfill iterations performed so far (capped by the
        # verification loop's max_iterations).
        migrations.AddField(
            model_name="backfillrecord",
            name="iteration_count",
            field=models.IntegerField(default=0),
        ),
        # Push id detected by the most recent change-detection run;
        # nullable so pre-existing records remain valid.
        migrations.AddField(
            model_name="backfillrecord",
            name="last_detected_push_id",
            field=models.IntegerField(blank=True, null=True),
        ),
    ]
Loading