Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 21 additions & 13 deletions kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ const (
// OffsetNewest indicates the next appropriate message available on the broker.
OffsetNewest int64 = sarama.OffsetNewest

defaultMetadataRefreshFrequency = 10 * time.Minute
defaultConsumerSessionTimeout = 10 * time.Second
defaultMetadataRefreshFrequency = 10 * time.Minute
defaultConsumerSessionTimeout = 10 * time.Second
defaultConsumerMaxProcessingTime = 100 * time.Millisecond
)

// AsyncMessageSource represents a kafka message source and implements the
Expand All @@ -31,6 +32,7 @@ type AsyncMessageSourceConfig struct {
MetadataRefreshFrequency time.Duration
OffsetsRetention time.Duration
SessionTimeout time.Duration
MaxProcessingTime time.Duration
Version string
}

Expand All @@ -47,9 +49,13 @@ func (ams *AsyncMessageSourceConfig) buildSaramaConsumerConfig() (*sarama.Config
if ams.SessionTimeout != 0 {
st = ams.SessionTimeout
}
pt := defaultConsumerMaxProcessingTime
if ams.MaxProcessingTime != 0 {
pt = ams.MaxProcessingTime
}

config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.MaxProcessingTime = pt
config.Consumer.Offsets.Initial = offset
config.Metadata.RefreshFrequency = mrf
config.Consumer.Group.Session.Timeout = st
Expand Down Expand Up @@ -83,23 +89,25 @@ func NewAsyncMessageSource(c AsyncMessageSourceConfig) (substrate.AsyncMessageSo
}

return &asyncMessageSource{
client: client,
consumerGroup: consumerGroup,
topic: c.Topic,
client: client,
consumerGroup: consumerGroup,
topic: c.Topic,
channelBufferSize: config.ChannelBufferSize,
}, nil
}

type asyncMessageSource struct {
client sarama.Client
consumerGroup sarama.ConsumerGroup
topic string
client sarama.Client
consumerGroup sarama.ConsumerGroup
topic string
channelBufferSize int
}

