Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions app/api/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,20 +69,25 @@ def get_status():
def get_transaction(txid):
tron_client = ConnectionManager.client()
tx = tron_client.get_transaction(txid)
info = parse_tx(tx)
tx_info = tron_client.get_transaction_info(txid)
try:
latest_block_number = tron_client.get_latest_block_number()
tx_block_number = tron_client.get_transaction_info(txid)["blockNumber"]
tx_block_number = tx_info["blockNumber"]
confirmations = latest_block_number - tx_block_number or 1
except tronpy.exceptions.TransactionNotFound:
logger.warning(f"Can't get confirmations for {txid}")
confirmations = 1
return {
"address": info.dst_addr,
"amount": info.amount,
"confirmations": confirmations,
"category": "receive",
}
tron_tx_list = parse_tx(tx, tx_info)
return [
{
"address": info.dst_addr,
"amount": info.amount,
"confirmations": confirmations,
"category": "receive",
}
for info in tron_tx_list
if info.dst_addr in BlockScanner.get_watched_accounts()
]


@api.post("/dump")
Expand Down
258 changes: 153 additions & 105 deletions app/block_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import functools
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List

import requests

Expand Down Expand Up @@ -148,6 +149,19 @@ def download_block(self, n):
logger.debug(f"Block {n} download took {time.time() - start_time} seconds")
return block

@functools.lru_cache(maxsize=config.BLOCK_SCANNER_MAX_BLOCK_CHUNK_SIZE)
def download_tx_info_by_block_num(self, n):
start_time = time.time()
transaction_results = ConnectionManager.client().provider.make_request(
"wallet/gettransactioninfobyblocknum", {"num": n, "visible": True}
)
logger.debug(
f"Tx info for block {n} download took {time.time() - start_time} seconds"
)
return {
result["id"]: result for result in transaction_results if "log" in result
}

def notify_shkeeper(self, symbol, txid):
url = f"http://{config.SHKEEPER_HOST}/api/v1/walletnotify/{symbol}/{txid}"
headers = {"X-Shkeeper-Backend-Key": config.SHKEEPER_BACKEND_KEY}
Expand All @@ -168,14 +182,18 @@ def scan(self, block_num: int) -> bool:
if "transactions" not in block:
logger.debug(f"Block {block_num}: No transactions")
return True

block_tx_info = self.download_tx_info_by_block_num(block_num)

start = time.time()
valid_addresses = self.get_watched_accounts()

txs = block["transactions"]
for tx in txs:
try:
tron_tx = parse_tx(tx)
logger.debug(f"Block {block_num}: Found {tron_tx=}")
tx_info = block_tx_info.get(tx["txID"], {})
tron_tx_list = parse_tx(tx, tx_info)
logger.debug(f"Block {block_num}: Found {tron_tx_list=}")

except (
UnknownTransactionType,
Expand All @@ -195,82 +213,84 @@ def scan(self, block_num: int) -> bool:
)
raise e

