
feat(traceloop-sdk): Add guardrails #3649

Open

nina-kollman wants to merge 61 commits into main from nk/guardrail_v2

Conversation

@nina-kollman (Contributor) commented Feb 1, 2026

  • I have added tests that cover my changes.
  • If adding a new instrumentation or changing an existing one, I've added screenshots from some observability platform showing the change.
  • PR name follows conventional commits format: feat(instrumentation): ... or fix(instrumentation): ....
  • (If applicable) I have updated the documentation accordingly.

Summary by CodeRabbit

  • New Features

    • Introduces a unified guardrail API and decorator for protecting functions with configurable guards, input mappers, and failure handlers.
    • Adds many pre-configured guards (PII, toxicity, relevancy, validators, agent/quality checks), condition helpers, and a default input mapper.
  • New Examples

    • Added multiple example workflows demonstrating evaluator guards, custom function guards, decorator usage, multiple-guard patterns, and validation workflows.
  • Tests

    • Extensive new unit and integration tests and cassettes improving guardrail coverage.
  • Chores

    • Removed older sample demo scripts and consolidated guardrail surface and exports.

✏️ Tip: You can customize this high-level summary in your review settings.


coderabbitai bot commented Feb 1, 2026

📝 Walkthrough

The PR replaces the old guardrails implementation with a new traceloop.sdk.guardrail package (refactored core, guard factories, input mappers, conditions, on-failure handlers), removes legacy sample examples, adds multiple new sample examples, updates evaluator/client experiment interfaces, and introduces comprehensive guardrail unit/integration tests and pytest config.

Changes

  • Removed legacy examples (packages/sample-app/sample_app/guardrail_medical_chat_example.py, packages/sample-app/sample_app/guardrail_travel_agent_example.py): Deleted two older example scripts demonstrating callback-based guardrail usage and PII handling.
  • New sample examples (packages/sample-app/sample_app/guardrails/*.py): Added multiple example modules (custom_evaluator_guard.py, custom_function_guard.py, decorator_example.py, multiple_guards_example.py, traceloop_evaluator_guard.py, validate_example.py) demonstrating the new guardrail APIs and patterns.
  • New guardrail core package (packages/traceloop-sdk/traceloop/sdk/guardrail/__init__.py, .../guardrail.py, .../model.py, .../condition.py, .../on_failure.py, .../guards.py, .../default_mapper.py, .../span_attributes.py): Introduced the full guardrail subsystem: Guardrails orchestrator class, Guard/Input/OnFailure types, Condition helpers, OnFailure factories, default input mapper, pre-configured guard factories, span attribute constants, and rich error types.
  • Decorator & decorators API (packages/traceloop-sdk/traceloop/sdk/decorators/__init__.py): Added a public guardrail decorator supporting both async and sync functions, string or callable on_failure, input mapping, and Traceloop client integration (see the usage sketch after this list).
  • Removed old guardrails module (packages/traceloop-sdk/traceloop/sdk/guardrails/guardrails.py, .../types.py): Deleted the legacy traceloop.sdk.guardrails implementation (old decorator, Guardrails class, InputExtractor/ExecuteEvaluatorRequest/OutputSchema types); replaced by the new guardrail package and a re-export layer.
  • Backward-compat re-exports + deprecation (packages/traceloop-sdk/traceloop/sdk/guardrails/__init__.py): Now emits a DeprecationWarning and re-exports the new guardrail public API while removing several legacy exports.
  • Evaluator / model / experiment updates (packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py, .../model.py, .../config.py, traceloop/sdk/experiment/experiment.py, traceloop/sdk/client/client.py, traceloop-sdk/traceloop/sdk/__init__.py): Added in-experiment evaluator request types and path, a new standalone run for evaluators, changed the ExecutionResponse.result shape (wrapped evaluator_result), added EvaluatorDetails fields (condition_field, output_schema), and adjusted experiment result extraction and client init/import ordering.
  • Tests, cassettes and fixtures (packages/traceloop-sdk/tests/guardrails/*, .../cassettes/.../*.yaml, conftest.py): Comprehensive unit and integration tests added for conditions, on_failure handlers, validate logic, decorator behavior, and input validation, plus VCR cassettes and pytest fixtures for an async HTTP client and VCR config.
  • Pytest config (packages/traceloop-sdk/pyproject.toml): Added pytest ini option asyncio_mode = "auto".
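
Below is a minimal usage sketch of the decorator mentioned in the "Decorator & decorators API" item above, pieced together from snippets elsewhere in this PR (the *guards positional signature, Guards.toxicity_detector, OnFailure.return_value). The import paths and defaults are assumptions and may differ from the final code.

from traceloop.sdk.decorators import guardrail
from traceloop.sdk.guardrail.guards import Guards
from traceloop.sdk.guardrail.on_failure import OnFailure


@guardrail(
    Guards.toxicity_detector(threshold=0.7),  # guards are positional varargs
    on_failure=OnFailure.return_value(value="Sorry, I can't share that."),
    name="toxicity_check",
)
async def generate_reply(prompt: str) -> str:
    # The decorator runs the configured guards on this function's output and
    # falls back to the on_failure handler's value if any guard fails.
    return f"Echo: {prompt}"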

Sequence Diagram(s)

sequenceDiagram
  participant App as Application
  participant Guardrails as Guardrails
  participant Evaluator as Evaluator Service
  participant TraceloopAPI as Traceloop API

  App->>Guardrails: invoke run(func, input_mapper)
  Guardrails->>App: call guarded function -> result
  Guardrails->>Evaluator: build evaluator request(s) (condition_field, input mapping)
  Evaluator->>TraceloopAPI: POST /v2/evaluators/.../execute or /execute-single
  TraceloopAPI-->>Evaluator: execution_id / stream_url
  Evaluator->>TraceloopAPI: GET events (poll)
  TraceloopAPI-->>Evaluator: evaluator_result (wrapped evaluator_result)
  Evaluator-->>Guardrails: evaluation outcome (pass/fail, fields)
  Guardrails->>Guardrails: apply Condition(s) and OnFailure handler if needed
  Guardrails-->>App: return original result or failure result

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~65 minutes

Poem

🐰 New guards line the forest path,
I sniff the inputs, do the math.
Conditions checked and failures tamed,
Decorators hop and code's renamed.
A warren safe — the rabbit's glad! 🥕

🚥 Pre-merge checks: 2 passed, 1 failed

❌ Failed checks (1 warning)
  • Docstring Coverage ⚠️ Warning: Docstring coverage is 75.51%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (2 passed)
  • Description Check ✅ Passed: Check skipped - CodeRabbit's high-level summary is enabled.
  • Title check ✅ Passed: The title 'feat(traceloop-sdk): Add guardrails' directly and clearly describes the main change: adding guardrails functionality to the traceloop-sdk.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch nk/guardrail_v2

Comment @coderabbitai help to get the list of available commands and usage tips.

nina-kollman marked this pull request as ready for review February 1, 2026 14:29
coderabbitai bot left a comment

Actionable comments posted: 8

🤖 Fix all issues with AI agents
In `@packages/sample-app/sample_app/guardrails/custom_evaluator_guard.py`:
- Around line 7-10: The guard references the evaluator slug "medicaladvice" but
docs/prints use "medical-advice-detector", causing misconfiguration; update all
occurrences of the string "medical-advice-detector" in this file (console
prints, docstrings, example outputs and any helper text) to the slug
"medicaladvice" so they match the guard's usage, and verify the evaluator lookup
code (the usage in CustomEvaluatorGuard or similarly named class/function) still
resolves correctly to "medicaladvice".

In `@packages/sample-app/sample_app/guardrails/custom_function_guard.py`:
- Around line 50-51: The code returns completion.choices[0].message.content
directly which can be None (especially for tool calls); in each generate_*
function replace that direct access with a safe fallback: read content =
completion.choices[0].message.content and if content is None set content = ""
(or raise a clear error) before further processing and before returning; update
all occurrences that reference completion.choices[0].message.content in the
generate_* functions so downstream code (e.g., accesses like z["word_count"] or
slicing result[:100]) never receives None.

In `@packages/sample-app/sample_app/guardrails/validate_example.py`:
- Around line 96-109: The code prints "Output validation passed."
unconditionally after calling output_guardrail.run, which can return a failure
result depending on the guard's on_failure handler; remove the extra blank line
and change the logic after calling output_guardrail.run (the call to
output_guardrail.run that wraps generate_response) to check the returned result
and only print the success message when the result indicates success (or handle
the failure branch accordingly), referencing the result variable and the
output_guardrail.run call (and consider OnFailure.raise_exception()/return_value
behaviors) so failure paths are properly handled instead of always logging
success.

In `@packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py`:
- Around line 198-244: The input values passed into InputExtractor are untyped
Any but InputExtractor.source expects a str; in run() convert each input value
to a string when building InputSchemaMapping (i.e. replace {k:
InputExtractor(source=v) for k,v in input.items()} with a mapping that uses
InputExtractor(source=str(v))) so non-string inputs won't break validation, and
keep or update the run() signature as needed (alternatively, change the type
hint to Dict[str, str] if you want to forbid non-string values); ensure this
change is applied where InputSchemaMapping and InputExtractor are constructed in
run().

In `@packages/traceloop-sdk/traceloop/sdk/guardrail/default_mapper.py`:
- Around line 6-49: The default_input_mapper function currently returns repeated
references via [input_dict] * num_guards and [enriched] * num_guards which lets
one guard's mutation affect all others; change both return paths to produce
distinct copies per guard (e.g., construct a new dict per iteration or use
copy.deepcopy) so each guard gets an independent dict, and ensure any nested
mutable fields like the "context" list are also copied for each guard; update
references to input_dict and enriched accordingly.
- Around line 47-48: The code uses enriched.setdefault("context",
[output["context"]]) which won't convert an existing string to a list; change
the logic to normalize output["context"] into a list and assign it to enriched.
Specifically, in the block where "context" in output is checked, read val =
output["context"], if not isinstance(val, list) wrap it as [val], then set
enriched["context"] = that list (or use enriched.setdefault only after
converting); update the code around the existing enriched/output handling so
"context" is always a list.

In `@packages/traceloop-sdk/traceloop/sdk/guardrail/guards.py`:
- Around line 130-141: The evaluator factory calls
(EvaluatorMadeByTraceloop.toxicity_detector, .sexism_detector, .pii_detector,
and .prompt_injection) currently pass threshold/probability_threshold even when
the argument is explicitly None, which overwrites factory defaults; change each
guard factory call to only include the threshold/probability_threshold kwarg
when the caller provided a non-None value (e.g., build a small kwargs dict or
use an if/else to call EvaluatorMadeByTraceloop.toxicity_detector(...) without
the threshold param when threshold is None) so the evaluator factory can use its
default 0.7 behavior.

In `@packages/traceloop-sdk/traceloop/sdk/guardrail/model.py`:
- Around line 140-144: The __str__ method can raise AttributeError when
self.actual_type is None or not a class; update TraceloopGuardError.__str__ (the
__str__ method referencing self.actual_type, self.expected_type,
self.guard_index and self.args) to defensively obtain the type name using
getattr(self.actual_type, "__name__", None) and fall back to
str(self.actual_type) or "None" when __name__ is missing, and format the message
using that safe value instead of directly accessing actual_type.__name__.
🧹 Nitpick comments (16)
packages/traceloop-sdk/traceloop/sdk/evaluator/config.py (1)

6-25: Keep the docstring in sync with the new fields.

Add condition_field and output_schema to the Args section and update the example so the public API docs stay accurate.

Proposed docstring update
 class EvaluatorDetails(BaseModel):
     """
     Details for configuring an evaluator.

     Args:
         slug: The evaluator slug/identifier
+        condition_field: Optional field name used to gate conditional evaluation
         version: Optional version of the evaluator
         config: Optional configuration dictionary for the evaluator
+        output_schema: Optional pydantic model class describing the evaluator output
         required_input_fields: Optional list of required fields to the evaluator
             input. These fields must be present in the task output.

     Example:
         >>> EvaluatorDetails(slug="pii-detector", config={"probability_threshold": 0.8}, required_input_fields=["text"])
+        >>> EvaluatorDetails(slug="format-check", condition_field="needs_check", output_schema=MyOutputSchema)
         >>> EvaluatorDetails(slug="my-custom-evaluator", version="v2")
     """
packages/sample-app/sample_app/guardrails/multiple_guards_example.py (2)

174-177: Consider removing unnecessary async from static return function.

generate_problematic_content() doesn't perform any async operations—it just returns a static string. Since this is an example file meant to teach the API, keeping it async may be intentional to show the guardrail can handle async functions, but a comment clarifying this intent would help.
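
One way to make that intent explicit (a sketch only; the body is illustrative, not the sample app's actual content):

async def generate_problematic_content() -> str:
    """Return canned content with issues for the guards to catch.

    Deliberately async even though nothing is awaited, so the example
    mirrors how guardrail.run is used with real async LLM calls.
    """
    return "placeholder content with deliberate issues"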


219-230: Rename to avoid shadowing module-level function.

generate_content() defined here shadows the module-level generate_content() at line 40. While it works correctly, this can be confusing in an example file meant to demonstrate patterns.

Suggested rename
-    async def generate_content() -> str:
-        """Generate content for sequential validation."""
+    async def generate_tokyo_tip() -> str:
+        """Generate a Tokyo travel tip for sequential validation."""

And update the call at line 262:

-        result = await guardrail.run(generate_content, input_mapper=create_sequential_inputs)
+        result = await guardrail.run(generate_tokyo_tip, input_mapper=create_sequential_inputs)
packages/sample-app/sample_app/guardrails/traceloop_evaluator_guard.py (2)

90-93: Minor style inconsistency with raise_exception argument.

Line 61 uses message="PII detected in response" (keyword), while line 92 uses positional argument. Consider using the keyword form consistently for clarity.

🔧 Suggested change for consistency
     guardrail = client.guardrails.create(
         guards=[Guards.toxicity_detector(threshold=0.7)],
-        on_failure=OnFailure.raise_exception("Content too toxic for family audience"),
+        on_failure=OnFailure.raise_exception(message="Content too toxic for family audience"),
     )

103-108: Consider adding type hints for clarity.

Adding type hints to the state class would improve readability and IDE support, especially since this example serves as documentation for users learning the guardrails API.

📝 Suggested type hints
 class TravelAgentState:
     """Track agent prompts and completions for trajectory evaluation."""

     def __init__(self):
-        self.prompts = []
-        self.completions = []
+        self.prompts: list[str] = []
+        self.completions: list[str] = []
packages/sample-app/sample_app/guardrails/custom_evaluator_guard.py (1)

40-46: Make guard input mapping explicit (use MedicalAdviceInput).

This avoids reliance on the default mapper and demonstrates the recommended Pydantic model usage.

♻️ Suggested refactor
 class MedicalAdviceInput(BaseModel):
     """Input model for medical advice evaluator."""
     text: str
+
+def map_to_medical_input(text: str) -> list[MedicalAdviceInput]:
+    return [MedicalAdviceInput(text=text)]
@@
 @guardrail(
     guards=[Guards.custom_evaluator_guard(evaluator_slug="medicaladvice")],
+    input_mapper=map_to_medical_input,
     on_failure=OnFailure.return_value(value="Sorry, I can't help you with that."),
     name="medical_advice_quality_check",
 )
@@
     result = await guardrail.run(
         attempt_diagnosis_request,
+        input_mapper=map_to_medical_input,
     )

Also applies to: 56-60, 132-134

packages/traceloop-sdk/traceloop/sdk/evaluator/evaluator.py (1)

70-112: Consider extracting common HTTP execution logic to reduce duplication.

Both _execute_evaluator_in_experiment_request and _execute_evaluator_request share identical error handling and response parsing. A helper method could reduce this duplication.

Suggested refactor
+    async def _post_evaluator_request(
+        self,
+        url: str,
+        body: dict,
+        evaluator_slug: str,
+        timeout_in_sec: int,
+    ) -> ExecuteEvaluatorResponse:
+        """Common HTTP POST logic for evaluator requests."""
+        response = await self._async_http_client.post(
+            url, json=body, timeout=httpx.Timeout(timeout_in_sec)
+        )
+        if response.status_code != 200:
+            error_detail = _extract_error_from_response(response)
+            raise Exception(
+                f"Failed to execute evaluator '{evaluator_slug}': "
+                f"{response.status_code} - {error_detail}"
+            )
+        return ExecuteEvaluatorResponse(**response.json())
+
     async def _execute_evaluator_in_experiment_request(
         self,
         evaluator_slug: str,
         request: ExecuteEvaluatorInExperimentRequest,
         timeout_in_sec: int = 120,
     ) -> ExecuteEvaluatorResponse:
-        """Execute evaluator request and return response"""
-        body = request.model_dump()
-        client = self._async_http_client
-        full_url = f"/v2/evaluators/slug/{evaluator_slug}/execute"
-        response = await client.post(
-            full_url, json=body, timeout=httpx.Timeout(timeout_in_sec)
-        )
-        if response.status_code != 200:
-            error_detail = _extract_error_from_response(response)
-            raise Exception(
-                f"Failed to execute evaluator '{evaluator_slug}': "
-                f"{response.status_code} - {error_detail}"
-            )
-        result_data = response.json()
-        return ExecuteEvaluatorResponse(**result_data)
+        return await self._post_evaluator_request(
+            f"/v2/evaluators/slug/{evaluator_slug}/execute",
+            request.model_dump(),
+            evaluator_slug,
+            timeout_in_sec,
+        )
packages/sample-app/sample_app/guardrails/decorator_example.py (1)

23-23: Consider using the standard OPENAI_API_KEY environment variable name.

The OpenAI SDK conventionally uses OPENAI_API_KEY. Using OPENAI_KEY may cause confusion or require additional documentation for users who expect the standard variable name.

💡 Suggested change
-openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_KEY"))
+openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
packages/sample-app/sample_app/guardrails/validate_example.py (2)

67-105: Guardrail instances recreated on every call - consider moving to module level.

Creating prompt_guardrail and output_guardrail inside secure_chat() means they're recreated on every function call. For a sample app this is fine for demonstrating the API, but for production code, consider initializing guardrails once at module level or in a setup function.


76-76: Unconditional "..." suffix may be misleading on short inputs.

user_prompt[:50] is safe (Python handles short strings gracefully), and so is response[:200] at line 123, but appending the ... suffix unconditionally could be misleading for short responses that were never truncated. This is minor since it's example code.

Optional: Handle short strings
-    print(f"Validating user input: '{user_prompt[:50]}...'")
+    preview = user_prompt[:50] + "..." if len(user_prompt) > 50 else user_prompt
+    print(f"Validating user input: '{preview}'")
packages/traceloop-sdk/tests/guardrails/test_validate.py (1)

14-21: Helper function duplicated across test files.

This create_guardrails_with_guards helper is nearly identical to the one in test_validate_inputs.py (lines 32-38). Consider extracting to a shared conftest.py fixture to avoid duplication.

Optional: Move to conftest.py
# In tests/guardrails/conftest.py
import pytest
from unittest.mock import MagicMock
from traceloop.sdk.guardrail.guardrail import Guardrails
from traceloop.sdk.guardrail.on_failure import OnFailure

@pytest.fixture
def guardrails_factory():
    """Factory fixture to create Guardrails with specified guards."""
    def _create(guards: list, on_failure=None) -> Guardrails:
        mock_client = MagicMock()
        guardrails = Guardrails(mock_client)
        guardrails._guards = guards
        guardrails._on_failure = on_failure or OnFailure.noop()
        return guardrails
    return _create
packages/traceloop-sdk/tests/guardrails/test_validate_inputs.py (2)

26-29: Unused AnotherInput class defined but never referenced in tests.

The AnotherInput model is defined but not used anywhere in this file. Consider removing it or adding tests that utilize it.

Remove unused class
-class AnotherInput(BaseModel):
-    """Different Pydantic model for testing type mismatches."""
-    name: str
-    count: int
-
-

32-38: Duplicate helper function - same as test_validate.py.

This helper is identical to the one in test_validate.py except for the default on_failure (lambda vs OnFailure.noop()). Consider consolidating in conftest.py.

packages/traceloop-sdk/traceloop/sdk/guardrail/guards.py (1)

36-90: Consider handling missing condition_field key gracefully.

At line 83, if condition_field is set but the key doesn't exist in evaluator_result, this will raise a KeyError. Since guard execution errors are caught and wrapped in GuardExecutionError by the caller, this may be acceptable, but a more informative error message would help debugging.

💡 Optional improvement for better error messages
         if condition_field:
-            result_to_validate = eval_response.result.evaluator_result[condition_field]
+            evaluator_result = eval_response.result.evaluator_result
+            if condition_field not in evaluator_result:
+                raise KeyError(
+                    f"condition_field '{condition_field}' not found in evaluator result. "
+                    f"Available fields: {list(evaluator_result.keys())}"
+                )
+            result_to_validate = evaluator_result[condition_field]
         else:
             result_to_validate = eval_response.result.evaluator_result
packages/traceloop-sdk/traceloop/sdk/guardrail/guardrail.py (2)

29-29: Unused import: Evaluator.

Evaluator is imported at line 29 but not used in this file. It appears to be used in guards.py instead.

🧹 Proposed fix
-from traceloop.sdk.evaluator.evaluator import Evaluator

69-76: _evaluator instance is created but never used.

The Evaluator instance is created at line 71 and assigned to self._evaluator, but it's never referenced elsewhere in this class. Consider removing it if unused.

🧹 Proposed fix
     def __init__(self, async_http_client: httpx.AsyncClient):
         self._async_http = async_http_client
-        self._evaluator = Evaluator(async_http_client)
         self._guards = []
         self._on_failure = None
         self._run_all = False
         self._parallel = True
         self._name = ""

Also remove the class attribute declaration at line 61:

-    _evaluator: Evaluator

Comment on lines +7 to +10
1. PASS Case: General health information that should be allowed
- Educational content about hypertension and blood pressure
- Uses medical-advice-detector evaluator
- Demonstrates safe general health information

⚠️ Potential issue | 🟡 Minor

Align evaluator slug references to prevent misconfiguration.

Docs/console output mention "medical-advice-detector" while the guard uses "medicaladvice", which can lead users to configure the wrong evaluator and make the example fail. Please pick one slug and update all references consistently.

🛠️ Example alignment (update docs/prints to match the code)
-   - Uses medical-advice-detector evaluator
+   - Uses medicaladvice evaluator
@@
-    Custom Evaluator Required: 'medicaladvice'
+    Custom Evaluator Required: 'medicaladvice'
@@
-    print("Note: Requires custom evaluator 'medical-advice-detector' in Traceloop")
+    print("Note: Requires custom evaluator 'medicaladvice' in Traceloop")

Also applies to: 57-58, 68-69, 144-145


Comment on lines +50 to +51
return completion.choices[0].message.content


⚠️ Potential issue | 🟡 Minor

Potential None access when LLM returns no content.

completion.choices[0].message.content can be None when the API returns an empty response or when the model uses tool calls. This would cause issues downstream when the guard tries to process the result (e.g., accessing z["word_count"] or slicing with result[:100]).

Consider adding a fallback:

🛡️ Proposed fix
-        return completion.choices[0].message.content
+        return completion.choices[0].message.content or ""

This pattern appears in all generate_* functions (lines 50, 109, 140, 177).


Comment on lines +96 to +109


result = await output_guardrail.run(
lambda: generate_response(user_prompt),
input_mapper=lambda response_text: [
AnswerRelevancyInput(answer=response_text, question=user_prompt),
SexismDetectorInput(text=response_text),
ToxicityDetectorInput(text=response_text),
],
)

print("Output validation passed.")

return result

⚠️ Potential issue | 🟡 Minor

Missing failure handling after run() - the result may be a failure response.

Line 107 prints "Output validation passed" unconditionally, but run() can return the on_failure handler's result when guards fail (the default is OnFailure.raise_exception() which would raise, but custom handlers like return_value would return a fallback). If you're using the default handler, this is fine since it raises on failure.

Also, there's an extra blank line at line 97.

Suggested fix
     )

-
     result = await output_guardrail.run(
         lambda: generate_response(user_prompt),
         input_mapper=lambda response_text: [
             AnswerRelevancyInput(answer=response_text, question=user_prompt),
             SexismDetectorInput(text=response_text),
             ToxicityDetectorInput(text=response_text),
         ],
     )

-    print("Output validation passed.")
+    # Note: With default on_failure=OnFailure.raise_exception(),
+    # reaching here means all guards passed
+    print("Output guards passed.")

     return result

Comment on lines +198 to +244
async def run(
    self,
    evaluator_slug: str,
    input: Dict[str, Any],
    timeout_in_sec: int = 120,
    evaluator_version: Optional[str] = None,
    evaluator_config: Optional[Dict[str, Any]] = None,
) -> ExecutionResponse:
    """
    Execute an evaluator without experiment context.

    This is a simpler interface for running evaluators standalone,
    without associating results with experiments.

    Args:
        evaluator_slug: Slug of the evaluator to execute
        input: Dict mapping evaluator input field names to their values.
            Values can be any type (str, int, dict, etc.)
        timeout_in_sec: Timeout in seconds for execution
        evaluator_version: Version of the evaluator to execute (optional)
        evaluator_config: Configuration for the evaluator (optional)

    Returns:
        ExecutionResponse: The evaluation result
    """
    _validate_evaluator_input(evaluator_slug, input)

    schema_mapping = InputSchemaMapping(
        root={k: InputExtractor(source=v) for k, v in input.items()}
    )

    request = ExecuteEvaluatorRequest(
        input_schema_mapping=schema_mapping,
        evaluator_version=evaluator_version,
        evaluator_config=evaluator_config,
    )

    execute_response = await self._execute_evaluator_request(
        evaluator_slug, request, timeout_in_sec
    )

    sse_client = SSEClient(shared_client=self._async_http_client)
    return await sse_client.wait_for_result(
        execute_response.execution_id,
        execute_response.stream_url,
        timeout_in_sec,
    )

⚠️ Potential issue | 🟡 Minor

Potential type mismatch: InputExtractor.source expects str but input values are Any.

The run() method accepts input: Dict[str, Any] (line 201), but line 226 passes values directly to InputExtractor(source=v). According to the model definition, InputExtractor.source is typed as str. Non-string values may cause validation errors or unexpected serialization behavior.

Consider either:

  1. Restricting the input type to Dict[str, str] to match experiment methods
  2. Converting values to strings explicitly
Option 1: Align type signature with other methods
     async def run(
         self,
         evaluator_slug: str,
-        input: Dict[str, Any],
+        input: Dict[str, str],
         timeout_in_sec: int = 120,
         evaluator_version: Optional[str] = None,
         evaluator_config: Optional[Dict[str, Any]] = None,
     ) -> ExecutionResponse:
Option 2: Convert values to strings
         schema_mapping = InputSchemaMapping(
-            root={k: InputExtractor(source=v) for k, v in input.items()}
+            root={k: InputExtractor(source=str(v) if not isinstance(v, str) else v) for k, v in input.items()}
         )

Comment on lines +6 to +49
from typing import Any


def default_input_mapper(output: Any, num_guards: int) -> list[dict]:
    """
    Default mapper for common response types.

    Handles:
    - str: Creates dict with common text field names for each guard
    - dict with {question, answer, context}: Passes through with field aliases

    Args:
        output: The return value from the guarded function
        num_guards: Number of guards to create inputs for

    Returns:
        List of dicts, one per guard

    Raises:
        ValueError: If output type cannot be handled
    """
    if isinstance(output, str):
        # Map string to common field names used by evaluators
        input_dict = {
            "text": output,
            "prompt": output,
            "completion": output
        }
        return [input_dict] * num_guards

    if isinstance(output, dict):
        # Enrich dict with aliases for compatibility with various evaluators
        enriched = {**output}
        if "text" in output:
            enriched.setdefault("prompt", output["text"])
            enriched.setdefault("completion", output["text"])
        if "question" in output:
            enriched.setdefault("query", output["question"])
        if "answer" in output:
            enriched.setdefault("answer", output["answer"])
            enriched.setdefault("completion", output["answer"])
        if "context" in output:
            enriched.setdefault("context", [output["context"]])
        return [enriched] * num_guards

⚠️ Potential issue | 🟠 Major

Avoid sharing the same input dict across guards.

[input_dict] * num_guards and [enriched] * num_guards duplicate references, so any mutation by one guard can contaminate others. Create distinct copies per guard.

🛠️ Proposed fix
-from typing import Any
+from typing import Any
+import copy
@@
-        return [input_dict] * num_guards
+        return [input_dict.copy() for _ in range(num_guards)]
@@
-        return [enriched] * num_guards
+        return [copy.deepcopy(enriched) for _ in range(num_guards)]

Comment on lines +47 to +48
if "context" in output:
enriched.setdefault("context", [output["context"]])

⚠️ Potential issue | 🟡 Minor

Ensure context is normalized to a list when provided.

setdefault doesn’t update existing values, so a string context remains a string. If the intent is to always provide a list, coerce when needed.

🧩 Proposed fix
-        if "context" in output:
-            enriched.setdefault("context", [output["context"]])
+        if "context" in output and not isinstance(output["context"], list):
+            enriched["context"] = [output["context"]]

Comment on lines +140 to +144
    def __str__(self) -> str:
        return (
            f"{self.args[0]} [guard {self.guard_index}]: "
            f"expected {self.expected_type}, got {self.actual_type.__name__}"
        )

⚠️ Potential issue | 🟡 Minor

Potential AttributeError if actual_type is None or a non-class type.

Line 143 accesses actual_type.__name__, but if actual_type is None or a special typing construct, this will raise AttributeError.

Suggested defensive fix
     def __str__(self) -> str:
+        type_name = getattr(self.actual_type, '__name__', str(self.actual_type))
         return (
             f"{self.args[0]} [guard {self.guard_index}]: "
-            f"expected {self.expected_type}, got {self.actual_type.__name__}"
+            f"expected {self.expected_type}, got {type_name}"
         )

coderabbitai bot left a comment

Actionable comments posted: 3

🤖 Fix all issues with AI agents
In `@packages/sample-app/sample_app/guardrails/custom_evaluator_guard.py`:
- Around line 109-122: The function attempt_diagnosis_request returns
completion.choices[0].message.content without a null fallback, which can cause a
TypeError if content is None; update attempt_diagnosis_request to mirror the
earlier handler by returning completion.choices[0].message.content or "" (or an
explicit empty string fallback) so downstream slicing won't fail, and ensure you
reference the same openai_client.chat.completions.create result handling used
elsewhere.

In `@packages/sample-app/sample_app/guardrails/decorator_example.py`:
- Around line 30-39: The function generate_response should never return None
even if completion.choices[0].message.content is None; update the return
behavior in generate_response (after the await
openai_client.chat.completions.create call) to safely coalesce
completion.choices[0].message.content to a string fallback (e.g., empty string
or a descriptive fallback) so the declared return type -> str is honored and
downstream slicing won't raise a TypeError.

In `@packages/traceloop-sdk/traceloop/sdk/decorators/__init__.py`:
- Around line 244-250: The sync wrapper uses asyncio.run(...) around g.run(...)
which raises RuntimeError if an event loop is already running; detect whether an
event loop is running (asyncio.get_event_loop().is_running() or
asyncio.get_running_loop() with try/except) and, if it is, execute the coroutine
by running a new event loop in a separate thread (submit a callable that calls
asyncio.run(g.run(...))) so the current loop isn't touched; otherwise keep using
asyncio.run directly. Update the call site that currently wraps g.run(lambda:
asyncio.to_thread(func, ...), input_mapper=...) to branch: when no loop is
running use asyncio.run(...), when a loop is running run the same coroutine
inside a new thread via Thread/Executor and wait for that thread/future result.
Ensure you reference the same g.run invocation and the lambda that calls
asyncio.to_thread(func, *args, **kwargs).
🧹 Nitpick comments (5)
packages/sample-app/sample_app/guardrails/decorator_example.py (1)

22-22: Use consistent environment variable name for OpenAI API key.

This file uses OPENAI_KEY while custom_evaluator_guard.py uses OPENAI_API_KEY. The standard convention (and OpenAI SDK default) is OPENAI_API_KEY.

🔧 Proposed fix
-openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_KEY"))
+openai_client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
packages/sample-app/sample_app/guardrails/custom_evaluator_guard.py (1)

42-44: Consider removing or using MedicalAdviceInput.

The MedicalAdviceInput model is defined but never instantiated or used. If it's intended as documentation for the evaluator's expected schema, consider adding a comment clarifying this, or remove it to avoid confusion.

packages/traceloop-sdk/traceloop/sdk/decorators/__init__.py (1)

167-168: Inconsistent union type syntax.

Line 167 uses the Python 3.10+ | syntax (InputMapper | None) while Line 168 uses Union[...]. Consider using consistent syntax throughout.

♻️ Use consistent typing syntax
 def guardrail(
     *guards: Guard,
-    input_mapper: InputMapper | None = None,
-    on_failure: Union[OnFailureHandler, str, None] = None,
+    input_mapper: Optional[InputMapper] = None,
+    on_failure: Optional[Union[OnFailureHandler, str]] = None,
     name: str = "",
 ) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:

Or use Python 3.10+ syntax consistently:

 def guardrail(
     *guards: Guard,
-    input_mapper: InputMapper | None = None,
-    on_failure: Union[OnFailureHandler, str, None] = None,
+    input_mapper: InputMapper | None = None,
+    on_failure: OnFailureHandler | str | None = None,
     name: str = "",
 ) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:
packages/traceloop-sdk/tests/guardrails/test_guardrail_decorator.py (2)

214-260: Consider verifying on_failure handler behavior more precisely.

The tests verify that on_failure is callable but don't verify the actual behavior (e.g., that the default handler raises an exception or that the string handler returns the correct value). While this is acceptable for unit testing the decorator wiring, consider adding an integration test that verifies the handler behavior when guards actually fail.
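
A hedged sketch of such a test, reusing the guardrails_factory fixture proposed for tests/guardrails/conftest.py earlier in this review; it assumes Guardrails.run returns the on_failure handler's value when a guard fails, and relies on asyncio_mode = "auto" instead of an explicit marker:

from traceloop.sdk.guardrail.on_failure import OnFailure


async def test_run_returns_fallback_when_guard_fails(guardrails_factory):
    async def failing_guard(_inputs) -> bool:
        # Simulate a guard that rejects the generated output.
        return False

    guardrails = guardrails_factory(
        guards=[failing_guard],
        on_failure=OnFailure.return_value(value="fallback"),
    )

    async def generate() -> str:
        return "some generated output"

    result = await guardrails.run(generate)

    assert result == "fallback"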


309-331: Missing test for sync function execution.

The TestGuardrailDecoratorSyncSupport class only tests metadata preservation but doesn't include a test that actually executes a sync function through the guardrail decorator. Consider adding a test similar to test_decorator_passes_through_result_when_guards_pass but for sync functions.

💡 Suggested test for sync function execution
def test_sync_decorator_passes_through_result_when_guards_pass(self):
    """Decorator returns sync function result when all guards pass."""
    mock_guardrails = MagicMock()
    mock_guardrails.create.return_value = mock_guardrails
    mock_guardrails.run = AsyncMock(return_value="guarded result")

    mock_client = MagicMock()
    mock_client.guardrails = mock_guardrails

    with patch("traceloop.sdk.Traceloop") as mock_traceloop:
        mock_traceloop.get.return_value = mock_client

        @guardrail(lambda z: True, on_failure=OnFailure.raise_exception())
        def my_sync_function(prompt: str) -> str:
            return f"Response to: {prompt}"

        result = my_sync_function("Hello")

    assert result == "guarded result"
    mock_guardrails.create.assert_called_once()
    mock_guardrails.run.assert_awaited_once()

Comment on lines +109 to +122
async def attempt_diagnosis_request() -> str:
    """Generate response to diagnosis request (will be blocked)."""
    user_question = "I have chest pain, shortness of breath, and dizziness. Do I have a heart attack?"

    completion = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "user",
                "content": user_question,
            }
        ],
    )
    return completion.choices[0].message.content

⚠️ Potential issue | 🟡 Minor

Add null fallback for message content.

Line 122 returns message.content without a fallback, unlike line 85 which uses or "". If content is None, line 134's slicing will raise a TypeError.

🛡️ Proposed fix
-        return completion.choices[0].message.content
+        return completion.choices[0].message.content or ""

Comment on lines +30 to +39
async def generate_response(user_prompt: str) -> str:
    """Generate LLM response - automatically guarded by decorator."""
    completion = await openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant."},
            {"role": "user", "content": user_prompt},
        ],
    )
    return completion.choices[0].message.content

⚠️ Potential issue | 🟡 Minor

Add null fallback for message content.

completion.choices[0].message.content can be None (e.g., when the model uses tool calls). The function signature declares -> str but could return None, causing a TypeError at line 58 when slicing.

🛡️ Proposed fix
-    return completion.choices[0].message.content
+    return completion.choices[0].message.content or ""

Comment on lines +244 to +250
                # Run async guardrail in event loop for sync functions
                return asyncio.run(
                    g.run(
                        lambda: asyncio.to_thread(func, *args, **kwargs),
                        input_mapper=input_mapper,
                    )
                )

⚠️ Potential issue | 🟠 Major

asyncio.run() will fail if called from a running event loop.

The sync wrapper uses asyncio.run() which raises RuntimeError when called from within an existing event loop (common in Jupyter notebooks, GUI applications, or nested async contexts).

🛠️ Proposed fix to handle existing event loops
             @wraps(func)
             def sync_wrapper(*args: _P.args, **kwargs: _P.kwargs) -> _R:
                 from traceloop.sdk import Traceloop

                 client = Traceloop.get()
                 g = client.guardrails.create(
                     guards=guards_list,
                     on_failure=failure_handler,
                     name=name or func.__name__,
                 )
-                # Run async guardrail in event loop for sync functions
-                return asyncio.run(
-                    g.run(
-                        lambda: asyncio.to_thread(func, *args, **kwargs),
-                        input_mapper=input_mapper,
-                    )
-                )
+                coro = g.run(
+                    lambda: asyncio.to_thread(func, *args, **kwargs),
+                    input_mapper=input_mapper,
+                )
+                # Handle case when called from existing event loop
+                try:
+                    loop = asyncio.get_running_loop()
+                except RuntimeError:
+                    loop = None
+                
+                if loop is not None:
+                    # Already in an event loop - use thread to run new loop
+                    import concurrent.futures
+                    with concurrent.futures.ThreadPoolExecutor() as executor:
+                        future = executor.submit(asyncio.run, coro)
+                        return future.result()
+                else:
+                    return asyncio.run(coro)

             return sync_wrapper  # type: ignore[return-value]

coderabbitai bot left a comment

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@packages/traceloop-sdk/traceloop/sdk/guardrail/guards.py`:
- Around line 36-90: The guard_fn currently uses dict(input_data) as a
last-resort which raises TypeError for non-mapping/non-iterable types; update
the input conversion in _create_guard -> guard_fn to explicitly handle mappings
and pydantic models and raise a clear TypeError for unsupported types: use
isinstance(input_data, Mapping) to assign input_dict, elif hasattr(input_data,
"model_dump") to call model_dump(), else raise TypeError(f"Unsupported input
type for guard '{evaluator_slug}': {type(input_data).__name__}") (remove the
fallback dict(input_data)).
🧹 Nitpick comments (1)
packages/traceloop-sdk/traceloop/sdk/decorators/__init__.py (1)

165-170: Inconsistent type annotation syntax.

Line 167 uses | syntax while line 168 uses Union[...]. Consider using a consistent style throughout.

♻️ Suggested fix for consistency
 def guardrail(
     *guards: Guard,
-    input_mapper: InputMapper | None = None,
-    on_failure: Union[OnFailureHandler, str, None] = None,
+    input_mapper: Optional[InputMapper] = None,
+    on_failure: Optional[Union[OnFailureHandler, str]] = None,
     name: str = "",
 ) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]:

Comment on lines +36 to +90
def _create_guard(
    evaluator_details: EvaluatorDetails,
    condition: Callable[[Any], bool],
    timeout_in_sec: int = 60,
) -> Guard:
    """
    Convert an EvaluatorDetails to a guard function.

    Args:
        evaluator_details: The evaluator configuration
        condition: Function that receives evaluator result and returns bool.
            True = pass, False = fail.
        timeout_in_sec: Maximum time to wait for evaluator execution

    Returns:
        Async function suitable for client.guardrails.create(guards=[...])
    """

    evaluator_slug = evaluator_details.slug
    evaluator_version = evaluator_details.version
    evaluator_config = evaluator_details.config
    condition_field = evaluator_details.condition_field

    async def guard_fn(input_data: Any) -> bool:
        from traceloop.sdk import Traceloop
        from traceloop.sdk.evaluator.evaluator import Evaluator

        # Convert Pydantic model to dict, or use dict directly
        if isinstance(input_data, dict):
            input_dict = input_data
        elif hasattr(input_data, "model_dump"):
            input_dict = input_data.model_dump()
        else:
            input_dict = dict(input_data)

        client = Traceloop.get()
        evaluator = Evaluator(client._async_http)

        eval_response = await evaluator.run(
            evaluator_slug=evaluator_slug,
            input=input_dict,
            evaluator_version=evaluator_version,
            evaluator_config=evaluator_config,
            timeout_in_sec=timeout_in_sec,
        )

        if condition_field:
            result_to_validate = eval_response.result.evaluator_result[condition_field]
        else:
            result_to_validate = eval_response.result.evaluator_result

        return condition(result_to_validate)

    guard_fn.__name__ = evaluator_slug
    return guard_fn

⚠️ Potential issue | 🟡 Minor

Potential TypeError on line 69 for non-mapping input types.

The fallback dict(input_data) will raise TypeError if input_data is not iterable as key-value pairs (e.g., a string, number, or custom object without __iter__).

Proposed fix with explicit error handling
         # Convert Pydantic model to dict, or use dict directly
         if isinstance(input_data, dict):
             input_dict = input_data
         elif hasattr(input_data, "model_dump"):
             input_dict = input_data.model_dump()
         else:
-            input_dict = dict(input_data)
+            try:
+                input_dict = dict(input_data)
+            except (TypeError, ValueError) as e:
+                raise TypeError(
+                    f"Guard input must be a dict, Pydantic model, or dict-convertible type, "
+                    f"got {type(input_data).__name__}"
+                ) from e
