
importer: fix SST metadata loss on retry in distributed merge #161876

Draft

mw5h wants to merge 8 commits into cockroachdb:master from mw5h:importer_distmerge_checkpoint

Conversation

@mw5h
Contributor

@mw5h mw5h commented Jan 27, 2026

Before this change, IMPORT with distributed merge emitted SST metadata as a third column in row results (row[2]). This caused a critical bug on job retry: when the import job resumed after a node failure, processorOutput was reinitialized as an empty array. Only incomplete files (ResumePos < MaxInt64) were reprocessed, so SST metadata from completed files was lost. The merge phase then ran with an incomplete SST list, producing wrong data and fingerprint mismatches.

This commit changes SST metadata to flow through the progress/metadata channel instead of row results, following the same pattern as index backfill. Import processors now emit SST metadata via BulkProcessorProgress.SSTMetadata. The coordinator accumulates this metadata via the metaFn callback and periodically persists it to ImportProgress.CompletedMapWork. On retry, the SST metadata is restored from job progress, ensuring the merge phase has the complete SST list.

To avoid code duplication, we reuse jobspb.IndexBackfillSSTManifest (from index backfill) for SST metadata storage. Conversion functions in sst_conversion.go translate between bulksst.SSTFiles (used during processing) and IndexBackfillSSTManifest (used for persistence).

Changes:

  • Added sst_metadata field (IndexBackfillSSTManifest) to BulkProcessorProgress
  • Added completed_map_work field (IndexBackfillSSTManifest) to ImportProgress
  • Created sst_conversion.go with conversion helpers
  • Modified import_processor.go to emit SST metadata via progress channel from within the worker goroutine before the channel closes
  • Updated import_processor_planning.go to accumulate SST metadata from progress metadata, persist to CompletedMapWork, and restore on retry
  • Removed row[2] SST handling from rowResultWriter callback

Fixes: #161490

Epic: None

Release note: None

@mw5h mw5h requested a review from spilchen January 27, 2026 18:18
@mw5h mw5h requested review from a team as code owners January 27, 2026 18:18
@mw5h mw5h requested review from ZhouXing19 and andrew-r-thomas and removed request for a team January 27, 2026 18:18
@blathers-crl

blathers-crl bot commented Jan 27, 2026

It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR?

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Member

This change is Reviewable

@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from 664deb5 to f080226 on January 28, 2026 01:00
Contributor

@spilchen spilchen left a comment


Is there any test that we can add for this that runs in the CI? Potentially even taking an existing IMPORT resume test and modifying it so that it works for distributed merge.

@spilchen reviewed 1 file and made 5 comments.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @mw5h, and @ZhouXing19).


pkg/jobs/jobspb/jobs.proto line 791 at r1 (raw file):

  // finished during the map phase of distributed merge so that the files
  // are not lost on retry.
  repeated IndexBackfillSSTManifest completed_map_work = 9 [(gogoproto.nullable) = false];

Can we consider renaming IndexBackfillSSTManifest to something more generic? It's a bit of a code smell to have something in an import struct refer to a backfill struct. I do like the reuse, though.


pkg/sql/importer/import_processor_planning.go line 263 at r1 (raw file):

		}
		res.Add(counts)
		if len(row) == 3 {

Is SST info still being sent? I think it's done in Next().

    // When using distributed merge, the processor will emit the SSTs and their
    // start and end keys.
    var fileDatums rowenc.EncDatumRow
    if idp.files != nil {
        bytes, err := protoutil.Marshal(idp.files)
        if err != nil {
            idp.MoveToDraining(err)
            return nil, idp.DrainHelper()
        }
        sstInfo := tree.NewDBytes(tree.DBytes(bytes))
        fileDatums = rowenc.EncDatumRow{
            rowenc.DatumToEncDatumUnsafe(types.Bytes, sstInfo),
        }
    }

do we still need this?


pkg/sql/importer/import_processor.go line 190 at r1 (raw file):

		// When using distributed merge, emit SST metadata via progress metadata
		// before closing the channel. This ensures the metadata is properly
		// checkpointed and survives job retries.

This only works if the processor completes its work successfully. Isn't it possible for a retry to begin in the middle of a file (using ResumePos)? If that happens, won't it be missing the SST metadata? Perhaps we should have the SST metadata update tied to the update to ResumePos? That way they stay in lock step.


pkg/sql/importer/sst_conversion.go line 1 at r1 (raw file):

// Copyright 2026 The Cockroach Authors.

This helper file is good. I have very similar logic in rowexec/indexbackfiller_sst_sink.go (see collectNewManifests). I wonder if we can combine the two so we have this mapping once? Maybe put in a common package that both importer and index backfill can use.

@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from f080226 to bdb7462 on January 28, 2026 20:41
Contributor Author

@mw5h mw5h left a comment


Yikes. Bad squash.

@mw5h made 1 comment.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas and @ZhouXing19).

