Skip to content

Commit 1c20fac

Browse files
robacourt and claude authored
Fix Materializer startup race condition via offset coordination (#3794)
## Summary

Fixes #3787 - addresses the snapshot/replication race condition in the Materializer that caused "Key already exists" crashes.

### The race condition

The Materializer subscribed to the Consumer **before** reading from storage:

```elixir
# BEFORE: In handle_continue(:start_materializer, ...)
Consumer.subscribe_materializer(stack_id, shape_handle, self())  # <- Subscribes first
{:noreply, state, {:continue, {:read_stream, shape_storage}}}    # <- Then reads ALL storage
```

During the window between subscribing and reading, any changes that arrive via `Consumer.new_changes()` would be delivered to the Materializer. If those changes included records that were also in the snapshot being read, the Materializer received duplicates and crashed.

### Production example (maxwell instance, 27 Jan 2026)

```
18:10:10.437 [error] GenServer Materializer "97489818-..." terminating
** (RuntimeError) Key "public"."offers"/"d3c8d8a5-5060-4a36-a67d-240de0c95a88" already exists
```

The transaction that triggered it:

```elixir
%Electric.Replication.Changes.NewRecord{
  relation: {"public", "offers"},
  record: %{"id" => "d3c8d8a5-5060-4a36-a67d-240de0c95a88"},
  key: "\"public\".\"offers\"/\"d3c8d8a5-5060-4a36-a67d-240de0c95a88\"",
  move_tags: ["e12422d3af57a36d01a50b4645a517e4"]  # <- Move-in event
}
```

The record was already in the snapshot (matched via `is_template = true` OR the subquery), AND was delivered via replication with `move_tags` as a move-in event.

### The fix

Consumer now returns its current offset when a Materializer subscribes. The Materializer reads storage **only up to that offset**, not beyond. Changes after that offset will be delivered via `new_changes` messages, ensuring each change is delivered exactly once.

```elixir
# AFTER: Consumer returns offset on subscription
def handle_call({:subscribe_materializer, pid}, _from, state) do
  {:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true}, ...}
end

# AFTER: Materializer uses offset to bound storage reads
{:ok, subscribed_offset} = Consumer.subscribe_materializer(stack_id, shape_handle, self())
{:noreply, %{state | subscribed_offset: subscribed_offset}, {:continue, {:read_stream, ...}}}

# In handle_continue({:read_stream, ...})
{:ok, offset, stream} = get_stream_up_to_offset(state.offset, state.subscribed_offset, storage)
```

## Test plan

- [x] Added test verifying offset coordination prevents duplicates
- [x] All existing Materializer tests pass (35 tests)
- [x] Full test suite passes (1334 tests)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 2a0902e commit 1c20fac

File tree

4 files changed

+146
-24
lines changed

4 files changed

+146
-24
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@core/sync-service': patch
3+
---
4+
5+
Fix Materializer startup race condition that caused "Key already exists" crashes
6+
7+
The Materializer subscribed to the Consumer before reading from storage, creating a window where the same record could be delivered twice (via storage AND via new_changes). Now the Consumer returns its current offset on subscription, and the Materializer reads storage only up to that offset.

packages/sync-service/lib/electric/shapes/consumer.ex

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ defmodule Electric.Shapes.Consumer do
5454
|> GenServer.call(:await_snapshot_start, timeout)
5555
end
5656

57-
@spec subscribe_materializer(Electric.stack_id(), Electric.shape_handle(), pid()) :: :ok
57+
@spec subscribe_materializer(Electric.stack_id(), Electric.shape_handle(), pid()) ::
58+
{:ok, LogOffset.t()}
5859
def subscribe_materializer(stack_id, shape_handle, pid) do
5960
stack_id
6061
|> consumer_pid(shape_handle)
@@ -210,7 +211,9 @@ defmodule Electric.Shapes.Consumer do
210211
def handle_call({:subscribe_materializer, pid}, _from, state) do
211212
Logger.debug("Subscribing materializer for #{state.shape_handle}")
212213
Process.monitor(pid, tag: :materializer_down)
213-
{:reply, :ok, %{state | materializer_subscribed?: true}, state.hibernate_after}
214+
215+
{:reply, {:ok, state.latest_offset}, %{state | materializer_subscribed?: true},
216+
state.hibernate_after}
214217
end
215218

216219
def handle_call({:stop, reason}, _from, state) do

packages/sync-service/lib/electric/shapes/consumer/materializer.ex

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ defmodule Electric.Shapes.Consumer.Materializer do
9797
tag_indices: %{},
9898
value_counts: %{},
9999
offset: LogOffset.before_all(),
100+
subscribed_offset: nil,
100101
ref: nil,
101102
subscribers: MapSet.new()
102103
})
@@ -113,13 +114,15 @@ defmodule Electric.Shapes.Consumer.Materializer do
113114
try do
114115
case Consumer.await_snapshot_start(stack_id, shape_handle, :infinity) do
115116
:started ->
116-
Consumer.subscribe_materializer(stack_id, shape_handle, self())
117+
{:ok, subscribed_offset} =
118+
Consumer.subscribe_materializer(stack_id, shape_handle, self())
117119

