Megaphone is a tiny, in-memory async broadcast library for Go
Megaphone is a tiny, in-memory async broadcast library for Go that uses generics to enable simple, type-safe broadcast workflows. Using megaphone you can publish to many subscribers simultaneously in a thread-safe way that fits neatly into common workflows. Although similar patterns have been implemented in existing libraries, we couldn't find a simple one with support for graceful shutdowns, so we created this library to use ourselves (for Kubetail). We hope you find it useful too.
go get github.com/kubetail-org/megaphoneimport "github.com/kubetail-org/megaphone"
// Initialize (use type-arg to specify message type)
mp := megaphone.New[string]()
// Publish message (fire-and-forget)
mp.Publish("my-topic", "my-message")
// Subscribe to messages using a callback function (async)
sub, err := mp.Subscribe("my-topic", func(msg string) {
fmt.Println("from callback: " + msg)
})
// Stop listening for new messages
sub.Unsubscribe()
// Drain subscriber (calls Unsubscribe() then blocks until in-progress are finished)
sub.Drain()
// Drain subscriber with context (e.g. for deadlines)
err = sub.DrainWithContext(ctx)
// Reject new subscribers, silently ignore future publishers
mp.Close()
// Drain all subscribers (calls Close() then blocks until all drains are finished)
mp.Drain()
// Drain all subscribers with context (e.g. for deadlines)
err = mp.DrainWithContext(ctx)- In-memory only: Messages are not persisted and publishing to a topic with no subscribers silently drops the message
- At-most-once delivery: Each subscriber receives the message exactly once if it is already subscribed at the time of publication or never if it isn't
- No ordering guarantees: Callbacks run in separate goroutines and may complete out of order
- Graceful shutdown: Use Drain() or DrainWithContext() to wait for in-flight callbacks before shutdown
Creates a new Megaphone instance parameterized with a message type.
mp := megaphone.New[string]()Publishes a message to all subscribers of a topic. This is a fire-and-forget operation that returns immediately.
mp.Publish("my-topic", "hello world")Creates a subscriber for a topic. Messages are delivered asynchronously via the callback (invoked in a separate goroutine for each message).
sub, err := mp.Subscribe("my-topic", func(msg string) {
fmt.Println("received: " + msg)
})Rejects new subscribers and silently ignores new publishers. In-flight callbacks continue running; use Drain if you need to wait for them to finish.
mp.Close()Drains all subscribers, blocking until all pending messages have been processed. Implicitly calls Close() when complete.
mp.Drain()Drains all subscribers with context support for deadlines and cancellation. Returns nil on success, or ctx.Err() if context is cancelled before drain completes.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := mp.DrainWithContext(ctx); err != nil {
log.Println("drain timed out, some messages may be in flight")
}Removes the subscriber from the topic, stopping it from receiving new messages.
sub.Unsubscribe()Blocks until all pending messages for this subscriber have been processed.
sub.Drain()Drains the subscriber with context support for deadlines and cancellation. Returns nil on success, or ctx.Err() if context is cancelled before drain completes.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := sub.DrainWithContext(ctx); err != nil {
log.Println("drain timed out")
}- The Megaphone API is heavily influenced by nats.go