
Commit ac5d799

feat: add kafka channel support (#178)
1 parent 81665c6 commit ac5d799

24 files changed: +810 -597 lines changed


docs/generators/channels.md

Lines changed: 3 additions & 2 deletions
@@ -12,7 +12,7 @@ export default {
       preset: 'channels',
       outputPath: './src/__gen__/',
       language: 'typescript',
-      protocols: ['nats']
+      protocols: ['nats', 'kafka']
     }
   ]
 };
@@ -26,7 +26,7 @@ This is supported through the following inputs: [`asyncapi`](#inputs)
 
 It supports the following languages; [`typescript`](#typescript)
 
-It supports the following protocols; [`nats`](../protocols/nats.md)
+It supports the following protocols; [`nats`](../protocols/nats.md), [`kafka`](../protocols/kafka.md)
 
 ## Options
 These are the available options for the `channels` generator;
@@ -41,6 +41,7 @@ These are the available options for the `channels` generator;
 
 Depending on which protocol, these are the dependencies:
 - `NATS`: https://github.com/nats-io/nats.js v2
+- `Kafka`: https://github.com/tulios/kafkajs v2
 
 For TypeScript what is generated is a single file that include functions to help easier interact with AsyncAPI channels. For example;

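As a quick illustration of the new dependency, this is roughly the kafkajs v2 client that a Kafka-enabled generated channels file expects to be handed (see docs/protocols/kafka.md below in this diff); the client id and broker address here are illustrative placeholders, not values from this commit:

```ts
// Minimal kafkajs v2 client setup; the generated Kafka channel functions
// take this client as an argument. Values are placeholders for illustration.
import { Kafka } from 'kafkajs';

const kafkaClient = new Kafka({
  clientId: 'my-service',      // illustrative client id
  brokers: ['localhost:9093'], // illustrative broker address
});
```
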
docs/protocols/kafka.md

Lines changed: 113 additions & 0 deletions
@@ -0,0 +1,113 @@
---
sidebar_position: 99
---

# Kafka
Kafka is currently supported through the following generators ([channels](../generators/channels.md)):

| **Languages** | Publish | Subscribe |
|---|---|---|
| TypeScript | ✔️ | ✔️ |

All of this is available through [AsyncAPI](../inputs/asyncapi.md).

## Channels
Read more about the [channels](../generators/channels.md) generator here before continuing.

This generator provides support functions for each resource, ensuring the right payload and parameters are used.
<table>
<thead>
  <tr>
    <th>Input (AsyncAPI)</th>
    <th>Using the code</th>
  </tr>
</thead>
<tbody>
<tr>
<td>

```yaml
asyncapi: 3.0.0
info:
  title: Account Service
  version: 1.0.0
  description: This service is in charge of processing user signups
channels:
  userSignups:
    address: user/signedup
    messages:
      userSignedup:
        $ref: '#/components/messages/UserSignedUp'
operations:
  publishUserSignups:
    action: send
    channel:
      $ref: '#/channels/userSignups'
  consumeUserSignups:
    action: receive
    channel:
      $ref: '#/channels/userSignups'
components:
  messages:
    UserSignedUp:
      payload:
        type: object
        properties:
          displayName:
            type: string
            description: Name of the user
          email:
            type: string
            format: email
            description: Email of the user

```
</td>
<td>

```ts
import { Kafka, EachMessagePayload } from 'kafkajs';
// Location depends on the payload generator configurations
import { UserSignedUp } from './__gen__/payloads/UserSignedUp';
// Location depends on the parameter generator configurations
import { UserSignedUpParameters } from './__gen__/parameters/UserSignedUpParameters';
// Location depends on the channel generator configurations
import { Protocols } from './__gen__/channels';
const { kafka } = Protocols;
const { consumeFromConsumeUserSignups, produceToPublishUserSignups } = kafka;

/**
 * Setup the regular client
 */
const kafkaClient = new Kafka({
  clientId: 'test',
  brokers: ['localhost:9093'],
});

const myPayload = new UserSignedUp({displayName: 'test', email: 'test@test.dk'});
const myParameters = new UserSignedUpParameters({userId: 'test'});

// Consume the messages with the generated channel function
const consumerCallback = async (
  err,
  msg: UserSignedUp | undefined,
  parameters: UserSignedUpParameters | undefined,
  kafkaMsg: EachMessagePayload | undefined
) => {
  // Do stuff once you consume from the topic
};
const consumer = await consumeFromConsumeUserSignups(
  consumerCallback,
  myParameters,
  kafkaClient,
  {
    fromBeginning: true,
    groupId: 'testId1'
  }
);

// Produce the messages with the generated channel function
const producer = await produceToPublishUserSignups(myPayload, myParameters, kafkaClient);
```
</td>
</tr>
</tbody>
</table>
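
For orientation, the sketch below shows the raw kafkajs v2 calls that the generated consume/produce helpers roughly correspond to. The generated code itself is not shown in this diff, so the topic name (assuming a `.` topic separator), the JSON serialization, and the group id are assumptions for illustration only:

```ts
// Plain kafkajs v2 equivalent of the example above, for comparison only.
// 'user.signedup' assumes a '.' topic separator; JSON serialization is an assumption.
import { Kafka } from 'kafkajs';

const kafka = new Kafka({ clientId: 'test', brokers: ['localhost:9093'] });

// Producing
const producer = kafka.producer();
await producer.connect();
await producer.send({
  topic: 'user.signedup',
  messages: [{ value: JSON.stringify({ displayName: 'test', email: 'test@test.dk' }) }],
});
await producer.disconnect();

// Consuming
const consumer = kafka.consumer({ groupId: 'testId1' });
await consumer.connect();
await consumer.subscribe({ topics: ['user.signedup'], fromBeginning: true });
await consumer.run({
  eachMessage: async ({ message }) => {
    const payload = JSON.parse(message.value?.toString() ?? '{}');
    // handle the user signup here
  },
});
```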

package.json

Lines changed: 4 additions & 2 deletions
@@ -115,10 +115,12 @@
     "runtime:typescript": "npm run runtime:typescript:setup && npm run runtime:typescript:test",
     "runtime:typescript:setup": "npm run runtime:prepare && npm run runtime:services:start && cd test/runtime/typescript && npm ci && npm run generate",
     "runtime:typescript:test": "cd test/runtime/typescript && npm run test",
-    "runtime:services:start": "npm run runtime:nats:start",
-    "runtime:services:stop": "npm run runtime:nats:stop",
+    "runtime:services:start": "npm run runtime:nats:start && npm run runtime:kafka:start",
+    "runtime:services:stop": "npm run runtime:nats:stop && npm run runtime:kafka:stop",
     "runtime:nats:start": "cd test/runtime && docker compose -f ./docker-compose-nats.yml up -d",
     "runtime:nats:stop": "cd test/runtime && docker compose -f ./docker-compose-nats.yml down",
+    "runtime:kafka:start": "cd test/runtime && docker compose -f ./docker-compose-kafka.yml up -d",
+    "runtime:kafka:stop": "cd test/runtime && docker compose -f ./docker-compose-kafka.yml down",
     "test:blackbox": "concurrently --group -n typescript \"npm run test:blackbox:typescript\"",
     "test:blackbox:typescript": "cross-env CI=true jest ./test/blackbox/typescript.spec.ts"
   },

src/codegen/generators/helpers/payloads.ts

Lines changed: 5 additions & 8 deletions
@@ -159,24 +159,21 @@ export function findReplyId(
   reply: OperationReplyInterface,
   channel: ChannelInterface
 ) {
-  return (
-    (reply.json() as any)?.id ??
-    `${findOperationId(operation, reply.channel() ?? channel)}_reply`
-  );
+  return `${findOperationId(operation, reply.channel() ?? channel)}_reply`;
 }
 export function findOperationId(
   operation: OperationInterface,
   channel: ChannelInterface
 ) {
-  let operationId = operation.id();
-  operationId = operation.hasOperationId()
-    ? operation.operationId()
-    : operationId;
   const userSpecificName = findExtensionObject(operation)
     ? findExtensionObject(operation)['channelName']
     : undefined;
   if (userSpecificName) {
     return userSpecificName;
   }
+  let operationId = operation.id();
+  operationId = operation.hasOperationId()
+    ? operation.operationId()
+    : operationId;
   return operationId ?? channel.id();
 }
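
After this change `findOperationId` checks the custom extension name before it resolves any operationId; the value that wins is the same as before, the lookup is simply deferred until after the extension check. A hypothetical, simplified sketch of the resulting resolution order (stand-in types, not the real AsyncAPI parser interfaces):

```ts
// Stand-in shapes for illustration only; the real code works on the AsyncAPI parser interfaces.
interface SimpleOperation {
  id?: string;          // the operation's key in the AsyncAPI document
  operationId?: string; // an explicit operationId, if one is set
  channelName?: string; // the custom extension value, if one is set
}

function resolveOperationId(operation: SimpleOperation, channelId: string): string {
  // 1. A user-supplied extension name always wins.
  if (operation.channelName) {
    return operation.channelName;
  }
  // 2. Otherwise prefer the explicit operationId over the operation key.
  const operationId = operation.operationId ?? operation.id;
  // 3. Finally fall back to the channel id.
  return operationId ?? channelId;
}

// e.g. resolveOperationId({ id: 'publishUserSignups' }, 'userSignups') === 'publishUserSignups'
```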

src/codegen/generators/typescript/channels/asyncapi.ts

Lines changed: 4 additions & 2 deletions
@@ -4,13 +4,15 @@ type Action = 'send' | 'receive' | 'subscribe' | 'publish';
 const sendingFunctionTypes = [
   ChannelFunctionTypes.NATS_JETSTREAM_PUBLISH,
   ChannelFunctionTypes.NATS_PUBLISH,
-  ChannelFunctionTypes.NATS_REQUEST
+  ChannelFunctionTypes.NATS_REQUEST,
+  ChannelFunctionTypes.KAFKA_PUBLISH
 ];
 const receivingFunctionTypes = [
   ChannelFunctionTypes.NATS_JETSTREAM_PULL_SUBSCRIBE,
   ChannelFunctionTypes.NATS_JETSTREAM_PUSH_SUBSCRIBE,
   ChannelFunctionTypes.NATS_REPLY,
-  ChannelFunctionTypes.NATS_SUBSCRIBE
+  ChannelFunctionTypes.NATS_SUBSCRIBE,
+  ChannelFunctionTypes.KAFKA_SUBSCRIBE
 ];
 
 // eslint-disable-next-line sonarjs/cognitive-complexity

src/codegen/generators/typescript/channels/index.ts

Lines changed: 105 additions & 0 deletions
@@ -41,6 +41,7 @@ import {
   addPayloadsToDependencies,
   getMessageTypeAndModule
 } from './utils';
+import * as KafkaRenderer from './protocols/kafka';
 export {
   renderedFunctionType,
   TypeScriptChannelRenderType,
@@ -366,7 +367,111 @@ export async function generateTypeScriptChannels(
         dependencies.push(...(new Set(renderedDependencies) as any));
         break;
       }
+      case 'kafka': {
+        // AsyncAPI v2 explicitly say to use RFC 6570 URI template, Kafka does not support '/' subject separation, so lets replace it
+        let topic = simpleContext.topic;
+        if (topic.startsWith('/')) {
+          topic = topic.slice(1);
+        }
+        topic = topic.replace(/\//g, generator.kafkaTopicSeparator);
+        let kafkaContext: RenderRegularParameters = {
+          ...simpleContext,
+          topic,
+          messageType: ''
+        };
+        const renders = [];
+        const payload = payloads.channelModels[channel.id()];
+        if (payload === undefined) {
+          throw new Error(
+            `Could not find payload for ${channel.id()} for channel typescript generator`
+          );
+        }
+        const {messageModule, messageType} =
+          getMessageTypeAndModule(payload);
+        kafkaContext = {...kafkaContext, messageType, messageModule};
+        const operations = channel.operations().all();
+        if (operations.length > 0 && !ignoreOperation) {
+          for (const operation of operations) {
+            const payloadId = findOperationId(operation, channel);
+            const payload = payloads.operationModels[payloadId];
+            if (payload === undefined) {
+              throw new Error(
+                `Could not find payload for ${payloadId} for channel typescript generator ${JSON.stringify(payloads.operationModels, null, 4)}`
+              );
+            }
+            const {messageModule, messageType} =
+              getMessageTypeAndModule(payload);
+            kafkaContext = {
+              ...kafkaContext,
+              messageType,
+              messageModule,
+              subName: findNameFromOperation(operation, channel)
+            };
+            const action = operation.action();
+            if (
+              shouldRenderFunctionType(
+                functionTypeMapping,
+                ChannelFunctionTypes.KAFKA_PUBLISH,
+                action,
+                generator.asyncapiReverseOperations
+              )
+            ) {
+              renders.push(KafkaRenderer.renderPublish(kafkaContext));
+            }
+            if (
+              shouldRenderFunctionType(
+                functionTypeMapping,
+                ChannelFunctionTypes.KAFKA_SUBSCRIBE,
+                action,
+                generator.asyncapiReverseOperations
+              )
+            ) {
+              renders.push(KafkaRenderer.renderSubscribe(kafkaContext));
+            }
+          }
+        } else {
+          if (
+            shouldRenderFunctionType(
+              functionTypeMapping,
+              ChannelFunctionTypes.KAFKA_PUBLISH,
+              'send',
+              generator.asyncapiReverseOperations
+            )
+          ) {
+            renders.push(KafkaRenderer.renderPublish(kafkaContext));
+          }
+          if (
+            shouldRenderFunctionType(
+              functionTypeMapping,
+              ChannelFunctionTypes.KAFKA_SUBSCRIBE,
+              'receive',
+              generator.asyncapiReverseOperations
+            )
+          ) {
+            renders.push(KafkaRenderer.renderSubscribe(kafkaContext));
+          }
+        }
+        protocolCodeFunctions[protocol].push(
+          ...renders.map((value) => value.code)
+        );
 
+        externalProtocolFunctionInformation[protocol].push(
+          ...renders.map((value) => {
+            return {
+              functionType: value.functionType,
+              functionName: value.functionName,
+              messageType: value.messageType,
+              replyType: value.replyType,
+              parameterType: parameter?.model?.type
+            };
+          })
+        );
+        const renderedDependencies = renders
+          .map((value) => value.dependencies)
+          .flat(Infinity);
+        dependencies.push(...(new Set(renderedDependencies) as any));
+        break;
+      }
       default: {
         break;
       }
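
The topic handling at the top of the new `case 'kafka'` block can be read in isolation as the sketch below; `separator` stands in for `generator.kafkaTopicSeparator`, whose default value is not shown in this diff:

```ts
// Standalone restatement of the topic normalization above, for illustration only.
function toKafkaTopic(address: string, separator: string): string {
  let topic = address;
  // Drop a leading '/', which RFC 6570 style addresses may carry.
  if (topic.startsWith('/')) {
    topic = topic.slice(1);
  }
  // Kafka topic names cannot use '/' path separation, so replace it.
  return topic.replace(/\//g, separator);
}

// e.g. toKafkaTopic('user/signedup', '.') === 'user.signedup'
```
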
Lines changed: 2 additions & 0 deletions
@@ -0,0 +1,2 @@
export {renderPublish} from './publish';
export {renderSubscribe} from './subscribe';
