Skip to content

Commit 4be3b82

Browse files
committed
fix 'XREAD ... $' on a non-existing stream
1 parent 2f1aeb3 commit 4be3b82

File tree

2 files changed

+19
-1
lines changed

2 files changed

+19
-1
lines changed

cmd_stream.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -944,7 +944,12 @@ parsing:
944944
return
945945
} else if id == "$" {
946946
db := m.DB(getCtx(c).selectedDB)
947-
opts.ids[i] = db.streamKeys[opts.streams[i]].lastID()
947+
stream, ok := db.streamKeys[opts.streams[i]]
948+
if ok {
949+
opts.ids[i] = stream.lastID()
950+
} else {
951+
opts.ids[i] = "0-0"
952+
}
948953
}
949954
}
950955
args = nil

integration/stream_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,19 @@ func TestStream(t *testing.T) {
283283
wg.Wait()
284284
c.Do("XREAD", "BLOCK", "1000", "STREAMS", "pl", "$")
285285
})
286+
287+
// special '$' ID on non-existing stream
288+
testRaw2(t, func(c, c2 *client) {
289+
var wg sync.WaitGroup
290+
wg.Add(1)
291+
go func() {
292+
time.Sleep(10 * time.Millisecond)
293+
c2.Do("XADD", "pl", "60-1", "nosuch", "Mercury")
294+
wg.Done()
295+
}()
296+
wg.Wait()
297+
c.Do("XREAD", "BLOCK", "1000", "STREAMS", "nosuch", "$")
298+
})
286299
})
287300
}
288301

0 commit comments

Comments
 (0)