@github-actions

Potential Bug(s) Detected

The three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation.

Next Steps:
Please review the detailed findings in the workflow run.

Note: When viewing the workflow output, scroll to the bottom to find the Final Analysis Summary.

After you review the findings, please tag the issue as follows:

  • If the detected issue is real or was helpful in any way, please tag the issue with O-AI-Review-Real-Issue-Found
  • If the detected issue was not helpful in any way, please tag the issue with O-AI-Review-Not-Helpful

@github-actions github-actions bot added the o-AI-Review-Potential-Issue-Detected AI reviewer found potential issue. Never assign manually—auto-applied by GH action only. label Jan 28, 2026
@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from bdb7462 to e029083 on January 29, 2026 00:38
@mw5h mw5h requested a review from a team as a code owner January 29, 2026 00:38
Contributor Author

@mw5h mw5h left a comment


@mw5h made 4 comments.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @spilchen, and @ZhouXing19).


pkg/jobs/jobspb/jobs.proto line 791 at r1 (raw file):

Previously, spilchen wrote…

Can we consider renaming IndexBackfillSSTManifest to something more generic? It's a bit of a code smell to have something in an import struct refer to a backfill struct. I do like the reuse, though.

I added a new commit to this branch to handle this.


pkg/sql/importer/import_processor.go line 190 at r1 (raw file):

Previously, spilchen wrote…

This only works if the processor completes its work successfully. Isn't it possible for a retry to begin in the middle of a file (using ResumePos)? If that happens, won't it be missing the SST metadata? Perhaps we should have the SST metadata update tied to the update to ResumePos? That way they stay in lock step.

Good point. I think this new version resolves this by updating both at once.


pkg/sql/importer/import_processor_planning.go line 263 at r1 (raw file):

Previously, spilchen wrote…

Is SST info still being sent? I think it's done in Next().

    // When using distributed merge, the processor will emit the SSTs and their
    // start and end keys.
    var fileDatums rowenc.EncDatumRow
    if idp.files != nil {
        bytes, err := protoutil.Marshal(idp.files)
        if err != nil {
            idp.MoveToDraining(err)
            return nil, idp.DrainHelper()
        }
        sstInfo := tree.NewDBytes(tree.DBytes(bytes))
        fileDatums = rowenc.EncDatumRow{
            rowenc.DatumToEncDatumUnsafe(types.Bytes, sstInfo),
        }
    }

do we still need this?

Nope.


pkg/sql/importer/sst_conversion.go line 1 at r1 (raw file):

Previously, spilchen wrote…

This helper file is good. I have very similar logic in rowexec/indexbackfiller_sst_sink.go (see collectNewManifests). I wonder if we can combine the two so we have this mapping once? Maybe put in a common package that both importer and index backfill can use.

Pulled this out into a shared helper.


@cockroach-teamcity cockroach-teamcity added the X-perf-gain Microbenchmarks CI: Added if a performance gain is detected label Jan 29, 2026
@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from e029083 to caa618b on January 29, 2026 01:27
@mw5h mw5h added the O-AI-Review-Real-Issue-Found AI reviewer found real issue label Jan 29, 2026
@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from caa618b to 297ac21 on January 29, 2026 02:01

Contributor

@spilchen spilchen left a comment


Regarding the tests for this, can we consider updating TestCSVImportCanBeResumed so that it runs in both distributed merge and non-distributed merge modes? That would give us a bit of test coverage at least.

@spilchen reviewed 22 files and all commit messages, made 2 comments, and resolved 4 discussions.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @mw5h, and @ZhouXing19).


