`.cursor/rules/async-patterns.mdc` (new file, 226 additions)
---
description: Async/await patterns and error handling for asyncio code (optional - for future async implementations)
globs: **/*.py
alwaysApply: false
---

# Async/Await Patterns

**Note**: This repository currently uses synchronous code. These patterns are provided for reference if async functionality is added in the future.

## Async Function Design

### Coroutine Definition
```python
# ✅ GOOD - Proper async function definition
async def process_llm_record_batch(
    record_batch: list[dict[str, Any]],
    cache: dict[str, Any],
) -> None:
    """Process a batch of records with LLM calls."""
    for record in record_batch:
        result = await call_llm(record["data"], cache)
        save_result(record["id"], result)

# ❌ BAD - Mixing sync and async incorrectly
async def process_llm_record_batch(record_batch: list[dict[str, Any]]) -> None:
    for record in record_batch:
        result = call_llm(record["data"])  # Blocking call in an async function
        save_result(record["id"], result)
```

### Avoid Blocking Operations
```python
# ✅ GOOD - Use asyncio.to_thread for CPU-bound tasks
import asyncio

async def process_data(data: list[int]) -> list[int]:
    # CPU-bound operation runs in a worker thread
    return await asyncio.to_thread(cpu_intensive_processing, data)

# ❌ BAD - Blocking the event loop
async def process_data(data: list[int]) -> list[int]:
    return cpu_intensive_processing(data)  # Blocks the event loop
```

## Error Handling in Async Code

### Try-Except in Coroutines
```python
# ✅ GOOD - Proper exception handling in async functions
async def process_record(record: dict[str, Any]) -> None:
    try:
        result = await call_llm(record["data"])
        save_result(record["id"], result)
    except LLMError as e:
        logger.error(f"LLM call failed for record {record['id']}", exc_info=True)
        raise ProcessingError(f"Failed to process record {record['id']}") from e
    except Exception:
        logger.error(f"Unexpected error processing record {record['id']}", exc_info=True)
        raise

# ❌ BAD - Silent exception handling
async def process_record(record: dict[str, Any]) -> None:
    try:
        result = await call_llm(record["data"])
        save_result(record["id"], result)
    except Exception:
        pass  # Swallows all errors
```

### Task Cancellation
```python
# ✅ GOOD - Proper cancellation handling
async def process_with_cleanup(record: dict[str, Any]) -> None:
    try:
        result = await long_running_operation(record)
        save_result(record["id"], result)
    except asyncio.CancelledError:
        logger.info(f"Processing cancelled for record {record['id']}")
        # Perform cleanup if needed
        raise  # Always re-raise CancelledError

# ❌ BAD - Swallowing CancelledError
async def process_with_cleanup(record: dict[str, Any]) -> None:
    try:
        result = await long_running_operation(record)
        save_result(record["id"], result)
    except asyncio.CancelledError:
        pass  # Prevents proper cancellation
```

## Task Management

### asyncio.gather() for Parallel Execution
```python
# ✅ GOOD - Using gather with error handling
async def process_batches(batches: list[list[dict[str, Any]]]) -> None:
    tasks = [process_batch(batch) for batch in batches]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Check for errors
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            logger.error(f"Batch {i} failed", exc_info=result)
            raise ProcessingError(f"Batch {i} processing failed") from result

# ❌ BAD - No error handling
async def process_batches(batches: list[list[dict[str, Any]]]) -> None:
    tasks = [process_batch(batch) for batch in batches]
    await asyncio.gather(*tasks)  # First failure raises; remaining tasks keep running unobserved
```

### Task Groups (Python 3.11+)
```python
# ✅ GOOD - Using task groups for better error handling
async def process_records(records: list[dict[str, Any]]) -> None:
    async with asyncio.TaskGroup() as tg:
        for record in records:
            tg.create_task(process_record(record))
    # All tasks complete, or one fails and the others are cancelled

# Note: Task groups provide better exception propagation
```

### Batch Processing
```python
# ✅ GOOD - Batching with proper size calculation
def calculate_batch_size(total: int, num_workers: int) -> int:
    """Calculate optimal batch size."""
    return max(total // num_workers, 1)

async def process_all_records(records: list[dict[str, Any]], num_workers: int) -> None:
    batch_size = calculate_batch_size(len(records), num_workers)
    batches = [records[i:i + batch_size] for i in range(0, len(records), batch_size)]

    tasks = [process_batch(batch) for batch in batches]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # Inspect results for exceptions, as in the gather example above

# ❌ BAD - No batching or poor batch size
async def process_all_records(records: list[dict[str, Any]]) -> None:
    tasks = [process_record(record) for record in records]  # Unbounded number of tasks
    await asyncio.gather(*tasks)
```
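If batching feels too coarse, a semaphore can cap in-flight work while still creating one task per record. This is a sketch under assumptions: `MAX_CONCURRENCY` and the toy `process_record` are illustrative names, not part of the repository.

```python
import asyncio
from typing import Any

MAX_CONCURRENCY = 8  # Illustrative cap on simultaneous tasks

async def process_record(record: dict[str, Any]) -> str:
    # Stand-in for real per-record work
    await asyncio.sleep(0)
    return record["id"]

async def process_all(records: list[dict[str, Any]]) -> list[str]:
    sem = asyncio.Semaphore(MAX_CONCURRENCY)

    async def bounded(record: dict[str, Any]) -> str:
        async with sem:  # At most MAX_CONCURRENCY coroutines run at once
            return await process_record(record)

    # gather preserves input order in its results
    return await asyncio.gather(*(bounded(r) for r in records))
```

The semaphore bounds concurrency without requiring a batch-size calculation up front.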

## Abort/Cancellation Patterns

### Global Abort Flag Pattern
```python
# ✅ GOOD - Thread-safe abort flag with proper checking
import threading

class ProcessingState:
    def __init__(self) -> None:
        self._should_abort = False
        self._lock = threading.Lock()

    def set_abort(self) -> None:
        with self._lock:
            self._should_abort = True

    def should_abort(self) -> bool:
        with self._lock:
            return self._should_abort

# Usage in an async function
async def process_record_batch(
    batch: list[dict[str, Any]],
    state: ProcessingState,
) -> None:
    for record in batch:
        if state.should_abort():
            logger.info("Processing aborted")
            return
        await process_record(record)

# ❌ BAD - Non-thread-safe global flag
should_abort = False  # Not thread-safe

async def process_record_batch(batch: list[dict[str, Any]]) -> None:
    global should_abort
    for record in batch:
        if should_abort:  # Race condition possible
            return
        await process_record(record)
```
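When all the code sharing the flag runs on a single event loop (no threads involved), an `asyncio.Event` is a lighter-weight alternative to a lock-guarded flag. A minimal sketch, with a hypothetical `process_record` standing in for real work:

```python
import asyncio
from typing import Any

async def process_record(record: dict[str, Any]) -> None:
    await asyncio.sleep(0)  # Stand-in for real per-record work

async def process_record_batch(
    batch: list[dict[str, Any]],
    abort_event: asyncio.Event,
) -> int:
    """Process records, stopping between records once abort is requested."""
    processed = 0
    for record in batch:
        if abort_event.is_set():
            break  # Abort was requested; stop cleanly
        await process_record(record)
        processed += 1
    return processed
```

A controller coroutine calls `abort_event.set()`; no lock is needed because the event loop never preempts between the `is_set()` check and the `await`.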

## Timeout Handling

### Timeouts for Async Operations
```python
# ✅ GOOD - Using timeouts for async operations
async def call_llm_with_timeout(data: dict[str, Any], timeout: float = 30.0) -> str:
    try:
        return await asyncio.wait_for(call_llm(data), timeout=timeout)
    except asyncio.TimeoutError as e:
        logger.error(f"LLM call timed out after {timeout}s")
        raise LLMTimeoutError(f"LLM call exceeded {timeout}s timeout") from e

# ❌ BAD - No timeout protection
async def call_llm_with_timeout(data: dict[str, Any]) -> str:
    return await call_llm(data)  # Could hang indefinitely
```

## Progress Reporting in Async Code

### Progress Updates
```python
# ✅ GOOD - Progress reporting in async context
from collections.abc import Callable

PROGRESS_INTERVAL = 10  # Report every N records

async def process_records_with_progress(
    records: list[dict[str, Any]],
    progress_callback: Callable[[float], None],
) -> None:
    total = len(records)
    for i, record in enumerate(records):
        await process_record(record)
        if (i + 1) % PROGRESS_INTERVAL == 0:
            progress_callback((i + 1) / total)
    progress_callback(1.0)

# ❌ BAD - Blocking progress updates
async def process_records_with_progress(records: list[dict[str, Any]]) -> None:
    for i, record in enumerate(records):
        await process_record(record)
        print(f"Progress: {i / len(records)}")  # Blocking print on every record
```
`.cursor/rules/guidelines.mdc` (new file, 92 additions)
---
description: Core coding guidelines and best practices for refinery-lf-exec-env
alwaysApply: true
---

# Core Coding Guidelines

## Project Overview

This is a containerized execution environment for labeling functions in refinery. The codebase processes data records in spaCy DocBin format, executes user-defined labeling functions (classification or extraction), and returns results via HTTP PUT requests. The environment runs as a containerized function-as-a-service.

## General Principles

### Code Quality
- **Readability over cleverness**: Code is read far more often than written
- **Explicit over implicit**: Make intentions clear through naming and structure
- **Fail fast**: Validate inputs early and raise clear errors
- **Single responsibility**: Functions should do one thing well

### Error Handling
- Use specific exception types, not bare `except Exception`
- Preserve traceback information when re-raising exceptions
- Provide clear, actionable error messages with context
- For container output, use `print()` statements with `flush=True` for progress and status messages
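The error-handling points above can be sketched with a small loader. This is illustrative only: `load_payload` and its messages are invented names, not functions from the repository.

```python
import json
from typing import Any

def load_payload(path: str) -> dict[str, Any]:
    """Load a JSON payload, failing fast with specific, actionable errors."""
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError as e:
        # Specific exception type; context and cause are preserved
        raise ValueError(f"Payload file missing: {path}") from e
    except json.JSONDecodeError as e:
        raise ValueError(f"Payload file is not valid JSON: {path}") from e
```

Each `raise … from e` keeps the original traceback attached to the new, clearer error.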

### Naming Conventions
- Functions and variables: `snake_case`
- Constants: `UPPER_SNAKE_CASE`
- Classes: `PascalCase`
- Private functions: Prefix with `__` (double underscore) only when truly private
- Avoid magic suffixes like `_a2vybg` in new code - use descriptive names

### Type Hints
- Use type hints for all function signatures
- Prefer modern Python 3.10+ syntax:
- `str | None` instead of `Optional[str]`
- `list[str]` instead of `List[str]`
- `dict[str, int]` instead of `Dict[str, int]`
- Use `Any` sparingly - prefer specific types or protocols
- Use `object` instead of `Any` when accepting any object

### Global Variables
- **Avoid global variables** when possible
- If globals are necessary (e.g., runtime configuration), document why
- Consider using a configuration class or dataclass instead
- Group related globals into a single namespace object
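One way to follow the last two bullets is a frozen dataclass that travels explicitly instead of living in module globals. The field names here are hypothetical, chosen to mirror values this codebase passes around:

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class RuntimeConfig:
    """Groups runtime settings that would otherwise be module-level globals."""
    chunk_size: int = 100
    iso2_code: str = "en"
    payload_url: str = ""

def count_chunks(total_records: int, config: RuntimeConfig) -> int:
    # Configuration is passed in explicitly; no `global` statements needed
    return -(-total_records // config.chunk_size)  # Ceiling division
```

Because the dataclass is frozen, a function receiving it cannot mutate shared state by accident.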

### Progress Reporting
- Use `print()` statements with `flush=True` for container stdout compatibility
- Report progress as percentage: `print("progress: ", progress, flush=True)`
- Include clear status messages: "Preparing data for labeling function.", "Running labeling function.", "Finished execution."
- Use simple, parseable progress format for container orchestration
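A small helper matching the format above might look like this sketch (the helper names are invented; the `print` format is the one given in the bullet points):

```python
def report_progress(done: int, total: int) -> None:
    """Print progress in the simple, parseable 'progress: <fraction>' format."""
    progress = done / total if total else 1.0
    print("progress: ", progress, flush=True)

def report_status(message: str) -> None:
    """Print a status message, flushed for container log collection."""
    print(message, flush=True)
```

`flush=True` matters because container runtimes read stdout as a stream; buffered output can arrive late or be lost on crash.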

### Magic Numbers and Strings
- Extract magic numbers to named constants
- Use enums or constants for string literals that represent states/types
- Document the meaning of non-obvious values
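As a sketch, state-like string literals can become an `Enum` and magic numbers named constants (the names here are hypothetical, not taken from the repository):

```python
from enum import Enum

CHUNK_SIZE = 100  # Records per processing chunk (was a bare literal)

class TaskType(Enum):
    """String literals for labeling-function types become enum members."""
    CLASSIFICATION = "classification"
    EXTRACTION = "extraction"

def is_extraction(task_type: str) -> bool:
    # TaskType(...) raises ValueError on unknown strings - failing fast
    return TaskType(task_type) is TaskType.EXTRACTION
```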

### Function Length
- Keep functions focused and under 50 lines when possible
- Extract complex logic into helper functions
- Use descriptive function names that explain purpose

### Comments
- Write self-documenting code first
- Add comments for "why", not "what"
- Document complex algorithms or business logic
- Keep comments up-to-date with code changes
- Note when imports are expected to exist at runtime but not at dev time (e.g., `labeling_functions` module)

## Labeling Function Execution

### Function Signatures
- **Classification**: `lf(record: dict[str, Any]) -> str | None`
- Returns a label string or None
- Result format: `{record_id: [1.0, label_name]}`

- **Extraction**: `lf(record: dict[str, Any]) -> Generator[tuple[str, int, int], None, None]`
- Yields tuples of (label_name, start_idx, end_idx)
- Result format: `{record_id: [[1.0, label_name, start_idx, end_idx], ...]}`

### Data Processing
- Process records in chunks (default: 100 records per chunk)
- Use `inspect.isgeneratorfunction()` to detect extraction vs classification
- Handle spaCy DocBin format: hex-encoded bytes starting with `\x`
- Use `spacy.blank(iso2_code).vocab` for DocBin deserialization
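The chunking and generator-detection steps might be sketched as follows; the spaCy deserialization step is omitted because it needs real DocBin bytes, and the function names are illustrative:

```python
import inspect
from typing import Any

def chunk_records(
    records: list[dict[str, Any]], chunk_size: int = 100
) -> list[list[dict[str, Any]]]:
    """Split records into chunks of at most chunk_size (default 100)."""
    return [records[i : i + chunk_size] for i in range(0, len(records), chunk_size)]

def lf_kind(lf) -> str:
    """Distinguish extraction (generator) LFs from classification LFs."""
    return "extraction" if inspect.isgeneratorfunction(lf) else "classification"
```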

### Container Execution
- Script receives command-line arguments: `progress`, `iso2_code`, `payload_url`
- Download data files via curl before Python execution
- Upload results via `requests.put(payload_url, json=results)`
- All output must use `flush=True` for proper container logging
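The argument order described above could be parsed like this. A sketch only: the usage string is assumed from the bullet points, and the upload helper shows the `requests.put` call without executing it.

```python
from typing import Any

def parse_args(argv: list[str]) -> tuple[str, str, str]:
    """Expects: script.py <progress> <iso2_code> <payload_url>."""
    if len(argv) < 4:
        raise ValueError("usage: script.py <progress> <iso2_code> <payload_url>")
    return argv[1], argv[2], argv[3]

def upload_results(payload_url: str, results: dict[str, Any]) -> None:
    """Upload results via HTTP PUT, failing loudly on non-2xx responses."""
    import requests  # Deferred import keeps parse_args testable without requests

    response = requests.put(payload_url, json=results)
    response.raise_for_status()
```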