Skip to content

Commit de6eb79

Browse files
authored
Prevent Shard Manager from processing until persistence succeeds (#166)
1 parent 1ad1304 commit de6eb79

File tree

1 file changed

+6
-5
lines changed

1 file changed

+6
-5
lines changed

manager/src/main/scala/com/devsisters/shardcake/ShardManager.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ class ShardManager(
4242
_ <- ManagerMetrics.pods.increment
4343
_ <- eventsHub.publish(ShardingEvent.PodRegistered(pod.address))
4444
_ <- ZIO.when(state.unassignedShards.nonEmpty)(rebalance(rebalanceImmediately = false))
45-
_ <- persistPods.forkDaemon
45+
_ <- persistPods
4646
} yield (),
4747
onFalse = ZIO.logWarning(s"Pod $pod requested to register but is not alive, ignoring") *>
4848
ZIO.fail(new RuntimeException(s"Pod $pod is not healthy, refusing to register"))
@@ -160,14 +160,15 @@ class ShardManager(
160160
_ <- (Clock.sleep(config.rebalanceRetryInterval) *> rebalance(rebalanceImmediately)).forkDaemon
161161
.when(failedPods.nonEmpty && rebalanceImmediately)
162162
// persist state changes to Redis
163-
_ <- persistAssignments.forkDaemon.when(areChanges)
163+
_ <- persistAssignments.when(areChanges)
164164
} yield ()
165165
}
166166

167-
private def withRetry[E, A](zio: IO[E, A]): UIO[Unit] =
167+
private def withRetry[A](zio: Task[A]): UIO[Unit] =
168168
zio
169169
.retry[Any, Any](Schedule.spaced(config.persistRetryInterval) && Schedule.recurs(config.persistRetryCount))
170-
.ignore
170+
.orDie
171+
.unit
171172

172173
private def persistAssignments: UIO[Unit] =
173174
withRetry(
@@ -260,7 +261,7 @@ object ShardManager {
260261
ZIO.logWarningCause("Failed to persist pods on shutdown", cause)
261262
)
262263
}
263-
_ <- shardManager.persistPods.forkDaemon
264+
_ <- shardManager.persistPods
264265
// rebalance immediately if there are unassigned shards
265266
_ <- shardManager.rebalance(rebalanceImmediately = initialState.unassignedShards.nonEmpty).forkDaemon
266267
// start a regular rebalance at the given interval

0 commit comments

Comments
 (0)