Adds support for Kafka Keys, and handles nil for both keys and values…#39
Adds support for Kafka Keys, and handles nil for both keys and values…#39ranoble wants to merge 9 commits intofission:mainfrom
Conversation
… as kafka would expect
kafka-http-connector/main.go
Outdated
| key []byte | ||
| ) | ||
| tombstone := false | ||
| var cleaned []sarama.RecordHeader |
There was a problem hiding this comment.
cleaned is essentially any other key the producer sets that are not keda/fission related, right?
There was a problem hiding this comment.
Yes - couldn't think of a better name.
| conn.logger.Warn("Sending a Tombstone") | ||
| } | ||
|
|
||
| _, _, err := conn.producer.SendMessage(message) |
There was a problem hiding this comment.
Would it make sense for us to do the same for error topic as well?
|
@ranoble Thanks a lot for this PR ! This would be such a nice addition to the connector. I have asked a few questions, please take a look and get this merged :) |
…pic. This also allows a function to return headers to provide more information about an error, and these will be included in the Kafka message headers
common/util.go
Outdated
|
|
||
| if resp.StatusCode < 200 && resp.StatusCode > 300 { | ||
| return nil, fmt.Errorf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) | ||
| return resp, fmt.Errorf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) |
There was a problem hiding this comment.
@ranoble If status code isn't in that range, it's not a successful request and that's why I had returned nil. Can you please explain why this might be valid?
There was a problem hiding this comment.
To write the key, I need the key header. Even though the function fails, it still returns a valid result (just not the one we wanted).
This could / would support distributed tracing, or the option to pass back useful debug info, so that the consumer of the error topic can take the correct action.
There was a problem hiding this comment.
Got it. In that case, can you please do the following?
- Raise the PR in a separate branch as the common package is used by all connectors
- Change the function return arguments to something better than it currently is. I am thinking status code, response and error. What are your thoughts?
There was a problem hiding this comment.
More than happy to.
- Sure - will do so.
- I'm a little more nervous about this one. At this point this change is backwards compatible at the moment,
respcan be safely ignored by any client. After changing the return arguments, if someone is basing a custom connector off of this and upgrades that will cause a breaking change in their code (of course this may not actually be an issue in practise).
Happy to change it on your suggestion - but wanted to highlight that.
There was a problem hiding this comment.
That's true. Any change in the common package will have to be adopted by the remaining connectors . @therahulbhati
What are your thoughts?
There was a problem hiding this comment.
Yes, @RealHarshThakur it's true. We should have this as a separate PR, we would also need to incorporate those changes for each connector as part of that PR.
.github/workflows/ci.yml
Outdated
| helm repo update | ||
| kubectl create namespace keda | ||
| helm install keda kedacore/keda --namespace keda | ||
| helm install keda kedacore/keda --namespace keda --version 1.5.0 |
There was a problem hiding this comment.
@ranoble We can remove --version 1.5.0 flag specifying the version
|
@ranoble Could you please resolve the merge conflicts? |
… as kafka would expect
Keys are passed as a header:
KEDA-Message-Key. If no key exists, no header will be sent (to avoid the nil == '' problem). If a response header ofKEDA-Message-Keyexists then it is used in the response topic, if not; then no key is set so that the round robin partitioner is used as opposed to a hashed blank string.In the event a Tombstone is sent as a value of a incoming event, the value of the request is a blank string, but an additional header is set -
KEDA-Message-Tombstone.For a tombstone to be published to the response topic, both the header value needs to be set AND the body must be empty.
I'm not super happy with the headers, but it felt like the simplest most idiomatic method for achieving this.