From 9f8f8ff301ac935481f0d16a3c201181f5a6e89c Mon Sep 17 00:00:00 2001 From: Yoonjae Jeon Date: Fri, 16 Jan 2026 17:22:04 +0900 Subject: [PATCH 1/5] Make serialization receive a type param --- .../com/devsisters/shardcake/Client.scala | 1 + .../com/devsisters/shardcake/Server.scala | 2 +- .../shardcake/interfaces/Serialization.scala | 35 ++++--- .../shardcake/JavaSerializationSpec.scala | 10 +- .../devsisters/shardcake/Broadcaster.scala | 3 +- .../devsisters/shardcake/LocalSharding.scala | 4 +- .../com/devsisters/shardcake/Messenger.scala | 13 ++- .../com/devsisters/shardcake/Sharding.scala | 92 ++++++++++--------- .../shardcake/internal/SendChannel.scala | 20 ++-- .../shardcake/BroadcastingSpec.scala | 8 +- .../devsisters/shardcake/ShardingSpec.scala | 4 +- .../main/scala/example/complex/GuildApp.scala | 5 +- .../main/scala/example/simple/GuildApp.scala | 2 +- .../src/test/scala/example/EndToEndSpec.scala | 2 +- .../scala/example/GrpcAuthExampleSpec.scala | 2 +- .../shardcake/KryoSerialization.scala | 44 ++++----- .../shardcake/KryoSerializationSpec.scala | 9 +- 17 files changed, 129 insertions(+), 127 deletions(-) diff --git a/benchmarks/src/main/scala/com/devsisters/shardcake/Client.scala b/benchmarks/src/main/scala/com/devsisters/shardcake/Client.scala index fe432b0d..479eb39a 100644 --- a/benchmarks/src/main/scala/com/devsisters/shardcake/Client.scala +++ b/benchmarks/src/main/scala/com/devsisters/shardcake/Client.scala @@ -1,5 +1,6 @@ package com.devsisters.shardcake +import com.devsisters.shardcake.KryoSerialization.Default._ import com.devsisters.shardcake.Server.Message.Ping import com.devsisters.shardcake.Server.PingPongEntity import zio.{ Config => _, _ } diff --git a/benchmarks/src/main/scala/com/devsisters/shardcake/Server.scala b/benchmarks/src/main/scala/com/devsisters/shardcake/Server.scala index 9d48bb4f..64b74b0c 100644 --- a/benchmarks/src/main/scala/com/devsisters/shardcake/Server.scala +++ b/benchmarks/src/main/scala/com/devsisters/shardcake/Server.scala @@ -1,6 +1,7 @@ package com.devsisters.shardcake import com.devsisters.shardcake.interfaces.Storage +import com.devsisters.shardcake.KryoSerialization.Default._ import zio.{ Config => _, _ } import zio.stream.ZStream @@ -51,7 +52,6 @@ object Server { val sharding: ZLayer[Config, Throwable, Sharding with GrpcConfig] = ZLayer.makeSome[Config, Sharding with GrpcConfig]( - KryoSerialization.live, memory, grpcConfig, shardManagerClient, diff --git a/core/src/main/scala/com/devsisters/shardcake/interfaces/Serialization.scala b/core/src/main/scala/com/devsisters/shardcake/interfaces/Serialization.scala index b5bd8f0b..4d53bcdd 100644 --- a/core/src/main/scala/com/devsisters/shardcake/interfaces/Serialization.scala +++ b/core/src/main/scala/com/devsisters/shardcake/interfaces/Serialization.scala @@ -1,46 +1,53 @@ package com.devsisters.shardcake.interfaces -import zio.{ Chunk, Task, ULayer, ZIO, ZLayer } +import zio.{ Chunk, Task, ZIO } import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream } /** * An interface to serialize user messages that will be sent between pods. */ -trait Serialization { +trait Serialization[Msg] { /** * Transforms the given message into binary */ - def encode(message: Any): Task[Array[Byte]] + def encode(message: Msg): Task[Array[Byte]] /** * Transform binary back into the given type */ - def decode[A](bytes: Array[Byte]): Task[A] + def decode(bytes: Array[Byte]): Task[Msg] /** * Transforms a chunk of messages into binary */ - def encodeChunk(messages: Chunk[Any]): Task[Chunk[Array[Byte]]] = + def encodeChunk(messages: Chunk[Msg]): Task[Chunk[Array[Byte]]] = ZIO.foreach(messages)(encode) /** * Transforms a chunk of binary back into the given type */ - def decodeChunk[A](bytes: Chunk[Array[Byte]]): Task[Chunk[A]] = - ZIO.foreach(bytes)(decode[A]) + def decodeChunk(bytes: Chunk[Array[Byte]]): Task[Chunk[Msg]] = + ZIO.foreach(bytes)(decode) } object Serialization { + implicit val unitSerialization: Serialization[Unit] = new Serialization[Unit] { + def encode(message: Unit): Task[Array[Byte]] = ZIO.succeed(Array.emptyByteArray) + def decode(bytes: Array[Byte]): Task[Unit] = ZIO.unit + } +} + +object JavaSerialization { /** - * A layer that uses Java serialization for encoding and decoding messages. + * A Java serialization for encoding and decoding messages. * This is useful for testing and not recommended to use in production. */ - val javaSerialization: ULayer[Serialization] = - ZLayer.succeed(new Serialization { - def encode(message: Any): Task[Array[Byte]] = + implicit def javaSerialization[T]: Serialization[T] = + new Serialization[T] { + def encode(message: T): Task[Array[Byte]] = ZIO.scoped { val stream = new ByteArrayOutputStream() ZIO @@ -49,11 +56,11 @@ object Serialization { .as(stream.toByteArray) } - def decode[A](bytes: Array[Byte]): Task[A] = + def decode(bytes: Array[Byte]): Task[T] = ZIO.scoped { ZIO .fromAutoCloseable(ZIO.attempt(new ObjectInputStream(new ByteArrayInputStream(bytes)))) - .flatMap(ois => ZIO.attempt(ois.readObject.asInstanceOf[A])) + .flatMap(ois => ZIO.attempt(ois.readObject.asInstanceOf[T])) } - }) + } } diff --git a/core/src/test/scala/com/devsisters/shardcake/JavaSerializationSpec.scala b/core/src/test/scala/com/devsisters/shardcake/JavaSerializationSpec.scala index fd009c40..d28adca8 100644 --- a/core/src/test/scala/com/devsisters/shardcake/JavaSerializationSpec.scala +++ b/core/src/test/scala/com/devsisters/shardcake/JavaSerializationSpec.scala @@ -1,7 +1,7 @@ package com.devsisters.shardcake -import com.devsisters.shardcake.interfaces.Serialization -import zio.{ Scope, ZIO } +import com.devsisters.shardcake.interfaces.JavaSerialization +import zio.Scope import zio.test._ object JavaSerializationSpec extends ZIOSpecDefault { @@ -11,9 +11,9 @@ object JavaSerializationSpec extends ZIOSpecDefault { case class Test(a: Int, b: String) val expected = Test(2, "test") for { - bytes <- ZIO.serviceWithZIO[Serialization](_.encode(expected)) - actual <- ZIO.serviceWithZIO[Serialization](_.decode[Test](bytes)) + bytes <- JavaSerialization.javaSerialization.encode(expected) + actual <- JavaSerialization.javaSerialization[Test].decode(bytes) } yield assertTrue(expected == actual) } - ).provideShared(Serialization.javaSerialization) + ) } diff --git a/entities/src/main/scala/com/devsisters/shardcake/Broadcaster.scala b/entities/src/main/scala/com/devsisters/shardcake/Broadcaster.scala index 4070e8cf..c7bfe73c 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Broadcaster.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Broadcaster.scala @@ -1,5 +1,6 @@ package com.devsisters.shardcake +import com.devsisters.shardcake.interfaces.Serialization import zio.UIO import scala.util.Try @@ -18,5 +19,5 @@ trait Broadcaster[-Msg] { /** * Broadcast a message and wait for a response from each consumer */ - def broadcast[Res](topic: String)(msg: Replier[Res] => Msg): UIO[Map[PodAddress, Try[Res]]] + def broadcast[Res: Serialization](topic: String)(msg: Replier[Res] => Msg): UIO[Map[PodAddress, Try[Res]]] } diff --git a/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala b/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala index 4655ea59..faa99f24 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/LocalSharding.scala @@ -111,8 +111,8 @@ object LocalSharding { * A special layer meant for testing that uses a local queue rather than an external transport. * This layer will only work in a single JVM and is not suitable for production use. */ - val live: RLayer[ShardManagerClient with Storage with Serialization with Config, Sharding] = - ZLayer.makeSome[ShardManagerClient with Storage with Serialization with Config, Sharding]( + val live: RLayer[ShardManagerClient with Storage with Config, Sharding] = + ZLayer.makeSome[ShardManagerClient with Storage with Config, Sharding]( localQueue, localPods, localServer, diff --git a/entities/src/main/scala/com/devsisters/shardcake/Messenger.scala b/entities/src/main/scala/com/devsisters/shardcake/Messenger.scala index 5d89ef8f..d1fd5bfc 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Messenger.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Messenger.scala @@ -1,6 +1,7 @@ package com.devsisters.shardcake import com.devsisters.shardcake.errors.StreamCancelled +import com.devsisters.shardcake.interfaces.Serialization import zio._ import zio.stream.ZStream @@ -18,7 +19,7 @@ trait Messenger[-Msg] { /** * Send a message and wait for a response of type `Res` */ - def send[Res](entityId: String)(msg: Replier[Res] => Msg): Task[Res] + def send[Res: Serialization](entityId: String)(msg: Replier[Res] => Msg): Task[Res] /** * Send a message and receive a stream of responses of type `Res`. @@ -27,7 +28,9 @@ trait Messenger[-Msg] { * streaming responses. See `sendStreamAutoRestart` for an alternative that will automatically restart the stream * in case of rebalance. */ - def sendAndReceiveStream[Res](entityId: String)(msg: StreamReplier[Res] => Msg): Task[ZStream[Any, Throwable, Res]] + def sendAndReceiveStream[Res: Serialization](entityId: String)( + msg: StreamReplier[Res] => Msg + ): Task[ZStream[Any, Throwable, Res]] /** * Send a stream of messages. @@ -37,7 +40,7 @@ trait Messenger[-Msg] { /** * Send a stream of messages and receive a stream of responses of type `Res`. */ - def sendStreamAndReceiveStream[Res](entityId: String)( + def sendStreamAndReceiveStream[Res: Serialization](entityId: String)( messages: StreamReplier[Res] => ZStream[Any, Throwable, Msg] ): Task[ZStream[Any, Throwable, Res]] @@ -50,7 +53,7 @@ trait Messenger[-Msg] { * cursor from the responses so that when the remote entity is rebalanced, a new message can be sent with the right * cursor according to what we've seen in the previous stream of responses. */ - def sendAndReceiveStreamAutoRestart[Cursor, Res](entityId: String, cursor: Cursor)( + def sendAndReceiveStreamAutoRestart[Cursor, Res: Serialization](entityId: String, cursor: Cursor)( msg: (Cursor, StreamReplier[Res]) => Msg )( updateCursor: (Cursor, Res) => Cursor @@ -79,7 +82,7 @@ trait Messenger[-Msg] { * cursor from the responses so that when the remote entity is rebalanced, a new message can be sent with the right * cursor according to what we've seen in the previous stream of responses. */ - def sendStreamAndReceiveStreamAutoRestart[Cursor, Res](entityId: String, cursor: Cursor)( + def sendStreamAndReceiveStreamAutoRestart[Cursor, Res: Serialization](entityId: String, cursor: Cursor)( msg: (Cursor, StreamReplier[Res]) => ZStream[Any, Throwable, Msg] )( updateCursor: (Cursor, Res) => Cursor diff --git a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala index ac4bfa40..c0c6e731 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala @@ -28,7 +28,6 @@ class Sharding private ( shardManager: ShardManagerClient, pods: Pods, storage: Storage, - serialization: Serialization, eventsHub: Hub[ShardingRegistrationEvent] ) { self => private[shardcake] def getShardId(recipientType: RecipientType[_], entityId: String): ShardId = @@ -182,40 +181,42 @@ class Sharding private ( private[shardcake] def isShuttingDown: UIO[Boolean] = isShuttingDownRef.get - def sendToLocalEntity(msg: BinaryMessage): Task[Option[Array[Byte]]] = + def sendToLocalEntity[Msg: Serialization](msg: BinaryMessage): Task[Option[Array[Byte]]] = for { - replyChannel <- ReplyChannel.single[Any] + replyChannel <- ReplyChannel.single[Msg] _ <- sendToLocalEntity(msg, replyChannel) res <- replyChannel.output - bytes <- ZIO.foreach(res)(serialization.encode) + bytes <- ZIO.foreach(res)(implicitly[Serialization[Msg]].encode) } yield bytes - def sendToLocalEntityAndReceiveStream(msg: BinaryMessage): ZStream[Any, Throwable, Array[Byte]] = + def sendToLocalEntityAndReceiveStream[Msg: Serialization](msg: BinaryMessage): ZStream[Any, Throwable, Array[Byte]] = ZStream.unwrap { for { - replyChannel <- ReplyChannel.stream[Any] + replyChannel <- ReplyChannel.stream[Msg] _ <- sendToLocalEntity(msg, replyChannel) - } yield replyChannel.output.mapChunksZIO(serialization.encodeChunk) + } yield replyChannel.output.mapChunksZIO(implicitly[Serialization[Msg]].encodeChunk) } - def sendStreamToLocalEntity(messages: ZStream[Any, Throwable, BinaryMessage]): Task[Option[Array[Byte]]] = + def sendStreamToLocalEntity[Msg: Serialization]( + messages: ZStream[Any, Throwable, BinaryMessage] + ): Task[Option[Array[Byte]]] = ZIO.scoped { for { - replyChannel <- ReplyChannel.single[Any] + replyChannel <- ReplyChannel.single[Msg] _ <- messages.runForeach(sendToLocalEntity(_, replyChannel)).onError(replyChannel.fail).forkScoped res <- replyChannel.output - bytes <- ZIO.foreach(res)(serialization.encode) + bytes <- ZIO.foreach(res)(implicitly[Serialization[Msg]].encode) } yield bytes } - def sendStreamToLocalEntityAndReceiveStream( + def sendStreamToLocalEntityAndReceiveStream[Msg: Serialization]( messages: ZStream[Any, Throwable, BinaryMessage] ): ZStream[Any, Throwable, Array[Byte]] = ZStream.unwrapScoped { for { - replyChannel <- ReplyChannel.stream[Any] + replyChannel <- ReplyChannel.stream[Msg] _ <- messages.runForeach(sendToLocalEntity(_, replyChannel)).onError(replyChannel.fail).forkScoped - } yield replyChannel.output.mapChunksZIO(serialization.encodeChunk) + } yield replyChannel.output.mapChunksZIO(implicitly[Serialization[Msg]].encodeChunk) } private def sendToLocalEntity(msg: BinaryMessage, replyChannel: ReplyChannel[Nothing]): Task[Unit] = @@ -273,7 +274,7 @@ class Sharding private ( } ) - private def sendToPod[Msg, Res]( + private def sendToPod[Msg: Serialization, Res: Serialization]( recipientTypeName: String, entityId: String, pod: PodAddress, @@ -291,23 +292,24 @@ class Sharding private ( replyChannel match { case _: ReplyChannel.FromPromise[_] => sendChannel - .send(pods, serialization, pod, entityId, recipientTypeName, replyId) + .send(pods, pod, entityId, recipientTypeName, replyId) .tapError(handleError) .flatMap { - case Some(bytes) => serialization.decode[Res](bytes).flatMap(replyChannel.replySingle) + case Some(bytes) => + implicitly[Serialization[Res]].decode(bytes).flatMap(replyChannel.replySingle) case None => replyChannel.end } case _: ReplyChannel.FromQueue[_] => replyChannel.replyStream( sendChannel - .sendAndReceiveStream(pods, serialization, pod, entityId, recipientTypeName, replyId) + .sendAndReceiveStream(pods, pod, entityId, recipientTypeName, replyId) .tapError(handleError) - .mapChunksZIO(serialization.decodeChunk[Res]) + .mapChunksZIO(implicitly[Serialization[Res]].decodeChunk) ) } } - def messenger[Msg]( + def messenger[Msg: Serialization]( entityType: EntityType[Msg], sendTimeout: MessengerTimeout = MessengerTimeout.InheritConfigTimeout ): Messenger[Msg] = @@ -323,7 +325,7 @@ class Sharding private ( timeout.fold(send.unit)(t => send.timeout(t).unit) } - def send[Res](entityId: String)(msg: Replier[Res] => Msg): Task[Res] = + def send[Res: Serialization](entityId: String)(msg: Replier[Res] => Msg): Task[Res] = Random.nextUUID.flatMap { uuid => val body = msg(Replier(uuid.toString)) val send = sendMessage[Res](entityId, body, Some(uuid.toString)).flatMap { @@ -333,7 +335,7 @@ class Sharding private ( timeout.fold(send)(t => send.timeoutFail(SendTimeoutException(entityType, entityId, body))(t).interruptible) } - def sendAndReceiveStream[Res]( + def sendAndReceiveStream[Res: Serialization]( entityId: String )(msg: StreamReplier[Res] => Msg): Task[ZStream[Any, Throwable, Res]] = Random.nextUUID.flatMap { uuid => @@ -346,21 +348,25 @@ class Sharding private ( timeout.fold(send)(t => send.timeout(t).unit) } - def sendStreamAndReceiveStream[Res](entityId: String)( + def sendStreamAndReceiveStream[Res: Serialization](entityId: String)( messages: StreamReplier[Res] => ZStream[Any, Throwable, Msg] ): Task[ZStream[Any, Throwable, Res]] = Random.nextUUID.flatMap { uuid => sendStreamAndReceiveStream[Res](entityId, messages(StreamReplier(uuid.toString)), Some(uuid.toString)) } - private def sendMessage[Res](entityId: String, msg: Msg, replyId: Option[String]): Task[Option[Res]] = + private def sendMessage[Res: Serialization]( + entityId: String, + msg: Msg, + replyId: Option[String] + ): Task[Option[Res]] = for { replyChannel <- ReplyChannel.single[Res] _ <- sendMessageGeneric(entityId, msg, replyId, replyChannel) res <- replyChannel.output } yield res - private def sendMessageAndReceiveStream[Res]( + private def sendMessageAndReceiveStream[Res: Serialization]( entityId: String, msg: Msg, replyId: Option[String] @@ -370,7 +376,7 @@ class Sharding private ( _ <- sendMessageGeneric(entityId, msg, replyId, replyChannel) } yield replyChannel.output - private def sendStreamAndReceiveStream[Res]( + private def sendStreamAndReceiveStream[Res: Serialization]( entityId: String, messages: ZStream[Any, Throwable, Msg], replyId: Option[String] @@ -380,7 +386,7 @@ class Sharding private ( _ <- sendStreamGeneric(entityId, messages, replyId, replyChannel) } yield replyChannel.output - private def sendMessageGeneric[Res]( + private def sendMessageGeneric[Res: Serialization]( entityId: String, msg: Msg, replyId: Option[String], @@ -413,7 +419,7 @@ class Sharding private ( else ZIO.fail(InvalidShardId(entityId, shardId)) } - private def sendStreamGeneric[Res]( + private def sendStreamGeneric[Res: Serialization]( entityId: String, messages: ZStream[Any, Throwable, Msg], replyId: Option[String], @@ -447,7 +453,7 @@ class Sharding private ( } } - def broadcaster[Msg]( + def broadcaster[Msg: Serialization]( topicType: TopicType[Msg], sendTimeout: MessengerTimeout = MessengerTimeout.InheritConfigTimeout ): Broadcaster[Msg] = @@ -461,13 +467,17 @@ class Sharding private ( def broadcastDiscard(topic: String)(msg: Msg): UIO[Unit] = sendMessage(topic, msg, None).unit - def broadcast[Res](topic: String)(msg: Replier[Res] => Msg): UIO[Map[PodAddress, Try[Res]]] = + def broadcast[Res: Serialization](topic: String)(msg: Replier[Res] => Msg): UIO[Map[PodAddress, Try[Res]]] = Random.nextUUID.flatMap { uuid => val body = msg(Replier(uuid.toString)) sendMessage[Res](topic, body, Some(uuid.toString)).interruptible } - private def sendMessage[Res](topic: String, msg: Msg, replyId: Option[String]): UIO[Map[PodAddress, Try[Res]]] = + private def sendMessage[Res: Serialization]( + topic: String, + msg: Msg, + replyId: Option[String] + ): UIO[Map[PodAddress, Try[Res]]] = for { pods <- getPods res <- ZIO @@ -502,7 +512,7 @@ class Sharding private ( } yield res.toMap } - def registerEntity[R, Req: Tag]( + def registerEntity[R, Req: Tag: Serialization]( entityType: EntityType[Req], behavior: (String, Queue[Req]) => RIO[R, Nothing], terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None, @@ -510,7 +520,7 @@ class Sharding private ( ): URIO[Scope with R, Unit] = registerRecipient(entityType, behavior, terminateMessage, entityMaxIdleTime) *> eventsHub.publish(ShardingRegistrationEvent.EntityRegistered(entityType)).unit - def registerTopic[R, Req: Tag]( + def registerTopic[R, Req: Tag: Serialization]( topicType: TopicType[Req], behavior: (String, Queue[Req]) => RIO[R, Nothing], terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None @@ -520,7 +530,7 @@ class Sharding private ( def getShardingRegistrationEvents: ZStream[Any, Nothing, ShardingRegistrationEvent] = ZStream.fromHub(eventsHub) - private def registerRecipient[R, Req: Tag]( + private def registerRecipient[R, Req: Tag: Serialization]( recipientType: RecipientType[Req], behavior: (String, Queue[Req]) => RIO[R, Nothing], terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None, @@ -529,8 +539,8 @@ class Sharding private ( for { entityManager <- EntityManager.make(recipientType, behavior, terminateMessage, self, config, entityMaxIdleTime) processBinary = (msg: BinaryMessage, replyChannel: ReplyChannel[Nothing]) => - serialization - .decode[Req](msg.body) + implicitly[Serialization[Req]] + .decode(msg.body) .flatMap(entityManager.send(msg.entityId, _, msg.replyId, replyChannel)) .catchAllCause(replyChannel.fail) _ <- entityStates.update(_.updated(recipientType.name, EntityState(entityManager, processBinary))) @@ -567,14 +577,13 @@ object Sharding { /** * A layer that sets up sharding communication between pods. */ - val live: ZLayer[Pods with ShardManagerClient with Storage with Serialization with Config, Throwable, Sharding] = + val live: ZLayer[Pods with ShardManagerClient with Storage with Config, Throwable, Sharding] = ZLayer.scoped { for { config <- ZIO.service[Config] pods <- ZIO.service[Pods] shardManager <- ZIO.service[ShardManagerClient] storage <- ZIO.service[Storage] - serialization <- ZIO.service[Serialization] shardsCache <- Ref.make(Map.empty[ShardId, PodAddress]) entityStates <- Ref.make[Map[String, EntityState]](Map()) singletons <- Ref.Synchronized @@ -604,7 +613,6 @@ object Sharding { shardManager, pods, storage, - serialization, eventsHub ) _ <- sharding.getShardingRegistrationEvents.mapZIO(event => ZIO.logInfo(event.toString)).runDrain.forkDaemon @@ -649,7 +657,7 @@ object Sharding { * You can use `ZIO.interrupt` from the behavior to stop it (it will be restarted the next time the entity receives a message). * If provided, the optional `terminateMessage` will be sent to the entity before it is stopped, allowing for cleanup logic. */ - def registerEntity[R, Req: Tag]( + def registerEntity[R, Req: Tag: Serialization]( entityType: EntityType[Req], behavior: (String, Queue[Req]) => RIO[R, Nothing], terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None, @@ -663,7 +671,7 @@ object Sharding { * You can use `ZIO.interrupt` from the behavior to stop it (it will be restarted the next time the topic receives a message). * If provided, the optional `terminateMessage` will be sent to the topic before it is stopped, allowing for cleanup logic. */ - def registerTopic[R, Req: Tag]( + def registerTopic[R, Req: Tag: Serialization]( topicType: TopicType[Req], behavior: (String, Queue[Req]) => RIO[R, Nothing], terminateMessage: Promise[Nothing, Unit] => Option[Req] = (_: Promise[Nothing, Unit]) => None @@ -674,7 +682,7 @@ object Sharding { * Get an object that allows sending messages to a given entity type. * You can provide a custom send timeout to override the one globally defined. */ - def messenger[Msg]( + def messenger[Msg: Serialization]( entityType: EntityType[Msg], sendTimeout: MessengerTimeout = MessengerTimeout.InheritConfigTimeout ): URIO[Sharding, Messenger[Msg]] = @@ -684,7 +692,7 @@ object Sharding { * Get an object that allows broadcasting messages to a given topic type. * You can provide a custom send timeout to override the one globally defined. */ - def broadcaster[Msg]( + def broadcaster[Msg: Serialization]( topicType: TopicType[Msg], sendTimeout: MessengerTimeout = MessengerTimeout.InheritConfigTimeout ): URIO[Sharding, Broadcaster[Msg]] = diff --git a/entities/src/main/scala/com/devsisters/shardcake/internal/SendChannel.scala b/entities/src/main/scala/com/devsisters/shardcake/internal/SendChannel.scala index d358fa97..087150de 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/internal/SendChannel.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/internal/SendChannel.scala @@ -6,24 +6,22 @@ import com.devsisters.shardcake.interfaces.{ Pods, Serialization } import zio.Task import zio.stream.ZStream -private[shardcake] sealed trait SendChannel[+A] { self => +private[shardcake] sealed trait SendChannel[A] { self => def foreach(f: A => Task[Unit]): Task[Unit] def send( pods: Pods, - serialization: Serialization, pod: PodAddress, entityId: String, recipientTypeName: String, replyId: Option[String] - ): Task[Option[Array[Byte]]] + )(implicit serialization: Serialization[A]): Task[Option[Array[Byte]]] def sendAndReceiveStream( pods: Pods, - serialization: Serialization, pod: PodAddress, entityId: String, recipientTypeName: String, replyId: Option[String] - ): ZStream[Any, Throwable, Array[Byte]] + )(implicit serialization: Serialization[A]): ZStream[Any, Throwable, Array[Byte]] } private[shardcake] object SendChannel { @@ -31,23 +29,21 @@ private[shardcake] object SendChannel { def foreach(f: A => Task[Unit]): Task[Unit] = f(msg) def send( pods: Pods, - serialization: Serialization, pod: PodAddress, entityId: String, recipientTypeName: String, replyId: Option[String] - ): Task[Option[Array[Byte]]] = + )(implicit serialization: Serialization[A]): Task[Option[Array[Byte]]] = serialization .encode(msg) .flatMap(bytes => pods.sendMessage(pod, BinaryMessage(entityId, recipientTypeName, bytes, replyId))) def sendAndReceiveStream( pods: Pods, - serialization: Serialization, pod: PodAddress, entityId: String, recipientTypeName: String, replyId: Option[String] - ): ZStream[Any, Throwable, Array[Byte]] = + )(implicit serialization: Serialization[A]): ZStream[Any, Throwable, Array[Byte]] = ZStream.unwrap( serialization .encode(msg) @@ -61,12 +57,11 @@ private[shardcake] object SendChannel { def foreach(f: A => Task[Unit]): Task[Unit] = messages.runForeach(f) def send( pods: Pods, - serialization: Serialization, pod: PodAddress, entityId: String, recipientTypeName: String, replyId: Option[String] - ): Task[Option[Array[Byte]]] = + )(implicit serialization: Serialization[A]): Task[Option[Array[Byte]]] = pods.sendStream( pod, entityId, @@ -78,12 +73,11 @@ private[shardcake] object SendChannel { ) def sendAndReceiveStream( pods: Pods, - serialization: Serialization, pod: PodAddress, entityId: String, recipientTypeName: String, replyId: Option[String] - ): ZStream[Any, Throwable, Array[Byte]] = { + )(implicit serialization: Serialization[A]): ZStream[Any, Throwable, Array[Byte]] = { val requestStream = messages.mapChunksZIO(messages => serialization .encodeChunk(messages) diff --git a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala index b12a0fda..b21b33b7 100644 --- a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala +++ b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala @@ -1,9 +1,10 @@ package com.devsisters.shardcake -import com.devsisters.shardcake.interfaces.{ Serialization, Storage } -import zio.test.TestAspect.{ sequential, withLiveClock } +import com.devsisters.shardcake.interfaces.JavaSerialization.javaSerialization +import com.devsisters.shardcake.interfaces.Storage +import zio.test.TestAspect.{sequential, withLiveClock} import zio.test._ -import zio.{ Config => _, _ } +import zio.{Config => _, _} import scala.util.Success @@ -28,7 +29,6 @@ object BroadcastingSpec extends ZIOSpecDefault { } } ).provideShared( - Serialization.javaSerialization, LocalSharding.live, ShardManagerClient.local, Storage.memory, diff --git a/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala b/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala index d41abf1b..ac27f561 100644 --- a/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala +++ b/entities/src/test/scala/com/devsisters/shardcake/ShardingSpec.scala @@ -2,7 +2,8 @@ package com.devsisters.shardcake import com.devsisters.shardcake.CounterActor.CounterMessage._ import com.devsisters.shardcake.CounterActor._ -import com.devsisters.shardcake.interfaces.{ Serialization, Storage } +import com.devsisters.shardcake.interfaces.JavaSerialization.javaSerialization +import com.devsisters.shardcake.interfaces.Storage import zio.stream.{ SubscriptionRef, ZStream } import zio.test.TestAspect.{ sequential, withLiveClock } import zio.test._ @@ -144,7 +145,6 @@ object ShardingSpec extends ZIOSpecDefault { } } ).provideShared( - Serialization.javaSerialization, LocalSharding.live, ShardManagerClient.local, Storage.memory, diff --git a/examples/src/main/scala/example/complex/GuildApp.scala b/examples/src/main/scala/example/complex/GuildApp.scala index c0278acf..a977302e 100644 --- a/examples/src/main/scala/example/complex/GuildApp.scala +++ b/examples/src/main/scala/example/complex/GuildApp.scala @@ -1,7 +1,7 @@ package example.complex import com.devsisters.shardcake._ -import com.devsisters.shardcake.interfaces.Serialization +import com.devsisters.shardcake.KryoSerialization.Default._ import dev.profunktor.redis4cats.RedisCommands import example.complex.GuildBehavior.GuildMessage.{ Join, Terminate } import example.complex.GuildBehavior._ @@ -17,7 +17,7 @@ object GuildApp extends ZIOAppDefault { .map(_.flatMap(_.toIntOption).fold(Config.default)(port => Config.default.copy(shardingPort = port))) ) - val program: ZIO[Sharding with Scope with Serialization with RedisCommands[Task, String, String], Throwable, Unit] = + val program: ZIO[Sharding with Scope with RedisCommands[Task, String, String], Throwable, Unit] = for { _ <- Sharding.registerEntity(Guild, behavior, p => Some(Terminate(p))) _ <- Sharding.registerScoped @@ -40,7 +40,6 @@ object GuildApp extends ZIOAppDefault { ZLayer.succeed(RedisConfig.default), redis, StorageRedis.live, - KryoSerialization.live, ShardManagerClient.liveWithSttp, GrpcPods.live, Sharding.live, diff --git a/examples/src/main/scala/example/simple/GuildApp.scala b/examples/src/main/scala/example/simple/GuildApp.scala index e6d407d5..3dfac6ee 100644 --- a/examples/src/main/scala/example/simple/GuildApp.scala +++ b/examples/src/main/scala/example/simple/GuildApp.scala @@ -2,6 +2,7 @@ package example.simple import com.devsisters.shardcake._ import com.devsisters.shardcake.interfaces._ +import com.devsisters.shardcake.interfaces.JavaSerialization.javaSerialization import example.simple.GuildBehavior._ import example.simple.GuildBehavior.GuildMessage.Join import zio.{ Config => _, _ } @@ -26,7 +27,6 @@ object GuildApp extends ZIOAppDefault { .provide( ZLayer.succeed(Config.default), ZLayer.succeed(GrpcConfig.default), - Serialization.javaSerialization, Storage.memory, ShardManagerClient.liveWithSttp, GrpcPods.live, diff --git a/examples/src/test/scala/example/EndToEndSpec.scala b/examples/src/test/scala/example/EndToEndSpec.scala index 3f909e53..9facd9d1 100644 --- a/examples/src/test/scala/example/EndToEndSpec.scala +++ b/examples/src/test/scala/example/EndToEndSpec.scala @@ -3,6 +3,7 @@ package example import com.devsisters.shardcake.StorageRedis.Redis import com.devsisters.shardcake._ import com.devsisters.shardcake.interfaces.PodsHealth +import com.devsisters.shardcake.KryoSerialization.Default._ import com.dimafeng.testcontainers.GenericContainer import dev.profunktor.redis4cats.Redis import dev.profunktor.redis4cats.connection.RedisClient @@ -96,7 +97,6 @@ object EndToEndSpec extends ZIOSpecDefault { } ).provideShared( Sharding.live, - KryoSerialization.live, GrpcPods.live, ShardManagerClient.liveWithSttp, StorageRedis.live, diff --git a/examples/src/test/scala/example/GrpcAuthExampleSpec.scala b/examples/src/test/scala/example/GrpcAuthExampleSpec.scala index fd286644..dcf5075f 100644 --- a/examples/src/test/scala/example/GrpcAuthExampleSpec.scala +++ b/examples/src/test/scala/example/GrpcAuthExampleSpec.scala @@ -2,6 +2,7 @@ package example import com.devsisters.shardcake._ import com.devsisters.shardcake.interfaces.{ Pods, Storage } +import com.devsisters.shardcake.KryoSerialization.Default._ import io.grpc.{ Metadata, Status } import scalapb.zio_grpc.{ ZClientInterceptor, ZTransform } import zio.test._ @@ -57,7 +58,6 @@ object GrpcAuthExampleSpec extends ZIOSpecDefault { config, grpcConfigLayer(validAuthenticationKey), Sharding.live, - KryoSerialization.live, GrpcPods.live, GrpcShardingService.live ) diff --git a/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala b/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala index bf25a634..4d06d0f7 100644 --- a/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala +++ b/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala @@ -1,36 +1,26 @@ package com.devsisters.shardcake import com.devsisters.shardcake.interfaces.Serialization -import com.typesafe.config.{ Config, ConfigFactory } +import com.typesafe.config.ConfigFactory import io.altoo.serialization.kryo.scala.ScalaKryoSerializer -import zio.{ Chunk, Task, ZIO, ZLayer } +import zio.{ Chunk, Task, ZIO } object KryoSerialization { + implicit def kryoSerialization[A](implicit serializer: ScalaKryoSerializer): Serialization[A] = + new Serialization[A] { + def encode(message: A): Task[Array[Byte]] = ZIO.fromTry(serializer.serialize(message)) + def decode(bytes: Array[Byte]): Task[A] = ZIO.fromTry(serializer.deserialize[A](bytes)) + override def encodeChunk(messages: Chunk[A]): Task[Chunk[Array[Byte]]] = + ZIO.attempt(messages.map(serializer.serialize(_).get)) + override def decodeChunk(bytes: Chunk[Array[Byte]]): Task[Chunk[A]] = + ZIO.attempt(bytes.map(serializer.deserialize(_).get)) + } - /** - * A layer that returns a serialization implementation using the Kryo library. - */ - val live: ZLayer[Any, Throwable, Serialization] = - ZLayer(ZIO.attempt(ConfigFactory.defaultReference()).flatMap(make)) + object Default { + private lazy val defaultKryoSerializer: ScalaKryoSerializer = + new ScalaKryoSerializer(ConfigFactory.defaultReference(), getClass.getClassLoader) - /** - * A layer that returns a serialization implementation using the Kryo library, taking a Config object. - * See https://github.com/altoo-ag/scala-kryo-serialization for more details about configuration. - */ - def liveWithConfig(config: Config): ZLayer[Any, Throwable, Serialization] = - ZLayer(make(config)) - - private def make(config: Config): Task[Serialization] = - ZIO.attempt { - new ScalaKryoSerializer(config, getClass.getClassLoader) - }.map(serializer => - new Serialization { - def encode(message: Any): Task[Array[Byte]] = ZIO.fromTry(serializer.serialize(message)) - def decode[A](bytes: Array[Byte]): Task[A] = ZIO.fromTry(serializer.deserialize[A](bytes)) - override def encodeChunk(messages: Chunk[Any]): Task[Chunk[Array[Byte]]] = - ZIO.attempt(messages.map(serializer.serialize(_).get)) - override def decodeChunk[A](bytes: Chunk[Array[Byte]]): Task[Chunk[A]] = - ZIO.attempt(bytes.map(serializer.deserialize[A](_).get)) - } - ) + implicit def defaultKryoSerialization[A]: Serialization[A] = + kryoSerialization[A](defaultKryoSerializer) + } } diff --git a/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala b/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala index c684fdda..19941e7c 100644 --- a/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala +++ b/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala @@ -1,7 +1,6 @@ package com.devsisters.shardcake -import com.devsisters.shardcake.interfaces.Serialization -import zio.{ Scope, ZIO } +import zio.Scope import zio.test._ object KryoSerializationSpec extends ZIOSpecDefault { @@ -11,9 +10,9 @@ object KryoSerializationSpec extends ZIOSpecDefault { case class Test(a: Int, b: String) val expected = Test(2, "test") for { - bytes <- ZIO.serviceWithZIO[Serialization](_.encode(expected)) - actual <- ZIO.serviceWithZIO[Serialization](_.decode[Test](bytes)) + bytes <- KryoSerialization.Default.defaultKryoSerialization.encode(expected) + actual <- KryoSerialization.Default.defaultKryoSerialization[Test].decode(bytes) } yield assertTrue(expected == actual) } - ).provideShared(KryoSerialization.live) + ) } From 653513f6b7dd7dff8a47401175f3802f5e5e6115 Mon Sep 17 00:00:00 2001 From: Yoonjae Jeon Date: Tue, 20 Jan 2026 10:24:53 +0900 Subject: [PATCH 2/5] . --- .../com/devsisters/shardcake/Replier.scala | 5 +- .../com/devsisters/shardcake/Sharding.scala | 73 +++++++++++-------- .../devsisters/shardcake/StreamReplier.scala | 5 +- .../shardcake/internal/EntityManager.scala | 4 +- .../shardcake/internal/ReplyChannel.scala | 62 ++++++++++------ .../scala/example/complex/GuildBehavior.scala | 1 + .../scala/example/simple/GuildBehavior.scala | 1 + .../src/test/scala/example/EndToEndSpec.scala | 7 +- .../shardcake/KryoSerialization.scala | 2 +- 9 files changed, 95 insertions(+), 65 deletions(-) diff --git a/entities/src/main/scala/com/devsisters/shardcake/Replier.scala b/entities/src/main/scala/com/devsisters/shardcake/Replier.scala index 9e267d20..d2ddabc0 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Replier.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Replier.scala @@ -1,11 +1,12 @@ package com.devsisters.shardcake +import com.devsisters.shardcake.interfaces.Serialization import zio.{ URIO, ZIO } /** * A metadata object that allows sending a response back to the sender */ -case class Replier[-R](id: String) { self => - def reply(reply: R): URIO[Sharding, Unit] = +case class Replier[R](id: String) { self => + def reply(reply: R)(implicit serialization: Serialization[R]): URIO[Sharding, Unit] = ZIO.serviceWithZIO[Sharding](_.reply(reply, self)) } diff --git a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala index c0c6e731..896582c0 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/Sharding.scala @@ -22,7 +22,7 @@ class Sharding private ( shardAssignments: Ref[Map[ShardId, PodAddress]], entityStates: Ref[Map[String, EntityState]], singletons: Ref.Synchronized[List[(String, UIO[Nothing], Option[Fiber[Nothing, Nothing]])]], - replyChannels: Ref[Map[String, ReplyChannel[Nothing]]], // channel for each pending reply, + replyChannels: Ref[Map[String, ReplyChannel[Any]]], // channel for each pending reply, lastUnhealthyNodeReported: Ref[OffsetDateTime], isShuttingDownRef: Ref[Boolean], shardManager: ShardManagerClient, @@ -181,51 +181,55 @@ class Sharding private ( private[shardcake] def isShuttingDown: UIO[Boolean] = isShuttingDownRef.get - def sendToLocalEntity[Msg: Serialization](msg: BinaryMessage): Task[Option[Array[Byte]]] = + def sendToLocalEntity(msg: BinaryMessage): Task[Option[Array[Byte]]] = for { - replyChannel <- ReplyChannel.single[Msg] + replyChannel <- ReplyChannel.single[Any] _ <- sendToLocalEntity(msg, replyChannel) res <- replyChannel.output - bytes <- ZIO.foreach(res)(implicitly[Serialization[Msg]].encode) + bytes <- ZIO.foreach(res) { case (msg, serialization) => serialization.encode(msg) } } yield bytes - def sendToLocalEntityAndReceiveStream[Msg: Serialization](msg: BinaryMessage): ZStream[Any, Throwable, Array[Byte]] = + def sendToLocalEntityAndReceiveStream(msg: BinaryMessage): ZStream[Any, Throwable, Array[Byte]] = ZStream.unwrap { for { - replyChannel <- ReplyChannel.stream[Msg] - _ <- sendToLocalEntity(msg, replyChannel) - } yield replyChannel.output.mapChunksZIO(implicitly[Serialization[Msg]].encodeChunk) + replyChannel <- ReplyChannel.stream[Any] + _ <- sendToLocalEntity(msg, replyChannel) + (promise, stream) = replyChannel.output + serialization <- promise.await + } yield stream.mapChunksZIO(serialization.encodeChunk) } - def sendStreamToLocalEntity[Msg: Serialization]( + def sendStreamToLocalEntity( messages: ZStream[Any, Throwable, BinaryMessage] ): Task[Option[Array[Byte]]] = ZIO.scoped { for { - replyChannel <- ReplyChannel.single[Msg] + replyChannel <- ReplyChannel.single[Any] _ <- messages.runForeach(sendToLocalEntity(_, replyChannel)).onError(replyChannel.fail).forkScoped res <- replyChannel.output - bytes <- ZIO.foreach(res)(implicitly[Serialization[Msg]].encode) + bytes <- ZIO.foreach(res) { case (msg, serialization) => serialization.encode(msg) } } yield bytes } - def sendStreamToLocalEntityAndReceiveStream[Msg: Serialization]( + def sendStreamToLocalEntityAndReceiveStream( messages: ZStream[Any, Throwable, BinaryMessage] ): ZStream[Any, Throwable, Array[Byte]] = ZStream.unwrapScoped { for { - replyChannel <- ReplyChannel.stream[Msg] - _ <- messages.runForeach(sendToLocalEntity(_, replyChannel)).onError(replyChannel.fail).forkScoped - } yield replyChannel.output.mapChunksZIO(implicitly[Serialization[Msg]].encodeChunk) + replyChannel <- ReplyChannel.stream[Any] + _ <- messages.runForeach(sendToLocalEntity(_, replyChannel)).onError(replyChannel.fail).forkScoped + (promise, stream) = replyChannel.output + serialization <- promise.await + } yield stream.mapChunksZIO(serialization.encodeChunk) } - private def sendToLocalEntity(msg: BinaryMessage, replyChannel: ReplyChannel[Nothing]): Task[Unit] = + private def sendToLocalEntity(msg: BinaryMessage, replyChannel: ReplyChannel[Any]): Task[Unit] = entityStates.get.flatMap(_.get(msg.entityType) match { case Some(state) => state.processBinary(msg, replyChannel).unit case None => ZIO.fail(new Exception(s"Entity type ${msg.entityType} was not registered.")) }) - private[shardcake] def initReply(id: String, replyChannel: ReplyChannel[Nothing]): UIO[Unit] = + private[shardcake] def initReply(id: String, replyChannel: ReplyChannel[Any]): UIO[Unit] = replyChannels .getAndUpdate(_.updated(id, replyChannel)) .flatMap(beforeReplyChannels => @@ -233,12 +237,15 @@ class Sharding private ( ) .unit - def reply[Reply](reply: Reply, replier: Replier[Reply]): UIO[Unit] = + def reply[Reply: Serialization](reply: Reply, replier: Replier[Reply]): UIO[Unit] = replyChannels .modify(repliers => (repliers.get(replier.id), repliers - replier.id)) .flatMap(ZIO.foreachDiscard(_)(_.asInstanceOf[ReplyChannel[Reply]].replySingle(reply))) - def replyStream[Reply](replies: ZStream[Any, Nothing, Reply], replier: StreamReplier[Reply]): UIO[Unit] = + def replyStream[Reply: Serialization]( + replies: ZStream[Any, Nothing, Reply], + replier: StreamReplier[Reply] + ): UIO[Unit] = replyChannels .modify(repliers => (repliers.get(replier.id), repliers - replier.id)) .flatMap(ZIO.foreachDiscard(_)(_.asInstanceOf[ReplyChannel[Reply]].replyStream(replies))) @@ -268,20 +275,22 @@ class Sharding private ( entityStates.get.flatMap( _.get(recipientTypeName) match { case Some(state) => - state.entityManager.asInstanceOf[EntityManager[Msg]].send(entityId, msg, replyId, replyChannel) + state.entityManager + .asInstanceOf[EntityManager[Msg]] + .send(entityId, msg, replyId, replyChannel.asInstanceOf[ReplyChannel[Any]]) case None => ZIO.fail(new Exception(s"Entity type $recipientTypeName was not registered.")) } ) - private def sendToPod[Msg: Serialization, Res: Serialization]( + private def sendToPod[Msg, Res]( recipientTypeName: String, entityId: String, pod: PodAddress, sendChannel: SendChannel[Msg], replyChannel: ReplyChannel[Res], replyId: Option[String] - ): Task[Unit] = + )(implicit msgSerialization: Serialization[Msg], resSerialization: Serialization[Res]): Task[Unit] = if (pod == address && !config.simulateRemotePods) { val run = sendChannel.foreach(sendToSelf(recipientTypeName, entityId, _, replyId, replyChannel)) sendChannel match { @@ -296,7 +305,7 @@ class Sharding private ( .tapError(handleError) .flatMap { case Some(bytes) => - implicitly[Serialization[Res]].decode(bytes).flatMap(replyChannel.replySingle) + resSerialization.decode(bytes).flatMap(replyChannel.replySingle(_)) case None => replyChannel.end } case _: ReplyChannel.FromQueue[_] => @@ -304,7 +313,7 @@ class Sharding private ( sendChannel .sendAndReceiveStream(pods, pod, entityId, recipientTypeName, replyId) .tapError(handleError) - .mapChunksZIO(implicitly[Serialization[Res]].decodeChunk) + .mapChunksZIO(resSerialization.decodeChunk) ) } } @@ -364,7 +373,7 @@ class Sharding private ( replyChannel <- ReplyChannel.single[Res] _ <- sendMessageGeneric(entityId, msg, replyId, replyChannel) res <- replyChannel.output - } yield res + } yield res.map(_._1) private def sendMessageAndReceiveStream[Res: Serialization]( entityId: String, @@ -374,7 +383,8 @@ class Sharding private ( for { replyChannel <- ReplyChannel.stream[Res] _ <- sendMessageGeneric(entityId, msg, replyId, replyChannel) - } yield replyChannel.output + (_, stream) = replyChannel.output + } yield stream private def sendStreamAndReceiveStream[Res: Serialization]( entityId: String, @@ -384,7 +394,8 @@ class Sharding private ( for { replyChannel <- ReplyChannel.stream[Res] _ <- sendStreamGeneric(entityId, messages, replyId, replyChannel) - } yield replyChannel.output + (_, stream) = replyChannel.output + } yield stream private def sendMessageGeneric[Res: Serialization]( entityId: String, @@ -496,7 +507,7 @@ class Sharding private ( Clock.sleep(200.millis) *> trySend }.onError(replyChannel.fail) res <- replyChannel.output - } yield res + } yield res.map(_._1) val send = trySend.flatMap { case Some(value) => ZIO.succeed(value) @@ -538,7 +549,7 @@ class Sharding private ( ): URIO[Scope with R, Unit] = for { entityManager <- EntityManager.make(recipientType, behavior, terminateMessage, self, config, entityMaxIdleTime) - processBinary = (msg: BinaryMessage, replyChannel: ReplyChannel[Nothing]) => + processBinary = (msg: BinaryMessage, replyChannel: ReplyChannel[Any]) => implicitly[Serialization[Req]] .decode(msg.body) .flatMap(entityManager.send(msg.entityId, _, msg.replyId, replyChannel)) @@ -571,7 +582,7 @@ object Sharding { private[shardcake] case class EntityState( entityManager: EntityManager[Nothing], - processBinary: (BinaryMessage, ReplyChannel[Nothing]) => UIO[Unit] + processBinary: (BinaryMessage, ReplyChannel[Any]) => UIO[Unit] ) /** @@ -596,7 +607,7 @@ object Sharding { } ) ) - replyChannels <- Ref.make[Map[String, ReplyChannel[Nothing]]](Map()) + replyChannels <- Ref.make[Map[String, ReplyChannel[Any]]](Map()) cdt <- Clock.currentDateTime lastUnhealthyNodeReported <- Ref.make(cdt) shuttingDown <- Ref.make(false) diff --git a/entities/src/main/scala/com/devsisters/shardcake/StreamReplier.scala b/entities/src/main/scala/com/devsisters/shardcake/StreamReplier.scala index 3884ecab..588e465f 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/StreamReplier.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/StreamReplier.scala @@ -1,12 +1,13 @@ package com.devsisters.shardcake +import com.devsisters.shardcake.interfaces.Serialization import zio.stream.ZStream import zio.{ URIO, ZIO } /** * A metadata object that allows sending a stream of responses back to the sender */ -final case class StreamReplier[-R](id: String) { self => - def replyStream(replies: ZStream[Any, Nothing, R]): URIO[Sharding, Unit] = +final case class StreamReplier[R](id: String) { self => + def replyStream(replies: ZStream[Any, Nothing, R])(implicit serialization: Serialization[R]): URIO[Sharding, Unit] = ZIO.serviceWithZIO[Sharding](_.replyStream(replies, self)) } diff --git a/entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala b/entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala index 18888f58..0ff6469b 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/internal/EntityManager.scala @@ -11,7 +11,7 @@ private[shardcake] trait EntityManager[-Req] { entityId: String, req: Req, replyId: Option[String], - replyChannel: ReplyChannel[Nothing] + replyChannel: ReplyChannel[Any] ): IO[EntityNotManagedByThisPod, Unit] def terminateEntity(entityId: String): UIO[Unit] def terminateEntitiesOnShards(shards: Set[ShardId]): UIO[Unit] @@ -101,7 +101,7 @@ private[shardcake] object EntityManager { entityId: String, req: Req, replyId: Option[String], - replyChannel: ReplyChannel[Nothing] + replyChannel: ReplyChannel[Any] ): IO[EntityNotManagedByThisPod, Unit] = for { // first, verify that this entity should be handled by this pod diff --git a/entities/src/main/scala/com/devsisters/shardcake/internal/ReplyChannel.scala b/entities/src/main/scala/com/devsisters/shardcake/internal/ReplyChannel.scala index 79daf1a3..a2840b84 100644 --- a/entities/src/main/scala/com/devsisters/shardcake/internal/ReplyChannel.scala +++ b/entities/src/main/scala/com/devsisters/shardcake/internal/ReplyChannel.scala @@ -1,48 +1,62 @@ package com.devsisters.shardcake.internal +import com.devsisters.shardcake.interfaces.Serialization import zio.stream.{ Take, ZStream } import zio.{ Cause, Promise, Queue, Task, UIO } -private[shardcake] sealed trait ReplyChannel[-A] { self => +private[shardcake] sealed trait ReplyChannel[A] { self => val await: UIO[Unit] val end: UIO[Unit] def fail(cause: Cause[Throwable]): UIO[Unit] - def replySingle(a: A): UIO[Unit] - def replyStream(stream: ZStream[Any, Throwable, A]): UIO[Unit] + def replySingle(a: A)(implicit serialization: Serialization[A]): UIO[Unit] + def replyStream(stream: ZStream[Any, Throwable, A])(implicit serialization: Serialization[A]): UIO[Unit] } private[shardcake] object ReplyChannel { - case class FromQueue[A](queue: Queue[Take[Throwable, A]]) extends ReplyChannel[A] { - val await: UIO[Unit] = queue.awaitShutdown - val end: UIO[Unit] = queue.offer(Take.end).exit.unit - def fail(cause: Cause[Throwable]): UIO[Unit] = queue.offer(Take.failCause(cause)).exit.unit - def replySingle(a: A): UIO[Unit] = queue.offer(Take.single(a)).exit *> end - def replyStream(stream: ZStream[Any, Throwable, A]): UIO[Unit] = - (stream - .runForeachChunk(chunk => queue.offer(Take.chunk(chunk))) - .onExit(e => queue.offer(e.foldExit(Take.failCause, _ => Take.end))) - .ignore race await).fork.unit - val output: ZStream[Any, Throwable, A] = ZStream.fromQueueWithShutdown(queue).flattenTake.onError(fail) + case class FromQueue[A](queue: Queue[Take[Throwable, A]], serializationPromise: Promise[Throwable, Serialization[A]]) + extends ReplyChannel[A] { + val await: UIO[Unit] = queue.awaitShutdown + val end: UIO[Unit] = queue.offer(Take.end).exit.unit + def fail(cause: Cause[Throwable]): UIO[Unit] = + serializationPromise.failCause(cause) *> queue.offer(Take.failCause(cause)).exit.unit + def replySingle(a: A)(implicit serialization: Serialization[A]): UIO[Unit] = + serializationPromise.succeed(serialization) *> + queue.offer(Take.single(a)).exit *> end + + def replyStream(stream: ZStream[Any, Throwable, A])(implicit serialization: Serialization[A]): UIO[Unit] = + serializationPromise.succeed(serialization) *> + (stream + .runForeachChunk(chunk => queue.offer(Take.chunk(chunk))) + .onExit(e => queue.offer(e.foldExit(Take.failCause, _ => Take.end))) + .ignore race await).fork.unit + val output: (Promise[Throwable, Serialization[A]], ZStream[Any, Throwable, A]) = + serializationPromise -> + ZStream.fromQueueWithShutdown(queue).flattenTake.onError(fail) } - case class FromPromise[A](promise: Promise[Throwable, Option[A]]) extends ReplyChannel[A] { - val await: UIO[Unit] = promise.await.exit.unit - val end: UIO[Unit] = promise.succeed(None).unit - def fail(cause: Cause[Throwable]): UIO[Unit] = promise.failCause(cause).unit - def replySingle(a: A): UIO[Unit] = promise.succeed(Some(a)).unit - def replyStream(stream: ZStream[Any, Throwable, A]): UIO[Unit] = + case class FromPromise[A](promise: Promise[Throwable, Option[(A, Serialization[A])]]) extends ReplyChannel[A] { + val await: UIO[Unit] = promise.await.exit.unit + val end: UIO[Unit] = promise.succeed(None).unit + def fail(cause: Cause[Throwable]): UIO[Unit] = promise.failCause(cause).unit + def replySingle(a: A)(implicit serialization: Serialization[A]): UIO[Unit] = + promise.succeed(Some(a -> serialization)).unit + def replyStream(stream: ZStream[Any, Throwable, A])(implicit serialization: Serialization[A]): UIO[Unit] = stream.runHead - .flatMap(promise.succeed(_).unit) + .flatMap(a => promise.succeed(a.map(_ -> serialization)).unit) .catchAllCause[Any, Nothing, Unit](fail) .fork .unit - val output: Task[Option[A]] = promise.await.onError(fail) + val output: Task[Option[(A, Serialization[A])]] = promise.await.onError(fail) } def single[A]: UIO[FromPromise[A]] = - Promise.make[Throwable, Option[A]].map(FromPromise(_)) + Promise.make[Throwable, Option[(A, Serialization[A])]].map(FromPromise(_)) def stream[A]: UIO[FromQueue[A]] = - Queue.unbounded[Take[Throwable, A]].map(FromQueue(_)) + Promise.make[Throwable, Serialization[A]].flatMap { serializationPromise => + Queue.unbounded[Take[Throwable, A]].map { queue => + FromQueue(queue, serializationPromise) + } + } } diff --git a/examples/src/main/scala/example/complex/GuildBehavior.scala b/examples/src/main/scala/example/complex/GuildBehavior.scala index 1c0ad936..f32641fb 100644 --- a/examples/src/main/scala/example/complex/GuildBehavior.scala +++ b/examples/src/main/scala/example/complex/GuildBehavior.scala @@ -1,6 +1,7 @@ package example.complex import com.devsisters.shardcake.{ EntityType, Replier, Sharding } +import com.devsisters.shardcake.KryoSerialization.Default._ import dev.profunktor.redis4cats.RedisCommands import zio.{ Dequeue, Promise, RIO, Task, ZIO } diff --git a/examples/src/main/scala/example/simple/GuildBehavior.scala b/examples/src/main/scala/example/simple/GuildBehavior.scala index 4b1ec15b..a6f8fb48 100644 --- a/examples/src/main/scala/example/simple/GuildBehavior.scala +++ b/examples/src/main/scala/example/simple/GuildBehavior.scala @@ -1,6 +1,7 @@ package example.simple import com.devsisters.shardcake.{ EntityType, Replier, Sharding, StreamReplier } +import com.devsisters.shardcake.KryoSerialization.Default._ import zio.stream.ZStream import zio.{ Dequeue, RIO, Ref, ZIO } diff --git a/examples/src/test/scala/example/EndToEndSpec.scala b/examples/src/test/scala/example/EndToEndSpec.scala index 9facd9d1..8ceb3992 100644 --- a/examples/src/test/scala/example/EndToEndSpec.scala +++ b/examples/src/test/scala/example/EndToEndSpec.scala @@ -89,9 +89,10 @@ object EndToEndSpec extends ZIOSpecDefault { failure <- guild.send[Try[Set[String]]]("guild1")(Join("user6", _)) stream <- guild.sendAndReceiveStream[String]("guild1")(Stream(_)) res <- stream.runCollect - } yield assert(members)(isSuccess(hasSize(equalTo(5)))) && - assertTrue(failure.isFailure) && - assertTrue(timeout.toTry.isFailure) && + } yield assert(members)(isSuccess(hasSize(equalTo(5)))) && assertTrue( + failure.isFailure, + timeout.toTry.isFailure + ) && assert(res)(hasSize(equalTo(5))) } } diff --git a/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala b/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala index 4d06d0f7..a74d21c5 100644 --- a/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala +++ b/serialization-kryo/src/main/scala/com/devsisters/shardcake/KryoSerialization.scala @@ -13,7 +13,7 @@ object KryoSerialization { override def encodeChunk(messages: Chunk[A]): Task[Chunk[Array[Byte]]] = ZIO.attempt(messages.map(serializer.serialize(_).get)) override def decodeChunk(bytes: Chunk[Array[Byte]]): Task[Chunk[A]] = - ZIO.attempt(bytes.map(serializer.deserialize(_).get)) + ZIO.attempt(bytes.map(serializer.deserialize[A](_).get)) } object Default { From 59c62d7fa771883ebcb0c8a8d55d3f1d11852d16 Mon Sep 17 00:00:00 2001 From: Yoonjae Jeon Date: Tue, 20 Jan 2026 10:40:19 +0900 Subject: [PATCH 3/5] . --- .../scala/com/devsisters/shardcake/BroadcastingSpec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala index b21b33b7..530fc124 100644 --- a/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala +++ b/entities/src/test/scala/com/devsisters/shardcake/BroadcastingSpec.scala @@ -2,9 +2,9 @@ package com.devsisters.shardcake import com.devsisters.shardcake.interfaces.JavaSerialization.javaSerialization import com.devsisters.shardcake.interfaces.Storage -import zio.test.TestAspect.{sequential, withLiveClock} +import zio.test.TestAspect.{ sequential, withLiveClock } import zio.test._ -import zio.{Config => _, _} +import zio.{ Config => _, _ } import scala.util.Success From f113f33913c5b46a1901f42d532d9200fe3b917c Mon Sep 17 00:00:00 2001 From: Yoonjae Jeon Date: Tue, 20 Jan 2026 11:38:16 +0900 Subject: [PATCH 4/5] address reviews --- examples/src/test/scala/example/EndToEndSpec.scala | 6 ++---- .../com/devsisters/shardcake/KryoSerializationSpec.scala | 7 ++++--- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/examples/src/test/scala/example/EndToEndSpec.scala b/examples/src/test/scala/example/EndToEndSpec.scala index 8ceb3992..b26ecc47 100644 --- a/examples/src/test/scala/example/EndToEndSpec.scala +++ b/examples/src/test/scala/example/EndToEndSpec.scala @@ -89,10 +89,8 @@ object EndToEndSpec extends ZIOSpecDefault { failure <- guild.send[Try[Set[String]]]("guild1")(Join("user6", _)) stream <- guild.sendAndReceiveStream[String]("guild1")(Stream(_)) res <- stream.runCollect - } yield assert(members)(isSuccess(hasSize(equalTo(5)))) && assertTrue( - failure.isFailure, - timeout.toTry.isFailure - ) && + } yield assert(members)(isSuccess(hasSize(equalTo(5)))) && + assertTrue(failure.isFailure, timeout.toTry.isFailure) && assert(res)(hasSize(equalTo(5))) } } diff --git a/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala b/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala index 19941e7c..ed62d457 100644 --- a/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala +++ b/serialization-kryo/src/test/scala/com/devsisters/shardcake/KryoSerializationSpec.scala @@ -8,10 +8,11 @@ object KryoSerializationSpec extends ZIOSpecDefault { suite("KryoSerializationSpec")( test("serialize back and forth") { case class Test(a: Int, b: String) - val expected = Test(2, "test") + val expected = Test(2, "test") + val serialization = KryoSerialization.Default.defaultKryoSerialization[Test] for { - bytes <- KryoSerialization.Default.defaultKryoSerialization.encode(expected) - actual <- KryoSerialization.Default.defaultKryoSerialization[Test].decode(bytes) + bytes <- serialization.encode(expected) + actual <- serialization.decode(bytes) } yield assertTrue(expected == actual) } ) From 0369dfccfd9a96ecdb74b0e9cfdd62a16fbbaa5a Mon Sep 17 00:00:00 2001 From: Jeon Yoonjae Date: Wed, 21 Jan 2026 16:19:05 +0900 Subject: [PATCH 5/5] Add an example uses upickle (#170) --- build.sbt | 16 ++- .../example/mailbox/MailboxBehavior.scala | 96 +++++++++++++ .../scala/example/MailboxEndToEndSpec.scala | 131 ++++++++++++++++++ .../shardcake/UpickleSerialization.scala | 19 +++ 4 files changed, 261 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/scala/example/mailbox/MailboxBehavior.scala create mode 100644 examples/src/test/scala/example/MailboxEndToEndSpec.scala create mode 100644 serialization-upickle/src/main/scala/com/devsisters/shardcake/UpickleSerialization.scala diff --git a/build.sbt b/build.sbt index f8acaa9b..f546fca1 100644 --- a/build.sbt +++ b/build.sbt @@ -16,6 +16,7 @@ val redissonVersion = "3.45.1" val scalaKryoVersion = "1.3.0" val testContainersVersion = "0.44.1" val scalaCompatVersion = "2.13.0" +val upickleVersion = "4.4.2" inThisBuild( List( @@ -57,6 +58,7 @@ lazy val root = project storageRedis, storageRedisson, serializationKryo, + serializationUpickle, grpcProtocol, examples, benchmarks @@ -153,6 +155,18 @@ lazy val serializationKryo = project ) ) +lazy val serializationUpickle = project + .in(file("serialization-upickle")) + .settings(name := "shardcake-serialization-upickle") + .settings(commonSettings) + .dependsOn(core) + .settings( + libraryDependencies ++= + Seq( + "com.lihaoyi" %% "upickle" % upickleVersion + ) + ) + lazy val grpcProtocol = project .in(file("protocol-grpc")) .settings(name := "shardcake-protocol-grpc") @@ -186,7 +200,7 @@ lazy val examples = project "dev.zio" %% "zio-streams" % zioVersion ) ) - .dependsOn(manager, storageRedis, grpcProtocol, serializationKryo) + .dependsOn(manager, storageRedis, grpcProtocol, serializationKryo, serializationUpickle) lazy val benchmarks = project .in(file("benchmarks")) diff --git a/examples/src/main/scala/example/mailbox/MailboxBehavior.scala b/examples/src/main/scala/example/mailbox/MailboxBehavior.scala new file mode 100644 index 00000000..541d47f6 --- /dev/null +++ b/examples/src/main/scala/example/mailbox/MailboxBehavior.scala @@ -0,0 +1,96 @@ +package example.mailbox + +import com.devsisters.shardcake.UpickleSerialization._ +import com.devsisters.shardcake.{ EntityType, Replier, Sharding } +import dev.profunktor.redis4cats.RedisCommands +import upickle.default._ +import zio.{ Dequeue, RIO, Task, ZIO } + +object MailboxBehavior { + case class Letter(id: String, from: String, content: String, isRead: Boolean) + object Letter { + implicit val rw: ReadWriter[Letter] = macroRW + } + + implicit def replierRW[A]: ReadWriter[Replier[A]] = readwriter[String].bimap[Replier[A]]( + replier => replier.id, + id => Replier(id) + ) + + sealed trait MailboxMessage + + object MailboxMessage { + case class SendLetter(from: String, content: String, replier: Replier[Option[Letter]]) extends MailboxMessage + case class GetLetters(replier: Replier[List[Letter]]) extends MailboxMessage + case class ReadLetter(letterId: String, replier: Replier[Option[Letter]]) extends MailboxMessage + case class DeleteLetter(letterId: String, replier: Replier[Boolean]) extends MailboxMessage + case object Terminate extends MailboxMessage + + implicit val sendLetterRW: ReadWriter[SendLetter] = macroRW + implicit val getLettersRW: ReadWriter[GetLetters] = macroRW + implicit val readLetterRW: ReadWriter[ReadLetter] = macroRW + implicit val deleteLetterRW: ReadWriter[DeleteLetter] = macroRW + implicit val terminateRW: ReadWriter[Terminate.type] = macroRW + implicit val rw: ReadWriter[MailboxMessage] = macroRW + } + + object Mailbox extends EntityType[MailboxMessage]("mailbox") + + def behavior( + entityId: String, + messages: Dequeue[MailboxMessage] + ): RIO[Sharding with RedisCommands[Task, String, String], Nothing] = + ZIO.serviceWithZIO[RedisCommands[Task, String, String]](redis => + ZIO.logInfo(s"Started mailbox entity $entityId") *> + messages.take.flatMap(handleMessage(entityId, redis, _)).forever + ) + + def handleMessage( + entityId: String, + redis: RedisCommands[Task, String, String], + message: MailboxMessage + ): RIO[Sharding, Unit] = + message match { + case MailboxMessage.SendLetter(from, content, replier) => + val letterKey = s"mailbox:$entityId:letters" + val letterId = java.util.UUID.randomUUID().toString + val letter = Letter(letterId, from, content, isRead = false) + + redis.lPush(letterKey, write(letter)) *> + replier.reply(Some(letter)) + + case MailboxMessage.GetLetters(replier) => + val letterKey = s"mailbox:$entityId:letters" + + redis + .lRange(letterKey, 0, -1) + .map(_.map(json => read[Letter](json)).toList) + .flatMap(letters => replier.reply(letters)) + + case MailboxMessage.ReadLetter(letterId, replier) => + val letterKey = s"mailbox:$entityId:letters" + + redis + .lRange(letterKey, 0, -1) + .map(_.map(json => read[Letter](json)).find(_.id == letterId)) + .flatMap(letterOpt => replier.reply(letterOpt)) + + case MailboxMessage.DeleteLetter(letterId, replier) => + val letterKey = s"mailbox:$entityId:letters" + + for { + allLetters <- redis.lRange(letterKey, 0, -1).map(_.map(json => read[Letter](json))) + letterOpt = allLetters.find(_.id == letterId) + result <- letterOpt match { + case Some(letter) => + redis.lRem(letterKey, 1, write(letter)).as(true) + case None => + ZIO.succeed(false) + } + _ <- replier.reply(result) + } yield () + + case MailboxMessage.Terminate => + ZIO.interrupt + } +} diff --git a/examples/src/test/scala/example/MailboxEndToEndSpec.scala b/examples/src/test/scala/example/MailboxEndToEndSpec.scala new file mode 100644 index 00000000..4c7b8a43 --- /dev/null +++ b/examples/src/test/scala/example/MailboxEndToEndSpec.scala @@ -0,0 +1,131 @@ +package example + +import com.devsisters.shardcake.StorageRedis.Redis +import com.devsisters.shardcake._ +import com.devsisters.shardcake.interfaces.PodsHealth +import com.devsisters.shardcake.UpickleSerialization._ +import com.dimafeng.testcontainers.GenericContainer +import dev.profunktor.redis4cats.Redis +import dev.profunktor.redis4cats.connection.RedisClient +import dev.profunktor.redis4cats.data.RedisCodec +import dev.profunktor.redis4cats.effect.Log +import dev.profunktor.redis4cats.pubsub.PubSub +import example.mailbox.MailboxBehavior +import example.mailbox.MailboxBehavior.Mailbox +import example.mailbox.MailboxBehavior.MailboxMessage.{ DeleteLetter, GetLetters, ReadLetter, SendLetter } +import sttp.client4.UriContext +import zio.{ Config => _, _ } +import zio.Clock.ClockLive +import zio.interop.catz._ +import zio.test.Assertion._ +import zio.test.TestAspect.{ sequential, withLiveClock } +import zio.test._ + +object MailboxEndToEndSpec extends ZIOSpecDefault { + + val shardManagerServer: ZLayer[ShardManager with ManagerConfig, Throwable, Unit] = + ZLayer(Server.run().forkDaemon *> ClockLive.sleep(3 seconds).unit) + + val container: ZLayer[Any, Nothing, GenericContainer] = + ZLayer.scoped { + ZIO.acquireRelease { + ZIO.attemptBlocking { + val container = new GenericContainer(dockerImage = "redis:6.2.5", exposedPorts = Seq(6379)) + container.start() + container + }.orDie + }(container => ZIO.attemptBlocking(container.stop()).orDie) + } + + val redis: ZLayer[GenericContainer, Throwable, Redis] = + ZLayer.scopedEnvironment { + implicit val runtime: zio.Runtime[Any] = zio.Runtime.default + implicit val logger: Log[Task] = new Log[Task] { + override def debug(msg: => String): Task[Unit] = ZIO.unit + override def error(msg: => String): Task[Unit] = ZIO.logError(msg) + override def info(msg: => String): Task[Unit] = ZIO.logDebug(msg) + } + + ZIO + .service[GenericContainer] + .flatMap(container => + (for { + client <- RedisClient[Task].from( + s"redis://foobared@${container.host}:${container.mappedPort(container.exposedPorts.head)}" + ) + commands <- Redis[Task].fromClient(client, RedisCodec.Utf8) + pubSub <- PubSub.mkPubSubConnection[Task, String, String](client, RedisCodec.Utf8) + } yield ZEnvironment(commands, pubSub)).toScopedZIO + ) + } + + private val config = ZLayer.succeed( + Config.default.copy( + shardManagerUri = uri"http://localhost:8088/api/graphql", + simulateRemotePods = true, + sendTimeout = 3 seconds + ) + ) + private val grpcConfig = ZLayer.succeed(GrpcConfig.default) + private val managerConfig = ZLayer.succeed(ManagerConfig.default.copy(apiPort = 8088)) + private val redisConfig = ZLayer.succeed(RedisConfig.default) + + def spec: Spec[TestEnvironment with Scope, Any] = + suite("MailboxEndToEndSpec")( + test("Send and receive letters with upickle serialization") { + ZIO.scoped { + for { + _ <- Sharding.registerEntity(Mailbox, MailboxBehavior.behavior) + _ <- Sharding.registerScoped + mailbox <- Sharding.messenger(Mailbox) + + letter1 <- + mailbox.send[Option[MailboxBehavior.Letter]]("mailbox1")(SendLetter("user1", "Hello from user1!", _)) + letter2 <- mailbox.send[Option[MailboxBehavior.Letter]]("mailbox1")(SendLetter("user2", "Hi there!", _)) + letter3 <- mailbox.send[Option[MailboxBehavior.Letter]]("mailbox1")(SendLetter("user3", "How are you?", _)) + + letters <- mailbox.send[List[MailboxBehavior.Letter]]("mailbox1")(GetLetters(_)) + + readLetter <- letter1.map(_.id) match { + case Some(id) => mailbox.send[Option[MailboxBehavior.Letter]]("mailbox1")(ReadLetter(id, _)) + case None => ZIO.none + } + deleted <- letter2.map(_.id) match { + case Some(id) => mailbox.send[Boolean]("mailbox1")(DeleteLetter(id, _)) + case None => ZIO.succeed(false) + } + + lettersAfterDelete <- mailbox.send[List[MailboxBehavior.Letter]]("mailbox1")(GetLetters(_)) + + // Test different mailbox + letter4 <- + mailbox.send[Option[MailboxBehavior.Letter]]("mailbox2")(SendLetter("user4", "Message to mailbox2", _)) + letters2 <- mailbox.send[List[MailboxBehavior.Letter]]("mailbox2")(GetLetters(_)) + + } yield assert(letter1)(isSome) && + assert(letter2)(isSome) && + assert(letter3)(isSome) && + assert(letters)(hasSize(equalTo(3))) && + assertTrue(readLetter.zip(letter1).exists { case (a, b) => a == b }, deleted) && + assert(lettersAfterDelete)(hasSize(equalTo(2))) && + assert(letter4)(isSome) && + assert(letters2)(hasSize(equalTo(1))) + } + } + ).provideShared( + Sharding.live, + GrpcPods.live, + ShardManagerClient.liveWithSttp, + StorageRedis.live, + ShardManager.live, + PodsHealth.noop, + GrpcShardingService.live, + shardManagerServer, + container, + redis, + config, + grpcConfig, + managerConfig, + redisConfig + ) @@ sequential @@ withLiveClock +} diff --git a/serialization-upickle/src/main/scala/com/devsisters/shardcake/UpickleSerialization.scala b/serialization-upickle/src/main/scala/com/devsisters/shardcake/UpickleSerialization.scala new file mode 100644 index 00000000..e10f7e7c --- /dev/null +++ b/serialization-upickle/src/main/scala/com/devsisters/shardcake/UpickleSerialization.scala @@ -0,0 +1,19 @@ +package com.devsisters.shardcake + +import com.devsisters.shardcake.interfaces.Serialization +import upickle.default._ +import zio.{ Chunk, Task, ZIO } + +import scala.util.Try + +object UpickleSerialization { + implicit def upickleSerialization[A](implicit rw: ReadWriter[A]): Serialization[A] = + new Serialization[A] { + def encode(message: A): Task[Array[Byte]] = ZIO.fromTry(Try(writeBinary(message))) + def decode(bytes: Array[Byte]): Task[A] = ZIO.fromTry(Try(readBinary[A](bytes))) + override def encodeChunk(messages: Chunk[A]): Task[Chunk[Array[Byte]]] = + ZIO.attempt(messages.map(m => Try(writeBinary(m)).get)) + override def decodeChunk(bytes: Chunk[Array[Byte]]): Task[Chunk[A]] = + ZIO.attempt(bytes.map(m => Try(readBinary[A](m)).get)) + } +}