Skip to content

Commit c974c4a

Browse files
authored
Bump Google pubsub client to v2 (#42)
## Breaking change `SubscriberConfig` no longer exposes the `SubscriptionConfig` field. Instead, use `GenerateSubscription` function. The function returns `*pubsubpb.Subscription` which mostly maps 1:1 with the old `SubscriptionConfig`. ```diff googlecloud.SubscriberConfig{ ProjectID: "tests", GenerateSubscriptionName: subscriptionName, - SubscriptionConfig: pubsub.SubscriptionConfig{ - RetainAckedMessages: false, - EnableMessageOrdering: enableMessageOrdering, + GenerateSubscription: func(params googlecloud.GenerateSubscriptionParams) *pubsubpb.Subscription { + return &pubsubpb.Subscription{ + RetainAckedMessages: false, + EnableMessageOrdering: enableMessageOrdering, + } }, Unmarshaler: unmarshaler, }, ```
1 parent 2053ccb commit c974c4a

File tree

8 files changed

+234
-186
lines changed

8 files changed

+234
-186
lines changed

docker-compose.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
# For Watermill based application docker please check https://watermill.io/docs/getting-started/
33
services:
44
googlecloud:
5-
image: google/cloud-sdk:489.0.0
5+
image: google/cloud-sdk:535.0.0
66
entrypoint: gcloud --quiet beta emulators pubsub start --host-port=0.0.0.0:8085 --verbosity=debug --log-http
77
ports:
88
- 8085:8085

go.mod

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,53 @@
1-
module github.com/ThreeDotsLabs/watermill-googlecloud
1+
module github.com/ThreeDotsLabs/watermill-googlecloud/v2
22

33
go 1.23.0
44

55
require (
6-
cloud.google.com/go v0.115.1 // indirect
7-
cloud.google.com/go/pubsub v1.42.0
6+
cloud.google.com/go v0.121.4 // indirect
87
github.com/ThreeDotsLabs/watermill v1.4.8-0.20250825132336-9e066d39bea6
98
github.com/cenkalti/backoff/v3 v3.2.2
10-
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
119
github.com/google/uuid v1.6.0
1210
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
1311
github.com/pkg/errors v0.9.1
14-
github.com/stretchr/testify v1.9.0
15-
golang.org/x/net v0.28.0 // indirect
16-
golang.org/x/oauth2 v0.27.0 // indirect
17-
golang.org/x/sys v0.24.0 // indirect
18-
golang.org/x/text v0.17.0 // indirect
19-
google.golang.org/api v0.194.0
20-
google.golang.org/genproto v0.0.0-20240823204242-4ba0660f739c // indirect
21-
google.golang.org/grpc v1.65.0
12+
github.com/stretchr/testify v1.10.0
13+
golang.org/x/net v0.42.0 // indirect
14+
golang.org/x/oauth2 v0.30.0 // indirect
15+
golang.org/x/sys v0.34.0 // indirect
16+
golang.org/x/text v0.27.0 // indirect
17+
google.golang.org/api v0.243.0
18+
google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect
19+
google.golang.org/grpc v1.74.2
2220
)
2321

22+
require cloud.google.com/go/pubsub/v2 v2.0.0
23+
2424
require (
25-
cloud.google.com/go/auth v0.9.1 // indirect
26-
cloud.google.com/go/auth/oauth2adapt v0.2.4 // indirect
27-
cloud.google.com/go/compute/metadata v0.5.0 // indirect
28-
cloud.google.com/go/iam v1.2.0 // indirect
25+
cloud.google.com/go/auth v0.16.3 // indirect
26+
cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect
27+
cloud.google.com/go/compute/metadata v0.7.0 // indirect
28+
cloud.google.com/go/iam v1.5.2 // indirect
2929
github.com/davecgh/go-spew v1.1.1 // indirect
3030
github.com/felixge/httpsnoop v1.0.4 // indirect
31-
github.com/go-logr/logr v1.4.2 // indirect
31+
github.com/go-logr/logr v1.4.3 // indirect
3232
github.com/go-logr/stdr v1.2.2 // indirect
33-
github.com/google/s2a-go v0.1.8 // indirect
34-
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
35-
github.com/googleapis/gax-go/v2 v2.13.0 // indirect
33+
github.com/google/s2a-go v0.1.9 // indirect
34+
github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect
35+
github.com/googleapis/gax-go/v2 v2.15.0 // indirect
3636
github.com/oklog/ulid v1.3.1 // indirect
3737
github.com/pmezard/go-difflib v1.0.0 // indirect
38+
go.einride.tech/aip v0.73.0 // indirect
3839
go.opencensus.io v0.24.0 // indirect
39-
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect
40-
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect
41-
go.opentelemetry.io/otel v1.29.0 // indirect
42-
go.opentelemetry.io/otel/metric v1.29.0 // indirect
43-
go.opentelemetry.io/otel/trace v1.29.0 // indirect
44-
golang.org/x/crypto v0.26.0 // indirect
45-
golang.org/x/sync v0.8.0 // indirect
46-
golang.org/x/time v0.6.0 // indirect
47-
google.golang.org/genproto/googleapis/api v0.0.0-20240823204242-4ba0660f739c // indirect
48-
google.golang.org/genproto/googleapis/rpc v0.0.0-20240823204242-4ba0660f739c // indirect
49-
google.golang.org/protobuf v1.34.2 // indirect
40+
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
41+
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.61.0 // indirect
42+
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 // indirect
43+
go.opentelemetry.io/otel v1.36.0 // indirect
44+
go.opentelemetry.io/otel/metric v1.36.0 // indirect
45+
go.opentelemetry.io/otel/trace v1.36.0 // indirect
46+
golang.org/x/crypto v0.40.0 // indirect
47+
golang.org/x/sync v0.16.0 // indirect
48+
golang.org/x/time v0.12.0 // indirect
49+
google.golang.org/genproto/googleapis/api v0.0.0-20250721164621-a45f3dfb1074 // indirect
50+
google.golang.org/genproto/googleapis/rpc v0.0.0-20250715232539-7130f93afb79 // indirect
51+
google.golang.org/protobuf v1.36.6 // indirect
5052
gopkg.in/yaml.v3 v3.0.1 // indirect
5153
)

go.sum

Lines changed: 76 additions & 69 deletions
Large diffs are not rendered by default.

pkg/googlecloud/marshaler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package googlecloud
22

33
import (
4-
"cloud.google.com/go/pubsub"
4+
"cloud.google.com/go/pubsub/v2"
55
"github.com/pkg/errors"
66

77
"github.com/ThreeDotsLabs/watermill/message"

pkg/googlecloud/publisher.go

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@ package googlecloud
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"time"
78

8-
"cloud.google.com/go/pubsub"
9+
"cloud.google.com/go/pubsub/v2"
10+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
911
"github.com/pkg/errors"
1012
"google.golang.org/api/option"
13+
"google.golang.org/grpc/codes"
14+
"google.golang.org/grpc/status"
1115

1216
"github.com/ThreeDotsLabs/watermill"
1317
"github.com/ThreeDotsLabs/watermill/message"
@@ -23,9 +27,9 @@ var (
2327
)
2428

2529
type Publisher struct {
26-
topics map[string]*pubsub.Topic
27-
topicsLock sync.RWMutex
28-
closed bool
30+
publishers map[string]*pubsub.Publisher
31+
publishersLock sync.RWMutex
32+
closed bool
2933

3034
client *pubsub.Client
3135
config PublisherConfig
@@ -81,9 +85,9 @@ func NewPublisher(config PublisherConfig, logger watermill.LoggerAdapter) (*Publ
8185
}
8286

8387
pub := &Publisher{
84-
topics: map[string]*pubsub.Topic{},
85-
config: config,
86-
logger: logger,
88+
publishers: map[string]*pubsub.Publisher{},
89+
config: config,
90+
logger: logger,
8791
}
8892

8993
ctx, cancel := context.WithTimeout(context.Background(), config.ConnectTimeout)
@@ -155,13 +159,13 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
155159
ctx, cancel := context.WithDeadline(ctx, deadline)
156160
defer cancel()
157161

158-
t, err := p.topic(ctx, topic)
162+
pub, err := p.publisher(ctx, topic)
159163
if err != nil {
160164
return err
161165
}
162166

163167
for _, msg := range messages {
164-
err = p.publishMessage(t, msg, topic, deadline)
168+
err = p.publishMessage(pub, msg, topic, deadline)
165169
if err != nil {
166170
return err
167171
}
@@ -170,7 +174,7 @@ func (p *Publisher) Publish(topic string, messages ...*message.Message) error {
170174
return nil
171175
}
172176

173-
func (p *Publisher) publishMessage(t *pubsub.Topic, msg *message.Message, topic string, deadline time.Time) error {
177+
func (p *Publisher) publishMessage(pub *pubsub.Publisher, msg *message.Message, topic string, deadline time.Time) error {
174178
ctx, cancel := context.WithDeadline(msg.Context(), deadline)
175179
defer cancel()
176180

@@ -185,12 +189,12 @@ func (p *Publisher) publishMessage(t *pubsub.Topic, msg *message.Message, topic
185189
return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
186190
}
187191

188-
result := t.Publish(ctx, googlecloudMsg)
192+
result := pub.Publish(ctx, googlecloudMsg)
189193

190194
serverMessageID, err := result.Get(ctx)
191195
if err != nil {
192196
if p.config.EnableMessageOrdering && p.config.EnableMessageOrderingAutoResumePublishOnError && googlecloudMsg.OrderingKey != "" {
193-
t.ResumePublish(googlecloudMsg.OrderingKey)
197+
pub.ResumePublish(googlecloudMsg.OrderingKey)
194198
}
195199
return errors.Wrapf(err, "publishing message %s failed", msg.UUID)
196200
}
@@ -212,61 +216,80 @@ func (p *Publisher) Close() error {
212216
}
213217
p.closed = true
214218

215-
p.topicsLock.Lock()
216-
for _, t := range p.topics {
217-
t.Stop()
219+
p.publishersLock.Lock()
220+
for _, pub := range p.publishers {
221+
pub.Stop()
218222
}
219-
p.topicsLock.Unlock()
223+
p.publishersLock.Unlock()
220224

221225
return p.client.Close()
222226
}
223227

224-
func (p *Publisher) topic(ctx context.Context, topic string) (t *pubsub.Topic, err error) {
225-
p.topicsLock.RLock()
226-
t, ok := p.topics[topic]
227-
p.topicsLock.RUnlock()
228+
func (p *Publisher) publisher(ctx context.Context, topic string) (pub *pubsub.Publisher, err error) {
229+
p.publishersLock.RLock()
230+
pub, ok := p.publishers[topic]
231+
p.publishersLock.RUnlock()
228232
if ok {
229-
return t, nil
233+
return pub, nil
230234
}
231235

232-
p.topicsLock.Lock()
236+
p.publishersLock.Lock()
233237
defer func() {
234238
if err == nil {
235-
t.EnableMessageOrdering = p.config.EnableMessageOrdering
236-
p.topics[topic] = t
239+
pub.EnableMessageOrdering = p.config.EnableMessageOrdering
240+
p.publishers[topic] = pub
237241
}
238-
p.topicsLock.Unlock()
242+
p.publishersLock.Unlock()
239243
}()
240244

241-
t = p.client.Topic(topic)
245+
pub = p.client.Publisher(topic)
242246

243247
// todo: theoretically, one could want different publish settings per topic, which is supported by the client lib
244248
// different instances of publisher may be used then
245249
if p.config.PublishSettings != nil {
246-
t.PublishSettings = *p.config.PublishSettings
250+
pub.PublishSettings = *p.config.PublishSettings
247251
}
248252

249253
if p.config.DoNotCheckTopicExistence {
250-
return t, nil
254+
return pub, nil
251255
}
252256

253-
exists, err := t.Exists(ctx)
257+
exists, err := topicExists(ctx, p.client, p.config.ProjectID, topic)
254258
if err != nil {
255259
return nil, errors.Wrapf(err, "could not check if topic %s exists", topic)
256260
}
257261

258262
if exists {
259-
return t, nil
263+
return pub, nil
260264
}
261265

262266
if p.config.DoNotCreateTopicIfMissing {
263267
return nil, errors.Wrap(ErrTopicDoesNotExist, topic)
264268
}
265269

266-
t, err = p.client.CreateTopic(ctx, topic)
270+
_, err = p.client.TopicAdminClient.CreateTopic(ctx, &pubsubpb.Topic{
271+
Name: fullyQualifiedTopicName(p.config.ProjectID, topic),
272+
})
267273
if err != nil {
268274
return nil, errors.Wrapf(err, "could not create topic %s", topic)
269275
}
270276

271-
return t, nil
277+
return pub, nil
278+
}
279+
280+
func topicExists(ctx context.Context, client *pubsub.Client, projectID string, topic string) (bool, error) {
281+
_, err := client.TopicAdminClient.GetTopic(ctx, &pubsubpb.GetTopicRequest{
282+
Topic: fullyQualifiedTopicName(projectID, topic),
283+
})
284+
if err != nil {
285+
if st, ok := status.FromError(err); ok && st.Code() == codes.NotFound {
286+
return false, nil
287+
}
288+
return false, errors.Wrapf(err, "could not check if topic %s exists", topic)
289+
}
290+
return true, nil
291+
}
292+
293+
func fullyQualifiedTopicName(projectID string, topic string) string {
294+
return fmt.Sprintf("projects/%s/topics/%s", projectID, topic)
272295
}

pkg/googlecloud/pubsub_bench_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"testing"
55

66
"github.com/ThreeDotsLabs/watermill"
7-
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
7+
"github.com/ThreeDotsLabs/watermill-googlecloud/v2/pkg/googlecloud"
88
"github.com/ThreeDotsLabs/watermill/message"
99
"github.com/ThreeDotsLabs/watermill/pubsub/tests"
1010
)

pkg/googlecloud/pubsub_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,14 @@ import (
77
"testing"
88
"time"
99

10-
"cloud.google.com/go/pubsub"
10+
"cloud.google.com/go/pubsub/v2/apiv1/pubsubpb"
1111
"github.com/google/uuid"
1212
"github.com/pkg/errors"
1313
"github.com/stretchr/testify/assert"
1414
"github.com/stretchr/testify/require"
1515

1616
"github.com/ThreeDotsLabs/watermill"
17-
"github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud"
17+
"github.com/ThreeDotsLabs/watermill-googlecloud/v2/pkg/googlecloud"
1818
"github.com/ThreeDotsLabs/watermill/message"
1919
"github.com/ThreeDotsLabs/watermill/pubsub/tests"
2020
)
@@ -38,9 +38,11 @@ func newPubSub(t *testing.T, enableMessageOrdering bool, marshaler googlecloud.M
3838
googlecloud.SubscriberConfig{
3939
ProjectID: "tests",
4040
GenerateSubscriptionName: subscriptionName,
41-
SubscriptionConfig: pubsub.SubscriptionConfig{
42-
RetainAckedMessages: false,
43-
EnableMessageOrdering: enableMessageOrdering,
41+
GenerateSubscription: func(params googlecloud.GenerateSubscriptionParams) *pubsubpb.Subscription {
42+
return &pubsubpb.Subscription{
43+
RetainAckedMessages: false,
44+
EnableMessageOrdering: enableMessageOrdering,
45+
}
4446
},
4547
Unmarshaler: unmarshaler,
4648
},

0 commit comments

Comments
 (0)