Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions src/powermem/core/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
"""

import logging
import os
import warnings
import hashlib
import json
Expand All @@ -29,6 +30,7 @@
from ..intelligence.memory_optimizer import MemoryOptimizer
from ..intelligence.plugin import IntelligentMemoryPlugin, EbbinghausIntelligencePlugin
from ..utils.utils import remove_code_blocks, convert_config_object_to_dict, parse_vision_messages, set_timezone
from ..utils.io import export_to_json, export_to_csv, import_from_json, import_from_csv
from ..prompts.intelligent_memory_prompts import (
FACT_RETRIEVAL_PROMPT,
FACT_EXTRACTION_PROMPT,
Expand Down Expand Up @@ -1952,3 +1954,80 @@ def from_config(cls, config: Optional[Dict[str, Any]] = None, **kwargs):
converted_config = _auto_convert_config(config)

return cls(config=converted_config, **kwargs)

def export_memories(
    self,
    format: str = "json",
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    run_id: Optional[str] = None,
    limit: int = 1000,
) -> str:
    """Serialize stored memories into a JSON or CSV string.

    Args:
        format: Target format, "json" or "csv" (case-insensitive).
        user_id: Optional user ID filter.
        agent_id: Optional agent ID filter.
        run_id: Optional run ID filter.
        limit: Upper bound on the number of memories exported.

    Returns:
        str: The serialized memories.

    Raises:
        ValueError: If ``format`` is neither "json" nor "csv".
    """
    fetched = self.get_all(user_id=user_id, agent_id=agent_id, run_id=run_id, limit=limit)
    records = fetched.get("results", [])

    fmt = format.lower()
    if fmt == "json":
        return export_to_json(records)
    if fmt == "csv":
        return export_to_csv(records)
    raise ValueError(f"Unsupported export format: {format}")

def import_memories(
    self,
    source: str,
    format: str = "json",
    user_id: Optional[str] = None,
    agent_id: Optional[str] = None,
    is_file: bool = False,
) -> Dict[str, int]:
    """Import memories from a JSON or CSV payload.

    Args:
        source: Content string to import, or a file path when ``is_file`` is True.
        format: Import format ("json" or "csv"), case-insensitive.
        user_id: Override user ID applied to every imported memory.
        agent_id: Override agent ID applied to every imported memory.
        is_file: When True, treat ``source`` as a path and read the payload
            from that file (UTF-8). Added so callers (e.g. the REST API
            layer, which passes ``is_file=False``) do not hit a TypeError.

    Returns:
        Dict with "success" and "failed" import counts.

    Raises:
        ValueError: If ``format`` is not "json" or "csv".
        OSError: If ``is_file`` is True and the file cannot be read.
    """
    if is_file:
        # Load the payload from disk; raw-content callers skip this branch.
        with open(source, "r", encoding="utf-8") as f:
            source = f.read()

    fmt = format.lower()
    if fmt == "json":
        memories = import_from_json(source)
    elif fmt == "csv":
        memories = import_from_csv(source)
    else:
        raise ValueError(f"Unsupported import format: {format}")

    success = 0
    failed = 0

    for memory in memories:
        try:
            # Caller-supplied IDs take precedence over per-record IDs.
            self.add(
                content=memory['content'],
                user_id=user_id or memory.get('user_id'),
                agent_id=agent_id or memory.get('agent_id'),
                metadata=memory.get('metadata', {}),
            )
            success += 1
        except Exception as e:
            # Best-effort import: log the failure and continue with the rest.
            logger.error(f"Failed to import memory: {e}")
            failed += 1

    return {"success": success, "failed": failed}
80 changes: 80 additions & 0 deletions src/powermem/utils/io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
"""
Memory Import/Export utilities

