Skip to content

Commit 90c554a

Browse files
authored
Merge pull request #224 from alicebob/dollar
allow '$' in XREAD
2 parents a31f12a + 6798993 commit 90c554a

File tree

2 files changed

+17
-1
lines changed

2 files changed

+17
-1
lines changed

cmd_stream.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ parsing:
646646

647647
opts.streams, opts.ids = args[0:len(args)/2], args[len(args)/2:]
648648
for _, id := range opts.ids {
649-
if _, err := parseStreamID(id); err != nil {
649+
if _, err := parseStreamID(id); id != `$` && err != nil {
650650
setDirty(c)
651651
c.WriteError(msgInvalidStreamID)
652652
return
@@ -718,6 +718,9 @@ func xread(db *RedisDB, streams []string, ids []string, count int) map[string][]
718718
if len(returnedEntries) == entryCount {
719719
break
720720
}
721+
if id == "$" {
722+
id = s.lastID()
723+
}
721724
if streamCmp(entry.ID, id) <= 0 {
722725
continue
723726
}

integration/stream_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,19 @@ func TestStream(t *testing.T) {
244244
c.Error("not an int", "XREAD", "BLOCK", "foo", "STREAMS", "pl", "0")
245245
c.Error("negative", "XREAD", "BLOCK", "-12", "STREAMS", "pl", "0")
246246
})
247+
248+
// special '$' ID
249+
testRaw2(t, func(c, c2 *client) {
250+
var wg sync.WaitGroup
251+
wg.Add(1)
252+
go func() {
253+
time.Sleep(10 * time.Millisecond)
254+
c2.Do("XADD", "pl", "60-1", "name", "Mercury")
255+
wg.Done()
256+
}()
257+
wg.Wait()
258+
c.Do("XREAD", "BLOCK", "1000", "STREAMS", "pl", "$")
259+
})
247260
})
248261
}
249262

0 commit comments

Comments
 (0)