@@ -552,7 +552,7 @@ def _meta_resolver(): # noqa: D401
552552 phase_times ["html_to_pdf" ] = time .time () - html_conversion_start
553553
554554 # Check if we're using ColPali
555- using_colpali = (
555+ using_colpali = bool (
556556 use_colpali and ingestion_service .colpali_embedding_model and ingestion_service .colpali_vector_store
557557 )
558558 logger .debug (
@@ -870,6 +870,38 @@ def _meta_resolver(): # noqa: D401
870870 phase_times ["generate_embeddings" ] = 0
871871 phase_times ["create_chunk_objects" ] = 0
872872
873+ # 11b. Delete old chunks if this is a re-ingestion (requeue)
874+ # Must run before any new chunks are stored (both regular and ColPali)
875+ if doc .chunk_ids :
876+ logger .info (f"Re-ingestion detected for { document_id } , deleting { len (doc .chunk_ids )} old chunks" )
877+ deletion_tasks = []
878+ if hasattr (vector_store , "delete_chunks_by_document_id" ):
879+ deletion_tasks .append (vector_store .delete_chunks_by_document_id (document_id , auth .app_id ))
880+ # Always try to clean colpali store — the doc may have been ingested
881+ # with colpali previously even if this re-ingestion doesn't use it
882+ cleanup_colpali_store = colpali_vector_store
883+ if not cleanup_colpali_store and settings .ENABLE_COLPALI :
884+ try :
885+ cleanup_colpali_store = await _get_worker_colpali_store (database )
886+ except Exception as e :
887+ logger .warning (f"Could not init colpali store for cleanup: { e } " )
888+ if cleanup_colpali_store and hasattr (cleanup_colpali_store , "delete_chunks_by_document_id" ):
889+ deletion_tasks .append (cleanup_colpali_store .delete_chunks_by_document_id (document_id , auth .app_id ))
890+ chunk_v2_store = ctx .get ("chunk_v2_store" )
891+ if chunk_v2_store and auth .app_id and hasattr (chunk_v2_store , "delete_chunks_by_document_id" ):
892+ deletion_tasks .append (chunk_v2_store .delete_chunks_by_document_id (document_id , auth ))
893+ if deletion_tasks :
894+ try :
895+ results = await asyncio .wait_for (
896+ asyncio .gather (* deletion_tasks , return_exceptions = True ),
897+ timeout = 30 ,
898+ )
899+ for i , result in enumerate (results ):
900+ if isinstance (result , Exception ):
901+ logger .error (f"Error deleting old chunks (task { i } ): { result } " )
902+ except asyncio .TimeoutError :
903+ logger .error (f"Timeout deleting old chunks for { document_id } , proceeding anyway" )
904+
873905 # 12. Handle ColPali embeddings
874906 chunk_objects_multivector = []
875907 colpali_chunk_ids : List [str ] = []
@@ -1108,6 +1140,7 @@ def _meta_resolver(): # noqa: D401
11081140 # Update document status to completed after all processing
11091141 doc .system_metadata ["page_count" ] = final_page_count
11101142 doc .system_metadata ["status" ] = "completed"
1143+ doc .system_metadata ["use_colpali" ] = using_colpali
11111144 doc .system_metadata ["updated_at" ] = datetime .now (UTC )
11121145 # Clear progress info on completion
11131146 doc .system_metadata .pop ("progress" , None )
0 commit comments