Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.KryoSerialization.Default._
import com.devsisters.shardcake.Server.Message.Ping
import com.devsisters.shardcake.Server.PingPongEntity
import zio.{ Config => _, _ }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Storage
import com.devsisters.shardcake.KryoSerialization.Default._
import zio.{ Config => _, _ }
import zio.stream.ZStream

Expand Down Expand Up @@ -51,7 +52,6 @@ object Server {

val sharding: ZLayer[Config, Throwable, Sharding with GrpcConfig] =
ZLayer.makeSome[Config, Sharding with GrpcConfig](
KryoSerialization.live,
memory,
grpcConfig,
shardManagerClient,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,46 +1,53 @@
package com.devsisters.shardcake.interfaces

import zio.{ Chunk, Task, ULayer, ZIO, ZLayer }
import zio.{ Chunk, Task, ZIO }

import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream }

/**
* An interface to serialize user messages that will be sent between pods.
*/
trait Serialization {
trait Serialization[Msg] {

/**
* Transforms the given message into binary
*/
def encode(message: Any): Task[Array[Byte]]
def encode(message: Msg): Task[Array[Byte]]

/**
* Transform binary back into the given type
*/
def decode[A](bytes: Array[Byte]): Task[A]
def decode(bytes: Array[Byte]): Task[Msg]

/**
* Transforms a chunk of messages into binary
*/
def encodeChunk(messages: Chunk[Any]): Task[Chunk[Array[Byte]]] =
def encodeChunk(messages: Chunk[Msg]): Task[Chunk[Array[Byte]]] =
ZIO.foreach(messages)(encode)

/**
* Transforms a chunk of binary back into the given type
*/
def decodeChunk[A](bytes: Chunk[Array[Byte]]): Task[Chunk[A]] =
ZIO.foreach(bytes)(decode[A])
def decodeChunk(bytes: Chunk[Array[Byte]]): Task[Chunk[Msg]] =
ZIO.foreach(bytes)(decode)
}

object Serialization {
implicit val unitSerialization: Serialization[Unit] = new Serialization[Unit] {
def encode(message: Unit): Task[Array[Byte]] = ZIO.succeed(Array.emptyByteArray)
def decode(bytes: Array[Byte]): Task[Unit] = ZIO.unit
}
}

object JavaSerialization {

/**
* A layer that uses Java serialization for encoding and decoding messages.
* A Java serialization for encoding and decoding messages.
* This is useful for testing and not recommended to use in production.
*/
val javaSerialization: ULayer[Serialization] =
ZLayer.succeed(new Serialization {
def encode(message: Any): Task[Array[Byte]] =
implicit def javaSerialization[T]: Serialization[T] =
new Serialization[T] {
def encode(message: T): Task[Array[Byte]] =
ZIO.scoped {
val stream = new ByteArrayOutputStream()
ZIO
Expand All @@ -49,11 +56,11 @@ object Serialization {
.as(stream.toByteArray)
}

def decode[A](bytes: Array[Byte]): Task[A] =
def decode(bytes: Array[Byte]): Task[T] =
ZIO.scoped {
ZIO
.fromAutoCloseable(ZIO.attempt(new ObjectInputStream(new ByteArrayInputStream(bytes))))
.flatMap(ois => ZIO.attempt(ois.readObject.asInstanceOf[A]))
.flatMap(ois => ZIO.attempt(ois.readObject.asInstanceOf[T]))
}
})
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Serialization
import zio.{ Scope, ZIO }
import com.devsisters.shardcake.interfaces.JavaSerialization
import zio.Scope
import zio.test._

object JavaSerializationSpec extends ZIOSpecDefault {
Expand All @@ -11,9 +11,9 @@ object JavaSerializationSpec extends ZIOSpecDefault {
case class Test(a: Int, b: String)
val expected = Test(2, "test")
for {
bytes <- ZIO.serviceWithZIO[Serialization](_.encode(expected))
actual <- ZIO.serviceWithZIO[Serialization](_.decode[Test](bytes))
bytes <- JavaSerialization.javaSerialization.encode(expected)
actual <- JavaSerialization.javaSerialization[Test].decode(bytes)
} yield assertTrue(expected == actual)
}
).provideShared(Serialization.javaSerialization)
)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Serialization
import zio.UIO

import scala.util.Try
Expand All @@ -18,5 +19,5 @@ trait Broadcaster[-Msg] {
/**
* Broadcast a message and wait for a response from each consumer
*/
def broadcast[Res](topic: String)(msg: Replier[Res] => Msg): UIO[Map[PodAddress, Try[Res]]]
def broadcast[Res: Serialization](topic: String)(msg: Replier[Res] => Msg): UIO[Map[PodAddress, Try[Res]]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ object LocalSharding {
* A special layer meant for testing that uses a local queue rather than an external transport.
* This layer will only work in a single JVM and is not suitable for production use.
*/
val live: RLayer[ShardManagerClient with Storage with Serialization with Config, Sharding] =
ZLayer.makeSome[ShardManagerClient with Storage with Serialization with Config, Sharding](
val live: RLayer[ShardManagerClient with Storage with Config, Sharding] =
ZLayer.makeSome[ShardManagerClient with Storage with Config, Sharding](
localQueue,
localPods,
localServer,
Expand Down
13 changes: 8 additions & 5 deletions entities/src/main/scala/com/devsisters/shardcake/Messenger.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.errors.StreamCancelled
import com.devsisters.shardcake.interfaces.Serialization
import zio._
import zio.stream.ZStream

Expand All @@ -18,7 +19,7 @@ trait Messenger[-Msg] {
/**
* Send a message and wait for a response of type `Res`
*/
def send[Res](entityId: String)(msg: Replier[Res] => Msg): Task[Res]
def send[Res: Serialization](entityId: String)(msg: Replier[Res] => Msg): Task[Res]

/**
* Send a message and receive a stream of responses of type `Res`.
Expand All @@ -27,7 +28,9 @@ trait Messenger[-Msg] {
* streaming responses. See `sendStreamAutoRestart` for an alternative that will automatically restart the stream
* in case of rebalance.
*/
def sendAndReceiveStream[Res](entityId: String)(msg: StreamReplier[Res] => Msg): Task[ZStream[Any, Throwable, Res]]
def sendAndReceiveStream[Res: Serialization](entityId: String)(
msg: StreamReplier[Res] => Msg
): Task[ZStream[Any, Throwable, Res]]

/**
* Send a stream of messages.
Expand All @@ -37,7 +40,7 @@ trait Messenger[-Msg] {
/**
* Send a stream of messages and receive a stream of responses of type `Res`.
*/
def sendStreamAndReceiveStream[Res](entityId: String)(
def sendStreamAndReceiveStream[Res: Serialization](entityId: String)(
messages: StreamReplier[Res] => ZStream[Any, Throwable, Msg]
): Task[ZStream[Any, Throwable, Res]]

Expand All @@ -50,7 +53,7 @@ trait Messenger[-Msg] {
* cursor from the responses so that when the remote entity is rebalanced, a new message can be sent with the right
* cursor according to what we've seen in the previous stream of responses.
*/
def sendAndReceiveStreamAutoRestart[Cursor, Res](entityId: String, cursor: Cursor)(
def sendAndReceiveStreamAutoRestart[Cursor, Res: Serialization](entityId: String, cursor: Cursor)(
msg: (Cursor, StreamReplier[Res]) => Msg
)(
updateCursor: (Cursor, Res) => Cursor
Expand Down Expand Up @@ -79,7 +82,7 @@ trait Messenger[-Msg] {
* cursor from the responses so that when the remote entity is rebalanced, a new message can be sent with the right
* cursor according to what we've seen in the previous stream of responses.
*/
def sendStreamAndReceiveStreamAutoRestart[Cursor, Res](entityId: String, cursor: Cursor)(
def sendStreamAndReceiveStreamAutoRestart[Cursor, Res: Serialization](entityId: String, cursor: Cursor)(
msg: (Cursor, StreamReplier[Res]) => ZStream[Any, Throwable, Msg]
)(
updateCursor: (Cursor, Res) => Cursor
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package com.devsisters.shardcake

import com.devsisters.shardcake.interfaces.Serialization
import zio.{ URIO, ZIO }

/**
* A metadata object that allows sending a response back to the sender
*/
case class Replier[-R](id: String) { self =>
def reply(reply: R): URIO[Sharding, Unit] =
case class Replier[R](id: String) { self =>
def reply(reply: R)(implicit serialization: Serialization[R]): URIO[Sharding, Unit] =
ZIO.serviceWithZIO[Sharding](_.reply(reply, self))
}
Loading