Skip to content

Commit 170cb2f

Browse files
authored
Fix Reader HasMessageAvailable behavior after seeking by timestamp (#344)
* Fix Reader HasMessageAvailable behavior after seeking by timestamp * Improve logic
1 parent 49adf1c commit 170cb2f

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

src/Pulsar.Client/Internal/ConsumerImpl.fs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,6 +1097,7 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
10971097
hasSoughtByTimestamp <- true
10981098
Commands.newSeekByTimestamp consumerId requestId timestamp, MessageId.Earliest
10991099
| SeekType.MessageId messageId ->
1100+
hasSoughtByTimestamp <- false
11001101
match messageId.ChunkMessageIds with
11011102
| Some chunkMessageIds when chunkMessageIds.Length >0 ->
11021103
Commands.newSeekByMsgId consumerId requestId chunkMessageIds[0], chunkMessageIds[0]
@@ -1145,8 +1146,9 @@ type internal ConsumerImpl<'T> (consumerConfig: ConsumerConfiguration<'T>, clien
11451146
let! lastMessageIdResult = getLastMessageIdAsync()
11461147
let lastMessageId = lastMessageIdResult.LastMessageId
11471148
// if the consumer is configured to read inclusive then we need to seek to the last message
1148-
do! postAndAsyncReply this.Mb (fun channel ->
1149-
SeekAsync (SeekType.MessageId lastMessageId, channel))
1149+
if consumerConfig.ResetIncludeHead && not hasSoughtByTimestamp then
1150+
do! postAndAsyncReply this.Mb (fun channel ->
1151+
SeekAsync (SeekType.MessageId lastMessageId, channel))
11501152
match lastMessageIdResult.MarkDeletePosition with
11511153
| Some markDeletePosition ->
11521154
if lastMessageId.EntryId < %0L then

tests/IntegrationTests/Reader.fs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,53 @@ let tests =
350350

351351
Log.Debug("Finished HasMessageAvailableAfterSeekTimestamp initializeLastMessageIdInBroker: {0}", initializeLastMessageIdInBroker)
352352
}
353+
354+
let checkReaderLoopWhileHasMessageAvailableAfterSeekTimestamp () =
355+
task {
356+
Log.Debug("Started Reader loop while HasMessageAvailable after SeekTimestamp")
357+
let client = getClient()
358+
let topicName = "public/default/test-reader-loop-has-message-available-after-seek-timestamp-" + Guid.NewGuid().ToString("N")
359+
360+
let! (producer : IProducer<string>) =
361+
client.NewProducer(Schema.STRING())
362+
.Topic(topicName)
363+
.EnableBatching(false)
364+
.CreateAsync()
365+
366+
let timestampBeforeSend = %(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds())
367+
let! _ = producer.SendAsync("m1")
368+
let! _ = producer.SendAsync("m2")
369+
let! _ = producer.SendAsync("m3")
370+
do! producer.DisposeAsync()
371+
372+
let! (reader : IReader<string>) =
373+
client.NewReader(Schema.STRING())
374+
.Topic(topicName)
375+
.ReceiverQueueSize(1)
376+
.StartMessageId(MessageId.Latest)
377+
.CreateAsync()
378+
379+
do! Task.Delay(1000)
380+
do! reader.SeekAsync(timestampBeforeSend)
381+
do! Task.Delay(1000)
382+
383+
use cts = new System.Threading.CancellationTokenSource(TimeSpan.FromSeconds(10.0))
384+
let received = ResizeArray<string>()
385+
386+
let mutable continueLooping = true
387+
while continueLooping do
388+
let! hasMessage = reader.HasMessageAvailableAsync()
389+
if hasMessage then
390+
let! msg = reader.ReadNextAsync(cts.Token)
391+
received.Add(msg.GetValue())
392+
else
393+
continueLooping <- false
394+
395+
Expect.sequenceEqual "" [ "m1"; "m2"; "m3" ] received
396+
do! reader.DisposeAsync()
397+
398+
Log.Debug("Finished Reader loop while HasMessageAvailable after SeekTimestamp")
399+
}
353400

354401
testList "Reader" [
355402

@@ -408,4 +455,8 @@ let tests =
408455
testTask "HasMessageAvailable after SeekTimestamp with initializeLastMessageIdInBroker" {
409456
do! checkHasMessageAvailableAfterSeekTimestamp true
410457
}
458+
459+
testTask "Check HasMessageAvailable works after SeekTimestamp" {
460+
do! checkReaderLoopWhileHasMessageAvailableAfterSeekTimestamp()
461+
}
411462
]

0 commit comments

Comments
 (0)