4 changes: 4 additions & 0 deletions NEXT_CHANGELOG.md
@@ -6,6 +6,8 @@

- **Custom TLS Configuration**: Added support for custom TLS/SSL configuration via the `TlsConfig` interface. The SDK uses `SecureTlsConfig` (system CA certificates) by default, with optional custom implementations for advanced use cases such as custom CA certificates, mutual TLS (mTLS), or custom cipher suites.

- **Configurable Message Size Limit**: Added `max_message_size_bytes` option to `StreamConfigurationOptions` to limit the size of individual records sent to the server. Defaults to 10MB. Records exceeding this limit will fail fast with an actionable error message before being sent to the server. Set to `-1` for unlimited (not recommended for production).
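
  A minimal usage sketch (`StreamConfigurationOptions` is defined in `zerobus/sdk/shared/definitions.py` in this diff, and is imported here directly from that module; the package may also re-export it elsewhere):

  ```python
  from zerobus.sdk.shared.definitions import StreamConfigurationOptions

  # Cap individual records at 1MB instead of the 10MB default.
  options = StreamConfigurationOptions(max_message_size_bytes=1 * 1024 * 1024)
  assert options.max_message_size_bytes == 1 * 1024 * 1024

  # -1 disables the client-side size check (not recommended for production).
  unlimited = StreamConfigurationOptions(max_message_size_bytes=-1)
  ```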

### Bug Fixes

### Documentation
@@ -14,6 +16,8 @@
- Added `TlsConfig` section to API Reference
- Updated example files to include custom TLS configuration examples
- Added brief mentions of advanced configuration options in appropriate sections
- Added `max_message_size_bytes` to Configuration table and API Reference
- Updated all example files to show message size limit configuration

### Internal Changes

3 changes: 3 additions & 0 deletions README.md
@@ -635,6 +635,7 @@ For advanced use cases requiring custom authentication headers or TLS configurat
|--------|---------|-------------|
| `record_type` | `RecordType.PROTO` | Serialization format: `RecordType.PROTO` or `RecordType.JSON` |
| `max_inflight_records` | 50000 | Maximum number of unacknowledged records |
| `max_message_size_bytes` | 10485760 (10MB) | Maximum size of a single record in bytes. Set to `-1` for unlimited |
| `recovery` | True | Enable automatic stream recovery |
| `recovery_timeout_ms` | 15000 | Timeout for recovery operations (ms) |
| `recovery_backoff_ms` | 2000 | Delay between recovery attempts (ms) |
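
These options map directly onto the `StreamConfigurationOptions` constructor documented in the API Reference below. A sketch with the table's defaults spelled out (assuming `RecordType` is importable alongside `StreamConfigurationOptions` from the module that defines them in this diff):

```python
from zerobus.sdk.shared.definitions import RecordType, StreamConfigurationOptions

options = StreamConfigurationOptions(
    record_type=RecordType.PROTO,             # or RecordType.JSON
    max_inflight_records=50_000,              # unacknowledged records in flight
    max_message_size_bytes=10 * 1024 * 1024,  # 10MB default; -1 = unlimited
    recovery=True,                            # automatic stream recovery
    recovery_timeout_ms=15_000,
    recovery_backoff_ms=2_000,
)
```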
@@ -890,6 +891,7 @@ Configuration options for stream behavior.
StreamConfigurationOptions(
record_type: RecordType = RecordType.PROTO,
max_inflight_records: int = 50000,
max_message_size_bytes: int = 10485760,
recovery: bool = True,
recovery_timeout_ms: int = 15000,
recovery_backoff_ms: int = 2000,
@@ -903,6 +905,7 @@
**Parameters:**
- `record_type` (RecordType) - Serialization format: `RecordType.PROTO` (default) or `RecordType.JSON`
- `max_inflight_records` (int) - Maximum number of unacknowledged records (default: 50000)
- `max_message_size_bytes` (int) - Maximum size of a single serialized record in bytes (default: 10MB). Ingesting a record that exceeds this limit raises a `ValueError` with an actionable error message. Set to `-1` for unlimited (not recommended)
- `recovery` (bool) - Enable or disable automatic stream recovery (default: True)
- `recovery_timeout_ms` (int) - Recovery operation timeout in milliseconds (default: 15000)
- `recovery_backoff_ms` (int) - Delay between recovery attempts in milliseconds (default: 2000)
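
Note that the limit applies to the serialized bytes, not the in-memory object. A sketch of the same measurement `ingest_record` performs for JSON records in this diff (the rejection message mirrors the `ValueError` text added to both SDKs):

```python
import json

max_size = 100  # a deliberately small limit, as in the tests in this PR
record = {"data": "x" * 200}

serialized = json.dumps(record).encode("utf-8")  # what the SDK actually sends
if max_size != -1 and len(serialized) > max_size:
    print(f"rejected: {len(serialized)} bytes exceeds maximum allowed size ({max_size} bytes)")
```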
1 change: 1 addition & 0 deletions examples/async_example_json.py
@@ -167,6 +167,7 @@ async def main():
max_inflight_records=10_000, # Allow 10k records in flight
recovery=True, # Enable automatic recovery
ack_callback=create_ack_callback(), # Track acknowledgments
# max_message_size_bytes=10 * 1024 * 1024, # Max record size (default: 10MB, use -1 for unlimited)
)
logger.info("✓ Stream configuration created")

1 change: 1 addition & 0 deletions examples/async_example_proto.py
@@ -162,6 +162,7 @@ async def main():
max_inflight_records=10_000, # Allow 10k records in flight
recovery=True, # Enable automatic recovery
ack_callback=create_ack_callback(), # Track acknowledgments
# max_message_size_bytes=10 * 1024 * 1024, # Max record size (default: 10MB, use -1 for unlimited)
)
logger.info("✓ Stream configuration created (Protobuf mode)")

1 change: 1 addition & 0 deletions examples/sync_example_json.py
@@ -152,6 +152,7 @@ def main():
recovery_timeout_ms=15000,
recovery_backoff_ms=2000,
recovery_retries=3,
# max_message_size_bytes=10 * 1024 * 1024, # Max record size (default: 10MB, use -1 for unlimited)
)
logger.info("✓ Stream configuration created")

1 change: 1 addition & 0 deletions examples/sync_example_proto.py
@@ -162,6 +162,7 @@ def main():
recovery_timeout_ms=15000,
recovery_backoff_ms=2000,
recovery_retries=3,
# max_message_size_bytes=10 * 1024 * 1024, # Max record size (default: 10MB, use -1 for unlimited)
)
logger.info("✓ Stream configuration created (Protobuf mode)")

142 changes: 142 additions & 0 deletions tests/test_record_types.py
@@ -324,5 +324,147 @@ def create_ephemeral_stream(generator, **kwargs):
mock_grpc_stream.cancel()


class TestMessageSizeLimit(unittest.IsolatedAsyncioTestCase):
"""Test message size limit configuration."""

@for_both_sdks
async def test_record_exceeds_size_limit(self, sdk: SdkManager):
"""Test that record exceeding size limit raises ValueError with actionable message."""
calls_count = 0
mock_grpc_stream = sdk.get_mock_class()(calls_count)

def create_ephemeral_stream(generator, **kwargs):
nonlocal calls_count
nonlocal mock_grpc_stream
calls_count += 1
mock_grpc_stream = sdk.get_mock_class()(calls_count, generator)
return mock_grpc_stream

mock_channel = MockGrpcChannel()
mock_channel.injected_methods["/databricks.zerobus.Zerobus/EphemeralStream"] = create_ephemeral_stream

# Set a small message size limit (100 bytes)
options = StreamConfigurationOptions(
recovery=False,
max_inflight_records=150,
token_factory=token_factory,
record_type=RecordType.JSON,
max_message_size_bytes=100,
)

with patch(sdk.get_grpc_override(), return_value=mock_channel):
sdk_handle = sdk.create(SERVER_ENDPOINT)
stream = await sdk_handle.create_stream(
TableProperties(TABLE_NAME),
options,
)

# Try to ingest a record larger than 100 bytes
large_record = {"data": "x" * 200}

with self.assertRaises(ValueError) as context:
await stream.ingest_record(large_record)

error_message = str(context.exception)
self.assertIn("exceeds maximum allowed size", error_message)
self.assertIn("100 bytes", error_message)
self.assertIn("max_message_size_bytes", error_message)

await stream.close()

mock_grpc_stream.cancel()

@for_both_sdks
async def test_record_within_size_limit(self, sdk: SdkManager):
"""Test that records within size limit work normally."""
calls_count = 0
mock_grpc_stream = sdk.get_mock_class()(calls_count)

def create_ephemeral_stream(generator, **kwargs):
nonlocal calls_count
nonlocal mock_grpc_stream
calls_count += 1
mock_grpc_stream = sdk.get_mock_class()(calls_count, generator)
mock_grpc_stream.inject_response(False, InjectedRecordResponse(0, timeout_ms=10))
return mock_grpc_stream

mock_channel = MockGrpcChannel()
mock_channel.injected_methods["/databricks.zerobus.Zerobus/EphemeralStream"] = create_ephemeral_stream

# Set a size limit that accommodates the record
options = StreamConfigurationOptions(
recovery=False,
max_inflight_records=150,
token_factory=token_factory,
record_type=RecordType.JSON,
max_message_size_bytes=1000,
)

with patch(sdk.get_grpc_override(), return_value=mock_channel):
sdk_handle = sdk.create(SERVER_ENDPOINT)
stream = await sdk_handle.create_stream(
TableProperties(TABLE_NAME),
options,
)

# Ingest a record smaller than 1000 bytes
small_record = {"data": "small"}
ack = await stream.ingest_record(small_record)
offset_ack = await ack
self.assertEqual(offset_ack, 0)

await stream.close()

mock_grpc_stream.cancel()

@for_both_sdks
async def test_unlimited_message_size(self, sdk: SdkManager):
"""Test that setting max_message_size_bytes to -1 allows unlimited size."""
calls_count = 0
mock_grpc_stream = sdk.get_mock_class()(calls_count)

def create_ephemeral_stream(generator, **kwargs):
nonlocal calls_count
nonlocal mock_grpc_stream
calls_count += 1
mock_grpc_stream = sdk.get_mock_class()(calls_count, generator)
mock_grpc_stream.inject_response(False, InjectedRecordResponse(0, timeout_ms=10))
return mock_grpc_stream

mock_channel = MockGrpcChannel()
mock_channel.injected_methods["/databricks.zerobus.Zerobus/EphemeralStream"] = create_ephemeral_stream

# Set unlimited message size
options = StreamConfigurationOptions(
recovery=False,
max_inflight_records=150,
token_factory=token_factory,
record_type=RecordType.JSON,
max_message_size_bytes=-1,
)

with patch(sdk.get_grpc_override(), return_value=mock_channel):
sdk_handle = sdk.create(SERVER_ENDPOINT)
stream = await sdk_handle.create_stream(
TableProperties(TABLE_NAME),
options,
)

# Should accept large records when unlimited
large_record = {"data": "x" * 10000}
ack = await stream.ingest_record(large_record)
offset_ack = await ack
self.assertEqual(offset_ack, 0)

await stream.close()

mock_grpc_stream.cancel()

def test_default_message_size_is_10mb(self):
"""Test that default max_message_size_bytes is 10MB."""
options = StreamConfigurationOptions()
self.assertEqual(options.max_message_size_bytes, 10 * 1024 * 1024)


if __name__ == "__main__":
unittest.main()
17 changes: 12 additions & 5 deletions zerobus/sdk/aio/zerobus_sdk.py
@@ -575,11 +575,9 @@ async def ingest_record(self, record: Union[Message, dict]) -> asyncio.Future:
Args:
record: Either a Protobuf Message object or a dict (for JSON records).
Type must match the stream's configured record_type.

Returns:
asyncio.Future: A future that will be completed with the server's acknowledgment.
The caller can await this future to confirm receipt.

Raises:
ValueError: If record type doesn't match stream configuration.
ZerobusException: If the stream is not in a valid state for ingestion.
@@ -592,7 +590,6 @@ async def ingest_record(self, record: Union[Message, dict]) -> asyncio.Future:
"Pass a Protobuf Message object."
)
serialized_record = record.SerializeToString()

elif self._options.record_type == RecordType.JSON:
if not isinstance(record, dict):
raise ValueError(
@@ -601,10 +598,17 @@ async def ingest_record(self, record: Union[Message, dict]) -> asyncio.Future:
)
# Serialize dict to JSON string and encode to bytes
serialized_record = json.dumps(record).encode("utf-8")

else:
raise ValueError(f"Unsupported record type: {self._options.record_type}")

# Validate message size before sending
max_size = self._options.max_message_size_bytes
if max_size != -1 and len(serialized_record) > max_size:
raise ValueError(
f"Record size ({len(serialized_record)} bytes) exceeds maximum allowed size ({max_size} bytes). "
f"Reduce the record size or increase max_message_size_bytes in StreamConfigurationOptions."
)

# Wait for the flush to finish
async with self.__state_changed:
if self.__state == StreamState.FLUSHING:
@@ -806,7 +810,10 @@ async def create_stream(
channel = grpc.aio.secure_channel(
self.__host,
channel_credentials,
options=[("grpc.max_send_message_length", -1), ("grpc.max_receive_message_length", -1)],
options=[
("grpc.max_send_message_length", options.max_message_size_bytes),
("grpc.max_receive_message_length", -1),
],
)
stub = zerobus_service_pb2_grpc.ZerobusStub(channel)
stream = ZerobusStream(
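One detail worth noting about the channel change above (mirrored in the sync client at the end of this diff): gRPC itself interprets `-1` as "unlimited" for `grpc.max_send_message_length`, so the SDK's `-1` sentinel flows through to the transport unchanged and preserves the previous behavior. A standalone illustration:

```python
import grpc

# -1 means "no limit" for this standard gRPC channel argument, so passing
# max_message_size_bytes straight through keeps the SDK-level and
# transport-level limits consistent.
channel = grpc.insecure_channel(
    "localhost:50051",  # placeholder endpoint for illustration only
    options=[("grpc.max_send_message_length", -1)],
)
channel.close()
```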
4 changes: 4 additions & 0 deletions zerobus/sdk/shared/definitions.py
@@ -90,6 +90,10 @@ def __init__(self, **kwargs):
# Type of records to ingest into the stream (default: PROTO for backwards compatibility)
self.record_type: RecordType = RecordType.PROTO

# Maximum message size in bytes for gRPC messages (default: 10MB)
# Set to -1 for unlimited (not recommended for production)
self.max_message_size_bytes: int = 10 * 1024 * 1024

# Dynamically update attributes based on kwargs
for key, value in kwargs.items():
if hasattr(self, key):
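The `**kwargs` loop above only assigns attributes that are already declared on the instance, which is why the new default must be set before the loop runs. A self-contained sketch of the pattern (the loop body is truncated in this hunk; this assumes it calls `setattr`, as the `hasattr` guard suggests):

```python
class Options:
    def __init__(self, **kwargs):
        self.max_message_size_bytes = 10 * 1024 * 1024  # declare the default first
        for key, value in kwargs.items():
            if hasattr(self, key):        # only known options are settable
                setattr(self, key, value)

assert Options().max_message_size_bytes == 10 * 1024 * 1024
assert Options(max_message_size_bytes=-1).max_message_size_bytes == -1
```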
15 changes: 12 additions & 3 deletions zerobus/sdk/sync/zerobus_sdk.py
@@ -715,7 +715,6 @@ def ingest_record(self, record: Union[Message, dict]) -> RecordAcknowledgment:
"Pass a Protobuf Message object."
)
serialized_record = record.SerializeToString()

elif self._options.record_type == RecordType.JSON:
if not isinstance(record, dict):
raise ValueError(
@@ -724,10 +723,17 @@ def ingest_record(self, record: Union[Message, dict]) -> RecordAcknowledgment:
)
# Serialize dict to JSON string and encode to bytes
serialized_record = json.dumps(record).encode("utf-8")

else:
raise ValueError(f"Unsupported record type: {self._options.record_type}")

# Validate message size before sending
max_size = self._options.max_message_size_bytes
if max_size != -1 and len(serialized_record) > max_size:
raise ValueError(
f"Record size ({len(serialized_record)} bytes) exceeds maximum allowed size ({max_size} bytes). "
f"Reduce the record size or increase max_message_size_bytes in StreamConfigurationOptions."
)

with self.__state_changed:
if self.__state == StreamState.FLUSHING:
self.__state_changed.wait_for(lambda: self.__state != StreamState.FLUSHING)
@@ -915,7 +921,10 @@ def create_stream(
channel = grpc.secure_channel(
self.__host,
channel_credentials,
options=[("grpc.max_send_message_length", -1), ("grpc.max_receive_message_length", -1)],
options=[
("grpc.max_send_message_length", options.max_message_size_bytes),
("grpc.max_receive_message_length", -1),
],
)

stub = zerobus_service_pb2_grpc.ZerobusStub(channel)