lightrag/kg/postgres_impl.py (92 changes: 90 additions & 2 deletions)
@@ -986,6 +986,37 @@ async def _migrate_text_chunks_add_llm_cache_list(self):
f"Failed to add llm_cache_list column to LIGHTRAG_DOC_CHUNKS: {e}"
)

async def _migrate_text_chunks_add_custom_data(self):
"""Add custom_data column to LIGHTRAG_DOC_CHUNKS table if it doesn't exist"""
try:
# Check if custom_data column exists
check_column_sql = """
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'lightrag_doc_chunks'
AND column_name = 'custom_data'
"""

column_info = await self.query(check_column_sql)
if not column_info:
logger.info("Adding custom_data column to LIGHTRAG_DOC_CHUNKS table")
add_column_sql = """
ALTER TABLE LIGHTRAG_DOC_CHUNKS
ADD COLUMN custom_data JSONB NULL DEFAULT '{}'::jsonb
"""
await self.execute(add_column_sql)
logger.info(
"Successfully added custom_data column to LIGHTRAG_DOC_CHUNKS table"
)
else:
logger.info(
"custom_data column already exists in LIGHTRAG_DOC_CHUNKS table"
)
except Exception as e:
logger.warning(
f"Failed to add custom_data column to LIGHTRAG_DOC_CHUNKS: {e}"
)

async def _migrate_doc_status_add_track_id(self):
"""Add track_id column to LIGHTRAG_DOC_STATUS table if it doesn't exist and create index"""
try:
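As a point of reference only (not part of this change), PostgreSQL 9.6 and later also support ADD COLUMN IF NOT EXISTS, which would collapse the information_schema probe into a single idempotent statement; the new method above is roughly equivalent to this sketch:

    # Sketch only: one-statement equivalent of _migrate_text_chunks_add_custom_data,
    # assuming PostgreSQL >= 9.6. The PR keeps the explicit column probe to match
    # the neighbouring _migrate_* helpers.
    await self.execute(
        """
        ALTER TABLE LIGHTRAG_DOC_CHUNKS
        ADD COLUMN IF NOT EXISTS custom_data JSONB NULL DEFAULT '{}'::jsonb
        """
    )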
@@ -1373,6 +1404,14 @@ async def check_tables(self):
f"PostgreSQL, Failed to migrate text chunks llm_cache_list field: {e}"
)

# Migrate text chunks to add custom_data field if needed
try:
await self._migrate_text_chunks_add_custom_data()
except Exception as e:
logger.error(
f"PostgreSQL, Failed to migrate text chunks custom_data field: {e}"
)

# Migrate field lengths for entity_name, source_id, target_id, and file_path
try:
await self._migrate_field_lengths()
@@ -1900,6 +1939,28 @@ async def finalize(self):
self.db = None

################ QUERY METHODS ################
def _merge_custom_data_into_chunk(self, chunk_data: dict[str, Any]) -> None:
"""
Parse custom_data JSON string from database and merge custom fields into chunk data.

Args:
chunk_data: The chunk data dict from database query result
"""
# Parse custom_data JSON string back to dict
custom_data = chunk_data.get("custom_data", {})
if isinstance(custom_data, str):
try:
custom_data = json.loads(custom_data)
except json.JSONDecodeError:
custom_data = {}

# Merge custom_data fields into chunk data
if custom_data:
chunk_data.update(custom_data)

# Remove custom_data key from result to avoid duplication
chunk_data.pop("custom_data", None)
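To illustrate what the helper does (a standalone sketch; the custom field names are hypothetical, and the str branch exists because JSONB values typically come back from the driver as JSON strings):

    # Hypothetical row as returned by one of the chunk query methods.
    row = {
        "id": "chunk-1",
        "content": "some text",
        "custom_data": '{"source_lang": "en", "page": 3}',
    }
    self._merge_custom_data_into_chunk(row)
    # row == {"id": "chunk-1", "content": "some text", "source_lang": "en", "page": 3}
    # and the custom_data key itself has been removed to avoid duplication.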

async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get data by id."""
sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
@@ -1915,6 +1976,10 @@ async def get_by_id(self, id: str) -> dict[str, Any] | None:
except json.JSONDecodeError:
llm_cache_list = []
response["llm_cache_list"] = llm_cache_list

# Parse custom_data JSON string back to dict and merge into response
self._merge_custom_data_into_chunk(response)

create_time = response.get("create_time", 0)
update_time = response.get("update_time", 0)
response["create_time"] = create_time
@@ -2049,6 +2114,10 @@ def _order_results(
except json.JSONDecodeError:
llm_cache_list = []
result["llm_cache_list"] = llm_cache_list

# Parse custom_data JSON string back to dict and merge into result
self._merge_custom_data_into_chunk(result)

create_time = result.get("create_time", 0)
update_time = result.get("update_time", 0)
result["create_time"] = create_time
@@ -2182,6 +2251,20 @@ async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_text_chunk"]
# Extract custom_data from chunk data, excluding standard fields
custom_data = {}
standard_fields = {
"tokens",
"chunk_order_index",
"full_doc_id",
"content",
"file_path",
"llm_cache_list",
}
for field_key, field_value in v.items():
if field_key not in standard_fields:
custom_data[field_key] = field_value

_data = {
"workspace": self.workspace,
"id": k,
@@ -2191,6 +2274,7 @@ async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"content": v["content"],
"file_path": v["file_path"],
"llm_cache_list": json.dumps(v.get("llm_cache_list", [])),
"custom_data": json.dumps(custom_data),
"create_time": current_time,
"update_time": current_time,
}
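As a usage sketch (the storage handle and the custom keys are hypothetical), any key outside the standard set is carried into the JSONB column and merged back on read:

    await chunk_storage.upsert({
        "chunk-1": {
            "tokens": 120,
            "chunk_order_index": 0,
            "full_doc_id": "doc-1",
            "content": "some text",
            "file_path": "docs/a.md",
            "source_lang": "en",  # non-standard key -> stored in custom_data
            "page": 3,            # non-standard key -> stored in custom_data
        }
    })
    # custom_data is persisted as '{"source_lang": "en", "page": 3}', and
    # get_by_id / get_by_ids merge these keys back into the returned chunk dict.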
@@ -5419,6 +5503,7 @@ def namespace_to_table_name(namespace: str) -> str:
content TEXT,
file_path TEXT NULL,
llm_cache_list JSONB NULL DEFAULT '[]'::jsonb,
custom_data JSONB NULL DEFAULT '{}'::jsonb,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_DOC_CHUNKS_PK PRIMARY KEY (workspace, id)
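Because the column is JSONB, custom fields stay queryable server-side without further schema changes; a hypothetical ad-hoc statement (not added to SQL_TEMPLATES by this PR) could use the containment operator:

    # Sketch only: '@>' is PostgreSQL's JSONB containment operator, so the
    # second parameter would be e.g. json.dumps({"source_lang": "en"}).
    FILTER_CHUNKS_BY_CUSTOM_FIELD = """SELECT id, content
        FROM LIGHTRAG_DOC_CHUNKS
        WHERE workspace=$1 AND custom_data @> $2::jsonb
    """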
@@ -5556,6 +5641,7 @@ def namespace_to_table_name(namespace: str) -> str:
"get_by_id_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
chunk_order_index, full_doc_id, file_path,
COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list,
COALESCE(custom_data, '{}'::jsonb) as custom_data,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id=$2
@@ -5572,6 +5658,7 @@ def namespace_to_table_name(namespace: str) -> str:
"get_by_ids_text_chunks": """SELECT id, tokens, COALESCE(content, '') as content,
chunk_order_index, full_doc_id, file_path,
COALESCE(llm_cache_list, '[]'::jsonb) as llm_cache_list,
COALESCE(custom_data, '{}'::jsonb) as custom_data,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_DOC_CHUNKS WHERE workspace=$1 AND id = ANY($2)
@@ -5641,15 +5728,16 @@ def namespace_to_table_name(namespace: str) -> str:
""",
"upsert_text_chunk": """INSERT INTO LIGHTRAG_DOC_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, file_path, llm_cache_list,
- create_time, update_time)
- VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
+ custom_data, create_time, update_time)
+ VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT (workspace,id) DO UPDATE
SET tokens=EXCLUDED.tokens,
chunk_order_index=EXCLUDED.chunk_order_index,
full_doc_id=EXCLUDED.full_doc_id,
content = EXCLUDED.content,
file_path=EXCLUDED.file_path,
llm_cache_list=EXCLUDED.llm_cache_list,
custom_data=EXCLUDED.custom_data,
update_time = EXCLUDED.update_time
""",
"upsert_full_entities": """INSERT INTO LIGHTRAG_FULL_ENTITIES (workspace, id, entity_names, count,