
Add agent jobs flow #13

Merged
swghosh merged 3 commits into main from new
Feb 16, 2026

Conversation


@swghosh swghosh commented Feb 16, 2026

Summary by CodeRabbit

  • New Features

    • Web UI for submitting jobs, viewing live conversation streams, and viewing final output/cost
    • Background job execution with real-time streaming and status updates
    • New HTTP endpoints for job submission, status, SSE streaming, and a synchronous command API
  • Dependencies

    • Added Server-Sent Events library for real-time updates
  • Chores

    • Updated .gitignore to ignore Python cache and local agent files


coderabbitai bot commented Feb 16, 2026

📝 Walkthrough

This PR adds an agent execution subsystem and web UI: new server/agent.py to run a Claude-based agent, SSE-backed job submission and streaming endpoints plus frontend (server/server.py, server/homepage.html), a new dependency for SSE, and minor VCS ignores in .gitignore.

Changes

VCS & Dependencies (.gitignore, server/requirements.txt)
Add .gitignore patterns for __pycache__ and .claude; add the sse-starlette>=2.0.0 dependency for SSE support.

Agent Core (server/agent.py)
New module implementing the Claude agent workflow: an AgentResult dataclass, async run_agent(...), config loading (config.json), plugin path derivation, conversation logging, and a SUPPORTED_COMMANDS mapping ("api-implement" -> "oape:api-implement").

Server API & UI (server/server.py, server/homepage.html)
Replaces the minimal API with a multi-endpoint server: homepage UI, POST /submit for job creation, GET /status/{job_id}, GET /stream/{job_id} for SSE streaming, a synchronous wrapper GET /api/v1/oape-api-implement, an in-memory job store, a background job runner using run_agent, input validation and working-directory resolution, and client-side JS to render the streaming conversation and final output.
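The submit-and-stream lifecycle above can be sketched with stdlib asyncio alone. The job-store fields and helper names below mirror the summary but are illustrative, not the PR's actual code:

```python
import asyncio
import uuid

jobs: dict[str, dict] = {}  # in-memory job store, keyed by job_id


async def _run_job(job_id: str) -> None:
    """Illustrative background runner: emit messages, then mark completion."""
    job = jobs[job_id]
    for i in range(3):
        job["conversation"].append({"type": "text", "text": f"step {i}"})
        async with job["message_event"]:
            job["message_event"].notify_all()  # wake any SSE streamers
        await asyncio.sleep(0)
    job["status"] = "completed"
    job["output"] = "done"


def submit_job() -> str:
    """What POST /submit does conceptually: create a job entry, start a task."""
    job_id = uuid.uuid4().hex[:12]
    jobs[job_id] = {
        "status": "running",
        "conversation": [],
        "message_event": asyncio.Condition(),
        "output": "",
    }
    # Keep a reference so the task is not garbage-collected before it finishes.
    jobs[job_id]["task"] = asyncio.create_task(_run_job(job_id))
    return job_id


async def main() -> None:
    job_id = submit_job()
    await jobs[job_id]["task"]
    print(jobs[job_id]["status"], len(jobs[job_id]["conversation"]))


asyncio.run(main())  # prints: completed 3
```

The real server wraps a store like this with HTTP endpoints and an SSE response; the sketch only shows the job lifecycle.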

Sequence Diagram

sequenceDiagram
    participant Client as Browser Client
    participant Server as HTTP Server
    participant JobStore as Job Store
    participant Agent as Agent Runner
    participant Claude as Claude API

    Client->>Server: POST /submit (ep_url, command, cwd)
    Server->>Server: Validate inputs
    Server->>JobStore: Create job entry
    Server->>Server: Start background task _run_job
    Server-->>Client: Return job_id

    par Background Execution
        Server->>Agent: _run_job invokes run_agent(command, ep_url, cwd)
        Agent->>Claude: Initialize and run agent workflow
        Claude-->>Agent: Stream messages (Text/Thinking/ToolUse/ToolResult/Result)
        Agent->>Agent: Accumulate output_parts, track cost_usd, build conversation
        Agent->>JobStore: Append conversation events
    end

    Client->>Server: GET /stream/{job_id} (SSE)
    Server-->>Client: EventSourceResponse stream
    loop For each message
        Agent->>Server: on_message callback
        Server->>Client: SSE event with conversation data
    end

    Agent->>JobStore: Update job with final output/cost
    Server-->>Client: SSE final result event

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes


🚥 Pre-merge checks | ✅ 3 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage ⚠️ Warning
Docstring coverage is 78.57%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (3 passed)

Description Check ✅ Passed
Check skipped; CodeRabbit's high-level summary is enabled.

Title Check ✅ Passed
The title 'Add agent jobs flow' is partially related to the changeset: it refers to the job submission and asynchronous workflow added in server.py, but it doesn't clearly convey that the changes also include a new agent execution module (agent.py), a frontend UI (homepage.html), and project configuration updates.

Merge Conflict Detection ✅ Passed
No merge conflicts detected when merging into main.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@server/agent.py`:
- Around line 127-128: The log call using conv_logger.info currently uses an
unnecessary f-string: change the call in the relevant code block (the
conv_logger.info invocation inside the assistant thinking logging) to use a
plain string literal instead of an f-string; if interpolation was intended, add
the appropriate placeholders and variables, otherwise remove the leading "f" so
it reads a normal string.

In `@server/server.py`:
- Around line 380-382: The on_message handler incorrectly leaves a
fire-and-forget task unmanaged; instead of calling
loop.create_task(_notify(condition)) directly, use
asyncio.create_task(_notify(condition)) and store the returned Task on the job
(e.g., attach it to jobs[job_id] such as jobs[job_id]["notify_task"] or append
to jobs[job_id].setdefault("tasks", [])) so the Task isn't
lost/garbage-collected; update the on_message implementation (references:
on_message, jobs, job_id, _notify, loop) to create the task via
asyncio.create_task and persist the Task reference on the job object.

In `@team-repos.txt`:
- Line 11: Replace the insecure URL scheme for the downstream operator entry:
change "http://github.com/openshift/external-secrets-operator/" to use
"https://" so the line reads
"https://github.com/openshift/external-secrets-operator/"; this ensures a secure
and consistent URL scheme for the downstream operator reference.
- Line 7: Replace the insecure URL string
"http://github.com/openshift/cert-manager-istio-csr" with
"https://github.com/openshift/cert-manager-istio-csr" so the entry matches the
HTTPS scheme used by other lines; update the single occurrence in team-repos.txt
(search for the exact URL) to ensure consistency and avoid HTTP/HTTPS
mismatches.
🧹 Nitpick comments (5)
server/agent.py (2)

26-32: Consider using a more secure temporary file location.

Using /tmp/conversation.log is flagged as potentially insecure on multi-user systems since other users could read or tamper with the log. Consider using tempfile.gettempdir() with a unique subdirectory or a configurable path.

♻️ Proposed fix using tempfile
+import tempfile
+
-CONVERSATION_LOG = Path("/tmp/conversation.log")
+CONVERSATION_LOG = Path(tempfile.gettempdir()) / "oape_conversation.log"

Or make it configurable via environment variable:

CONVERSATION_LOG = Path(os.environ.get("OAPE_CONVERSATION_LOG", "/tmp/conversation.log"))

104-108: Consider thread-safety for synchronous callback invocation.

The on_message callback is invoked synchronously from the async iteration. If the callback performs blocking operations or is slow, it could delay the agent loop. In server.py, the callback appends to a list and schedules an async task, which should be fine, but document this expectation.
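That expectation can be illustrated with a stdlib sketch: the callback only appends and schedules async work, keeping the agent loop unblocked. All names here are illustrative, not the PR's actual implementation:

```python
import asyncio


async def agent_loop(on_message) -> None:
    """Illustrative agent loop: the callback runs synchronously per message,
    so it must stay cheap or it will stall iteration."""
    for i in range(3):
        on_message({"seq": i})
        await asyncio.sleep(0)  # yield, letting scheduled tasks run


async def main() -> list:
    received: list[dict] = []
    cond = asyncio.Condition()
    pending: set[asyncio.Task] = set()

    async def _notify() -> None:
        async with cond:
            cond.notify_all()

    def on_message(msg: dict) -> None:
        received.append(msg)                   # O(1), non-blocking
        task = asyncio.create_task(_notify())  # defer async work to the loop
        pending.add(task)                      # hold a reference until done
        task.add_done_callback(pending.discard)

    await agent_loop(on_message)
    await asyncio.gather(*pending)
    return received


messages = asyncio.run(main())
print(len(messages))  # 3
```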

plugins/oape/commands/init.md (1)

206-213: Minor: Clone command doesn't specify target directory name.

The git clone --filter=blob:none "$CLONE_URL" command relies on git's default behavior to create a directory matching the repository name. Since the allowlist uses short names that match the repo names, this works, but explicitly specifying the target directory would be more robust.

♻️ Optional improvement for explicit target directory
-  git clone --filter=blob:none "$CLONE_URL"
+  git clone --filter=blob:none "$CLONE_URL" "$CLONE_DIR"
server/server.py (2)

309-309: Store the task reference to prevent potential garbage collection.

Fire-and-forget tasks created with asyncio.create_task() without storing a reference can be garbage-collected before completion. While this typically works, it's safer to store the reference.

♻️ Proposed fix
+    task = asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
+    jobs[job_id]["task"] = task
-    asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
     return {"job_id": job_id}

39-42: Consider adding job cleanup to prevent unbounded memory growth.

The in-memory jobs dict grows indefinitely as jobs are submitted. For long-running servers, this could cause memory issues.

♻️ Suggested approach

Add a TTL-based cleanup mechanism or limit the number of retained jobs:

import time
from collections import OrderedDict

MAX_JOBS = 1000
JOB_TTL_SECONDS = 3600  # 1 hour

jobs: OrderedDict[str, dict] = OrderedDict()

def _cleanup_old_jobs():
    """Remove jobs older than TTL or exceeding max count."""
    now = time.time()
    # Remove by TTL
    expired = [jid for jid, job in jobs.items() 
               if job.get("created_at", 0) + JOB_TTL_SECONDS < now]
    for jid in expired:
        del jobs[jid]
    # Remove oldest if over limit
    while len(jobs) > MAX_JOBS:
        jobs.popitem(last=False)

Then call _cleanup_old_jobs() in submit_job before creating a new job, and add "created_at": time.time() to the job dict.

Comment on lines +127 to +128
conv_logger.info(
f"[assistant:ThinkingBlock] (thinking)")

⚠️ Potential issue | 🟡 Minor

Remove extraneous f prefix from string without placeholders.

This f-string has no interpolation placeholders.

🐛 Proposed fix
                     conv_logger.info(
-                        f"[assistant:ThinkingBlock] (thinking)")
+                        "[assistant:ThinkingBlock] (thinking)")
🧰 Tools
🪛 Ruff (0.15.0)

[error] 128-128: f-string without any placeholders

Remove extraneous f prefix

(F541)


Comment on lines +380 to +382
def on_message(msg: dict) -> None:
jobs[job_id]["conversation"].append(msg)
loop.create_task(_notify(condition))

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify the conversation data flow between agent.py and server.py
# Check if run_agent's internal conversation list is the same as what's passed to on_message

echo "=== Checking run_agent's conversation handling ==="
rg -n -A5 "conversation: list" server/agent.py

echo ""
echo "=== Checking _emit function ==="
rg -n -A5 "def _emit" server/agent.py

echo ""
echo "=== Checking on_message callback usage ==="
rg -n -B2 -A5 "on_message" server/server.py

Repository: shiftweek/oape-ai-e2e

Length of output: 1276


🏁 Script executed:

#!/bin/bash
# Check the _notify function and context
echo "=== Checking _notify function ==="
rg -n -B2 -A10 "def _notify" server/server.py

echo ""
echo "=== Checking line 309 context for comparison ==="
sed -n '305,315p' server/server.py

echo ""
echo "=== Check if tasks are stored elsewhere ==="
rg -n "create_task\|run_coroutine_threadsafe" server/server.py | head -20

Repository: shiftweek/oape-ai-e2e

Length of output: 878


🏁 Script executed:

#!/bin/bash
# Check if run_agent calls on_message from a different thread
echo "=== Checking run_agent function signature and on_message invocation ==="
rg -n -A50 "async def run_agent" server/agent.py | head -60

echo ""
echo "=== Check if run_agent spawns threads ==="
rg -n "Thread\|thread\|executor\|threading" server/agent.py

Repository: shiftweek/oape-ai-e2e

Length of output: 2017


Remove incorrect double append claim; only fire-and-forget task issue remains.

The "double append" concern is incorrect. Line 106 in agent.py appends to the local conversation variable within run_agent, while line 381 in server.py appends to jobs[job_id]["conversation"] — these are separate data structures with different purposes.

The fire-and-forget task at line 382 remains a valid concern. While loop.create_task(_notify(condition)) works since it's called from within the running event loop, storing the task reference prevents potential issues:

 def on_message(msg: dict) -> None:
     jobs[job_id]["conversation"].append(msg)
-    loop.create_task(_notify(condition))
+    asyncio.create_task(_notify(condition))

(Using asyncio.create_task() directly is preferred over caching and referencing the loop.)

🧰 Tools
🪛 Ruff (0.15.0)

[warning] 382-382: Store a reference to the return value of loop.create_task

(RUF006)


downstream operator: https://github.com/openshift/cert-manager-operator
upstream forks:
- https://github.com/openshift/jetstack-cert-manager
- http://github.com/openshift/cert-manager-istio-csr

⚠️ Potential issue | 🟡 Minor

Inconsistent URL scheme: use https:// for security and consistency.

This URL uses http:// while other URLs in this file use https://. GitHub supports HTTPS and this inconsistency could cause issues if URLs are compared or normalized elsewhere.

🔗 Proposed fix
-- http://github.com/openshift/cert-manager-istio-csr
+- https://github.com/openshift/cert-manager-istio-csr



External Secrets Operator for Red Hat OpenShift
downstream operator: http://github.com/openshift/external-secrets-operator/

⚠️ Potential issue | 🟡 Minor

Inconsistent URL scheme: use https:// for security and consistency.

Same issue as above—this URL uses http:// instead of https://.

🔗 Proposed fix
-downstream operator: http://github.com/openshift/external-secrets-operator/
+downstream operator: https://github.com/openshift/external-secrets-operator/

Signed-off-by: Swarup Ghosh <swghosh@redhat.com>
Signed-off-by: Swarup Ghosh <swghosh@redhat.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@server/agent.py`:
- Around line 34-35: The CONFIGS initialization currently opens and parses
config.json at import time without error handling; update the code around
CONFIGS to catch FileNotFoundError and json.JSONDecodeError when opening/parsing
the Path(__file__).resolve().parent / "config.json", and handle them by logging
or raising a clear, descriptive exception (or falling back to sane defaults) so
the app fails with a helpful message instead of an obscure traceback; ensure you
reference CONFIGS and the config file load block when applying the change.

In `@server/server.py`:
- Around line 298-310: The created asyncio task for running jobs is not stored
and may be garbage-collected; change the code that creates the task so you
assign it to a variable (e.g., task = asyncio.create_task(_run_job(...))) and
store that reference in the jobs entry (e.g., jobs[job_id]["task"] = task) so
the task is retained; update the block around job_id, jobs[...] and
asyncio.create_task(...) to create the task, store it under the job's dict (and
optionally make sure any cleanup/cancellation logic in _run_job or elsewhere
uses that stored "task" key).
- Around line 55-63: The _resolve_working_dir function currently trusts a
user-controlled cwd; to prevent path traversal, canonicalize the input with
os.path.realpath and then enforce it is inside an allowed base (e.g., the
application's repo root or a configured ALLOWED_BASE) by comparing via
os.path.commonpath; if the resolved path is outside that allowed base or
contains disallowed patterns (absolute paths when only relative should be
accepted), raise the same HTTPException (400) with a clear message; update
callers to pass the configured base if needed and ensure validation occurs in
_resolve_working_dir before using the directory.
- Around line 336-370: The streaming loop currently assumes jobs[job_id] always
exists and will KeyError if the job is deleted; modify event_generator to
defensively fetch the job with job = jobs.get(job_id) at the top of each loop
iteration (and after waiting) instead of direct indexing, bail out by yielding a
final "complete" event (e.g., status "deleted" or similar) and return if job is
None, and replace uses of jobs[job_id]["conversation"], jobs[job_id]["status"],
and jobs[job_id]["message_event"] with values read from the job dict (or None)
to avoid KeyError during streaming; ensure the async with condition only runs if
condition is present.
🧹 Nitpick comments (4)
server/agent.py (2)

26-32: Consider using a configurable log path instead of hardcoded /tmp.

The hardcoded /tmp/conversation.log path has potential issues:

  1. On multi-tenant systems, other users can read/write to this file
  2. /tmp may be cleared unexpectedly or have size limits
  3. Parallel server instances would clobber the same file

Consider making this configurable via environment variable or using a path under the application directory.

♻️ Proposed fix
-CONVERSATION_LOG = Path("/tmp/conversation.log")
+CONVERSATION_LOG = Path(
+    os.environ.get("OAPE_CONVERSATION_LOG", "/tmp/conversation.log")
+)

And add import os to the imports.


193-199: Catching bare Exception may hide unexpected errors.

While catching Exception ensures the agent always returns a result, it also catches programming errors (e.g., TypeError, AttributeError) that might indicate bugs. Consider logging at error level or re-raising certain exception types.

♻️ Proposed improvement
     except Exception as exc:
-        conv_logger.info(f"[error] {traceback.format_exc()}")
+        conv_logger.error(f"[error] {traceback.format_exc()}")
         return AgentResult(
             output="",
             cost_usd=cost_usd,
             error=str(exc),
             conversation=conversation,
         )
server/server.py (2)

39-42: In-memory job store will lose all jobs on server restart.

The jobs dict is volatile. Consider documenting this limitation or adding persistence for production use. Also, jobs are never cleaned up, leading to unbounded memory growth.

♻️ Consider adding job expiration

Add a TTL mechanism or periodic cleanup for completed jobs:

import time

# In job creation:
jobs[job_id] = {
    # ... existing fields
    "created_at": time.time(),
}

# Add a cleanup task or check max jobs
MAX_JOBS = 1000
if len(jobs) > MAX_JOBS:
    # Remove oldest completed jobs
    ...

66-273: Consider moving HTML template to a separate file.

The 200+ line HTML template embedded as a string constant reduces code readability. Consider moving it to a separate .html file and loading it at startup.
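A possible shape for that refactor; the loader and fallback below are illustrative, not the PR's code:

```python
from pathlib import Path


def load_homepage(path: Path) -> str:
    """Read the UI template from disk. In server.py this could replace the
    embedded string constant (loaded once at startup, or per-request in dev)."""
    try:
        return path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # Illustrative fallback so this sketch runs standalone.
        return "<html><body>homepage.html not found</body></html>"


# In real code the path would be e.g. Path(__file__).parent / "homepage.html".
html = load_homepage(Path("homepage.html"))
print(html[:5])
```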

Comment on lines +34 to +35
with open(Path(__file__).resolve().parent / "config.json") as cf:
CONFIGS = json.loads(cf.read())

⚠️ Potential issue | 🟡 Minor

Config loading lacks error handling for missing or malformed file.

If config.json is missing or contains invalid JSON, the application will crash at import time with an unclear error. Consider adding explicit error handling.

🛡️ Proposed fix
-with open(Path(__file__).resolve().parent / "config.json") as cf:
-    CONFIGS = json.loads(cf.read())
+_config_path = Path(__file__).resolve().parent / "config.json"
+try:
+    with open(_config_path) as cf:
+        CONFIGS = json.load(cf)
+except FileNotFoundError:
+    raise RuntimeError(f"Missing required config file: {_config_path}")
+except json.JSONDecodeError as e:
+    raise RuntimeError(f"Invalid JSON in config file {_config_path}: {e}")

Comment on lines +55 to +63
def _resolve_working_dir(cwd: str) -> str:
"""Resolve and validate the working directory."""
working_dir = cwd if cwd else os.getcwd()
if not os.path.isdir(working_dir):
raise HTTPException(
status_code=400,
detail=f"The provided cwd is not a valid directory: {working_dir}",
)
return working_dir

⚠️ Potential issue | 🟠 Major

Path traversal via user-controlled cwd parameter.

While os.path.isdir validates existence, it doesn't prevent access to sensitive directories. Consider restricting allowed paths or sandboxing.

🛡️ Suggested mitigation
 def _resolve_working_dir(cwd: str) -> str:
     """Resolve and validate the working directory."""
     working_dir = cwd if cwd else os.getcwd()
+    # Resolve to absolute path to prevent traversal tricks
+    working_dir = os.path.realpath(working_dir)
     if not os.path.isdir(working_dir):
         raise HTTPException(
             status_code=400,
             detail=f"The provided cwd is not a valid directory: {working_dir}",
         )
+    # Optional: Restrict to allowed base paths
+    # ALLOWED_BASE = os.environ.get("OAPE_ALLOWED_CWD_BASE", "/home")
+    # if not working_dir.startswith(ALLOWED_BASE):
+    #     raise HTTPException(status_code=403, detail="Directory not allowed")
     return working_dir
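A minimal stdlib sketch of the realpath plus commonpath containment check suggested above; ALLOWED_BASE is a hypothetical configured value:

```python
import os

ALLOWED_BASE = "/srv/oape"  # hypothetical configured base directory


def is_within_base(cwd: str, base: str = ALLOWED_BASE) -> bool:
    """Canonicalize the user-supplied path, then require that it stays
    under base (realpath collapses '..' and resolves symlinks)."""
    base = os.path.realpath(base)
    resolved = os.path.realpath(os.path.join(base, cwd))
    return os.path.commonpath([resolved, base]) == base


print(is_within_base("jobs/repo1"))  # True
print(is_within_base("../../etc"))   # False: escapes the base
```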

Comment on lines +298 to +310
job_id = uuid.uuid4().hex[:12]
jobs[job_id] = {
"status": "running",
"ep_url": ep_url,
"cwd": working_dir,
"conversation": [],
"message_event": asyncio.Condition(),
"output": "",
"cost_usd": 0.0,
"error": None,
}
asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
return {"job_id": job_id}

⚠️ Potential issue | 🟡 Minor

Store task reference to prevent garbage collection warnings.

The task created at line 309 is not stored, which can cause issues if the task is garbage-collected before completion.

🐛 Proposed fix
-    asyncio.create_task(_run_job(job_id, command, ep_url, working_dir))
+    jobs[job_id]["task"] = asyncio.create_task(
+        _run_job(job_id, command, ep_url, working_dir)
+    )
     return {"job_id": job_id}
🧰 Tools
🪛 Ruff (0.15.0)

[warning] 309-309: Store a reference to the return value of asyncio.create_task

(RUF006)


Comment on lines +336 to +370
async def event_generator():
cursor = 0
condition = jobs[job_id]["message_event"]

while True:
# Send any new messages since the cursor
conversation = jobs[job_id]["conversation"]
while cursor < len(conversation):
yield {
"event": "message",
"data": json.dumps(conversation[cursor], default=str),
}
cursor += 1

# Check if the job is complete
status = jobs[job_id]["status"]
if status != "running":
yield {
"event": "complete",
"data": json.dumps({
"status": status,
"output": jobs[job_id].get("output", ""),
"cost_usd": jobs[job_id].get("cost_usd", 0.0),
"error": jobs[job_id].get("error"),
}),
}
return

# Wait for new messages or send keepalive on timeout
async with condition:
try:
await asyncio.wait_for(condition.wait(), timeout=30.0)
except asyncio.TimeoutError:
yield {"event": "keepalive", "data": ""}


⚠️ Potential issue | 🟡 Minor

Potential KeyError if job is deleted during streaming.

If the job is removed from jobs dict while a client is streaming, subsequent accesses like jobs[job_id]["conversation"] will raise KeyError. Consider defensive checks or using .get().

🛡️ Proposed fix
     async def event_generator():
         cursor = 0
         condition = jobs[job_id]["message_event"]

         while True:
+            if job_id not in jobs:
+                yield {"event": "complete", "data": json.dumps({"status": "deleted", "error": "Job no longer exists"})}
+                return
+
             # Send any new messages since the cursor
             conversation = jobs[job_id]["conversation"]
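A stripped-down sketch of the defensive pattern described above, with condition.wait() replaced by a no-op sleep for brevity; all names are illustrative:

```python
import asyncio
import json

jobs: dict[str, dict] = {}


async def event_generator(job_id: str):
    """Re-fetch the job each iteration so deletion mid-stream yields a final
    'complete' event instead of raising KeyError."""
    cursor = 0
    while True:
        job = jobs.get(job_id)
        if job is None:
            yield {"event": "complete", "data": json.dumps({"status": "deleted"})}
            return
        conversation = job["conversation"]
        while cursor < len(conversation):
            yield {"event": "message",
                   "data": json.dumps(conversation[cursor], default=str)}
            cursor += 1
        if job["status"] != "running":
            yield {"event": "complete",
                   "data": json.dumps({"status": job["status"]})}
            return
        await asyncio.sleep(0)  # stand-in for condition.wait() with timeout


async def main() -> list:
    jobs["abc"] = {"status": "running", "conversation": [{"n": 1}]}
    events = []
    async for ev in event_generator("abc"):
        events.append(ev)
        if ev["event"] == "message":
            del jobs["abc"]  # simulate deletion while a client streams
    return events


events = asyncio.run(main())
print([e["event"] for e in events])  # ['message', 'complete']
```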

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@server/homepage.html`:
- Around line 105-115: The error handler currently calls await res.json() which
will throw if the response is not JSON; change the success/error parsing around
the fetch in the submit flow (the code handling res, statusEl, btn, streamJob)
to first await res.text(), then try JSON.parse on that text inside a try/catch
to extract a "detail" or "job_id" fallbacking to the raw text when parsing
fails; use the parsed detail/text for the thrown Error message when !res.ok and
for showing errors in statusEl (via escapeHtml), and only call streamJob(job_id)
when a job_id is successfully extracted.

Comment on lines +105 to +115
try {
const body = new URLSearchParams({ep_url: epUrl, command: command});
if (cwd) body.append('cwd', cwd);
const res = await fetch('/submit', {method: 'POST', body});
if (!res.ok) { throw new Error((await res.json()).detail || res.statusText); }
const {job_id} = await res.json();
statusEl.innerHTML = '<span class="spinner"></span> Connecting to agent stream\u2026';
streamJob(job_id);
} catch (err) {
statusEl.innerHTML = '<span class="error">Error: ' + escapeHtml(err.message) + '</span>';
btn.disabled = false;

⚠️ Potential issue | 🟡 Minor

Harden error parsing for non‑JSON responses.

If /submit returns HTML/plain text (e.g., proxy error), res.json() throws and the user sees a misleading parse error. Consider parsing text first and then attempting JSON.

🛠️ Suggested fix
-    if (!res.ok) { throw new Error((await res.json()).detail || res.statusText); }
+    if (!res.ok) {
+      const raw = await res.text();
+      let detail = res.statusText;
+      try {
+        const data = JSON.parse(raw);
+        detail = data.detail || detail;
+      } catch {
+        if (raw) detail = raw;
+      }
+      throw new Error(detail);
+    }

@swghosh swghosh merged commit 656b0e5 into main Feb 16, 2026
2 checks passed