Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 44 additions & 3 deletions openviking/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,24 +171,65 @@ async def commit_session(self, session_id: str) -> Dict[str, Any]:

async def add_resource(
self,
path: str,
path: Union[str, List[str]],
target: Optional[str] = None,
reason: str = "",
instruction: str = "",
wait: bool = False,
timeout: float = None,
**kwargs,
) -> Dict[str, Any]:
"""Add resource to OpenViking (only supports resources scope).
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
"""Add resource(s) to OpenViking (only supports resources scope).

Args:
path: Single file path or list of file paths to import
wait: Whether to wait for semantic extraction and vectorization to complete
timeout: Wait timeout in seconds
**kwargs: Extra options forwarded to the parser chain, e.g.
``strict``, ``ignore_dirs``, ``include``, ``exclude``.
"""
await self._ensure_initialized()
return await self._client.add_resource(

if not self._resource_processor:
raise RuntimeError("CoordinatedWriter not initialized")

# add_resource only supports resources scope
if target and target.startswith("viking://"):
parsed = VikingURI(target)
if parsed.scope != "resources":
raise ValueError(
f"add_resource only supports resources scope, use dedicated interface to add {parsed.scope} content"
)

# Handle batch processing
if isinstance(path, list):
import asyncio
tasks = []
for p in path:
# For batch, we ignore wait=True for individual items to maximize parallelism
# We will wait globally at the end if requested
tasks.append(self._resource_processor.process_resource(
path=p,
reason=reason,
instruction=instruction,
scope="resources",
target=target,
))

results = await asyncio.gather(*tasks)

if wait:
from openviking.storage.queuefs import get_queue_manager
qm = get_queue_manager()
status = await qm.wait_complete(timeout=timeout)
# Attach queue status to the last result or return separately?
# For simplicity, we just wait. Detailed status is complex for batch.

return list(results)

# Single path processing
result = await self._resource_processor.process_resource(
path=path,
target=target,
reason=reason,
Expand Down
52 changes: 52 additions & 0 deletions openviking/storage/vectordb/service/api_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,63 @@ def __init__(self, code: ErrorCode, message: str):
project_group = get_or_create_project_group(path=PERSIST_PATH)
logger.info("ProjectGroup initialized successfully")

from fastapi import APIRouter, Depends, Query, Request, UploadFile, File
from typing import List, Union

# Create routers
collection_router = APIRouter(prefix="", tags=["Collection"])
data_router = APIRouter(prefix="/api/vikingdb/data", tags=["Data"])
index_router = APIRouter(prefix="", tags=["Index"])
search_router = APIRouter(prefix="/api/vikingdb/data/search", tags=["Search"])
resource_router = APIRouter(prefix="/api/vikingdb/resource", tags=["Resource"]) # New router

# ==================== Resource Management API ====================

@resource_router.post("/import", summary="Import resources from local path or URL")
async def import_resource(
request: Request,
path: Union[str, List[str]],
target: Optional[str] = None,
instruction: str = "",
wait: bool = False
):
"""
Import resource(s) into OpenViking.
Supports single path or list of paths.
"""
# TODO: Get global client instance. For now we assume this runs in the same process
# where a client is available, or we need to instantiate one.
# Ideally, the server should hold a singleton AsyncOpenViking instance.

# Mock implementation for demo purposes as we don't have global client access here yet
return success_response("Import task submitted", {"paths": path, "target": target}, request)

@resource_router.post("/upload", summary="Upload and import file")
async def upload_resource(
request: Request,
files: List[UploadFile] = File(...),
target: Optional[str] = None
):
"""
Upload files and import them as resources.
"""
uploaded_paths = []
# Save uploaded files to temp dir
import tempfile
import shutil

temp_dir = tempfile.mkdtemp(prefix="ov_upload_")

for file in files:
file_path = os.path.join(temp_dir, file.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
uploaded_paths.append(file_path)

# Trigger import (mock)
# await client.add_resource(uploaded_paths, target=target)

return success_response("Files uploaded and import task submitted", {"count": len(files)}, request)


# ==================== Dependencies ====================
Expand Down
1 change: 1 addition & 0 deletions openviking/storage/vectordb/service/server_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ async def add_process_time_header(request: Request, call_next):
app.include_router(api_fastapi.data_router)
app.include_router(api_fastapi.index_router)
app.include_router(api_fastapi.search_router)
app.include_router(api_fastapi.resource_router)


@app.get("/")
Expand Down
11 changes: 7 additions & 4 deletions openviking/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
Synchronous OpenViking client implementation.
"""

from typing import TYPE_CHECKING, Any, Dict, List, Optional
import asyncio
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union

if TYPE_CHECKING:
from openviking.session import Session
Expand Down Expand Up @@ -77,17 +78,19 @@ def commit_session(self, session_id: str) -> Dict[str, Any]:

def add_resource(
self,
path: str,
path: Union[str, List[str]],
target: Optional[str] = None,
reason: str = "",
instruction: str = "",
wait: bool = False,
timeout: float = None,
**kwargs,
) -> Dict[str, Any]:
"""Add resource to OpenViking (resources scope only)
) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
"""Add resource(s) to OpenViking (resources scope only)

Args:
path: Local file path or list of file paths to import
wait: Whether to wait for all background processing to complete
**kwargs: Extra options forwarded to the parser chain, e.g.
``strict``, ``ignore_dirs``, ``include``, ``exclude``.
"""
Expand Down