Conversation
📝 Walkthrough

This PR adds an agent execution subsystem and web UI.

Changes
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as Browser Client
    participant Server as HTTP Server
    participant JobStore as Job Store
    participant Agent as Agent Runner
    participant Claude as Claude API
    Client->>Server: POST /submit (ep_url, command, cwd)
    Server->>Server: Validate inputs
    Server->>JobStore: Create job entry
    Server->>Server: Start background task _run_job
    Server-->>Client: Return job_id
    par Background Execution
        Server->>Agent: _run_job invokes run_agent(command, ep_url, cwd)
        Agent->>Claude: Initialize and run agent workflow
        Claude-->>Agent: Stream messages (Text/Thinking/ToolUse/ToolResult/Result)
        Agent->>Agent: Accumulate output_parts, track cost_usd, build conversation
        Agent->>JobStore: Append conversation events
    end
    Client->>Server: GET /stream/{job_id} (SSE)
    Server-->>Client: EventSourceResponse stream
    loop For each message
        Agent->>Server: on_message callback
        Server->>Client: SSE event with conversation data
    end
    Agent->>JobStore: Update job with final output/cost
    Server-->>Client: SSE final result event
```
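The `GET /stream/{job_id}` endpoint in the diagram delivers conversation updates as Server-Sent Events. As a minimal sketch of how a client might split such a stream into events (the `parse_sse` helper and the sample payloads are illustrative, not code from this PR):

```python
def parse_sse(raw: str) -> list[tuple[str, str]]:
    """Parse a raw SSE stream into (event, data) pairs.

    Events are separated by blank lines; each event may carry an
    'event:' name (defaulting to 'message') and one or more 'data:' lines.
    """
    events = []
    for block in raw.split("\n\n"):
        name, data_lines = "message", []
        for line in block.splitlines():
            if line.startswith("event:"):
                name = line[len("event:"):].strip()
            elif line.startswith("data:"):
                data_lines.append(line[len("data:"):].strip())
        if data_lines:
            events.append((name, "\n".join(data_lines)))
    return events

raw = (
    "event: message\n"
    'data: {"role": "assistant"}\n'
    "\n"
    "event: complete\n"
    'data: {"status": "done"}\n'
    "\n"
)
print(parse_sse(raw))
```

In practice a browser's `EventSource` (as used by the web UI here) does this framing automatically; the sketch only shows the wire format the server's `EventSourceResponse` produces.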
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs
🚥 Pre-merge checks: ✅ 3 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Comment
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@server/agent.py`:
- Around line 127-128: The log call using conv_logger.info currently uses an
unnecessary f-string: change the call in the relevant code block (the
conv_logger.info invocation inside the assistant thinking logging) to use a
plain string literal instead of an f-string; if interpolation was intended, add
the appropriate placeholders and variables, otherwise remove the leading "f" so
it reads a normal string.
In `@server/server.py`:
- Around line 380-382: The on_message handler incorrectly leaves a
fire-and-forget task unmanaged; instead of calling
loop.create_task(_notify(condition)) directly, use
asyncio.create_task(_notify(condition)) and store the returned Task on the job
(e.g., attach it to jobs[job_id] such as jobs[job_id]["notify_task"] or append
to jobs[job_id].setdefault("tasks", [])) so the Task isn't
lost/garbage-collected; update the on_message implementation (references:
on_message, jobs, job_id, _notify, loop) to create the task via
asyncio.create_task and persist the Task reference on the job object.
In `@team-repos.txt`:
- Line 11: Replace the insecure URL scheme for the downstream operator entry:
change "http://github.com/openshift/external-secrets-operator/" to use
"https://" so the line reads
"https://github.com/openshift/external-secrets-operator/"; this ensures a secure
and consistent URL scheme for the downstream operator reference.
- Line 7: Replace the insecure URL string
"http://github.com/openshift/cert-manager-istio-csr" with
"https://github.com/openshift/cert-manager-istio-csr" so the entry matches the
HTTPS scheme used by other lines; update the single occurrence in team-repos.txt
(search for the exact URL) to ensure consistency and avoid HTTP/HTTPS
mismatches.
🧹 Nitpick comments (5)
server/agent.py (2)
26-32: Consider using a more secure temporary file location.

Using `/tmp/conversation.log` is flagged as potentially insecure on multi-user systems since other users could read or tamper with the log. Consider using `tempfile.gettempdir()` with a unique subdirectory or a configurable path.

♻️ Proposed fix using tempfile

```diff
+import tempfile
+
-CONVERSATION_LOG = Path("/tmp/conversation.log")
+CONVERSATION_LOG = Path(tempfile.gettempdir()) / "oape_conversation.log"
```

Or make it configurable via environment variable:

```python
CONVERSATION_LOG = Path(os.environ.get("OAPE_CONVERSATION_LOG", "/tmp/conversation.log"))
```
104-108: Consider thread-safety for synchronous callback invocation.

The `on_message` callback is invoked synchronously from the async iteration. If the callback performs blocking operations or is slow, it could delay the agent loop. In `server.py`, the callback appends to a list and schedules an async task, which should be fine, but document this expectation.

plugins/oape/commands/init.md (1)
206-213: Minor: Clone command doesn't specify target directory name.

The `git clone --filter=blob:none "$CLONE_URL"` command relies on git's default behavior to create a directory matching the repository name. Since the allowlist uses short names that match the repo names, this works, but explicitly specifying the target directory would be more robust.

♻️ Optional improvement for explicit target directory

```diff
- git clone --filter=blob:none "$CLONE_URL"
+ git clone --filter=blob:none "$CLONE_URL" "$CLONE_DIR"
```

server/server.py (2)
309: Store the task reference to prevent potential garbage collection.

Fire-and-forget tasks created with `asyncio.create_task()` without storing a reference can be garbage-collected before completion. While this typically works, it's safer to store the reference.

♻️ Proposed fix

```diff
+    task = asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
+    jobs[job_id]["task"] = task
-    asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
     return {"job_id": job_id}
```
39-42: Consider adding job cleanup to prevent unbounded memory growth.

The in-memory `jobs` dict grows indefinitely as jobs are submitted. For long-running servers, this could cause memory issues.

♻️ Suggested approach

Add a TTL-based cleanup mechanism or limit the number of retained jobs:

```python
import time
from collections import OrderedDict

MAX_JOBS = 1000
JOB_TTL_SECONDS = 3600  # 1 hour

jobs: OrderedDict[str, dict] = OrderedDict()

def _cleanup_old_jobs():
    """Remove jobs older than TTL or exceeding max count."""
    now = time.time()
    # Remove by TTL
    expired = [jid for jid, job in jobs.items()
               if job.get("created_at", 0) + JOB_TTL_SECONDS < now]
    for jid in expired:
        del jobs[jid]
    # Remove oldest if over limit
    while len(jobs) > MAX_JOBS:
        jobs.popitem(last=False)
```

Then call `_cleanup_old_jobs()` in `submit_job` before creating a new job, and add `"created_at": time.time()` to the job dict.
```python
conv_logger.info(
    f"[assistant:ThinkingBlock] (thinking)")
```
Remove extraneous f prefix from string without placeholders.
This f-string has no interpolation placeholders.
🐛 Proposed fix

```diff
 conv_logger.info(
-    f"[assistant:ThinkingBlock] (thinking)")
+    "[assistant:ThinkingBlock] (thinking)")
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
conv_logger.info(
    "[assistant:ThinkingBlock] (thinking)")
```
🧰 Tools
🪛 Ruff (0.15.0)
[error] 128-128: f-string without any placeholders
Remove extraneous f prefix
(F541)
🤖 Prompt for AI Agents
In `@server/agent.py` around lines 127 - 128, The log call using conv_logger.info
currently uses an unnecessary f-string: change the call in the relevant code
block (the conv_logger.info invocation inside the assistant thinking logging) to
use a plain string literal instead of an f-string; if interpolation was
intended, add the appropriate placeholders and variables, otherwise remove the
leading "f" so it reads a normal string.
```python
def on_message(msg: dict) -> None:
    jobs[job_id]["conversation"].append(msg)
    loop.create_task(_notify(condition))
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Verify the conversation data flow between agent.py and server.py
# Check if run_agent's internal conversation list is the same as what's passed to on_message
echo "=== Checking run_agent's conversation handling ==="
rg -n -A5 "conversation: list" server/agent.py
echo ""
echo "=== Checking _emit function ==="
rg -n -A5 "def _emit" server/agent.py
echo ""
echo "=== Checking on_message callback usage ==="
rg -n -B2 -A5 "on_message" server/server.py
```

Repository: shiftweek/oape-ai-e2e
Length of output: 1276
🏁 Script executed:
#!/bin/bash
# Check the _notify function and context
echo "=== Checking _notify function ==="
rg -n -B2 -A10 "def _notify" server/server.py
echo ""
echo "=== Checking line 309 context for comparison ==="
sed -n '305,315p' server/server.py
echo ""
echo "=== Check if tasks are stored elsewhere ==="
rg -n "create_task\|run_coroutine_threadsafe" server/server.py | head -20Repository: shiftweek/oape-ai-e2e
Length of output: 878
🏁 Script executed:
#!/bin/bash
# Check if run_agent calls on_message from a different thread
echo "=== Checking run_agent function signature and on_message invocation ==="
rg -n -A50 "async def run_agent" server/agent.py | head -60
echo ""
echo "=== Check if run_agent spawns threads ==="
rg -n "Thread\|thread\|executor\|threading" server/agent.pyRepository: shiftweek/oape-ai-e2e
Length of output: 2017
Remove incorrect double append claim; only fire-and-forget task issue remains.
The "double append" concern is incorrect. Line 106 in agent.py appends to the local conversation variable within run_agent, while line 381 in server.py appends to jobs[job_id]["conversation"] — these are separate data structures with different purposes.
The fire-and-forget task at line 382 remains a valid concern. While loop.create_task(_notify(condition)) works since it's called from within the running event loop, storing the task reference prevents potential issues:
```diff
 def on_message(msg: dict) -> None:
     jobs[job_id]["conversation"].append(msg)
-    loop.create_task(_notify(condition))
+    asyncio.create_task(_notify(condition))
```
+ asyncio.create_task(_notify(condition))(Using asyncio.create_task() directly is preferred over caching and referencing the loop.)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
def on_message(msg: dict) -> None:
    jobs[job_id]["conversation"].append(msg)
    asyncio.create_task(_notify(condition))
```
🧰 Tools
🪛 Ruff (0.15.0)
[warning] 382-382: Store a reference to the return value of loop.create_task
(RUF006)
🤖 Prompt for AI Agents
In `@server/server.py` around lines 380 - 382, The on_message handler incorrectly
leaves a fire-and-forget task unmanaged; instead of calling
loop.create_task(_notify(condition)) directly, use
asyncio.create_task(_notify(condition)) and store the returned Task on the job
(e.g., attach it to jobs[job_id] such as jobs[job_id]["notify_task"] or append
to jobs[job_id].setdefault("tasks", [])) so the Task isn't
lost/garbage-collected; update the on_message implementation (references:
on_message, jobs, job_id, _notify, loop) to create the task via
asyncio.create_task and persist the Task reference on the job object.
```text
downstream operator: https://github.com/openshift/cert-manager-operator
upstream forks:
- https://github.com/openshift/jetstack-cert-manager
- http://github.com/openshift/cert-manager-istio-csr
```
Inconsistent URL scheme: use https:// for security and consistency.
This URL uses http:// while other URLs in this file use https://. GitHub supports HTTPS and this inconsistency could cause issues if URLs are compared or normalized elsewhere.
🔗 Proposed fix
```diff
-- http://github.com/openshift/cert-manager-istio-csr
+- https://github.com/openshift/cert-manager-istio-csr
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
- https://github.com/openshift/cert-manager-istio-csr
```
🤖 Prompt for AI Agents
In `@team-repos.txt` at line 7, Replace the insecure URL string
"http://github.com/openshift/cert-manager-istio-csr" with
"https://github.com/openshift/cert-manager-istio-csr" so the entry matches the
HTTPS scheme used by other lines; update the single occurrence in team-repos.txt
(search for the exact URL) to ensure consistency and avoid HTTP/HTTPS
mismatches.
```text
External Secrets Operator for Red Hat OpenShift
downstream operator: http://github.com/openshift/external-secrets-operator/
```
Inconsistent URL scheme: use https:// for security and consistency.
Same issue as above—this URL uses http:// instead of https://.
🔗 Proposed fix

```diff
-downstream operator: http://github.com/openshift/external-secrets-operator/
+downstream operator: https://github.com/openshift/external-secrets-operator/
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
downstream operator: https://github.com/openshift/external-secrets-operator/
```
🤖 Prompt for AI Agents
In `@team-repos.txt` at line 11, Replace the insecure URL scheme for the
downstream operator entry: change
"http://github.com/openshift/external-secrets-operator/" to use "https://" so
the line reads "https://github.com/openshift/external-secrets-operator/"; this
ensures a secure and consistent URL scheme for the downstream operator
reference.
Signed-off-by: Swarup Ghosh <swghosh@redhat.com>
Actionable comments posted: 4
🤖 Fix all issues with AI agents
In `@server/agent.py`:
- Around line 34-35: The CONFIGS initialization currently opens and parses
config.json at import time without error handling; update the code around
CONFIGS to catch FileNotFoundError and json.JSONDecodeError when opening/parsing
the Path(__file__).resolve().parent / "config.json", and handle them by logging
or raising a clear, descriptive exception (or falling back to sane defaults) so
the app fails with a helpful message instead of an obscure traceback; ensure you
reference CONFIGS and the config file load block when applying the change.
In `@server/server.py`:
- Around line 298-310: The created asyncio task for running jobs is not stored
and may be garbage-collected; change the code that creates the task so you
assign it to a variable (e.g., task = asyncio.create_task(_run_job(...))) and
store that reference in the jobs entry (e.g., jobs[job_id]["task"] = task) so
the task is retained; update the block around job_id, jobs[...] and
asyncio.create_task(...) to create the task, store it under the job's dict (and
optionally make sure any cleanup/cancellation logic in _run_job or elsewhere
uses that stored "task" key).
- Around line 55-63: The _resolve_working_dir function currently trusts a
user-controlled cwd; to prevent path traversal, canonicalize the input with
os.path.realpath and then enforce it is inside an allowed base (e.g., the
application's repo root or a configured ALLOWED_BASE) by comparing via
os.path.commonpath; if the resolved path is outside that allowed base or
contains disallowed patterns (absolute paths when only relative should be
accepted), raise the same HTTPException (400) with a clear message; update
callers to pass the configured base if needed and ensure validation occurs in
_resolve_working_dir before using the directory.
- Around line 336-370: The streaming loop currently assumes jobs[job_id] always
exists and will KeyError if the job is deleted; modify event_generator to
defensively fetch the job with job = jobs.get(job_id) at the top of each loop
iteration (and after waiting) instead of direct indexing, bail out by yielding a
final "complete" event (e.g., status "deleted" or similar) and return if job is
None, and replace uses of jobs[job_id]["conversation"], jobs[job_id]["status"],
and jobs[job_id]["message_event"] with values read from the job dict (or None)
to avoid KeyError during streaming; ensure the async with condition only runs if
condition is present.
🧹 Nitpick comments (4)
server/agent.py (2)
26-32: Consider using a configurable log path instead of hardcoded `/tmp`.

The hardcoded `/tmp/conversation.log` path has potential issues:

- On multi-tenant systems, other users can read/write to this file
- `/tmp` may be cleared unexpectedly or have size limits
- Parallel server instances would clobber the same file

Consider making this configurable via environment variable or using a path under the application directory.

♻️ Proposed fix

```diff
-CONVERSATION_LOG = Path("/tmp/conversation.log")
+CONVERSATION_LOG = Path(
+    os.environ.get("OAPE_CONVERSATION_LOG", "/tmp/conversation.log")
+)
```

And add `import os` to the imports.
193-199: Catching bare `Exception` may hide unexpected errors.

While catching `Exception` ensures the agent always returns a result, it also catches programming errors (e.g., `TypeError`, `AttributeError`) that might indicate bugs. Consider logging at `error` level or re-raising certain exception types.

♻️ Proposed improvement

```diff
 except Exception as exc:
-    conv_logger.info(f"[error] {traceback.format_exc()}")
+    conv_logger.error(f"[error] {traceback.format_exc()}")
     return AgentResult(
         output="",
         cost_usd=cost_usd,
         error=str(exc),
         conversation=conversation,
     )
```

server/server.py (2)
39-42: In-memory job store will lose all jobs on server restart.

The `jobs` dict is volatile. Consider documenting this limitation or adding persistence for production use. Also, jobs are never cleaned up, leading to unbounded memory growth.

♻️ Consider adding job expiration

Add a TTL mechanism or periodic cleanup for completed jobs:

```python
import time

# In job creation:
jobs[job_id] = {
    # ... existing fields
    "created_at": time.time(),
}

# Add a cleanup task or check max jobs
MAX_JOBS = 1000
if len(jobs) > MAX_JOBS:
    # Remove oldest completed jobs
    ...
```
66-273: Consider moving the HTML template to a separate file.

The 200+ line HTML template embedded as a string constant reduces code readability. Consider moving it to a separate `.html` file and loading it at startup.
```python
with open(Path(__file__).resolve().parent / "config.json") as cf:
    CONFIGS = json.loads(cf.read())
```
Config loading lacks error handling for missing or malformed file.
If config.json is missing or contains invalid JSON, the application will crash at import time with an unclear error. Consider adding explicit error handling.
🛡️ Proposed fix

```diff
-with open(Path(__file__).resolve().parent / "config.json") as cf:
-    CONFIGS = json.loads(cf.read())
+_config_path = Path(__file__).resolve().parent / "config.json"
+try:
+    with open(_config_path) as cf:
+        CONFIGS = json.load(cf)
+except FileNotFoundError:
+    raise RuntimeError(f"Missing required config file: {_config_path}")
+except json.JSONDecodeError as e:
+    raise RuntimeError(f"Invalid JSON in config file {_config_path}: {e}")
```

🤖 Prompt for AI Agents
In `@server/agent.py` around lines 34 - 35, The CONFIGS initialization currently
opens and parses config.json at import time without error handling; update the
code around CONFIGS to catch FileNotFoundError and json.JSONDecodeError when
opening/parsing the Path(__file__).resolve().parent / "config.json", and handle
them by logging or raising a clear, descriptive exception (or falling back to
sane defaults) so the app fails with a helpful message instead of an obscure
traceback; ensure you reference CONFIGS and the config file load block when
applying the change.
```python
def _resolve_working_dir(cwd: str) -> str:
    """Resolve and validate the working directory."""
    working_dir = cwd if cwd else os.getcwd()
    if not os.path.isdir(working_dir):
        raise HTTPException(
            status_code=400,
            detail=f"The provided cwd is not a valid directory: {working_dir}",
        )
    return working_dir
```
Path traversal via user-controlled cwd parameter.
While os.path.isdir validates existence, it doesn't prevent access to sensitive directories. Consider restricting allowed paths or sandboxing.
🛡️ Suggested mitigation

```diff
 def _resolve_working_dir(cwd: str) -> str:
     """Resolve and validate the working directory."""
     working_dir = cwd if cwd else os.getcwd()
+    # Resolve to absolute path to prevent traversal tricks
+    working_dir = os.path.realpath(working_dir)
     if not os.path.isdir(working_dir):
         raise HTTPException(
             status_code=400,
             detail=f"The provided cwd is not a valid directory: {working_dir}",
         )
+    # Optional: Restrict to allowed base paths
+    # ALLOWED_BASE = os.environ.get("OAPE_ALLOWED_CWD_BASE", "/home")
+    # if not working_dir.startswith(ALLOWED_BASE):
+    #     raise HTTPException(status_code=403, detail="Directory not allowed")
     return working_dir
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
def _resolve_working_dir(cwd: str) -> str:
    """Resolve and validate the working directory."""
    working_dir = cwd if cwd else os.getcwd()
    # Resolve to absolute path to prevent traversal tricks
    working_dir = os.path.realpath(working_dir)
    if not os.path.isdir(working_dir):
        raise HTTPException(
            status_code=400,
            detail=f"The provided cwd is not a valid directory: {working_dir}",
        )
    # Optional: Restrict to allowed base paths
    # ALLOWED_BASE = os.environ.get("OAPE_ALLOWED_CWD_BASE", "/home")
    # if not working_dir.startswith(ALLOWED_BASE):
    #     raise HTTPException(status_code=403, detail="Directory not allowed")
    return working_dir
```
🤖 Prompt for AI Agents
In `@server/server.py` around lines 55 - 63, The _resolve_working_dir function
currently trusts a user-controlled cwd; to prevent path traversal, canonicalize
the input with os.path.realpath and then enforce it is inside an allowed base
(e.g., the application's repo root or a configured ALLOWED_BASE) by comparing
via os.path.commonpath; if the resolved path is outside that allowed base or
contains disallowed patterns (absolute paths when only relative should be
accepted), raise the same HTTPException (400) with a clear message; update
callers to pass the configured base if needed and ensure validation occurs in
_resolve_working_dir before using the directory.
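The realpath-plus-commonpath containment check described in the prompt can be sketched as a standalone helper (the function name and example paths are illustrative assumptions, not code from this PR):

```python
import os

def is_within_base(candidate: str, allowed_base: str) -> bool:
    """Return True if candidate resolves to a path inside allowed_base.

    realpath canonicalizes symlinks and '..' segments before the
    containment check, so traversal tricks are neutralized.
    """
    base = os.path.realpath(allowed_base)
    resolved = os.path.realpath(candidate)
    try:
        return os.path.commonpath([base, resolved]) == base
    except ValueError:
        # Raised when the paths are on different drives (Windows)
        return False

print(is_within_base("/srv/app/repos/project", "/srv/app/repos"))
print(is_within_base("/srv/app/repos/../secrets", "/srv/app/repos"))
```

`commonpath` is preferred over a plain `startswith` check because it compares whole path components, so `/srv/app/repos-evil` is not mistaken for a child of `/srv/app/repos`.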
```python
job_id = uuid.uuid4().hex[:12]
jobs[job_id] = {
    "status": "running",
    "ep_url": ep_url,
    "cwd": working_dir,
    "conversation": [],
    "message_event": asyncio.Condition(),
    "output": "",
    "cost_usd": 0.0,
    "error": None,
}
asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
return {"job_id": job_id}
```
Store task reference to prevent garbage collection warnings.
The task created at line 309 is not stored, which can cause issues if the task is garbage-collected before completion.
🐛 Proposed fix

```diff
-    asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
+    jobs[job_id]["task"] = asyncio.create_task(
+        _run_job(job_id, command, ep_url, working_dir)
+    )
     return {"job_id": job_id}
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
job_id = uuid.uuid4().hex[:12]
jobs[job_id] = {
    "status": "running",
    "ep_url": ep_url,
    "cwd": working_dir,
    "conversation": [],
    "message_event": asyncio.Condition(),
    "output": "",
    "cost_usd": 0.0,
    "error": None,
}
jobs[job_id]["task"] = asyncio.create_task(
    _run_job(job_id, command, ep_url, working_dir)
)
return {"job_id": job_id}
```
🧰 Tools
🪛 Ruff (0.15.0)
[warning] 309-309: Store a reference to the return value of asyncio.create_task
(RUF006)
🤖 Prompt for AI Agents
In `@server/server.py` around lines 298 - 310, The created asyncio task for
running jobs is not stored and may be garbage-collected; change the code that
creates the task so you assign it to a variable (e.g., task =
asyncio.create_task(_run_job(...))) and store that reference in the jobs entry
(e.g., jobs[job_id]["task"] = task) so the task is retained; update the block
around job_id, jobs[...] and asyncio.create_task(...) to create the task, store
it under the job's dict (and optionally make sure any cleanup/cancellation logic
in _run_job or elsewhere uses that stored "task" key).
```python
async def event_generator():
    cursor = 0
    condition = jobs[job_id]["message_event"]

    while True:
        # Send any new messages since the cursor
        conversation = jobs[job_id]["conversation"]
        while cursor < len(conversation):
            yield {
                "event": "message",
                "data": json.dumps(conversation[cursor], default=str),
            }
            cursor += 1

        # Check if the job is complete
        status = jobs[job_id]["status"]
        if status != "running":
            yield {
                "event": "complete",
                "data": json.dumps({
                    "status": status,
                    "output": jobs[job_id].get("output", ""),
                    "cost_usd": jobs[job_id].get("cost_usd", 0.0),
                    "error": jobs[job_id].get("error"),
                }),
            }
            return

        # Wait for new messages or send keepalive on timeout
        async with condition:
            try:
                await asyncio.wait_for(condition.wait(), timeout=30.0)
            except asyncio.TimeoutError:
                yield {"event": "keepalive", "data": ""}
```
Potential KeyError if job is deleted during streaming.
If the job is removed from jobs dict while a client is streaming, subsequent accesses like jobs[job_id]["conversation"] will raise KeyError. Consider defensive checks or using .get().
🛡️ Proposed fix

```diff
 async def event_generator():
     cursor = 0
     condition = jobs[job_id]["message_event"]
     while True:
+        if job_id not in jobs:
+            yield {"event": "complete", "data": json.dumps({"status": "deleted", "error": "Job no longer exists"})}
+            return
+
         # Send any new messages since the cursor
         conversation = jobs[job_id]["conversation"]
```
In `@server/server.py` around lines 336 - 370, The streaming loop currently
assumes jobs[job_id] always exists and will KeyError if the job is deleted;
modify event_generator to defensively fetch the job with job = jobs.get(job_id)
at the top of each loop iteration (and after waiting) instead of direct
indexing, bail out by yielding a final "complete" event (e.g., status "deleted"
or similar) and return if job is None, and replace uses of
jobs[job_id]["conversation"], jobs[job_id]["status"], and
jobs[job_id]["message_event"] with values read from the job dict (or None) to
avoid KeyError during streaming; ensure the async with condition only runs if
condition is present.
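The wait/notify handshake that `event_generator` and `_notify` rely on can be exercised in isolation; a minimal sketch of the same `asyncio.Condition` pattern, with a short timeout standing in for the keepalive branch (the function names here are illustrative, not code from this PR):

```python
import asyncio

async def main():
    condition = asyncio.Condition()
    received = []

    async def consumer():
        async with condition:
            # Equivalent of the stream loop waiting for new messages;
            # a timeout would instead emit a keepalive event.
            try:
                await asyncio.wait_for(condition.wait(), timeout=5.0)
                received.append("woken")
            except asyncio.TimeoutError:
                received.append("keepalive")

    async def producer():
        await asyncio.sleep(0.01)
        async with condition:
            # Equivalent of _notify: wake any waiting stream consumers
            condition.notify_all()

    await asyncio.gather(consumer(), producer())
    return received

print(asyncio.run(main()))  # prints ['woken']
```

Note that `condition.wait()` releases the lock while suspended, which is why the producer can acquire `async with condition` to notify; a plain `asyncio.Event` would also work here if per-message fan-out were not needed.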
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@server/homepage.html`:
- Around line 105-115: The error handler currently calls await res.json() which
will throw if the response is not JSON; change the success/error parsing around
the fetch in the submit flow (the code handling res, statusEl, btn, streamJob)
to first await res.text(), then try JSON.parse on that text inside a try/catch
to extract a "detail" or "job_id", falling back to the raw text when parsing
fails; use the parsed detail/text for the thrown Error message when !res.ok and
for showing errors in statusEl (via escapeHtml), and only call streamJob(job_id)
when a job_id is successfully extracted.
```javascript
try {
  const body = new URLSearchParams({ep_url: epUrl, command: command});
  if (cwd) body.append('cwd', cwd);
  const res = await fetch('/submit', {method: 'POST', body});
  if (!res.ok) { throw new Error((await res.json()).detail || res.statusText); }
  const {job_id} = await res.json();
  statusEl.innerHTML = '<span class="spinner"></span> Connecting to agent stream\u2026';
  streamJob(job_id);
} catch (err) {
  statusEl.innerHTML = '<span class="error">Error: ' + escapeHtml(err.message) + '</span>';
  btn.disabled = false;
```
Harden error parsing for non‑JSON responses.
If /submit returns HTML/plain text (e.g., proxy error), res.json() throws and the user sees a misleading parse error. Consider parsing text first and then attempting JSON.
🛠️ Suggested fix

```diff
-  if (!res.ok) { throw new Error((await res.json()).detail || res.statusText); }
+  if (!res.ok) {
+    const raw = await res.text();
+    let detail = res.statusText;
+    try {
+      const data = JSON.parse(raw);
+      detail = data.detail || detail;
+    } catch {
+      if (raw) detail = raw;
+    }
+    throw new Error(detail);
+  }
```

🤖 Prompt for AI Agents
In `@server/homepage.html` around lines 105 - 115, The error handler currently
calls await res.json() which will throw if the response is not JSON; change the
success/error parsing around the fetch in the submit flow (the code handling
res, statusEl, btn, streamJob) to first await res.text(), then try JSON.parse on
that text inside a try/catch to extract a "detail" or "job_id", falling back to
the raw text when parsing fails; use the parsed detail/text for the thrown Error
message when !res.ok and for showing errors in statusEl (via escapeHtml), and
only call streamJob(job_id) when a job_id is successfully extracted.
Summary by CodeRabbit
New Features
Dependencies
Chores