type consumerMessage struct {
cm *sarama.ConsumerMessage

discard bool
offset *struct {
sessVersion int
offset *struct {
topic string
partition int32
offset int64
Expand Down Expand Up @@ -132,8 +140,8 @@ func (cm *consumerMessage) DiscardPayload() {

func (ams *asyncMessageSource) ConsumeMessages(ctx context.Context, messages chan<- substrate.Message, acks <-chan substrate.Message) error {
rg, ctx := rungroup.New(ctx)
toAck := make(chan *consumerMessage)
sessCh := make(chan sarama.ConsumerGroupSession)
toAck := make(chan *consumerMessage, ams.channelBufferSize)
sessCh := make(chan *session)
rebalanceCh := make(chan struct{})

rg.Go(func() error {
Expand Down
62 changes: 37 additions & 25 deletions kafka/consumer_group_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,30 @@ import (
"github.com/uw-labs/substrate"
)

type session struct {
version int
saramaSession sarama.ConsumerGroupSession
}

type consumerGroupHandler struct {
ctx context.Context
toAck chan<- *consumerMessage
sessCh chan<- sarama.ConsumerGroupSession
sessCh chan<- *session
rebalanceCh chan<- struct{}
version int
}

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *consumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
c.version++
s := &session{
version: c.version,
saramaSession: sess,
}
// send session to the ack processor
select {
case <-c.ctx.Done():
case c.sessCh <- sess:
case c.sessCh <- s:
}
return nil
}
Expand Down Expand Up @@ -51,7 +62,7 @@ func (c *consumerGroupHandler) ConsumeClaim(_ sarama.ConsumerGroupSession, claim
if !ok {
return nil
}
cm := &consumerMessage{cm: m}
cm := &consumerMessage{cm: m, sessVersion: c.version}
select {
case c.toAck <- cm:
case <-c.ctx.Done():
Expand All @@ -65,37 +76,43 @@ type kafkaAcksProcessor struct {
toClient chan<- substrate.Message
fromKafka <-chan *consumerMessage
acks <-chan substrate.Message
sessCh <-chan sarama.ConsumerGroupSession
sessCh <-chan *session
rebalanceCh <-chan struct{}

sess sarama.ConsumerGroupSession
forAcking []*consumerMessage
sess sarama.ConsumerGroupSession
sessVersion int
forAcking []*consumerMessage
}

func (ap *kafkaAcksProcessor) run(ctx context.Context) error {
// First set session, so that we can acknowledge messages.
func (ap *kafkaAcksProcessor) setSession(ctx context.Context) bool {
select {
case <-ctx.Done():
return nil
case ap.sess = <-ap.sessCh:
return false
case s := <-ap.sessCh:
ap.sess = s.saramaSession
ap.sessVersion = s.version
}
return true
}

func (ap *kafkaAcksProcessor) run(ctx context.Context) error {
// First set session, so that we can acknowledge messages.
if ok := ap.setSession(ctx); !ok {
return nil
}
for {
select {
case <-ctx.Done():
return nil
case <-ap.rebalanceCh:
// Mark all pending messages to be discarded, as rebalance happened.
for _, msg := range ap.forAcking {
msg.discard = true
}
// Wait for the new session.
select {
case <-ctx.Done():
if ok := ap.setSession(ctx); !ok {
return nil
case ap.sess = <-ap.sessCh:
}
case msg := <-ap.fromKafka:
if msg.sessVersion != ap.sessVersion {
continue // skip message that was consumed under a previous session version
}
if err := ap.processMessage(ctx, msg); err != nil {
if err == context.Canceled {
// This error is returned when a context cancellation is encountered
Expand All @@ -119,14 +136,9 @@ func (ap *kafkaAcksProcessor) processMessage(ctx context.Context, msg *consumerM
case <-ctx.Done():
return context.Canceled
case <-ap.rebalanceCh:
// Mark all pending messages to be discarded, as rebalance happened.
for _, msg := range ap.forAcking {
msg.discard = true
}
// Wait for the new session.
select {
case <-ctx.Done():
case ap.sess = <-ap.sessCh:
if ok := ap.setSession(ctx); !ok {
return context.Canceled
}
return nil // We can return immediately as the current message can be discarded.
case ap.toClient <- msg:
Expand All @@ -153,7 +165,7 @@ func (ap *kafkaAcksProcessor) processAck(ack substrate.Message) error {
Acked: ack,
Expected: ap.forAcking[0],
}
case ap.forAcking[0].discard:
case ap.forAcking[0].sessVersion != ap.sessVersion:
// Discard pending message that was consumed before a rebalance.
ap.forAcking = ap.forAcking[1:]
default:
Expand Down
18 changes: 18 additions & 0 deletions kafka/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,22 @@
// consumer-group - The consumer group id
// metadata-refresh - How frequently to refresh the cluster metadata. E.g., '10s' '2m'
//
// Finally to debug behaviour of the underlying Sarama library you can set environment variable
// SUBSTRATE_KAFKA_DEBUG_LOG=true
// to enable Sarama debug logging.

package kafka

import (
"log"
"os"
"strings"

"github.com/Shopify/sarama"
)

func init() {
if strings.ToLower(os.Getenv("SUBSTRATE_KAFKA_DEBUG_LOG")) == "true" {
sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
}
}
8 changes: 8 additions & 0 deletions kafka/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ func newKafkaSource(u *url.URL) (substrate.AsyncMessageSource, error) {
}
conf.SessionTimeout = d
}
dur = q.Get("max-processing-time")
if dur != "" {
d, err := time.ParseDuration(dur)
if err != nil {
return nil, fmt.Errorf("failed to parse max processing time : %v", err)
}
conf.MaxProcessingTime = d
}

conf.Version = q.Get("version")

Expand Down
3 changes: 2 additions & 1 deletion kafka/url_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ func TestKafkaSource(t *testing.T) {
},
{
name: "everything",
input: "kafka://localhost:123/t1/?offset=newest&consumer-group=g1&metadata-refresh=2s&broker=localhost:234&broker=localhost:345&version=0.10.2.0&session-timeout=30s",
input: "kafka://localhost:123/t1/?offset=newest&consumer-group=g1&metadata-refresh=2s&broker=localhost:234&broker=localhost:345&version=0.10.2.0&session-timeout=30s&max-processing-time=5s",
expected: AsyncMessageSourceConfig{
Brokers: []string{"localhost:123", "localhost:234", "localhost:345"},
ConsumerGroup: "g1",
MetadataRefreshFrequency: 2 * time.Second,
SessionTimeout: 30 * time.Second,
MaxProcessingTime: 5 * time.Second,
Offset: sarama.OffsetNewest,
Topic: "t1",
Version: "0.10.2.0",
Expand Down