if config.EXTERNAL_DRAIN_CONFIG:
#
# Customized workflow (AML)
#
if tron_tx.dst_addr not in valid_addresses:
continue
if tron_tx.status != "SUCCESS":
logger.warning(
f"Skipping notification for bad status TX {tron_tx=}"
)
continue
logger.info(f"Sending notification for TX {tron_tx=}")
self.notify_shkeeper(tron_tx.symbol.value, tron_tx.txid)
if (
self.main_account not in (tron_tx.src_addr, tron_tx.dst_addr)
and tron_tx.dst_addr in valid_addresses
and tron_tx.src_addr not in valid_addresses
): # to one-time from foreign
add_transaction_to_db(
tron_tx.txid,
tron_tx.dst_addr,
tron_tx.amount,
tron_tx.symbol,
)
run_payout_for_tx.apply_async(
args=[
tron_tx.symbol,
tron_tx.dst_addr,
for tron_tx in tron_tx_list:
if config.EXTERNAL_DRAIN_CONFIG:
#
# Customized workflow (AML)
#
if tron_tx.dst_addr not in valid_addresses:
continue
if tron_tx.status != "SUCCESS":
logger.warning(
f"Skipping notification for bad status TX {tron_tx=}"
)
continue
logger.info(f"Sending notification for TX {tron_tx=}")
self.notify_shkeeper(tron_tx.symbol.value, tron_tx.txid)
if (
self.main_account
not in (tron_tx.src_addr, tron_tx.dst_addr)
and tron_tx.dst_addr in valid_addresses
and tron_tx.src_addr not in valid_addresses
): # to one-time from foreign
add_transaction_to_db(
tron_tx.txid,
],
# wait for 5min for data to be updated in AMLBot
countdown=config.AML_WAIT_BEFORE_API_CALL,
)

elif (
tron_tx.dst_addr in valid_addresses
and tron_tx.src_addr == self.main_account
): # to one-time from fee-deposit
add_transaction_to_db(
tron_tx.txid,
tron_tx.dst_addr,
tron_tx.amount,
tron_tx.symbol,
"from_fee",
)
else:
raise Exception("")
else:
#
# Default workflow
#
if (
tron_tx.symbol == "TRX"
and tron_tx.src_addr == self.main_account
and tron_tx.dst_addr in valid_addresses
):
logger.info(
f"Ignoring TRX transaction from main to onetime acc: {tron_tx}"
)
continue
tron_tx.dst_addr,
tron_tx.amount,
tron_tx.symbol,
)
run_payout_for_tx.apply_async(
args=[
tron_tx.symbol,
tron_tx.dst_addr,
tron_tx.txid,
],
# wait for 5min for data to be updated in AMLBot
countdown=config.AML_WAIT_BEFORE_API_CALL,
)

if tron_tx.dst_addr in valid_addresses:
if tron_tx.status == "SUCCESS":
logger.info(f"Sending notification for {tron_tx}")
self.notify_shkeeper(tron_tx.symbol.value, tron_tx.txid)
# Send funds to main account
if tron_tx.is_trc20:
transfer_trc20_from.delay(
tron_tx.dst_addr, tron_tx.symbol
)
else:
transfer_trx_from.delay(tron_tx.dst_addr)
elif (
tron_tx.dst_addr in valid_addresses
and tron_tx.src_addr == self.main_account
): # to one-time from fee-deposit
add_transaction_to_db(
tron_tx.txid,
tron_tx.dst_addr,
tron_tx.amount,
tron_tx.symbol,
"from_fee",
)
else:
logger.warning(
f"Not sending notification for tx with status {tron_tx.status}: {tron_tx}"
raise Exception("")
else:
#
# Default workflow
#
if (
tron_tx.symbol == "TRX"
and tron_tx.src_addr == self.main_account
and tron_tx.dst_addr in valid_addresses
):
logger.info(
f"Ignoring TRX transaction from main to onetime acc: {tron_tx}"
)
continue

if tron_tx.dst_addr in valid_addresses:
if tron_tx.status == "SUCCESS":
logger.info(f"Sending notification for {tron_tx}")
self.notify_shkeeper(tron_tx.symbol.value, tron_tx.txid)
# Send funds to main account
if tron_tx.is_trc20:
transfer_trc20_from.delay(
tron_tx.dst_addr, tron_tx.symbol
)
else:
transfer_trx_from.delay(tron_tx.dst_addr)
else:
logger.warning(
f"Not sending notification for tx with status {tron_tx.status}: {tron_tx}"
)
logger.debug(
f"block {block_num} info extraction time: {time.time() - start}"
)
Expand All @@ -281,7 +301,8 @@ def scan(self, block_num: int) -> bool:
return True


def parse_tx(tx: dict) -> TronTransaction:
def parse_tx(tx: dict, transaction_info) -> List[TronTransaction]:
transactions = []
is_trc20 = False
txid = tx["txID"]
tx_type = tx["raw_data"]["contract"][0]["type"]
Expand All @@ -298,44 +319,71 @@ def parse_tx(tx: dict) -> TronTransaction:
tx["raw_data"]["contract"][0]["parameter"]["value"]["amount"]
) / Decimal(1_000_000)

