Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/ov.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
"mineru_timeout": 300.0
},
"code": {
"enable_ast": true,
"code_summary_mode": "ast",
"extract_functions": true,
"extract_classes": true,
"extract_imports": true,
Expand Down
29 changes: 29 additions & 0 deletions openviking/parse/parsers/code/ast/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Public API for AST-based code skeleton extraction."""

from typing import Optional

from openviking.parse.parsers.code.ast.extractor import get_extractor


def extract_skeleton(file_name: str, content: str, verbose: bool = False) -> Optional[str]:
    """Build a plain-text skeleton for a single source file.

    Convenience wrapper that forwards to the extractor returned by
    ``get_extractor()``. Supports Python, JS/TS, Java, C/C++, Rust and Go
    via tree-sitter.

    Args:
        file_name: File name with extension (used for language detection).
        content: Source code content.
        verbose: If True, include full docstrings (for ast_llm / LLM input).
            If False, only the first line of each docstring (for ast / embedding).

    Returns:
        Plain-text skeleton string, or None when the language is unsupported
        or extraction failed; None tells the caller to fall back to the LLM.
    """
    shared_extractor = get_extractor()
    skeleton_text = shared_extractor.extract_skeleton(file_name, content, verbose=verbose)
    return skeleton_text


__all__ = ["extract_skeleton"]
107 changes: 107 additions & 0 deletions openviking/parse/parsers/code/ast/extractor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""ASTExtractor: language detection + dispatch to per-language extractors."""

import importlib
import logging
from pathlib import Path
from typing import Dict, Optional

from openviking.parse.parsers.code.ast.languages.base import LanguageExtractor
from openviking.parse.parsers.code.ast.skeleton import CodeSkeleton

# Module-scoped logger; ASTExtractor uses it to warn when an extractor cannot
# be loaded or when extraction raises, before falling back to the LLM path.
logger = logging.getLogger(__name__)

# File extension → internal language key.
# Declared one row per language so all suffixes owned by a language sit
# together; the flattened dict preserves the row order below.
_EXT_MAP: Dict[str, str] = {
    ext: lang
    for lang, exts in (
        ("python", (".py",)),
        ("javascript", (".js", ".jsx")),
        ("typescript", (".ts", ".tsx")),
        ("java", (".java",)),
        ("cpp", (".c", ".cpp", ".cc", ".h", ".hpp")),
        ("rust", (".rs",)),
        ("go", (".go",)),
    )
    for ext in exts
}

# Language key → (module path, class name, constructor kwargs).
# Every extractor module lives in the same package, so the shared prefix is
# spelled once and the entries only name the module itself.
_LANGUAGES_PKG = "openviking.parse.parsers.code.ast.languages"
_EXTRACTOR_REGISTRY: Dict[str, tuple] = {
    "python": (f"{_LANGUAGES_PKG}.python", "PythonExtractor", {}),
    "javascript": (f"{_LANGUAGES_PKG}.js_ts", "JsTsExtractor", {"lang": "javascript"}),
    "typescript": (f"{_LANGUAGES_PKG}.js_ts", "JsTsExtractor", {"lang": "typescript"}),
    "java": (f"{_LANGUAGES_PKG}.java", "JavaExtractor", {}),
    "cpp": (f"{_LANGUAGES_PKG}.cpp", "CppExtractor", {}),
    "rust": (f"{_LANGUAGES_PKG}.rust", "RustExtractor", {}),
    "go": (f"{_LANGUAGES_PKG}.go", "GoExtractor", {}),
}


class ASTExtractor:
    """Route a source file to the tree-sitter extractor for its language.

    The language is inferred from the file extension. Extractor instances are
    created lazily and memoised per language; a language whose extractor could
    not be loaded is memoised as ``None`` so loading is not retried. Callers
    get ``None`` whenever AST extraction is unsupported or fails, which
    signals them to fall back to the LLM.
    """

    def __init__(self):
        # language key -> extractor instance, or None if loading it failed
        self._cache: Dict[str, Optional[LanguageExtractor]] = {}

    def _detect_language(self, file_name: str) -> Optional[str]:
        """Map the lower-cased file extension to a language key, if known."""
        return _EXT_MAP.get(Path(file_name).suffix.lower())

    def _get_extractor(self, lang: Optional[str]) -> Optional[LanguageExtractor]:
        """Return the (memoised) extractor for ``lang``, or None if unavailable."""
        registered = lang is not None and lang in _EXTRACTOR_REGISTRY
        if not registered:
            return None

        try:
            return self._cache[lang]
        except KeyError:
            pass  # first request for this language: build it below

        module_path, class_name, kwargs = _EXTRACTOR_REGISTRY[lang]
        try:
            extractor_cls = getattr(importlib.import_module(module_path), class_name)
            instance = extractor_cls(**kwargs)
        except Exception as exc:
            # Best-effort: a missing grammar package must not break parsing.
            logger.warning("AST extractor unavailable for language '%s', falling back to LLM: %s", lang, exc)
            instance = None

        # Failures are remembered too, so the import is attempted only once.
        self._cache[lang] = instance
        return instance

    def extract_skeleton(self, file_name: str, content: str, verbose: bool = False) -> Optional[str]:
        """Extract skeleton text from source code.

        Args:
            file_name: File name; its extension selects the language.
            content: Source code content.
            verbose: If True, include full docstrings (for ast_llm / LLM input).
                If False, only first line of each docstring (for ast / embedding).

        Returns:
            The skeleton text, or None for unsupported languages or when
            extraction raises, signalling the caller to fall back to LLM.
        """
        lang = self._detect_language(file_name)
        extractor = self._get_extractor(lang)
        if extractor is None:
            return None

        try:
            return extractor.extract(file_name, content).to_text(verbose=verbose)
        except Exception as exc:
            logger.warning("AST extraction failed for '%s' (language: %s), falling back to LLM: %s", file_name, lang, exc)
            return None


# Module-level singleton; stays None until get_extractor() is first called.
_extractor: Optional[ASTExtractor] = None


def get_extractor() -> ASTExtractor:
    """Return the shared module-level ``ASTExtractor``, creating it on first use."""
    global _extractor
    if _extractor is not None:
        return _extractor
    _extractor = ASTExtractor()
    return _extractor
2 changes: 2 additions & 0 deletions openviking/parse/parsers/code/ast/languages/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
13 changes: 13 additions & 0 deletions openviking/parse/parsers/code/ast/languages/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""Abstract base class for language-specific AST extractors."""

from abc import ABC, abstractmethod

from openviking.parse.parsers.code.ast.skeleton import CodeSkeleton


class LanguageExtractor(ABC):
    """Interface implemented by each language-specific AST extractor."""

    @abstractmethod
    def extract(self, file_name: str, content: str) -> CodeSkeleton:
        """Extract code skeleton from source. Raises on unrecoverable error.

        Args:
            file_name: Name of the file the source came from.
            content: Source code text to analyse.

        Returns:
            A ``CodeSkeleton`` describing the file.
        """
169 changes: 169 additions & 0 deletions openviking/parse/parsers/code/ast/languages/cpp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0
"""C/C++ AST extractor using tree-sitter-cpp."""

from typing import List

from openviking.parse.parsers.code.ast.languages.base import LanguageExtractor
from openviking.parse.parsers.code.ast.skeleton import ClassSkeleton, CodeSkeleton, FunctionSig


def _node_text(node, content_bytes: bytes) -> str:
    """Return the source text spanned by ``node``.

    Slices ``content_bytes`` between the node's ``start_byte`` and
    ``end_byte`` offsets and decodes it as UTF-8; undecodable bytes are
    replaced rather than raising.
    """
    span = content_bytes[node.start_byte : node.end_byte]
    return span.decode("utf-8", errors="replace")


def _parse_block_comment(raw: str) -> str:
    """Strip comment delimiters from a C/C++ comment and return its text.

    Handles block comments (``/* ... */`` and ``/** ... */``), including the
    decorative leading ``*`` on continuation lines, as well as single-line
    comments (``//``, ``///``, ``//!``, optionally followed by ``<``).
    Lines that are empty after stripping are dropped.

    Args:
        raw: The comment exactly as it appears in the source.

    Returns:
        The comment text without delimiters; ``""`` for an empty comment.
    """
    text = raw.strip()

    # Line comments: drop the slashes plus an optional Doxygen "!" / "<" marker.
    if text.startswith("//"):
        body = text.lstrip("/")
        if body.startswith("!"):
            body = body[1:]
        if body.startswith("<"):
            body = body[1:]
        return body.strip()

    # Remove the closer BEFORE the opener: in the degenerate comment "/**/"
    # the "/**" opener overlaps the "*/" closer, and stripping the opener
    # first would leave a stray "/".
    if text.endswith("*/"):
        text = text[:-2]
    if text.startswith("/**"):
        text = text[3:]
    elif text.startswith("/*"):
        text = text[2:]

    cleaned = (line.strip().lstrip("*").strip() for line in text.split("\n"))
    return "\n".join(line for line in cleaned if line).strip()


def _preceding_doc(siblings: list, idx: int, content_bytes: bytes) -> str:
    """Return the cleaned text of the comment node right before ``siblings[idx]``.

    Only the directly preceding sibling is inspected. If it is a ``comment``
    node its text is passed through ``_parse_block_comment``; otherwise, or
    when ``idx`` is 0, the result is ``""``.
    """
    if idx != 0:
        candidate = siblings[idx - 1]
        if candidate.type == "comment":
            return _parse_block_comment(_node_text(candidate, content_bytes))
    return ""


def _extract_function_declarator(node, content_bytes: bytes):
    """Pull the function name and parameter text out of a declarator node.

    Walks the direct children of ``node`` (expected to be a tree-sitter
    ``function_declarator``) and recurses into nested ``function_declarator``
    children.

    Besides plain, member and qualified identifiers, destructor names
    (``~Foo``) and operator names (``operator==``) are accepted as the
    function name, so destructors and overloaded operators no longer come
    out nameless.

    Args:
        node: Declarator node to inspect.
        content_bytes: UTF-8 encoded source the node offsets refer to.

    Returns:
        ``(name, params)``; ``params`` is the parameter list without the
        surrounding parentheses. Either element is ``""`` when not found.
    """
    # Node types (tree-sitter-cpp grammar) that can carry the function's name.
    name_kinds = (
        "identifier",
        "field_identifier",
        "qualified_identifier",
        "destructor_name",
        "operator_name",
    )

    name = ""
    params = ""
    for child in node.children:
        kind = child.type
        if kind in name_kinds:
            # First name-bearing child wins.
            if not name:
                name = _node_text(child, content_bytes)
        elif kind == "function_declarator":
            # A nested declarator overrides what was gathered so far, but
            # only with the parts it actually provides.
            inner_name, inner_params = _extract_function_declarator(child, content_bytes)
            if inner_name:
                name = inner_name
            if inner_params:
                params = inner_params
        elif kind == "parameter_list":
            raw = _node_text(child, content_bytes).strip()
            if raw.startswith("(") and raw.endswith(")"):
                raw = raw[1:-1]
            params = raw.strip()
    return name, params


def _extract_function(node, content_bytes: bytes, docstring: str = "") -> FunctionSig:
    """Build a ``FunctionSig`` from a ``function_definition`` node.

    The name and parameters come from the function declarator. The
    declarator may sit directly under ``node`` or be wrapped in any number
    of pointer / reference declarators (``int* f()``, ``int& f()``,
    ``int** f()``); all of those are unwrapped. The return type is the text
    of the first type-like child.

    Args:
        node: The function definition node.
        content_bytes: UTF-8 encoded source the node offsets refer to.
        docstring: Documentation text to attach to the signature.

    Returns:
        A ``FunctionSig``; fields that could not be determined are ``""``.
    """
    # Node type names follow the tree-sitter-cpp grammar.
    return_kinds = (
        "type_specifier",
        "primitive_type",
        "type_identifier",
        "qualified_identifier",
        "auto",
        "sized_type_specifier",  # e.g. ``unsigned int``
        "template_type",  # e.g. ``vector<int>``
    )
    wrapper_kinds = ("pointer_declarator", "reference_declarator")

    name = ""
    params = ""
    return_type = ""

    for child in node.children:
        kind = child.type
        if kind == "function_declarator":
            name, params = _extract_function_declarator(child, content_bytes)
        elif kind in return_kinds:
            if not return_type:
                return_type = _node_text(child, content_bytes)
        elif kind in wrapper_kinds:
            # Peel off ``*`` / ``&`` wrappers until the function declarator
            # (or a dead end) is reached.
            inner = child
            while inner is not None and inner.type in wrapper_kinds:
                inner = next(
                    (
                        sub
                        for sub in inner.children
                        if sub.type == "function_declarator" or sub.type in wrapper_kinds
                    ),
                    None,
                )
            if inner is not None:
                name, params = _extract_function_declarator(inner, content_bytes)

    return FunctionSig(name=name, params=params, return_type=return_type, docstring=docstring)


def _extract_class(node, content_bytes: bytes, docstring: str = "") -> ClassSkeleton:
    """Build a ``ClassSkeleton`` from a ``class_specifier`` / ``struct_specifier`` node.

    Collects the class name, its base classes and its methods. Methods are
    taken from in-class function definitions and from member declarations
    that contain a function declarator.

    Compared with a plain identifier-only scan this also keeps:
      * qualified and templated base classes (``ns::Base``, ``Base<T>``);
      * method declarations whose declarator is wrapped in pointer or
        reference declarators (``const char* name() const;``);
      * ``unsigned ...`` and templated return types.

    Args:
        node: The class or struct specifier node.
        content_bytes: UTF-8 encoded source the node offsets refer to.
        docstring: Documentation text to attach to the class.

    Returns:
        A ``ClassSkeleton`` with the gathered name, bases and methods.
    """
    # Node type names follow the tree-sitter-cpp grammar.
    base_kinds = ("type_identifier", "qualified_identifier", "template_type")
    return_kinds = (
        "type_specifier",
        "primitive_type",
        "type_identifier",
        "qualified_identifier",
        "sized_type_specifier",
        "template_type",
    )
    wrapper_kinds = ("pointer_declarator", "reference_declarator")

    name = ""
    bases: List[str] = []
    body_node = None

    for child in node.children:
        kind = child.type
        if kind == "type_identifier":
            if not name:
                name = _node_text(child, content_bytes)
        elif kind == "base_class_clause":
            bases.extend(
                _node_text(sub, content_bytes) for sub in child.children if sub.type in base_kinds
            )
        elif kind == "field_declaration_list":
            body_node = child

    methods: List[FunctionSig] = []
    members = list(body_node.children) if body_node is not None else []
    for idx, member in enumerate(members):
        if member.type == "function_definition":
            doc = _preceding_doc(members, idx, content_bytes)
            methods.append(_extract_function(member, content_bytes, docstring=doc))
            continue
        if member.type not in ("declaration", "field_declaration"):
            continue

        ret_type = ""
        fn_name = ""
        fn_params = ""
        for sub in member.children:
            if sub.type in return_kinds:
                if not ret_type:
                    ret_type = _node_text(sub, content_bytes)
                continue
            # Peel off ``*`` / ``&`` wrappers; data members end in a dead
            # end (None) and are therefore not mistaken for methods.
            declarator = sub
            while declarator is not None and declarator.type in wrapper_kinds:
                declarator = next(
                    (
                        inner
                        for inner in declarator.children
                        if inner.type == "function_declarator" or inner.type in wrapper_kinds
                    ),
                    None,
                )
            if declarator is not None and declarator.type == "function_declarator":
                fn_name, fn_params = _extract_function_declarator(declarator, content_bytes)
                break
        if fn_name:
            doc = _preceding_doc(members, idx, content_bytes)
            methods.append(
                FunctionSig(name=fn_name, params=fn_params, return_type=ret_type, docstring=doc)
            )

    return ClassSkeleton(name=name, bases=bases, docstring=docstring, methods=methods)


class CppExtractor(LanguageExtractor):
    """C/C++ skeleton extractor backed by the tree-sitter C++ grammar."""

    def __init__(self):
        # Imported lazily so that a missing tree-sitter package only raises
        # when this extractor is actually instantiated.
        import tree_sitter_cpp as tscpp
        from tree_sitter import Language, Parser

        self._language = Language(tscpp.language())
        self._parser = Parser(self._language)

    def extract(self, file_name: str, content: str) -> CodeSkeleton:
        """Parse ``content`` and return its skeleton.

        Collects ``#include`` targets, classes/structs and function
        definitions from the top level and from namespace bodies, including
        namespaces nested inside other namespaces.

        Args:
            file_name: Name stored on the returned skeleton.
            content: C/C++ source text.

        Returns:
            A ``CodeSkeleton`` with language ``"C/C++"`` and an empty module doc.
        """
        content_bytes = content.encode("utf-8")
        tree = self._parser.parse(content_bytes)

        imports: List[str] = []
        classes: List[ClassSkeleton] = []
        functions: List[FunctionSig] = []
        self._collect(list(tree.root_node.children), content_bytes, imports, classes, functions)

        return CodeSkeleton(
            file_name=file_name,
            language="C/C++",
            module_doc="",
            imports=imports,
            classes=classes,
            functions=functions,
        )

    def _collect(
        self,
        siblings: list,
        content_bytes: bytes,
        imports: List[str],
        classes: List[ClassSkeleton],
        functions: List[FunctionSig],
    ) -> None:
        """Append the declarations found among ``siblings`` to the output lists.

        Recurses into the declaration list of every namespace, so one code
        path serves the top level and namespaces at any nesting depth.
        """
        for idx, child in enumerate(siblings):
            kind = child.type
            if kind == "preproc_include":
                for sub in child.children:
                    if sub.type in ("string_literal", "system_lib_string"):
                        # Drop the surrounding quotes / angle brackets.
                        imports.append(_node_text(sub, content_bytes).strip().strip('"<>'))
            elif kind in ("class_specifier", "struct_specifier"):
                doc = _preceding_doc(siblings, idx, content_bytes)
                classes.append(_extract_class(child, content_bytes, docstring=doc))
            elif kind == "function_definition":
                doc = _preceding_doc(siblings, idx, content_bytes)
                functions.append(_extract_function(child, content_bytes, docstring=doc))
            elif kind == "namespace_definition":
                for sub in child.children:
                    if sub.type == "declaration_list":
                        self._collect(list(sub.children), content_bytes, imports, classes, functions)
Loading
Loading