@@ -1516,93 +1516,10 @@ let rollback = (fetchState: t, ~targetBlockNumber) => {
15161516 )
15171517}
15181518
1519- // When resetting pending queries, preserve progress from already-fetched queries.
1520- // If there are fetched queries in the middle (with unfetched ones before them),
1521- // create gap partitions for the unfetched ranges and update latestFetchedBlock
1522- // to avoid re-fetching events that are already in the buffer.
1519+ // Reset pending queries by rolling back to the current latestFullyFetchedBlock.
1520+ // This discards any in-flight queries and their partial progress.
15231521let resetPendingQueries = (fetchState : t ) => {
1524- let newEntities = fetchState .optimizedPartitions .entities -> Utils .Dict .shallowCopy
1525- let nextPartitionIndexRef = ref (fetchState .optimizedPartitions .nextPartitionIndex )
1526-
1527- for idx in 0 to fetchState .optimizedPartitions .idsInAscOrder -> Array .length - 1 {
1528- let partitionId = fetchState .optimizedPartitions .idsInAscOrder -> Js .Array2 .unsafe_get (idx )
1529- let partition = fetchState .optimizedPartitions .entities -> Js .Dict .unsafeGet (partitionId )
1530-
1531- if partition .mutPendingQueries -> Array .length > 0 {
1532- // Track the end of the last fetched region (partition's latestFetchedBlock or previous fetched query)
1533- let lastFetchedEndRef = ref (partition .latestFetchedBlock .blockNumber )
1534- let maxFetchedBlockRef = ref (partition .latestFetchedBlock )
1535-
1536- // Process all pending queries - they may be interleaved: [fetching, fetched, fetching, fetched]
1537- // Create gap partitions for unfetched ranges between fetched queries
1538- for qIdx in 0 to partition .mutPendingQueries -> Array .length - 1 {
1539- let pq = partition .mutPendingQueries -> Js .Array2 .unsafe_get (qIdx )
1540-
1541- switch pq .fetchedBlock {
1542- | None => () // Unfetched query - will be covered by gap partitions
1543- | Some (fetchedBlock ) =>
1544- // Create a gap partition for any unfetched range before this fetched query
1545- let gapStart = lastFetchedEndRef .contents + 1
1546- let gapEnd = pq .fromBlock - 1
1547- if gapEnd >= gapStart {
1548- let newPartitionId = nextPartitionIndexRef .contents -> Int .toString
1549- nextPartitionIndexRef := nextPartitionIndexRef .contents + 1
1550- newEntities -> Js .Dict .set (
1551- newPartitionId ,
1552- {
1553- id : newPartitionId ,
1554- latestFetchedBlock : {
1555- blockNumber : gapStart - 1 ,
1556- blockTimestamp : 0 ,
1557- },
1558- selection : partition .selection ,
1559- addressesByContractName : partition .addressesByContractName ,
1560- endBlock : Some (gapEnd ),
1561- dynamicContract : partition .dynamicContract ,
1562- mutPendingQueries : [],
1563- prevQueryRange : partition .prevQueryRange ,
1564- prevPrevQueryRange : partition .prevPrevQueryRange ,
1565- prevPrevPrevQueryRange : partition .prevPrevPrevQueryRange ,
1566- },
1567- )
1568- }
1569-
1570- // For chunks, use toBlock as effective end (incomplete chunks create new partitions)
1571- // For non-chunks, use fetchedBlock
1572- let effectiveEndBlock = pq .isChunk ? pq .toBlock -> Option .getUnsafe : fetchedBlock .blockNumber
1573-
1574- // Update last fetched end and max fetched block
1575- lastFetchedEndRef := effectiveEndBlock
1576- if effectiveEndBlock > maxFetchedBlockRef .contents .blockNumber {
1577- maxFetchedBlockRef :=
1578- {
1579- blockNumber : effectiveEndBlock ,
1580- blockTimestamp : pq .isChunk ? 0 : fetchedBlock .blockTimestamp ,
1581- }
1582- }
1583- }
1584- }
1585-
1586- newEntities -> Js .Dict .set (
1587- partitionId ,
1588- {
1589- ... partition ,
1590- latestFetchedBlock : maxFetchedBlockRef .contents ,
1591- mutPendingQueries : [],
1592- },
1593- )
1594- }
1595- }
1596-
1597- {
1598- ... fetchState ,
1599- optimizedPartitions : OptimizedPartitions .make (
1600- ~partitions = newEntities -> Js .Dict .values ,
1601- ~maxAddrInPartition = fetchState .optimizedPartitions .maxAddrInPartition ,
1602- ~nextPartitionIndex = nextPartitionIndexRef .contents ,
1603- ~dynamicContracts = fetchState .optimizedPartitions .dynamicContracts ,
1604- ),
1605- }
1522+ fetchState -> rollback (~targetBlockNumber = fetchState .latestFullyFetchedBlock .blockNumber )
16061523}
16071524
16081525/**
0 commit comments