Rembus is middleware for building high-performance, fault-tolerant distributed applications using RPC and Pub/Sub communication styles.
The data-at-rest backend is powered by DuckDB, which offers fast storage and powerful analytical features.
- Binary message encoding using CBOR.
- Built-in support for exchanging DataFrames.
- DuckDB DuckLake support.
- Pub/Sub QOS0, QOS1 and QOS2.
- Pub/Sub topic space routing and wildcard subscriptions (*/*/temperature).
- Macro-based API that makes writing RPC and Pub/Sub applications simple and fast.
- MQTT integration.
- Multiple transport protocols: TCP, WebSocket, ZeroMQ.
using Pkg; Pkg.add("Rembus")

The scope of Rembus is to facilitate communication between distributed applications.
An application instrumented by Rembus uses the concept of Component to
abstract the two communication patterns:
- Remote Procedure Call (RPC): a client component invokes a function on a server component and waits for the result.
- Publish/Subscribe (Pub/Sub): a publisher component sends messages to a topic, and subscriber components receive messages from that topic.
A Component presents one or many of these roles:
- publish a message to a topic channel.
- subscribe to a topic channel to receive messages.
- expose a function to be invoked remotely.
- invoke a remote function on another component.
- route messages between components (broker role).
There are three factory constructors to create a Component, each one
reflecting its main role:
- component(url::String): a component that connects to a broker or a server.
- broker(): a component that listens for connection requests and routes messages between components.
- server(): a component that listens for connection requests from other components. A server does not route messages between connected components.
Create a component that connects to myhost.org on port 8000 with the unique
name mynode:
using Rembus
node = component("ws://myhost.org:8000/mynode")

The node handle can be used to invoke remote functions and publish messages
to Pub/Sub topics.
# Invoke a remote function 'get_status' on the remote component
status = rpc(node, "get_status", Dict("verbose"=>true))
# Publish a message to the topic 'alerts/temperature' with a dictionary payload
publish(node, "alerts/temperature", Dict("value"=>75.0, "unit"=>"C"))

A remote component, named myservices, implements and exposes the get_status
method:
using Rembus
# uptime(), cpuload() and memfree() are application-defined helpers (not shown here).
function get_status(options::Dict)
    if haskey(options, "verbose") && options["verbose"]
        return Dict(
            "status"=>"ok",
            "uptime"=>uptime(),
            "load"=>cpuload(),
            "mem_free"=>memfree()
        )
    else
        return Dict("status"=>"ok", "uptime"=>uptime())
    end
end
node = component("ws://myhost.org:8000/myservices")
expose(node, get_status)
wait(node)

To subscribe to Pub/Sub topics, use the subscribe function:
function alert_handler(topic, payload; ctx=nothing, node=nothing)
    println("message from $topic: $payload")
end
subscribe(node, "alerts/temperature", alert_handler)
wait(node)

A subscriber function receives messages but, unlike an exposed RPC function, does not return a response.
Note that both the exposer and the subscriber components must call wait(node)
to keep the component running and processing incoming requests.
Start a broker component listening on the default WebSocket port 8000:
using Rembus
bro = broker()
# Optional custom broker logic
# ...
# Keep the broker running until Ctrl-C
wait(bro)

broker() is the same as component(): a component without a URL endpoint.
wait(bro) is mandatory only when the broker is started in a script; in a REPL session it is a no-op.
See broker documentation for more details on how to configure the broker.
The one-line shortcut for starting a broker from the terminal shell is:
julia -e "import Rembus; Rembus.brokerd()"

Rembus has built-in support for persisting data using DuckDB DuckLake.
The DuckLake analytical data lake provides fast storage and powerful query features for large datasets.
The DuckLake storage backend can be configured using the DUCKLAKE_URL
environment variable and supports multiple database engines:
- DuckDB: the default backend for storing tabular data is a local DuckDB database file.
- SQLite: DUCKLAKE_URL="ducklake:sqlite:$HOME/.config/rembus/rembus.sqlite"
- Postgres: DUCKLAKE_URL="ducklake:postgresql://user:password@host:port/rembus"
If DUCKLAKE_URL is not defined then Rembus uses a local DuckDB database file
located at $HOME/.config/rembus/rembus.duckdb.
For the Postgres backend, make sure to create the rembus database
before starting the broker.
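The fallback rule described above can be sketched in plain Julia. This is only an illustration of the documented behavior, not Rembus's own code: the helper name storage_location is hypothetical, and it simply returns the value of DUCKLAKE_URL when set, or the default DuckDB file path otherwise.

```julia
# Hypothetical helper, not part of the Rembus API.
# Returns the DUCKLAKE_URL value if defined, otherwise the default
# local DuckDB file path described in the text.
function storage_location(env=ENV)
    default_file = joinpath(homedir(), ".config", "rembus", "rembus.duckdb")
    return get(env, "DUCKLAKE_URL", default_file)
end

# Example with explicit environments (plain dictionaries stand in for ENV).
with_url = storage_location(Dict("DUCKLAKE_URL" => "ducklake:postgresql://user:password@host:5432/rembus"))
without_url = storage_location(Dict{String,String}())
println(with_url)
println(without_url)
```

Passing the environment as an argument keeps the sketch easy to try out without touching the real process environment.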
A broker can be configured to persist Pub/Sub messages to DuckLake storage. A custom schema definition declares which topics are persisted and the tables that store them.
For example, the following JSON-formatted schema defines two tables, sensor and
telemetry:
- sensor: a mote, identified by a unique distinguished name dn, that describes a type of physical sensor deployed at a site.
- telemetry: stores the periodic temperature and pressure telemetry data sent by the sensor motes.
json_string = """
{
    "tables": [
        {
            "table": "sensor",
            "topic": ":site/:type/:dn/sensor",
            "columns": [
                {"col": "site", "type": "TEXT", "nullable": false},
                {"col": "type", "type": "TEXT", "nullable": false},
                {"col": "dn", "type": "TEXT"}
            ],
            "keys": ["dn"]
        },
        {
            "table": "telemetry",
            "topic": ":dn/telemetry",
            "columns": [
                {"col": "dn", "type": "TEXT"},
                {"col": "temperature", "type": "DOUBLE"},
                {"col": "pressure", "type": "DOUBLE"}
            ],
            "extras": {"recv_ts": "ts", "slot": "time_bucket"}
        }
    ]
}
"""

The sensor table persists messages published to topics matching the
pattern :site/:type/:dn/sensor where :site, :type and :dn are dynamic
topic segments that are mapped to the corresponding table columns.
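To make the mapping from dynamic topic segments to columns concrete, here is a minimal plain-Julia sketch. It does not use Rembus and is not how the broker is implemented; the function name match_topic is hypothetical, and the sketch assumes that a pattern matches only topics with the same number of segments.

```julia
# Hypothetical helper, not part of the Rembus API: match a concrete topic
# against a schema pattern and return the captured column values.
function match_topic(pattern::AbstractString, topic::AbstractString)
    pattern_parts = split(pattern, '/')
    topic_parts = split(topic, '/')
    # Assumption: a topic with a different number of segments does not match.
    length(pattern_parts) == length(topic_parts) || return nothing

    columns = Dict{String,String}()
    for (p, t) in zip(pattern_parts, topic_parts)
        if startswith(p, ":")
            # Dynamic segment: ":dn" captures the value for column "dn".
            columns[String(p[2:end])] = String(t)
        elseif p != t
            # Literal segment: must be identical.
            return nothing
        end
    end
    return columns
end

sensor = match_topic(":site/:type/:dn/sensor", "belluno/HVAC/agordo.sala1/sensor")
telemetry = match_topic(":dn/telemetry", "agordo.sala1/telemetry")
println(sensor)
println(telemetry)
```

With the topics used in this document, the sensor pattern captures site, type and dn, while the telemetry pattern captures only dn.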
With DuckLake enabled, Rembus can persist and retrieve Pub/Sub messages in batches directly from DuckDB.
# DuckLake support relies on the package extension mechanism, so DuckDB
# must be loaded first to enable it.
using DuckDB
using Rembus
bro = broker(DuckDB.DB(), schema=json_string)
wait(bro)

Full example: broker
Subscribe to the topic **/telemetry to receive all telemetry messages:
using Rembus
function telemetry(topic, payload; ctx=nothing, node=nothing)
    println("telemetry on $topic: $payload")
end
meter = component("ws://localhost:8000/mymeter")
subscribe(meter, "**/telemetry", telemetry)
wait(meter)

Full example: subscribe_telemetry.jl
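The following plain-Julia sketch illustrates one possible reading of the wildcard patterns used in this document. The semantics are assumptions made for illustration, not Rembus's documented behavior: * is taken to match exactly one topic segment and ** one or more segments. The function names wildcard_match and match_parts are hypothetical.

```julia
# Illustrative only: assumed wildcard semantics, not the Rembus implementation.
function match_parts(pat, top)
    isempty(pat) && return isempty(top)
    head, rest = pat[1], pat[2:end]
    if head == "**"
        # Assumption: "**" consumes one or more leading segments.
        return any(match_parts(rest, top[n+1:end]) for n in 1:length(top))
    end
    isempty(top) && return false
    # Assumption: "*" matches exactly one segment; anything else is a literal.
    (head == "*" || head == top[1]) || return false
    return match_parts(rest, top[2:end])
end

wildcard_match(pattern, topic) = match_parts(split(pattern, '/'), split(topic, '/'))

println(wildcard_match("**/telemetry", "agordo.sala1/telemetry"))
println(wildcard_match("*/*/temperature", "belluno/HVAC/temperature"))
println(wildcard_match("*/*/temperature", "belluno/temperature"))
```

Under these assumptions the subscription above receives messages published to agordo.sala1/telemetry, while */*/temperature only matches topics with exactly three segments.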
Publish a message to the topic belluno/HVAC/agordo.sala1/sensor with an empty
payload:
using Rembus
pub = component("ws://localhost:8000/my_edge_gateway")
# ws, localhost and 8000 are the default values, so you can omit them
# pub = component("my_edge_gateway")
publish(pub, "belluno/HVAC/agordo.sala1/sensor")

Full example: publish_sensor.jl
Publish a message to the topic agordo.sala1/telemetry with a dictionary
payload:
publish(
    pub,
    "agordo.sala1/telemetry",
    Dict("temperature" => 18.5, "pressure" => 1013.25)
)

Full example: publish_telemetry.jl
For each table defined in the schema, two services are exposed, one
for querying and one for deleting data at rest:
- query_{table} for selecting items.
- delete_{table} for deleting items.
For example, to get the telemetry data at rest:

df = rpc(node, "query_telemetry", Dict("where"=>"dn like 'agordo.%'"))

Full example: query_telemetry.jl
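The naming convention for the per-table services can be spelled out with a short plain-Julia sketch. The helper service_names is hypothetical and only builds the service name strings; it does not call Rembus.

```julia
# Hypothetical helper, not part of the Rembus API: derive the names of the
# two services exposed for a table defined in the schema.
service_names(table) = (query = "query_$(table)", delete = "delete_$(table)")

for table in ("sensor", "telemetry")
    svc = service_names(table)
    println("$table: $(svc.query), $(svc.delete)")
end

svc = service_names("telemetry")
```

The resulting strings are the names to pass as the second argument of rpc, as in the query_telemetry call shown in this section.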
Rembus provides a macro-based API to simplify the development of distributed applications and instrument Julia functions with RPC and Pub/Sub capabilities.
using Rembus
@component "myserver"
function myservice(arg1)
    return "hello $arg1"
end
@expose myservice
# Serve forever until Ctrl-C
@wait

The @component macro declares a unique name by which the component is known to the broker. On the broker side, this identity binds the component to a twin that operates on its behalf even when the component is offline.

response = @rpc myservice("rembus")

When a name is not declared with @component, a random UUID identifier is associated with the component each time the application starts.
using DataFrames, Rembus, Statistics
@component "myconsumer"
function mytopic(df::DataFrame)
    println("mean_a=$(mean(df.a)), mean_b=$(mean(df.b))")
end
@subscribe mytopic
# Receive messages forever until Ctrl-C
@wait

using DataFrames, Rembus
df = DataFrame(a=1:1_000_000, b=rand(1_000_000))
# Fire and forget is the fastest publishing mechanism:
# at-most-once delivery guarantee.
@publish mytopic(df)
# Messages are acknowledged and retransmitted if needed:
# at-least-once delivery guarantee.
@publish mytopic(df) Rembus.QOS1
# Exactly-once delivery guarantee.
@publish mytopic(df) Rembus.QOS2
