Skip to content

Commit a05f733

Browse files
authored
add ReadStream RPC (#1500)
* add readStream service * fix the timing of readerStopReading calls * some refactor && add readSingleShardStream RPC * update proto
1 parent c0fd015 commit a05f733

File tree

8 files changed

+311
-70
lines changed

8 files changed

+311
-70
lines changed

common/hstream/HStream/Exception.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ module HStream.Exception
5050
, InvalidQuerySql (InvalidQuerySql)
5151
, SQLNotSupportedByParseSQL(SQLNotSupportedByParseSQL)
5252
, ConflictShardReaderOffset (ConflictShardReaderOffset)
53+
, TooManyShardCount (TooManyShardCount)
5354

5455
-- * Exception: SomeDeadlineExceeded
5556
--
@@ -427,6 +428,7 @@ MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidConnectorType, Text, API.ErrorCodeC
427428
MAKE_EX_1_DEFMSG(SomeInvalidArgument, SQLNotSupportedByParseSQL, Text, API.ErrorCodeInternalError)
428429
MAKE_EX_1_DEFMSG(SomeInvalidArgument, InvalidQuerySql, String, API.ErrorCodeQueryInvalidSQL)
429430
MAKE_EX_1_DEFMSG(SomeInvalidArgument, ConflictShardReaderOffset, String, API.ErrorCodeShardReaderConflictOffset)
431+
MAKE_EX_1_DEFMSG(SomeInvalidArgument, TooManyShardCount, String, API.ErrorCodeShardReaderTooManyShards)
430432

431433
invalidIdentifier :: API.ResourceType -> String -> InvalidObjectIdentifier
432434
invalidIdentifier API.ResourceTypeResStream = InvalidObjectIdentifier API.ErrorCodeStreamInvalidObjectIdentifier

external/protocol

hstream-store/HStream/Store/Stream.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ module HStream.Store.Stream
110110
, LD.readerSetWaitOnlyWhenNoData
111111
, LD.readerStopReading
112112
, LD.readerIsReadingAny
113+
, LD.readerIsReading
113114
-- ** Checkpointed Reader
114115
, FFI.LDSyncCkpReader
115116
, newLDFileCkpReader

hstream/src/HStream/Server/Core/ShardReader.hs

Lines changed: 242 additions & 62 deletions
Large diffs are not rendered by default.

hstream/src/HStream/Server/Handler.hs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,13 @@ handlers serverContext@ServerContext{..} =
5959
-- Shards
6060
hstreamApiListShards = listShardsHandler serverContext,
6161
-- Reader
62-
hstreamApiListShardReaders = listShardReadersHandler serverContext,
62+
hstreamApiListShardReaders = listShardReadersHandler serverContext,
6363
hstreamApiCreateShardReader = createShardReaderHandler serverContext,
6464
hstreamApiDeleteShardReader = deleteShardReaderHandler serverContext,
6565
hstreamApiReadShard = readShardHandler serverContext,
6666
hstreamApiReadShardStream = readShardStreamHandler serverContext,
67+
hstreamApiReadStream = readStreamHandler serverContext,
68+
hstreamApiReadSingleShardStream = readSingleShardStreamHandler serverContext,
6769

6870
-- Stats
6971
hstreamApiPerStreamTimeSeriesStats = H.perStreamTimeSeriesStats scStatsHolder,

hstream/src/HStream/Server/Handler/ShardReader.hs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,16 @@ module HStream.Server.Handler.ShardReader
1313
, readShardHandler
1414
, listShardReadersHandler
1515
, readShardStreamHandler
16+
, readStreamHandler
17+
, readSingleShardStreamHandler
1618
-- * For hs-grpc-server
1719
, handleListShardReaders
1820
, handleCreateShardReader
1921
, handleDeleteShardReader
2022
, handleReadShard
2123
, handleReadShardStream
24+
, handleReadStream
25+
, handleReadSingleShardStream
2226
)
2327
where
2428