This module provides functions for exporting memories to JSON/CSV
and importing memories from JSON/CSV files.
"""

import json
import csv
import io as io_module
from typing import List, Dict, Any
from datetime import datetime


def export_to_json(memories: List[Dict[str, Any]]) -> str:
    """Serialize a list of memory dicts to a pretty-printed JSON string."""
    def _encode(value):
        # json cannot serialize datetimes natively; emit ISO-8601 strings.
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError(f"Object of type {type(value)} is not JSON serializable")

    return json.dumps(memories, indent=2, default=_encode)


def export_to_csv(memories: List[Dict[str, Any]]) -> str:
    """Serialize memory dicts to CSV text with a fixed, stable column order."""
    if not memories:
        return ""

    columns = ['id', 'content', 'role', 'metadata', 'created_at', 'updated_at']
    buffer = io_module.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=columns)
    writer.writeheader()

    for record in memories:
        writer.writerow({
            'id': record.get('id', ''),
            'content': record.get('content', ''),
            'role': record.get('role', 'user'),
            # metadata is a nested dict; store it as a JSON string in one cell
            'metadata': json.dumps(record.get('metadata', {})),
            'created_at': str(record.get('created_at', '')),
            'updated_at': str(record.get('updated_at', '')),
        })

    return buffer.getvalue()


def import_from_json(json_str: str) -> List[Dict[str, Any]]:
    """Parse memories from a JSON string.

    Accepts either a JSON array of memory objects or a single memory
    object. Non-dict array entries are skipped. Each returned record is
    normalized to the keys: id, content, role, metadata, created_at,
    updated_at.

    Args:
        json_str: JSON text to parse.

    Returns:
        List of normalized memory dicts.

    Raises:
        json.JSONDecodeError: If ``json_str`` is not valid JSON.
    """
    parsed = json.loads(json_str)
    # Generalization: a top-level object previously produced an empty result
    # (iterating a dict yields string keys, which were all filtered out).
    # Treat a single memory object as a one-element list instead.
    if isinstance(parsed, dict):
        parsed = [parsed]

    result = []
    for memory in parsed:
        if isinstance(memory, dict):
            result.append({
                'id': memory.get('id'),
                'content': memory.get('content', ''),
                'role': memory.get('role', 'user'),
                'metadata': memory.get('metadata', {}),
                'created_at': memory.get('created_at'),
                'updated_at': memory.get('updated_at'),
            })
    return result


def import_from_csv(csv_str: str) -> List[Dict[str, Any]]:
    """Parse memories from a CSV string (as produced by ``export_to_csv``).

    Args:
        csv_str: CSV text with a header row.

    Returns:
        List of normalized memory dicts; empty list for blank input.

    Raises:
        json.JSONDecodeError: If a non-empty metadata cell is not valid JSON.
    """
    if not csv_str.strip():
        return []

    reader = csv.DictReader(io_module.StringIO(csv_str))
    memories = []
    for row in reader:
        # Fix: an empty or missing metadata cell previously crashed on
        # json.loads('') — fall back to an empty JSON object instead.
        raw_metadata = row.get('metadata') or '{}'
        memories.append({
            'id': row.get('id'),
            'content': row.get('content', ''),
            # Default an empty role cell to 'user', consistent with the
            # JSON importer's default.
            'role': row.get('role') or 'user',
            'metadata': json.loads(raw_metadata),
            'created_at': row.get('created_at'),
            'updated_at': row.get('updated_at'),
        })
    return memories
79 changes: 78 additions & 1 deletion src/server/api/v1/memories.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

import logging
from typing import List, Optional
from fastapi import APIRouter, Depends, Query, Request
from fastapi import APIRouter, Depends, Query, Request, UploadFile, File
from fastapi.responses import Response
from slowapi import Limiter
from slowapi.util import get_remote_address

Expand Down Expand Up @@ -437,3 +438,79 @@ async def delete_memory(
data={"memory_id": memory_id},
message="Memory deleted successfully",
)


@router.get(
    "/export",
    summary="Export memories",
    description="Export memories to JSON or CSV file",
)
@limiter.limit(get_rate_limit_string())
async def export_memories(
    request: Request,
    format: str = Query("json", description="Export format (json/csv)"),
    user_id: Optional[str] = Query(None, description="Filter by user ID"),
    agent_id: Optional[str] = Query(None, description="Filter by agent ID"),
    run_id: Optional[str] = Query(None, description="Filter by run ID"),
    limit: int = Query(1000, ge=1, le=10000, description="Max memories to export"),
    api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Export memories as a downloadable JSON or CSV attachment.

    An unsupported ``format`` propagates a ValueError from the service layer.
    """
    fmt = format.lower()
    content = service.memory.export_memories(
        format=fmt,
        user_id=user_id,
        agent_id=agent_id,
        run_id=run_id,
        limit=limit,
    )

    media_type = "application/json" if fmt == "json" else "text/csv"
    filename = f"memories_export.{fmt}"

    return Response(
        content=content,
        media_type=media_type,
        # Fix: the header previously contained a literal placeholder instead of
        # interpolating the computed filename, so downloads had no usable name.
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )


@router.post(
    "/import",
    response_model=APIResponse,
    summary="Import memories",
    description="Import memories from JSON or CSV file",
)
@limiter.limit(get_rate_limit_string())
async def import_memories(
    request: Request,
    file: UploadFile = File(...),
    user_id: Optional[str] = Query(None, description="Override user ID"),
    agent_id: Optional[str] = Query(None, description="Override agent ID"),
    api_key: str = Depends(verify_api_key),
    service: MemoryService = Depends(get_memory_service),
):
    """Import memories from an uploaded JSON or CSV file.

    The format is detected from the uploaded filename's extension and
    defaults to JSON when the extension is unrecognized.
    """
    content = (await file.read()).decode("utf-8")

    # Auto-detect format from the filename extension; default to JSON.
    filename = (file.filename or "import.json").lower()
    fmt = "csv" if filename.endswith(".csv") else "json"

    # Fix: the core import_memories() signature does not accept an `is_file`
    # keyword; passing is_file=False raised TypeError on every request.
    result = service.memory.import_memories(
        source=content,
        format=fmt,
        user_id=user_id,
        agent_id=agent_id,
    )

    return APIResponse(
        success=True,
        data=result,
        message=f"Import completed: {result['success']} success, {result['failed']} failed",
    )