From 1a94654f4cebcb1966c80ad43d07ff1f696287e9 Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Fri, 3 Jan 2025 17:58:05 +0900 Subject: [PATCH 1/2] Add a layer to simplify local testing --- .../shardcake/interfaces/Pods.scala | 2 +- .../devsisters/shardcake/LocalSharding.scala | 121 ++++++++++++++++++ .../shardcake/BroadcastingSpec.scala | 9 +- .../devsisters/shardcake/ShardingSpec.scala | 7 +- 4 files changed, 129 insertions(+), 10 deletions(-) create mode 100644 entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala diff --git a/core/src/main/scala/com/devsisters/shardcake/interfaces/Pods.scala b/core/src/main/scala/com/devsisters/shardcake/interfaces/Pods.scala index 3fd4054..dc288eb 100644 --- a/core/src/main/scala/com/devsisters/shardcake/interfaces/Pods.scala +++ b/core/src/main/scala/com/devsisters/shardcake/interfaces/Pods.scala @@ -60,7 +60,7 @@ object Pods { /** * A layer that creates a service that does nothing when called. - * Useful for testing ShardManager or when using Sharding.local. + * Useful for testing ShardManager or when we don't need messages being sent. */ val noop: ULayer[Pods] = ZLayer.succeed(new Pods { diff --git a/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala b/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala new file mode 100644 index 0000000..849990c --- /dev/null +++ b/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala @@ -0,0 +1,121 @@ +package com.devsisters.shardcake + +import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage } +import com.devsisters.shardcake.interfaces.Pods.BinaryMessage +import zio.{ Promise, Queue, RLayer, Task, ULayer, URLayer, ZIO, ZLayer } +import zio.stream.ZStream + +object LocalSharding { + + private trait LocalQueue { + def localQueue: Queue[LocalQueueMessage] + } + + private sealed trait LocalQueueMessage + private object LocalQueueMessage { + case class SendMessage(request: BinaryMessage, response: Promise[Nothing, Option[Array[Byte]]]) + extends LocalQueueMessage + case class SendStream( + request: ZStream[Any, Throwable, BinaryMessage], + response: Promise[Nothing, Option[Array[Byte]]] + ) extends LocalQueueMessage + case class SendMessageAndReceiveStream( + request: BinaryMessage, + response: Promise[Nothing, ZStream[Any, Throwable, Array[Byte]]] + ) extends LocalQueueMessage + case class SendStreamAndReceiveStream( + request: ZStream[Any, Throwable, BinaryMessage], + response: Promise[Nothing, ZStream[Any, Throwable, Array[Byte]]] + ) extends LocalQueueMessage + } + + private val localQueue: ULayer[LocalQueue] = + ZLayer( + Queue + .unbounded[LocalQueueMessage] + .map(queue => + new LocalQueue { + def localQueue: Queue[LocalQueueMessage] = queue + } + ) + ) + + private val localPods: URLayer[LocalQueue, Pods] = + ZLayer { + ZIO.serviceWith[LocalQueue](_.localQueue).map { queue => + new Pods { + def assignShards(pod: PodAddress, shards: Set[ShardId]): Task[Unit] = ZIO.unit + def unassignShards(pod: PodAddress, shards: Set[ShardId]): Task[Unit] = ZIO.unit + def ping(pod: PodAddress): Task[Unit] = ZIO.unit + + def sendMessage(pod: PodAddress, message: BinaryMessage): Task[Option[Array[Byte]]] = + Promise.make[Nothing, Option[Array[Byte]]].flatMap { promise => + queue.offer(LocalQueueMessage.SendMessage(message, promise)) *> promise.await + } + + def sendStream( + pod: PodAddress, + entityId: String, + messages: ZStream[Any, Throwable, BinaryMessage] + ): Task[Option[Array[Byte]]] = + Promise.make[Nothing, Option[Array[Byte]]].flatMap { promise => + queue.offer(LocalQueueMessage.SendStream(messages, promise)).fork *> promise.await + } + + def sendMessageAndReceiveStream( + pod: PodAddress, + message: BinaryMessage + ): ZStream[Any, Throwable, Array[Byte]] = + ZStream.unwrap { + Promise.make[Nothing, ZStream[Any, Throwable, Array[Byte]]].flatMap { promise => + queue.offer(LocalQueueMessage.SendMessageAndReceiveStream(message, promise)) *> promise.await + } + } + + def sendStreamAndReceiveStream( + pod: PodAddress, + entityId: String, + messages: ZStream[Any, Throwable, BinaryMessage] + ): ZStream[Any, Throwable, Array[Byte]] = + ZStream.unwrap { + Promise.make[Nothing, ZStream[Any, Throwable, Array[Byte]]].flatMap { promise => + queue.offer(LocalQueueMessage.SendStreamAndReceiveStream(messages, promise)).fork *> promise.await + } + } + } + } + } + + private val localServer: RLayer[Sharding with LocalQueue, Unit] = + ZLayer.scoped { + for { + sharding <- ZIO.service[Sharding] + queue <- ZIO.serviceWith[LocalQueue](_.localQueue) + _ <- ZStream + .fromQueueWithShutdown(queue) + .runForeach { + case LocalQueueMessage.SendMessage(request, response) => + sharding.sendToLocalEntity(request).flatMap(response.succeed) + case LocalQueueMessage.SendStream(request, response) => + sharding.sendStreamToLocalEntity(request).flatMap(response.succeed) + case LocalQueueMessage.SendMessageAndReceiveStream(request, response) => + response.succeed(sharding.sendToLocalEntityAndReceiveStream(request)) + case LocalQueueMessage.SendStreamAndReceiveStream(request, response) => + response.succeed(sharding.sendStreamToLocalEntityAndReceiveStream(request)) + } + .forkScoped + } yield () + } + + /** + * A special layer meant for testing that uses a local queue rather than an external transport. + * This layer will only work in a single JVM and is not suitable for production use. + */ + val live: RLayer[ShardManagerClient with Storage with Serialization with Config, Sharding] = + ZLayer.makeSome[ShardManagerClient with Storage with Serialization with Config, Sharding]( + localQueue, + localPods, + localServer, + Sharding.live + ) +} diff --git a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala index f46032f..b12a0fd 100644 --- a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala +++ b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala @@ -1,15 +1,15 @@ package com.devsisters.shardcake -import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage } -import zio.{ Config => _, _ } +import com.devsisters.shardcake.interfaces.{ Serialization, Storage } import zio.test.TestAspect.{ sequential, withLiveClock } import zio.test._ +import zio.{ Config => _, _ } import scala.util.Success object BroadcastingSpec extends ZIOSpecDefault { - private val config = ZLayer.succeed(Config.default) + private val config = ZLayer.succeed(Config.default.copy(simulateRemotePods = true)) def spec: Spec[TestEnvironment with Scope, Any] = suite("BroadcastingSpec")( @@ -28,9 +28,8 @@ object BroadcastingSpec extends ZIOSpecDefault { } } ).provideShared( - Sharding.live, Serialization.javaSerialization, - Pods.noop, + LocalSharding.live, ShardManagerClient.local, Storage.memory, config diff --git a/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala b/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala index ee94cff..d41abf1 100644 --- a/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala +++ b/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala @@ -2,11 +2,11 @@ package com.devsisters.shardcake import com.devsisters.shardcake.CounterActor.CounterMessage._ import com.devsisters.shardcake.CounterActor._ -import com.devsisters.shardcake.interfaces.{ Pods, Serialization, Storage } -import zio.{ Config => _, _ } +import com.devsisters.shardcake.interfaces.{ Serialization, Storage } import zio.stream.{ SubscriptionRef, ZStream } import zio.test.TestAspect.{ sequential, withLiveClock } import zio.test._ +import zio.{ Config => _, _ } object ShardingSpec extends ZIOSpecDefault { def spec: Spec[TestEnvironment with Scope, Any] = @@ -144,9 +144,8 @@ object ShardingSpec extends ZIOSpecDefault { } } ).provideShared( - Sharding.live, Serialization.javaSerialization, - Pods.noop, + LocalSharding.live, ShardManagerClient.local, Storage.memory, ZLayer.succeed(Config.default) From 6733460df22bd87831bd2a297bc5c43208c1f840 Mon Sep 17 00:00:00 2001 From: Pierre Ricadat Date: Fri, 3 Jan 2025 18:03:27 +0900 Subject: [PATCH 2/2] Make things public --- .../com/devsisters/shardcake/LocalSharding.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala b/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala index 849990c..4655ea5 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala @@ -7,12 +7,12 @@ import zio.stream.ZStream object LocalSharding { - private trait LocalQueue { + trait LocalQueue { def localQueue: Queue[LocalQueueMessage] } - private sealed trait LocalQueueMessage - private object LocalQueueMessage { + sealed trait LocalQueueMessage + object LocalQueueMessage { case class SendMessage(request: BinaryMessage, response: Promise[Nothing, Option[Array[Byte]]]) extends LocalQueueMessage case class SendStream( @@ -29,7 +29,7 @@ object LocalSharding { ) extends LocalQueueMessage } - private val localQueue: ULayer[LocalQueue] = + val localQueue: ULayer[LocalQueue] = ZLayer( Queue .unbounded[LocalQueueMessage] @@ -40,7 +40,7 @@ object LocalSharding { ) ) - private val localPods: URLayer[LocalQueue, Pods] = + val localPods: URLayer[LocalQueue, Pods] = ZLayer { ZIO.serviceWith[LocalQueue](_.localQueue).map { queue => new Pods { @@ -86,7 +86,7 @@ object LocalSharding { } } - private val localServer: RLayer[Sharding with LocalQueue, Unit] = + val localServer: RLayer[Sharding with LocalQueue, Unit] = ZLayer.scoped { for { sharding <- ZIO.service[Sharding]