@@ -122,3 +126,41 @@ handleReadShardStream
122126
handleReadShardStream sc _ req stream = catchDefaultEx $ do
123127
Log.debug $ "Receive read shard stream Request: " <> Log.build (show req)
124128
C.readShardStream sc req (G.streamWrite stream . Just)
129+
130+
readStreamHandler
131+
:: ServerContext
132+
-> ServerRequest 'ServerStreaming ReadStreamRequest ReadStreamResponse
133+
-> IO (ServerResponse 'ServerStreaming ReadStreamResponse)
134+
readStreamHandler sc (ServerWriterRequest _meta req streamSend) =
135+
defaultServerStreamExceptionHandle $ do
136+
Log.debug $ "Receive read shard stream Request: " <> Log.build (show req)
137+
C.readStream sc req streamWrite
138+
return $ ServerWriterResponse mempty StatusUnknown "should not reach here"
139+
where
140+
streamWrite x = first show <$> streamSend x
141+
142+
handleReadStream
143+
:: ServerContext
144+
-> G.ServerStreamHandler ReadStreamRequest ReadStreamResponse ()
145+
handleReadStream sc _ req stream = catchDefaultEx $ do
146+
Log.debug $ "Receive read shard stream Request: " <> Log.build (show req)
147+
C.readStream sc req (G.streamWrite stream . Just)
148+
149+
readSingleShardStreamHandler
150+
:: ServerContext
151+
-> ServerRequest 'ServerStreaming ReadSingleShardStreamRequest ReadSingleShardStreamResponse
152+
-> IO (ServerResponse 'ServerStreaming ReadSingleShardStreamResponse)
153+
readSingleShardStreamHandler sc (ServerWriterRequest _meta req streamSend) =
154+
defaultServerStreamExceptionHandle $ do
155+
Log.debug $ "Receive read single shard stream Request: " <> Log.build (show req)
156+
C.readSingleShardStream sc req streamWrite
157+
return $ ServerWriterResponse mempty StatusUnknown "should not reach here"
158+
where
159+
streamWrite x = first show <$> streamSend x
160+
161+
handleReadSingleShardStream
162+
:: ServerContext
163+
-> G.ServerStreamHandler ReadSingleShardStreamRequest ReadSingleShardStreamResponse ()
164+
handleReadSingleShardStream sc _ req stream = catchDefaultEx $ do
165+
Log.debug $ "Receive read single shard stream Request: " <> Log.build (show req)
166+
C.readSingleShardStream sc req (G.streamWrite stream . Just)

hstream/src/HStream/Server/HsGrpcHandler.hs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ handlers sc =
5555
-- Read
5656
, unary (GRPC :: GRPC P.HStreamApi "readShard") (H.handleReadShard sc)
5757
, serverStream (GRPC :: GRPC P.HStreamApi "readShardStream") (H.handleReadShardStream sc)
58+
, serverStream (GRPC :: GRPC P.HStreamApi "readStream") (H.handleReadStream sc)
59+
, serverStream (GRPC :: GRPC P.HStreamApi "readSingleShardStream") (H.handleReadSingleShardStream sc)
5860
-- Subscribe
5961
, bidiStream (GRPC :: GRPC P.HStreamApi "streamingFetch") (H.handleStreamingFetch sc)
6062
-- Stats

hstream/src/HStream/Server/Types.hs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import Control.Concurrent (MVar, ThreadId)
1212
import Control.Concurrent.STM
1313
import Data.Aeson (FromJSON (..), ToJSON (..))
1414
import qualified Data.Aeson as Aeson
15+
import Data.HashMap.Strict (HashMap)
1516
import qualified Data.HashMap.Strict as HM
1617
import qualified Data.Heap as Heap
1718
import Data.Int (Int32, Int64)
@@ -255,14 +256,25 @@ instance TaskManager IO.Worker where
255256
recoverTask = IO.recoverTask
256257

257258
data ShardReader = ShardReader
258-
{ reader :: S.LDReader
259-
, totalBatches :: Maybe (IORef Word64)
260-
, startTs :: Maybe Int64
261-
, endTs :: Maybe Int64
259+
{ shardReader :: S.LDReader
260+
, shardReaderTotalBatches :: Maybe (IORef Word64)
261+
, shardReaderStartTs :: Maybe Int64
262+
, shardReaderEndTs :: Maybe Int64
263+
, targetShard :: S.C_LogID
262264
}
263265

264-
mkShardReader :: S.LDReader -> Maybe (IORef Word64) -> Maybe Int64 -> Maybe Int64 -> ShardReader
265-
mkShardReader reader totalBatches startTs endTs = ShardReader {..}
266+
mkShardReader :: S.LDReader -> S.C_LogID -> Maybe (IORef Word64) -> Maybe Int64 -> Maybe Int64 -> ShardReader
267+
mkShardReader shardReader targetShard shardReaderTotalBatches shardReaderStartTs shardReaderEndTs = ShardReader {..}
268+
269+
data StreamReader = StreamReader
270+
{ streamReader :: S.LDReader
271+
, streamReaderTotalBatches :: Maybe (IORef Word64)
272+
, streamReaderTsLimits :: HashMap S.C_LogID (Maybe Int64, Maybe Int64)
273+
-- ^ shardId -> (startTs, endTs)
274+
}
275+
276+
mkStreamReader :: S.LDReader -> Maybe (IORef Word64) -> HashMap S.C_LogID (Maybe Int64, Maybe Int64) -> StreamReader
277+
mkStreamReader streamReader streamReaderTotalBatches streamReaderTsLimits = StreamReader {..}
266278

267279
--------------------------------------------------------------------------------
268280

0 commit comments

Comments
 (0)