Commit ca26005

Authored by PythonFZ and Claude
Fix/assigned job timeout (#858)
* fix: remove _worker_id to fix race condition in job processing

  The client was storing _worker_id from server responses, but this caused a race condition during socket reconnection:

  1. Socket reconnects with NEW sid
  2. Server assigns pending job to NEW sid
  3. Socket event handler receives job:assign
  4. But _worker_id still holds OLD value (not yet updated)
  5. Worker sends wrong worker_id to server → 400 BAD REQUEST

  The fix removes _worker_id entirely and always uses socket.sio.sid directly. This is safe because:

  - The server always assigns jobs to the socket's current sid
  - The socket that receives job:assign is always the one with that sid
  - socket.sio.sid always reflects the current connection

  Changes:

  - Remove _worker_id field from ZnDraw dataclass
  - Simplify sid property to return socket.sio.sid directly
  - Remove worker_id storage in socket_manager._register_extensions_after_join
  - Update tests to use vis.sid instead of vis._worker_id

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor: remove unused workerId return from registration methods

  The register_extension() and register_filesystem() methods in api_manager.py were returning workerId from server responses, but callers no longer use this value after removing _worker_id storage.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* feat: add ASSIGNED job timeout with lazy cleanup

  Jobs stuck in ASSIGNED state for more than 30 seconds are now automatically failed during job listing. This handles cases where a worker disconnects before confirming the job.

  Changes:

  - Add cleanup_stale_assigned_jobs() to JobManager
  - Call cleanup lazily during list_active_jobs()
  - Add error and workerId fields to job API response
  - Convert Job class to dataclass with proper type hints
  - Add tests for timeout behavior

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: use assigned_at timestamp for job timeout instead of created_at

  - Add assigned_at timestamp when job transitions to ASSIGNED state
  - Update cleanup_stale_assigned_jobs() to use assigned_at (with fallback to created_at for backwards compatibility)
  - Update test to verify assigned_at is used for timeout calculation

  This fixes the issue where a job that waited in PENDING state would incorrectly time out immediately upon assignment.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix

* auto-refresh / fix bug

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent fe5b9c9 · commit ca26005
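The reconnection race described in the first commit above can be shown with a toy example. This is not zndraw code: FakeSocket and the sid values are invented to mirror the scenario in the message, where a sid cached at registration time goes stale after a reconnect while the live socket's sid always matches the server's view.

# Toy illustration of the reconnection race described above (not zndraw code).
class FakeSocket:
    """Stand-in for a socket.io client that receives a new sid on every connection."""

    def __init__(self) -> None:
        self.sid = "sid-OLD"

    def reconnect(self) -> None:
        self.sid = "sid-NEW"  # the server now knows this client by the new sid


socket = FakeSocket()
cached_worker_id = socket.sid  # what the old client stored during registration

socket.reconnect()             # job:assign now arrives for "sid-NEW"

print(cached_worker_id)        # sid-OLD -> sending this worker_id yields 400 BAD REQUEST
print(socket.sid)              # sid-NEW -> always reflects the current connection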

File tree

7 files changed: +250 −64 lines

app/src/hooks/useSchemas.ts (16 additions, 0 deletions)

@@ -214,6 +214,22 @@ export const useJobs = (room: string) => {
     };
   }, [room, refetch]);

+  // Poll every 5 seconds when there are jobs in "assigned" state
+  // This triggers lazy cleanup on the backend for timed-out jobs
+  useEffect(() => {
+    const hasAssignedJobs = jobs.some((job) => job.status === "assigned");
+
+    if (!hasAssignedJobs || !room) {
+      return;
+    }
+
+    const intervalId = setInterval(() => {
+      refetch(false); // Silent refetch, no loading spinner
+    }, 5000);
+
+    return () => clearInterval(intervalId);
+  }, [jobs, room, refetch]);
+
   return {
     data: jobs,
     isLoading: isLoading && !hasLoaded, // Only show loading if we haven't loaded yet

src/zndraw/api_manager.py (0 additions, 6 deletions)

@@ -374,9 +374,6 @@ def register_extension(
                 f"Extension registration failed: {data.get('error', 'Unknown error')}"
             )

-        # Return the worker_id assigned by server so caller can store it
-        return data.get("workerId")
-
     def register_filesystem(
         self,
         name: str,
@@ -431,9 +428,6 @@ def register_filesystem(
                 f"Filesystem registration failed: {data.get('error', 'Unknown error')}"
             )

-        # Return the worker_id assigned by server so caller can store it
-        return data.get("workerId")
-
     def get_frames(
         self, indices_or_slice, keys: list[str] | None = None
     ) -> list[dict[bytes, bytes]]:

src/zndraw/app/job_manager.py (106 additions, 11 deletions)

@@ -15,6 +15,10 @@

 log = logging.getLogger(__name__)

+# Jobs in ASSIGNED state should transition to PROCESSING within seconds.
+# If they stay in ASSIGNED longer than this, they're considered stale (worker died/disconnected).
+ASSIGNED_TIMEOUT_SECONDS = 30
+

 def _emit_job_state_changed(
     socketio,
@@ -249,6 +253,7 @@ def assign_job(
         update_data = {
             "status": JobStatus.ASSIGNED,
             "worker_id": worker_id,
+            "assigned_at": utc_now_iso(),
         }

         redis_client.hset(job_keys.hash_key(), mapping=update_data)
@@ -440,6 +445,73 @@ def fail_job(redis_client: Any, job_id: str, error: str, socketio=None) -> bool:

         return True

+    @staticmethod
+    def cleanup_stale_assigned_jobs(
+        redis_client: Any, room: str, socketio=None
+    ) -> list[str]:
+        """Fail jobs stuck in ASSIGNED state for too long.
+
+        Jobs in ASSIGNED state should transition to PROCESSING within seconds.
+        If they stay in ASSIGNED longer than ASSIGNED_TIMEOUT_SECONDS, the
+        worker likely died or disconnected before confirming the job.
+
+        Parameters
+        ----------
+        redis_client
+            Redis client instance
+        room : str
+            Room identifier
+        socketio
+            SocketIO instance for emitting events (optional)
+
+        Returns
+        -------
+        list[str]
+            List of job IDs that were failed due to timeout
+        """
+        room_keys = RoomKeys(room)
+        # Convert to list to avoid "Set changed size during iteration" error
+        # since fail_job() removes jobs from the active set
+        job_ids = list(redis_client.smembers(room_keys.jobs_active()))
+        failed_jobs = []
+        now = utc_now_timestamp()
+
+        for job_id in job_ids:
+            job_keys = JobKeys(job_id)
+            job_data = redis_client.hgetall(job_keys.hash_key())
+
+            if not job_data:
+                continue
+
+            # Only check ASSIGNED jobs
+            if job_data.get("status") != JobStatus.ASSIGNED:
+                continue
+
+            # Check if job has been in ASSIGNED state too long
+            # Use assigned_at if available, fallback to created_at for backwards compatibility
+            assigned_at = job_data.get("assigned_at") or job_data.get("created_at")
+            if not assigned_at:
+                continue
+
+            try:
+                assigned_timestamp = isoparse(assigned_at).timestamp()
+                age_seconds = now - assigned_timestamp
+
+                if age_seconds > ASSIGNED_TIMEOUT_SECONDS:
+                    # Fail the job due to timeout
+                    error_msg = (
+                        f"Job timed out in ASSIGNED state after {age_seconds:.1f}s "
+                        f"(threshold: {ASSIGNED_TIMEOUT_SECONDS}s). "
+                        "Worker likely disconnected before processing."
+                    )
+                    JobManager.fail_job(redis_client, job_id, error_msg, socketio)
+                    failed_jobs.append(job_id)
+                    log.warning(f"Cleaned up stale ASSIGNED job {job_id}: {error_msg}")
+            except Exception as e:
+                log.error(f"Error checking job {job_id} for timeout: {e}")
+
+        return failed_jobs
+
     @staticmethod
     def get_job(redis_client: Any, job_id: str) -> Optional[dict]:
         """Get job details.
@@ -464,16 +536,28 @@ def get_job(redis_client: Any, job_id: str) -> Optional[dict]:
         return job_data

     @staticmethod
-    def list_active_jobs(redis_client: Any, room: str) -> list[dict]:
+    def list_active_jobs(redis_client: Any, room: str, socketio=None) -> list[dict]:
         """List all active (queued or running) jobs for a room.

-        Args:
-            redis_client: Redis client instance
-            room: Room identifier
+        Performs lazy cleanup of stale ASSIGNED jobs before returning results.

-        Returns:
+        Parameters
+        ----------
+        redis_client
+            Redis client instance
+        room : str
+            Room identifier
+        socketio
+            SocketIO instance for emitting events during cleanup (optional)
+
+        Returns
+        -------
+        list[dict]
            List of job data dicts
        """
+        # Lazy cleanup: fail any jobs stuck in ASSIGNED state
+        JobManager.cleanup_stale_assigned_jobs(redis_client, room, socketio)
+
         room_keys = RoomKeys(room)
         job_ids = redis_client.smembers(room_keys.jobs_active())

@@ -536,15 +620,26 @@ def list_inactive_jobs(redis_client: Any, room: str) -> list[dict]:
         return jobs

     @staticmethod
-    def list_all_jobs(redis_client: Any, room: str) -> list[dict]:
+    def list_all_jobs(redis_client: Any, room: str, socketio=None) -> list[dict]:
         """List all jobs for a room.
-        Args:
-            redis_client: Redis client instance
-            room: Room identifier
-        Returns:
+
+        Performs lazy cleanup of stale ASSIGNED jobs via list_active_jobs.
+
+        Parameters
+        ----------
+        redis_client
+            Redis client instance
+        room : str
+            Room identifier
+        socketio
+            SocketIO instance for emitting events during cleanup (optional)
+
+        Returns
+        -------
+        list[dict]
            List of job data dicts
        """
-        active_jobs = JobManager.list_active_jobs(redis_client, room)
+        active_jobs = JobManager.list_active_jobs(redis_client, room, socketio)
         inactive_jobs = JobManager.list_inactive_jobs(redis_client, room)
         return active_jobs + inactive_jobs
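The core of the timeout decision is just an age check on the assigned_at ISO timestamp, using the same isoparse call as in the diff above. A minimal standalone sketch (the is_stale helper name and the example timestamps are illustrative, not part of the codebase):

from datetime import datetime, timedelta, timezone

from dateutil.parser import isoparse

ASSIGNED_TIMEOUT_SECONDS = 30  # same threshold as in job_manager.py


def is_stale(assigned_at_iso: str, now_ts: float, threshold: float = ASSIGNED_TIMEOUT_SECONDS) -> bool:
    """Return True if a job assigned at `assigned_at_iso` has exceeded the ASSIGNED timeout."""
    age_seconds = now_ts - isoparse(assigned_at_iso).timestamp()
    return age_seconds > threshold


now = datetime.now(timezone.utc)
print(is_stale((now - timedelta(seconds=45)).isoformat(), now.timestamp()))  # True: would be cleaned up
print(is_stale((now - timedelta(seconds=5)).isoformat(), now.timestamp()))   # False: left alone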

src/zndraw/app/job_routes.py (8 additions, 3 deletions)

@@ -120,10 +120,13 @@ def _transition_worker_to_idle(

 @jobs.route("/api/rooms/<string:room_id>/jobs", methods=["GET"])
 def list_jobs(room_id: str):
-    """List active jobs for a room."""
+    """List active jobs for a room.
+
+    Performs lazy cleanup of stale ASSIGNED jobs.
+    """
     redis_client = current_app.extensions["redis"]
-    jobs = JobManager.list_all_jobs(redis_client, room_id)
-    return jobs, 200
+    jobs_list = JobManager.list_all_jobs(redis_client, room_id, socketio)
+    return jobs_list, 200


 @jobs.route("/api/rooms/<string:room_id>/jobs/<string:job_id>", methods=["GET"])
@@ -191,6 +194,8 @@ def get_job_details(job_id: str):
         "public": job.get("public") == "true",
         "status": job["status"],
         "createdAt": job.get("created_at"),
+        "error": job.get("error") or None,
+        "workerId": job.get("worker_id") or None,
     }

     log.debug(f"Worker fetching job {job_id}: {job['category']}/{job['extension']}")

src/zndraw/job.py (41 additions, 30 deletions)

@@ -1,19 +1,19 @@
 """Job object for tracking job progress."""

 import time
+from dataclasses import dataclass, field
 from typing import Any

 from zndraw.app.job_manager import JobStatus


+@dataclass
 class Job:
     """Represents a submitted job and allows tracking its progress.

     This object is returned by vis.run() and provides methods to monitor
     job execution, wait for completion, and retrieve results.

-    In Jupyter notebooks, displays an iframe showing live progress.
-
     Parameters
     ----------
     job_id : str
@@ -22,23 +22,23 @@ class Job:
         Server URL
     room : str
         Room ID
-    api : APIManager
+    api : Any
         API manager instance for making requests
+    socket : Any
+        Socket manager instance (optional)

     Examples
     --------
     >>> job = vis.run(MyExtension(param=42))
     >>> job.wait()  # Block until completion
-    >>> result = job.get_result()
     """

-    def __init__(self, job_id: str, url: str, room: str, api: Any, socket: Any = None):
-        self.job_id = job_id
-        self.url = url
-        self.room = room
-        self.api = api
-        self.socket = socket
-        self._cached_data: dict[str, Any] | None = None
+    job_id: str
+    url: str
+    room: str
+    api: Any
+    socket: Any = None
+    _cached_data: dict[str, Any] = field(default_factory=dict, repr=False)

     def refresh(self) -> dict[str, Any]:
         """Fetch latest job status from server.
@@ -51,6 +51,12 @@ def refresh(self) -> dict[str, Any]:
         self._cached_data = self.api.get_job(self.job_id)
         return self._cached_data

+    def _ensure_cached(self) -> dict[str, Any]:
+        """Ensure cached data is loaded, fetching if necessary."""
+        if not self._cached_data:
+            self.refresh()
+        return self._cached_data
+
     @property
     def status(self) -> str:
         """Get current job status.
@@ -60,9 +66,7 @@ def status(self) -> str:
         str
             One of: pending, assigned, processing, completed, failed
         """
-        if self._cached_data is None:
-            self.refresh()
-        return self._cached_data.get("status", "unknown")
+        return self._ensure_cached().get("status", "unknown")

     def is_pending(self) -> bool:
         """Check if job is pending (waiting for worker)."""
@@ -88,6 +92,28 @@ def is_done(self) -> bool:
         """Check if job is in a terminal state (completed or failed)."""
         return self.is_completed() or self.is_failed()

+    @property
+    def error(self) -> str | None:
+        """Get error message if job failed.
+
+        Returns
+        -------
+        str | None
+            Error message if failed, None otherwise
+        """
+        return self._ensure_cached().get("error") or None
+
+    @property
+    def worker_id(self) -> str | None:
+        """Get the worker ID assigned to this job.
+
+        Returns
+        -------
+        str | None
+            Worker session ID if assigned, None otherwise
+        """
+        return self._ensure_cached().get("workerId") or None
+
     def wait(self, timeout: float | None = None, poll_interval: float = 0.5) -> None:
         """Block until job completes or fails.

@@ -120,19 +146,4 @@ def wait(self, timeout: float | None = None, poll_interval: float = 0.5) -> None

     def __repr__(self) -> str:
         """Terminal-friendly representation."""
-        status = self.status
-        return f"Job(id={self.job_id}, status={status})"
-
-    # def _repr_html_(self) -> str:
-    #     """Jupyter notebook representation using iframe.
-
-    #     Displays live progress by embedding the server's job page.
-    #     """
-    #     try:
-    #         from IPython.display import IFrame
-    #     except ImportError:
-    #         raise ImportError(
-    #             "IPython is required for viewer display. Install with: uv add / pip install ipython"
-    #         )
-    #     iframe_url = f"{self.url}/job/{self.job_id}"
-    #     return IFrame(src=iframe_url, width="100%", height=600)._repr_html_()
+        return f"Job(id={self.job_id}, status={self.status})"

src/zndraw/zndraw.py (4 additions, 5 deletions)

@@ -689,16 +689,15 @@ def load_selection_group(self, group_name: str) -> None:
     def sid(self) -> str | None:
         """Return the worker ID assigned by the server.

-        The server assigns a worker ID (its request.sid) during extension registration.
-        This ID is used consistently for both registration and disconnect cleanup.
+        The server assigns a worker ID (its request.sid) during room:join.
+        This ID is used for job assignment and worker tracking.

         Returns
         -------
         str | None
-            The worker ID assigned by server, client's socket.sio.sid if not yet registered,
-            or None if not connected.
+            The server-assigned worker ID, or None if not yet connected.
         """
-        return self._worker_id if self._worker_id else self.socket.sio.sid
+        return self._worker_id

     @property
     def is_admin(self) -> bool:
