ballerina-platform/module-ballerina-messaging

Ballerina Messaging Module

The Ballerina Messaging module provides a message store interface and a message store listener to implement guaranteed message delivery in Ballerina applications.

Message Store Interface

The message store interface defines the fundamental contract for persisting and retrieving messages. Implementations of this interface let Ballerina applications interact with different message storage systems in a uniform way.

# Represents a message: its payload together with a unique identifier.
public type Message record {|
    # The unique identifier for the message
    string id;
    # The message payload
    anydata payload;
|};

# Represents a message store interface for storing and retrieving messages.
public type Store isolated client object {

    # Stores a message in the message store.
    #
    # + payload - The message payload to be stored
    # + return - An error if the message could not be stored, or `()`
    isolated remote function store(anydata payload) returns error?;

    # Retrieves the top message from the message store without removing it.
    #
    # + return - The retrieved message, or () if the store is empty, or an error if an error occurs
    isolated remote function retrieve() returns Message|error?;

    # Acknowledges the top message retrieved from the message store.
    #
    # + id - The unique identifier of the message to acknowledge. This should be the same as the `id`
    # of the message retrieved from the store.
    # + success - Indicates whether the message was processed successfully or not
    # + return - An error if the acknowledgment could not be processed, or `()`
    isolated remote function acknowledge(string id, boolean success = true) returns error?;
};
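
As an illustration of the contract above, the following is a minimal sketch of a custom store. It assumes the interface is exposed as `messaging:Store` (the name used in the listener examples in this README); the class name `SimpleQueueStore` is hypothetical, and the choice to leave a message at the head of the queue after a failed acknowledgment is an assumption, since the interface does not prescribe what a store does in that case.

```ballerina
import ballerina/messaging;
import ballerina/uuid;

# Hypothetical FIFO store kept in memory, shown only to illustrate the contract.
public isolated client class SimpleQueueStore {
    *messaging:Store;

    // Mutable state of an isolated class must be private and accessed inside `lock`.
    private final messaging:Message[] queue = [];

    isolated remote function store(anydata payload) returns error? {
        messaging:Message message = {id: uuid:createType4AsString(), payload: payload.clone()};
        lock {
            self.queue.push(message.clone());
        }
    }

    isolated remote function retrieve() returns messaging:Message|error? {
        lock {
            if self.queue.length() == 0 {
                return; // Empty store: return nil
            }
            // Peek at the head without removing it
            return self.queue[0].clone();
        }
    }

    isolated remote function acknowledge(string id, boolean success = true) returns error? {
        lock {
            if self.queue.length() == 0 || self.queue[0].id != id {
                return error("No retrieved message with the given ID");
            }
            if success {
                // Processing succeeded: drop the head message
                _ = self.queue.shift();
            }
            // Assumption: on failure the message stays at the head for redelivery
        }
    }
}
```

An instance of such a class could then be passed to a listener wherever a message store is expected.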

Store Listener

The Store Listener is responsible for orchestrating message consumption from any MessageStore implementation. It operates by polling the associated message store at configurable intervals and dispatching messages to an attached service.

To initialize a listener, provide a message store instance:

// Example using an in-memory store
messaging:Store msgStore = new messaging:InMemoryMessageStore();

listener messaging:StoreListener msgStoreListener = new(msgStore);

The listener's behavior, including polling frequency, retry mechanisms, and dead-letter queue (DLQ) support, can be customized using the listener configuration.

# Represents the message store listener configuration.
public type StoreListenerConfiguration record {|
    # The interval in seconds at which the listener polls for new messages
    decimal pollingInterval = 1;
    # The maximum number of retries for processing a message. 
    # If set to 0, the message will not be retried
    int maxRetries = 3;
    # The interval in seconds between retries for processing a message
    decimal retryInterval = 1;
    # If true, the message will be acknowledged with a failure after the maximum number of retries.
    # Else the message will be acknowledged with success
    boolean ackWithFailureAfterMaxRetries = true;
    # An optional message store to store messages that could not be processed after the maximum 
    # number of retries. On successful storage, the message will be acknowledged with success
    Store deadLetterStore?;
|};
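
A short sketch of how these fields might be combined, assuming the `messaging:StoreListener` and `messaging:InMemoryMessageStore` names used elsewhere in this README. The variable names `orderStore`, `deadLetters`, and `orderListener` are hypothetical.

```ballerina
import ballerina/messaging;

// Primary store that the listener polls
final messaging:Store orderStore = new messaging:InMemoryMessageStore();

// Separate store that receives messages once retries are exhausted
final messaging:Store deadLetters = new messaging:InMemoryMessageStore();

listener messaging:StoreListener orderListener = new (orderStore, {
    pollingInterval: 5,          // Poll every 5 seconds
    maxRetries: 3,               // Retry a failed message up to 3 times
    retryInterval: 2.5,          // Wait 2.5 seconds between retries
    deadLetterStore: deadLetters // Park messages that still fail
});
```

Fields that are omitted keep the defaults shown in the record definition above.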

Message Store Service

A message store service, defined by the messaging:StoreService type, can be attached to a messaging:StoreListener to process messages retrieved from the message store. This service exposes a single remote method, onMessage, which is invoked when a new message is received.

# This service object defines the contract for processing messages from a message store.
public type StoreService distinct isolated service object {

    # This remote function is called when a new message is received from the message store.
    #
    # + payload - The message payload to be processed
    # + return - An error if the message could not be processed, or a nil value
    isolated remote function onMessage(anydata payload) returns error?;
};

If the onMessage function returns an error, the message processing will be retried based on the configured maxRetries and retryInterval. If the maximum retries are exhausted and a deadLetterStore is configured, the message will be moved to the dead-letter store.
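
Because the dead-letter store is itself a message store, its contents can be inspected with the same `retrieve` and `acknowledge` operations. The helper below is a hypothetical sketch (the function name `drainDeadLetters` is not part of the module) and assumes the interface is exposed as `messaging:Store`.

```ballerina
import ballerina/io;
import ballerina/messaging;

# Hypothetical helper: prints and removes every message currently held in a dead-letter store.
function drainDeadLetters(messaging:Store deadLetterStore) returns error? {
    while true {
        // Peek at the top message; nil means the store is empty
        messaging:Message? message = check deadLetterStore->retrieve();
        if message is messaging:Message {
            io:println("Dead-lettered payload: ", message.payload);
            // Acknowledge with the default `success = true` once the message is handled
            check deadLetterStore->acknowledge(message.id);
        } else {
            break;
        }
    }
}
```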

Example

The following example shows how to use this package to set up an in-memory message store and a listener that processes its messages:

import ballerina/http;
import ballerina/io;
import ballerina/messaging;

// Initialize an in-memory message store
messaging:Store msgStore = new messaging:InMemoryMessageStore();

// Initialize a message store listener with custom configuration
listener messaging:StoreListener msgStoreListener = new(msgStore, {
    pollingInterval: 10,  // Poll every 10 seconds
    maxRetries: 2,        // Retry message processing up to 2 times
    retryInterval: 2      // Wait 2 seconds between retries
});

// Define and attach a service to the listener to handle incoming messages
service on msgStoreListener {

    isolated remote function onMessage(anydata payload) returns error? {
        io:println("Received message payload: ", payload);

        // Simulate a processing failure for specific message payload
        if payload is string && payload == "fail" {
            return error("Message processing failed due to 'fail' payload");
        }
        // If no error is returned, the message is acknowledged as successfully processed
    }
}

// Defines an HTTP service to produce messages to the message store
service /api/v1 on new http:Listener(8080) {

    // Endpoint to send messages to the message store
    resource function post messages(@http:Payload anydata payload) returns http:Accepted|error {
        // Remote methods of a client object are invoked with `->`
        check msgStore->store(payload);
        return http:ACCEPTED;
    }
}
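
To try the example, a message can be posted to the HTTP endpoint from a separate program. This is a sketch that assumes the service above is running locally on port 8080; the payload shown is arbitrary.

```ballerina
import ballerina/http;
import ballerina/io;

public function main() returns error? {
    // Client pointed at the base path of the producer service
    http:Client producer = check new ("http://localhost:8080/api/v1");

    // Arbitrary JSON payload, used only for illustration
    json payload = {orderId: 42};

    // POST /api/v1/messages
    http:Response response = check producer->/messages.post(payload);
    io:println("Status code: ", response.statusCode);
}
```

Since the resource returns `http:ACCEPTED`, a successful call should report status 202, and the listener should print the payload on its next poll.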

Issues and projects

The Issues and Projects tabs are disabled for this repository as this is part of the Ballerina library. To report bugs, request new features, start new discussions, view project boards, etc., visit the Ballerina library parent repository.

This repository only contains the source code for the package.

Building from the source

Prerequisites

  1. Download and install Java SE Development Kit (JDK) version 21.

    Note: After installation, remember to set the JAVA_HOME environment variable to the directory where JDK was installed.

  2. Download and install Ballerina Swan Lake.

  3. Download and install Docker.

    Note: Ensure that the Docker daemon is running before executing any tests.

  4. Generate a GitHub access token with read package permissions, then set the following environment variables:

    export packageUser=<Your GitHub Username>
    export packagePAT=<GitHub Personal Access Token>

Build options

Execute the commands below to build from the source.

  1. To build the package:

    ./gradlew clean build

  2. To run the tests:

    ./gradlew clean test

  3. To build the package without running the tests:

    ./gradlew clean build -x test

  4. To debug the package with a remote debugger:

    ./gradlew clean build -Pdebug=<port>

  5. To debug with the Ballerina language:

    ./gradlew clean build -PbalJavaDebug=<port>

  6. To publish the generated artifacts to the local Ballerina Central repository:

    ./gradlew clean build -PpublishToLocalCentral=true

  7. To publish the generated artifacts to the Ballerina Central repository:

    ./gradlew clean build -PpublishToCentral=true

Contributing to Ballerina

As an open source project, Ballerina welcomes contributions from the community.

For more information, go to the contribution guidelines.

Code of conduct

All contributors are encouraged to read the Ballerina Code of Conduct.
