diff --git a/cmd_stream_test.go b/cmd_stream_test.go index 3bf57f6..ee9fa69 100644 --- a/cmd_stream_test.go +++ b/cmd_stream_test.go @@ -56,9 +56,13 @@ func TestStream(t *testing.T) { ok(t, err) equals(t, "978321845004-0", id) + id, err = s.XAdd("s1", "978321845004-*", []string{"name", "bazz"}) + ok(t, err) + equals(t, "978321845004-1", id) + stream, err := s.Stream("s1") ok(t, err) - equals(t, 2, len(stream)) + equals(t, 3, len(stream)) equals(t, StreamEntry{ ID: "12345-67", Values: []string{"name", "bar"}, @@ -67,6 +71,10 @@ func TestStream(t *testing.T) { ID: "978321845004-0", Values: []string{"name", "baz"}, }, stream[1]) + equals(t, StreamEntry{ + ID: "978321845004-1", + Values: []string{"name", "bazz"}, + }, stream[2]) }) useRESP3(t, c) diff --git a/integration/ephemeral.go b/integration/ephemeral.go index b4d1623..647d170 100644 --- a/integration/ephemeral.go +++ b/integration/ephemeral.go @@ -89,12 +89,13 @@ func runRedis(extraConfig string) (*ephemeral, string) { timeout := time.Now().Add(1 * time.Second) for time.Now().Before(timeout) { conn, err := net.Dial("tcp", addr) - if err == nil { - conn.Close() - e := ephemeral(*c) - return &e, addr + if err != nil { + time.Sleep(1 * time.Millisecond) + continue } - time.Sleep(3 * time.Millisecond) + conn.Close() + e := ephemeral(*c) + return &e, addr } panic(fmt.Sprintf("No connection on port %d", port)) } diff --git a/integration/stream_test.go b/integration/stream_test.go index 5d2d39a..4331a11 100644 --- a/integration/stream_test.go +++ b/integration/stream_test.go @@ -25,6 +25,11 @@ func TestStream(t *testing.T) { "18446744073709551000-0", "name", "Earth", ) + c.Do("XADD", + "planets", + "18446744073709551000-*", + "name", "Pluto", + ) c.Do("XADD", "reallynosuchkey", "NOMKSTREAM", diff --git a/stream.go b/stream.go index f2dd466..27cc520 100644 --- a/stream.go +++ b/stream.go @@ -57,9 +57,7 @@ func newStreamKey() *streamKey { } // generateID doesn't lock the mutex -func (s *streamKey) generateID(now time.Time) string { - ts := uint64(now.UnixNano()) / 1_000_000 - +func (s *streamKey) generateID(ts uint64) string { next := fmt.Sprintf("%d-%d", ts, 0) if s.lastAllocatedID != "" && streamCmp(s.lastAllocatedID, next) >= 0 { last, _ := parseStreamID(s.lastAllocatedID) @@ -230,14 +228,23 @@ func (s *streamKey) createGroup(group, id string) error { } // streamAdd adds an entry to a stream. Returns the new entry ID. -// If id is empty or "*" the ID will be generated automatically. +// If id is empty, "*", or "123-*", the ID will be generated automatically. // `values` should have an even length. func (s *streamKey) add(entryID string, values []string, now time.Time) (string, error) { s.mu.Lock() defer s.mu.Unlock() - if entryID == "" || entryID == "*" { - entryID = s.generateID(now) + switch { + case entryID == "" || entryID == "*": + entryID = s.generateID(uint64(now.UnixMilli())) + default: + // "-*" + parts := strings.Split(entryID, "-") + if len(parts) == 2 && parts[1] == "*" { + if ts, err := strconv.ParseUint(parts[0], 10, 64); err == nil { + entryID = s.generateID(uint64(ts)) + } + } } entryID, err := formatStreamID(entryID)