pkg/sql/importer/import_processor.go line 252 at r6 (raw file):

	}

	// SST metadata is now emitted via the progress channel (see lines 188-199)

nit: we should probably leave out line numbers in the comment as this could get stale in a hurry

@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from 297ac21 to 9924471 on January 30, 2026 00:13
Contributor Author

@mw5h mw5h left a comment


Yeah, that's a good idea. I tried to get an explicit unit test together and just kinda failed. It's hard to wire up just part of the chain.

@mw5h made 2 comments.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @andrew-r-thomas, @spilchen, and @ZhouXing19).


pkg/sql/importer/import_processor.go line 252 at r6 (raw file):

Previously, spilchen wrote…

nit: we should probably leave out line numbers in the comment as this could get stale in a hurry

Yup. Claude loves to do this in comments and, in general, over-comments. I thought I had gotten them all, but I was scrambling at the end of the day yesterday and missed a spot.

Contributor

@spilchen spilchen left a comment


:lgtm: thanks for revising.

@spilchen reviewed 12 files and all commit messages, and made 1 comment.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on @andrew-r-thomas and @ZhouXing19).

@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from 9924471 to 6d71e4d on January 30, 2026 20:27
@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch 2 times, most recently from afece32 to 1807ee9, on February 2, 2026 16:11

mw5h and others added 8 commits February 4, 2026 16:17
IndexBackfillSSTManifest was originally designed for index backfill
operations but is now also used by IMPORT's distributed merge pipeline.
The name is misleading since it suggests the type is specific to index
backfill when it's actually a general-purpose SST manifest for bulk
operations.

This commit renames the type to BulkSSTManifest to better reflect its
usage across both index backfill and import operations. The rename
touches protobuf definitions and all references in backfill and import
code.

Release note: none

Epic: None

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

Before this change, IMPORT with distributed merge emitted SST metadata
as a third column in row results (row[2]). This caused a critical bug
on job retry: when the import job resumed after a node failure,
processorOutput was reinitialized as an empty array. Only incomplete
files (ResumePos < MaxInt64) were reprocessed, so SST metadata from
completed files was lost. The merge phase then ran with an incomplete
SST list, producing wrong data and fingerprint mismatches.

This commit changes SST metadata to flow through the progress/metadata
channel instead of row results, following the same pattern as index
backfill. Import processors now emit SST metadata via
BulkProcessorProgress.SSTMetadata incrementally as SSTs are created,
synchronized with ResumePos updates. This ensures that on retry, the
coordinator has metadata for all SSTs created before the failure,
preventing orphaned files in external storage. The coordinator
accumulates this metadata via the metaFn callback and periodically
persists it to ImportProgress.CompletedMapWork. On retry, the SST
metadata is restored from job progress, ensuring the merge phase has
the complete SST list.

To avoid code duplication, we reuse jobspb.IndexBackfillSSTManifest
(from index backfill) for SST metadata storage. Shared conversion
functions in pkg/sql/bulksst/manifest_conversion.go translate between
bulksst.SSTFiles (used during processing) and IndexBackfillSSTManifest
(used for persistence), eliminating duplicate code between import and
index backfill.

Changes:
- Added sst_metadata field (IndexBackfillSSTManifest) to BulkProcessorProgress
- Added completed_map_work field (IndexBackfillSSTManifest) to ImportProgress
- Created pkg/sql/bulksst/manifest_conversion.go with shared conversion
  utilities (SSTFilesToManifests, ManifestsToSSTFiles)
- Modified import_processor.go to emit SST metadata incrementally via
  progress channel, synchronized with ResumePos updates
- SST metadata emission tracks reported files by URI to avoid relying
  on file list ordering
- Updated import_processor_planning.go to accumulate SST metadata from
  progress metadata, persist to CompletedMapWork, and restore on retry
- Updated pkg/sql/rowexec/indexbackfiller_sst_sink.go to use shared
  conversion utilities
- Removed row[2] SST handling from rowResultWriter callback

Fixes: cockroachdb#161490

Epic: None

Release note: None

This commit extracts the indexBackfillSink interface from rowexec into
a shared BulkSink interface in pkg/sql/bulksst. This refactoring
prepares for code reuse between index backfill and IMPORT for
distributed merge functionality.

