Skip to content

Commit 940bf26

Browse files
authored
chore: sync dynamic config (#798)
Impacts: validator, facilitator
1 parent b85613e commit 940bf26

File tree

10 files changed

+68
-111
lines changed

10 files changed

+68
-111
lines changed

facilitator/app/src/project/core/utils.py

Lines changed: 0 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -1,83 +1,5 @@
1-
import asyncio
2-
from concurrent.futures import ThreadPoolExecutor
3-
41
import wandb
5-
from asgiref.sync import AsyncToSync, SyncToAsync
6-
from constance import config
72
from django.conf import settings
8-
from structlog import get_logger
9-
10-
log = get_logger(__name__)
11-
12-
13-
def get_thread_sensitive_executor(loop):
14-
"""Returns executor to run sync code in as if sync code were wrapped with sync_to_async
15-
16-
This is exactly copied implementation from SyncToAsync but instead of running code in thread
17-
and awaiting in the loop we return executor in which SyncToAsync would run sync code
18-
and we allow to pass function directly allowing for blocking waiting for future result.
19-
20-
TODO: handling deadlock is not implemented so it can be unsafe in some conditions
21-
"""
22-
current_thread_executor = getattr(AsyncToSync.executors, "current", None)
23-
if current_thread_executor:
24-
# If we have a parent sync thread above somewhere, use that
25-
executor = current_thread_executor
26-
elif SyncToAsync.thread_sensitive_context.get(None):
27-
# If we have a way of retrieving the current context, attempt
28-
# to use a per-context thread pool executor
29-
thread_sensitive_context = SyncToAsync.thread_sensitive_context.get()
30-
31-
if thread_sensitive_context in SyncToAsync.context_to_thread_executor:
32-
# Re-use thread executor in current context
33-
executor = SyncToAsync.context_to_thread_executor[thread_sensitive_context]
34-
else:
35-
# Create new thread executor in current context
36-
executor = ThreadPoolExecutor(max_workers=1)
37-
elif loop in AsyncToSync.loop_thread_executors:
38-
# Re-use thread executor for running loop
39-
executor = AsyncToSync.loop_thread_executors[loop]
40-
elif SyncToAsync.deadlock_context.get(False):
41-
raise RuntimeError("Single thread executor already being used, would deadlock")
42-
else:
43-
# Otherwise, we run it in a fixed single thread
44-
executor = SyncToAsync.single_thread_executor
45-
# TODO: we don't set deadlock here - maybe we should clean it all up
46-
# SyncToAsync.deadlock_context.set(True)
47-
return executor
48-
49-
50-
def safe_sync(fun, *args, **kwargs):
51-
"""Runs sensitive sync function in sync or async context
52-
53-
When running in sync context it is the same as if it was called directly.
54-
In async context function is run as if it would be run called using sync_to_async
55-
but it blocks async context when waiting for future result. This allows writing
56-
sync sensitive functions that can be run in async context without need to write
57-
async version of a function that allows await, eg. istead of writing two versions
58-
one using obj.save() and second await obj.asave() we can write one function
59-
with safe_sync(obj.save)
60-
"""
61-
try:
62-
# Try to get the event loop. If we're in an async context, this will succeed.
63-
loop = asyncio.get_running_loop()
64-
except RuntimeError:
65-
loop = None
66-
if loop is not None:
67-
executor = get_thread_sensitive_executor(loop)
68-
future = executor.submit(fun, *args, **kwargs)
69-
return future.result()
70-
else:
71-
# We're in a synchronous context without an event loop
72-
return fun(*args, **kwargs)
73-
74-
75-
class SafeConfig:
76-
def __getattr__(self, name):
77-
return safe_sync(getattr, config, name)
78-
79-
80-
safe_config = SafeConfig()
813

824

835
def fetch_compute_subnet_hardware() -> dict:

miner/app/src/compute_horde_miner/miner/dynamic_config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,9 @@
44
from constance import config
55

66

7+
def get_config(key: str) -> Any:
8+
return getattr(config, key)
9+
10+
711
async def aget_config(key: str) -> Any:
812
return await sync_to_async(lambda: getattr(config, key))()

validator/app/src/compute_horde_validator/validator/allowance/utils/blocks.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import turbobt
66
from celery.utils.log import get_task_logger
77
from compute_horde_core.executor_class import ExecutorClass
8-
from constance import config
98
from django.db import IntegrityError, transaction
109
from django.db.models import Case, Exists, Max, Min, OuterRef, Q, Sum, When
1110

11+
from compute_horde_validator.validator.dynamic_config import get_config
1212
from compute_horde_validator.validator.locks import Lock, LockType
1313

1414
from ...models import SystemEvent
@@ -142,7 +142,7 @@ def process_block_allowance(
142142
creation_timestamp=supertensor_.get_block_timestamp(block_number),
143143
)
144144

145-
dynamic_multiplier = cast(float, config.DYNAMIC_BLOCK_ALLOWANCE_MULTIPLIER)
145+
dynamic_multiplier = cast(float, get_config("DYNAMIC_BLOCK_ALLOWANCE_MULTIPLIER"))
146146
neurons = supertensor_.list_neurons(block_number)
147147
save_neurons(neurons, block_number)
148148

validator/app/src/compute_horde_validator/validator/allowance/utils/spending.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
from typing import NamedTuple, cast
66

77
from compute_horde_core.executor_class import ExecutorClass
8-
from constance import config
8+
9+
from compute_horde_validator.validator.dynamic_config import get_config
910

1011
from ...models import Block, BlockAllowance
1112
from .. import settings
@@ -44,10 +45,10 @@ class SpendingBookkeeperBase(ABC):
4445
def __init__(self) -> None:
4546
self._block_expiry = settings.BLOCK_EXPIRY
4647
self._validation_leeway_lower = cast(
47-
int, config.DYNAMIC_SPENDING_VALIDATION_BLOCK_LEEWAY_LOWER
48+
int, get_config("DYNAMIC_SPENDING_VALIDATION_BLOCK_LEEWAY_LOWER")
4849
)
4950
self._validation_leeway_upper = cast(
50-
int, config.DYNAMIC_SPENDING_VALIDATION_BLOCK_LEEWAY_UPPER
51+
int, get_config("DYNAMIC_SPENDING_VALIDATION_BLOCK_LEEWAY_UPPER")
5152
)
5253

5354
def spend(

validator/app/src/compute_horde_validator/validator/collateral/default.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
import turbobt
1212
from asgiref.sync import async_to_sync
1313
from compute_horde.smart_contracts.utils import get_web3_connection
14-
from constance import config
1514
from django.conf import settings
1615
from eth_account import Account
1716
from eth_account.signers.local import LocalAccount
@@ -21,6 +20,7 @@
2120
from web3.exceptions import Web3RPCError
2221
from web3.types import Wei
2322

23+
from compute_horde_validator.validator.dynamic_config import get_config
2424
from compute_horde_validator.validator.models import Miner
2525

2626
from .base import CollateralBase
@@ -109,7 +109,7 @@ def slash_collateral(
109109

110110
w3 = get_web3_connection(network=settings.BITTENSOR_NETWORK)
111111

112-
amount_wei = config.DYNAMIC_COLLATERAL_SLASH_AMOUNT_WEI
112+
amount_wei = get_config("DYNAMIC_COLLATERAL_SLASH_AMOUNT_WEI")
113113
if amount_wei <= 0:
114114
raise SlashCollateralError("Slash amount must be greater than 0")
115115

validator/app/src/compute_horde_validator/validator/collateral/tests/helpers/env.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from compute_horde_validator.validator.collateral import default as collateral_default
1010
from compute_horde_validator.validator.collateral.default import Collateral
11+
from compute_horde_validator.validator.tests.helpers import patch_constance
1112

1213
from .stubs import (
1314
HttpStub,
@@ -46,6 +47,7 @@ def __init__(
4647
self.fetch_log: list[dict[str, Any]] = []
4748
self.system_events = SystemEventRecorder()
4849
self.contract_log: list[tuple[str, Any]] = []
50+
self._constance_overlay: dict[str, Any] | None = None
4951

5052
def __enter__(self) -> CollateralTestEnvironment:
5153
self._stack = ExitStack()
@@ -68,7 +70,12 @@ def __enter__(self) -> CollateralTestEnvironment:
6870
)
6971
self._config = Mock()
7072
self._config.DYNAMIC_COLLATERAL_SLASH_AMOUNT_WEI = self.slash_amount
71-
self._stack.enter_context(patch.object(collateral_default, "config", self._config))
73+
self._constance_overlay = {
74+
"DYNAMIC_COLLATERAL_SLASH_AMOUNT_WEI": self.slash_amount,
75+
}
76+
self._stack.enter_context(
77+
patch_constance(self._constance_overlay),
78+
)
7279
self._stack.enter_context(
7380
patch(
7481
"compute_horde_validator.validator.collateral.default.settings.BITTENSOR_WALLET",
@@ -139,6 +146,8 @@ def set_receipt(self, receipt: dict[str, Any]) -> None:
139146
def set_slash_amount(self, amount: int) -> None:
140147
self.slash_amount = amount
141148
self._config.DYNAMIC_COLLATERAL_SLASH_AMOUNT_WEI = amount
149+
if self._constance_overlay is not None:
150+
self._constance_overlay["DYNAMIC_COLLATERAL_SLASH_AMOUNT_WEI"] = amount
142151

143152
def _record_transaction(
144153
self,

validator/app/src/compute_horde_validator/validator/dynamic_config.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
from constance import config
88

99

10+
def get_config(key: str) -> Any:
11+
return getattr(config, key)
12+
13+
1014
async def aget_config(key: str) -> Any:
1115
return await sync_to_async(lambda: getattr(config, key))()
1216

@@ -30,7 +34,7 @@ def executor_class_value_map_parser(
3034

3135

3236
def get_miner_max_executors_per_class_sync() -> dict[ExecutorClass, int]:
33-
miner_max_executors_per_class: str = config.DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS
37+
miner_max_executors_per_class: str = get_config("DYNAMIC_MINER_MAX_EXECUTORS_PER_CLASS")
3438
result = {
3539
executor_class: count
3640
for executor_class, count in executor_class_value_map_parser(
@@ -43,5 +47,5 @@ def get_miner_max_executors_per_class_sync() -> dict[ExecutorClass, int]:
4347

4448
def get_executor_class_weights() -> dict[ExecutorClass, float]:
4549
return executor_class_value_map_parser(
46-
config.DYNAMIC_EXECUTOR_CLASS_WEIGHTS, value_parser=float
50+
get_config("DYNAMIC_EXECUTOR_CLASS_WEIGHTS"), value_parser=float
4751
)

validator/app/src/compute_horde_validator/validator/routing/default.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
V2JobRequest,
99
)
1010
from compute_horde_core.executor_class import ExecutorClass
11-
from constance import config
1211
from django.conf import settings
1312
from django.db.models import Count
1413
from django.utils import timezone
@@ -22,6 +21,7 @@
2221
Miner as AllowanceMiner,
2322
)
2423
from compute_horde_validator.validator.collateral.default import collateral
24+
from compute_horde_validator.validator.dynamic_config import get_config
2525
from compute_horde_validator.validator.models import Miner, MinerIncident, SystemEvent
2626
from compute_horde_validator.validator.receipts.default import receipts
2727
from compute_horde_validator.validator.routing.base import RoutingBase
@@ -138,7 +138,7 @@ def _pick_miner_for_job_v2(request: V2JobRequest) -> JobRoute:
138138
raise
139139

140140
# Collateral filtering: build a set of miners with sufficient collateral if threshold > 0
141-
collateral_threshold = config.DYNAMIC_MINIMUM_COLLATERAL_AMOUNT_WEI
141+
collateral_threshold = get_config("DYNAMIC_MINIMUM_COLLATERAL_AMOUNT_WEI")
142142
if collateral_threshold > 0:
143143
try:
144144
eligible_collateral_hotkeys = {
@@ -169,7 +169,9 @@ def _pick_miner_for_job_v2(request: V2JobRequest) -> JobRoute:
169169
manifests = allowance().get_manifests()
170170

171171
reliability_score_per_hotkey = _get_miners_reliability_score(
172-
reliability_window=timedelta(hours=float(config.DYNAMIC_ROUTING_RELIABILITY_WINDOW_HOURS)),
172+
reliability_window=timedelta(
173+
hours=float(get_config("DYNAMIC_ROUTING_RELIABILITY_WINDOW_HOURS"))
174+
),
173175
executor_class=executor_class,
174176
manifests=manifests,
175177
)
@@ -185,8 +187,8 @@ def _pick_miner_for_job_v2(request: V2JobRequest) -> JobRoute:
185187
items=suitable_hotkeys,
186188
weights=hotkey_weights,
187189
# With steepness >5 there is a strong cutoff at ~center*2, hence center~=cutoff/2
188-
center=float(config.DYNAMIC_ROUTING_RELIABILITY_SOFT_CUTOFF) / 2,
189-
steepness=float(config.DYNAMIC_ROUTING_RELIABILITY_SEPARATION),
190+
center=float(get_config("DYNAMIC_ROUTING_RELIABILITY_SOFT_CUTOFF")) / 2,
191+
steepness=float(get_config("DYNAMIC_ROUTING_RELIABILITY_SEPARATION")),
190192
)
191193

192194
# Get receipts instance once to avoid bound method issues with async_to_sync

validator/app/src/compute_horde_validator/validator/scoring/calculations.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
import numpy as np
77
from compute_horde_core.executor_class import ExecutorClass
8-
from constance import config
98

109
from compute_horde_validator.validator.allowance.default import allowance
1110
from compute_horde_validator.validator.allowance.types import (
@@ -14,6 +13,7 @@
1413
SpendingDetails,
1514
)
1615
from compute_horde_validator.validator.allowance.utils.spending import Triplet
16+
from compute_horde_validator.validator.dynamic_config import get_config
1717
from compute_horde_validator.validator.models import Miner, OrganicJob
1818
from compute_horde_validator.validator.receipts import receipts
1919
from compute_horde_validator.validator.scoring.metrics import (
@@ -167,8 +167,8 @@ def horde_score(
167167
def score_organic_jobs(jobs: Sequence[OrganicJob]) -> dict[str, float]:
168168
"""Score organic jobs."""
169169
batch_scores: defaultdict[str, float] = defaultdict(float)
170-
score = config.DYNAMIC_ORGANIC_JOB_SCORE
171-
limit = config.DYNAMIC_SCORE_ORGANIC_JOBS_LIMIT
170+
score = get_config("DYNAMIC_ORGANIC_JOB_SCORE")
171+
limit = get_config("DYNAMIC_SCORE_ORGANIC_JOBS_LIMIT")
172172

173173
for job in jobs:
174174
batch_scores[job.miner.hotkey] += score
@@ -202,7 +202,7 @@ def calculate_organic_scores(organic_jobs: list[OrganicJob]) -> dict[str, dict[s
202202
executor_class_organic_jobs[job.executor_class].append(job)
203203

204204
organic_scores_by_executor: dict[str, dict[str, float]] = {}
205-
organic_job_score = config.DYNAMIC_ORGANIC_JOB_SCORE
205+
organic_job_score = get_config("DYNAMIC_ORGANIC_JOB_SCORE")
206206

207207
for executor_class, jobs in executor_class_organic_jobs.items():
208208
executor_class_scores: dict[str, float] = {}

validator/app/src/compute_horde_validator/validator/scoring/tasks.py

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
from compute_horde_validator.celery import app
2222
from compute_horde_validator.validator.clean_me_up import bittensor_client
23+
from compute_horde_validator.validator.dynamic_config import get_config
2324
from compute_horde_validator.validator.locks import Locked, LockType, get_advisory_lock
2425
from compute_horde_validator.validator.models import SystemEvent
2526
from compute_horde_validator.validator.models.scoring.internal import (
@@ -60,7 +61,7 @@ def set_scores(bittensor: turbobt.Bittensor):
6061
logger.warning("Not setting scores, SERVING is disabled in constance config")
6162
return
6263

63-
commit_reveal_weights_enabled = config.DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED
64+
commit_reveal_weights_enabled = get_config("DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED")
6465

6566
current_block = async_to_sync(bittensor.blocks.head)()
6667

@@ -101,11 +102,21 @@ def set_scores(bittensor: turbobt.Bittensor):
101102
SystemEvent.EventSubType.SUCCESS,
102103
"",
103104
{
104-
"DYNAMIC_COMMIT_REVEAL_WEIGHTS_INTERVAL": config.DYNAMIC_COMMIT_REVEAL_WEIGHTS_INTERVAL,
105-
"DYNAMIC_COMMIT_REVEAL_COMMIT_START_OFFSET": config.DYNAMIC_COMMIT_REVEAL_COMMIT_START_OFFSET,
106-
"DYNAMIC_COMMIT_REVEAL_COMMIT_END_BUFFER": config.DYNAMIC_COMMIT_REVEAL_COMMIT_END_BUFFER,
107-
"DYNAMIC_COMMIT_REVEAL_REVEAL_END_BUFFER": config.DYNAMIC_COMMIT_REVEAL_REVEAL_END_BUFFER,
108-
"DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED": config.DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED,
105+
"DYNAMIC_COMMIT_REVEAL_WEIGHTS_INTERVAL": get_config(
106+
"DYNAMIC_COMMIT_REVEAL_WEIGHTS_INTERVAL"
107+
),
108+
"DYNAMIC_COMMIT_REVEAL_COMMIT_START_OFFSET": get_config(
109+
"DYNAMIC_COMMIT_REVEAL_COMMIT_START_OFFSET"
110+
),
111+
"DYNAMIC_COMMIT_REVEAL_COMMIT_END_BUFFER": get_config(
112+
"DYNAMIC_COMMIT_REVEAL_COMMIT_END_BUFFER"
113+
),
114+
"DYNAMIC_COMMIT_REVEAL_REVEAL_END_BUFFER": get_config(
115+
"DYNAMIC_COMMIT_REVEAL_REVEAL_END_BUFFER"
116+
),
117+
"DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED": get_config(
118+
"DYNAMIC_COMMIT_REVEAL_WEIGHTS_ENABLED"
119+
),
109120
"current_block_number": current_block.number,
110121
},
111122
)
@@ -212,12 +223,16 @@ def __init__(
212223
reveal_end_buffer: int | None = None,
213224
):
214225
self.current_block = current_block
215-
self.length = length or config.DYNAMIC_COMMIT_REVEAL_WEIGHTS_INTERVAL
216-
self.commit_start_offset = (
217-
commit_start_offset or config.DYNAMIC_COMMIT_REVEAL_COMMIT_START_OFFSET
226+
self.length = length or get_config("DYNAMIC_COMMIT_REVEAL_WEIGHTS_INTERVAL")
227+
self.commit_start_offset = commit_start_offset or get_config(
228+
"DYNAMIC_COMMIT_REVEAL_COMMIT_START_OFFSET"
229+
)
230+
self.commit_end_buffer = commit_end_buffer or get_config(
231+
"DYNAMIC_COMMIT_REVEAL_COMMIT_END_BUFFER"
232+
)
233+
self.reveal_end_buffer = reveal_end_buffer or get_config(
234+
"DYNAMIC_COMMIT_REVEAL_REVEAL_END_BUFFER"
218235
)
219-
self.commit_end_buffer = commit_end_buffer or config.DYNAMIC_COMMIT_REVEAL_COMMIT_END_BUFFER
220-
self.reveal_end_buffer = reveal_end_buffer or config.DYNAMIC_COMMIT_REVEAL_REVEAL_END_BUFFER
221236

222237
@cached_property
223238
def start(self):
@@ -386,9 +401,9 @@ def apply_dancing_burners(
386401
min_allowed_weights: int,
387402
max_weight_limit: int,
388403
) -> tuple["torch.Tensor", "torch.FloatTensor"] | tuple[NDArray[np.int64], NDArray[np.float32]]:
389-
burner_hotkeys = config.DYNAMIC_BURN_TARGET_SS58ADDRESSES.split(",")
390-
burn_rate = config.DYNAMIC_BURN_RATE
391-
burn_partition = config.DYNAMIC_BURN_PARTITION
404+
burner_hotkeys = get_config("DYNAMIC_BURN_TARGET_SS58ADDRESSES").split(",")
405+
burn_rate = get_config("DYNAMIC_BURN_RATE")
406+
burn_partition = get_config("DYNAMIC_BURN_PARTITION")
392407

393408
hotkey_to_uid = {neuron.hotkey: neuron.uid for neuron in neurons}
394409
registered_burner_hotkeys = sorted(

0 commit comments

Comments
 (0)