Skip to content

Commit bc29cee

Browse files
(ws) Add network module
1 parent 49ce34e commit bc29cee

File tree

10 files changed

+153
-60
lines changed

10 files changed

+153
-60
lines changed
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# :material-web: WebSocket Server
2+
3+
Rayforce-Py provides WebSocket server functionality that allows you to expose Rayforce queries over WebSocket connections. This enables WebSocket-compatible clients to interact with Rayforce using the standard WebSocket protocol.
4+
5+
## :material-information: Overview
6+
7+
The WebSocket implementation in Rayforce-Py consists of:
8+
9+
- **`WebSocketServer`** - An async WebSocket server that accepts connections and executes Rayforce queries
10+
- **`WebSocketConnection`** - Represents an individual WebSocket connection (used internally)
11+
12+
The WebSocket server uses Rayforce's native IPC protocol for message serialization, ensuring compatibility with Rayforce's binary format while providing a standard WebSocket interface.
13+
14+
!!! note "Optional Dependency"
15+
WebSocket functionality requires the `websockets` library. Install it with:
16+
```bash
17+
pip install websockets
18+
```
19+
20+
---
21+
22+
## :material-server: WebSocket Server
23+
24+
The `WebSocketServer` class creates an async WebSocket server that listens for incoming connections and processes Rayforce queries.
25+
26+
### Installation
27+
28+
First, ensure you have the `websockets` library installed:
29+
30+
```bash
31+
pip install websockets
32+
```
33+
34+
### Creating a Server
35+
36+
Import `WebSocketServer` from the network module:
37+
38+
```python
39+
>>> from rayforce.network.websocket import WebSocketServer
40+
41+
>>> server = WebSocketServer(port=8765)
42+
>>> server
43+
WebSocketServer(port=8765)
44+
```
45+
46+
### Starting the Server
47+
48+
The `run()` method starts the server and runs until stopped (e.g., via Ctrl+C):
49+
50+
```python
51+
>>> import asyncio
52+
>>> from rayforce.network.websocket import WebSocketServer
53+
54+
>>> async def main():
55+
... server = WebSocketServer(port=8765)
56+
... await server.run()
57+
58+
>>> asyncio.run(main())
59+
Rayforce WebSocket Server listening on 0.0.0.0:8765
60+
^C
61+
Rayforce WebSocket Server stopped.
62+
```
63+
64+
!!! note "Async Operation"
65+
The `run()` method is **async** and must be called with `await` or within an async context.
66+
67+
---
68+
69+
## :material-code-tags: Message Format
70+
71+
The WebSocket server accepts two types of messages:
72+
73+
### Text Messages
74+
75+
Text messages are treated as Rayforce string queries and are evaluated directly
76+
77+
### Binary Messages
78+
79+
Binary messages must be in Rayforce's native IPC format:
80+
- **Header**: 16 bytes (prefix, version, flags, endian, msgtype, size)
81+
- **Payload**: Serialized RayObject using `ser_obj`
82+
83+
The server deserializes the binary data, evaluates it, and returns the result in the same binary format.
84+
85+
86+
#### Next: [IPC Documentation](./IPC.md)

docs/mkdocs.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ nav:
134134
- Order By: content/documentation/query-guide/order-by.md
135135
- Group By: content/documentation/query-guide/group-by.md
136136
- IPC: content/documentation/IPC.md
137+
- Websocket: content/documentation/websocket.md
137138
- Plugins:
138139
- Polars: content/documentation/plugins/polars.md
139140
- Pandas: content/documentation/plugins/pandas.md

