Skip to content

Commit f50f37a

Browse files
(ipc) Add feature to initialise Rayforce server from Python runtime
1 parent 1c14c03 commit f50f37a

File tree

9 files changed

+324
-129
lines changed

9 files changed

+324
-129
lines changed
Lines changed: 188 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
# :material-network: IPC Inter-Process Communication
2+
3+
Rayforce-Py provides IPC (Inter-Process Communication) functionality to communicate with other Rayforce instances over the network. This allows you to build distributed systems, microservices, and client-server architectures using Rayforce's high-performance runtime.
4+
5+
## :material-information: Overview
6+
7+
IPC in Rayforce-Py consists of two main components:
8+
9+
- **`IPCServer`** - Listens for incoming connections on a specified port
10+
- **`IPCClient`** - Connects to remote Rayforce instances and executes queries
11+
12+
The IPC implementation uses Rayforce's native runtime event loop, ensuring optimal performance and seamless integration with the Rayforce ecosystem.
13+
14+
---
15+
16+
## :material-server: IPC Server
17+
18+
The `IPCServer` class allows you to create a server that listens for incoming IPC connections. The server runs on Rayforce's native event loop.
19+
20+
### Creating a Server
21+
22+
To create an IPC server, import `IPCServer` and initialize it with a port:
23+
24+
```python
25+
>>> from rayforce import IPCServer
26+
27+
>>> server = IPCServer(port=5000)
28+
>>> server
29+
IPCServer(port=5000)
30+
```
31+
32+
### Starting the Server
33+
34+
The `listen()` method starts the server and blocks until the event loop exits:
35+
36+
```python
37+
>>> server.listen()
38+
Rayforce IPC Server listening on 0.0.0.0:5000 (id:123)
39+
```
40+
41+
!!! note "Blocking Operation"
42+
The `listen()` method is **blocking** - it will run until the event loop is stopped (e.g., via KeyboardInterrupt or program termination). The server automatically closes the port when the event loop exits, so no manual cleanup is needed.
43+
44+
---
45+
46+
## :material-client: IPC Client
47+
48+
The `IPCClient` class manages connections to remote Rayforce instances. It maintains a connection pool and provides a simple API for executing queries.
49+
50+
### Initializing a Client
51+
52+
To use IPC, first import the `IPCClient` and `IPCConnection`:
53+
54+
```python
55+
>>> from rayforce import IPCClient
56+
```
57+
58+
Then initialize the client for a specific host and port:
59+
60+
```python
61+
>>> client = IPCClient(host="localhost", port=5000)
62+
>>> client
63+
IPCClient(host=localhost, port=5000, pool_size: 0)
64+
```
65+
66+
### Opening a Connection
67+
68+
There are two ways to open a connection with a Rayforce instance.
69+
70+
#### Manual Connection Management
71+
72+
Manually open a connection via the `.acquire()` method:
73+
74+
```python
75+
>>> conn = client.acquire()
76+
>>> conn
77+
Connection(id:4382902992) - established at 2025-09-15T21:33:39.932434
78+
>>> isinstance(conn, IPCConnection)
79+
True
80+
```
81+
82+
Close the connection when done:
83+
84+
```python
85+
>>> conn.close()
86+
>>> conn
87+
Connection(id:4382902992) - disposed at 2025-09-15T21:34:46.752071
88+
```
89+
90+
#### Context Manager (Recommended)
91+
92+
Use the context manager for automatic connection cleanup:
93+
94+
```python
95+
>>> with client.acquire() as conn:
96+
... print(conn)
97+
Connection(id:4832080144) - established at 2025-09-15T21:35:39.232321
98+
>>> print(conn)
99+
Connection(id:4832080144) - disposed at 2025-09-15T21:35:39.232500
100+
```
101+
102+
---
103+
104+
## :material-code-tags: Executing Queries
105+
106+
The `.execute()` method allows you to send various types of data to the remote Rayforce instance.
107+
108+
### Sending String Queries
109+
110+
You can send raw string queries:
111+
112+
```python
113+
>>> with client.acquire() as conn:
114+
... result = conn.execute("(+ 1 2)")
115+
>>> print(result)
116+
I64(3)
117+
```
118+
119+
### Sending Query Objects
120+
121+
IPC supports sending Rayforce query objects directly, providing type safety and better integration:
122+
123+
```python
124+
>>> from rayforce import Table, Column
125+
126+
>>> table = Table.from_name("server_side_table_name")
127+
>>> query = table.select("id", "name").where(Column("id") > 1)
128+
129+
>>> with client.acquire() as conn:
130+
... result = conn.execute(query)
131+
```
132+
133+
### Supported Query Types
134+
135+
IPC supports all major query types:
136+
137+
- `SelectQuery` - SELECT operations
138+
- `UpdateQuery` - UPDATE operations
139+
- `InsertQuery` - INSERT operations
140+
- `UpsertQuery` - UPSERT operations
141+
- `LeftJoin`, `InnerJoin`, `WindowJoin` - JOIN operations
142+
143+
---
144+
145+
## :material-database-cog: Connection Pooling
146+
147+
The `IPCClient` maintains a pool of connections. You can manage multiple connections:
148+
149+
```python
150+
>>> client = IPCClient(host="localhost", port=5000)
151+
152+
>>> conn1 = client.acquire()
153+
>>> conn2 = client.acquire()
154+
>>> client
155+
IPCClient(host=localhost, port=5000, pool_size: 2)
156+
```
157+
158+
To dispose all connections at once:
159+
160+
```python
161+
>>> client.dispose_connections()
162+
>>> client
163+
IPCClient(host=localhost, port=5000, pool_size: 0)
164+
```
165+
166+
---
167+
168+
## :material-alert: Error Handling
169+
170+
IPC operations can raise `RayforceIPCError` in various scenarios:
171+
172+
```python
173+
>>> from rayforce import IPCClient, errors
174+
175+
>>> client = IPCClient(host="localhost", port=9999)
176+
>>> try:
177+
... with client.acquire() as conn:
178+
... result = conn.execute("(+ 1 2)")
179+
... except errors.RayforceIPCError as e:
180+
... print(f"IPC Error: {e}")
181+
```
182+
183+
Common error scenarios:
184+
185+
- **Connection refused** - Server is not running or port is incorrect
186+
- **Port already in use** - Another process is using the specified port
187+
- **Connection closed** - Server closed the connection
188+
- **Invalid port** - Port number is out of valid range (1-65535)

