Skip to content

Commit f22ca62

Browse files
committed
feat: enhance directory processing with task queuing and input handling
1 parent a394ffa commit f22ca62

File tree

7 files changed

+63
-12
lines changed

7 files changed

+63
-12
lines changed

domain/processors/builtin/vector_index.py

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -114,8 +114,15 @@ class VectorIndexProcessor:
114114
}
115115
]
116116
produces_file = False
117+
requires_input_bytes = False
117118

118119
async def process(self, input_bytes: bytes, path: str, config: Dict[str, Any]) -> Response:
120+
async def ensure_input_bytes() -> bytes:
121+
if input_bytes:
122+
return input_bytes
123+
from domain.virtual_fs import VirtualFSService
124+
return await VirtualFSService.read_file(path)
125+
119126
action = config.get("action", "create")
120127
index_type = config.get("index_type", "vector")
121128
vector_db = VectorDBService()
@@ -159,7 +166,8 @@ async def process(self, input_bytes: bytes, path: str, config: Dict[str, Any]) -
159166
await vector_db.delete_vector(vector_collection, path)
160167

161168
if file_ext in ["jpg", "jpeg", "png", "bmp"]:
162-
processed_bytes, compression = _compress_image_for_embedding(input_bytes)
169+
file_bytes = await ensure_input_bytes()
170+
processed_bytes, compression = _compress_image_for_embedding(file_bytes)
163171
base64_image = base64.b64encode(processed_bytes).decode("utf-8")
164172
description = await describe_image_base64(base64_image)
165173
embedding = await get_text_embedding(description)
@@ -180,7 +188,8 @@ async def process(self, input_bytes: bytes, path: str, config: Dict[str, Any]) -
180188

181189
if file_ext in ["txt", "md"]:
182190
try:
183-
text = input_bytes.decode("utf-8")
191+
file_bytes = await ensure_input_bytes()
192+
text = file_bytes.decode("utf-8")
184193
except UnicodeDecodeError:
185194
return Response(content="文本文件解码失败", status_code=400)
186195

domain/processors/service.py

Lines changed: 42 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,44 @@ async def process_directory(cls, req: ProcessDirectoryRequest):
8585
suffix = raw_suffix
8686
overwrite = req.overwrite
8787

88+
if produces_file:
89+
if not overwrite and not suffix:
90+
raise HTTPException(400, detail="Suffix is required when not overwriting files")
91+
else:
92+
overwrite = False
93+
suffix = None
94+
payload = {
95+
"path": req.path,
96+
"processor_type": req.processor_type,
97+
"config": req.config,
98+
"overwrite": overwrite,
99+
"max_depth": req.max_depth,
100+
"suffix": suffix,
101+
}
102+
task = await task_queue_service.add_task("process_directory_scan", payload)
103+
return {"task_id": task.id}
104+
105+
@classmethod
106+
async def scan_directory(cls, req: ProcessDirectoryRequest):
107+
if req.max_depth is not None and req.max_depth < 0:
108+
raise HTTPException(400, detail="max_depth must be >= 0")
109+
110+
is_dir = await VirtualFSService.path_is_directory(req.path)
111+
if not is_dir:
112+
raise HTTPException(400, detail="Path must be a directory")
113+
114+
schema = get_config_schema(req.processor_type)
115+
_processor = get(req.processor_type)
116+
if not schema or not _processor:
117+
raise HTTPException(404, detail="Processor not found")
118+
119+
produces_file = bool(schema.get("produces_file"))
120+
raw_suffix = req.suffix if req.suffix is not None else None
121+
if raw_suffix is not None and raw_suffix.strip() == "":
122+
raw_suffix = None
123+
suffix = raw_suffix
124+
overwrite = req.overwrite
125+
88126
if produces_file:
89127
if not overwrite and not suffix:
90128
raise HTTPException(400, detail="Suffix is required when not overwriting files")
@@ -133,7 +171,7 @@ def apply_suffix(path_str: str, suffix_str: str) -> str:
133171
new_name = f"{name}{suffix_str}"
134172
return str(path_obj.with_name(new_name))
135173

