Skip to content

Commit e6dd444

Browse files
committed
feat(nats): add NATSPublisher and NATSSubscriber for NATS Core
- Add NATSPublisher implementing Publisher interface using NATS Core publish - Add NATSSubscriber implementing Subscriber interface using NATS Core subscriptions - Include OpenTelemetry tracing with distributed trace context propagation - Support uninterruptible and handlerTimeout options in NATSSubscriber - Add comprehensive tests for the new modules
1 parent 7b31f14 commit e6dd444

File tree

5 files changed

+698
-0
lines changed

5 files changed

+698
-0
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
"@effect-messaging/nats": minor
3+
---
4+
5+
Add NATSPublisher and NATSSubscriber for NATS Core (without JetStream)
6+
7+
- `NATSPublisher`: Implements the `Publisher` interface from `@effect-messaging/core` using NATS Core publish (fire-and-forget)
8+
- `NATSSubscriber`: Implements the `Subscriber` interface from `@effect-messaging/core` using NATS Core subscriptions
9+
- Both include OpenTelemetry tracing with distributed trace context propagation via headers
10+
- `NATSSubscriber` supports `uninterruptible` and `handlerTimeout` options
11+
- Note: NATS Core has no persistence - messages published before subscription starts are lost

packages/nats/src/NATSPublisher.ts

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
/**
2+
* @since 0.3.0
3+
*/
4+
import * as Publisher from "@effect-messaging/core/Publisher"
5+
import * as PublisherError from "@effect-messaging/core/PublisherError"
6+
import type * as NATSCore from "@nats-io/nats-core"
7+
import * as Effect from "effect/Effect"
8+
import * as Option from "effect/Option"
9+
import * as Schedule from "effect/Schedule"
10+
import type * as Tracer from "effect/Tracer"
11+
import * as NATSConnection from "./NATSConnection.js"
12+
import * as NATSError from "./NATSError.js"
13+
import * as NATSHeaders from "./NATSHeaders.js"
14+
15+
/**
16+
* @category type ids
17+
* @since 0.3.0
18+
*/
19+
export const TypeId: unique symbol = Symbol.for("@effect-messaging/nats/NATSPublisher")
20+
21+
/**
22+
* @category type ids
23+
* @since 0.3.0
24+
*/
25+
export type TypeId = typeof TypeId
26+
27+
/**
28+
* @category models
29+
* @since 0.3.0
30+
*/
31+
export interface NATSPublishMessage {
32+
subject: string
33+
payload: NATSCore.Payload
34+
options?: NATSCore.PublishOptions
35+
}
36+
37+
/**
38+
* @category models
39+
* @since 0.3.0
40+
*/
41+
export interface NATSPublisher extends Publisher.Publisher<NATSPublishMessage> {
42+
readonly [TypeId]: TypeId
43+
}
44+
45+
const ATTR_SERVER_ADDRESS = "server.address" as const
46+
const ATTR_SERVER_PORT = "server.port" as const
47+
const ATTR_MESSAGING_DESTINATION_NAME = "messaging.destination.name" as const
48+
const ATTR_MESSAGING_OPERATION_NAME = "messaging.operation.name" as const
49+
const ATTR_MESSAGING_OPERATION_TYPE = "messaging.operation.type" as const
50+
const ATTR_MESSAGING_SYSTEM = "messaging.system" as const
51+
52+
/** @internal */
53+
const publishEffect = (
54+
connection: NATSConnection.NATSConnection,
55+
message: NATSPublishMessage,
56+
span: Tracer.Span
57+
) => {
58+
const headers = NATSHeaders.mergeNatsHeaders(message.options?.headers, NATSHeaders.encodeTraceContext(span))
59+
60+
return connection.publish(
61+
message.subject,
62+
message.payload,
63+
{ ...message.options, headers }
64+
)
65+
}
66+
67+
/** @internal */
68+
const publish = (
69+
connection: NATSConnection.NATSConnection,
70+
connectionInfo: NATSCore.ServerInfo,
71+
retrySchedule: Schedule.Schedule<unknown, NATSError.NATSConnectionError>
72+
) =>
73+
(message: NATSPublishMessage): Effect.Effect<void, PublisherError.PublisherError, never> =>
74+
Effect.useSpan(
75+
`nats.publish ${message.subject}`,
76+
{
77+
kind: "producer",
78+
captureStackTrace: false,
79+
attributes: {
80+
[ATTR_SERVER_ADDRESS]: connectionInfo.host,
81+
[ATTR_SERVER_PORT]: connectionInfo.port,
82+
[ATTR_MESSAGING_SYSTEM]: "nats",
83+
[ATTR_MESSAGING_OPERATION_NAME]: "publish",
84+
[ATTR_MESSAGING_OPERATION_TYPE]: "send",
85+
[ATTR_MESSAGING_DESTINATION_NAME]: message.subject
86+
}
87+
},
88+
(span) =>
89+
publishEffect(connection, message, span).pipe(
90+
Effect.retry(retrySchedule),
91+
Effect.catchTag(
92+
"NATSConnectionError",
93+
(error) =>
94+
Effect.fail(new PublisherError.PublisherError({ reason: "Failed to publish message", cause: error }))
95+
)
96+
)
97+
)
98+
99+
/**
100+
* @category constructors
101+
* @since 0.3.0
102+
*/
103+
export interface NATSPublisherConfig {
104+
readonly retrySchedule?: Schedule.Schedule<unknown, NATSError.NATSConnectionError>
105+
}
106+
107+
/**
108+
* @category constructors
109+
* @since 0.3.0
110+
*/
111+
export const make = (
112+
config?: NATSPublisherConfig
113+
): Effect.Effect<
114+
NATSPublisher,
115+
NATSError.NATSConnectionError,
116+
NATSConnection.NATSConnection
117+
> =>
118+
Effect.gen(function*() {
119+
const connection = yield* NATSConnection.NATSConnection
120+
121+
// Get connection info for span attributes
122+
const connectionInfo = yield* Option.match(connection.info, {
123+
onNone: () => Effect.fail(new NATSError.NATSConnectionError({ reason: "Connection info not available" })),
124+
onSome: Effect.succeed
125+
})
126+
127+
const publisher: NATSPublisher = {
128+
[TypeId]: TypeId,
129+
[Publisher.TypeId]: Publisher.TypeId,
130+
publish: publish(connection, connectionInfo, config?.retrySchedule ?? Schedule.stop)
131+
}
132+
133+
return publisher
134+
})
Lines changed: 228 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,228 @@
1+
/**
2+
* @since 0.3.0
3+
*/
4+
import * as Subscriber from "@effect-messaging/core/Subscriber"
5+
import * as SubscriberError from "@effect-messaging/core/SubscriberError"
6+
import type * as NATSCore from "@nats-io/nats-core"
7+
import * as Cause from "effect/Cause"
8+
import * as Context from "effect/Context"
9+
import type * as Duration from "effect/Duration"
10+
import * as Effect from "effect/Effect"
11+
import * as Function from "effect/Function"
12+
import * as Layer from "effect/Layer"
13+
import * as Option from "effect/Option"
14+
import * as Predicate from "effect/Predicate"
15+
import * as Stream from "effect/Stream"
16+
import * as NATSConnection from "./NATSConnection.js"
17+
import * as NATSError from "./NATSError.js"
18+
import * as NATSHeaders from "./NATSHeaders.js"
19+
import type * as NATSMessage from "./NATSMessage.js"
20+
import type * as NATSSubscription from "./NATSSubscription.js"
21+
22+
/**
23+
* @category type ids
24+
* @since 0.3.0
25+
*/
26+
export const TypeId: unique symbol = Symbol.for("@effect-messaging/nats/NATSSubscriber")
27+
28+
/**
29+
* @category type ids
30+
* @since 0.3.0
31+
*/
32+
export type TypeId = typeof TypeId
33+
34+
/**
35+
* @category models
36+
* @since 0.3.0
37+
*/
38+
export interface NATSSubscriber extends Subscriber.Subscriber<NATSMessage.NATSMessage> {
39+
readonly [TypeId]: TypeId
40+
}
41+
42+
/**
43+
* @category models
44+
* @since 0.3.0
45+
*/
46+
export interface NATSSubscriberOptions {
47+
uninterruptible?: boolean
48+
handlerTimeout?: Duration.DurationInput
49+
}
50+
51+
/**
52+
* Context tag for accessing the current NATS message in a handler
53+
*
54+
* @category tags
55+
* @since 0.3.0
56+
*/
57+
export const NATSConsumeMessage = Context.GenericTag<NATSMessage.NATSMessage>(
58+
"@effect-messaging/nats/NATSConsumeMessage"
59+
)
60+
61+
/**
62+
* Layer for providing the current NATS message to a handler
63+
*
64+
* @category layers
65+
* @since 0.3.0
66+
*/
67+
export const layer = (
68+
message: NATSMessage.NATSMessage
69+
): Layer.Layer<NATSMessage.NATSMessage> => Layer.succeed(NATSConsumeMessage, message)
70+
71+
const ATTR_SERVER_ADDRESS = "server.address" as const
72+
const ATTR_SERVER_PORT = "server.port" as const
73+
const ATTR_MESSAGING_DESTINATION_NAME = "messaging.destination.name" as const
74+
const ATTR_MESSAGING_OPERATION_NAME = "messaging.operation.name" as const
75+
const ATTR_MESSAGING_OPERATION_TYPE = "messaging.operation.type" as const
76+
const ATTR_MESSAGING_SYSTEM = "messaging.system" as const
77+
const ATTR_MESSAGING_MESSAGE_ID = "messaging.message.id" as const
78+
79+
/** @internal */
80+
const subscribe = (
81+
subscription: NATSSubscription.NATSSubscription,
82+
connectionInfo: NATSCore.ServerInfo,
83+
options: NATSSubscriberOptions
84+
) =>
85+
<E, R>(
86+
handler: Effect.Effect<void, E, R | NATSMessage.NATSMessage>
87+
): Effect.Effect<void, SubscriberError.SubscriberError, Exclude<R, NATSMessage.NATSMessage>> =>
88+
subscription.stream.pipe(
89+
Stream.runForEach((message) =>
90+
Effect.fork(
91+
Effect.useSpan(
92+
`nats.consume ${message.subject}`,
93+
{
94+
parent: Option.getOrUndefined(NATSHeaders.decodeTraceContextOptional(message.headers)),
95+
kind: "consumer",
96+
captureStackTrace: false,
97+
attributes: {
98+
[ATTR_SERVER_ADDRESS]: connectionInfo.host,
99+
[ATTR_SERVER_PORT]: connectionInfo.port,
100+
[ATTR_MESSAGING_SYSTEM]: "nats",
101+
[ATTR_MESSAGING_OPERATION_TYPE]: "receive",
102+
[ATTR_MESSAGING_DESTINATION_NAME]: message.subject,
103+
[ATTR_MESSAGING_MESSAGE_ID]: message.sid
104+
}
105+
},
106+
(span) =>
107+
Effect.gen(function*() {
108+
yield* Effect.logDebug(`nats.consume ${message.subject}`)
109+
yield* handler.pipe(
110+
options.handlerTimeout
111+
? Effect.timeoutFail({
112+
duration: options.handlerTimeout,
113+
onTimeout: () =>
114+
new SubscriberError.SubscriberError({ reason: "NATSSubscriber: handler timed out" })
115+
})
116+
: Function.identity
117+
)
118+
span.attribute(ATTR_MESSAGING_OPERATION_NAME, "process")
119+
}).pipe(
120+
Effect.provide(layer(message)),
121+
Effect.tapErrorCause((cause) =>
122+
Effect.gen(function*() {
123+
// Log the error - NATS Core has no ack/nak mechanism, so we just log and continue
124+
yield* Effect.logError(Cause.pretty(cause))
125+
span.attribute(ATTR_MESSAGING_OPERATION_NAME, "error")
126+
span.attribute(
127+
"error.type",
128+
Cause.squashWith(
129+
cause,
130+
(_) => Predicate.hasProperty(_, "tag") ? _.tag : _ instanceof Error ? _.name : `${_}`
131+
)
132+
)
133+
span.attribute("error.stack", Cause.pretty(cause))
134+
span.attribute(
135+
"error.message",
136+
Cause.squashWith(
137+
cause,
138+
(_) => Predicate.hasProperty(_, "reason") ? _.reason : _ instanceof Error ? _.message : `${_}`
139+
)
140+
)
141+
})
142+
),
143+
options.uninterruptible ? Effect.uninterruptible : Effect.interruptible,
144+
Effect.withParentSpan(span)
145+
)
146+
)
147+
)
148+
),
149+
Effect.mapError((error) =>
150+
new SubscriberError.SubscriberError({ reason: "NATSSubscriber failed to subscribe", cause: error })
151+
)
152+
)
153+
154+
/** @internal */
155+
const healthCheck = (
156+
subscription: NATSSubscription.NATSSubscription
157+
): Effect.Effect<void, SubscriberError.SubscriberError, never> =>
158+
subscription.isClosed.pipe(
159+
Effect.flatMap((isClosed) =>
160+
isClosed
161+
? Effect.fail(new SubscriberError.SubscriberError({ reason: "Subscription is closed" }))
162+
: Effect.void
163+
),
164+
Effect.catchTag("NATSSubscriptionError", (error) =>
165+
new SubscriberError.SubscriberError({ reason: "Healthcheck failed", cause: error }))
166+
)
167+
168+
/**
169+
* Create a NATSSubscriber from an existing NATSSubscription.
170+
*
171+
* Note: NATS Core subscriptions are fire-and-forget. Messages are not persisted
172+
* and there is no acknowledgment mechanism. If the handler fails or times out,
173+
* the message is lost.
174+
*
175+
* @category constructors
176+
* @since 0.3.0
177+
*/
178+
export const fromSubscription = (
179+
subscription: NATSSubscription.NATSSubscription,
180+
options: NATSSubscriberOptions = {}
181+
): Effect.Effect<
182+
NATSSubscriber,
183+
NATSError.NATSConnectionError,
184+
NATSConnection.NATSConnection
185+
> =>
186+
Effect.gen(function*() {
187+
const connection = yield* NATSConnection.NATSConnection
188+
const connectionInfo = yield* Option.match(connection.info, {
189+
onNone: () => Effect.fail(new NATSError.NATSConnectionError({ reason: "Connection info not available" })),
190+
onSome: Effect.succeed
191+
})
192+
193+
const subscriber: NATSSubscriber = {
194+
[TypeId]: TypeId,
195+
[Subscriber.TypeId]: Subscriber.TypeId,
196+
subscribe: subscribe(subscription, connectionInfo, options),
197+
healthCheck: healthCheck(subscription)
198+
}
199+
200+
return subscriber
201+
})
202+
203+
/**
204+
* Create a NATSSubscriber by subscribing to a subject.
205+
*
206+
* This is a convenience constructor that internally calls `subscribe()` on the connection.
207+
*
208+
* Note: NATS Core subscriptions are fire-and-forget. Messages are not persisted
209+
* and there is no acknowledgment mechanism. If the handler fails or times out,
210+
* the message is lost.
211+
*
212+
* @category constructors
213+
* @since 0.3.0
214+
*/
215+
export const make = (
216+
subject: string,
217+
subscriptionOptions?: NATSCore.SubscriptionOptions,
218+
options: NATSSubscriberOptions = {}
219+
): Effect.Effect<
220+
NATSSubscriber,
221+
NATSError.NATSConnectionError,
222+
NATSConnection.NATSConnection
223+
> =>
224+
Effect.gen(function*() {
225+
const connection = yield* NATSConnection.NATSConnection
226+
const subscription = yield* connection.subscribe(subject, subscriptionOptions)
227+
return yield* fromSubscription(subscription, options)
228+
})

packages/nats/src/index.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,21 @@ export * as NATSHeaders from "./NATSHeaders.js"
8383
*/
8484
export * as NATSMessage from "./NATSMessage.js"
8585

86+
/**
87+
* @since 0.3.0
88+
*/
89+
export * as NATSPublisher from "./NATSPublisher.js"
90+
8691
/**
8792
* @since 0.1.0
8893
*/
8994
export * as NATSQueuedIterator from "./NATSQueuedIterator.js"
9095

96+
/**
97+
* @since 0.3.0
98+
*/
99+
export * as NATSSubscriber from "./NATSSubscriber.js"
100+
91101
/**
92102
* @since 0.1.0
93103
*/

0 commit comments

Comments
 (0)