Before this change, the indexBackfillSink interface and
bulkAdderIndexBackfillSink implementation were local to rowexec,
preventing reuse in the importer package. Since both index backfill and
IMPORT use the same bulk write patterns (BulkAdder ingestion vs SST
file creation with manifest tracking), extracting a common interface
eliminates the need for duplicate implementations.

The new BulkSink interface in pkg/sql/bulksst defines the contract for
bulk write destinations:
- Add() enqueues KV pairs
- Flush() persists buffered data
- SetOnFlush() installs progress callbacks
- ConsumeFlushManifests() retrieves SST metadata (for distributed merge)
- Close() releases resources

BulkAdderSink wraps kvserverbase.BulkAdder to provide the legacy
ingestion path where SSTs are sent directly to KV via AddSSTable
requests. sstIndexBackfillSink (in rowexec) provides the distributed
merge path where SSTs are staged in external storage.

This design allows both packages to use the same abstraction without
circular dependencies, since both already import pkg/sql/bulksst.

Changes:
- Created pkg/sql/bulksst/sink.go defining BulkSink interface and
  BulkAdderSink implementation
- Updated pkg/sql/rowexec to use bulksst.BulkSink instead of local
  indexBackfillSink interface
- Removed duplicate bulkAdderIndexBackfillSink from rowexec
- Updated sstIndexBackfillSink to implement bulksst.BulkSink
- Updated tests to reference bulksst.BulkAdderSink

Epic: None

Release note: None

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

This commit moves sstIndexBackfillSink from pkg/sql/rowexec to
pkg/sql/bulksst, renaming it to SSTSink. This enables code reuse
between index backfill and IMPORT for distributed merge functionality.

Before this change, sstIndexBackfillSink was implemented in rowexec and
tightly coupled to execinfra.FlowCtx, preventing reuse in the importer
package. Since both index backfill and IMPORT need identical SST sink
functionality for distributed merge (writing SSTs to external storage,
tracking manifests, and coordinating with OnFlush callbacks), moving
the implementation to a shared location eliminates duplication.

The new bulksst.SSTSink provides a general-purpose SST sink that:
- Writes KV pairs to SST files in external storage
- Tracks SST file metadata via manifests
- Supports OnFlush callbacks for progress reporting
- Handles duplicate detection for unique indexes

The constructor bulksst.NewSSTSink takes dependencies directly rather
than requiring execinfra.FlowCtx, making it usable from any package:
- settings: cluster settings for SST batch configuration
- externalStorageFromURI: factory for creating external storage
- clock: for timestamping SST files
- distributedMergeFilePrefix: base URI for SST file storage
- writeAsOf: timestamp to write on all KVs
- processorID: unique ID for per-processor subdirectories
- checkDuplicates: whether to detect duplicate keys

Tests for SSTSink were moved from indexbackfiller_sst_sink_test.go to
bulksst/sink_test.go, and references in rowexec were updated to use the
shared implementation.

Changes:
- Moved sstIndexBackfillSink to bulksst.SSTSink in pkg/sql/bulksst/sink.go
- Added NewSSTSink constructor with dependency injection
- Moved tests to pkg/sql/bulksst/sink_test.go
- Updated rowexec/indexbackfiller.go to call bulksst.NewSSTSink
- Updated rowexec/indexbackfiller_test.go to use bulksst.SSTSink type
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink.go
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink_test.go
- Updated BUILD.bazel files in both packages

Epic: None

Release note: None

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@mw5h mw5h force-pushed the importer_distmerge_checkpoint branch from 1807ee9 to 35cf271 on February 14, 2026 00:29
@blathers-crl

blathers-crl bot commented Feb 14, 2026

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@mw5h mw5h marked this pull request as draft February 14, 2026 00:29


Labels

  • o-AI-Review-Potential-Issue-Detected: AI reviewer found potential issue. Never assign manually—auto-applied by GH action only.
  • O-AI-Review-Real-Issue-Found: AI reviewer found real issue
  • X-perf-gain: Microbenchmarks CI: Added if a performance gain is detected


Development

Successfully merging this pull request may close these issues.

importer: distributed import not properly checkpointing state
