Skip to content

Commit 56ab9e1

Browse files
committed
Add logic to handle end timestamp of mutable change stream bounded query.
1 parent 928ad7e commit 56ab9e1

File tree

6 files changed

+154
-8
lines changed

6 files changed

+154
-8
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/ActionFactory.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
187187
PartitionStartRecordAction partitionStartRecordAction,
188188
PartitionEndRecordAction partitionEndRecordAction,
189189
PartitionEventRecordAction partitionEventRecordAction,
190-
ChangeStreamMetrics metrics) {
190+
ChangeStreamMetrics metrics,
191+
boolean isMutableChangeStream) {
191192
if (queryChangeStreamActionInstance == null) {
192193
queryChangeStreamActionInstance =
193194
new QueryChangeStreamAction(
@@ -201,7 +202,8 @@ public synchronized QueryChangeStreamAction queryChangeStreamAction(
201202
partitionStartRecordAction,
202203
partitionEndRecordAction,
203204
partitionEventRecordAction,
204-
metrics);
205+
metrics,
206+
isMutableChangeStream);
205207
}
206208
return queryChangeStreamActionInstance;
207209
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ public class QueryChangeStreamAction {
8989
private final PartitionEndRecordAction partitionEndRecordAction;
9090
private final PartitionEventRecordAction partitionEventRecordAction;
9191
private final ChangeStreamMetrics metrics;
92+
private final boolean isMutableChangeStream;
9293

9394
/**
9495
* Constructs an action class for performing a change stream query for a given partition.
@@ -106,6 +107,7 @@ public class QueryChangeStreamAction {
106107
* @param partitionEndRecordAction action class to process {@link PartitionEndRecord}s
107108
* @param partitionEventRecordAction action class to process {@link PartitionEventRecord}s
108109
* @param metrics metrics gathering class
110+
* @param isMutableChangeStream whether the change stream is mutable or not
109111
*/
110112
QueryChangeStreamAction(
111113
ChangeStreamDao changeStreamDao,
@@ -118,7 +120,8 @@ public class QueryChangeStreamAction {
118120
PartitionStartRecordAction partitionStartRecordAction,
119121
PartitionEndRecordAction partitionEndRecordAction,
120122
PartitionEventRecordAction partitionEventRecordAction,
121-
ChangeStreamMetrics metrics) {
123+
ChangeStreamMetrics metrics,
124+
boolean isMutableChangeStream) {
122125
this.changeStreamDao = changeStreamDao;
123126
this.partitionMetadataDao = partitionMetadataDao;
124127
this.changeStreamRecordMapper = changeStreamRecordMapper;
@@ -130,6 +133,7 @@ public class QueryChangeStreamAction {
130133
this.partitionEndRecordAction = partitionEndRecordAction;
131134
this.partitionEventRecordAction = partitionEventRecordAction;
132135
this.metrics = metrics;
136+
this.isMutableChangeStream = isMutableChangeStream;
133137
}
134138

135139
/**
@@ -195,13 +199,17 @@ public ProcessContinuation run(
195199
final Timestamp endTimestamp = partition.getEndTimestamp();
196200
final boolean isBoundedRestriction = !endTimestamp.equals(MAX_INCLUSIVE_END_AT);
197201
final Timestamp changeStreamQueryEndTimestamp =
198-
isBoundedRestriction ? endTimestamp : getNextReadChangeStreamEndTimestamp();
202+
isBoundedRestriction
203+
? getBoundedQueryEndTimestamp(endTimestamp)
204+
: getNextReadChangeStreamEndTimestamp();
199205

200206
// Once the changeStreamQuery completes we may need to resume reading from the partition if we
201207
// had an unbounded restriction for which we set an arbitrary query end timestamp and for which
202208
// we didn't encounter any indications that the partition is done (explicit end records or
203209
// exceptions about being out of timestamp range).
204-
boolean stopAfterQuerySucceeds = isBoundedRestriction;
210+
boolean stopAfterQuerySucceeds =
211+
isBoundedRestriction && changeStreamQueryEndTimestamp.equals(endTimestamp);
212+
205213
try (ChangeStreamResultSet resultSet =
206214
changeStreamDao.changeStreamQuery(
207215
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {
@@ -379,4 +387,16 @@ private Timestamp getNextReadChangeStreamEndTimestamp() {
379387
final Timestamp current = Timestamp.now();
380388
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
381389
}
390+
391+
// For Mutable Change Stream, Spanner only allows the max query end timestamp to be 30 minutes in
392+
// the future.
393+
private Timestamp getBoundedQueryEndTimestamp(Timestamp endTimestamp) {
394+
if (this.isMutableChangeStream) {
395+
final Timestamp current = Timestamp.now();
396+
Timestamp maxTimestamp =
397+
Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 30 * 60, current.getNanos());
398+
return maxTimestamp.compareTo(endTimestamp) < 0 ? maxTimestamp : endTimestamp;
399+
}
400+
return endTimestamp;
401+
}
382402
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,4 +151,8 @@ public synchronized ChangeStreamDao getChangeStreamDao() {
151151
}
152152
return changeStreamDaoInstance;
153153
}
154+
155+
public boolean isMutableChangeStream() {
156+
return this.isMutableChangeStream;
157+
}
154158
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFn.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class ReadChangeStreamPartitionDoFn extends DoFn<PartitionMetadata, DataC
7373
private final MapperFactory mapperFactory;
7474
private final ActionFactory actionFactory;
7575
private final ChangeStreamMetrics metrics;
76+
private final boolean isMutableChangeStream;
7677
/**
7778
* Needs to be set through the {@link
7879
* ReadChangeStreamPartitionDoFn#setThroughputEstimator(BytesThroughputEstimator)} call.
@@ -104,6 +105,7 @@ public ReadChangeStreamPartitionDoFn(
104105
this.mapperFactory = mapperFactory;
105106
this.actionFactory = actionFactory;
106107
this.metrics = metrics;
108+
this.isMutableChangeStream = daoFactory.isMutableChangeStream();
107109
this.throughputEstimator = new NullThroughputEstimator<>();
108110
}
109111

@@ -215,7 +217,8 @@ public void setup() {
215217
partitionStartRecordAction,
216218
partitionEndRecordAction,
217219
partitionEventRecordAction,
218-
metrics);
220+
metrics,
221+
isMutableChangeStream);
219222
}
220223

221224
/**

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamActionTest.java

Lines changed: 116 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@
2121
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
2222
import static org.junit.Assert.assertEquals;
2323
import static org.junit.Assert.assertNotEquals;
24+
import static org.junit.Assert.assertTrue;
2425
import static org.mockito.ArgumentMatchers.any;
26+
import static org.mockito.ArgumentMatchers.anyLong;
2527
import static org.mockito.ArgumentMatchers.eq;
2628
import static org.mockito.Mockito.mock;
2729
import static org.mockito.Mockito.never;
@@ -116,7 +118,8 @@ public void setUp() throws Exception {
116118
partitionStartRecordAction,
117119
partitionEndRecordAction,
118120
partitionEventRecordAction,
119-
metrics);
121+
metrics,
122+
false);
120123
final Struct row = mock(Struct.class);
121124
partition =
122125
PartitionMetadata.newBuilder()
@@ -916,6 +919,118 @@ public void testQueryChangeStreamWithChildPartitionsRecordUnboundedRestriction()
916919
verify(partitionMetadataDao, never()).updateWatermark(any(), any());
917920
}
918921

922+
@Test
923+
public void testQueryChangeStreamWithMutableChangeStreamCappedEndTimestamp() {
924+
// Initialize action with isMutableChangeStream = true
925+
action =
926+
new QueryChangeStreamAction(
927+
changeStreamDao,
928+
partitionMetadataDao,
929+
changeStreamRecordMapper,
930+
partitionMetadataMapper,
931+
dataChangeRecordAction,
932+
heartbeatRecordAction,
933+
childPartitionsRecordAction,
934+
partitionStartRecordAction,
935+
partitionEndRecordAction,
936+
partitionEventRecordAction,
937+
metrics,
938+
true);
939+
940+
// Set endTimestamp to 60 minutes in the future
941+
Timestamp now = Timestamp.now();
942+
Timestamp endTimestamp =
943+
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 60 * 60, now.getNanos());
944+
945+
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
946+
when(restriction.getTo()).thenReturn(endTimestamp);
947+
when(partitionMetadataMapper.from(any())).thenReturn(partition);
948+
949+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
950+
final ArgumentCaptor<Timestamp> timestampCaptor = ArgumentCaptor.forClass(Timestamp.class);
951+
when(changeStreamDao.changeStreamQuery(
952+
eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
953+
timestampCaptor.capture(), eq(PARTITION_HEARTBEAT_MILLIS)))
954+
.thenReturn(resultSet);
955+
when(resultSet.next()).thenReturn(false); // Query finishes (reaches cap)
956+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
957+
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
958+
959+
final ProcessContinuation result =
960+
action.run(
961+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
962+
963+
// Verify query was capped at ~30 minutes
964+
long diff = timestampCaptor.getValue().getSeconds() - now.getSeconds();
965+
assertTrue("Query should be capped at approx 30 minutes (1800s)", Math.abs(diff - 1800) < 10);
966+
967+
// Crucial: Should RESUME to process the rest later
968+
assertEquals(ProcessContinuation.resume(), result);
969+
}
970+
971+
@Test
972+
public void testQueryChangeStreamWithMutableChangeStreamUncappedEndTimestamp() {
973+
action =
974+
new QueryChangeStreamAction(
975+
changeStreamDao,
976+
partitionMetadataDao,
977+
changeStreamRecordMapper,
978+
partitionMetadataMapper,
979+
dataChangeRecordAction,
980+
heartbeatRecordAction,
981+
childPartitionsRecordAction,
982+
partitionStartRecordAction,
983+
partitionEndRecordAction,
984+
partitionEventRecordAction,
985+
metrics,
986+
true);
987+
988+
// Set endTimestamp to only 10 minutes in the future
989+
Timestamp now = Timestamp.now();
990+
Timestamp endTimestamp =
991+
Timestamp.ofTimeSecondsAndNanos(now.getSeconds() + 10 * 60, now.getNanos());
992+
993+
partition = partition.toBuilder().setEndTimestamp(endTimestamp).build();
994+
when(restriction.getTo()).thenReturn(endTimestamp);
995+
when(partitionMetadataMapper.from(any())).thenReturn(partition);
996+
997+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
998+
when(changeStreamDao.changeStreamQuery(
999+
eq(PARTITION_TOKEN), eq(PARTITION_START_TIMESTAMP),
1000+
eq(endTimestamp), eq(PARTITION_HEARTBEAT_MILLIS)))
1001+
.thenReturn(resultSet);
1002+
when(resultSet.next()).thenReturn(false);
1003+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
1004+
when(restrictionTracker.tryClaim(endTimestamp)).thenReturn(true);
1005+
1006+
final ProcessContinuation result =
1007+
action.run(
1008+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
1009+
1010+
// Should STOP because we reached the actual requested endTimestamp
1011+
assertEquals(ProcessContinuation.stop(), result);
1012+
}
1013+
1014+
@Test
1015+
public void testQueryChangeStreamUnboundedResumesCorrectly() {
1016+
// Unbounded restriction (streaming forever)
1017+
setupUnboundedPartition();
1018+
1019+
final ChangeStreamResultSet resultSet = mock(ChangeStreamResultSet.class);
1020+
when(changeStreamDao.changeStreamQuery(any(), any(), any(), anyLong())).thenReturn(resultSet);
1021+
when(resultSet.next()).thenReturn(false);
1022+
when(watermarkEstimator.currentWatermark()).thenReturn(WATERMARK);
1023+
when(restrictionTracker.tryClaim(any(Timestamp.class))).thenReturn(true);
1024+
1025+
final ProcessContinuation result =
1026+
action.run(
1027+
partition, restrictionTracker, outputReceiver, watermarkEstimator, bundleFinalizer);
1028+
1029+
// Should return RESUME to continue reading the stream every 2 minutes
1030+
assertEquals(ProcessContinuation.resume(), result);
1031+
verify(metrics).incQueryCounter();
1032+
}
1033+
9191034
private static class BundleFinalizerStub implements BundleFinalizer {
9201035
@Override
9211036
public void afterBundleCommit(Instant callbackExpiry, Callback callback) {

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/ReadChangeStreamPartitionDoFnTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State.SCHEDULED;
2121
import static org.junit.Assert.assertEquals;
2222
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyBoolean;
2324
import static org.mockito.Mockito.mock;
2425
import static org.mockito.Mockito.never;
2526
import static org.mockito.Mockito.verify;
@@ -149,7 +150,8 @@ public void setUp() {
149150
partitionStartRecordAction,
150151
partitionEndRecordAction,
151152
partitionEventRecordAction,
152-
metrics))
153+
metrics,
154+
anyBoolean()))
153155
.thenReturn(queryChangeStreamAction);
154156

155157
doFn.setup();

0 commit comments

Comments
 (0)