118120
Process.monitor(Consumer.whereis(stack_id, shape_handle),
119121
tag: {:consumer_down, state.shape_handle}
120122
)
121123

122-
{:noreply, state, {:continue, {:read_stream, shape_storage}}}
124+
{:noreply, %{state | subscribed_offset: subscribed_offset},
125+
{:continue, {:read_stream, shape_storage}}}
123126

124127
{:error, _reason} ->
125128
{:stop, :shutdown, state}
@@ -133,7 +136,8 @@ defmodule Electric.Shapes.Consumer.Materializer do
133136
end
134137

135138
def handle_continue({:read_stream, storage}, state) do
136-
{:ok, offset, stream} = get_stream_up_to_date(state.offset, storage)
139+
{:ok, offset, stream} =
140+
get_stream_up_to_offset(state.offset, state.subscribed_offset, storage)
137141

138142
{state, _} =
139143
stream
@@ -143,22 +147,20 @@ defmodule Electric.Shapes.Consumer.Materializer do
143147
{:noreply, %{state | offset: offset}}
144148
end
145149

146-
def get_stream_up_to_date(min_offset, storage) do
147-
case Storage.get_chunk_end_log_offset(min_offset, storage) do
148-
nil ->
149-
{:ok, max_offset} = Storage.fetch_latest_offset(storage)
150-
151-
if is_log_offset_lte(max_offset, min_offset) do
152-
{:ok, min_offset, []}
153-
else
154-
stream = Storage.get_log_stream(min_offset, max_offset, storage)
155-
{:ok, max_offset, stream}
156-
end
157-
158-
max_offset ->
159-
stream1 = Storage.get_log_stream(min_offset, max_offset, storage)
160-
{:ok, offset, stream2} = get_stream_up_to_date(max_offset, storage)
161-
{:ok, offset, Stream.concat(stream1, stream2)}
150+
@doc """
151+
Get a stream of log entries from storage, bounded by the subscribed offset.
152+
153+
The subscribed_offset is the Consumer's latest_offset at the time of subscription.
154+
We only read up to this offset to avoid duplicates - any changes after this offset
155+
will be delivered via new_changes messages from the Consumer.
156+
"""
157+
def get_stream_up_to_offset(min_offset, subscribed_offset, storage) do
158+
# If subscribed_offset is nil or at/before min_offset, nothing to read
159+
if is_nil(subscribed_offset) or is_log_offset_lte(subscribed_offset, min_offset) do
160+
{:ok, min_offset, []}
161+
else
162+
stream = Storage.get_log_stream(min_offset, subscribed_offset, storage)
163+
{:ok, subscribed_offset, stream}
162164
end
163165
end
164166

packages/sync-service/test/electric/shapes/consumer/materializer_test.exs

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,11 @@ defmodule Electric.Shapes.Consumer.MaterializerTest do
6262
})
6363

6464
respond_to_call(:await_snapshot_start, :started)
65-
respond_to_call(:subscribe_materializer, :ok)
65+
66+
respond_to_call(
67+
:subscribe_materializer,
68+
{:ok, Electric.Replication.LogOffset.last_before_real_offsets()}
69+
)
6670

6771
assert Materializer.wait_until_ready(ctx) == :ok
6872
end
@@ -79,7 +83,11 @@ defmodule Electric.Shapes.Consumer.MaterializerTest do
7983
})
8084

8185
respond_to_call(:await_snapshot_start, :started)
82-
respond_to_call(:subscribe_materializer, :ok)
86+
87+
respond_to_call(
88+
:subscribe_materializer,
89+
{:ok, Electric.Replication.LogOffset.last_before_real_offsets()}
90+
)
8391

8492
assert Materializer.wait_until_ready(ctx) == :ok
8593

