Description
Expected behavior
The goal is to retrieve the ID of the next message to be consumed on a given subscription. The call
admin.Subscriptions().PeekMessages(topicName, subscription, 1)
should return one message from the Pulsar broker.
Actual behavior
net/http: HTTP/1.x transport connection broken: malformed MIME header line: X-Pulsar-Partition-Key
Steps to reproduce
Testing against the master branch of the pulsar-client-go repository, PeekMessages fails with the following error: "net/http: HTTP/1.x transport connection broken: malformed MIME header line: X-Pulsar-Partition-Key: L". The cause is that in Go 1.20 and above, the HTTP client validates response headers while parsing the MIME header and rejects the response as malformed. Here the message key contains control characters, and it appears to come back unchanged in the X-Pulsar-Partition-Key response header.
Step 1. Create a subscription.
Step 2. Send a message.
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/pulsar-client-go/pulsar"
)

func main() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://xxxx:6650"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	producer, err := client.CreateProducer(pulsar.ProducerOptions{
		Topic: "public/xx/xxxx",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	ctx := context.Background()
	for i := 0; i < 1; i++ {
		_, err := producer.Send(ctx, &pulsar.ProducerMessage{
			// The key and properties contain control characters and non-UTF-8 bytes.
			Key: "\u0000\u0000\u0000\u0000\u001d¥L\u0000\u0000\u0000\u0000\u0000\u0006��",
			Properties: map[string]string{
				"X-Test":    "\u0000\u0000\u0000\u0000\u001d¥L\u0000\u0000\u0000\u0000\u0000\u0006��",
				"X-Newline": "abc\n123",
				"X-Binary":  string([]byte{0xff, 0xfe, 0xfd}),
			},
			Payload: []byte(fmt.Sprintf("hello-%d", i)),
		})
		if err != nil {
			log.Fatal(err)
		}
	}
}
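Step 3. Peek the message.
package main

import (
	"fmt"

	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin"
	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/admin/config"
	"github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils"
)

func main() {
	admin, err := admin.New(&config.Config{
		WebServiceURL: "http://xxxx:8080",
	})
	if err != nil {
		panic(err)
	}

	name, err := utils.GetTopicName("public/xxx/xxxx-partition-x")
	if err != nil {
		panic(err)
	}

	// Fails with the "malformed MIME header line" error described above.
	msgs, err := admin.Subscriptions().PeekMessages(*name, "xxxxx", 1)
	if err != nil {
		panic(err)
	}
	fmt.Println(msgs)
}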
Invalid key: \u0000\u0000\u0000\u0000\u001d¥L\u0000\u0000\u0000\u0000\u0000\u0006��
System configuration
Pulsar version: 3.0.11
pulsar-client-go: master