Skip to content

Commit cbe380f

Browse files
authored
xreadgroup block (#210)
support "block" and "noack" in "xreadgroup"
1 parent 8e7ebef commit cbe380f

File tree

6 files changed

+197
-83
lines changed

6 files changed

+197
-83
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -176,8 +176,8 @@ Implemented commands:
176176
- XINFO STREAM -- partly
177177
- XLEN
178178
- XRANGE
179-
- XREAD -- partly
180-
- XREADGROUP -- partly
179+
- XREAD
180+
- XREADGROUP
181181
- XREVRANGE
182182
- XPENDING
183183
- Scripting

cmd_stream.go

Lines changed: 118 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -357,7 +357,6 @@ func (m *Miniredis) cmdXinfoStream(c *server.Peer, args []string) {
357357
}
358358

359359
// XREADGROUP
360-
// NOACK is not supported, BLOCK is not supported
361360
func (m *Miniredis) cmdXreadgroup(c *server.Peer, cmd string, args []string) {
362361
// XREADGROUP GROUP group consumer STREAMS key ID
363362
if len(args) < 6 {
@@ -366,21 +365,26 @@ func (m *Miniredis) cmdXreadgroup(c *server.Peer, cmd string, args []string) {
366365
return
367366
}
368367

368+
var opts struct {
369+
group string
370+
consumer string
371+
count int
372+
noack bool
373+
streams []string
374+
ids []string
375+
block bool
376+
blockTimeout time.Duration
377+
}
378+
369379
if strings.ToUpper(args[0]) != "GROUP" {
370380
setDirty(c)
371-
c.WriteError("ERR incorrect command")
381+
c.WriteError(msgSyntaxError)
372382
return
373383
}
374384

375-
group, consumer, args := args[1], args[2], args[3:]
376-
377-
var (
378-
count int
379-
err error
380-
streams []string
381-
ids []string
382-
)
385+
opts.group, opts.consumer, args = args[1], args[2], args[3:]
383386

387+
var err error
384388
parsing:
385389
for len(args) > 0 {
386390
switch strings.ToUpper(args[0]) {
@@ -390,29 +394,30 @@ parsing:
390394
break parsing
391395
}
392396

393-
count, err = strconv.Atoi(args[1])
397+
opts.count, err = strconv.Atoi(args[1])
394398
if err != nil {
395399
break parsing
396400
}
397401

398402
args = args[2:]
399403
case "BLOCK":
400-
if len(args) < 2 {
401-
err = errors.New(errWrongNumber(cmd))
404+
err = parseBlock(cmd, args, &opts.block, &opts.blockTimeout)
405+
if err != nil {
402406
break parsing
403407
}
404408
args = args[2:]
405409
case "NOACK":
406410
args = args[1:]
411+
opts.noack = true
407412
case "STREAMS":
408413
args = args[1:]
409414

410415
if len(args)%2 != 0 {
411-
err = errors.New(errWrongNumber(cmd))
416+
err = errors.New(msgXreadUnbalanced)
412417
break parsing
413418
}
414419

415-
streams, ids = args[0:len(args)/2], args[len(args)/2:]
420+
opts.streams, opts.ids = args[0:len(args)/2], args[len(args)/2:]
416421
break parsing
417422
default:
418423
err = fmt.Errorf("ERR incorrect argument %s", args[0])
@@ -426,65 +431,105 @@ parsing:
426431
return
427432
}
428433

429-
if len(streams) == 0 || len(ids) == 0 {
434+
if len(opts.streams) == 0 || len(opts.ids) == 0 {
430435
setDirty(c)
431436
c.WriteError(errWrongNumber(cmd))
432437
return
433438
}
434439

435-
withTx(m, c, func(c *server.Peer, ctx *connCtx) {
436-
db := m.db(ctx.selectedDB)
437-
438-
res := map[string][]StreamEntry{}
439-
for i, key := range streams {
440-
id := ids[i]
440+
for _, id := range opts.ids {
441+
if id != `>` {
442+
opts.block = false
443+
}
444+
}
441445

442-
g, err := db.streamGroup(key, group)
446+
if !opts.block {
447+
withTx(m, c, func(c *server.Peer, ctx *connCtx) {
448+
db := m.db(ctx.selectedDB)
449+
res, err := xreadgroup(
450+
db,
451+
opts.group,
452+
opts.consumer,
453+
opts.noack,
454+
opts.streams,
455+
opts.ids,
456+
opts.count,
457+
m.effectiveNow(),
458+
)
443459
if err != nil {
444460
c.WriteError(err.Error())
445461
return
446462
}
447-
if g == nil {
448-
c.WriteError(errXreadgroup(key, group).Error())
449-
return
450-
}
463+
writeXread(c, opts.streams, res)
464+
})
465+
return
466+
}
451467

452-
if _, err := parseStreamID(id); id != `>` && err != nil {
468+
blocking(
469+
m,
470+
c,
471+
opts.blockTimeout,
472+
func(c *server.Peer, ctx *connCtx) bool {
473+
db := m.db(ctx.selectedDB)
474+
res, err := xreadgroup(
475+
db,
476+
opts.group,
477+
opts.consumer,
478+
opts.noack,
479+
opts.streams,
480+
opts.ids,
481+
opts.count,
482+
m.effectiveNow(),
483+
)
484+
if err != nil {
453485
c.WriteError(err.Error())
454-
return
486+
return true
455487
}
456-
entries := g.readGroup(m.effectiveNow(), consumer, id, count)
457-
if id == `>` && len(entries) == 0 {
458-
continue
488+
if len(res) == 0 {
489+
return false
459490
}
491+
writeXread(c, opts.streams, res)
492+
return true
493+
},
494+
func(c *server.Peer) { // timeout
495+
c.WriteLen(-1)
496+
},
497+
)
498+
}
460499

461-
res[key] = entries
462-
}
500+
func xreadgroup(
501+
db *RedisDB,
502+
group,
503+
consumer string,
504+
noack bool,
505+
streams []string,
506+
ids []string,
507+
count int,
508+
now time.Time,
509+
) (map[string][]StreamEntry, error) {
510+
res := map[string][]StreamEntry{}
511+
for i, key := range streams {
512+
id := ids[i]
463513

464-
if len(res) == 0 {
465-
c.WriteLen(-1)
466-
return
514+
g, err := db.streamGroup(key, group)
515+
if err != nil {
516+
return nil, err
517+
}
518+
if g == nil {
519+
return nil, errXreadgroup(key, group)
467520
}
468-
c.WriteLen(len(res))
469-
for _, stream := range streams {
470-
entries, ok := res[stream]
471-
if !ok {
472-
continue
473-
}
474521

475-
c.WriteLen(2)
476-
c.WriteBulk(stream)
477-
c.WriteLen(len(entries))
478-
for _, entry := range entries {
479-
c.WriteLen(2)
480-
c.WriteBulk(entry.ID)
481-
c.WriteLen(len(entry.Values))
482-
for _, v := range entry.Values {
483-
c.WriteBulk(v)
484-
}
485-
}
522+
if _, err := parseStreamID(id); id != `>` && err != nil {
523+
return nil, err
486524
}
487-
})
525+
entries := g.readGroup(now, consumer, id, count, noack)
526+
if id == `>` && len(entries) == 0 {
527+
continue
528+
}
529+
530+
res[key] = entries
531+
}
532+
return res, nil
488533
}
489534

490535
// XACK
@@ -582,21 +627,10 @@ parsing:
582627
}
583628
args = args[2:]
584629
case "BLOCK":
585-
if len(args) < 2 {
586-
err = errors.New(errWrongNumber(cmd))
587-
break parsing
588-
}
589-
opts.block = true
590-
ms, nerr := strconv.Atoi(args[1])
591-
if nerr != nil {
592-
err = errors.New(msgInvalidInt)
593-
break parsing
594-
}
595-
if ms < 0 {
596-
err = errors.New("ERR timeout is negative")
630+
err = parseBlock(cmd, args, &opts.block, &opts.blockTimeout)
631+
if err != nil {
597632
break parsing
598633
}
599-
opts.blockTimeout = time.Millisecond * time.Duration(ms)
600634
args = args[2:]
601635
case "STREAMS":
602636
args = args[1:]
@@ -886,3 +920,19 @@ func writeXpending(
886920
c.WriteInt(e.count)
887921
}
888922
}
923+
924+
func parseBlock(cmd string, args []string, block *bool, timeout *time.Duration) error {
925+
if len(args) < 2 {
926+
return errors.New(errWrongNumber(cmd))
927+
}
928+
(*block) = true
929+
ms, err := strconv.Atoi(args[1])
930+
if err != nil {
931+
return errors.New(msgInvalidInt)
932+
}
933+
if ms < 0 {
934+
return errors.New("ERR timeout is negative")
935+
}
936+
(*timeout) = time.Millisecond * time.Duration(ms)
937+
return nil
938+
}

integration/stream_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -366,6 +366,8 @@ func TestStreamGroup(t *testing.T) {
366366
c.Do("XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "planets", ">")
367367
c.Do("XREADGROUP", "GROUP", "processing", "alice", "COUNT", "1", "STREAMS", "planets", ">")
368368
c.Do("XREADGROUP", "GROUP", "processing", "alice", "COUNT", "999", "STREAMS", "planets", ">")
369+
c.Do("XREADGROUP", "GROUP", "processing", "alice", "COUNT", "0", "STREAMS", "planets", ">")
370+
c.Do("XREADGROUP", "GROUP", "processing", "alice", "COUNT", "-1", "STREAMS", "planets", ">")
369371
c.Do("XACK", "planets", "processing", "42-1")
370372
c.Do("XDEL", "planets", "42-1")
371373
c.Do("XGROUP", "CREATE", "planets", "newcons", "$", "MKSTREAM")
@@ -376,13 +378,72 @@ func TestStreamGroup(t *testing.T) {
376378
c.Do("XREADGROUP", "GROUP", "processing", "bob", "STREAMS", "planets", "42-9")
377379
c.Error("stream ID", "XREADGROUP", "GROUP", "processing", "bob", "STREAMS", "planets", "foo")
378380

381+
// NOACK
382+
{
383+
c.Do("XGROUP", "CREATE", "colors", "pr", "$", "MKSTREAM")
384+
c.Do("XADD", "colors", "42-2", "name", "Green")
385+
c.Do("XREADGROUP", "GROUP", "pr", "alice", "NOACK", "STREAMS", "colors", ">")
386+
c.Do("XREADGROUP", "GROUP", "pr", "alice", "NOACK", "STREAMS", "colors", "0")
387+
c.Do("XACK", "colors", "p", "42-2")
388+
}
389+
379390
// errors
391+
c.Error("wrong number", "XREADGROUP")
392+
c.Error("wrong number", "XREADGROUP", "GROUP")
393+
c.Error("wrong number", "XREADGROUP", "foo")
394+
c.Error("wrong number", "XREADGROUP", "GROUP", "foo")
395+
c.Error("wrong number", "XREADGROUP", "GROUP", "foo", "bar")
396+
c.Error("wrong number", "XREADGROUP", "GROUP", "foo", "bar", "ZTREAMZ")
397+
c.Error("wrong number", "XREADGROUP", "GROUP", "foo", "bar", "STREAMS", "foo")
398+
c.Error("Unbalanced", "XREADGROUP", "GROUP", "foo", "bar", "STREAMS", "foo", "bar", ">")
399+
c.Error("syntax error", "XREADGROUP", "_____", "foo", "bar", "STREAMS", "foo", ">")
380400
c.Error("consumer group", "XREADGROUP", "GROUP", "nosuch", "alice", "STREAMS", "planets", ">")
381401
c.Error("consumer group", "XREADGROUP", "GROUP", "processing", "alice", "STREAMS", "nosuchplanets", ">")
382402
c.Do("SET", "scalar", "bar")
383403
c.Error("wrong kind", "XGROUP", "CREATE", "scalar", "processing", "$", "MKSTREAM")
384404
c.Error("BUSYGROUP", "XGROUP", "CREATE", "planets", "processing", "$", "MKSTREAM")
385405
})
406+
407+
testRaw2(t, func(c, c2 *client) {
408+
c.Do("XGROUP", "CREATE", "pl", "processing", "$", "MKSTREAM")
409+
c.Do("XADD", "pl", "55-88", "name", "Mercury")
410+
// something is available: doesn't block
411+
c.Do("XREADGROUP", "GROUP", "processing", "foo", "BLOCK", "10", "STREAMS", "pl", ">")
412+
// c.Do("XREADGROUP", "GROUP", "processing", "foo", "BLOCK", "0", "STREAMS", "pl", ">")
413+
414+
// blocks
415+
{
416+
var wg sync.WaitGroup
417+
wg.Add(1)
418+
go func() {
419+
c.Do("XREADGROUP", "GROUP", "processing", "foo", "BLOCK", "999999", "STREAMS", "pl", ">")
420+
wg.Done()
421+
}()
422+
time.Sleep(50 * time.Millisecond)
423+
c2.Do("XADD", "pl", "60-1", "name", "Mercury")
424+
wg.Wait()
425+
}
426+
427+
// timeout
428+
{
429+
c.Do("XREADGROUP", "GROUP", "processing", "foo", "BLOCK", "10", "STREAMS", "pl", ">")
430+
}
431+
432+
// block is ignored if id isn't ">"
433+
{
434+
c.Do("XREADGROUP", "GROUP", "processing", "foo", "BLOCK", "9999999999", "STREAMS", "pl", "8")
435+
}
436+
437+
// block is ignored if _any_ id isn't ">"
438+
{
439+
c.Do("XGROUP", "CREATE", "pl2", "processing", "$", "MKSTREAM")
440+
c.Do("XREADGROUP", "GROUP", "processing", "foo", "BLOCK", "9999999999", "STREAMS", "pl", "pl2", "8", ">")
441+
}
442+
443+
c.Error("not an int", "XREADGROUP", "GROUP", "foo", "bar", "BLOCK", "foo", "STREAMS", "foo", ">")
444+
c.Error("No such", "XREADGROUP", "GROUP", "foo", "bar", "BLOCK", "999999", "STREAMS", "pl", "invalid")
445+
c.Error("negative", "XREADGROUP", "GROUP", "foo", "bar", "BLOCK", "-1", "STREAMS", "foo", ">")
446+
})
386447
})
387448

388449
t.Run("XACK", func(t *testing.T) {

integration/test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,7 +513,7 @@ func (c *client) Error(msg string, cmd string, args ...string) {
513513
c.t.Errorf("expected (real)\n%q\nto contain %q", real, msg)
514514
}
515515
if !strings.Contains(mini, msg) {
516-
c.t.Errorf("expected (mini)\n%q\nto contain %q", mini, msg)
516+
c.t.Errorf("expected (mini)\n%q\nto contain %q\nreal:\n%s", mini, msg, real)
517517
}
518518
// if real != mini {
519519
// c.t.Errorf("expected error:\n%q\ngot:\n%q", real, mini)

0 commit comments

Comments
 (0)