Description
The Problem
In pipelines/kubeflow-pipeline.py:
if utility.has_collection(collection_name):
    utility.drop_collection(collection_name)
    print(f"Dropped existing collection: {collection_name}")

Every pipeline run deletes ALL data and rebuilds from scratch. This means:
- You can't index multiple data sources (docs + issues)
- Re-running causes temporary data loss while the collection is rebuilt
Why This Matters for Agentic RAG
The Agentic RAG GSoC proposal requires indexing multiple data sources:
- Kubeflow documentation
- GitHub Issues (for troubleshooting)
- Platform architecture docs
With the current code, running the issues pipeline would delete all docs. This blocks the proposal entirely. Fixing this is a prerequisite for building a multi-source RAG agent.
Proposed Solutions
Option 1: Source-Based Delete
Add a source field to every record, then delete only matching records before inserting.
# Delete old data from this source only
collection.delete(expr='source == "docs:kubeflow/website"')

# Insert new data
collection.insert(records)

Pros: Simple concept.
Cons: Requires schema change, delete-by-expression scans all rows.
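As a sketch of Option 1, the delete expression could be built by a small helper (hypothetical, not in the repo) that takes a source identifier and escapes embedded quotes, so the expression stays well-formed for any source value:

```python
def source_delete_expr(source_id: str) -> str:
    """Build a Milvus boolean expression matching records from one source.

    Escapes embedded double quotes so the resulting expression is always
    syntactically valid, whatever the source identifier contains.
    """
    escaped = source_id.replace('"', '\\"')
    return f'source == "{escaped}"'


# Example: target only the Kubeflow website docs before re-inserting them
expr = source_delete_expr("docs:kubeflow/website")
# collection.delete(expr=expr)  # would remove only matching rows
print(expr)  # source == "docs:kubeflow/website"
```

This keeps the expression construction in one place rather than scattering string literals across pipelines.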
Option 2: Milvus Partitions (Recommended)
Docs (https://milvus.io/docs/manage-partitions.md#Create-Partition)
Milvus has a first-class feature for this: Partitions. A partition is a subdivision of a collection that can be dropped/created independently.
Collection: docs_rag
├── Partition: docs__kubeflow_website (docs)
├── Partition: issues__kubeflow_kubeflow (issues)
└── Partition: issues__kubeflow_pipelines (issues)
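The partition names in the tree above follow a `{source_type}__{repo}` scheme. A small helper (hypothetical, not in the repo) could centralize the mapping, replacing any character outside letters, digits, and underscores, which Milvus does not allow in partition names:

```python
import re


def partition_name(source_type: str, repo: str) -> str:
    """Derive a partition name like docs__kubeflow_website from a source.

    Milvus collection/partition names are limited to letters, digits, and
    underscores, so any other character in the repo name is replaced.
    """
    safe_repo = re.sub(r"[^0-9A-Za-z_]", "_", repo)
    return f"{source_type}__{safe_repo}"


print(partition_name("docs", "kubeflow/website"))      # docs__kubeflow_website
print(partition_name("issues", "kubeflow/pipelines"))  # issues__kubeflow_pipelines
```

Keeping this in one function means every pipeline derives the same partition name for the same source.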
Each pipeline run only drops and rebuilds its own partition:
partition_name = f"{source_type}__{repo_name.replace('/', '_')}"

# Drop only this partition
if collection.has_partition(partition_name):
    collection.drop_partition(partition_name)

# Create and populate
collection.create_partition(partition_name)
collection.insert(records, partition_name=partition_name)

Pros:
- No schema change needed
- O(1) drop (instant, no scanning)
- Milvus-native feature designed for this
- Search still works across all partitions automatically
# Searches ALL partitions automatically
collection.search(query_vec, ...)

Questions for Maintainers
- Any concerns with switching to a partitioned collection?
- Preference between Option 1 and Option 2?
I am happy to implement whichever approach makes sense.