importer: fix SST metadata loss on retry in distributed merge #161876
mw5h wants to merge 8 commits into cockroachdb:master
Conversation
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? 🦉 Hoot! I am Blathers, a bot for CockroachDB. My owner is dev-inf.
Force-pushed 664deb5 to f080226
spilchen left a comment:
Is there any test that we can add for this that runs in the CI? Potentially even taking an existing IMPORT resume test and modifying it so that it works for distributed merge.
@spilchen reviewed 1 file and made 5 comments.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @mw5h, and @ZhouXing19).
pkg/jobs/jobspb/jobs.proto line 791 at r1 (raw file):
// finished during the map phase of distributed merge so that the files
// are not lost on retry.
repeated IndexBackfillSSTManifest completed_map_work = 9 [(gogoproto.nullable) = false];
Can we consider renaming IndexBackfillSSTManifest to something more generic? It's a bit of a code smell to have something in an import struct refer to a backfill struct. I do like the reuse though.
pkg/sql/importer/import_processor_planning.go line 263 at r1 (raw file):
}
res.Add(counts)
if len(row) == 3 {
Is SST info still being sent? I think it's done in Next().
// When using distributed merge, the processor will emit the SSTs and their
// start and end keys.
var fileDatums rowenc.EncDatumRow
if idp.files != nil {
bytes, err := protoutil.Marshal(idp.files)
if err != nil {
idp.MoveToDraining(err)
return nil, idp.DrainHelper()
}
sstInfo := tree.NewDBytes(tree.DBytes(bytes))
fileDatums = rowenc.EncDatumRow{
rowenc.DatumToEncDatumUnsafe(types.Bytes, sstInfo),
}
}
do we still need this?
pkg/sql/importer/import_processor.go line 190 at r1 (raw file):
// When using distributed merge, emit SST metadata via progress metadata
// before closing the channel. This ensures the metadata is properly
// checkpointed and survives job retries.
This only works if the processor completes its work successfully. Isn't it possible for a retry to begin in the middle of a file (using ResumePos)? If that happens, won't the SST metadata be missing? Perhaps we should have the SST metadata update tied to the update to ResumePos, so that they stay in lock step.
pkg/sql/importer/sst_conversion.go line 1 at r1 (raw file):
// Copyright 2026 The Cockroach Authors.
This helper file is good. I have very similar logic in rowexec/indexbackfiller_sst_sink.go (see collectNewManifests). I wonder if we can combine the two so we have this mapping once? Maybe put it in a common package that both importer and index backfill can use.
Force-pushed f080226 to bdb7462
Yikes. Bad squash.
@mw5h made 1 comment.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas and @ZhouXing19).
Potential Bug(s) Detected: The three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: when viewing the workflow output, scroll to the bottom to find the Final Analysis Summary. After you review the findings, please tag the issue as follows:
Force-pushed bdb7462 to e029083
mw5h left a comment:
@mw5h made 4 comments.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @spilchen, and @ZhouXing19).
pkg/jobs/jobspb/jobs.proto line 791 at r1 (raw file):
Previously, spilchen wrote…
Can we consider renaming IndexBackfillSSTManifest to something more generic? It's a bit of a code smell to have something in an import struct refer to a backfill struct. I do like the reuse though.
I added a new commit to this branch to handle this.
pkg/sql/importer/import_processor.go line 190 at r1 (raw file):
Previously, spilchen wrote…
This only works if the processor completes its work successfully. Isn't it possible for a retry to begin in the middle of a file (using ResumePos)? If that happens, won't the SST metadata be missing? Perhaps we should have the SST metadata update tied to the update to ResumePos, so that they stay in lock step.
Good point. I think this new version resolves this by updating both at once.
pkg/sql/importer/import_processor_planning.go line 263 at r1 (raw file):
Previously, spilchen wrote…
Is SST info still being sent? I think it's done in Next().
// When using distributed merge, the processor will emit the SSTs and their
// start and end keys.
var fileDatums rowenc.EncDatumRow
if idp.files != nil {
	bytes, err := protoutil.Marshal(idp.files)
	if err != nil {
		idp.MoveToDraining(err)
		return nil, idp.DrainHelper()
	}
	sstInfo := tree.NewDBytes(tree.DBytes(bytes))
	fileDatums = rowenc.EncDatumRow{
		rowenc.DatumToEncDatumUnsafe(types.Bytes, sstInfo),
	}
}
do we still need this?
Nope.
pkg/sql/importer/sst_conversion.go line 1 at r1 (raw file):
Previously, spilchen wrote…
This helper file is good. I have very similar logic in rowexec/indexbackfiller_sst_sink.go (see collectNewManifests). I wonder if we can combine the two so we have this mapping once? Maybe put it in a common package that both importer and index backfill can use.
Pulled this out into a shared helper.
Force-pushed e029083 to caa618b
Force-pushed caa618b to 297ac21
spilchen left a comment:
Regarding the tests for this, can we consider updating TestCSVImportCanBeResumed so that it runs in distributed merge and non-distributed merge? That would give us a bit of test coverage at least.
@spilchen reviewed 22 files and all commit messages, made 2 comments, and resolved 4 discussions.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @mw5h, and @ZhouXing19).
pkg/sql/importer/import_processor.go line 252 at r6 (raw file):
}
// SST metadata is now emitted via the progress channel (see lines 188-199)
nit: we should probably leave out line numbers in the comment as this could get stale in a hurry
Force-pushed 297ac21 to 9924471
mw5h left a comment:
Yeah, that's a good idea. I tried to get an explicit unit test together and just kinda failed. It's hard to wire up just part of the chain.
@mw5h made 2 comments.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @spilchen, and @ZhouXing19).
pkg/sql/importer/import_processor.go line 252 at r6 (raw file):
Previously, spilchen wrote…
nit: we should probably leave out line numbers in the comment as this could get stale in a hurry
Yup. Claude loves to do this in comments and, in general, over-comments. I thought I had got them all but I was scrambling at the end of the day yesterday and missed a spot.
spilchen left a comment:
@spilchen reviewed 12 files and all commit messages, and made 1 comment.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @andrew-r-thomas and @ZhouXing19).
Force-pushed 9924471 to 6d71e4d
Force-pushed afece32 to 1807ee9
IndexBackfillSSTManifest was originally designed for index backfill operations but is now also used by IMPORT's distributed merge pipeline. The name is misleading, since it suggests the type is specific to index backfill when it is actually a general-purpose SST manifest for bulk operations.

