
Conversation

@chenxuesdu
Contributor

For mutable change stream bounded queries, Spanner only allows the end timestamp to be at most 30 minutes in the future. So when the user provides an end timestamp for the read, we choose the minimum of the maximum allowed timestamp and the provided timestamp.
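The capping rule described above can be sketched in isolation. This is an illustrative sketch, not the PR's actual code: `java.time.Instant` stands in for `com.google.cloud.Timestamp`, and the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: cap a user-supplied end timestamp at "now + 30 minutes",
// mirroring Spanner's limit for mutable change stream bounded queries.
class EndTimestampCap {
    static final Duration MAX_FUTURE = Duration.ofMinutes(30);

    // Returns the earlier of the requested end timestamp and now + 30 min.
    static Instant capEndTimestamp(Instant requestedEnd, Instant now) {
        Instant maxAllowed = now.plus(MAX_FUTURE);
        return requestedEnd.isBefore(maxAllowed) ? requestedEnd : maxAllowed;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2026-01-30T00:00:00Z");
        // 60 minutes ahead gets capped to 30 minutes ahead.
        System.out.println(capEndTimestamp(now.plus(Duration.ofMinutes(60)), now)); // 2026-01-30T00:30:00Z
        // 10 minutes ahead is within the limit and passes through unchanged.
        System.out.println(capEndTimestamp(now.plus(Duration.ofMinutes(10)), now)); // 2026-01-30T00:10:00Z
    }
}
```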

@gemini-code-assist
Contributor

Summary of Changes

Hello @chenxuesdu, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses a specific constraint in Google Cloud Spanner regarding mutable change stream bounded queries, where the end timestamp cannot exceed 30 minutes into the future. The changes introduce a mechanism to detect if a change stream is mutable and, if so, automatically adjust the provided end timestamp to comply with this limitation. This ensures that queries remain valid and prevents potential errors when users specify end timestamps beyond the allowed future window.

Highlights

  • Mutable Change Stream Identification: A new isMutableChangeStream flag has been introduced and propagated through relevant classes (DaoFactory, ReadChangeStreamPartitionDoFn, QueryChangeStreamAction) to specifically identify and handle mutable change streams.
  • Bounded Query End Timestamp Enforcement: Logic has been added to QueryChangeStreamAction to enforce Spanner's limitation on mutable change stream bounded queries. The end timestamp for such queries is now capped to a maximum of 30 minutes in the future from the current time, choosing the minimum between the user-provided timestamp and this limit.
  • Code Structure and Dependency Injection: The isMutableChangeStream flag is passed as a constructor argument to QueryChangeStreamAction and initialized within ReadChangeStreamPartitionDoFn from DaoFactory, ensuring proper dependency injection and enabling conditional logic based on the change stream type.


Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the root of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Contributor

Checks are failing. Review will not be requested until checks succeed. If you'd like to override that behavior, comment `assign set of reviewers`.

@chenxuesdu force-pushed the boundedV2Query branch 3 times, most recently from 0c1258c to 76aa3cd on January 30, 2026 at 21:41.
if (this.isMutableChangeStream) {
  final Timestamp current = Timestamp.now();
  Timestamp maxTimestamp =
      Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, current.getNanos());
Contributor

Should we leave some margin for the end timestamp, e.g. 25 or 20 minutes in the future, in case the query is processed too fast and this future timestamp is still too far in the future?
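The reviewer's suggestion could look like the following sketch. This is a hypothetical variant, not code from the PR: the 5-minute margin (yielding a 25-minute cap) is only the reviewer's example, not a Spanner requirement, and all names are illustrative.

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: keep a safety margin below Spanner's 30-minute limit so that a
// quickly-processed query does not carry an end timestamp that is still
// too far in the future by the time Spanner evaluates it.
class CapWithMargin {
    static final Duration SPANNER_LIMIT = Duration.ofMinutes(30);
    static final Duration SAFETY_MARGIN = Duration.ofMinutes(5);

    static Instant capWithMargin(Instant requestedEnd, Instant now) {
        // Effective cap is now + 25 minutes under these assumed constants.
        Instant maxAllowed = now.plus(SPANNER_LIMIT).minus(SAFETY_MARGIN);
        return requestedEnd.isBefore(maxAllowed) ? requestedEnd : maxAllowed;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2026-02-03T00:00:00Z");
        System.out.println(capWithMargin(now.plus(Duration.ofMinutes(60)), now)); // 2026-02-03T00:25:00Z
    }
}
```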

@jiangzzhu
Contributor

/gemini review

@gemini-code-assist (bot) left a comment

Code Review

This pull request correctly implements the logic to handle the end timestamp for mutable change stream bounded queries in Spanner, which limits the end timestamp to a maximum of 30 minutes in the future. The changes are well-implemented, and the new logic is propagated correctly through the different layers of the application. The added tests effectively verify the new behavior. My review includes a few suggestions to improve code readability and maintainability by replacing magic numbers with named constants.

Comment on lines +393 to +401
private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
  if (this.isMutableChangeStream) {
    final Timestamp current = Timestamp.now();
    Timestamp maxTimestamp =
        Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, current.getNanos());
    return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp : endTimestamp;
  }
  return endTimestamp;
}

medium

For better readability and maintainability, it's good practice to extract magic numbers into constants. Also, using Comparators.min can make the intent of the code clearer.

You would need to add the following import:
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;

  private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
    if (this.isMutableChangeStream) {
      final long maxSecondsInFuture = 30 * 60;
      final Timestamp current = Timestamp.now();
      final Timestamp maxTimestamp =
          Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + maxSecondsInFuture, current.getNanos());
      return Comparators.min(maxTimestamp, endTimestamp);
    }
    return endTimestamp;
  }

Contributor

Maybe define MAX_FUTURE_END_TIME_SECONDS = 30 * 60 as a constant of the class, in case we need to tune it in the future.

Comment on lines +923 to +969
public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
  // Initialize action with isMutableChangeStream = true
  action =
      new QueryChangeStreamAction(
          changeStreamDao,
          partitionMetadataDao,
          changeStreamRecordMapper,
          partitionMetadataMapper,
          dataChangeRecordAction,
          heartbeatRecordAction,
          childPartitionsRecordAction,
          partitionStartRecordAction,
          partitionEndRecordAction,
          partitionEventRecordAction,
          metrics,
          true);

  // Set endTimestamp to 60 minutes in the future
  Timestamp now = Timestamp.now();
  Timestamp endTimestamp =
      Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos());

  partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
  when(restriction.getTo()).thenReturn(endTimestamp);
  when(partitionMetadataMapper.from(any())).thenReturn(partition);

  final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
  final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
  when(changeStreamDao.changeStreamQuery(
          eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
          timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
      .thenReturn(resultSet);
  when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
  when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
  when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);

  final ProcessContinuation result =
      action.run(
          partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

  // Verify query was capped at ~30 minutes
  long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
  assertTrue("Query should be capped at approx 30 minutes (1800s)", Math.abs(diff - 1800) < 10);

  // Crucial: Should RESUME to process the rest later
  assertEquals(ProcessContinuation.resume(), result);
}

medium

Using named constants for magic numbers like 60 * 60 and 1800 improves code readability and maintainability.

  public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
    final long sixtyMinutesInSeconds = 60 * 60;
    final long thirtyMinutesInSeconds = 30 * 60;

    // Initialize action with isMutableChangeStream = true
    action =
        new QueryChangeStreamAction(
            changeStreamDao,
            partitionMetadataDao,
            changeStreamRecordMapper,
            partitionMetadataMapper,
            dataChangeRecordAction,
            heartbeatRecordAction,
            childPartitionsRecordAction,
            partitionStartRecordAction,
            partitionEndRecordAction,
            partitionEventRecordAction,
            metrics,
            true);

    // Set endTimestamp to 60 minutes in the future
    Timestamp now = Timestamp.now();
    Timestamp endTimestamp =
        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + sixtyMinutesInSeconds, now.getNanos());

    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
    when(restriction.getTo()).thenReturn(endTimestamp);
    when(partitionMetadataMapper.from(any())).thenReturn(partition);

    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
    final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
    when(changeStreamDao.changeStreamQuery(
            eq(PARTITION_TOKEN),
            eq(PARTITION_START_TIMESTAMP),
            timestampCaptor.capture(),
            eq(PARTITION_HEARTBEAT_MILLIS)))
        .thenReturn(resultSet);
    when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
    when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);

    final ProcessContinuation result =
        action.run(
            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

    // Verify query was capped at ~30 minutes
    long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
    assertTrue(
        "Query should be capped at approx 30 minutes (1800s)",
        Math.abs(diff - thirtyMinutesInSeconds) < 10);

    // Crucial: Should RESUME to process the rest later
    assertEquals(ProcessContinuation.resume(), result);
  }

Comment on lines +972 to +1012
public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
  action =
      new QueryChangeStreamAction(
          changeStreamDao,
          partitionMetadataDao,
          changeStreamRecordMapper,
          partitionMetadataMapper,
          dataChangeRecordAction,
          heartbeatRecordAction,
          childPartitionsRecordAction,
          partitionStartRecordAction,
          partitionEndRecordAction,
          partitionEventRecordAction,
          metrics,
          true);

  // Set endTimestamp to only 10 minutes in the future
  Timestamp now = Timestamp.now();
  Timestamp endTimestamp =
      Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10 * 60, now.getNanos());

  partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
  when(restriction.getTo()).thenReturn(endTimestamp);
  when(partitionMetadataMapper.from(any())).thenReturn(partition);

  final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
  when(changeStreamDao.changeStreamQuery(
          eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
          eq(endTimestamp), eq(PARTITION_HEARTBEAT_MILLIS)))
      .thenReturn(resultSet);
  when(resultSet.next()).thenReturn(false);
  when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
  when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);

  final ProcessContinuation result =
      action.run(
          partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

  // Should STOP because we reached the actual requested endTimestamp
  assertEquals(ProcessContinuation.stop(), result);
}

medium

Using a named constant for the magic number 10 * 60 would improve readability.

  public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
    final long tenMinutesInSeconds = 10 * 60;
    action =
        new QueryChangeStreamAction(
            changeStreamDao,
            partitionMetadataDao,
            changeStreamRecordMapper,
            partitionMetadataMapper,
            dataChangeRecordAction,
            heartbeatRecordAction,
            childPartitionsRecordAction,
            partitionStartRecordAction,
            partitionEndRecordAction,
            partitionEventRecordAction,
            metrics,
            true);

    // Set endTimestamp to only 10 minutes in the future
    Timestamp now = Timestamp.now();
    Timestamp endTimestamp =
        Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + tenMinutesInSeconds, now.getNanos());

    partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
    when(restriction.getTo()).thenReturn(endTimestamp);
    when(partitionMetadataMapper.from(any())).thenReturn(partition);

    final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
    when(changeStreamDao.changeStreamQuery(
            eq(PARTITION_TOKEN),
            eq(PARTITION_START_TIMESTAMP),
            eq(endTimestamp),
            eq(PARTITION_HEARTBEAT_MILLIS)))
        .thenReturn(resultSet);
    when(resultSet.next()).thenReturn(false);
    when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
    when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);

    final ProcessContinuation result =
        action.run(
            partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);

    // Should STOP because we reached the actual requested endTimestamp
    assertEquals(ProcessContinuation.stop(), result);
  }

  // exceptions about being out of timestamp range).
- boolean stopAfterQuerySucceeds = isBoundedRestriction;
+ boolean stopAfterQuerySucceeds =
+     isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp);
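The one-line change above can be sketched in isolation. This is an illustrative sketch, not the actual Beam code: the method signature and the use of raw epoch seconds are assumptions made for a self-contained example.

```java
// Sketch: a bounded query should stop only when it actually ran to the
// user-requested end timestamp; if the end was capped to "now + 30 min",
// the runner must resume later to process the remaining range.
class StopOrResume {
    static boolean stopAfterQuerySucceeds(
            boolean isBoundedRestriction, long queryEndSeconds, long requestedEndSeconds) {
        return isBoundedRestriction && queryEndSeconds == requestedEndSeconds;
    }

    public static void main(String[] args) {
        // Capped case: query end (now + 30 min) differs from requested end (now + 60 min),
        // so the query must resume rather than stop.
        System.out.println(stopAfterQuerySucceeds(true, 1800L, 3600L)); // false
        // Uncapped case: query ran to the requested end, so it stops.
        System.out.println(stopAfterQuerySucceeds(true, 600L, 600L)); // true
    }
}
```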
@tianz101 commented on Feb 3, 2026

For v1, this changeStreamQueryEndTimestamp.equals(endTimestamp) is valid.
For v2, changeStreamQueryEndTimestamp.equals(endTimestamp) may never be true. Is that intended? Let's walk through the v2 scenario a bit more.

private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
  if (this.isMutableChangeStream) {
    final Timestamp current = Timestamp.now();
    Timestamp maxTimestamp =

For v1 we use endTs.
For v2 we should use getNextReadChangeStreamEndTimestamp() here so that, for v2, bounded and unbounded endTs have similar query patterns.
