Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
2 changes: 1 addition & 1 deletion .cursor/rules/prompt-templates.mdc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
description:
globs: **/pipelex_libraries/pipelines/toml/**/*.toml
globs: pipelex/libraries/pipelines/**/*.toml
alwaysApply: false
---
This rule explains how to write prompt templates in PipeLLM definitions. These prompts will be rendered and become the user_text or the system_prompt part of LLM chat completion queries.
Expand Down
5 changes: 1 addition & 4 deletions .cursor/rules/pytest.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ pipe_output: PipeOutput = await pipe_router.run_pipe(
pipe_code="pipe_name",
pipe_run_params=PipeRunParamsFactory.make_run_params(),
working_memory=working_memory,
job_metadata=JobMetadata(

top_job_id=cast(str, request.node.originalname), # type: ignore
),
job_metadata=JobMetadata(),
)
```

Expand Down
4 changes: 4 additions & 0 deletions .cursor/rules/standards.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ alwaysApply: true

This document outlines the coding standards and quality control procedures that must be followed when contributing to this project.

## Style

Always use type hints. Use the types with Uppercase first letter for types like Dict[], List[] etc.

## Code Quality Checks

### Linting and Type Checking
Expand Down
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## [v0.4.7] - 2025-06-26

- Added an API serializer: introducing the `compact_memory`, a new way to encode/decode the working memory as json, for the API.
- Added `StorageProviderAbstract`
- When creating a Concept with no structure specified and no explicit `refines`, set it to refine `native.Text`
- `JobMetadata`: added `job_name`. Removed `top_job_id` and `wfid`
- `PipeOutput`: added `pipeline_run_id`

## [v0.4.6] - 2025-06-24

- Changed the link to the doc in the `README.md`: https://docs.pipelex.com
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,11 @@ li: lock install

check-TODOs: env
$(call PRINT_TITLE,"Checking for TODOs")
$(VENV_RUFF) check --select=TD -v .
$(VENV_RUFF) check --select=TD .

fix-unused-imports: env
$(call PRINT_TITLE,"Fixing unused imports")
$(VENV_RUFF) check --select=F401 --fix -v .
$(VENV_RUFF) check --select=F401 --fix .

doc: env
$(call PRINT_TITLE,"Serving documentation with mkdocs")
Expand Down
4 changes: 2 additions & 2 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ extra:
link: https://github.com/Pipelex/pipelex
name: Pipelex on GitHub
- icon: fontawesome/brands/python
link: https://pypi.org/project/kajson/
name: kajson on PyPI
link: https://pypi.org/project/pipelex/
name: pipelex on PyPI
- icon: fontawesome/brands/twitter
link: https://x.com/PipelexAI
name: Pipelex on X
Expand Down
124 changes: 124 additions & 0 deletions pipelex/client/api_serializer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from datetime import datetime
from decimal import Decimal
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, cast

from pipelex.client.protocol import CompactMemory
from pipelex.core.concept_native import NativeConcept
from pipelex.core.pipe_output import PipeOutput
from pipelex.core.stuff_content import StuffContent, TextContent
from pipelex.core.stuff_factory import StuffContentFactory
from pipelex.core.working_memory import WorkingMemory
from pipelex.exceptions import ApiSerializationError


class ApiSerializer:
"""Handles API-specific serialization with kajson, datetime formatting, and cleanup."""

# Fixed datetime format for API consistency
API_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
FIELDS_TO_SKIP = ("__class__", "__module__")

@classmethod
def serialize_working_memory_for_api(cls, working_memory: WorkingMemory) -> CompactMemory:
"""
Convert WorkingMemory to API-ready format using kajson with proper datetime handling.

Args:
working_memory: The WorkingMemory to serialize

Returns:
Dict ready for API transmission with datetime strings and no __class__/__module__
"""
compact_memory: CompactMemory = {}

for stuff_name, stuff in working_memory.root.items():
if stuff.concept_code == NativeConcept.TEXT.code:
stuff_content = cast(TextContent, stuff.content)
item_dict: Dict[str, Any] = {
"concept_code": stuff.concept_code,
"content": stuff_content.text,
}
else:
content_dict = stuff.content.model_dump(serialize_as_any=True)
clean_content = cls._clean_and_format_content(content_dict)

item_dict = {
"concept_code": stuff.concept_code,
"content": clean_content,
}

compact_memory[stuff_name] = item_dict

return compact_memory

@classmethod
def serialize_pipe_output_for_api(cls, pipe_output: PipeOutput) -> CompactMemory:
"""
Convert PipeOutput to API-ready format.

Args:
pipe_output: The PipeOutput to serialize

Returns:
Dict ready for API transmission
"""
return {"compact_memory": cls.serialize_working_memory_for_api(pipe_output.working_memory)}

@classmethod
def _clean_and_format_content(cls, content: Any) -> Any:
"""
Recursively clean content by removing the fields in FIELDS_TO_SKIP and formatting datetimes.

Args:
content: Content to clean

Returns:
Cleaned content with formatted datetimes
"""
if isinstance(content, dict):
cleaned: Dict[str, Any] = {}
content_dict = cast(Dict[str, Any], content)
for key in content_dict:
if key in cls.FIELDS_TO_SKIP:
continue
cleaned[key] = cls._clean_and_format_content(content_dict[key])
return cleaned
elif isinstance(content, list):
cleaned_list: List[Any] = []
content_list = cast(List[Any], content)
for idx in range(len(content_list)):
cleaned_list.append(cls._clean_and_format_content(content_list[idx]))
return cleaned_list
elif isinstance(content, datetime):
return content.strftime(cls.API_DATETIME_FORMAT)
elif isinstance(content, Enum):
return content.value # Convert enum to its value
elif isinstance(content, Decimal):
return float(content) # Convert Decimal to float for JSON compatibility
elif isinstance(content, Path):
return str(content) # Convert Path to string representation
else:
return content

@classmethod
def make_stuff_content_from_api_data(cls, concept_code: str, value: Dict[str, Any] | str) -> StuffContent:
"""
Create StuffContent from API data using concept code.

Args:
concept_code: The concept code to determine the content type
value: The content value from API

Returns:
StuffContent instance

Raises:
ApiSerializationError: If concept cannot be resolved or content creation fails
"""
try:
return StuffContentFactory.make_stuffcontent_from_concept_code_with_fallback(concept_code=concept_code, value=value)

except Exception as exc:
raise ApiSerializationError(f"Failed to create StuffContent for concept '{concept_code}': {exc}") from exc
19 changes: 10 additions & 9 deletions pipelex/client/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Any, Optional, cast
from typing import Any, Optional

import httpx
from kajson import kajson
from typing_extensions import override

from pipelex.client.protocol import PipelexProtocol, PipelineRequest, PipelineResponse
from pipelex.client.pipeline_request_factory import PipelineRequestFactory
from pipelex.client.pipeline_response_factory import PipelineResponseFactory
from pipelex.client.protocol import PipelexProtocol, PipelineResponse
from pipelex.core.pipe_run_params import PipeOutputMultiplicity
from pipelex.core.working_memory import WorkingMemory
from pipelex.exceptions import ClientAuthenticationError
Expand Down Expand Up @@ -78,14 +79,14 @@ async def execute_pipeline(
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
) -> PipelineResponse:
pipeline_request = PipelineRequest(
pipeline_request = PipelineRequestFactory.make_from_working_memory(
working_memory=working_memory,
output_name=output_name,
output_multiplicity=output_multiplicity,
dynamic_output_concept_code=dynamic_output_concept_code,
)
response = await self._make_api_call(f"pipelex/v1/pipeline/{pipe_code}/execute", request=kajson.dumps(pipeline_request))
return cast(PipelineResponse, kajson.loads(response))
response = await self._make_api_call(f"v1/pipeline/{pipe_code}/execute", request=pipeline_request.model_dump_json())
return PipelineResponseFactory.make_from_api_response(response)

@override
async def start_pipeline(
Expand All @@ -96,11 +97,11 @@ async def start_pipeline(
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
) -> PipelineResponse:
pipeline_request = PipelineRequest(
pipeline_request = PipelineRequestFactory.make_from_working_memory(
working_memory=working_memory,
output_name=output_name,
output_multiplicity=output_multiplicity,
dynamic_output_concept_code=dynamic_output_concept_code,
)
response = await self._make_api_call(f"pipelex/v1/pipeline/{pipe_code}/start", request=kajson.dumps(pipeline_request))
return cast(PipelineResponse, kajson.loads(response))
response = await self._make_api_call(f"v1/pipeline/{pipe_code}/start", request=pipeline_request.model_dump_json())
return PipelineResponseFactory.make_from_api_response(response)
89 changes: 89 additions & 0 deletions pipelex/client/pipeline_request_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
from typing import Any, Dict, Optional

from pipelex.client.api_serializer import ApiSerializer
from pipelex.client.protocol import COMPACT_MEMORY_KEY, CompactMemory, PipelineRequest
from pipelex.core.pipe_run_params import PipeOutputMultiplicity
from pipelex.core.stuff_factory import StuffFactory
from pipelex.core.working_memory import WorkingMemory
from pipelex.core.working_memory_factory import WorkingMemoryFactory


class PipelineRequestFactory:
"""Factory class for creating PipelineRequest objects from WorkingMemory."""

@staticmethod
def make_from_working_memory(
working_memory: Optional[WorkingMemory] = None,
output_name: Optional[str] = None,
output_multiplicity: Optional[PipeOutputMultiplicity] = None,
dynamic_output_concept_code: Optional[str] = None,
) -> PipelineRequest:
"""
Create a PipelineRequest from a WorkingMemory object.

Args:
working_memory: The WorkingMemory to convert
output_name: Name of the output slot to write to
output_multiplicity: Output multiplicity setting
dynamic_output_concept_code: Override for the dynamic output concept code

Returns:
PipelineRequest with the working memory serialized to reduced format
"""
compact_memory = None
if working_memory is not None:
compact_memory = ApiSerializer.serialize_working_memory_for_api(working_memory)

return PipelineRequest(
compact_memory=compact_memory,
output_name=output_name,
output_multiplicity=output_multiplicity,
dynamic_output_concept_code=dynamic_output_concept_code,
)

@staticmethod
def make_working_memory_from_reduced(compact_memory: Optional[CompactMemory]) -> WorkingMemory:
"""
Create a WorkingMemory from a reduced memory dictionary.

Args:
compact_memory: Dictionary in the format from API

Returns:
WorkingMemory object reconstructed from the reduced format
"""
working_memory = WorkingMemoryFactory.make_empty()
if compact_memory is None:
return working_memory

for stuff_key, stuff_data in compact_memory.items():
concept_code = stuff_data.get("concept_code", "")
content_value = stuff_data.get("content", {})

# Use API serializer to create content
content = ApiSerializer.make_stuff_content_from_api_data(concept_code=concept_code, value=content_value)

# Create stuff directly
stuff = StuffFactory.make_stuff(concept_str=concept_code, name=stuff_key, content=content)

working_memory.add_new_stuff(name=stuff_key, stuff=stuff)

return working_memory

@staticmethod
def make_request_from_body(request_body: Dict[str, Any]) -> PipelineRequest:
"""
Create a PipelineRequest from raw request body dictionary.

Args:
request_body: Raw dictionary from API request body

Returns:
PipelineRequest object with dictionary working_memory
"""
return PipelineRequest(
compact_memory=request_body.get(COMPACT_MEMORY_KEY),
output_name=request_body.get("output_name"),
output_multiplicity=request_body.get("output_multiplicity"),
dynamic_output_concept_code=request_body.get("dynamic_output_concept_code"),
)
64 changes: 64 additions & 0 deletions pipelex/client/pipeline_response_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from typing import Any, Dict, Optional

from pipelex.client.api_serializer import ApiSerializer
from pipelex.client.protocol import PipelineResponse, PipelineState
from pipelex.core.pipe_output import PipeOutput


class PipelineResponseFactory:
"""Factory class for creating PipelineResponse objects from PipeOutput."""

@staticmethod
def make_from_pipe_output(
pipe_output: Optional[PipeOutput] = None,
pipeline_run_id: str = "",
created_at: str = "",
pipeline_state: PipelineState = PipelineState.COMPLETED,
finished_at: Optional[str] = None,
status: Optional[str] = "success",
message: Optional[str] = None,
error: Optional[str] = None,
) -> PipelineResponse:
"""
Create a PipelineResponse from a PipeOutput object.

Args:
pipe_output: The PipeOutput to convert
pipeline_run_id: Unique identifier for the pipeline run
created_at: Timestamp when the pipeline was created
pipeline_state: Current state of the pipeline
finished_at: Timestamp when the pipeline finished
status: Status of the API call
message: Optional message providing additional information
error: Optional error message

Returns:
PipelineResponse with the pipe output serialized to reduced format
"""
reduced_output = None
if pipe_output is not None:
reduced_output = ApiSerializer.serialize_pipe_output_for_api(pipe_output=pipe_output)

return PipelineResponse(
pipeline_run_id=pipeline_run_id,
created_at=created_at,
pipeline_state=pipeline_state,
finished_at=finished_at,
pipe_output=reduced_output,
status=status,
message=message,
error=error,
)

@staticmethod
def make_from_api_response(response: Dict[str, Any]) -> PipelineResponse:
"""
Create a PipelineResponse from an API response dictionary.

Args:
response: Dictionary containing the API response data

Returns:
PipelineResponse instance created from the response data
"""
return PipelineResponse(**response)
Loading