Skip to content

Commit 375ae24

Browse files
committed
Move maxPartitionConcurrency check after all the filtering is done
1 parent b3545a3 commit 375ae24

File tree

4 files changed

+25
-38
lines changed

4 files changed

+25
-38
lines changed

codegenerator/cli/templates/static/codegen/src/eventFetching/ChainManager.res

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
open Belt
2+
23
type t = {
34
chainFetchers: ChainMap.t<ChainFetcher.t>,
45
//Holds arbitrary events that were added when a batch ended processing early

codegenerator/cli/templates/static/codegen/src/eventFetching/PartitionedFetchState.res

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,9 @@ at the max queue size.
184184
*/
185185
let getMostBehindPartitions = (
186186
{partitions}: t,
187-
~maxNumQueries,
188187
~maxPerChainQueueSize,
189188
~partitionsCurrentlyFetching,
190189
) => {
191-
let maxNumQueries = Pervasives.max(
192-
maxNumQueries - partitionsCurrentlyFetching->Belt.Set.Int.size,
193-
0,
194-
)
195190
let numPartitions = partitions->Array.length
196191
let maxPartitionQueueSize = maxPerChainQueueSize / numPartitions
197192

@@ -211,7 +206,6 @@ let getMostBehindPartitions = (
211206
FetchState.getLatestFullyFetchedBlock(a.fetchState).blockNumber -
212207
FetchState.getLatestFullyFetchedBlock(b.fetchState).blockNumber
213208
)
214-
->Js.Array.slice(~start=0, ~end_=maxNumQueries)
215209
}
216210

217211
type nextQueries = WaitForNewBlock | NextQuery(array<FetchState.nextQuery>)
@@ -225,7 +219,6 @@ let getNextQueries = (self: t, ~maxPerChainQueueSize, ~partitionsCurrentlyFetchi
225219

226220
self
227221
->getMostBehindPartitions(
228-
~maxNumQueries=Env.maxPartitionConcurrency,
229222
~maxPerChainQueueSize,
230223
~partitionsCurrentlyFetching,
231224
)

codegenerator/cli/templates/static/codegen/src/globalState/GlobalState.res

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,7 +857,7 @@ let checkAndFetchForChain = (
857857
) => async chain => {
858858
let chainFetcher = state.chainManager.chainFetchers->ChainMap.get(chain)
859859
if !isRollingBack(state) {
860-
let {chainConfig: {chainWorker}, logger, currentBlockHeight} = chainFetcher
860+
let {chainConfig: {chainWorker}, logger, currentBlockHeight, partitionsCurrentlyFetching} = chainFetcher
861861

862862
let (nextQueries, updatedPartitions) =
863863
chainFetcher->ChainFetcher.getNextQueries(~maxPerChainQueueSize=state.maxPerChainQueueSize)
@@ -895,6 +895,11 @@ let checkAndFetchForChain = (
895895
dispatchAction(FinishWaitingForNewBlock({chain, currentBlockHeight}))
896896
}
897897
| (queries, _) =>
898+
let maxNumQueries = Pervasives.max(
899+
Env.maxPartitionConcurrency - partitionsCurrentlyFetching->Belt.Set.Int.size,
900+
0,
901+
)
902+
let queries = queries->Js.Array2.slice(~start=0, ~end_=maxNumQueries)
898903
let newPartitionsCurrentlyFetching =
899904
queries->Array.map(query => query.partitionId)->Set.Int.fromArray
900905
dispatchAction(SetCurrentlyFetchingBatch(chain, newPartitionsCurrentlyFetching))

scenarios/test_codegen/test/lib_tests/PartitionedFetchState_test.res

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -52,56 +52,47 @@ describe("PartitionedFetchState getMostBehindPartitions", () => {
5252
mockFetchState(~latestFetchedBlockNumber=3),
5353
}
5454
let partitionedFetchState = mockPartitionedFetchState(~partitions)
55-
it(
56-
"With multiple partitions always returns the most behind partitions up to the max concurrency level",
57-
() => {
58-
let maxNumQueries = 3
59-
let partitionsCurrentlyFetching = Set.Int.empty
55+
it("With multiple partitions always returns the most behind partitions first", () => {
56+
let partitionsCurrentlyFetching = Set.Int.empty
6057

61-
let mostBehindPartitions =
62-
partitionedFetchState->PartitionedFetchState.getMostBehindPartitions(
63-
~maxNumQueries,
64-
~maxPerChainQueueSize=10,
65-
~partitionsCurrentlyFetching,
66-
)
67-
68-
Assert.equal(mostBehindPartitions->Array.length, maxNumQueries)
69-
70-
let partitionIds = mostBehindPartitions->Array.map(p => p.partitionId)
71-
Assert.deepEqual(
72-
partitionIds,
73-
[2, 3, 4],
74-
~message="Should have returned the partitions with the lowest latestFetchedBlock",
58+
let mostBehindPartitions =
59+
partitionedFetchState->PartitionedFetchState.getMostBehindPartitions(
60+
~maxPerChainQueueSize=10,
61+
~partitionsCurrentlyFetching,
7562
)
76-
},
77-
)
63+
64+
let partitionIds = mostBehindPartitions->Array.map(p => p.partitionId)
65+
Assert.deepEqual(
66+
partitionIds,
67+
[2, 3, 4, 0, 1],
68+
~message="Should have returned the partitions with the lowest latestFetchedBlock first",
69+
)
70+
})
7871

7972
it("Will not return partitions that are currently fetching", () => {
80-
let maxNumQueries = 3
8173
let partitionsCurrentlyFetching = Set.Int.fromArray([2, 3])
8274

8375
let mostBehindPartitions =
8476
partitionedFetchState->PartitionedFetchState.getMostBehindPartitions(
85-
~maxNumQueries,
8677
~maxPerChainQueueSize=10,
8778
~partitionsCurrentlyFetching,
8879
)
8980

9081
Assert.equal(
9182
mostBehindPartitions->Array.length,
92-
maxNumQueries - partitionsCurrentlyFetching->Set.Int.size,
83+
partitionedFetchState.partitions->Js.Array2.length -
84+
partitionsCurrentlyFetching->Set.Int.size,
9385
)
9486

9587
let partitionIds = mostBehindPartitions->Array.map(p => p.partitionId)
9688
Assert.deepEqual(
9789
partitionIds,
98-
[4],
90+
[4, 0, 1],
9991
~message="Should have returned the partitions with the lowest latestFetchedBlock that are not currently fetching",
10092
)
10193
})
10294

103-
it("Should not return partition that is at max partition size", () => {
104-
let maxNumQueries = 3
95+
it("Should not return partition that is at max queue size", () => {
10596
let partitions = list{
10697
mockFetchState(~latestFetchedBlockNumber=4),
10798
mockFetchState(~latestFetchedBlockNumber=5),
@@ -119,7 +110,6 @@ describe("PartitionedFetchState getMostBehindPartitions", () => {
119110

120111
let mostBehindPartitions =
121112
partitionedFetchState->PartitionedFetchState.getMostBehindPartitions(
122-
~maxNumQueries,
123113
~maxPerChainQueueSize=10, //each partition should therefore have a max of 2 events
124114
~partitionsCurrentlyFetching=Set.Int.empty,
125115
)
@@ -133,7 +123,6 @@ describe("PartitionedFetchState getMostBehindPartitions", () => {
133123
})
134124

135125
it("if need be should return less than maxNum queries if all partitions at their max", () => {
136-
let maxNumQueries = 3
137126
let partitions = list{
138127
mockFetchState(~latestFetchedBlockNumber=4),
139128
mockFetchState(~latestFetchedBlockNumber=5),
@@ -154,7 +143,6 @@ describe("PartitionedFetchState getMostBehindPartitions", () => {
154143

155144
let mostBehindPartitions =
156145
partitionedFetchState->PartitionedFetchState.getMostBehindPartitions(
157-
~maxNumQueries,
158146
~maxPerChainQueueSize=10, //each partition should therefore have a max of 2 events
159147
~partitionsCurrentlyFetching=Set.Int.empty,
160148
)

0 commit comments

Comments
 (0)