This commit renames the type to BulkSSTManifest to better reflect its usage across both index backfill and import operations. The rename touches protobuf definitions and all references in backfill and import code.

Release note: none
Epic: None
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Before this change, IMPORT with distributed merge emitted SST metadata as a third column in row results (row[2]). This caused a critical bug on job retry: when the import job resumed after a node failure, processorOutput was reinitialized as an empty array. Only incomplete files (ResumePos < MaxInt64) were reprocessed, so SST metadata from completed files was lost. The merge phase then ran with an incomplete SST list, producing wrong data and fingerprint mismatches.

This commit changes SST metadata to flow through the progress/metadata channel instead of row results, following the same pattern as index backfill. Import processors now emit SST metadata via BulkProcessorProgress.SSTMetadata incrementally as SSTs are created, synchronized with ResumePos updates. This ensures that on retry, the coordinator has metadata for all SSTs created before the failure, preventing orphaned files in external storage.

The coordinator accumulates this metadata via the metaFn callback and periodically persists it to ImportProgress.CompletedMapWork. On retry, the SST metadata is restored from job progress, ensuring the merge phase has the complete SST list.

To avoid code duplication, we reuse jobspb.IndexBackfillSSTManifest (from index backfill) for SST metadata storage. Shared conversion functions in pkg/sql/bulksst/manifest_conversion.go translate between bulksst.SSTFiles (used during processing) and IndexBackfillSSTManifest (used for persistence), eliminating duplicate code between import and index backfill.
Changes:
- Added sst_metadata field (IndexBackfillSSTManifest) to BulkProcessorProgress
- Added completed_map_work field (IndexBackfillSSTManifest) to ImportProgress
- Created pkg/sql/bulksst/manifest_conversion.go with shared conversion utilities (SSTFilesToManifests, ManifestsToSSTFiles)
- Modified import_processor.go to emit SST metadata incrementally via progress channel, synchronized with ResumePos updates
- SST metadata emission tracks reported files by URI to avoid relying on file list ordering
- Updated import_processor_planning.go to accumulate SST metadata from progress metadata, persist to CompletedMapWork, and restore on retry
- Updated pkg/sql/rowexec/indexbackfiller_sst_sink.go to use shared conversion utilities
- Removed row[2] SST handling from rowResultWriter callback

Fixes: cockroachdb#161490
Epic: None
Release note: None
This commit extracts the indexBackfillSink interface from rowexec into a shared BulkSink interface in pkg/sql/bulksst. This refactoring prepares for code reuse between index backfill and IMPORT for distributed merge functionality.