transactions.append(
TronTransaction(
status=status,
txid=txid,
symbol=symbol,
src_addr=from_addr,
dst_addr=to_addr,
amount=amount,
is_trc20=is_trc20,
)
)

elif tx_type == "TriggerSmartContract":
is_trc20 = True
cont_addr = tx["raw_data"]["contract"][0]["parameter"]["value"][
"contract_address"
]
try:
symbol = config.get_symbol(cont_addr)
except UnknownToken:
raise UnknownTransactionType(f"Unknown contract address {cont_addr}")

raw_data = tx["raw_data"]["contract"][0]["parameter"]["value"]["data"]

func_selector = raw_data[:8]
if func_selector != "a9059cbb": # erc20 transfer()
raise UnknownTransactionType(f"Unknown function selector: {func_selector}")

# Workaround for "Can't decode tx data: Padding bytes were not empty" errors
# https://github.com/ethereum/eth-abi/issues/162
raw_to_addr = bytes.fromhex("0" * 24 + raw_data[8 + 24 : 8 + 64])
raw_amount = bytes.fromhex(raw_data[8 + 64 :])
decoded_amount = trx_abi.decode_single("uint256", raw_amount)
if "log" not in transaction_info:
raise UnknownTransactionType(f"Transaction {txid} produced no logs")

for entry in transaction_info["log"]:
try:
log_entry_producer_address = entry["address"]
symbol = config.get_symbol(log_entry_producer_address)
except UnknownToken:
continue

# >>> from tronpy.contract import keccak256
# >>> bytes.hex(keccak256("Transfer(address,address,uint256)".encode()))
# 'ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef'
transfer_event = (
"ddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
)
event = entry["topics"][0]
if event != transfer_event:
continue

from_addr = tx["raw_data"]["contract"][0]["parameter"]["value"]["owner_address"]
to_addr = trx_abi.decode_single("address", raw_to_addr)
amount = Decimal(decoded_amount) / (10 ** config.get_decimal(symbol))
_, hex_from_addr, hex_to_addr = entry["topics"]

from_addr = trx_abi.decode_single(
"address",
bytes.fromhex(hex_from_addr),
)
to_addr = trx_abi.decode_single(
"address",
bytes.fromhex(hex_to_addr),
)
decoded_amount = trx_abi.decode_single(
"uint256", bytes.fromhex(entry["data"])
)
amount = Decimal(decoded_amount) / (10 ** config.get_decimal(symbol))

transactions.append(
TronTransaction(
status=status,
txid=txid,
symbol=symbol,
src_addr=from_addr,
dst_addr=to_addr,
amount=amount,
is_trc20=is_trc20,
)
)
else:
raise UnknownTransactionType(f"Unknown transaction type: {txid}: {tx_type}")

return TronTransaction(
status=status,
txid=txid,
symbol=symbol,
src_addr=from_addr,
dst_addr=to_addr,
amount=amount,
is_trc20=is_trc20,
)
return transactions


def block_scanner_stats(bs: BlockScanner):
Expand Down
2 changes: 1 addition & 1 deletion app/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def transfer_trc20_from(onetime_publ_key, symbol):
tx_token = tx_token.sign(onetime_priv_key)
tx_token_res = tx_token.broadcast().wait()
logger.info(
f"{token_balance / 10**precision} {symbol} sent to {onetime_publ_key} with {tx_token.txid}. Details: {tx_token_res}"
f"{token_balance / 10**precision} {symbol} sent to {main_publ_key} with {tx_token.txid}. Details: {tx_token_res}"
)

return {"tx_trx_res": tx_trx_res, "tx_token": tx_token_res}
Expand Down