Skip to content

Commit fe5b9c9

Browse files
PythonFZ, claude, Copilot, pre-commit-ci[bot]
authored
Fix/remove worker id race condition (#856)
* fix: remove _worker_id to fix race condition in job processing

  The client was storing _worker_id from server responses, but this caused a race condition during socket reconnection:

  1. Socket reconnects with NEW sid
  2. Server assigns pending job to NEW sid
  3. Socket event handler receives job:assign
  4. But _worker_id still holds OLD value (not yet updated)
  5. Worker sends wrong worker_id to server → 400 BAD REQUEST

  The fix removes _worker_id entirely and always uses socket.sio.sid directly. This is safe because:

  - The server always assigns jobs to the socket's current sid
  - The socket that receives job:assign is always the one with that sid
  - socket.sio.sid always reflects the current connection

  Changes:

  - Remove _worker_id field from ZnDraw dataclass
  - Simplify sid property to return socket.sio.sid directly
  - Remove worker_id storage in socket_manager._register_extensions_after_join
  - Update tests to use vis.sid instead of vis._worker_id

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* refactor: remove unused workerId return from registration methods

  The register_extension() and register_filesystem() methods in api_manager.py were returning workerId from server responses, but callers no longer use this value after removing _worker_id storage.

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* fix: correct API call parameters and logging labels after worker ID removal (#857)

  * Initial plan

  * fix: address review comments - update register_filesystem params, log labels, and add request timeouts

    Co-authored-by: PythonFZ <46721498+PythonFZ@users.noreply.github.com>

  ---------

  Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
  Co-authored-by: PythonFZ <46721498+PythonFZ@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

* fix: capture workerId from room:join to fix Socket.IO SID mismatch

  - Add workerId to room:join response (single source of truth)
  - Capture _worker_id on connect and reconnect
  - Remove redundant _worker_id captures from registration methods (DRY)
  - Fix filesystem re-registration keys (fs_entry['fs'], fs_entry['name'])

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
Co-authored-by: Copilot <198982749+Copilot@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 7796a5d commit fe5b9c9

File tree

4 files changed

+39
-45
lines changed

4 files changed

+39
-45
lines changed

src/zndraw/app/events.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,6 +727,7 @@ def handle_room_join(data):
727727
return {
728728
"status": "ok",
729729
"sessionId": session_id,
730+
"workerId": sid, # Server's socket SID for worker identification
730731
"cameraKey": get_session_camera_key(session_id),
731732
"step": int(current_step) if current_step else 0,
732733
"frameCount": frame_count,

src/zndraw/socket_manager.py

Lines changed: 13 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,9 @@ def connect(self):
7070
if response.get("status") != "ok":
7171
raise RuntimeError(f"Failed to join room: {response.get('message')}")
7272

73-
# Store sessionId for subsequent operations
73+
# Store sessionId and workerId for subsequent operations
7474
self.zndraw.api.session_id = response["sessionId"]
75+
self.zndraw._worker_id = response.get("workerId")
7576

7677
# Initialize minimal room data from socket response
7778
self.zndraw._len = response.get("frameCount", 0)
@@ -138,8 +139,9 @@ def _on_connect(self):
138139
)
139140
return
140141

141-
# Update session ID and minimal state
142+
# Update session ID, worker ID, and minimal state
142143
self.zndraw.api.session_id = response["sessionId"]
144+
self.zndraw._worker_id = response.get("workerId")
143145
self.zndraw._len = response.get("frameCount", 0)
144146
self.zndraw._step = response.get("step", 0)
145147

@@ -151,47 +153,38 @@ def _register_extensions_after_join(self):
151153
152154
Called from connect() after room:join succeeds and session_id is set.
153155
This ensures the session exists before attempting to register extensions.
156+
Note: worker_id is already set from room:join response, no need to capture here.
154157
"""
155158
# Re-register any extensions that were registered before connection
156159
# Process public extensions
157160
for name, ext in self.zndraw._public_extensions.items():
158-
worker_id = self.zndraw.api.register_extension(
161+
self.zndraw.api.register_extension(
159162
name=name,
160163
category=ext["extension"].category,
161164
schema=ext["extension"].model_json_schema(),
162165
socket_manager=self,
163166
public=True,
164167
)
165-
# Store the worker_id assigned by server
166-
if worker_id:
167-
self.zndraw._worker_id = worker_id
168168

169169
# Process private extensions
170170
for name, ext in self.zndraw._private_extensions.items():
171-
worker_id = self.zndraw.api.register_extension(
171+
self.zndraw.api.register_extension(
172172
name=name,
173173
category=ext["extension"].category,
174174
schema=ext["extension"].model_json_schema(),
175175
socket_manager=self,
176176
public=False,
177177
)
178-
# Store the worker_id assigned by server
179-
if worker_id:
180-
self.zndraw._worker_id = worker_id
181178

182179
# Re-register any filesystems that were registered before connection
183-
for name, fs in self.zndraw._filesystems.items():
184-
provider = fs["provider"]
185-
worker_id = self.zndraw.api.register_filesystem(
186-
name=name,
187-
provider_type=provider.__class__.__name__,
188-
root_path=provider.root_path,
180+
for _, fs_entry in self.zndraw._filesystems.items():
181+
fs_instance = fs_entry["fs"]
182+
self.zndraw.api.register_filesystem(
183+
name=fs_entry["name"],
184+
fs_type=fs_instance.__class__.__name__,
189185
socket_manager=self,
190-
public=fs["public"],
186+
public=fs_entry["public"],
191187
)
192-
# Store the worker_id assigned by server
193-
if worker_id:
194-
self.zndraw._worker_id = worker_id
195188

196189
def _on_frame_update(self, data):
197190
if "frame" in data:

src/zndraw/zndraw.py

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1575,19 +1575,15 @@ def register_extension(
15751575
f"Registering {'global' if public else 'room-scoped'} extension '{name}'..."
15761576
)
15771577

1578-
worker_id = self.api.register_extension(
1578+
self.api.register_extension(
15791579
name=name,
15801580
category=extension.category,
15811581
schema=extension.model_json_schema(),
15821582
socket_manager=self.socket,
15831583
public=public,
15841584
)
1585-
# Store the worker_id assigned by server (server's request.sid)
1586-
if worker_id:
1587-
self._worker_id = worker_id
1588-
log.info(
1589-
f"Extension '{name}' registered with {scope} (worker_id: {self._worker_id})."
1590-
)
1585+
# Note: worker_id is already set from room:join response
1586+
log.info(f"Extension '{name}' registered with {scope} (worker_id: {self.sid}).")
15911587

15921588
def run(self, extension: Extension, public: bool | None = None):
15931589
"""Run an extension by submitting a job to the server.
@@ -1701,20 +1697,16 @@ def register_filesystem(
17011697
f"Registering {'global' if public else 'room-scoped'} filesystem '{name}'..."
17021698
)
17031699

1704-
# Register with server via Socket.IO
1705-
worker_id = self.api.register_filesystem(
1700+
# Register with server via REST API
1701+
self.api.register_filesystem(
17061702
name=name,
17071703
fs_type=fs.__class__.__name__,
17081704
socket_manager=self.socket,
17091705
public=public,
17101706
)
1711-
1712-
# Store the worker_id assigned by server (server's request.sid)
1713-
if worker_id:
1714-
self._worker_id = worker_id
1715-
1707+
# Note: worker_id is already set from room:join response
17161708
log.debug(
1717-
f"Filesystem '{name}' registered with {scope} (worker_id: {self._worker_id})."
1709+
f"Filesystem '{name}' registered with {scope} (worker_id: {self.sid})."
17181710
)
17191711

17201712
def log(self, message: str) -> dict:

tests/test_job_endpoints.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ def test_get_job_details_success(server):
8282
job = vis.run(TestModifier(param=42))
8383

8484
# Fetch via REST API directly
85-
response = requests.get(f"{server}/api/jobs/{job.job_id}")
85+
response = requests.get(f"{server}/api/jobs/{job.job_id}", timeout=10)
8686

8787
assert response.status_code == 200
8888
details = response.json()
@@ -97,7 +97,7 @@ def test_get_job_details_success(server):
9797

9898
def test_get_job_details_not_found(server):
9999
"""Test GET with invalid job ID returns 404."""
100-
response = requests.get(f"{server}/api/jobs/nonexistent-job-id")
100+
response = requests.get(f"{server}/api/jobs/nonexistent-job-id", timeout=10)
101101

102102
assert response.status_code == 404
103103

@@ -140,8 +140,9 @@ def test_update_job_status_to_processing(server, get_jwt_auth_headers):
140140
# Job is already assigned, transition to processing
141141
response = requests.put(
142142
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
143-
json={"status": "processing", "workerId": vis._worker_id},
143+
json={"status": "processing", "workerId": vis.sid},
144144
headers=get_jwt_auth_headers(server, "testuser"),
145+
timeout=10,
145146
)
146147

147148
assert response.status_code == 200
@@ -160,13 +161,15 @@ def test_update_job_status_to_completed(server, get_jwt_auth_headers):
160161
# Transition: assigned → processing → completed
161162
requests.put(
162163
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
163-
json={"status": "processing", "workerId": vis._worker_id},
164+
json={"status": "processing", "workerId": vis.sid},
164165
headers=auth_headers,
166+
timeout=10,
165167
)
166168
response = requests.put(
167169
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
168-
json={"status": "completed", "workerId": vis._worker_id},
170+
json={"status": "completed", "workerId": vis.sid},
169171
headers=auth_headers,
172+
timeout=10,
170173
)
171174

172175
assert response.status_code == 200
@@ -186,19 +189,21 @@ def test_update_job_status_to_failed(server, get_jwt_auth_headers):
186189
# Transition to processing first
187190
requests.put(
188191
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
189-
json={"status": "processing", "workerId": vis._worker_id},
192+
json={"status": "processing", "workerId": vis.sid},
190193
headers=auth_headers,
194+
timeout=10,
191195
)
192196

193197
# Mark as failed
194198
response = requests.put(
195199
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
196200
json={
197201
"status": "failed",
198-
"workerId": vis._worker_id,
202+
"workerId": vis.sid,
199203
"error": "Something went wrong",
200204
},
201205
headers=auth_headers,
206+
timeout=10,
202207
)
203208

204209
assert response.status_code == 200
@@ -225,8 +230,9 @@ def test_job_refresh(server, get_jwt_auth_headers):
225230
# Update via API to processing
226231
requests.put(
227232
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
228-
json={"status": "processing", "workerId": vis._worker_id},
233+
json={"status": "processing", "workerId": vis.sid},
229234
headers=get_jwt_auth_headers(server, "testuser"),
235+
timeout=10,
230236
)
231237

232238
# Refresh and check
@@ -275,15 +281,17 @@ def test_job_completion_lifecycle(server, get_jwt_auth_headers):
275281
# Transition to processing
276282
requests.put(
277283
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
278-
json={"status": "processing", "workerId": vis._worker_id},
284+
json={"status": "processing", "workerId": vis.sid},
279285
headers=auth_headers,
286+
timeout=10,
280287
)
281288

282289
# Mark as completed (extensions work through side effects, no result needed)
283290
response = requests.put(
284291
f"{server}/api/rooms/test/jobs/{job.job_id}/status",
285-
json={"status": "completed", "workerId": vis._worker_id},
292+
json={"status": "completed", "workerId": vis.sid},
286293
headers=auth_headers,
294+
timeout=10,
287295
)
288296

289297
assert response.status_code == 200

0 commit comments

Comments
 (0)