diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md
index f9e88d709..bd5da8459 100755
--- a/DOCUMENTATION.md
+++ b/DOCUMENTATION.md
@@ -164,17 +164,17 @@ slightly differently:

-# Equinox.Cosmos
+# Equinox.CosmosStore
-## Container Diagram for `Equinox.Cosmos`
+## Container Diagram for `Equinox.CosmosStore`

-## Component Diagram for `Equinox.Cosmos`
+## Component Diagram for `Equinox.CosmosStore`

-## Code Diagrams for `Equinox.Cosmos`
+## Code Diagrams for `Equinox.CosmosStore`
This diagram walks through the basic sequence of operations, where:
- this node has not yet read this stream (i.e. there's nothing in the Cache)
@@ -1563,7 +1563,7 @@ having separate roundtrips obviously has implications).
This article provides a walkthrough of how `Equinox.Cosmos` encodes, writes and
reads records from a stream under its control.
-The code (see [source](src/Equinox.Cosmos/Cosmos.fs#L6)) contains lots of
+The code (see [source](src/Equinox.CosmosStore/CosmosStore.fs#L6)) contains lots of
comments and is intended to be read - this just provides some background.
## Batches
@@ -1830,21 +1830,19 @@ let gatewayLog =
outputLog.ForContext(Serilog.Core.Constants.SourceContextPropertyName, "Equinox")
// When starting the app, we connect (once)
-let connector : Equinox.Cosmos.Connector =
- Connector(
+let factory : Equinox.CosmosStore.CosmosStoreClientFactory =
+ CosmosStoreClientFactory(
requestTimeout = TimeSpan.FromSeconds 5.,
maxRetryAttemptsOnThrottledRequests = 1,
maxRetryWaitTimeInSeconds = 3,
log = gatewayLog)
-let cnx =
- connector.Connect("Application.CommandProcessor", Discovery.FromConnectionString connectionString)
- |> Async.RunSynchronously
+let client = factory.Create(Discovery.ConnectionString connectionString)
// If storing in a single collection, one specifies the db and collection
-// alternately use the overload that defers the mapping until the stream one is
-// writing to becomes clear
-let containerMap = Containers("databaseName", "containerName")
-let ctx = Context(cnx, containerMap, gatewayLog)
+// alternatively, use the overload that defers the mapping until the stream one is writing to becomes clear
+let connection = CosmosStoreConnection(client, "databaseName", "containerName")
+let storeContext = CosmosStoreContext(connection)
+let ctx = EventsContext(storeContext, gatewayLog)
//
// Write an event
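
For orientation, here is the replacement wiring from the hunk above assembled in one place (a minimal sketch that assumes the constructor parameters shown there; `gatewayLog` and `connectionString` are the bindings established earlier in that sample):

```fsharp
open System
open Equinox.CosmosStore

// one client factory / client per process, as in the sample above
let factory =
    CosmosStoreClientFactory(
        requestTimeout = TimeSpan.FromSeconds 5.,
        maxRetryAttemptsOnThrottledRequests = 1,
        maxRetryWaitTimeInSeconds = 3,
        log = gatewayLog)
let client = factory.Create(Discovery.ConnectionString connectionString)

// bind the client to a database/container pair, then layer the store and events contexts on top
let connection = CosmosStoreConnection(client, "databaseName", "containerName")
let storeContext = CosmosStoreContext(connection)
let eventsCtx = EventsContext(storeContext, gatewayLog)
```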
diff --git a/Directory.Build.props b/Directory.Build.props
index 9aafb25f4..8b7772af5 100644
--- a/Directory.Build.props
+++ b/Directory.Build.props
@@ -9,7 +9,7 @@
Copyright © 2016-20
- netcoreapp3.1;net461
+ netcoreapp3.1
netcoreapp3.1
$([System.IO.Path]::GetFullPath("$(MSBuildThisFileDirectory)"))
diff --git a/Equinox.sln b/Equinox.sln
index 80d4d4328..1be43e785 100644
--- a/Equinox.sln
+++ b/Equinox.sln
@@ -47,9 +47,9 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.MemoryStore.Integra
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Tool", "tools\Equinox.Tool\Equinox.Tool.fsproj", "{C8992C1C-6DC5-42CD-A3D7-1C5663433FED}"
EndProject
-Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Cosmos", "src\Equinox.Cosmos\Equinox.Cosmos.fsproj", "{54EA6187-9F9F-4D67-B602-163D011E43E6}"
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.CosmosStore", "src\Equinox.CosmosStore\Equinox.CosmosStore.fsproj", "{54EA6187-9F9F-4D67-B602-163D011E43E6}"
EndProject
-Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.Cosmos.Integration", "tests\Equinox.Cosmos.Integration\Equinox.Cosmos.Integration.fsproj", "{DE0FEBF0-72DC-4D4A-BBA7-788D875D6B4B}"
+Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.CosmosStore.Integration", "tests\Equinox.CosmosStore.Integration\Equinox.CosmosStore.Integration.fsproj", "{DE0FEBF0-72DC-4D4A-BBA7-788D875D6B4B}"
EndProject
Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "TodoBackend", "samples\TodoBackend\TodoBackend.fsproj", "{EC2EC658-3D85-44F3-AD2F-52AFCAFF8871}"
EndProject
diff --git a/README.md b/README.md
index f20d75451..bcf65dfb7 100644
--- a/README.md
+++ b/README.md
@@ -38,7 +38,7 @@ Some aspects of the implementation are distilled from [`Jet.com` systems dating
- support, (via the [`FsCodec.IEventCodec`](https://github.com/jet/FsCodec#IEventCodec)) for the maintenance of multiple co-existing compaction schemas for a given stream (A 'compaction' event/snapshot isa Event)
- compaction events typically do not get deleted (consistent with how EventStore works), although it is safe to do so in concept
- NB while this works well, and can deliver excellent performance (especially when allied with the Cache), [it's not a panacea, as noted in this excellent EventStore.org article on the topic](https://eventstore.org/docs/event-sourcing-basics/rolling-snapshots/index.html)
-- **`Equinox.Cosmos` 'Tip with Unfolds' schema**: (In contrast to `Equinox.EventStore`'s `AccessStrategy.RollingSnapshots`,) when using `Equinox.Cosmos`, optimized command processing is managed via the `Tip`; a document per stream with a well-known identity enabling Syncing the r/w Position via a single point-read by virtue of the fact that the document maintains:
+- **`Equinox.CosmosStore` 'Tip with Unfolds' schema**: (In contrast to `Equinox.EventStore`'s `AccessStrategy.RollingSnapshots`,) when using `Equinox.CosmosStore`, optimized command processing is managed via the `Tip`; a document per stream with a well-known identity enabling Syncing the r/w Position via a single point-read by virtue of the fact that the document maintains:
a) the present Position of the stream - i.e. the index at which the next events will be appended for a given stream (events and the Tip share a common logical partition key)
b) ephemeral (`deflate+base64` compressed) [_unfolds_](DOCUMENTATION.md#Cosmos-Storage-Model)
c) (optionally) a holding buffer for events since those unfolded events ([presently removed](https://github.com/jet/equinox/pull/58), but [should return](DOCUMENTATION.md#Roadmap), see [#109](https://github.com/jet/equinox/pull/109))
@@ -49,7 +49,7 @@ Some aspects of the implementation are distilled from [`Jet.com` systems dating
- no additional roundtrips to the store needed at either the Load or Sync points in the flow
It should be noted that from a querying perspective, the `Tip` shares the same structure as `Batch` documents (a potential future extension would be to carry some events in the `Tip` as [some interim versions of the implementation once did](https://github.com/jet/equinox/pull/58), see also [#109](https://github.com/jet/equinox/pull/109)).
-- **`Equinox.Cosmos` `RollingState` and `Custom` 'non-event-sourced' modes**: Uses 'Tip with Unfolds' encoding to avoid having to write event documents at all - this enables one to build, reason about and test your aggregates in the normal manner, but inhibit event documents from being generated. This enables one to benefit from the caching and consistency management mechanisms without having to bear the cost of writing and storing the events themselves (and/or dealing with an ever-growing store size). Search for `transmute` or `RollingState` in the `samples` and/or see [the `Checkpoint` Aggregate in Propulsion](https://github.com/jet/propulsion/blob/master/src/Propulsion.EventStore/Checkpoint.fs). One chief use of this mechanism is for tracking Summary Event feeds in [the `dotnet-templates` `summaryConsumer` template](https://github.com/jet/dotnet-templates/tree/master/propulsion-summary-consumer).
+- **`Equinox.CosmosStore` `RollingState` and `Custom` 'non-event-sourced' modes**: Uses the 'Tip with Unfolds' encoding to avoid writing event documents at all - you build, reason about and test your aggregates in the normal manner, but no event documents are generated. This lets you benefit from the caching and consistency management mechanisms without having to bear the cost of writing and storing the events themselves (and/or dealing with an ever-growing store size). Search for `transmute` or `RollingState` in the `samples` and/or see [the `Checkpoint` Aggregate in Propulsion](https://github.com/jet/propulsion/blob/master/src/Propulsion.EventStore/Checkpoint.fs). One chief use of this mechanism is tracking Summary Event feeds in [the `dotnet-templates` `summaryConsumer` template](https://github.com/jet/dotnet-templates/tree/master/propulsion-summary-consumer).
## Components
@@ -77,7 +77,7 @@ The components within this repository are delivered as multi-targeted Nuget pack
- `Equinox.Core` [](https://www.nuget.org/packages/Equinox.Core/): Interfaces and helpers used in realizing the concrete Store implementations, together with the default [`System.Runtime.Caching.Cache`-based] `Cache` implementation. ([depends](https://www.fuget.org/packages/Equinox.Core) on `Equinox`, `System.Runtime.Caching`)
- `Equinox.MemoryStore` [](https://www.nuget.org/packages/Equinox.MemoryStore/): In-memory store for integration testing/performance baselining/providing out-of-the-box zero dependency storage for examples. ([depends](https://www.fuget.org/packages/Equinox.MemoryStore) on `Equinox.Core`, `FsCodec`)
- `Equinox.EventStore` [](https://www.nuget.org/packages/Equinox.EventStore/): Production-strength [EventStoreDB](https://eventstore.org/) Adapter instrumented to the degree necessitated by Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.EventStore) on `Equinox.Core`, `EventStore.Client >= 20.6`, `FSharp.Control.AsyncSeq >= 2.0.23`)
-- `Equinox.Cosmos` [](https://www.nuget.org/packages/Equinox.Cosmos/): Production-strength Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to the degree necessitated by Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.Cosmos) on `Equinox.Core`, `Microsoft.Azure.Cosmos >= 3.9`, `FsCodec.NewtonsoftJson`, `FSharp.Control.AsyncSeq >= 2.0.23`)
+- `Equinox.CosmosStore` [](https://www.nuget.org/packages/Equinox.CosmosStore/): Production-strength Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to the degree necessitated by Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos >= 3.9`, `FsCodec.NewtonsoftJson`, `FSharp.Control.AsyncSeq >= 2.0.23`)
- `Equinox.SqlStreamStore` [](https://www.nuget.org/packages/Equinox.SqlStreamStore/): Production-strength [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore) Adapter derived from `Equinox.EventStore` - provides core facilities (but does not connect to a specific database; see sibling `SqlStreamStore`.* packages). ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore) on `Equinox.Core`, `FsCodec`, `SqlStreamStore >= 1.2.0-beta.8`, `FSharp.Control.AsyncSeq`)
- `Equinox.SqlStreamStore.MsSql` [](https://www.nuget.org/packages/Equinox.SqlStreamStore.MsSql/): [SqlStreamStore.MsSql](https://sqlstreamstore.readthedocs.io/en/latest/sqlserver) Sql Server `Connector` implementation for the `Equinox.SqlStreamStore` package. ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore.MsSql) on `Equinox.SqlStreamStore`, `SqlStreamStore.MsSql >= 1.2.0-beta.8`)
- `Equinox.SqlStreamStore.MySql` [](https://www.nuget.org/packages/Equinox.SqlStreamStore.MySql/): `SqlStreamStore.MySql` MySQL `Connector` implementation for the `Equinox.SqlStreamStore` package. ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore.MySql) on `Equinox.SqlStreamStore`, `SqlStreamStore.MySql >= 1.2.0-beta.8`)
@@ -89,7 +89,7 @@ Equinox does not focus on projection logic or wrapping thereof - each store brin
- `FsKafka` [](https://www.nuget.org/packages/FsKafka/): Wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations, with basic logging instrumentation. Used in the [`propulsion project kafka`](https://github.com/jet/propulsion#dotnet-tool-provisioning--projections-test-tool) tool command; see [`dotnet new proProjector -k; dotnet new proConsumer` to generate a sample app](https://github.com/jet/dotnet-templates#propulsion-related) using it (see the `BatchedAsync` and `BatchedSync` modules in `Examples.fs`).
- `Propulsion` [](https://www.nuget.org/packages/Propulsion/): defines a canonical `Propulsion.Streams.StreamEvent` used to interop with `Propulsion.*` in processing pipelines for the `proProjector` and `proSync` templates in the [templates repo](https://github.com/jet/dotnet-templates), together with the `Ingestion`, `Streams`, `Progress` and `Parallel` modules that get composed into those processing pipelines. ([depends](https://www.fuget.org/packages/Propulsion) on `Serilog`)
-- `Propulsion.Cosmos` [](https://www.nuget.org/packages/Propulsion.Cosmos/): Wraps the [Microsoft .NET `ChangeFeedProcessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) providing a [processor loop](DOCUMENTATION.md#change-feed-processors) that maintains a continuous query loop per CosmosDb Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by `Equinox.Cosmos` for processing or forwarding). Used in the [`propulsion project stats cosmos`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proProjector` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDb.ChangeFeedProcessor >= 2.2.5`)
+- `Propulsion.Cosmos` [](https://www.nuget.org/packages/Propulsion.Cosmos/): Wraps the [Microsoft .NET `ChangeFeedProcessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) providing a [processor loop](DOCUMENTATION.md#change-feed-processors) that maintains a continuous query loop per CosmosDb Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by `Equinox.CosmosStore` for processing or forwarding). Used in the [`propulsion project stats cosmos`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proProjector` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDb.ChangeFeedProcessor >= 2.2.5`)
- `Propulsion.EventStore` [](https://www.nuget.org/packages/Propulsion.EventStore/) Used in the [`propulsion project es`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proSync` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`)
- `Propulsion.Kafka` [](https://www.nuget.org/packages/Propulsion.Kafka/): Provides a canonical `RenderedSpan` that can be used as a default format when projecting events via e.g. the Producer/Consumer pair in `dotnet new proProjector -k; dotnet new proConsumer`. ([depends](https://www.fuget.org/packages/Propulsion.Kafka) on `Newtonsoft.Json >= 11.0.2`, `Propulsion`, `FsKafka`)
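
The access strategies referenced in the bullets above ('Tip with Unfolds' snapshots vs the `RollingState` 'non-event-sourced' mode) surface in code as the `AccessStrategy` passed to a `CosmosStoreCategory`, as the sample and integration changes later in this diff show. A minimal sketch, assuming a `CosmosStoreContext`, an FsCodec codec and the usual `fold`/`initial`/`isOrigin`/`snapshot` values from an aggregate's `Fold` module:

```fsharp
open Equinox.CosmosStore

// 'Tip with Unfolds': the snapshot is maintained as an unfold in the Tip document,
// so most Loads can be serviced by a single point-read
let snapshotted context codec fold initial (isOrigin, snapshot) =
    CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching,
                        AccessStrategy.Snapshot (isOrigin, snapshot)).Resolve

// 'RollingState': only the rolled-up state is retained; no event documents are written
let rollingState context codec fold initial snapshot =
    CosmosStoreCategory(context, codec, fold, initial, CachingStrategy.NoCaching,
                        AccessStrategy.RollingState snapshot).Resolve
```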
diff --git a/build.proj b/build.proj
index 99309aa4b..0d5e2e31a 100644
--- a/build.proj
+++ b/build.proj
@@ -16,7 +16,7 @@
-
+
diff --git a/diagrams/context.puml b/diagrams/context.puml
index e58d1d8ea..0d9077c85 100644
--- a/diagrams/context.puml
+++ b/diagrams/context.puml
@@ -1,4 +1,4 @@
- @startuml
+@startuml
!includeurl https://raw.githubusercontent.com/skleanthous/C4-PlantumlSkin/master/build/output/c4.puml
title System Context Diagram for Equinox (+Propulsion)
diff --git a/samples/Infrastructure/Infrastructure.fsproj b/samples/Infrastructure/Infrastructure.fsproj
index d073373c8..9add04f11 100644
--- a/samples/Infrastructure/Infrastructure.fsproj
+++ b/samples/Infrastructure/Infrastructure.fsproj
@@ -19,7 +19,7 @@
-
+
@@ -35,7 +35,7 @@
-
+
\ No newline at end of file
diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs
index 2ba19ba9b..831f8868f 100644
--- a/samples/Infrastructure/Services.fs
+++ b/samples/Infrastructure/Services.fs
@@ -1,20 +1,34 @@
module Samples.Infrastructure.Services
open Domain
+open FsCodec
open Microsoft.Extensions.DependencyInjection
open System
+open System.Text.Json
+
+[]
+type StreamCodec<'event, 'context> =
+ | JsonElementCodec of IEventCodec<'event, JsonElement, 'context>
+ | Utf8ArrayCodec of IEventCodec<'event, byte[], 'context>
type StreamResolver(storage) =
- member __.Resolve
- ( codec : FsCodec.IEventCodec<'event,byte[],_>,
+ member __.ResolveWithJsonElementCodec
+ ( codec : IEventCodec<'event, JsonElement, _>,
+ fold: ('state -> 'event seq -> 'state),
+ initial: 'state,
+ snapshot: (('event -> bool) * ('state -> 'event))) =
+ match storage with
+ | Storage.StorageConfig.Cosmos (store, caching, unfolds, _databaseId, _containerId) ->
+ let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized
+ Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
+ | _ -> failwith "Currently, only Cosmos can be used with a JsonElement codec."
+
+ member __.ResolveWithUtf8ArrayCodec
+ ( codec : IEventCodec<'event, byte[], _>,
fold: ('state -> 'event seq -> 'state),
initial: 'state,
snapshot: (('event -> bool) * ('state -> 'event))) =
match storage with
- | Storage.StorageConfig.Cosmos (gateway, caching, unfolds, databaseId, containerId) ->
- let store = Equinox.Cosmos.Context(gateway, databaseId, containerId)
- let accessStrategy = if unfolds then Equinox.Cosmos.AccessStrategy.Snapshot snapshot else Equinox.Cosmos.AccessStrategy.Unoptimized
- Equinox.Cosmos.Resolver<'event,'state,_>(store, codec, fold, initial, caching, accessStrategy).Resolve
| Storage.StorageConfig.Es (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.EventStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
@@ -23,6 +37,7 @@ type StreamResolver(storage) =
| Storage.StorageConfig.Sql (context, caching, unfolds) ->
let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None
Equinox.SqlStreamStore.Resolver<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve
+ | _ -> failwith "Only EventStore, Memory Store, and SQL Store can be used with a byte array codec."
type ServiceBuilder(storageConfig, handlerLog) =
let resolver = StreamResolver(storageConfig)
@@ -30,17 +45,29 @@ type ServiceBuilder(storageConfig, handlerLog) =
member __.CreateFavoritesService() =
let fold, initial = Favorites.Fold.fold, Favorites.Fold.initial
let snapshot = Favorites.Fold.isOrigin,Favorites.Fold.snapshot
- Backend.Favorites.create handlerLog (resolver.Resolve(Favorites.Events.codec,fold,initial,snapshot))
+
+ match storageConfig with
+ | Storage.StorageConfig.Cosmos _ -> resolver.ResolveWithJsonElementCodec(Favorites.Events.codecStj, fold, initial, snapshot)
+ | _ -> resolver.ResolveWithUtf8ArrayCodec(Favorites.Events.codecNewtonsoft, fold, initial, snapshot)
+ |> Backend.Favorites.create handlerLog
member __.CreateSaveForLaterService() =
let fold, initial = SavedForLater.Fold.fold, SavedForLater.Fold.initial
let snapshot = SavedForLater.Fold.isOrigin,SavedForLater.Fold.compact
- Backend.SavedForLater.create 50 handlerLog (resolver.Resolve(SavedForLater.Events.codec,fold,initial,snapshot))
+
+ match storageConfig with
+ | Storage.StorageConfig.Cosmos _ -> resolver.ResolveWithJsonElementCodec(SavedForLater.Events.codecStj,fold,initial,snapshot)
+ | _ -> resolver.ResolveWithUtf8ArrayCodec(SavedForLater.Events.codecNewtonsoft,fold,initial,snapshot)
+ |> Backend.SavedForLater.create 50 handlerLog
member __.CreateTodosService() =
let fold, initial = TodoBackend.Fold.fold, TodoBackend.Fold.initial
let snapshot = TodoBackend.Fold.isOrigin, TodoBackend.Fold.snapshot
- TodoBackend.create handlerLog (resolver.Resolve(TodoBackend.Events.codec,fold,initial,snapshot))
+
+ match storageConfig with
+ | Storage.StorageConfig.Cosmos _ -> resolver.ResolveWithJsonElementCodec(TodoBackend.Events.codecStj,fold,initial,snapshot)
+ | _ -> resolver.ResolveWithUtf8ArrayCodec(TodoBackend.Events.codecNewtonsoft,fold,initial,snapshot)
+ |> TodoBackend.create handlerLog
let register (services : IServiceCollection, storageConfig, handlerLog) =
let regF (factory : IServiceProvider -> 'T) = services.AddSingleton<'T>(fun (sp: IServiceProvider) -> factory sp) |> ignore
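
The `StreamCodec` union introduced above wraps the two codec flavours, but this hunk does not show a consumer; a hypothetical sketch of how a caller could dispatch on it (the `resolveWith` helper is illustrative and not part of the diff):

```fsharp
// hypothetical helper: unwrap whichever codec flavour was configured and hand it to
// the matching StreamResolver member defined above
let resolveWith (resolver : StreamResolver) codec (fold, initial, snapshot) =
    match codec with
    | JsonElementCodec c -> resolver.ResolveWithJsonElementCodec(c, fold, initial, snapshot)
    | Utf8ArrayCodec c -> resolver.ResolveWithUtf8ArrayCodec(c, fold, initial, snapshot)
```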
diff --git a/samples/Infrastructure/Storage.fs b/samples/Infrastructure/Storage.fs
index 35c7d21e1..08747b3c0 100644
--- a/samples/Infrastructure/Storage.fs
+++ b/samples/Infrastructure/Storage.fs
@@ -10,7 +10,7 @@ type StorageConfig =
// For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems
| Memory of Equinox.MemoryStore.VolatileStore
| Es of Equinox.EventStore.Context * Equinox.EventStore.CachingStrategy option * unfolds: bool
- | Cosmos of Equinox.Cosmos.Gateway * Equinox.Cosmos.CachingStrategy * unfolds: bool * databaseId: string * containerId: string
+ | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.CosmosStore.CachingStrategy * unfolds: bool * databaseId: string * containerId: string
| Sql of Equinox.SqlStreamStore.Context * Equinox.SqlStreamStore.CachingStrategy option * unfolds: bool
module MemoryStore =
@@ -35,7 +35,7 @@ module Cosmos =
type [] Arguments =
| [] VerboseStore
- | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode
+ | [] ConnectionMode of Azure.Cosmos.ConnectionMode
| [] Timeout of float
| [] Retries of int
| [] RetriesWaitTimeS of float
@@ -54,7 +54,7 @@ module Cosmos =
| Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)"
| Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)"
type Info(args : ParseResults) =
- member __.Mode = args.GetResult(ConnectionMode,Microsoft.Azure.Cosmos.ConnectionMode.Direct)
+ member __.Mode = args.GetResult(ConnectionMode,Azure.Cosmos.ConnectionMode.Direct)
member __.Connection = args.TryGetResult Connection |> defaultWithEnvVar "EQUINOX_COSMOS_CONNECTION" "Connection"
member __.Database = args.TryGetResult Database |> defaultWithEnvVar "EQUINOX_COSMOS_DATABASE" "Database"
member __.Container = args.TryGetResult Container |> defaultWithEnvVar "EQUINOX_COSMOS_CONTAINER" "Container"
@@ -67,22 +67,23 @@ module Cosmos =
/// 1) replace connection below with a connection string or Uri+Key for an initialized Equinox instance with a database and collection named "equinox-test"
/// 2) Set the 3x environment variables and create a local Equinox using tools/Equinox.Tool/bin/Release/net461/eqx.exe `
/// init -ru 1000 cosmos -s $env:EQUINOX_COSMOS_CONNECTION -d $env:EQUINOX_COSMOS_DATABASE -c $env:EQUINOX_COSMOS_CONTAINER
- open Equinox.Cosmos
+ open Equinox.CosmosStore
open Serilog
- let private createGateway connection maxItems = Gateway(connection, BatchingPolicy(defaultMaxItems=maxItems))
- let connection (log: ILogger, storeLog: ILogger) (a : Info) =
- let (Discovery.UriAndKey (endpointUri,_)) as discovery = a.Connection |> Discovery.FromConnectionString
+ let conn (log: ILogger) (a : Info) =
+ let discovery = Discovery.ConnectionString a.Connection
+ let client = CosmosStoreClientFactory(a.Timeout, a.Retries, a.MaxRetryWaitTime, mode=a.Mode).Create(discovery)
log.Information("CosmosDb {mode} {connection} Database {database} Container {container}",
- a.Mode, endpointUri, a.Database, a.Container)
+ a.Mode, client.Endpoint, a.Database, a.Container)
log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
(let t = a.Timeout in t.TotalSeconds), a.Retries, let x = a.MaxRetryWaitTime in x.TotalSeconds)
- discovery, a.Database, a.Container, Connector(a.Timeout, a.Retries, a.MaxRetryWaitTime, log=storeLog, mode=a.Mode)
- let config (log: ILogger, storeLog) (cache, unfolds, batchSize) info =
- let discovery, dName, cName, connector = connection (log, storeLog) info
- let conn = connector.Connect(appName, discovery) |> Async.RunSynchronously
+ client, a.Database, a.Container
+ let config (log: ILogger) (cache, unfolds, batchSize) info =
+ let client, databaseId, containerId = conn log info
+ let conn = CosmosStoreConnection(client, databaseId, containerId)
+ let ctx = CosmosStoreContext(conn, defaultMaxItems = batchSize)
let cacheStrategy = match cache with Some c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.) | None -> CachingStrategy.NoCaching
- StorageConfig.Cosmos (createGateway conn batchSize, cacheStrategy, unfolds, dName, cName)
+ StorageConfig.Cosmos (ctx, cacheStrategy, unfolds, databaseId, containerId)
/// To establish a local node to run the tests against:
/// 1. cinst eventstore-oss -y # where cinst is an invocation of the Chocolatey Package Installer on Windows
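
A hypothetical invocation of the `config` function above (the `args`, `log` and `cache` bindings are assumed to exist; `true` and `500` stand in for the unfolds flag and batch size):

```fsharp
// hypothetical usage: parse the command line into Info, then build the StorageConfig
let info = Cosmos.Info(args)                      // args : ParseResults<Cosmos.Arguments>
let storageConfig =
    Cosmos.config log (Some cache, (*unfolds*) true, (*batchSize*) 500) info
```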
diff --git a/samples/Store/Backend/Backend.fsproj b/samples/Store/Backend/Backend.fsproj
index 8234b4a6c..0288582cb 100644
--- a/samples/Store/Backend/Backend.fsproj
+++ b/samples/Store/Backend/Backend.fsproj
@@ -1,7 +1,7 @@
- netstandard2.0;net461
+ netstandard2.1
5
false
true
@@ -18,13 +18,12 @@
-
+
-
-
+
\ No newline at end of file
diff --git a/samples/Store/Domain.Tests/Domain.Tests.fsproj b/samples/Store/Domain.Tests/Domain.Tests.fsproj
index 40240896f..d95a52a1c 100644
--- a/samples/Store/Domain.Tests/Domain.Tests.fsproj
+++ b/samples/Store/Domain.Tests/Domain.Tests.fsproj
@@ -21,6 +21,7 @@
+
all
diff --git a/samples/Store/Domain/Cart.fs b/samples/Store/Domain/Cart.fs
index 5f2d057c9..b4f9869eb 100644
--- a/samples/Store/Domain/Cart.fs
+++ b/samples/Store/Domain/Cart.fs
@@ -24,7 +24,9 @@ module Events =
| ItemQuantityChanged of ItemQuantityChangedInfo
| ItemPropertiesChanged of ItemPropertiesChangedInfo
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create()
+ let codecStj options = FsCodec.SystemTextJson.Codec.Create(options = options)
module Fold =
diff --git a/samples/Store/Domain/ContactPreferences.fs b/samples/Store/Domain/ContactPreferences.fs
index a8c9e28b1..1f9443154 100644
--- a/samples/Store/Domain/ContactPreferences.fs
+++ b/samples/Store/Domain/ContactPreferences.fs
@@ -12,7 +12,9 @@ module Events =
type Event =
| []Updated of Value
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create()
+ let codecStj options = FsCodec.SystemTextJson.Codec.Create(options = options)
module Fold =
diff --git a/samples/Store/Domain/Domain.fsproj b/samples/Store/Domain/Domain.fsproj
index f07326902..d04389ee2 100644
--- a/samples/Store/Domain/Domain.fsproj
+++ b/samples/Store/Domain/Domain.fsproj
@@ -1,7 +1,7 @@
- netstandard2.0;net461
+ netstandard2.1
5
false
true
@@ -18,10 +18,10 @@
-
-
+
+
\ No newline at end of file
diff --git a/samples/Store/Domain/Favorites.fs b/samples/Store/Domain/Favorites.fs
index a75606f1e..25bfa4756 100644
--- a/samples/Store/Domain/Favorites.fs
+++ b/samples/Store/Domain/Favorites.fs
@@ -14,7 +14,9 @@ module Events =
| Favorited of Favorited
| Unfavorited of Unfavorited
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create()
+ let codecStj = FsCodec.SystemTextJson.Codec.Create()
module Fold =
diff --git a/samples/Store/Domain/SavedForLater.fs b/samples/Store/Domain/SavedForLater.fs
index f920139d4..49d3e1f3f 100644
--- a/samples/Store/Domain/SavedForLater.fs
+++ b/samples/Store/Domain/SavedForLater.fs
@@ -29,7 +29,9 @@ module Events =
/// Addition of a collection of skus to the list
| Added of Added
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create()
+ let codecStj = FsCodec.SystemTextJson.Codec.Create()
module Fold =
open Events
diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs
index 462d45300..8aaa33ff3 100644
--- a/samples/Store/Integration/CartIntegration.fs
+++ b/samples/Store/Integration/CartIntegration.fs
@@ -1,7 +1,7 @@
module Samples.Store.Integration.CartIntegration
open Equinox
-open Equinox.Cosmos.Integration
+open Equinox.CosmosStore.Integration
open Swensen.Unquote
#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
@@ -11,19 +11,19 @@ let snapshot = Domain.Cart.Fold.isOrigin, Domain.Cart.Fold.snapshot
let createMemoryStore () = MemoryStore.VolatileStore()
let createServiceMemory log store =
- Backend.Cart.create log (fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codec, fold, initial).Resolve(id,?option=opt))
-
-let codec = Domain.Cart.Events.codec
+ Backend.Cart.create log (fun (id,opt) -> MemoryStore.Resolver(store, Domain.Cart.Events.codecNewtonsoft, fold, initial).Resolve(id,?option=opt))
+let eventStoreCodec = Domain.Cart.Events.codecNewtonsoft
let resolveGesStreamWithRollingSnapshots gateway =
- fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
+ fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve(id,?option=opt)
let resolveGesStreamWithoutCustomAccessStrategy gateway =
- fun (id,opt) -> EventStore.Resolver(gateway, codec, fold, initial).Resolve(id,?option=opt)
+ fun (id,opt) -> EventStore.Resolver(gateway, eventStoreCodec, fold, initial).Resolve(id,?option=opt)
-let resolveCosmosStreamWithSnapshotStrategy gateway =
- fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
-let resolveCosmosStreamWithoutCustomAccessStrategy gateway =
- fun (id,opt) -> Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve(id,?option=opt)
+let cosmosCodec = Domain.Cart.Events.codecStj (FsCodec.SystemTextJson.Options.Create())
+let resolveCosmosStreamWithSnapshotStrategy context =
+ fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, cosmosCodec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve(id,?option=opt)
+let resolveCosmosStreamWithoutCustomAccessStrategy context =
+ fun (id,opt) -> CosmosStore.CosmosStoreCategory(context, cosmosCodec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve(id,?option=opt)
let addAndThenRemoveItemsManyTimesExceptTheLastOne context cartId skuId (service: Backend.Cart.Service) count =
service.ExecuteManyAsync(cartId, false, seq {
@@ -50,7 +50,7 @@ type Tests(testOutputHelper) =
do! act service args
}
- let arrange connect choose resolve = async {
+ let arrangeEs connect choose resolve = async {
let log = createLog ()
let! conn = connect log
let gateway = choose conn defaultBatchSize
@@ -58,24 +58,29 @@ type Tests(testOutputHelper) =
[]
let ``Can roundtrip against EventStore, correctly folding the events without compaction semantics`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
+ let! service = arrangeEs connectToLocalEventStoreNode createGesGateway resolveGesStreamWithoutCustomAccessStrategy
do! act service args
}
[]
let ``Can roundtrip against EventStore, correctly folding the events with RollingSnapshots`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
+ let! service = arrangeEs connectToLocalEventStoreNode createGesGateway resolveGesStreamWithRollingSnapshots
do! act service args
}
+ let arrangeCosmos connect resolve =
+ let log = createLog ()
+ let ctx: CosmosStore.CosmosStoreContext = connect log defaultBatchSize
+ Backend.Cart.create log (resolve ctx)
+
[]
let ``Can roundtrip against Cosmos, correctly folding the events without custom access strategy`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveCosmosStreamWithoutCustomAccessStrategy
+ let service = arrangeCosmos connectToSpecifiedCosmosOrSimulator resolveCosmosStreamWithoutCustomAccessStrategy
do! act service args
}
[]
let ``Can roundtrip against Cosmos, correctly folding the events with With Snapshotting`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveCosmosStreamWithSnapshotStrategy
+ let service = arrangeCosmos connectToSpecifiedCosmosOrSimulator resolveCosmosStreamWithSnapshotStrategy
do! act service args
}
diff --git a/samples/Store/Integration/CodecIntegration.fs b/samples/Store/Integration/CodecIntegration.fs
index 057ae35ce..5572800c4 100644
--- a/samples/Store/Integration/CodecIntegration.fs
+++ b/samples/Store/Integration/CodecIntegration.fs
@@ -46,4 +46,4 @@ let ``Can roundtrip, rendering correctly`` (x: SimpleDu) =
render x =! if serialized.Data = null then null else System.Text.Encoding.UTF8.GetString(serialized.Data)
let adapted = FsCodec.Core.TimelineEvent.Create(-1L, serialized.EventType, serialized.Data)
let deserialized = codec.TryDecode adapted |> Option.get
- deserialized =! x
\ No newline at end of file
+ deserialized =! x
diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs
index 178a6158e..8d0b10bde 100644
--- a/samples/Store/Integration/ContactPreferencesIntegration.fs
+++ b/samples/Store/Integration/ContactPreferencesIntegration.fs
@@ -1,7 +1,7 @@
module Samples.Store.Integration.ContactPreferencesIntegration
open Equinox
-open Equinox.Cosmos.Integration
+open Equinox.CosmosStore.Integration
open Swensen.Unquote
#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
@@ -12,19 +12,20 @@ let createMemoryStore () = MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.ContactPreferences.create log (MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)
-let codec = Domain.ContactPreferences.Events.codec
+let eventStoreCodec = Domain.ContactPreferences.Events.codecNewtonsoft
let resolveStreamGesWithOptimizedStorageSemantics gateway =
- EventStore.Resolver(gateway 1, codec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve
+ EventStore.Resolver(gateway 1, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve
let resolveStreamGesWithoutAccessStrategy gateway =
- EventStore.Resolver(gateway defaultBatchSize, codec, fold, initial).Resolve
+ EventStore.Resolver(gateway defaultBatchSize, eventStoreCodec, fold, initial).Resolve
-let resolveStreamCosmosWithLatestKnownEventSemantics gateway =
- Cosmos.Resolver(gateway 1, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.LatestKnownEvent).Resolve
-let resolveStreamCosmosUnoptimized gateway =
- Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Unoptimized).Resolve
-let resolveStreamCosmosRollingUnfolds gateway =
- let access = Cosmos.AccessStrategy.Custom(Domain.ContactPreferences.Fold.isOrigin, Domain.ContactPreferences.Fold.transmute)
- Cosmos.Resolver(gateway defaultBatchSize, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access).Resolve
+let cosmosCodec = Domain.ContactPreferences.Events.codecStj (FsCodec.SystemTextJson.Options.Create())
+let resolveStreamCosmosWithLatestKnownEventSemantics context =
+ CosmosStore.CosmosStoreCategory(context, cosmosCodec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.LatestKnownEvent).Resolve
+let resolveStreamCosmosUnoptimized context =
+ CosmosStore.CosmosStoreCategory(context, cosmosCodec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Unoptimized).Resolve
+let resolveStreamCosmosRollingUnfolds context =
+ let access = CosmosStore.AccessStrategy.Custom(Domain.ContactPreferences.Fold.isOrigin, Domain.ContactPreferences.Fold.transmute)
+ CosmosStore.CosmosStoreCategory(context, cosmosCodec, fold, initial, CosmosStore.CachingStrategy.NoCaching, access).Resolve
type Tests(testOutputHelper) =
let testOutput = TestOutputAdapter testOutputHelper
@@ -43,7 +44,7 @@ type Tests(testOutputHelper) =
do! act service args
}
- let arrange connect choose resolve = async {
+ let arrangeEs connect choose resolve = async {
let log = createLog ()
let! conn = connect log
let gateway = choose conn
@@ -51,30 +52,35 @@ type Tests(testOutputHelper) =
[]
let ``Can roundtrip against EventStore, correctly folding the events with normal semantics`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutAccessStrategy
+ let! service = arrangeEs connectToLocalEventStoreNode createGesGateway resolveStreamGesWithoutAccessStrategy
do! act service args
}
[]
let ``Can roundtrip against EventStore, correctly folding the events with compaction semantics`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToLocalEventStoreNode createGesGateway resolveStreamGesWithOptimizedStorageSemantics
+ let! service = arrangeEs connectToLocalEventStoreNode createGesGateway resolveStreamGesWithOptimizedStorageSemantics
do! act service args
}
+ let arrangeCosmos connect resolve batchSize = async {
+ let log = createLog ()
+ let ctx: CosmosStore.CosmosStoreContext = connect log batchSize
+ return Backend.ContactPreferences.create log (resolve ctx) }
+
[]
let ``Can roundtrip against Cosmos, correctly folding the events with Unoptimized semantics`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosUnoptimized
+ let! service = arrangeCosmos connectToSpecifiedCosmosOrSimulator resolveStreamCosmosUnoptimized defaultBatchSize
do! act service args
}
[]
let ``Can roundtrip against Cosmos, correctly folding the events with LatestKnownEvent semantics`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosWithLatestKnownEventSemantics
+ let! service = arrangeCosmos connectToSpecifiedCosmosOrSimulator resolveStreamCosmosWithLatestKnownEventSemantics 1
do! act service args
}
[]
let ``Can roundtrip against Cosmos, correctly folding the events with RollingUnfold semantics`` args = Async.RunSynchronously <| async {
- let! service = arrange connectToSpecifiedCosmosOrSimulator createCosmosContext resolveStreamCosmosRollingUnfolds
+ let! service = arrangeCosmos connectToSpecifiedCosmosOrSimulator resolveStreamCosmosRollingUnfolds defaultBatchSize
do! act service args
}
diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs
index da93ce82f..471e1b113 100644
--- a/samples/Store/Integration/FavoritesIntegration.fs
+++ b/samples/Store/Integration/FavoritesIntegration.fs
@@ -1,7 +1,7 @@
module Samples.Store.Integration.FavoritesIntegration
open Equinox
-open Equinox.Cosmos.Integration
+open Equinox.CosmosStore.Integration
open Swensen.Unquote
#nowarn "1182" // From hereon in, we may have some 'unused' privates (the tests)
@@ -13,18 +13,19 @@ let createMemoryStore () = MemoryStore.VolatileStore<_>()
let createServiceMemory log store =
Backend.Favorites.create log (MemoryStore.Resolver(store, FsCodec.Box.Codec.Create(), fold, initial).Resolve)
-let codec = Domain.Favorites.Events.codec
-let createServiceGes gateway log =
- let resolver = EventStore.Resolver(gateway, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot)
+let eventStoreCodec = Domain.Favorites.Events.codecNewtonsoft
+let createServiceGes context log =
+ let resolver = EventStore.Resolver(context, eventStoreCodec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot)
Backend.Favorites.create log resolver.Resolve
-let createServiceCosmos gateway log =
- let resolver = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, Cosmos.AccessStrategy.Snapshot snapshot)
+let cosmosCodec = Domain.Favorites.Events.codecStj
+let createServiceCosmos context log =
+ let resolver = CosmosStore.CosmosStoreCategory(context, cosmosCodec, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot)
Backend.Favorites.create log resolver.Resolve
-let createServiceCosmosRollingState gateway log =
- let access = Cosmos.AccessStrategy.RollingState Domain.Favorites.Fold.snapshot
- let resolver = Cosmos.Resolver(gateway, codec, fold, initial, Cosmos.CachingStrategy.NoCaching, access)
+let createServiceCosmosRollingState context log =
+ let access = CosmosStore.AccessStrategy.RollingState Domain.Favorites.Fold.snapshot
+ let resolver = CosmosStore.CosmosStoreCategory(context, cosmosCodec, fold, initial, CosmosStore.CachingStrategy.NoCaching, access)
Backend.Favorites.create log resolver.Resolve
type Tests(testOutputHelper) =
@@ -60,17 +61,15 @@ type Tests(testOutputHelper) =
[]
let ``Can roundtrip against Cosmos, correctly folding the events`` args = Async.RunSynchronously <| async {
let log = createLog ()
- let! conn = connectToSpecifiedCosmosOrSimulator log
- let gateway = createCosmosContext conn defaultBatchSize
- let service = createServiceCosmos gateway log
+ let store = connectToSpecifiedCosmosOrSimulator log defaultBatchSize
+ let service = createServiceCosmos store log
do! act service args
}
[]
let ``Can roundtrip against Cosmos, correctly folding the events with rolling unfolds`` args = Async.RunSynchronously <| async {
let log = createLog ()
- let! conn = connectToSpecifiedCosmosOrSimulator log
- let gateway = createCosmosContext conn defaultBatchSize
- let service = createServiceCosmosRollingState gateway log
+ let store = connectToSpecifiedCosmosOrSimulator log defaultBatchSize
+ let service = createServiceCosmosRollingState store log
do! act service args
}
diff --git a/samples/Store/Integration/Integration.fsproj b/samples/Store/Integration/Integration.fsproj
index 6c0d14cfe..d29db5e2b 100644
--- a/samples/Store/Integration/Integration.fsproj
+++ b/samples/Store/Integration/Integration.fsproj
@@ -18,11 +18,11 @@
-
+
-
+
diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs
index c4bf13efb..689141644 100644
--- a/samples/Store/Integration/LogIntegration.fs
+++ b/samples/Store/Integration/LogIntegration.fs
@@ -1,7 +1,7 @@
module Samples.Store.Integration.LogIntegration
open Equinox.Core
-open Equinox.Cosmos.Integration
+open Equinox.CosmosStore.Integration
open FSharp.UMX
open Swensen.Unquote
open System
@@ -23,7 +23,7 @@ module EquinoxEsInterop =
| Log.Batch (Direction.Backward,c,m) -> "LoadB", m, Some c
{ action = action; stream = metric.stream; interval = metric.interval; bytes = metric.bytes; count = metric.count; batches = batches }
module EquinoxCosmosInterop =
- open Equinox.Cosmos.Store
+ open Equinox.CosmosStore.Core
[]
type FlatMetric = { action: string; stream : string; interval: StopwatchInterval; bytes: int; count: int; responses: int option; ru: float } with
override __.ToString() = sprintf "%s-Stream=%s %s-Elapsed=%O Ru=%O" __.action __.stream __.action __.interval.Elapsed __.ru
@@ -65,7 +65,7 @@ type SerilogMetricsExtractor(emit : string -> unit) =
logEvent.Properties
|> Seq.tryPick (function
| KeyValue (k, SerilogScalar (:? Equinox.EventStore.Log.Event as m)) -> Some <| Choice1Of3 (k,m)
- | KeyValue (k, SerilogScalar (:? Equinox.Cosmos.Store.Log.Event as m)) -> Some <| Choice2Of3 (k,m)
+ | KeyValue (k, SerilogScalar (:? Equinox.CosmosStore.Core.Log.Event as m)) -> Some <| Choice2Of3 (k,m)
| _ -> None)
|> Option.defaultValue (Choice3Of3 ())
let handleLogEvent logEvent =
@@ -125,9 +125,8 @@ type Tests() =
let batchSize = defaultBatchSize
let buffer = ConcurrentQueue()
let log = createLoggerWithMetricsExtraction buffer.Enqueue
- let! conn = connectToSpecifiedCosmosOrSimulator log
- let gateway = createCosmosContext conn batchSize
- let service = Backend.Cart.create log (CartIntegration.resolveCosmosStreamWithSnapshotStrategy gateway)
+ let store = connectToSpecifiedCosmosOrSimulator log batchSize
+ let service = Backend.Cart.create log (CartIntegration.resolveCosmosStreamWithSnapshotStrategy store)
let itemCount = batchSize / 2 + 1
let cartId = % Guid.NewGuid()
do! act buffer service itemCount context cartId skuId "EqxCosmos Tip " // one is a 404, one is a 200
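
The metrics interop above keys off the renamed `Equinox.CosmosStore.Core.Log` types; a minimal Serilog wiring for them, mirroring the sink/dump calls used in the Tutorial scripts later in this diff:

```fsharp
open Serilog

// route Equinox.CosmosStore request metrics into the stats sink, then report them on demand
let log =
    LoggerConfiguration()
        .WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink())
        .WriteTo.Console()
        .CreateLogger()

let dumpMetrics () = Equinox.CosmosStore.Core.Log.InternalMetrics.dump log
```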
diff --git a/samples/TodoBackend/Todo.fs b/samples/TodoBackend/Todo.fs
index 6e8f1c90a..bc1e94096 100644
--- a/samples/TodoBackend/Todo.fs
+++ b/samples/TodoBackend/Todo.fs
@@ -20,7 +20,9 @@ module Events =
| Cleared
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create()
+ let codecStj = FsCodec.SystemTextJson.Codec.Create()
module Fold =
type State = { items : Events.Todo list; nextId : int }
diff --git a/samples/TodoBackend/TodoBackend.fsproj b/samples/TodoBackend/TodoBackend.fsproj
index 42dc7a64a..c8646fa1e 100644
--- a/samples/TodoBackend/TodoBackend.fsproj
+++ b/samples/TodoBackend/TodoBackend.fsproj
@@ -1,7 +1,7 @@
- netstandard2.0;net461
+ netstandard2.1
5
false
true
@@ -13,8 +13,7 @@
-
-
+
diff --git a/samples/Tutorial/AsAt.fsx b/samples/Tutorial/AsAt.fsx
index 691ddeba7..b316d79d8 100644
--- a/samples/Tutorial/AsAt.fsx
+++ b/samples/Tutorial/AsAt.fsx
@@ -26,14 +26,14 @@
#r "Equinox.dll"
#r "TypeShape.dll"
#r "FsCodec.NewtonsoftJson.dll"
+#r "FsCodec.SystemTextJson.dll"
#r "FSharp.Control.AsyncSeq.dll"
#r "System.Net.Http"
#r "Serilog.Sinks.Seq.dll"
#r "Eventstore.ClientAPI.dll"
#r "Equinox.EventStore.dll"
-#r "Microsoft.Azure.Cosmos.Direct.dll"
-#r "Microsoft.Azure.Cosmos.Client.dll"
-#r "Equinox.Cosmos.dll"
+#r "Azure.Cosmos.dll"
+#r "Equinox.CosmosStore.dll"
open System
@@ -52,15 +52,16 @@ module Events =
// unlike most Aggregates, knowing the Event's index is critical - for this reason, we always propagate that index alongside the event body
type Event = int64 * Contract
+ // our upconversion function doesn't actually fit the term - it just tuples the underlying event
+ let up (evt : FsCodec.ITimelineEvent<_>,e) : Event =
+ evt.Index,e
+ // as per the `up`, the downConverter needs to drop the index (which is only there for symmetry), add null metadata
+ let down (_index,e) : Contract * _ option * DateTimeOffset option =
+ e,None,None
+
// unlike most normal codecs, we have a mapping to supply as we want the Index to be added to each event so we can track it in the State as we fold
- let codec =
- // our upconversion function doesn't actually fit the term - it just tuples the underlying event
- let up (evt : FsCodec.ITimelineEvent<_>,e) : Event =
- evt.Index,e
- // as per the `up`, the downConverter needs to drop the index (which is only there for symmetry), add null metadata
- let down (_index,e) : Contract * _ option * DateTimeOffset option =
- e,None,None
- FsCodec.NewtonsoftJson.Codec.Create(up,down)
+ let codec = FsCodec.NewtonsoftJson.Codec.Create(up,down)
+ let codecStj = FsCodec.SystemTextJson.Codec.Create(up,down)
module Fold =
@@ -124,19 +125,21 @@ module Log =
let c = LoggerConfiguration()
let c = if verbose then c.MinimumLevel.Debug() else c
let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
- let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
+ let c = c.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net
let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information)
c.CreateLogger()
let dumpMetrics () =
- Equinox.Cosmos.Store.Log.InternalMetrics.dump log
+ Equinox.CosmosStore.Core.Log.InternalMetrics.dump log
Equinox.EventStore.Log.InternalMetrics.dump log
let [] appName = "equinox-tutorial"
let cache = Equinox.Cache(appName, 20)
module EventStore =
+
open Equinox.EventStore
+
let snapshotWindow = 500
// see QuickStart for how to run a local instance in a mode that emulates the behavior of a cluster
let (host,username,password) = "localhost", "admin", "changeit"
@@ -153,16 +156,18 @@ module EventStore =
let resolve id = Equinox.Stream(Log.log, resolver.Resolve(streamName id), maxAttempts = 3)
module Cosmos =
- open Equinox.Cosmos
- let read key = System.Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get
- let connector = Connector(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., log=Log.log, mode=Microsoft.Azure.Cosmos.ConnectionMode.Gateway)
- let conn = connector.Connect(appName, Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously
- let context = Context(conn, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ open Equinox.CosmosStore
+
+ let read key = System.Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get
+ let factory = CosmosStoreClientFactory(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., mode=Azure.Cosmos.ConnectionMode.Gateway)
+ let client = factory.Create(Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION"))
+ let conn = CosmosStoreConnection(client, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ let context = CosmosStoreContext(conn)
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
let accessStrategy = AccessStrategy.Snapshot (Fold.isValid,Fold.snapshot)
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
- let resolve id = Equinox.Stream(Log.log, resolver.Resolve(streamName id), maxAttempts = 3)
+ let category = CosmosStoreCategory(context, Events.codecStj, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
+ let resolve id = Equinox.Stream(Log.log, category.Resolve(streamName id), maxAttempts = 3)
let serviceES = Service(EventStore.resolve)
let serviceCosmos = Service(Cosmos.resolve)
diff --git a/samples/Tutorial/Cosmos.fsx b/samples/Tutorial/Cosmos.fsx
index 8c9b7942f..47e3abfd8 100644
--- a/samples/Tutorial/Cosmos.fsx
+++ b/samples/Tutorial/Cosmos.fsx
@@ -6,18 +6,17 @@
#I "bin/Debug/netstandard2.1/"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
-#r "Newtonsoft.Json.dll"
#r "TypeShape.dll"
#r "Equinox.dll"
#r "Equinox.Core.dll"
#r "FSharp.UMX.dll"
#r "FsCodec.dll"
-#r "FsCodec.NewtonsoftJson.dll"
+#r "FsCodec.SystemTextJson.dll"
#r "FSharp.Control.AsyncSeq.dll"
-#r "Microsoft.Azure.Cosmos.Client.dll"
+#r "Azure.Cosmos.dll"
#r "System.Net.Http"
#r "Serilog.Sinks.Seq.dll"
-#r "Equinox.Cosmos.dll"
+#r "Equinox.CosmosStore.dll"
module Log =
@@ -27,11 +26,11 @@ module Log =
let log =
let c = LoggerConfiguration()
let c = if verbose then c.MinimumLevel.Debug() else c
- let c = c.WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
+ let c = c.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net
let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information)
c.CreateLogger()
- let dumpMetrics () = Equinox.Cosmos.Store.Log.InternalMetrics.dump log
+ let dumpMetrics () = Equinox.CosmosStore.Core.Log.InternalMetrics.dump log
module Favorites =
@@ -45,7 +44,7 @@ module Favorites =
| Added of Item
| Removed of Item
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create() // Coming soon, replace Newtonsoft with SystemTextJson and works same
+ let codec = FsCodec.SystemTextJson.Codec.Create() // the FsCodec.SystemTextJson codec surface mirrors the FsCodec.NewtonsoftJson one used previously
module Fold =
@@ -82,21 +81,24 @@ module Favorites =
module Cosmos =
- open Equinox.Cosmos // Everything outside of this module is completely storage agnostic so can be unit tested simply and/or bound to any store
+ open Equinox.CosmosStore // Everything outside of this module is completely storage agnostic so can be unit tested simply and/or bound to any store
let accessStrategy = AccessStrategy.Unoptimized // Or Snapshot etc https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#access-strategies
let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
- create resolver.Resolve
+ let category = CosmosStoreCategory(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
+ create category.Resolve
let [] appName = "equinox-tutorial"
module Store =
- let read key = System.Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get
- let connector = Equinox.Cosmos.Connector(System.TimeSpan.FromSeconds 5., 2, System.TimeSpan.FromSeconds 5., log=Log.log)
- let conn = connector.Connect(appName, Equinox.Cosmos.Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously
- let createContext () = Equinox.Cosmos.Context(conn, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ open Equinox.CosmosStore
+
+ let read key = System.Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get
+ let factory = Equinox.CosmosStore.CosmosStoreClientFactory(System.TimeSpan.FromSeconds 5., 2, System.TimeSpan.FromSeconds 5.)
+ let client = factory.Create(Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION"))
+ let conn = CosmosStoreConnection(client, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ let createContext () = CosmosStoreContext(conn)
let context = Store.createContext ()
let cache = Equinox.Cache(appName, 20)
diff --git a/samples/Tutorial/FulfilmentCenter.fsx b/samples/Tutorial/FulfilmentCenter.fsx
index 18972d4f7..c9c6520d2 100644
--- a/samples/Tutorial/FulfilmentCenter.fsx
+++ b/samples/Tutorial/FulfilmentCenter.fsx
@@ -1,17 +1,16 @@
#I "bin/Debug/netstandard2.1/"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
-#r "Newtonsoft.Json.dll"
#r "TypeShape.dll"
#r "Equinox.dll"
#r "Equinox.Core.dll"
#r "FSharp.UMX.dll"
#r "FSCodec.dll"
-#r "FsCodec.NewtonsoftJson.dll"
-#r "Microsoft.Azure.Cosmos.Client.dll"
+#r "FsCodec.SystemTextJson.dll"
+#r "Azure.Cosmos.dll"
#r "System.Net.Http"
#r "Serilog.Sinks.Seq.dll"
-#r "Equinox.Cosmos.dll"
+#r "Equinox.CosmosStore.dll"
open FSharp.UMX
@@ -54,7 +53,7 @@ module FulfilmentCenter =
| FcDetailsChanged of FcData
| FcRenamed of FcName
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create()
+ let codec = FsCodec.SystemTextJson.Codec.Create()
module Fold =
@@ -103,7 +102,7 @@ module FulfilmentCenter =
member __.Read id : Async = read id
member __.QueryWithVersion(id, render : Fold.State -> 'res) : Async = queryEx id render
-open Equinox.Cosmos
+open Equinox.CosmosStore
open System
module Log =
@@ -114,27 +113,27 @@ module Log =
let log =
let c = LoggerConfiguration()
let c = if verbose then c.MinimumLevel.Debug() else c
- let c = c.WriteTo.Sink(Store.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
+ let c = c.WriteTo.Sink(Core.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump
let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net
let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information)
c.CreateLogger()
- let dumpMetrics () = Store.Log.InternalMetrics.dump log
+ let dumpMetrics () = Core.Log.InternalMetrics.dump log
module Store =
let read key = Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get
let appName = "equinox-tutorial"
- let connector = Connector(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., log=Log.log)
- let conn = connector.Connect(appName, Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously
- let gateway = Gateway(conn, BatchingPolicy())
- let context = Context(gateway, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ let factory = CosmosStoreClientFactory(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., mode=Azure.Cosmos.ConnectionMode.Gateway)
+ let client = factory.Create(Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION"))
+ let conn = CosmosStoreConnection(client, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ let context = CosmosStoreContext(conn)
let cache = Equinox.Cache(appName, 20)
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
open FulfilmentCenter
-let resolver = Resolver(Store.context, Events.codec, Fold.fold, Fold.initial, Store.cacheStrategy, AccessStrategy.Unoptimized)
-let resolve id = Equinox.Stream(Log.log, resolver.Resolve(streamName id), maxAttempts = 3)
+let category = CosmosStoreCategory(Store.context, Events.codec, Fold.fold, Fold.initial, Store.cacheStrategy, AccessStrategy.Unoptimized)
+let resolve id = Equinox.Stream(Log.log, category.Resolve(streamName id), maxAttempts = 3)
let service = Service(resolve)
let fc = "fc0"
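
The pattern that replaces `Resolver`/`resolver.Resolve` is the same in every sample: build a `CosmosStoreCategory`, resolve a stream from it, and wrap that in an `Equinox.Stream`. A condensed sketch follows; the `Store`, `Events`, `Fold`, `streamName`, `Service` and `Log` definitions are the per-aggregate ones from the sample modules and are assumed here:

```fsharp
// Condensed sketch of the category/stream/service wiring (per-aggregate modules assumed)
open Equinox.CosmosStore

let cacheStrategy = CachingStrategy.SlidingWindow (Store.cache, System.TimeSpan.FromMinutes 20.)
let category =
    CosmosStoreCategory(Store.context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.Unoptimized)
// Each request resolves a stream by name and gets an optimistic-concurrency retry budget
let resolve id = Equinox.Stream(Log.log, category.Resolve(streamName id), maxAttempts = 3)
let service = Service(resolve)
```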
diff --git a/samples/Tutorial/Gapless.fs b/samples/Tutorial/Gapless.fs
index 7042b4e10..e85381585 100644
--- a/samples/Tutorial/Gapless.fs
+++ b/samples/Tutorial/Gapless.fs
@@ -18,7 +18,9 @@ module Events =
| Released of Item
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+ let codecStj = FsCodec.SystemTextJson.Codec.Create<Event>()
module Fold =
@@ -76,23 +78,23 @@ let [<Literal>] appName = "equinox-tutorial-gapless"
module Cosmos =
- open Equinox.Cosmos
- let private create (context,cache,accessStrategy) =
+ open Equinox.CosmosStore
+ let private create (context, cache, accessStrategy) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
+ let category = CosmosStoreCategory(context, Events.codecStj, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
let resolve sequenceId =
let streamName = streamName sequenceId
- Equinox.Stream(Serilog.Log.Logger, resolver.Resolve streamName, maxAttempts = 3)
+ Equinox.Stream(Serilog.Log.Logger, category.Resolve streamName, maxAttempts = 3)
Service(resolve)
module Snapshot =
- let create (context,cache) =
+ let create (context, cache) =
let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin,Fold.snapshot)
- create(context,cache,accessStrategy)
+ create(context, cache, accessStrategy)
module RollingUnfolds =
- let create (context,cache) =
+ let create (context, cache) =
let accessStrategy = AccessStrategy.RollingState Fold.snapshot
- create(context,cache,accessStrategy)
+ create(context, cache, accessStrategy)
diff --git a/samples/Tutorial/Index.fs b/samples/Tutorial/Index.fs
index 92bd1c06b..6981dc018 100644
--- a/samples/Tutorial/Index.fs
+++ b/samples/Tutorial/Index.fs
@@ -13,7 +13,9 @@ module Events =
| Deleted of ItemIds
| Snapshotted of Items<'v>
interface TypeShape.UnionContract.IUnionContract
- let codec<'v> = FsCodec.NewtonsoftJson.Codec.Create<Event<'v>>()
+
+ let codecNewtonsoft<'v> = FsCodec.NewtonsoftJson.Codec.Create<Event<'v>>()
+ let codecStj<'v> = FsCodec.SystemTextJson.Codec.Create<Event<'v>>()
module Fold =
@@ -53,15 +55,15 @@ let create<'t> resolve indexId =
module Cosmos =
- open Equinox.Cosmos
+ open Equinox.CosmosStore
let create<'v> (context,cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let accessStrategy = AccessStrategy.RollingState Fold.snapshot
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
- create resolver.Resolve
+ let category = CosmosStoreCategory(context, Events.codecStj, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
+ create category.Resolve
module MemoryStore =
let create store =
- let resolver = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial)
+ let resolver = Equinox.MemoryStore.Resolver(store, Events.codecNewtonsoft, Fold.fold, Fold.initial)
create resolver.Resolve
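
The samples now carry two codecs side by side because the `CosmosStoreCategory` wiring consumes the `System.Text.Json` codec while the `Equinox.MemoryStore`/`Equinox.EventStore` resolvers continue to use the Newtonsoft one. A sketch of the shape each `Events` module ends up with (the union contract here is purely illustrative; every sample has its own event contract):

```fsharp
// Per-aggregate Events module shape after the change: one codec per serializer family.
module Events =
    type Event =
        | Added of string   // illustrative cases only
        | Removed of string
        interface TypeShape.UnionContract.IUnionContract
    // consumed by the Equinox.MemoryStore / Equinox.EventStore wiring (byte[] event bodies)
    let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create<Event>()
    // consumed by Equinox.CosmosStore (JsonElement event bodies)
    let codecStj = FsCodec.SystemTextJson.Codec.Create<Event>()
```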
diff --git a/samples/Tutorial/Sequence.fs b/samples/Tutorial/Sequence.fs
index c69acb510..62deeddc4 100644
--- a/samples/Tutorial/Sequence.fs
+++ b/samples/Tutorial/Sequence.fs
@@ -25,7 +25,9 @@ module Events =
type Event =
| Reserved of Reserved
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+ let codecStj = FsCodec.SystemTextJson.Codec.Create<Event>()
module Fold =
@@ -55,11 +57,11 @@ let create resolve =
module Cosmos =
- open Equinox.Cosmos
+ open Equinox.CosmosStore
let private create (context,cache,accessStrategy) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
- create resolver.Resolve
+ let category = CosmosStoreCategory(context, Events.codecStj, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
+ create category.Resolve
module LatestKnownEvent =
diff --git a/samples/Tutorial/Set.fs b/samples/Tutorial/Set.fs
index b9b5a3ae7..a8d78c9cb 100644
--- a/samples/Tutorial/Set.fs
+++ b/samples/Tutorial/Set.fs
@@ -12,7 +12,9 @@ module Events =
| Deleted of Items
| Snapshotted of Items
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+ let codecStj = FsCodec.SystemTextJson.Codec.Create<Event>()
module Fold =
@@ -53,15 +55,15 @@ let create resolve setId =
module Cosmos =
- open Equinox.Cosmos
- let create (context,cache) =
+ open Equinox.CosmosStore
+ let create (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let accessStrategy = AccessStrategy.RollingState Fold.snapshot
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
- create resolver.Resolve
+ let category = CosmosStoreCategory(context, Events.codecStj, Fold.fold, Fold.initial, cacheStrategy, accessStrategy)
+ create category.Resolve
module MemoryStore =
let create store =
- let resolver = Equinox.MemoryStore.Resolver(store, Events.codec, Fold.fold, Fold.initial)
+ let resolver = Equinox.MemoryStore.Resolver(store, Events.codecNewtonsoft, Fold.fold, Fold.initial)
create resolver.Resolve
diff --git a/samples/Tutorial/Todo.fsx b/samples/Tutorial/Todo.fsx
index cc642230a..4ff922c49 100644
--- a/samples/Tutorial/Todo.fsx
+++ b/samples/Tutorial/Todo.fsx
@@ -6,16 +6,15 @@
#I "bin/Debug/netstandard2.1/"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
-#r "Newtonsoft.Json.dll"
+#r "System.Text.Json.dll"
#r "TypeShape.dll"
#r "Equinox.Core.dll"
#r "Equinox.dll"
#r "FSharp.UMX.dll"
#r "FsCodec.dll"
-#r "FsCodec.NewtonsoftJson.dll"
+#r "FsCodec.SystemTextJson.dll"
#r "FSharp.Control.AsyncSeq.dll"
-#r "Microsoft.Azure.Cosmos.Client.dll"
-#r "Equinox.Cosmos.dll"
+#r "Equinox.CosmosStore.dll"
open System
@@ -35,7 +34,7 @@ type Event =
| Cleared
| Snapshotted of Snapshotted
interface TypeShape.UnionContract.IUnionContract
-let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+let codec = FsCodec.SystemTextJson.Codec.Create<Event>()
type State = { items : Todo list; nextId : int }
let initial = { items = []; nextId = 0 }
@@ -116,21 +115,21 @@ let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let [<Literal>] appName = "equinox-tutorial"
let cache = Equinox.Cache(appName, 20)
-open Equinox.Cosmos
-module Store =
- let read key = Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get
+open Equinox.CosmosStore
- let connector = Connector(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5., log=log)
- let conn = connector.Connect(appName, Discovery.FromConnectionString (read "EQUINOX_COSMOS_CONNECTION")) |> Async.RunSynchronously
- let gateway = Gateway(conn, BatchingPolicy())
+module Store =
- let store = Context(gateway, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ let read key = Environment.GetEnvironmentVariable key |> Option.ofObj |> Option.get
+ let factory = CosmosStoreClientFactory(TimeSpan.FromSeconds 5., 2, TimeSpan.FromSeconds 5.)
+ let client = factory.Create(Discovery.ConnectionString (read "EQUINOX_COSMOS_CONNECTION"))
+ let conn = CosmosStoreConnection(client, read "EQUINOX_COSMOS_DATABASE", read "EQUINOX_COSMOS_CONTAINER")
+ let context = CosmosStoreContext(conn)
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
module TodosCategory =
let access = AccessStrategy.Snapshot (isOrigin,snapshot)
- let resolver = Resolver(Store.store, codec, fold, initial, Store.cacheStrategy, access=access)
- let resolve id = Equinox.Stream(log, resolver.Resolve(streamName id), maxAttempts = 3)
+ let category = CosmosStoreCategory(Store.context, codec, fold, initial, Store.cacheStrategy, access=access)
+ let resolve id = Equinox.Stream(log, category.Resolve(streamName id), maxAttempts = 3)
let service = Service(TodosCategory.resolve)
diff --git a/samples/Tutorial/Tutorial.fsproj b/samples/Tutorial/Tutorial.fsproj
index 250f0f066..0da3b7fce 100644
--- a/samples/Tutorial/Tutorial.fsproj
+++ b/samples/Tutorial/Tutorial.fsproj
@@ -1,11 +1,10 @@
- netstandard2.1
+ netstandard2.1
5
true
true
- true
@@ -24,7 +23,7 @@
-
+
@@ -33,9 +32,10 @@
+
+
-
\ No newline at end of file
diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs
index f7b5b742c..e6f243e51 100644
--- a/samples/Tutorial/Upload.fs
+++ b/samples/Tutorial/Upload.fs
@@ -40,7 +40,9 @@ module Events =
type Event =
| IdAssigned of IdAssigned
interface TypeShape.UnionContract.IUnionContract
- let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+
+ let codecNewtonsoft = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+ let codecStj = FsCodec.SystemTextJson.Codec.Create<Event>()
module Fold =
@@ -70,14 +72,14 @@ let create resolve =
module Cosmos =
- open Equinox.Cosmos
+ open Equinox.CosmosStore
let create (context,cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) // OR CachingStrategy.NoCaching
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent)
- create resolver.Resolve
+ let category = CosmosStoreCategory(context, Events.codecStj, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent)
+ create category.Resolve
module EventStore =
open Equinox.EventStore
let create context =
- let resolver = Resolver(context, Events.codec, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent)
+ let resolver = Resolver(context, Events.codecNewtonsoft, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent)
create resolver.Resolve
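
Across the sample hunks above, the `AccessStrategy` passed to `CosmosStoreCategory` keeps the same semantics it had with `Equinox.Cosmos`. As a quick reference, a sketch of the variants the samples exercise; the string-keyed chooser is purely illustrative, and `isOrigin`/`toSnapshot` stand in for each aggregate's `Fold.isOrigin`/`Fold.snapshot`:

```fsharp
open Equinox.CosmosStore

// Quick-reference sketch of the AccessStrategy variants used by the samples
let chooseAccess (isOrigin, toSnapshot) = function
    | "snapshot"     -> AccessStrategy.Snapshot (isOrigin, toSnapshot) // Tip retains a snapshot unfold alongside the events
    | "rollingState" -> AccessStrategy.RollingState toSnapshot         // Tip holds only the rolled-up state
    | "latestEvent"  -> AccessStrategy.LatestKnownEvent                // only the most recent event matters (e.g. Upload)
    | _              -> AccessStrategy.Unoptimized                     // plain event folding, no unfolds
```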
diff --git a/samples/Web/Program.fs b/samples/Web/Program.fs
index c49531f82..fab9549b7 100644
--- a/samples/Web/Program.fs
+++ b/samples/Web/Program.fs
@@ -29,7 +29,7 @@ module Program =
.Enrich.FromLogContext()
.WriteTo.Console()
// TOCONSIDER log and reset every minute or something ?
- .WriteTo.Sink(Equinox.Cosmos.Store.Log.InternalMetrics.Stats.LogSink())
+ .WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink())
.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink())
.WriteTo.Sink(Equinox.SqlStreamStore.Log.InternalMetrics.Stats.LogSink())
let c =
@@ -41,4 +41,4 @@ module Program =
0
with e ->
eprintfn "%s" e.Message
- 1
\ No newline at end of file
+ 1
diff --git a/samples/Web/Startup.fs b/samples/Web/Startup.fs
index e896f6f09..51245dd2c 100644
--- a/samples/Web/Startup.fs
+++ b/samples/Web/Startup.fs
@@ -70,7 +70,7 @@ type Startup() =
| Some (Cosmos sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.Cosmos.Arguments.VerboseStore
log.Information("CosmosDb Storage options: {options:l}", options)
- Storage.Cosmos.config (log,storeLog) (cache, unfolds, defaultBatchSize) (Storage.Cosmos.Info sargs), storeLog
+ Storage.Cosmos.config log (cache, unfolds, defaultBatchSize) (Storage.Cosmos.Info sargs), storeLog
| Some (Es sargs) ->
let storeLog = createStoreLog <| sargs.Contains Storage.EventStore.Arguments.VerboseStore
log.Information("EventStore Storage options: {options:l}", options)
diff --git a/src/Equinox.Core/Infrastructure.fs b/src/Equinox.Core/Infrastructure.fs
index d73177d6f..aaa90d24a 100755
--- a/src/Equinox.Core/Infrastructure.fs
+++ b/src/Equinox.Core/Infrastructure.fs
@@ -6,11 +6,14 @@ open FSharp.Control
open System
open System.Diagnostics
open System.Threading.Tasks
+open System.Threading
type OAttribute = System.Runtime.InteropServices.OptionalAttribute
type DAttribute = System.Runtime.InteropServices.DefaultParameterValueAttribute
#if NET461
+let isNull v = v = null
+
module Array =
let tryHead (array : 'T[]) =
if array.Length = 0 then None
@@ -27,12 +30,14 @@ module Array =
elif predicate array.[i] then Some i
else loop (i - 1)
loop (array.Length - 1)
+ let singleton v = Array.create 1 v
module Option =
let filter predicate option = match option with None -> None | Some x -> if predicate x then Some x else None
let toNullable option = match option with Some x -> Nullable x | None -> Nullable ()
let ofObj obj = match obj with null -> None | x -> Some x
let toObj option = match option with None -> null | Some x -> x
+ let defaultWith f = function | Some v -> v | _ -> f()
#endif
type Async with
@@ -68,6 +73,10 @@ type Async with
sc ())
|> ignore)
+#if NETSTANDARD2_1
+ static member inline AwaitValueTask (vtask: ValueTask<'T>) : Async<'T> = vtask.AsTask() |> Async.AwaitTaskCorrect
+#endif
+
[]
module Regex =
open System.Text.RegularExpressions
diff --git a/src/Equinox.Core/Stream.fs b/src/Equinox.Core/Stream.fs
index a69a9b079..f75fdba37 100755
--- a/src/Equinox.Core/Stream.fs
+++ b/src/Equinox.Core/Stream.fs
@@ -2,15 +2,15 @@
module Equinox.Core.Stream
/// Represents a specific stream in a ICategory
-type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, opt, context) =
+type private Stream<'event, 'state, 'streamId, 'context>(category : ICategory<'event, 'state, 'streamId, 'context>, streamId: 'streamId, opt, context, compress) =
interface IStream<'event, 'state> with
member __.Load log =
category.Load(log, streamId, opt)
member __.TrySync(log: Serilog.ILogger, token: StreamToken, originState: 'state, events: 'event list) =
- category.TrySync(log, token, originState, events, context)
+ category.TrySync(log, token, originState, events, context, compress)
-let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context : IStream<'event, 'state> = Stream(category, streamId, opt, context) :> _
+let create (category : ICategory<'event, 'state, 'streamId, 'context>) streamId opt context compress : IStream<'event, 'state> = Stream(category, streamId, opt, context, compress) :> _
/// Handles case where some earlier processing has loaded or determined the state of a stream, allowing us to avoid a read roundtrip
type private InitializedStream<'event, 'state>(inner : IStream<'event, 'state>, memento : StreamToken * 'state) =
diff --git a/src/Equinox.Core/Types.fs b/src/Equinox.Core/Types.fs
index 37f3c470a..cd6aa1b63 100755
--- a/src/Equinox.Core/Types.fs
+++ b/src/Equinox.Core/Types.fs
@@ -15,7 +15,7 @@ type ICategory<'event, 'state, 'streamId, 'context> =
/// - Conflict: signifies the sync failed, and the proposed decision hence needs to be reconsidered in light of the supplied conflicting Stream State
/// NB the central precondition upon which the sync is predicated is that the stream has not diverged from the `originState` represented by `token`
/// where the precondition is not met, the SyncResult.Conflict bears a [lazy] async result (in a specific manner optimal for the store)
- abstract TrySync : log: ILogger * StreamToken * 'state * events: 'event list * 'context option -> Async<SyncResult<'state>>
+ abstract TrySync : log: ILogger * StreamToken * 'state * events: 'event list * 'context option * compress: bool -> Async<SyncResult<'state>>
/// Represents a time measurement of a computation that includes stopwatch tick metadata
[]
diff --git a/src/Equinox.CosmosStore/CosmosJsonSerializer.fs b/src/Equinox.CosmosStore/CosmosJsonSerializer.fs
new file mode 100644
index 000000000..8a8a70814
--- /dev/null
+++ b/src/Equinox.CosmosStore/CosmosJsonSerializer.fs
@@ -0,0 +1,77 @@
+namespace Equinox.CosmosStore.Core
+
+open Azure.Cosmos.Serialization
+open Equinox.Core
+open System
+open System.IO
+open System.Text.Json
+open System.Text.Json.Serialization
+
+module JsonHelper =
+
+ let d = JsonDocument.Parse "null"
+ let private Null = d.RootElement
+ /// System.Text.Json versions > 4.7 reject JsonValueKind.Undefined elements
+ let fixup (e : JsonElement) = if e.ValueKind = JsonValueKind.Undefined then Null else e
+
+type CosmosJsonSerializer (options: JsonSerializerOptions) =
+ inherit CosmosSerializer()
+
+ override __.FromStream<'T> (stream) =
+ using (stream) (fun stream ->
+ if stream.Length = 0L then
+ Unchecked.defaultof<'T>
+ elif typeof<Stream>.IsAssignableFrom(typeof<'T>) then
+ stream :> obj :?> 'T
+ else
+ JsonSerializer.DeserializeAsync<'T>(stream, options)
+ |> Async.AwaitValueTask
+ |> Async.RunSynchronously
+ )
+
+ override __.ToStream<'T> (input: 'T) =
+ let memoryStream = new MemoryStream()
+
+ JsonSerializer.SerializeAsync(memoryStream, input, input.GetType(), options)
+ |> Async.AwaitTaskCorrect
+ |> Async.RunSynchronously
+
+ memoryStream.Position <- 0L
+ memoryStream :> Stream
+
+/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
+/// Only applied to snapshots in the Tip
+and JsonCompressedBase64Converter() =
+ inherit JsonConverter<JsonElement>()
+
+ static member Compress(value: JsonElement) =
+ if value.ValueKind = JsonValueKind.Null then value
+ else
+ let input = System.Text.Encoding.UTF8.GetBytes(value.GetRawText())
+ use output = new MemoryStream()
+ use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
+ compressor.Write(input, 0, input.Length)
+ compressor.Close()
+ JsonDocument.Parse("\"" + System.Convert.ToBase64String(output.ToArray()) + "\"").RootElement
+
+ override __.Read(reader, _typeToConvert, options) =
+ if reader.TokenType <> JsonTokenType.String then
+ JsonSerializer.Deserialize<JsonElement>(&reader, options)
+ else
+ let compressedBytes = reader.GetBytesFromBase64()
+ use input = new MemoryStream(compressedBytes)
+ use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress)
+ use output = new MemoryStream()
+ decompressor.CopyTo(output)
+ JsonSerializer.Deserialize<JsonElement>(ReadOnlySpan.op_Implicit(output.ToArray()), options)
+
+ override __.Write(writer, value, options) =
+ JsonSerializer.Serialize(writer, value, options)
+
+type JsonCompressedBase64ConverterAttribute () =
+ inherit JsonConverterAttribute(typeof<JsonCompressedBase64Converter>)
+
+ static let converter = JsonCompressedBase64Converter()
+
+ override __.CreateConverter _typeToConvert =
+ converter :> JsonConverter
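
The converter above stores unfold bodies as deflate-compressed Base64 strings. The round trip it relies on is plain BCL code; a standalone sketch, independent of the Cosmos types, of the compress/decompress pair:

```fsharp
open System
open System.IO
open System.IO.Compression

// Standalone sketch of the deflate + Base64 round trip JsonCompressedBase64Converter performs
let compressToBase64 (utf8Json: byte[]) : string =
    use output = new MemoryStream()
    use compressor = new DeflateStream(output, CompressionLevel.Optimal)
    compressor.Write(utf8Json, 0, utf8Json.Length)
    compressor.Close() // flush the deflate frame before reading the buffer
    Convert.ToBase64String(output.ToArray())

let decompressFromBase64 (base64: string) : byte[] =
    use input = new MemoryStream(Convert.FromBase64String base64)
    use decompressor = new DeflateStream(input, CompressionMode.Decompress)
    use output = new MemoryStream()
    decompressor.CopyTo(output)
    output.ToArray()

// Round trip: the decompressed bytes are the original UTF-8 JSON
let original = Text.Encoding.UTF8.GetBytes """{"name":"value"}"""
let roundTripped = original |> compressToBase64 |> decompressFromBase64
```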
diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.CosmosStore/CosmosStore.fs
similarity index 63%
rename from src/Equinox.Cosmos/Cosmos.fs
rename to src/Equinox.CosmosStore/CosmosStore.fs
index 9e24b56f5..cd9d550f1 100644
--- a/src/Equinox.Cosmos/Cosmos.fs
+++ b/src/Equinox.CosmosStore/CosmosStore.fs
@@ -1,16 +1,18 @@
-namespace Equinox.Cosmos.Store
+namespace Equinox.CosmosStore.Core
+open Azure
+open Azure.Cosmos
open Equinox.Core
open FsCodec
-open Microsoft.Azure.Cosmos
-open Newtonsoft.Json
+open FSharp.Control
open Serilog
open System
-open System.IO
+open System.Text.Json
+open System.Threading
/// A single Domain Event from the array held in a Batch
-type []
- Event =
+[]
+type Event = // TODO for STJ v5: All fields required unless explicitly optional
{ /// Creation datetime (as opposed to system-defined _lastUpdated which is touched by triggers, replication etc.)
t: DateTimeOffset // ISO 8601
@@ -18,24 +20,19 @@ type []
c: string // required
/// Event body, as UTF-8 encoded json ready to be injected into the Json being rendered for CosmosDB
- [)>]
- []
- d: byte[] // Required, but can be null so Nullary cases can work
+ d: JsonElement // TODO for STJ v5: Required, but can be null so Nullary cases can work
- /// Optional metadata, as UTF-8 encoded json, ready to emit directly (null, not written if missing)
- [)>]
- []
- m: byte[]
+ /// Optional metadata, as UTF-8 encoded json, ready to emit directly
+ m: JsonElement // TODO for STJ v5: Optional, not serialized if missing
- /// Optional correlationId (can be null, not written if missing)
- []
- correlationId : string
+ /// Optional correlationId
+ correlationId : string // TODO for STJ v5: Optional, not serialized if missing
- /// Optional causationId (can be null, not written if missing)
- []
- causationId : string }
+ /// Optional causationId
+ causationId : string // TODO for STJ v5: Optional, not serialized if missing
+ }
- interface IEventData<byte[]> with
+ interface IEventData<JsonElement> with
member __.EventType = __.c
member __.Data = __.d
member __.Meta = __.m
@@ -45,12 +42,11 @@ type []
member __.Timestamp = __.t
/// A 'normal' (frozen, not Tip) Batch of Events (without any Unfolds)
-type []
- Batch =
+[]
+type Batch = // TODO for STJ v5: All fields required unless explicitly optional
{ /// CosmosDB-mandated Partition Key, must be maintained within the document
/// Not actually required if running in single partition mode, but for simplicity, we always write it
- [] // Not requested in queries
- p: string // "{streamName}"
+ p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
/// CosmosDB-mandated unique row key; needs to be unique within any partition it is maintained; must be string
/// At the present time, one can't perform an ORDER BY on this field, hence we also have i shadowing it
@@ -60,8 +56,7 @@ type []
/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
- []
- _etag: string
+ _etag: string // TODO for STJ v5: Optional, not serialized if missing
/// base 'i' value for the Events held herein
i: int64 // {index}
@@ -78,6 +73,7 @@ type []
static member internal IndexedFields = [Batch.PartitionKeyField; "i"; "n"]
/// Compaction/Snapshot/Projection Event based on the state at a given point in time `i`
+[]
type Unfold =
{ /// Base: Stream Position (Version) of State from which this Unfold Event was generated
i: int64
@@ -89,61 +85,30 @@ type Unfold =
c: string // required
/// Event body - Json -> UTF-8 -> Deflate -> Base64
- [)>]
- d: byte[] // required
+ [<JsonCompressedBase64Converter>]
+ d: JsonElement // required
/// Optional metadata, same encoding as `d` (can be null; not written if missing)
- [)>]
- []
- m: byte[] } // optional
-
-/// Manages zipping of the UTF-8 json bytes to make the index record minimal from the perspective of the writer stored proc
-/// Only applied to snapshots in the Tip
-and Base64DeflateUtf8JsonConverter() =
- inherit JsonConverter()
- let pickle (input : byte[]) : string =
- if input = null then null else
-
- use output = new MemoryStream()
- use compressor = new System.IO.Compression.DeflateStream(output, System.IO.Compression.CompressionLevel.Optimal)
- compressor.Write(input,0,input.Length)
- compressor.Close()
- System.Convert.ToBase64String(output.ToArray())
- let unpickle str : byte[] =
- if str = null then null else
-
- let compressedBytes = System.Convert.FromBase64String str
- use input = new MemoryStream(compressedBytes)
- use decompressor = new System.IO.Compression.DeflateStream(input, System.IO.Compression.CompressionMode.Decompress)
- use output = new MemoryStream()
- decompressor.CopyTo(output)
- output.ToArray()
-
- override __.CanConvert(objectType) =
- typeof<byte[]>.Equals(objectType)
- override __.ReadJson(reader, _, _, serializer) =
- //( if reader.TokenType = JsonToken.Null then null else
- serializer.Deserialize(reader, typedefof<string>) :?> string |> unpickle |> box
- override __.WriteJson(writer, value, serializer) =
- let pickled = value |> unbox |> pickle
- serializer.Serialize(writer, pickled)
+ [<JsonCompressedBase64Converter>]
+ m: JsonElement // TODO for STJ v5: Optional, not serialized if missing
+ }
/// The special-case 'Pending' Batch Format used to read the currently active (and mutable) document
/// Stored representation has the following diffs vs a 'normal' (frozen/completed) Batch: a) `id` = `-1` b) contains unfolds (`u`)
/// NB the type does double duty as a) model for when we read it b) encoding a batch being sent to the stored proc
-type []
- Tip =
- { [] // Not requested in queries
+[]
+type Tip = // TODO for STJ v5: All fields required unless explicitly optional
+ {
/// Partition key, as per Batch
- p: string // "{streamName}"
+ p: string // "{streamName}" TODO for STJ v5: Optional, not requested in queries
+
/// Document Id within partition, as per Batch
id: string // "{-1}" - Well known IdConstant used while this remains the pending batch
/// When we read, we need to capture the value so we can retain it for caching purposes
/// NB this is not relevant to fill in when we pass it to the writing stored procedure
/// as it will do: 1. read 2. merge 3. write merged version contingent on the _etag not having changed
- []
- _etag: string
+ _etag: string // TODO for STJ v5: Optional, not serialized if missing
/// base 'i' value for the Events held herein
i: int64
@@ -159,8 +124,8 @@ type []
static member internal WellKnownDocumentId = "-1"
/// Position and Etag to which an operation is relative
-type []
- Position = { index: int64; etag: string option }
+[]
+type Position = { index: int64; etag: string option }
module internal Position =
/// NB very inefficient compared to FromDocument or using one already returned to you
@@ -171,7 +136,7 @@ module internal Position =
let fromAppendAtEnd = fromI -1L // sic - needs to yield -1
let fromEtag (value : string) = { fromI -2L with etag = Some value }
/// NB very inefficient compared to FromDocument or using one already returned to you
- let fromMaxIndex (xs: ITimelineEvent<byte[]>[]) =
+ let fromMaxIndex (xs: ITimelineEvent<JsonElement>[]) =
if Array.isEmpty xs then fromKnownEmpty
else fromI (1L + Seq.max (seq { for x in xs -> x.Index }))
/// Create Position from Tip record context (facilitating 1 RU reads)
@@ -185,9 +150,9 @@ module internal Position =
type Direction = Forward | Backward override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward"
type internal Enum() =
- static member internal Events(b: Tip) : ITimelineEvent<byte[]> seq =
+ static member internal Events(b: Tip) : ITimelineEvent<JsonElement> seq =
b.e |> Seq.mapi (fun offset x -> FsCodec.Core.TimelineEvent.Create(b.i + int64 offset, x.c, x.d, x.m, Guid.Empty, x.correlationId, x.causationId, x.t))
- static member Events(i: int64, e: Event[], startPos : Position option, direction) : ITimelineEvent<byte[]> seq = seq {
+ static member Events(i: int64, e: Event[], startPos : Position option, direction) : ITimelineEvent<JsonElement> seq = seq {
// If we're loading from a nominated position, we need to discard items in the batch before/after the start on the start page
let isValidGivenStartPos i =
match startPos with
@@ -202,9 +167,9 @@ type internal Enum() =
static member internal Events(b: Batch, startPos, direction) =
Enum.Events(b.i, b.e, startPos, direction)
|> if direction = Direction.Backward then System.Linq.Enumerable.Reverse else id
- static member Unfolds(xs: Unfold[]) : ITimelineEvent<byte[]> seq = seq {
+ static member Unfolds(xs: Unfold[]) : ITimelineEvent<JsonElement> seq = seq {
for x in xs -> FsCodec.Core.TimelineEvent.Create(x.i, x.c, x.d, x.m, Guid.Empty, null, null, x.t, isUnfold=true) }
- static member EventsAndUnfolds(x: Tip): ITimelineEvent<byte[]> seq =
+ static member EventsAndUnfolds(x: Tip): ITimelineEvent<JsonElement> seq =
Enum.Events x
|> Seq.append (Enum.Unfolds x.u)
// where Index is equal, unfolds get delivered after the events so the fold semantics can be 'idempotent'
@@ -241,8 +206,8 @@ module Log =
/// Bytes in Measurement is number of events deleted
| Prune of responsesHandled : int * Measurement
let prop name value (log : ILogger) = log.ForContext(name, value)
- let propData name (events: #IEventData<byte[]> seq) (log : ILogger) =
- let render = function null -> "null" | bytes -> System.Text.Encoding.UTF8.GetString bytes
+ let propData name (events: #IEventData<JsonElement> seq) (log : ILogger) =
+ let render = function (j: JsonElement) when j.ValueKind <> JsonValueKind.Null -> j.GetRawText() | _ -> "null"
let items = seq { for e in events do yield sprintf "{\"%s\": %s}" e.EventType (render e.Data) }
log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items))
let propEvents = propData "events"
@@ -264,7 +229,7 @@ module Log =
let event (value : Event) (log : ILogger) =
let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty("cosmosEvt", ScalarValue(value)))
log.ForContext({ new Serilog.Core.ILogEventEnricher with member __.Enrich(evt,_) = enrich evt })
- let (|BlobLen|) = function null -> 0 | (x : byte[]) -> x.Length
+ let (|BlobLen|) = function (j: JsonElement) when j.ValueKind <> JsonValueKind.Null && j.ValueKind <> JsonValueKind.Undefined -> j.GetRawText().Length | _ -> 0
let (|EventLen|) (x: #IEventData<_>) = let (BlobLen bytes), (BlobLen metaBytes) = x.Data, x.Meta in bytes+metaBytes
let (|BatchLen|) = Seq.sumBy (|EventLen|)
@@ -358,7 +323,7 @@ module Log =
for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d)
[]
-module private MicrosoftAzureCosmosWrappers =
+module AzureCosmosWrappers =
/// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions
let (|AggregateException|) (exn : exn) =
let rec aux (e : exn) =
@@ -374,30 +339,27 @@ module private MicrosoftAzureCosmosWrappers =
| _ -> None
// CosmosDB Error HttpStatusCode extractor
let (|CosmosStatusCode|) (e : CosmosException) =
- e.StatusCode
+ e.Response.Status
type ReadResult<'T> = Found of 'T | NotFound | NotModified
- type Container with
- member container.TryReadItem(partitionKey : PartitionKey, documentId : string, ?options : ItemRequestOptions): Async<float * ReadResult<'T>> = async {
- let options = defaultArg options null
- let! ct = Async.CancellationToken
- // TODO use TryReadItemStreamAsync to avoid the exception https://github.com/Azure/azure-cosmos-dotnet-v3/issues/692#issuecomment-521936888
- try let! item = async { return! container.ReadItemAsync(documentId, partitionKey, requestOptions = options, cancellationToken = ct) |> Async.AwaitTaskCorrect }
- // if item.StatusCode = System.Net.HttpStatusCode.NotModified then return item.RequestCharge, NotModified
- // NB `.Document` will NRE if a IfNoneModified precondition triggers a NotModified result
- // else
- return item.RequestCharge, Found item.Resource
- with CosmosException (CosmosStatusCode System.Net.HttpStatusCode.NotFound as e) -> return e.RequestCharge, NotFound
- | CosmosException (CosmosStatusCode System.Net.HttpStatusCode.NotModified as e) -> return e.RequestCharge, NotModified
- // NB while the docs suggest you may see a 412, the NotModified in the body of the try/with is actually what happens
- | CosmosException (CosmosStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified }
-module Sync =
- // NB don't nest in a private module, or serialization will fail miserably ;)
- []
- type SyncResponse = { etag: string; n: int64; conflicts: Unfold[] }
- let [<Literal>] private sprocName = "EquinoxRollingUnfolds3" // NB need to rename/number for any breaking change
- let [<Literal>] private sprocBody = """
+ type Azure.Core.ResponseHeaders with
+ member headers.GetRequestCharge () =
+ match headers.TryGetValue("x-ms-request-charge") with
+ | true, charge when not <| String.IsNullOrEmpty charge -> float charge
+ | _ -> 0.
+
+[]
+type SyncResponse = { etag: string; n: int64; conflicts: Unfold[] }
+type ResourceThroughput =
+| Default
+| SetIfCreating of int
+| ReplaceAlways of int
+type [] Provisioning = Container of throughput: ResourceThroughput | Database of throughput: ResourceThroughput
+
+module SyncStoredProcedure =
+ let [<Literal>] defaultName = "EquinoxRollingUnfolds3" // NB need to rename/number for any breaking change
+ let [<Literal>] body = """
// Manages the merging of the supplied Request Batch, fulfilling one of the following end-states
// 1 perform concurrency check (index=-1 -> always append; index=-2 -> check based on .etag; _ -> check .n=.index)
// 2a Verify no current Tip; if so - incoming req.e and defines the `n`ext position / unfolds
@@ -462,22 +424,55 @@ function sync(req, expIndex, expEtag) {
}
}"""
+type ContainerGateway(cosmosContainer : CosmosContainer) =
+
+ member val CosmosContainer = cosmosContainer with get
+
+ abstract member GetQueryIteratorByPage<'T> : query: QueryDefinition * ?options: QueryRequestOptions -> AsyncSeq<Page<'T>>
+ default __.GetQueryIteratorByPage<'T>(query, ?options) =
+ cosmosContainer.GetItemQueryIterator<'T>(query, requestOptions = defaultArg options null).AsPages() |> AsyncSeq.ofAsyncEnum
+
+ abstract member TryReadItem<'T> : docId: string * partitionKey: string * ?options: ItemRequestOptions -> Async<float * ReadResult<'T>>
+ default __.TryReadItem<'T>(docId, partitionKey, ?options) = async {
+ let partitionKey = PartitionKey partitionKey
+ let options = defaultArg options null
+ let! ct = Async.CancellationToken
+ // TODO use TryReadItemStreamAsync to avoid the exception https://github.com/Azure/azure-cosmos-dotnet-v3/issues/692#issuecomment-521936888
+ try let! item = async { return! cosmosContainer.ReadItemAsync<'T>(docId, partitionKey, requestOptions = options, cancellationToken = ct) |> Async.AwaitTaskCorrect }
+ // if item.StatusCode = System.Net.HttpStatusCode.NotModified then return item.RequestCharge, NotModified
+ // NB `.Document` will NRE if a IfNoneModified precondition triggers a NotModified result
+ // else
+
+ return item.GetRawResponse().Headers.GetRequestCharge(), Found item.Value
+ with CosmosException (CosmosStatusCode 404 as e) -> return e.Response.Headers.GetRequestCharge(), NotFound
+ | CosmosException (CosmosStatusCode 304 as e) -> return e.Response.Headers.GetRequestCharge(), NotModified
+ // NB while the docs suggest you may see a 412, the NotModified in the body of the try/with is actually what happens
+ | CosmosException (CosmosStatusCode sc as e) when sc = int System.Net.HttpStatusCode.PreconditionFailed -> return e.Response.Headers.GetRequestCharge(), NotModified }
+
+ abstract member ExecuteStoredProcedure: storedProcedureName: string * partitionKey: string * args: obj[] -> Async<Response<SyncResponse>>
+ default __.ExecuteStoredProcedure(storedProcedureName, partitionKey, args) = async {
+ let! ct = Async.CancellationToken
+ let partitionKey = PartitionKey partitionKey
+ //let args = [| box tip; box index; box (Option.toObj etag)|]
+ return! cosmosContainer.Scripts.ExecuteStoredProcedureAsync<SyncResponse>(storedProcedureName, partitionKey, args, cancellationToken = ct) |> Async.AwaitTaskCorrect }
+
+module Sync =
+
+ // NB don't nest in a private module, or serialization will fail miserably ;)
[]
type Result =
| Written of Position
- | Conflict of Position * events: ITimelineEvent<byte[]>[]
+ | Conflict of Position * events: ITimelineEvent<JsonElement>[]
| ConflictUnknown of Position
type [] Exp = Version of int64 | Etag of string | Any
- let private run (container : Container, stream : string) (exp, req: Tip)
+ let private run (gateway : ContainerGateway, stream : string) (exp, req: Tip)
: Async<float * Result> = async {
let ep = match exp with Exp.Version ev -> Position.fromI ev | Exp.Etag et -> Position.fromEtag et | Exp.Any -> Position.fromAppendAtEnd
- let! ct = Async.CancellationToken
let args = [| box req; box ep.index; box (Option.toObj ep.etag)|]
- let! (res : Scripts.StoredProcedureExecuteResponse<SyncResponse>) =
- container.Scripts.ExecuteStoredProcedureAsync(sprocName, PartitionKey stream, args, cancellationToken = ct) |> Async.AwaitTaskCorrect
- let newPos = { index = res.Resource.n; etag = Option.ofObj res.Resource.etag }
- return res.RequestCharge, res.Resource.conflicts |> function
+ let! res = gateway.ExecuteStoredProcedure(SyncStoredProcedure.defaultName, stream, args)
+ let newPos = { index = res.Value.n; etag = Option.ofObj res.Value.etag }
+ return res.GetRawResponse().Headers.GetRequestCharge(), res.Value.conflicts |> function
| null -> Result.Written newPos
| [||] when newPos.index = 0L -> Result.Conflict (newPos, Array.empty)
| [||] -> Result.ConflictUnknown newPos
@@ -516,87 +511,83 @@ function sync(req, expIndex, expEtag) {
let batch (log : ILogger) retryPolicy containerStream batch: Async<Result> =
let call = logged containerStream batch
Log.withLoggedRetries retryPolicy "writeAttempt" call log
- let mkBatch (stream: string) (events: IEventData<_>[]) unfolds: Tip =
+
+ let private mkEvent (e : IEventData<_>) =
+ { t = e.Timestamp; c = e.EventType; d = JsonHelper.fixup e.Data; m = JsonHelper.fixup e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId }
+ let mkBatch (stream: string) (events: IEventData<_>[]) unfolds : Tip =
{ p = stream; id = Tip.WellKnownDocumentId; n = -1L(*Server-managed*); i = -1L(*Server-managed*); _etag = null
- e = [| for e in events -> { t = e.Timestamp; c = e.EventType; d = e.Data; m = e.Meta; correlationId = e.CorrelationId; causationId = e.CausationId } |]
- u = Array.ofSeq unfolds }
- let mkUnfold baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
- unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; c = x.EventType; d = x.Data; m = x.Meta; t = DateTimeOffset.UtcNow } : Unfold)
-
- module Initialization =
- type [] Provisioning = Container of rus: int | Database of rus: int
- let adjustOfferC (c:Container) (rus : int) = async {
- let! ct = Async.CancellationToken
- let! _ = c.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () }
- let adjustOfferD (d:Database) (rus : int) = async {
- let! ct = Async.CancellationToken
- let! _ = d.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () }
- let private createDatabaseIfNotExists (client:CosmosClient) dName maybeRus = async {
- let! ct = Async.CancellationToken
- let! dbr = client.CreateDatabaseIfNotExistsAsync(id=dName, throughput = Option.toNullable maybeRus, cancellationToken=ct) |> Async.AwaitTaskCorrect
- return dbr.Database }
- let private createOrProvisionDatabase (client:CosmosClient) dName mode = async {
- match mode with
- | Provisioning.Database rus ->
- let! db = createDatabaseIfNotExists client dName (Some rus)
- do! adjustOfferD db rus
- | Provisioning.Container _ ->
- let! _ = createDatabaseIfNotExists client dName None in () }
- let private createContainerIfNotExists (d:Database) (cp:ContainerProperties) maybeRus = async {
- let! ct = Async.CancellationToken
- let! c = d.CreateContainerIfNotExistsAsync(cp, throughput=Option.toNullable maybeRus, cancellationToken=ct) |> Async.AwaitTaskCorrect
- return c.Container }
- let private createOrProvisionContainer (d:Database) (cp:ContainerProperties) mode = async {
- match mode with
- | Provisioning.Database _ ->
- return! createContainerIfNotExists d cp None
- | Provisioning.Container rus ->
- let! c = createContainerIfNotExists d cp (Some rus)
- do! adjustOfferC c rus
- return c }
- let private createStoredProcIfNotExists (c:Container) (name, body): Async<float> = async {
- try let! r = c.Scripts.CreateStoredProcedureAsync(Scripts.StoredProcedureProperties(id=name, body=body)) |> Async.AwaitTaskCorrect
- return r.RequestCharge
- with CosmosException ((CosmosStatusCode sc) as e) when sc = System.Net.HttpStatusCode.Conflict -> return e.RequestCharge }
- let private mkContainerProperties containerName partitionKeyFieldName =
- ContainerProperties(id = containerName, partitionKeyPath = sprintf "/%s" partitionKeyFieldName)
- let private createBatchAndTipContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async<Container> =
- let def = mkContainerProperties cName Batch.PartitionKeyField
- def.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
- def.IndexingPolicy.Automatic <- true
- // Can either do a blacklist or a whitelist
- // Given how long and variable the blacklist would be, we whitelist instead
- def.IndexingPolicy.ExcludedPaths.Add(ExcludedPath(Path="/*"))
- // NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors
- for k in Batch.IndexedFields do def.IndexingPolicy.IncludedPaths.Add(IncludedPath(Path = sprintf "/%s/?" k))
- createOrProvisionContainer (client.GetDatabase dName) def mode
- let createSyncStoredProcIfNotExists (log: ILogger option) container = async {
- let! t, ru = createStoredProcIfNotExists container (sprocName,sprocBody) |> Stopwatch.Time
- match log with
- | None -> ()
- | Some log -> log.Information("Created stored procedure {sprocId} in {ms}ms rc={ru}", sprocName, (let e = t.Elapsed in e.TotalMilliseconds), ru) }
- let private createAuxContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async<Container> =
- let def = mkContainerProperties cName "id" // as per Cosmos team, Partition Key must be "/id"
- // TL;DR no indexing of any kind; see https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet/issues/142
- def.IndexingPolicy.Automatic <- false
- def.IndexingPolicy.IndexingMode <- IndexingMode.None
- createOrProvisionContainer (client.GetDatabase dName) def mode
- let init log (client: CosmosClient) (dName,cName) mode skipStoredProc = async {
- do! createOrProvisionDatabase client dName mode
- let! container = createBatchAndTipContainerIfNotExists client (dName,cName) mode
- if not skipStoredProc then
- do! createSyncStoredProcIfNotExists (Some log) container }
- let initAux (client: CosmosClient) (dName,cName) rus = async {
- // Hardwired for now (not sure if CFP can store in a Database-allocated as it would need to be supplying partion keys)
- let mode = Provisioning.Container rus
- do! createOrProvisionDatabase client dName mode
- return! createAuxContainerIfNotExists client (dName,cName) mode }
+ e = [| for e in events -> mkEvent e |]; u = Array.ofSeq unfolds }
+ let mkUnfold compress baseIndex (unfolds: IEventData<_> seq) : Unfold seq =
+ let inline compressIfRequested x = if compress then JsonCompressedBase64Converter.Compress x else x
+ unfolds
+ |> Seq.mapi (fun offset x ->
+ {
+ i = baseIndex + int64 offset
+ c = x.EventType
+ d = compressIfRequested <| JsonHelper.fixup x.Data
+ m = compressIfRequested <| JsonHelper.fixup x.Meta
+ t = DateTimeOffset.UtcNow
+ } : Unfold)
+
+module Initialization =
+ let internal getOrCreateDatabase (client: CosmosClient) (databaseId: string) (throughput: ResourceThroughput) = async {
+ let! ct = Async.CancellationToken
+ let! response =
+ match throughput with
+ | Default -> client.CreateDatabaseIfNotExistsAsync(id = databaseId, cancellationToken = ct) |> Async.AwaitTaskCorrect
+ | SetIfCreating value -> client.CreateDatabaseIfNotExistsAsync(id = databaseId, throughput = Nullable(value), cancellationToken = ct) |> Async.AwaitTaskCorrect
+ | ReplaceAlways value -> async {
+ let! response = client.CreateDatabaseIfNotExistsAsync(id = databaseId, throughput = Nullable(value), cancellationToken = ct) |> Async.AwaitTaskCorrect
+ let! _ = response.Database.ReplaceThroughputAsync(value, cancellationToken = ct) |> Async.AwaitTaskCorrect
+ return response }
+ return response.Database }
+
+ let internal getOrCreateContainer (db: CosmosDatabase) (props: ContainerProperties) (throughput: ResourceThroughput) = async {
+ let! ct = Async.CancellationToken
+ let! response =
+ match throughput with
+ | Default -> db.CreateContainerIfNotExistsAsync(props, cancellationToken = ct) |> Async.AwaitTaskCorrect
+ | SetIfCreating value -> db.CreateContainerIfNotExistsAsync(props, throughput = Nullable(value), cancellationToken = ct) |> Async.AwaitTaskCorrect
+ | ReplaceAlways value -> async {
+ let! response = db.CreateContainerIfNotExistsAsync(props, throughput = Nullable(value), cancellationToken = ct) |> Async.AwaitTaskCorrect
+ let! _ = response.Container.ReplaceThroughputAsync(value, cancellationToken = ct) |> Async.AwaitTaskCorrect
+ return response }
+ return response.Container }
+
+ let internal getBatchAndTipContainerProps (containerId: string) =
+ let props = ContainerProperties(id = containerId, partitionKeyPath = sprintf "/%s" Batch.PartitionKeyField)
+ props.IndexingPolicy.IndexingMode <- IndexingMode.Consistent
+ props.IndexingPolicy.Automatic <- true
+ // Can either do a blacklist or a whitelist
+ // Given how long and variable the blacklist would be, we whitelist instead
+ props.IndexingPolicy.ExcludedPaths.Add(ExcludedPath(Path="/*"))
+ // NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors
+ for k in Batch.IndexedFields do props.IndexingPolicy.IncludedPaths.Add(IncludedPath(Path = sprintf "/%s/?" k))
+ props
+
+ let createSyncStoredProcedure (container: CosmosContainer) nameOverride = async {
+ let! ct = Async.CancellationToken
+ let name = nameOverride |> Option.defaultValue SyncStoredProcedure.defaultName
+ try let! r = container.Scripts.CreateStoredProcedureAsync(Scripts.StoredProcedureProperties(name, SyncStoredProcedure.body), cancellationToken = ct) |> Async.AwaitTaskCorrect
+ return r.GetRawResponse().Headers.GetRequestCharge()
+ with CosmosException ((CosmosStatusCode sc) as e) when sc = int System.Net.HttpStatusCode.Conflict -> return e.Response.Headers.GetRequestCharge() }
+
+ let initializeContainer (client: CosmosClient) (databaseId: string) (containerId: string) (mode: Provisioning) (createStoredProcedure: bool, nameOverride: string option) = async {
+ let dbThroughput = match mode with Provisioning.Database throughput -> throughput | _ -> Default
+ let containerThroughput = match mode with Provisioning.Container throughput -> throughput | _ -> Default
+ let! db = getOrCreateDatabase client databaseId dbThroughput
+ let! container = getOrCreateContainer db (getBatchAndTipContainerProps containerId) containerThroughput
+
+ if createStoredProcedure then
+ let! (_ru : float) = createSyncStoredProcedure container nameOverride in ()
+
+ return container }
module internal Tip =
- let private get (container : Container, stream : string) (maybePos: Position option) =
- let ro = match maybePos with Some { etag=Some etag } -> ItemRequestOptions(IfNoneMatchEtag=etag) | _ -> null
- container.TryReadItem(PartitionKey stream, Tip.WellKnownDocumentId, ro)
- let private loggedGet (get : Container * string -> Position option -> Async<_>) (container,stream) (maybePos: Position option) (log: ILogger) = async {
+ let private get (gateway : ContainerGateway, stream : string) (maybePos: Position option) =
+ let ro = match maybePos with Some { etag=Some etag } -> ItemRequestOptions(IfNoneMatch=Nullable(Azure.ETag(etag))) | _ -> null
+ gateway.TryReadItem(Tip.WellKnownDocumentId, stream, options = ro)
+ let private loggedGet (get : ContainerGateway * string -> Position option -> Async<_>) (container,stream) (maybePos: Position option) (log: ILogger) = async {
let log = log |> Log.prop "stream" stream
let! t, (ru, res : ReadResult<Tip>) = get (container,stream) maybePos |> Stopwatch.Time
let log bytes count (f : Log.Measurement -> _) = log |> Log.event (f { stream = stream; interval = t; bytes = bytes; count = count; ru = ru })
@@ -614,7 +605,7 @@ module internal Tip =
let log = log |> Log.prop "_etag" tip._etag |> Log.prop "n" tip.n
log.Information("EqxCosmos {action:l} {res} {ms}ms rc={ru}", "Tip", 200, (let e = t.Elapsed in e.TotalMilliseconds), ru)
return ru, res }
- type [] Result = NotModified | NotFound | Found of Position * ITimelineEvent<byte[]>[]
+ type [] Result = NotModified | NotFound | Found of Position * ITimelineEvent<JsonElement>[]
/// `pos` being Some implies that the caller holds a cached value and hence is ready to deal with IndexResult.NotModified
let tryLoad (log : ILogger) retryPolicy containerStream (maybePos: Position option): Async<Result> = async {
let! _rc, res = Log.withLoggedRetries retryPolicy "readAttempt" (loggedGet get containerStream maybePos) log
@@ -624,8 +615,7 @@ module internal Tip =
| ReadResult.Found tip -> return Result.Found (Position.fromTip tip, Enum.EventsAndUnfolds tip |> Array.ofSeq) }
module internal Query =
- open FSharp.Control
- let private mkQuery (container : Container, stream: string) maxItems (direction: Direction) startPos : FeedIterator<Batch> =
+ let private mkQuery (gateway : ContainerGateway, stream: string) maxItems (direction: Direction) startPos : AsyncSeq<Page<Batch>> =
let query =
let root = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE c.id!=\"%s\"" Tip.WellKnownDocumentId
let tail = sprintf "ORDER BY c.i %s" (if direction = Direction.Forward then "ASC" else "DESC")
@@ -634,44 +624,51 @@ module internal Tip =
| Some { index = positionSoExclusiveWhenBackward } ->
let cond = if direction = Direction.Forward then "c.n > @startPos" else "c.i < @startPos"
QueryDefinition(sprintf "%s AND %s %s" root cond tail).WithParameter("@startPos", positionSoExclusiveWhenBackward)
- let qro = new QueryRequestOptions(PartitionKey = Nullable(PartitionKey stream), MaxItemCount=Nullable maxItems)
- container.GetItemQueryIterator(query, requestOptions = qro)
+ let qro = QueryRequestOptions(PartitionKey = Nullable(PartitionKey stream), MaxItemCount=Nullable maxItems)
+ gateway.GetQueryIteratorByPage(query, options = qro)
// Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index
- let private handleResponse direction (streamName: string) startPos (query: FeedIterator<'T>) (log: ILogger)
- : Async<ITimelineEvent<byte[]>[] * Position option * float> = async {
- let! ct = Async.CancellationToken
- let! t, (res : FeedResponse<'T>) = query.ReadNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time
- let batches, ru = Array.ofSeq res, res.RequestCharge
- let events = batches |> Seq.collect (fun b -> Enum.Events(b, startPos, direction)) |> Array.ofSeq
- let (Log.BatchLen bytes), count = events, events.Length
- let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count; ru = ru }
- let log = let evt = Log.Response (direction, reqMetric) in log |> Log.event evt
- let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEvents events
- let index = if count = 0 then Nullable () else Nullable <| Seq.min (seq { for x in batches -> x.i })
- (log |> (match startPos with Some pos -> Log.propStartPos pos | None -> id) |> Log.prop "bytes" bytes)
- .Information("EqxCosmos {action:l} {count}/{batches} {direction} {ms}ms i={index} rc={ru}",
- "Response", count, batches.Length, direction, (let e = t.Elapsed in e.TotalMilliseconds), index, ru)
- let maybePosition = batches |> Array.tryPick Position.tryFromBatch
- return events, maybePosition, ru }
-
- let private run (log : ILogger) (readSlice: FeedIterator<Batch> -> ILogger -> Async<ITimelineEvent<byte[]>[] * Position option * float>)
- (maxPermittedBatchReads: int option)
- (query: FeedIterator<Batch>)
- : AsyncSeq<ITimelineEvent<byte[]>[] * Position option * float> =
- let rec loop batchCount : AsyncSeq<ITimelineEvent<byte[]>[] * Position option * float> = asyncSeq {
+ let private processNextPage direction (streamName: string) startPos (enumerator: IAsyncEnumerator<Page<Batch>>) (log: ILogger)
+ : Async