docs/docs/content/documentation/plugins/ipc.md

Lines changed: 0 additions & 112 deletions
This file was deleted.

docs/mkdocs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,8 @@ nav:
133133
- Joins: content/documentation/query-guide/joins.md
134134
- Order By: content/documentation/query-guide/order-by.md
135135
- Group By: content/documentation/query-guide/group-by.md
136+
- IPC: content/documentation/IPC.md
136137
- Plugins:
137-
- IPC: content/documentation/plugins/ipc.md
138138
- Polars: content/documentation/plugins/polars.md
139139
- Pandas: content/documentation/plugins/pandas.md
140140
- KDB: content/documentation/plugins/kdb.md

rayforce/__init__.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,8 +90,9 @@
9090
Vector,
9191
)
9292
from .utils import ( # noqa: E402
93+
IPCClient,
9394
IPCConnection,
94-
IPCEngine,
95+
IPCServer,
9596
eval_obj,
9697
eval_str,
9798
python_to_ray,
@@ -113,8 +114,9 @@
113114
"Date",
114115
"Dict",
115116
"Fn",
117+
"IPCClient",
116118
"IPCConnection",
117-
"IPCEngine",
119+
"IPCServer",
118120
"List",
119121
"Null",
120122
"Operation",

rayforce/_rayforce_c.pyi

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,5 +94,8 @@ def upsert(table: RayObject, keys: RayObject, data: RayObject) -> RayObject: ...
9494
def hopen(path: RayObject) -> RayObject: ...
9595
def hclose(handle: RayObject) -> None: ...
9696
def write(handle: RayObject, data: RayObject) -> None: ...
97+
def ipc_listen(port: int) -> int: ...
98+
def ipc_close_listener(listener_id: int) -> None: ...
99+
def runtime_run() -> int: ...
97100
def get_obj_type(obj: RayObject) -> int: ...
98101
def init_runtime() -> None: ...

rayforce/ffi.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,21 @@ def hclose(handle: r.RayObject) -> None:
282282
def write(handle: r.RayObject, data: r.RayObject) -> None:
283283
return r.write(handle, data)
284284

285+
@staticmethod
286+
@errors.error_handler
287+
def ipc_listen(port: int) -> int:
288+
return r.ipc_listen(port)
289+
290+
@staticmethod
291+
@errors.error_handler
292+
def ipc_close_listener(listener_id: int) -> None:
293+
return r.ipc_close_listener(listener_id)
294+
295+
@staticmethod
296+
@errors.error_handler
297+
def runtime_run() -> int:
298+
return r.runtime_run()
299+
285300
@staticmethod
286301
@errors.error_handler
287302
def set_obj(obj: r.RayObject, idx: r.RayObject, value: r.RayObject) -> None:

rayforce/utils/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
from .conversion import python_to_ray, ray_to_python
22
from .evaluation import eval_obj, eval_str
3-
from .ipc import IPCConnection, IPCEngine
3+
from .ipc import IPCClient, IPCConnection, IPCServer
44

55
__all__ = [
6+
"IPCClient",
67
"IPCConnection",
7-
"IPCEngine",
8+
"IPCServer",
89
"eval_obj",
910
"eval_str",
1011
"python_to_ray",

0 commit comments

Comments
 (0)