-
Notifications
You must be signed in to change notification settings - Fork 4.5k
Add logic to handle end timestamp of mutable change stream bounded query #37459
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @chenxuesdu, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a specific constraint in Google Cloud Spanner regarding mutable change stream bounded queries, where the end timestamp cannot exceed 30 minutes into the future. The changes introduce a mechanism to detect if a change stream is mutable and, if so, automatically adjust the provided end timestamp to comply with this limitation. This ensures that queries remain valid and prevents potential errors when users specify end timestamps beyond the allowed future window. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment |
0c1258c to
76aa3cd
Compare
76aa3cd to
56ab9e1
Compare
| if (this.isMutableChangeStream) { | ||
| final Timestamp current = Timestamp.now(); | ||
| Timestamp maxTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, current.getNanos()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we leave some margin for the end timestamp, e.g. 25min or 20min in the future, in case that the query is processed too fast and this future timestamp still too far in the future?
|
/gemini review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request correctly implements the logic to handle the end timestamp for mutable change stream bounded queries in Spanner, which limits the end timestamp to a maximum of 30 minutes in the future. The changes are well-implemented, and the new logic is propagated correctly through the different layers of the application. The added tests effectively verify the new behavior. My review includes a few suggestions to improve code readability and maintainability by replacing magic numbers with named constants.
| private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { | ||
| if (this.isMutableChangeStream) { | ||
| final Timestamp current = Timestamp.now(); | ||
| Timestamp maxTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, current.getNanos()); | ||
| return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp : endTimestamp; | ||
| } | ||
| return endTimestamp; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better readability and maintainability, it's good practice to extract magic numbers into constants. Also, using Comparators.min can make the intent of the code clearer.
You would need to add the following import:
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Comparators;
private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
if (this.isMutableChangeStream) {
final long maxSecondsInFuture = 30 * 60;
final Timestamp current = Timestamp.now();
final Timestamp maxTimestamp =
Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + maxSecondsInFuture, current.getNanos());
return Comparators.min(maxTimestamp, endTimestamp);
}
return endTimestamp;
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe define MAX_FUTURE_END_TIME_SCEONDS = 30 * 60 into constant value of the class, in case we need to tune it in the future.
| public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() { | ||
| // Initialize action with isMutableChangeStream = true | ||
| action = | ||
| new QueryChangeStreamAction( | ||
| changeStreamDao, | ||
| partitionMetadataDao, | ||
| changeStreamRecordMapper, | ||
| partitionMetadataMapper, | ||
| dataChangeRecordAction, | ||
| heartbeatRecordAction, | ||
| childPartitionsRecordAction, | ||
| partitionStartRecordAction, | ||
| partitionEndRecordAction, | ||
| partitionEventRecordAction, | ||
| metrics, | ||
| true); | ||
|
|
||
| // Set endTimestamp to 60 minutes in the future | ||
| Timestamp now = Timestamp.now(); | ||
| Timestamp endTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos()); | ||
|
|
||
| partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); | ||
| when(restriction.getTo()).thenReturn(endTimestamp); | ||
| when(partitionMetadataMapper.from(any())).thenReturn(partition); | ||
|
|
||
| final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); | ||
| final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class); | ||
| when(changeStreamDao.changeStreamQuery( | ||
| eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), | ||
| timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS))) | ||
| .thenReturn(resultSet); | ||
| when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap) | ||
| when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); | ||
| when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true); | ||
|
|
||
| final ProcessContinuation result = | ||
| action.run( | ||
| partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); | ||
|
|
||
| // Verify query was capped at ~30 minutes | ||
| long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds(); | ||
| assertTrue("Query should be capped at approx 30 minutes (1800s)", Math.abs(diff - 1800) < 10); | ||
|
|
||
| // Crucial: Should RESUME to process the rest later | ||
| assertEquals(ProcessContinuation.resume(), result); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using named constants for magic numbers like 60 * 60 and 1800 improves code readability and maintainability.
public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
final long sixtyMinutesInSeconds = 60 * 60;
final long thirtyMinutesInSeconds = 30 * 60;
// Initialize action with isMutableChangeStream = true
action =
new QueryChangeStreamAction(
changeStreamDao,
partitionMetadataDao,
changeStreamRecordMapper,
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
true);
// Set endTimestamp to 60 minutes in the future
Timestamp now = Timestamp.now();
Timestamp endTimestamp =
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + sixtyMinutesInSeconds, now.getNanos());
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
when(restriction.getTo()).thenReturn(endTimestamp);
when(partitionMetadataMapper.from(any())).thenReturn(partition);
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
when(changeStreamDao.changeStreamQuery(
eq(PARTITION_TOKEN),
eq(PARTITION_START_TIMESTAMP),
timestampCaptor.capture(),
eq(PARTITION_HEARTBEAT_MILLIS)))
.thenReturn(resultSet);
when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
// Verify query was capped at ~30 minutes
long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
assertTrue(
"Query should be capped at approx 30 minutes (1800s)",
Math.abs(diff - thirtyMinutesInSeconds) < 10);
// Crucial: Should RESUME to process the rest later
assertEquals(ProcessContinuation.resume(), result);
}| public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() { | ||
| action = | ||
| new QueryChangeStreamAction( | ||
| changeStreamDao, | ||
| partitionMetadataDao, | ||
| changeStreamRecordMapper, | ||
| partitionMetadataMapper, | ||
| dataChangeRecordAction, | ||
| heartbeatRecordAction, | ||
| childPartitionsRecordAction, | ||
| partitionStartRecordAction, | ||
| partitionEndRecordAction, | ||
| partitionEventRecordAction, | ||
| metrics, | ||
| true); | ||
|
|
||
| // Set endTimestamp to only 10 minutes in the future | ||
| Timestamp now = Timestamp.now(); | ||
| Timestamp endTimestamp = | ||
| Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10 * 60, now.getNanos()); | ||
|
|
||
| partition = partition.toBuilder().setEndTimestamp(endTimestamp).build(); | ||
| when(restriction.getTo()).thenReturn(endTimestamp); | ||
| when(partitionMetadataMapper.from(any())).thenReturn(partition); | ||
|
|
||
| final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class); | ||
| when(changeStreamDao.changeStreamQuery( | ||
| eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP), | ||
| eq(endTimestamp), eq(PARTITION_HEARTBEAT_MILLIS))) | ||
| .thenReturn(resultSet); | ||
| when(resultSet.next()).thenReturn(false); | ||
| when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK); | ||
| when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true); | ||
|
|
||
| final ProcessContinuation result = | ||
| action.run( | ||
| partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer); | ||
|
|
||
| // Should STOP because we reached the actual requested endTimestamp | ||
| assertEquals(ProcessContinuation.stop(), result); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a named constant for the magic number 10 * 60 would improve readability.
public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
final long tenMinutesInSeconds = 10 * 60;
action =
new QueryChangeStreamAction(
changeStreamDao,
partitionMetadataDao,
changeStreamRecordMapper,
partitionMetadataMapper,
dataChangeRecordAction,
heartbeatRecordAction,
childPartitionsRecordAction,
partitionStartRecordAction,
partitionEndRecordAction,
partitionEventRecordAction,
metrics,
true);
// Set endTimestamp to only 10 minutes in the future
Timestamp now = Timestamp.now();
Timestamp endTimestamp =
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + tenMinutesInSeconds, now.getNanos());
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
when(restriction.getTo()).thenReturn(endTimestamp);
when(partitionMetadataMapper.from(any())).thenReturn(partition);
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
when(changeStreamDao.changeStreamQuery(
eq(PARTITION_TOKEN),
eq(PARTITION_START_TIMESTAMP),
eq(endTimestamp),
eq(PARTITION_HEARTBEAT_MILLIS)))
.thenReturn(resultSet);
when(resultSet.next()).thenReturn(false);
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
final ProcessContinuation result =
action.run(
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
// Should STOP because we reached the actual requested endTimestamp
assertEquals(ProcessContinuation.stop(), result);
}| // exceptions about being out of timestamp range). | ||
| boolean stopAfterQuerySucceeds = isBoundedRestriction; | ||
| boolean stopAfterQuerySucceeds = | ||
| isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For v1, this changeStreamQueryEndTimestamp.equals(endTimestamp) is valid.
For v2, this changeStreamQueryEndTimestamp.equals(endTimestamp) may never happen. Is that intended? Let's walk through v2 scenario a little bit more.
| private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) { | ||
| if (this.isMutableChangeStream) { | ||
| final Timestamp current = Timestamp.now(); | ||
| Timestamp maxTimestamp = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For v1 we use endTs,
For v2 we should use getNextReadChangeStreamEndTimestamp() here so that for V2, bounded endTs or unbounded endTs have similar query patterns.
For mutable change stream bounded queries, Spanner only allow the max end timestamp to be 30 minutes in the future. So when user provided a end timestamp to read, we choose the min of max allow timestamp and the provided timestamp.