Skip to content

Commit ff3e6e1

Browse files
DZakhclaude
andauthored
Refactor getKnownBlockWithBackoff from recursion to loop (#934)
* Fix OOM crash in getKnownBlockWithBackoff by using iterative loop (#926) The recursive async function was causing stack frame accumulation when the RPC endpoint was unresponsive, leading to OOM crashes. Converting to an iterative while loop prevents stack buildup. https://claude.ai/code/session_01UkjKaX6cKJA568ixy6Kfi4 * Add Prometheus metric for RPC requests in getKnownBlockWithBackoff Move the SourceRequestCount increment into getKnownBlockWithBackoff so each RPC call attempt (including retries) is properly tracked. https://claude.ai/code/session_01UkjKaX6cKJA568ixy6Kfi4 --------- Co-authored-by: Claude <noreply@anthropic.com>
1 parent d8b3656 commit ff3e6e1

File tree

1 file changed

+38
-38
lines changed

1 file changed

+38
-38
lines changed

codegenerator/cli/npm/envio/src/sources/RpcSource.res

Lines changed: 38 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -16,48 +16,50 @@ let getKnownBlock = (provider, blockNumber) =>
1616
}
1717
)
1818

19-
let rec getKnownBlockWithBackoff = async (
19+
let getKnownBlockWithBackoff = async (
2020
~provider,
2121
~sourceName,
2222
~chain,
2323
~blockNumber,
2424
~backoffMsOnFailure,
2525
~lowercaseAddresses: bool,
26-
) =>
27-
switch await getKnownBlock(provider, blockNumber) {
28-
| exception err =>
29-
Logging.warn({
30-
"err": err->Utils.prettifyExn,
31-
"msg": `Issue while running fetching batch of events from the RPC. Will wait ${backoffMsOnFailure->Belt.Int.toString}ms and try again.`,
32-
"source": sourceName,
33-
"chainId": chain->ChainMap.Chain.toChainId,
34-
"type": "EXPONENTIAL_BACKOFF",
35-
})
36-
await Time.resolvePromiseAfterDelay(~delayMilliseconds=backoffMsOnFailure)
37-
await getKnownBlockWithBackoff(
38-
~provider,
39-
~sourceName,
40-
~chain,
41-
~blockNumber,
42-
~backoffMsOnFailure=backoffMsOnFailure * 2,
43-
~lowercaseAddresses,
44-
)
45-
| result =>
46-
// NOTE: this is wasteful if these fields are not selected in the users config.
47-
// There might be a better way to do this based on the block schema.
48-
// However this is not extremely expensive and good enough for now (only on rpc sync also).
49-
// Mutation would be cheaper,
50-
// BUT "result" is an Ethers.js Block object,
51-
// which has the fields as readonly.
52-
{
53-
...result,
54-
miner: if lowercaseAddresses {
55-
result.miner->Address.Evm.fromAddressLowercaseOrThrow
56-
} else {
57-
result.miner->Address.Evm.fromAddressOrThrow
58-
},
26+
) => {
27+
let currentBackoff = ref(backoffMsOnFailure)
28+
let result = ref(None)
29+
30+
while result.contents->Option.isNone {
31+
Prometheus.SourceRequestCount.increment(~sourceName, ~chainId=chain->ChainMap.Chain.toChainId)
32+
switch await getKnownBlock(provider, blockNumber) {
33+
| exception err =>
34+
Logging.warn({
35+
"err": err->Utils.prettifyExn,
36+
"msg": `Issue while running fetching batch of events from the RPC. Will wait ${currentBackoff.contents->Belt.Int.toString}ms and try again.`,
37+
"source": sourceName,
38+
"chainId": chain->ChainMap.Chain.toChainId,
39+
"type": "EXPONENTIAL_BACKOFF",
40+
})
41+
await Time.resolvePromiseAfterDelay(~delayMilliseconds=currentBackoff.contents)
42+
currentBackoff := currentBackoff.contents * 2
43+
| block =>
44+
// NOTE: this is wasteful if these fields are not selected in the users config.
45+
// There might be a better way to do this based on the block schema.
46+
// However this is not extremely expensive and good enough for now (only on rpc sync also).
47+
// Mutation would be cheaper,
48+
// BUT "block" is an Ethers.js Block object,
49+
// which has the fields as readonly.
50+
result :=
51+
Some({
52+
...block,
53+
miner: if lowercaseAddresses {
54+
block.miner->Address.Evm.fromAddressLowercaseOrThrow
55+
} else {
56+
block.miner->Address.Evm.fromAddressOrThrow
57+
},
58+
})
5959
}
6060
}
61+
result.contents->Option.getExn
62+
}
6163
let getSuggestedBlockIntervalFromExn = {
6264
// Unknown provider: "retry with the range 123-456"
6365
let suggestedRangeRegExp = %re(`/retry with the range (\d+)-(\d+)/`)
@@ -504,17 +506,15 @@ let make = (
504506

505507
let makeBlockLoader = () =>
506508
LazyLoader.make(
507-
~loaderFn=blockNumber => {
508-
Prometheus.SourceRequestCount.increment(~sourceName=name, ~chainId=chain->ChainMap.Chain.toChainId)
509+
~loaderFn=blockNumber =>
509510
getKnownBlockWithBackoff(
510511
~provider,
511512
~sourceName=name,
512513
~chain,
513514
~backoffMsOnFailure=1000,
514515
~blockNumber,
515516
~lowercaseAddresses,
516-
)
517-
},
517+
),
518518
~onError=(am, ~exn) => {
519519
Logging.error({
520520
"err": exn->Utils.prettifyExn,

0 commit comments

Comments
 (0)