@@ -763,7 +771,11 @@ defmodule Electric.Shapes.Consumer.MaterializerTest do
763771
})
764772

765773
respond_to_call(:await_snapshot_start, :started)
766-
respond_to_call(:subscribe_materializer, :ok)
774+
775+
respond_to_call(
776+
:subscribe_materializer,
777+
{:ok, Electric.Replication.LogOffset.last_before_real_offsets()}
778+
)
767779

768780
assert Materializer.wait_until_ready(ctx) == :ok
769781
Materializer.subscribe(ctx)
@@ -790,6 +802,104 @@ defmodule Electric.Shapes.Consumer.MaterializerTest do
790802
|> Enum.map(&Changes.fill_key(&1, pk_cols))
791803
end
792804

805+
describe "startup offset coordination" do
806+
test "no duplicate when offset coordination prevents overlap", ctx do
807+
# This test verifies the fix: offset coordination prevents the duplicate.
808+
# We use a mocked Consumer that returns an offset, and the Materializer
809+
# should only read storage up to that offset, preventing duplicates.
810+
811+
Process.flag(:trap_exit, true)
812+
813+
shape_handle = "offset-test-#{System.unique_integer()}"
814+
815+
# Set up storage with a record at offset first()
816+
storage = Storage.for_shape(shape_handle, ctx.storage)
817+
Storage.start_link(storage)
818+
writer = Storage.init_writer!(storage, @shape)
819+
Storage.mark_snapshot_as_started(storage)
820+
821+
# Write a record to storage at LogOffset.first()
822+
alias Electric.Replication.LogOffset
823+
first_offset = LogOffset.first()
824+
825+
writer =
826+
Storage.append_to_log!(
827+
[
828+
{first_offset, ~s|"public"."test_table"/"1"|, :insert,
829+
~s|{"key":"\\"public\\".\\"test_table\\"/\\"1\\"","value":{"id":"1","value":"10"},"headers":{"operation":"insert"}}|}
830+
],
831+
writer
832+
)
833+
834+
Storage.hibernate(writer)
835+
836+
# Mock Consumer that returns offset AND sends duplicate
837+
test_pid = self()
838+
839+
consumer =
840+
spawn(fn ->
841+
receive do
842+
{:"$gen_call", {from, ref}, :await_snapshot_start} ->
843+
send(from, {ref, :started})
844+
end
845+
846+
receive do
847+
{:"$gen_call", {from, ref}, {:subscribe_materializer, mat_pid}} ->
848+
# Return offset BEFORE the record - Materializer will read nothing from storage
849+
send(from, {ref, {:ok, LogOffset.before_all()}})
850+
851+
# Simulate the race: send the same record via new_changes
852+
# This should NOT crash because Materializer didn't read it from storage
853+
GenServer.call(
854+
mat_pid,
855+
{:new_changes,
856+
[
857+
%Changes.NewRecord{
858+
relation: {"public", "test_table"},
859+
key: ~s|"public"."test_table"/"1"|,
860+
record: %{"id" => "1", "value" => "10"},
861+
move_tags: []
862+
}
863+
]}
864+
)
865+
866+
send(test_pid, :consumer_done)
867+
end
868+
869+
Process.sleep(:infinity)
870+
end)
871+
872+
ConsumerRegistry.register_consumer(consumer, shape_handle, ctx.stack_id)
873+
874+
# Start Materializer - should NOT crash
875+
result =
876+
Materializer.start_link(%{
877+
stack_id: ctx.stack_id,
878+
shape_handle: shape_handle,
879+
storage: ctx.storage,
880+
columns: ["value"],
881+
materialized_type: {:array, :int8}
882+
})
883+
884+
case result do
885+
{:ok, pid} ->
886+
# Wait for Consumer mock to finish
887+
assert_receive :consumer_done, 5000
888+
# Materializer should be alive
889+
assert Process.alive?(pid)
890+
# And should have the value
891+
assert Materializer.get_link_values(%{
892+
stack_id: ctx.stack_id,
893+
shape_handle: shape_handle
894+
}) ==
895+
MapSet.new([10])
896+
897+
{:error, reason} ->
898+
flunk("Materializer failed to start: #{inspect(reason)}")
899+
end
900+
end
901+
end
902+
793903
describe "startup race condition handling" do
794904
# Tests for the race condition where Consumer dies between await_snapshot_start
795905
# and subscribe_materializer. See concurrency_analysis/MATERIALIZER_RACE_ANALYSIS.md

0 commit comments

Comments
 (0)