Skip to content

Commit dfe3831

Browse files
authored
support for "<timestamp>-*" (#426)
* support for "<timestamp>-*"
1 parent 72a6b15 commit dfe3831

File tree

4 files changed

+33
-12
lines changed

4 files changed

+33
-12
lines changed

cmd_stream_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,13 @@ func TestStream(t *testing.T) {
5656
ok(t, err)
5757
equals(t, "978321845004-0", id)
5858

59+
id, err = s.XAdd("s1", "978321845004-*", []string{"name", "bazz"})
60+
ok(t, err)
61+
equals(t, "978321845004-1", id)
62+
5963
stream, err := s.Stream("s1")
6064
ok(t, err)
61-
equals(t, 2, len(stream))
65+
equals(t, 3, len(stream))
6266
equals(t, StreamEntry{
6367
ID: "12345-67",
6468
Values: []string{"name", "bar"},
@@ -67,6 +71,10 @@ func TestStream(t *testing.T) {
6771
ID: "978321845004-0",
6872
Values: []string{"name", "baz"},
6973
}, stream[1])
74+
equals(t, StreamEntry{
75+
ID: "978321845004-1",
76+
Values: []string{"name", "bazz"},
77+
}, stream[2])
7078
})
7179

7280
useRESP3(t, c)

integration/ephemeral.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,12 +89,13 @@ func runRedis(extraConfig string) (*ephemeral, string) {
8989
timeout := time.Now().Add(1 * time.Second)
9090
for time.Now().Before(timeout) {
9191
conn, err := net.Dial("tcp", addr)
92-
if err == nil {
93-
conn.Close()
94-
e := ephemeral(*c)
95-
return &e, addr
92+
if err != nil {
93+
time.Sleep(1 * time.Millisecond)
94+
continue
9695
}
97-
time.Sleep(3 * time.Millisecond)
96+
conn.Close()
97+
e := ephemeral(*c)
98+
return &e, addr
9899
}
99100
panic(fmt.Sprintf("No connection on port %d", port))
100101
}

integration/stream_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ func TestStream(t *testing.T) {
2525
"18446744073709551000-0",
2626
"name", "Earth",
2727
)
28+
c.Do("XADD",
29+
"planets",
30+
"18446744073709551000-*",
31+
"name", "Pluto",
32+
)
2833
c.Do("XADD",
2934
"reallynosuchkey",
3035
"NOMKSTREAM",

stream.go

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,7 @@ func newStreamKey() *streamKey {
5757
}
5858

5959
// generateID doesn't lock the mutex
60-
func (s *streamKey) generateID(now time.Time) string {
61-
ts := uint64(now.UnixNano()) / 1_000_000
62-
60+
func (s *streamKey) generateID(ts uint64) string {
6361
next := fmt.Sprintf("%d-%d", ts, 0)
6462
if s.lastAllocatedID != "" && streamCmp(s.lastAllocatedID, next) >= 0 {
6563
last, _ := parseStreamID(s.lastAllocatedID)
@@ -230,14 +228,23 @@ func (s *streamKey) createGroup(group, id string) error {
230228
}
231229

232230
// streamAdd adds an entry to a stream. Returns the new entry ID.
233-
// If id is empty or "*" the ID will be generated automatically.
231+
// If id is empty, "*", or "123-*", the ID will be generated automatically.
234232
// `values` should have an even length.
235233
func (s *streamKey) add(entryID string, values []string, now time.Time) (string, error) {
236234
s.mu.Lock()
237235
defer s.mu.Unlock()
238236

239-
if entryID == "" || entryID == "*" {
240-
entryID = s.generateID(now)
237+
switch {
238+
case entryID == "" || entryID == "*":
239+
entryID = s.generateID(uint64(now.UnixMilli()))
240+
default:
241+
// "<timestamp>-*"
242+
parts := strings.Split(entryID, "-")
243+
if len(parts) == 2 && parts[1] == "*" {
244+
if ts, err := strconv.ParseUint(parts[0], 10, 64); err == nil {
245+
entryID = s.generateID(uint64(ts))
246+
}
247+
}
241248
}
242249

243250
entryID, err := formatStreamID(entryID)

0 commit comments

Comments
 (0)