From f580efb3a57a91957b44c3602d8169c9004e1c22 Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 31 Dec 2025 11:11:37 +0100 Subject: [PATCH 1/4] support for "-*" --- cmd_stream_test.go | 10 +++++++++- integration/stream_test.go | 5 +++++ stream.go | 13 +++++++++++-- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 3bf57f60..ee9fa69d 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -56,9 +56,13 @@ func TestStream(t *testing.T) { ok(t, err) equals(t, "978321845004-0", id) + id, err = s.XAdd("s1", "978321845004-*", []string{"name", "bazz"}) + ok(t, err) + equals(t, "978321845004-1", id) + stream, err := s.Stream("s1") ok(t, err) - equals(t, 2, len(stream)) + equals(t, 3, len(stream)) equals(t, StreamEntry{ ID: "12345-67", Values: []string{"name", "bar"}, @@ -67,6 +71,10 @@ func TestStream(t *testing.T) { ID: "978321845004-0", Values: []string{"name", "baz"}, }, stream[1]) + equals(t, StreamEntry{ + ID: "978321845004-1", + Values: []string{"name", "bazz"}, + }, stream[2]) }) useRESP3(t, c) diff --git a/integration/stream_test.go b/integration/stream_test.go index 5d2d39ac..2049a57a 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -25,6 +25,11 @@ func TestStream(t *testing.T) { "18446744073709551000-0", "name", "Earth", ) + c.DoLoosely("XADD", + "planets", + "18446744073709551000-*", + "name", "Pluto", + ) c.Do("XADD", "reallynosuchkey", "NOMKSTREAM", diff --git a/stream.go b/stream.go index f2dd466d..7b59edc6 100644 --- a/stream.go +++ b/stream.go @@ -230,14 +230,23 @@ func (s *streamKey) createGroup(group, id string) error { } // streamAdd adds an entry to a stream. Returns the new entry ID. -// If id is empty or "*" the ID will be generated automatically. +// If id is empty, "*", or "123-*", the ID will be generated automatically. // `values` should have an even length. func (s *streamKey) add(entryID string, values []string, now time.Time) (string, error) { s.mu.Lock() defer s.mu.Unlock() - if entryID == "" || entryID == "*" { + switch { + case entryID == "" || entryID == "*": entryID = s.generateID(now) + default: + // "-*" + parts := strings.Split(entryID, "-") + if len(parts) == 2 && parts[1] == "*" { + if ts, err := strconv.Atoi(parts[0]); err == nil { + entryID = s.generateID(time.Unix(int64(ts), 0)) + } + } } entryID, err := formatStreamID(entryID) From 8e2543d215a3555bda5cbf42227a5b91da6c9d08 Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 31 Dec 2025 11:28:07 +0100 Subject: [PATCH 2/4] nicer loop --- integration/ephemeral.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/integration/ephemeral.go b/integration/ephemeral.go index b4d16232..647d1709 100644 --- a/integration/ephemeral.go +++ b/integration/ephemeral.go @@ -89,12 +89,13 @@ func runRedis(extraConfig string) (*ephemeral, string) { timeout := time.Now().Add(1 * time.Second) for time.Now().Before(timeout) { conn, err := net.Dial("tcp", addr) - if err == nil { - conn.Close() - e := ephemeral(*c) - return &e, addr + if err != nil { + time.Sleep(1 * time.Millisecond) + continue } - time.Sleep(3 * time.Millisecond) + conn.Close() + e := ephemeral(*c) + return &e, addr } panic(fmt.Sprintf("No connection on port %d", port)) } From 9518cac2888d4df27489a286939504ea19f97d5b Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 31 Dec 2025 11:32:11 +0100 Subject: [PATCH 3/4] fix units --- stream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index 7b59edc6..b169d627 100644 --- a/stream.go +++ b/stream.go @@ -58,7 +58,7 @@ func newStreamKey() *streamKey { // generateID doesn't lock the mutex func (s *streamKey) generateID(now time.Time) string { - ts := uint64(now.UnixNano()) / 1_000_000 + ts := uint64(now.UnixMilli()) next := fmt.Sprintf("%d-%d", ts, 0) if s.lastAllocatedID != "" && streamCmp(s.lastAllocatedID, next) >= 0 { @@ -244,7 +244,7 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string, parts := strings.Split(entryID, "-") if len(parts) == 2 && parts[1] == "*" { if ts, err := strconv.Atoi(parts[0]); err == nil { - entryID = s.generateID(time.Unix(int64(ts), 0)) + entryID = s.generateID(time.UnixMilli(int64(ts))) } } } From c1ed644499e2e4c78cd5030bfefdff6163491893 Mon Sep 17 00:00:00 2001 From: Harmen Date: Wed, 31 Dec 2025 11:38:30 +0100 Subject: [PATCH 4/4] cleanup --- integration/stream_test.go | 2 +- stream.go | 10 ++++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/integration/stream_test.go b/integration/stream_test.go index 2049a57a..4331a114 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -25,7 +25,7 @@ func TestStream(t *testing.T) { "18446744073709551000-0", "name", "Earth", ) - c.DoLoosely("XADD", + c.Do("XADD", "planets", "18446744073709551000-*", "name", "Pluto", diff --git a/stream.go b/stream.go index b169d627..27cc5209 100644 --- a/stream.go +++ b/stream.go @@ -57,9 +57,7 @@ func newStreamKey() *streamKey { } // generateID doesn't lock the mutex -func (s *streamKey) generateID(now time.Time) string { - ts := uint64(now.UnixMilli()) - +func (s *streamKey) generateID(ts uint64) string { next := fmt.Sprintf("%d-%d", ts, 0) if s.lastAllocatedID != "" && streamCmp(s.lastAllocatedID, next) >= 0 { last, _ := parseStreamID(s.lastAllocatedID) @@ -238,13 +236,13 @@ func (s *streamKey) add(entryID string, values []string, now time.Time) (string, switch { case entryID == "" || entryID == "*": - entryID = s.generateID(now) + entryID = s.generateID(uint64(now.UnixMilli())) default: // "-*" parts := strings.Split(entryID, "-") if len(parts) == 2 && parts[1] == "*" { - if ts, err := strconv.Atoi(parts[0]); err == nil { - entryID = s.generateID(time.UnixMilli(int64(ts))) + if ts, err := strconv.ParseUint(parts[0], 10, 64); err == nil { + entryID = s.generateID(uint64(ts)) } } }