Before this change, the indexBackfillSink interface and bulkAdderIndexBackfillSink implementation were local to rowexec, preventing reuse in the importer package. Since both index backfill and IMPORT use the same bulk write patterns (BulkAdder ingestion vs SST file creation with manifest tracking), extracting a common interface eliminates the need for duplicate implementations.

The new BulkSink interface in pkg/sql/bulksst defines the contract for bulk write destinations:
- Add() enqueues KV pairs
- Flush() persists buffered data
- SetOnFlush() installs progress callbacks
- ConsumeFlushManifests() retrieves SST metadata (for distributed merge)
- Close() releases resources

BulkAdderSink wraps kvserverbase.BulkAdder to provide the legacy ingestion path where SSTs are sent directly to KV via AddSSTable requests. sstIndexBackfillSink (in rowexec) provides the distributed merge path where SSTs are staged in external storage. This design allows both packages to use the same abstraction without circular dependencies, since both already import pkg/sql/bulksst.

Changes:
- Created pkg/sql/bulksst/sink.go defining BulkSink interface and BulkAdderSink implementation
- Updated pkg/sql/rowexec to use bulksst.BulkSink instead of local indexBackfillSink interface
- Removed duplicate bulkAdderIndexBackfillSink from rowexec
- Updated sstIndexBackfillSink to implement bulksst.BulkSink
- Updated tests to reference bulksst.BulkAdderSink

Epic: None
Release note: None
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit moves sstIndexBackfillSink from pkg/sql/rowexec to pkg/sql/bulksst, renaming it to SSTSink. This enables code reuse between index backfill and IMPORT for distributed merge functionality.

Before this change, sstIndexBackfillSink was implemented in rowexec and tightly coupled to execinfra.FlowCtx, preventing reuse in the importer package. Since both index backfill and IMPORT need identical SST sink functionality for distributed merge (writing SSTs to external storage, tracking manifests, and coordinating with OnFlush callbacks), moving the implementation to a shared location eliminates duplication.

The new bulksst.SSTSink provides a general-purpose SST sink that:
- Writes KV pairs to SST files in external storage
- Tracks SST file metadata via manifests
- Supports OnFlush callbacks for progress reporting
- Handles duplicate detection for unique indexes

The constructor bulksst.NewSSTSink takes dependencies directly rather than requiring execinfra.FlowCtx, making it usable from any package:
- settings: cluster settings for SST batch configuration
- externalStorageFromURI: factory for creating external storage
- clock: for timestamping SST files
- distributedMergeFilePrefix: base URI for SST file storage
- writeAsOf: timestamp to write on all KVs
- processorID: unique ID for per-processor subdirectories
- checkDuplicates: whether to detect duplicate keys

Tests for SSTSink were moved from indexbackfiller_sst_sink_test.go to bulksst/sink_test.go, and references in rowexec were updated to use the shared implementation.

Changes:
- Moved sstIndexBackfillSink to bulksst.SSTSink in pkg/sql/bulksst/sink.go
- Added NewSSTSink constructor with dependency injection
- Moved tests to pkg/sql/bulksst/sink_test.go
- Updated rowexec/indexbackfiller.go to call bulksst.NewSSTSink
- Updated rowexec/indexbackfiller_test.go to use bulksst.SSTSink type
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink.go
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink_test.go
- Updated BUILD.bazel files in both packages

Epic: None
Release note: None
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Force-pushed 1807ee9 to 35cf271
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. 🦉 Hoot! I am Blathers, a bot for CockroachDB. My owner is dev-inf.
Before this change, IMPORT with distributed merge emitted SST metadata as a third column in row results (row[2]). This caused a critical bug on job retry: when the import job resumed after a node failure, processorOutput was reinitialized as an empty array. Only incomplete files (ResumePos < MaxInt64) were reprocessed, so SST metadata from completed files was lost. The merge phase then ran with an incomplete SST list, producing wrong data and fingerprint mismatches.
This commit changes SST metadata to flow through the progress/metadata channel instead of row results, following the same pattern as index backfill. Import processors now emit SST metadata via BulkProcessorProgress.SSTMetadata. The coordinator accumulates this metadata via the metaFn callback and periodically persists it to ImportProgress.CompletedMapWork. On retry, the SST metadata is restored from job progress, ensuring the merge phase has the complete SST list.
To avoid code duplication, we reuse jobspb.IndexBackfillSSTManifest (from index backfill) for SST metadata storage. Conversion functions in sst_conversion.go translate between bulksst.SSTFiles (used during processing) and IndexBackfillSSTManifest (used for persistence).
Changes:
Fixes: #161490
Epic: None
Release note: None