136-
scheduled_tasks: List[str] = []
174+
scheduled_count = 0
137175
stack: List[Tuple[str, int]] = [(rel, 0)]
138176
page_size = 200
139177

@@ -161,7 +199,7 @@ def apply_suffix(path_str: str, suffix_str: str) -> str:
161199
save_to = None
162200
if produces_file and not overwrite and suffix:
163201
save_to = apply_suffix(absolute_path, suffix)
164-
task = await task_queue_service.add_task(
202+
await task_queue_service.add_task(
165203
"process_file",
166204
{
167205
"path": absolute_path,
@@ -171,16 +209,13 @@ def apply_suffix(path_str: str, suffix_str: str) -> str:
171209
"overwrite": overwrite,
172210
},
173211
)
174-
scheduled_tasks.append(task.id)
212+
scheduled_count += 1
175213

176214
if total is None or page * page_size >= total:
177215
break
178216
page += 1
179217

180-
return {
181-
"task_ids": scheduled_tasks,
182-
"scheduled": len(scheduled_tasks),
183-
}
218+
return {"scheduled": scheduled_count}
184219

185220
@classmethod
186221
async def get_source(cls, processor_type: str):

domain/tasks/task_queue.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,12 @@ async def _execute_task(self, task: Task):
8686
overwrite=params.get("overwrite", False),
8787
)
8888
task.result = result
89+
elif task.name == "process_directory_scan":
90+
from domain.processors import ProcessDirectoryRequest, ProcessorService
91+
92+
params = task.task_info or {}
93+
req = ProcessDirectoryRequest(**params)
94+
task.result = await ProcessorService.scan_directory(req)
8995
elif task.name == "automation_task" or self._is_processor_task(task.name):
9096
from models.database import AutomationTask
9197

web/src/api/processors.ts

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -43,7 +43,7 @@ export const processorsApi = {
4343
max_depth?: number | null;
4444
suffix?: string | null;
4545
}) =>
46-
request<{ task_ids: string[]; scheduled: number }>('/processors/process-directory', {
46+
request<{ task_id: string }>('/processors/process-directory', {
4747
method: 'POST',
4848
json: params,
4949
}),

web/src/i18n/locales/en.json

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -617,6 +617,7 @@
617617
"Source Editor": "Source Editor",
618618
"Module Path": "Module Path",
619619
"Directory processing always overwrites original files": "Directory processing always overwrites original files",
620+
"Directory execution will enqueue one task per file": "Directory execution will enqueue a scan task, then one task per file",
620621
"No data": "No data",
621622
"Select File": "Select File",
622623
"Select Path": "Select Path",

web/src/i18n/locales/zh.json

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -608,7 +608,7 @@
608608
"Source Editor": "源码编辑",
609609
"Module Path": "模块路径",
610610
"Directory processing always overwrites original files": "选择目录时会强制覆盖原文件",
611-
"Directory execution will enqueue one task per file": "目录模式会为每个文件单独创建任务",
611+
"Directory execution will enqueue one task per file": "目录模式会先创建扫描任务,后台再为每个文件创建任务",
612612
"Directory scope": "目录范围",
613613
"Current level only": "仅当前层级",
614614
"Include subdirectories": "包含子目录",

web/src/pages/ProcessorsPage.tsx

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -276,7 +276,7 @@ const ProcessorsPage = memo(function ProcessorsPage() {
276276
max_depth: maxDepth,
277277
suffix: suffixValue,
278278
});
279-
messageApi.success(`${t('Task submitted')}: ${resp.scheduled}`);
279+
messageApi.success(`${t('Task submitted')}: ${resp.task_id}`);
280280
}
281281
} else {
282282
const payload: any = {

0 commit comments

Comments
 (0)