From 15eb2841b81dc2c9fe1cb1dd3a4c891cce3b2104 Mon Sep 17 00:00:00 2001 From: Koen Roevens Date: Thu, 29 Oct 2020 18:15:33 +0100 Subject: [PATCH] Fix ctx propagated in NewPublisher --- pkg/googlecloud/publisher.go | 50 ++++++++++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 8 deletions(-) diff --git a/pkg/googlecloud/publisher.go b/pkg/googlecloud/publisher.go index 4386ff5..32da095 100644 --- a/pkg/googlecloud/publisher.go +++ b/pkg/googlecloud/publisher.go @@ -18,12 +18,15 @@ var ( ErrPublisherClosed = errors.New("publisher is closed") // ErrTopicDoesNotExist happens when trying to publish or subscribe to a topic that doesn't exist. ErrTopicDoesNotExist = errors.New("topic does not exist") + // ErrConnectTimeout happens when not being able to connect to Google Cloud PubSub on time. + ErrConnectTimeout = errors.New("connect timeout") ) type Publisher struct { - topics map[string]*pubsub.Topic - topicsLock sync.RWMutex - closed bool + topics map[string]*pubsub.Topic + topicsLock sync.RWMutex + closed bool + cancelCtxClient context.CancelFunc client *pubsub.Client config PublisherConfig @@ -76,16 +79,44 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ logger: logger, } - ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout) + connectCtx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout) defer cancel() + var grpcConnectionCtx context.Context + grpcConnectionCtx, pub.cancelCtxClient = context.WithCancel(context.Background()) + var err error - pub.client, err = pubsub.NewClient(ctx, config.ProjectID, config.ClientOptions...) - if err != nil { - return nil, err + + clientResultStream := make(chan clientResult) + go func() { + defer close(clientResultStream) + result := clientResult{} + // BLocking call + result.client, result.err = pubsub.NewClient(grpcConnectionCtx, config.ProjectID, config.ClientOptions...) + select { + case <-connectCtx.Done(): + case clientResultStream <- result: + } + }() + + select { + case <-connectCtx.Done(): + return nil, ErrConnectTimeout + case result := <- clientResultStream: + if err != nil { + return nil, err + } else if result.client == nil { + return nil, ErrConnectTimeout + } else { + pub.client = result.client + return pub, nil + } } +} - return pub, nil +type clientResult struct { + client *pubsub.Client + err error } // Publish publishes a set of messages on a Google Cloud Pub/Sub topic. @@ -143,6 +174,9 @@ func (p *Publisher) Close() error { return nil } p.closed = true + if p.cancelCtxClient != nil { + p.cancelCtxClient() + } p.topicsLock.Lock() for _, t := range p.topics {