Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/dipdup/config/starknet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class StarknetNodeDatasourceConfig(DatasourceConfig):
http: HttpConfig | None = None
# FIXME: Is default value correct?
rollback_depth: int = 32
fetch_block_headers: bool = False

@property
def merge_subscriptions(self) -> bool:
Expand Down
4 changes: 4 additions & 0 deletions src/dipdup/datasources/starknet_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ def __init__(self, config: StarknetNodeDatasourceConfig, merge_subscriptions: bo
BLOCK_CACHE_SIZE
)

@property
def fetch_block_headers(self) -> bool:
    """Whether block headers should be fetched for this datasource.

    Mirrors the ``fetch_block_headers`` flag of the datasource config.
    """
    config = self._config
    return config.fetch_block_headers

@property
def starknetpy(self) -> 'StarknetpyClient':
from dipdup.datasources._starknetpy import StarknetpyClient
Expand Down
71 changes: 50 additions & 21 deletions src/dipdup/indexes/starknet_events/fetcher.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import random
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING
from typing import Any

from dipdup.datasources.starknet_node import StarknetNodeDatasource
Expand All @@ -12,6 +13,9 @@
from dipdup.models.starknet import StarknetEventData
from dipdup.models.starknet_subsquid import EventRequest

if TYPE_CHECKING:
from starknet_py.net.client_models import EmittedEvent

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -53,10 +57,49 @@ class EventFetcherChannel(FetcherChannel[StarknetEventData, StarknetNodeDatasour

_offset: str | None

async def fetch_timestamps(self, for_event: 'EmittedEvent', using_datasource: StarknetNodeDatasource) -> tuple[int|None, int|None]:
if for_event.block_hash is None or for_event.transaction_hash is None:
_logger.info('Skipping event. No block_hash or transaction_hash found in %s', for_event)
return None, None

block = await using_datasource.get_block_with_tx_hashes(
block_hash=for_event.block_hash,
)

if block is None:
_logger.info('Skipping event. No block exists for block_hash. BlackHash=%s', for_event.block_hash)
return None, None

timestamp = block.timestamp
transaction_idx = block.transactions.index(for_event.transaction_hash)

# NOTE: This event is corrupt, possibly due to old age.
if transaction_idx < 0:
_logger.info('Skipping event. No transaction_hash exists in block. TxHash=%s', for_event.transaction_hash)
return None, None

return timestamp, transaction_idx

async def fetch(self) -> None:
address, key0s = next(iter(self._filter))

datasource = self._datasources[0]
datasources = self._datasources

if not datasources:
_logger.info('No datasource with events enabled.')
return

datasource = datasources[0]

timestamp_datasources = [
datasource
for datasource in datasources
if datasource.fetch_block_headers
]

timestamp_datasource = None
if len(timestamp_datasources):
timestamp_datasource = timestamp_datasources[0]

events_chunk = await datasource.get_events(
address=address,
Expand All @@ -67,26 +110,12 @@ async def fetch(self) -> None:
)

for event in events_chunk.events:
# NOTE:
if event.block_hash is None or event.transaction_hash is None:
_logger.info('Skipping event. No block_hash or transaction_hash found in %s', event)
continue

block = await datasource.get_block_with_tx_hashes(
block_hash=event.block_hash,
)

if block is None:
_logger.info('Skipping event. No block exists for block_hash. BlackHash=%s', event.block_hash)
continue

timestamp = block.timestamp
transaction_idx = block.transactions.index(event.transaction_hash)

# NOTE: This event is corrupt, possibly due to old age.
if transaction_idx < 0:
_logger.info('Skipping event. No transaction_hash exists in block. TxHash=%s', event.transaction_hash)
continue
timestamp, transaction_idx = None, None

if timestamp_datasource:
timestamp, transaction_idx = await self.fetch_timestamps(
for_event=event, using_datasource=timestamp_datasource
)

self._buffer[event.block_number].append( # type: ignore[index]
StarknetEventData.from_starknetpy(
Expand Down