From 2641c33b4fb5f0a25de89e4675fe3b13a93dcbe5 Mon Sep 17 00:00:00 2001
From: Michal Bock
Date: Thu, 26 Mar 2020 18:49:48 +0000
Subject: [PATCH 1/4] Add session version and use buffered channel in kafka
 consumer.

The session version is an alternative to marking messages to be discarded
after a rebalance. Using a buffered channel should allow us to exhaust the
available messages faster when a rebalance happens.
---
 kafka/consumer.go               | 22 ++++++------
 kafka/consumer_group_handler.go | 62 ++++++++++++++++++++-------------
 2 files changed, 49 insertions(+), 35 deletions(-)

diff --git a/kafka/consumer.go b/kafka/consumer.go
index 33ef22f..2a111ab 100644
--- a/kafka/consumer.go
+++ b/kafka/consumer.go
@@ -83,23 +83,25 @@ func NewAsyncMessageSource(c AsyncMessageSourceConfig) (substrate.AsyncMessageSo
 	}
 
 	return &asyncMessageSource{
-		client:        client,
-		consumerGroup: consumerGroup,
-		topic:         c.Topic,
+		client:            client,
+		consumerGroup:     consumerGroup,
+		topic:             c.Topic,
+		channelBufferSize: config.ChannelBufferSize,
 	}, nil
 }
 
 type asyncMessageSource struct {
-	client        sarama.Client
-	consumerGroup sarama.ConsumerGroup
-	topic         string
+	client            sarama.Client
+	consumerGroup     sarama.ConsumerGroup
+	topic             string
+	channelBufferSize int
 }
 
 type consumerMessage struct {
 	cm *sarama.ConsumerMessage
-	discard bool
-	offset *struct {
+	sessVersion int
+	offset      *struct {
 		topic     string
 		partition int32
 		offset    int64
@@ -132,8 +134,8 @@ func (cm *consumerMessage) DiscardPayload() {
 func (ams *asyncMessageSource) ConsumeMessages(ctx context.Context, messages chan<- substrate.Message, acks <-chan substrate.Message) error {
 	rg, ctx := rungroup.New(ctx)
 
-	toAck := make(chan *consumerMessage)
-	sessCh := make(chan sarama.ConsumerGroupSession)
+	toAck := make(chan *consumerMessage, ams.channelBufferSize)
+	sessCh := make(chan *session)
 	rebalanceCh := make(chan struct{})
 
 	rg.Go(func() error {
diff --git a/kafka/consumer_group_handler.go b/kafka/consumer_group_handler.go
index 1c119e6..2d66c89 100644
--- a/kafka/consumer_group_handler.go
+++ b/kafka/consumer_group_handler.go
@@ -7,19 +7,30 @@ import (
 	"github.com/uw-labs/substrate"
 )
 
+type session struct {
+	version       int
+	saramaSession sarama.ConsumerGroupSession
+}
+
 type consumerGroupHandler struct {
 	ctx         context.Context
 	toAck       chan<- *consumerMessage
-	sessCh      chan<- sarama.ConsumerGroupSession
+	sessCh      chan<- *session
 	rebalanceCh chan<- struct{}
+	version     int
 }
 
 // Setup is run at the beginning of a new session, before ConsumeClaim.
 func (c *consumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
+	c.version++
+	s := &session{
+		version:       c.version,
+		saramaSession: sess,
+	}
 	// send session to the ack processor
 	select {
 	case <-c.ctx.Done():
-	case c.sessCh <- sess:
+	case c.sessCh <- s:
 	}
 	return nil
 }
@@ -51,7 +62,7 @@ func (c *consumerGroupHandler) ConsumeClaim(_ sarama.ConsumerGroupSession, claim
 			if !ok {
 				return nil
 			}
-			cm := &consumerMessage{cm: m}
+			cm := &consumerMessage{cm: m, sessVersion: c.version}
 			select {
 			case c.toAck <- cm:
 			case <-c.ctx.Done():
@@ -65,37 +76,43 @@ type kafkaAcksProcessor struct {
 	toClient    chan<- substrate.Message
 	fromKafka   <-chan *consumerMessage
 	acks        <-chan substrate.Message
-	sessCh      <-chan sarama.ConsumerGroupSession
+	sessCh      <-chan *session
 	rebalanceCh <-chan struct{}
 
-	sess      sarama.ConsumerGroupSession
-	forAcking []*consumerMessage
+	sess        sarama.ConsumerGroupSession
+	sessVersion int
+	forAcking   []*consumerMessage
 }
 
-func (ap *kafkaAcksProcessor) run(ctx context.Context) error {
-	// First set session, so that we can acknowledge messages.
+func (ap *kafkaAcksProcessor) setSession(ctx context.Context) bool {
 	select {
 	case <-ctx.Done():
-		return nil
-	case ap.sess = <-ap.sessCh:
+		return false
+	case s := <-ap.sessCh:
+		ap.sess = s.saramaSession
+		ap.sessVersion = s.version
 	}
+	return true
+}
+
+func (ap *kafkaAcksProcessor) run(ctx context.Context) error {
+	// First set session, so that we can acknowledge messages.
+	if ok := ap.setSession(ctx); !ok {
+		return nil
+	}
 	for {
 		select {
 		case <-ctx.Done():
 			return nil
 		case <-ap.rebalanceCh:
-			// Mark all pending messages to be discarded, as rebalance happened.
-			for _, msg := range ap.forAcking {
-				msg.discard = true
-			}
 			// Wait for the new session.
-			select {
-			case <-ctx.Done():
+			if ok := ap.setSession(ctx); !ok {
 				return nil
-			case ap.sess = <-ap.sessCh:
 			}
 		case msg := <-ap.fromKafka:
+			if msg.sessVersion != ap.sessVersion {
+				continue // skip message that was consumed during a previous session version
+			}
 			if err := ap.processMessage(ctx, msg); err != nil {
 				if err == context.Canceled {
 					// This error is returned when a context cancellation is encountered
@@ -119,14 +136,9 @@ func (ap *kafkaAcksProcessor) processMessage(ctx context.Context, msg *consumerM
 	case <-ctx.Done():
 		return context.Canceled
 	case <-ap.rebalanceCh:
-		// Mark all pending messages to be discarded, as rebalance happened.
-		for _, msg := range ap.forAcking {
-			msg.discard = true
-		}
 		// Wait for the new session.
-		select {
-		case <-ctx.Done():
-		case ap.sess = <-ap.sessCh:
+		if ok := ap.setSession(ctx); !ok {
+			return context.Canceled
 		}
 		return nil // We can return immediately as the current message can be discarded.
 	case ap.toClient <- msg:
@@ -153,7 +165,7 @@ func (ap *kafkaAcksProcessor) processAck(ack substrate.Message) error {
 			Acked:    ack,
 			Expected: ap.forAcking[0],
 		}
-	case ap.forAcking[0].discard:
+	case ap.forAcking[0].sessVersion != ap.sessVersion:
 		// Discard pending message that was consumed before a rebalance.
 		ap.forAcking = ap.forAcking[1:]
 	default:

From 16503df33a26ec929de510b8c753bed8fec5715b Mon Sep 17 00:00:00 2001
From: Michal Bock
Date: Thu, 26 Mar 2020 19:02:37 +0000
Subject: [PATCH 2/4] Disable returning of errors as we are not reading them.

---
 kafka/consumer.go | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kafka/consumer.go b/kafka/consumer.go
index 2a111ab..ec8e968 100644
--- a/kafka/consumer.go
+++ b/kafka/consumer.go
@@ -49,7 +49,6 @@ func (ams *AsyncMessageSourceConfig) buildSaramaConsumerConfig() (*sarama.Config
 	}
 
 	config := sarama.NewConfig()
-	config.Consumer.Return.Errors = true
 	config.Consumer.Offsets.Initial = offset
 	config.Metadata.RefreshFrequency = mrf
 	config.Consumer.Group.Session.Timeout = st

From cd993cde82807e170ca1ca1ccb943cc6ba93e0b0 Mon Sep 17 00:00:00 2001
From: Michal Bock
Date: Thu, 26 Mar 2020 19:12:15 +0000
Subject: [PATCH 3/4] Add option to enable Sarama debug logging via environment
 variable.

---
 kafka/doc.go | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/kafka/doc.go b/kafka/doc.go
index ac9716c..28d1b83 100644
--- a/kafka/doc.go
+++ b/kafka/doc.go
@@ -20,4 +20,22 @@
 // consumer-group - The consumer group id
 // metadata-refresh - How frequently to refresh the cluster metadata. E.g., '10s' '2m'
 //
+// Finally, to debug the behaviour of the underlying Sarama library, you can set the environment variable
+// SUBSTRATE_KAFKA_DEBUG_LOG=true
+// to enable Sarama debug logging.
+//
 package kafka
+
+import (
+	"log"
+	"os"
+	"strings"
+
+	"github.com/Shopify/sarama"
+)
+
+func init() {
+	if strings.ToLower(os.Getenv("SUBSTRATE_KAFKA_DEBUG_LOG")) == "true" {
+		sarama.Logger = log.New(os.Stdout, "[Sarama] ", log.LstdFlags)
+	}
+}

From 74f0f026b909a16774848d7416ca4e98c610a05d Mon Sep 17 00:00:00 2001
From: Michal Bock
Date: Thu, 26 Mar 2020 19:35:52 +0000
Subject: [PATCH 4/4] Add option to specify max processing time for kafka
 consumer.

---
 kafka/consumer.go | 11 +++++++++--
 kafka/url.go      |  8 ++++++++
 kafka/url_test.go |  3 ++-
 3 files changed, 19 insertions(+), 3 deletions(-)

diff --git a/kafka/consumer.go b/kafka/consumer.go
index ec8e968..48f39f8 100644
--- a/kafka/consumer.go
+++ b/kafka/consumer.go
@@ -17,8 +17,9 @@ const (
 	// OffsetNewest indicates the next appropriate message available on the broker.
 	OffsetNewest int64 = sarama.OffsetNewest
 
-	defaultMetadataRefreshFrequency = 10 * time.Minute
-	defaultConsumerSessionTimeout   = 10 * time.Second
+	defaultMetadataRefreshFrequency  = 10 * time.Minute
+	defaultConsumerSessionTimeout    = 10 * time.Second
+	defaultConsumerMaxProcessingTime = 100 * time.Millisecond
 )
 
 // AsyncMessageSource represents a kafka message source and implements the
@@ -31,6 +32,7 @@ type AsyncMessageSourceConfig struct {
 	MetadataRefreshFrequency time.Duration
 	OffsetsRetention         time.Duration
 	SessionTimeout           time.Duration
+	MaxProcessingTime        time.Duration
 	Version                  string
 }
 
@@ -47,8 +49,13 @@ func (ams *AsyncMessageSourceConfig) buildSaramaConsumerConfig() (*sarama.Config
 	if ams.SessionTimeout != 0 {
 		st = ams.SessionTimeout
 	}
+	pt := defaultConsumerMaxProcessingTime
+	if ams.MaxProcessingTime != 0 {
+		pt = ams.MaxProcessingTime
+	}
 
 	config := sarama.NewConfig()
+	config.Consumer.MaxProcessingTime = pt
 	config.Consumer.Offsets.Initial = offset
 	config.Metadata.RefreshFrequency = mrf
 	config.Consumer.Group.Session.Timeout = st
diff --git a/kafka/url.go b/kafka/url.go
index 80f02f8..72c7c09 100644
--- a/kafka/url.go
+++ b/kafka/url.go
@@ -81,6 +81,14 @@ func newKafkaSource(u *url.URL) (substrate.AsyncMessageSource, error) {
 		}
 		conf.SessionTimeout = d
 	}
+	dur = q.Get("max-processing-time")
+	if dur != "" {
+		d, err := time.ParseDuration(dur)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse max processing time: %v", err)
+		}
+		conf.MaxProcessingTime = d
+	}
 
 	conf.Version = q.Get("version")
 
diff --git a/kafka/url_test.go b/kafka/url_test.go
index 8d8aa89..3b55b0f 100644
--- a/kafka/url_test.go
+++ b/kafka/url_test.go
@@ -123,12 +123,13 @@ func TestKafkaSource(t *testing.T) {
 		},
 		{
 			name:  "everything",
-			input: "kafka://localhost:123/t1/?offset=newest&consumer-group=g1&metadata-refresh=2s&broker=localhost:234&broker=localhost:345&version=0.10.2.0&session-timeout=30s",
+			input: "kafka://localhost:123/t1/?offset=newest&consumer-group=g1&metadata-refresh=2s&broker=localhost:234&broker=localhost:345&version=0.10.2.0&session-timeout=30s&max-processing-time=5s",
 			expected: AsyncMessageSourceConfig{
 				Brokers:                  []string{"localhost:123", "localhost:234", "localhost:345"},
 				ConsumerGroup:            "g1",
 				MetadataRefreshFrequency: 2 * time.Second,
 				SessionTimeout:           30 * time.Second,
+				MaxProcessingTime:        5 * time.Second,
 				Offset:                   sarama.OffsetNewest,
 				Topic:                    "t1",
 				Version:                  "0.10.2.0",
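
Taken together, the four patches extend AsyncMessageSourceConfig, the URL parser, and the package documentation. For reference, a minimal sketch of how a caller might exercise the new options; this is not part of the series itself, the broker address, topic, and consumer group below are placeholder values, and the import path is assumed from the repository layout:

	package main

	import (
		"log"
		"time"

		"github.com/uw-labs/substrate/kafka"
	)

	func main() {
		// MaxProcessingTime is the field added in PATCH 4/4; the remaining
		// fields already existed on AsyncMessageSourceConfig.
		source, err := kafka.NewAsyncMessageSource(kafka.AsyncMessageSourceConfig{
			Brokers:           []string{"localhost:9092"}, // placeholder broker
			Topic:             "t1",
			ConsumerGroup:     "g1",
			Offset:            kafka.OffsetNewest,
			SessionTimeout:    30 * time.Second,
			MaxProcessingTime: 5 * time.Second,
		})
		if err != nil {
			log.Fatal(err)
		}
		defer source.Close()

		// Messages would then flow through
		// source.ConsumeMessages(ctx, messages, acks).
	}

The same option travels as the max-processing-time query parameter in the URL form covered by url_test.go above, and exporting SUBSTRATE_KAFKA_DEBUG_LOG=true enables the Sarama debug logging added in PATCH 3/4.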