Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions schemas/dipdup-3.0.json
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,12 @@
"additionalProperties": false,
"description": "Starknet node datasource config",
"properties": {
"fetch_block_headers": {
"default": false,
"description": "Flag signalling that this datasource can be used for block headers fetching",
"title": "fetch_block_headers",
"type": "boolean"
},
"http": {
"anyOf": [
{
Expand Down
1 change: 1 addition & 0 deletions src/dipdup/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ def load(
raw: bool = False,
unsafe: bool = False,
) -> DipDupConfig:

config_json, config_environment = DipDupYAMLConfig.load(
paths=paths,
environment=environment,
Expand Down
2 changes: 2 additions & 0 deletions src/dipdup/config/starknet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class StarknetNodeDatasourceConfig(DatasourceConfig):
:param ws_url: Starknet node WebSocket URL
:param http: HTTP client configuration
:param rollback_depth: A number of blocks to store in database for rollback
:param fetch_block_headers: Flag signalling that this datasource can be used for block headers fetching
"""

kind: Literal['starknet.node'] = 'starknet.node'
Expand All @@ -28,6 +29,7 @@ class StarknetNodeDatasourceConfig(DatasourceConfig):
http: HttpConfig | None = None
# FIXME: Is default value correct?
rollback_depth: int = 32
fetch_block_headers: bool = False

@property
def merge_subscriptions(self) -> bool:
Expand Down
17 changes: 13 additions & 4 deletions src/dipdup/datasources/starknet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def __init__(self, config: StarknetNodeDatasourceConfig, merge_subscriptions: bo
BLOCK_CACHE_SIZE
)

    @property
    def fetch_block_headers(self) -> bool:
        """Whether this datasource may be used to fetch block headers, as set in its config."""
        return self._config.fetch_block_headers

@property
def starknetpy(self) -> 'StarknetpyClient':
from dipdup.datasources._starknetpy import StarknetpyClient
Expand Down Expand Up @@ -86,12 +90,17 @@ async def get_events(

async def get_block_with_tx_hashes(
self, block_hash: int
) -> Union['StarknetBlockWithTxHashes', 'PendingStarknetBlockWithTxHashes']:
) -> Union['StarknetBlockWithTxHashes', 'PendingStarknetBlockWithTxHashes', None]:
if block := self._block_cache.get(block_hash, None):
return block

block = await self.starknetpy.get_block_with_tx_hashes(block_hash=block_hash)
self._block_cache[block_hash] = block

try:
block = await self.starknetpy.get_block_with_tx_hashes(block_hash=block_hash)
self._block_cache[block_hash] = block
except Exception as e:
self._logger.error("Can't fetch block headers", exc_info=e)
block = None

return block

async def get_abi(self, address: str) -> AbiJson:
Expand Down
71 changes: 50 additions & 21 deletions src/dipdup/indexes/starknet_events/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import random
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING
from typing import Any

from dipdup.datasources.starknet_node import StarknetNodeDatasource
Expand All @@ -12,6 +13,9 @@
from dipdup.models.starknet import StarknetEventData
from dipdup.models.starknet_subsquid import EventRequest

if TYPE_CHECKING:
from starknet_py.net.client_models import EmittedEvent

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -53,10 +57,49 @@ class EventFetcherChannel(FetcherChannel[StarknetEventData, StarknetNodeDatasour

_offset: str | None

async def fetch_timestamps(self, for_event: 'EmittedEvent', using_datasource: StarknetNodeDatasource) -> tuple[int|None, int|None]:
if for_event.block_hash is None or for_event.transaction_hash is None:
_logger.info('Skipping event. No block_hash or transaction_hash found in %s', for_event)
return None, None

block = await using_datasource.get_block_with_tx_hashes(
block_hash=for_event.block_hash,
)

if block is None:
_logger.info('Skipping event. No block exists for block_hash. BlackHash=%s', for_event.block_hash)
return None, None

timestamp = block.timestamp
transaction_idx = block.transactions.index(for_event.transaction_hash)

# NOTE: This event is corrupt, possibly due to old age.
if transaction_idx < 0:
_logger.info('Skipping event. No transaction_hash exists in block. TxHash=%s', for_event.transaction_hash)
return None, None

return timestamp, transaction_idx

async def fetch(self) -> None:
address, key0s = next(iter(self._filter))

datasource = self._datasources[0]
datasources = self._datasources

if not datasources:
_logger.info('No datasource with events enabled.')
return

datasource = datasources[0]

timestamp_datasources = [
datasource
for datasource in datasources
if datasource.fetch_block_headers
]

timestamp_datasource = None
if len(timestamp_datasources):
timestamp_datasource = timestamp_datasources[0]

events_chunk = await datasource.get_events(
address=address,
Expand All @@ -67,26 +110,12 @@ async def fetch(self) -> None:
)

for event in events_chunk.events:
# NOTE:
if event.block_hash is None or event.transaction_hash is None:
_logger.info('Skipping event. No block_hash or transaction_hash found in %s', event)
continue

block = await datasource.get_block_with_tx_hashes(
block_hash=event.block_hash,
)

if block is None:
_logger.info('Skipping event. No block exists for block_hash. BlackHash=%s', event.block_hash)
continue

timestamp = block.timestamp
transaction_idx = block.transactions.index(event.transaction_hash)

# NOTE: This event is corrupt, possibly due to old age.
if transaction_idx < 0:
_logger.info('Skipping event. No transaction_hash exists in block. TxHash=%s', event.transaction_hash)
continue
timestamp, transaction_idx = None, None

if timestamp_datasource:
timestamp, transaction_idx = await self.fetch_timestamps(
for_event=event, using_datasource=timestamp_datasource
)

self._buffer[event.block_number].append( # type: ignore[index]
StarknetEventData.from_starknetpy(
Expand Down