
Commit b134ccb

robacourt and claude authored
fix: Use lazy Stream operations in decode_json_stream to avoid memory spike (#3799)
## Summary

Changed `Enum.filter` and `Enum.map` to `Stream.filter` and `Stream.map` in `decode_json_stream/1` to prevent loading the entire shape log into memory at once.

## Problem

The `decode_json_stream/1` function in `Materializer` was breaking lazy evaluation:

```elixir
stream
|> Stream.map(&Jason.decode!/1) # lazy ✓
|> Enum.filter(...)             # EAGER - materializes entire stream!
|> Enum.map(...)                # eager
```

When a shape has a long history (millions of operations), this causes a massive memory spike during Materializer startup because `Enum.filter` forces the entire stream into memory before processing continues.

## Solution

Replace the eager `Enum` functions with lazy `Stream` equivalents:

```elixir
stream
|> Stream.map(&Jason.decode!/1) # lazy ✓
|> Stream.filter(...)           # lazy ✓
|> Stream.map(...)              # lazy ✓
```

The downstream `apply_changes/2` function uses `Enum.reduce`, which correctly consumes the stream one element at a time, so records now flow through the entire pipeline lazily.

## Test Plan

- [x] All existing tests pass

---

Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
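As an editorial aside, the `Stream` vs. `Enum` distinction the commit relies on is easy to demonstrate in plain Elixir: `Stream` stages compose lazily, so even an infinite source is safe until an eager consumer pulls from it. The snippet below is illustrative only and not from the codebase:

```elixir
# Editorial illustration (not part of the commit): Stream stages are lazy
# and compose without materializing, while Enum stages are eager.
lazy =
  Stream.iterate(0, &(&1 + 1))         # infinite source, nothing computed yet
  |> Stream.filter(&(rem(&1, 2) == 0)) # lazy: just wraps the stream
  |> Stream.map(&(&1 * 10))            # lazy: still no work done

# Evaluation happens only at the eager consumer, one element at a time:
Enum.take(lazy, 3)
# => [0, 20, 40]

# Swapping Stream.filter for Enum.filter above would hang forever,
# because Enum.filter would try to realize the entire infinite stream.
```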
1 parent 1c20fac · commit b134ccb

File tree

2 files changed (+7, -2 lines)


.changeset/lazy-stream-decode.md

Lines changed: 5 additions & 0 deletions
```diff
@@ -0,0 +1,5 @@
+---
+"@core/sync-service": patch
+---
+
+Fix memory spike during Materializer startup by using lazy stream operations instead of eager Enum functions in `decode_json_stream/1`.
```

packages/sync-service/lib/electric/shapes/consumer/materializer.ex

Lines changed: 2 additions & 2 deletions
```diff
@@ -222,10 +222,10 @@ defmodule Electric.Shapes.Consumer.Materializer do
   defp decode_json_stream(stream) do
     stream
     |> Stream.map(&Jason.decode!/1)
-    |> Enum.filter(fn decoded ->
+    |> Stream.filter(fn decoded ->
       Map.has_key?(decoded, "key") || Map.has_key?(decoded["headers"], "event")
     end)
-    |> Enum.map(fn
+    |> Stream.map(fn
       %{
         "key" => key,
         "value" => value,
```
