Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 42 additions & 8 deletions pkg/googlecloud/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ var (
ErrPublisherClosed = errors.New("publisher is closed")
// ErrTopicDoesNotExist happens when trying to publish or subscribe to a topic that doesn't exist.
ErrTopicDoesNotExist = errors.New("topic does not exist")
// ErrConnectTimeout happens when not being able to connect to Google Cloud PubSub on time.
ErrConnectTimeout = errors.New("connect timeout")
)

type Publisher struct {
topics map[string]*pubsub.Topic
topicsLock sync.RWMutex
closed bool
topics map[string]*pubsub.Topic
topicsLock sync.RWMutex
closed bool
cancelCtxClient context.CancelFunc

client *pubsub.Client
config PublisherConfig
Expand Down Expand Up @@ -76,16 +79,44 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ
logger: logger,
}

ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
connectCtx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
defer cancel()

var grpcConnectionCtx context.Context
grpcConnectionCtx, pub.cancelCtxClient = context.WithCancel(context.Background())

var err error
pub.client, err = pubsub.NewClient(ctx, config.ProjectID, config.ClientOptions...)
if err != nil {
return nil, err

clientResultStream := make(chan clientResult)
go func() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rvs-fluid-it I'm wondering if it would be possible to hide all this complexity in a separate func? Perhaps so we could re-use the same logic in subscriber as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. But how do we move from here? Maybe it's better that I prepare a new pull request containing fixes for the publisher as well for the subscriber. And I will pay attention to minimize duplication ... What do you think? Do you have other points that I need to take in to consideration?

Copy link
Member

@m110 m110 Nov 19, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's fine to update this PR or create a new one, whatever works for you. 🙂 I think the idea overall is solid, I would just pay attention so that the Publisher and Subscriber constructors are easy to follow, as it gets quite complex with select and two contexts. However, hiding this complexity in a separate function should be enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello, @rvs-fluid-it! I'm doing some housekeeping in Watermill repositories and I want to know if you have any update on that? 😉

If you don't have time now to work on that please let me know, so I can take care of that. Thanks!

defer close(clientResultStream)
result := clientResult{}
// BLocking call
result.client, result.err = pubsub.NewClient(grpcConnectionCtx, config.ProjectID, config.ClientOptions...)
select {
case <-connectCtx.Done():
case clientResultStream <- result:
}
}()

select {
case <-connectCtx.Done():
return nil, ErrConnectTimeout
case result := <- clientResultStream:
if err != nil {
return nil, err
} else if result.client == nil {
return nil, ErrConnectTimeout
} else {
pub.client = result.client
return pub, nil
}
}
}

return pub, nil
type clientResult struct {
client *pubsub.Client
err error
}

// Publish publishes a set of messages on a Google Cloud Pub/Sub topic.
Expand Down Expand Up @@ -143,6 +174,9 @@ func (p *Publisher) Close() error {
return nil
}
p.closed = true
if p.cancelCtxClient != nil {
p.cancelCtxClient()
}

p.topicsLock.Lock()
for _, t := range p.topics {
Expand Down