rayforce/__init__.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,11 @@
6464
RayforceUserError,
6565
RayforceValueError,
6666
)
67+
from .network import ( # noqa: E402
68+
IPCClient,
69+
IPCConnection,
70+
IPCServer,
71+
)
6772
from .types import ( # noqa: E402
6873
B8,
6974
C8,
@@ -90,9 +95,6 @@
9095
Vector,
9196
)
9297
from .utils import ( # noqa: E402
93-
IPCClient,
94-
IPCConnection,
95-
IPCServer,
9698
eval_obj,
9799
eval_str,
98100
python_to_ray,

rayforce/network/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from .ipc import IPCClient, IPCConnection, IPCServer
2+
3+
__all__ = [
4+
"IPCClient",
5+
"IPCConnection",
6+
"IPCServer",
7+
# Do not import websockets here to avoid import errors.
8+
# Websockets aren't core, but rather an optional plugin.
9+
]
Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,10 @@
1414
from rayforce.types.scalars.numeric.unsigned import U8
1515

1616
try:
17-
import asyncio
18-
1917
from websockets import Server, serve # type: ignore[import-not-found]
2018
except ImportError as e:
21-
module_name = str(e).split("'")[1]
2219
raise ImportError(
23-
f"{module_name} library is required. Install it with: pip install {module_name}"
20+
"websockets library is required. Install it with: pip install websockets"
2421
) from e
2522

2623

rayforce/utils/__init__.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
from .conversion import python_to_ray, ray_to_python
22
from .evaluation import eval_obj, eval_str
3-
from .ipc import IPCClient, IPCConnection, IPCServer
43

54
__all__ = [
6-
"IPCClient",
7-
"IPCConnection",
8-
"IPCServer",
95
"eval_obj",
106
"eval_str",
117
"python_to_ray",

tests/network/__init__.py

Whitespace-only changes.
Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from rayforce.types import Table
77
from rayforce import errors
88
from rayforce.ffi import FFI
9-
from rayforce.utils.ipc import (
9+
from rayforce.network.ipc import (
1010
IPCConnection,
1111
IPCClient,
1212
IPCServer,
@@ -44,11 +44,11 @@ def mock_handle(self):
4444

4545
@pytest.fixture
4646
def connection(self, mock_engine, mock_handle):
47-
with patch("rayforce.utils.ipc.FFI.get_obj_type", return_value=r.TYPE_I64):
47+
with patch("rayforce.network.ipc.FFI.get_obj_type", return_value=r.TYPE_I64):
4848
return IPCConnection(engine=mock_engine, handle=mock_handle)
4949

50-
@patch("rayforce.utils.ipc.FFI.write")
51-
@patch("rayforce.utils.ipc.ray_to_python")
50+
@patch("rayforce.network.ipc.FFI.write")
51+
@patch("rayforce.network.ipc.ray_to_python")
5252
def test_execute(self, mock_ray_to_python, mock_write, connection):
5353
mock_write.return_value = MagicMock()
5454
mock_ray_to_python.return_value = "result"
@@ -63,7 +63,7 @@ def test_execute_closed(self, connection):
6363
with pytest.raises(errors.RayforceIPCError, match="Cannot write to closed connection"):
6464
connection.execute("test_query")
6565

66-
@patch("rayforce.utils.ipc.FFI.hclose")
66+
@patch("rayforce.network.ipc.FFI.hclose")
6767
def test_close_removes_from_pool(self, mock_hclose, mock_engine, mock_handle):
6868
conn = IPCConnection(engine=mock_engine, handle=mock_handle)
6969
conn_id = id(conn)
@@ -74,7 +74,7 @@ def test_close_removes_from_pool(self, mock_hclose, mock_engine, mock_handle):
7474
assert conn._closed is True
7575
mock_hclose.assert_called_once_with(conn.handle)
7676

77-
@patch("rayforce.utils.ipc.FFI.hclose")
77+
@patch("rayforce.network.ipc.FFI.hclose")
7878
def test_close_idempotent(self, mock_hclose, connection):
7979
connection.close()
8080
connection.close()
@@ -92,8 +92,8 @@ class TestIPCEngine:
9292
def engine(self):
9393
return IPCClient(host="localhost", port=5000)
9494

95-
@patch("rayforce.utils.ipc.FFI.get_obj_type")
96-
@patch("rayforce.utils.ipc.FFI.hopen")
95+
@patch("rayforce.network.ipc.FFI.get_obj_type")
96+
@patch("rayforce.network.ipc.FFI.hopen")
9797
def test_acquire_success(self, mock_hopen, mock_get_obj_type, engine):
9898
mock_handle = MagicMock(spec=r.RayObject)
9999

@@ -111,9 +111,9 @@ def get_obj_type_side_effect(obj):
111111
assert conn.handle == mock_handle
112112
assert id(conn) in engine.pool
113113

114-
@patch("rayforce.utils.ipc.FFI.get_obj_type")
115-
@patch("rayforce.utils.ipc.FFI.hopen")
116-
@patch("rayforce.utils.ipc.FFI.get_error_obj")
114+
@patch("rayforce.network.ipc.FFI.get_obj_type")
115+
@patch("rayforce.network.ipc.FFI.hopen")
116+
@patch("rayforce.network.ipc.FFI.get_error_obj")
117117
def test_acquire_failure(self, mock_get_error, mock_hopen, mock_get_obj_type, engine):
118118
mock_error = MagicMock(spec=r.RayObject)
119119

@@ -129,8 +129,8 @@ def get_obj_type_side_effect(obj):
129129
with pytest.raises(errors.RayforceIPCError, match="Error when establishing connection"):
130130
engine.acquire()
131131

132-
@patch("rayforce.utils.ipc.FFI.get_obj_type")
133-
@patch("rayforce.utils.ipc.FFI.hopen")
132+
@patch("rayforce.network.ipc.FFI.get_obj_type")
133+
@patch("rayforce.network.ipc.FFI.hopen")
134134
def test_dispose_connections(self, mock_hopen, mock_get_obj_type, engine):
135135
mock_handle1 = MagicMock(spec=r.RayObject)
136136
mock_handle2 = MagicMock(spec=r.RayObject)
@@ -174,8 +174,8 @@ def test_init_invalid_port_too_low(self, port):
174174
with pytest.raises(errors.RayforceIPCError, match="Invalid port number"):
175175
IPCServer(port=port)
176176

177-
@patch("rayforce.utils.ipc.FFI.ipc_listen")
178-
@patch("rayforce.utils.ipc.FFI.runtime_run")
177+
@patch("rayforce.network.ipc.FFI.ipc_listen")
178+
@patch("rayforce.network.ipc.FFI.runtime_run")
179179
def test_listen_success(self, mock_runtime_run, mock_ipc_listen, server):
180180
mock_ipc_listen.return_value = 123
181181
mock_runtime_run.return_value = 0
@@ -186,9 +186,9 @@ def test_listen_success(self, mock_runtime_run, mock_ipc_listen, server):
186186
assert server._listener_id == 123
187187
mock_runtime_run.assert_called_once()
188188

189-
@patch("rayforce.utils.ipc.FFI.ipc_listen")
190-
@patch("rayforce.utils.ipc.FFI.runtime_run")
191-
@patch("rayforce.utils.ipc.FFI.ipc_close_listener")
189+
@patch("rayforce.network.ipc.FFI.ipc_listen")
190+
@patch("rayforce.network.ipc.FFI.runtime_run")
191+
@patch("rayforce.network.ipc.FFI.ipc_close_listener")
192192
def test_listen_closes_on_exception(
193193
self, mock_close, mock_runtime_run, mock_ipc_listen, server
194194
):
@@ -202,9 +202,9 @@ def test_listen_closes_on_exception(
202202
mock_close.assert_called_once_with(123)
203203
assert server._listener_id is None
204204

205-
@patch("rayforce.utils.ipc.FFI.ipc_listen")
206-
@patch("rayforce.utils.ipc.FFI.runtime_run")
207-
@patch("rayforce.utils.ipc.FFI.ipc_close_listener")
205+
@patch("rayforce.network.ipc.FFI.ipc_listen")
206+
@patch("rayforce.network.ipc.FFI.runtime_run")
207+
@patch("rayforce.network.ipc.FFI.ipc_close_listener")
208208
def test_listen_closes_on_keyboard_interrupt(
209209
self, mock_close, mock_runtime_run, mock_ipc_listen, server
210210
):
@@ -238,7 +238,7 @@ def mock_handle(self):
238238

239239
@pytest.fixture
240240
def connection(self, mock_engine, mock_handle):
241-
with patch("rayforce.utils.ipc.FFI.get_obj_type", return_value=r.TYPE_I64):
241+
with patch("rayforce.network.ipc.FFI.get_obj_type", return_value=r.TYPE_I64):
242242
return IPCConnection(engine=mock_engine, handle=mock_handle)
243243

244244
def _capture_and_eval(self, connection, query_obj):
@@ -250,8 +250,8 @@ def capture_write(handle, data):
250250
mock_result = MagicMock(spec=r.RayObject)
251251
return mock_result
252252

253-
with patch("rayforce.utils.ipc.FFI.write", side_effect=capture_write):
254-
with patch("rayforce.utils.ipc.ray_to_python", return_value="mocked_result"):
253+
with patch("rayforce.network.ipc.FFI.write", side_effect=capture_write):
254+
with patch("rayforce.network.ipc.ray_to_python", return_value="mocked_result"):
255255
connection.execute(query_obj)
256256

257257
assert captured_obj is not None

0 commit comments

Comments
 (0)