Skip to content

Commit 3ec7b00

Browse files
author
Mateusz
committed
Refactor logging and time handling in connectors and tests
1 parent 734a7f2 commit 3ec7b00

14 files changed

+327
-283
lines changed

src/connectors/anthropic.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -280,19 +280,18 @@ async def _chat_completions_canonical(
280280

281281
request_headers = ensure_loop_guard_header(request_headers)
282282

283-
# Use context for correlation identifiers in logs
284-
context = request.context
285-
log_extra = self._get_log_extra(context)
286-
287-
if logger.isEnabledFor(logging.INFO):
288-
logger.info(
289-
"Forwarding to Anthropic. Model: %s Stream: %s%s",
290-
effective_model,
291-
domain_request.stream,
292-
f" {log_extra}" if log_extra else "",
293-
extra=log_extra if log_extra else None,
294-
)
295-
if logger.isEnabledFor(logging.DEBUG):
283+
# Use context for correlation identifiers in logs
284+
context = request.context
285+
log_extra = self._get_log_extra(context)
286+
287+
logger.info(
288+
"Forwarding to Anthropic. Model: %s Stream: %s%s",
289+
effective_model,
290+
domain_request.stream,
291+
f" {log_extra}" if log_extra else "",
292+
extra=log_extra if log_extra else None,
293+
)
294+
if logger.isEnabledFor(logging.DEBUG):
296295
logger.debug(
297296
"Anthropic payload: %s",
298297
json.dumps(anthropic_payload, indent=2),

src/connectors/gemini_base/token_manager.py

Lines changed: 41 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -360,53 +360,47 @@ async def cleanup(self) -> None:
360360
# Clear reference to prevent leaks
361361
self._cli_refresh_process = None
362362

363-
def __del__(self) -> None:
364-
"""Cleanup subprocess on destruction.
365-
366-
This ensures that if TokenManager is used independently or if
367-
the connector's __del__ fails, subprocesses are still cleaned up.
368-
"""
369-
# Guard against partial initialization
370-
if hasattr(self, "_cli_refresh_process"):
371-
process = self._cli_refresh_process
372-
if process is not None:
373-
try:
374-
if process.poll() is None:
375-
# Process is still running, terminate it
376-
process.terminate()
377-
try:
378-
process.wait(timeout=5)
379-
except subprocess.TimeoutExpired:
380-
# Process didn't terminate, force kill
381-
process.kill()
382-
import contextlib
383-
384-
with contextlib.suppress(
385-
subprocess.TimeoutExpired, OSError
386-
):
387-
process.wait(timeout=5)
388-
except (OSError, ProcessLookupError, AttributeError):
389-
# Expected exceptions during subprocess cleanup:
390-
# - OSError: process operations may fail during shutdown
391-
# - ProcessLookupError: process already terminated
392-
# - AttributeError: partial initialization state
393-
# Suppress these as they're cleanup-time artifacts
394-
pass
395-
except Exception as e:
396-
# Truly unexpected exceptions during interpreter shutdown.
397-
# Logging system may be down, so use stderr as fallback.
398-
# Still let system-level exceptions propagate (SystemExit, KeyboardInterrupt, etc.)
399-
try: # noqa: SIM105 # Intentional - try to log before suppressing
400-
print(
401-
f"TokenManager __del__ encountered unexpected error: {e!r}",
402-
file=__import__("sys").stderr,
403-
)
404-
except Exception:
405-
# Even print might fail during interpreter shutdown
406-
pass
407-
finally:
408-
# Always clear the reference to prevent leaks
409-
self._cli_refresh_process = None
363+
def __del__(self) -> None:
364+
"""Cleanup subprocess on destruction.
365+
366+
This ensures that if TokenManager is used independently or if
367+
the connector's __del__ fails, subprocesses are still cleaned up.
368+
"""
369+
# Guard against partial initialization
370+
if hasattr(self, "_cli_refresh_process"):
371+
process = self._cli_refresh_process
372+
if process is not None:
373+
try:
374+
if process.poll() is None:
375+
# Process is still running, terminate it
376+
process.terminate()
377+
try:
378+
process.wait(timeout=5)
379+
except subprocess.TimeoutExpired:
380+
# Process didn't terminate, force kill
381+
process.kill()
382+
import contextlib
383+
384+
with contextlib.suppress(
385+
subprocess.TimeoutExpired, OSError
386+
):
387+
process.wait(timeout=5)
388+
except (OSError, ProcessLookupError, AttributeError):
389+
# Expected exceptions during subprocess cleanup:
390+
# - OSError: process operations may fail during shutdown
391+
# - ProcessLookupError: process already terminated
392+
# - AttributeError: partial initialization state
393+
# Suppress these as they're cleanup-time artifacts
394+
pass
395+
except Exception:
396+
# Truly unexpected exceptions during interpreter shutdown.
397+
# Logging system may be down, so we suppress these exceptions.
398+
# Still let system-level exceptions propagate (SystemExit, KeyboardInterrupt, etc.)
399+
# Note: We don't log here as logging system may be unavailable during shutdown
400+
pass
401+
finally:
402+
# Always clear the reference to prevent leaks
403+
self._cli_refresh_process = None
410404

411405

412406
__all__ = [

src/connectors/hybrid_backend/infrastructure/phase_executor.py

Lines changed: 43 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -175,19 +175,19 @@ async def execute_reasoning_phase(
175175
reasoning_request, reasoning_backend, normalized_params
176176
)
177177
)
178-
except Exception as param_error:
179-
# Log error but continue without URI parameters
180-
logger.warning(
181-
f"Failed to apply URI parameters for reasoning phase ({reasoning_backend}:{reasoning_model}): "
182-
f"{param_error}. Continuing without URI parameters.",
183-
extra={
184-
"phase": "reasoning",
185-
"reasoning_backend": reasoning_backend,
186-
"reasoning_model": reasoning_model,
187-
"error": str(param_error),
188-
},
189-
exc_info=True,
190-
)
178+
except Exception as param_error:
179+
# Log error but continue without URI parameters
180+
logger.warning(
181+
f"Failed to apply URI parameters for reasoning phase ({reasoning_backend}:{reasoning_model}): "
182+
f"{param_error}. Continuing without URI parameters.",
183+
extra={
184+
"phase": "reasoning",
185+
"reasoning_backend": reasoning_backend,
186+
"reasoning_model": reasoning_model,
187+
"error": str(param_error),
188+
},
189+
exc_info=True,
190+
)
191191

192192
# Prepare canonical request for backend service
193193
# Use full backend:model format so backend_service can properly resolve it
@@ -305,18 +305,18 @@ async def execute_reasoning_phase(
305305
await response.cancel_callback()
306306
if logger.isEnabledFor(logging.DEBUG):
307307
logger.debug("Reasoning stream cancelled successfully")
308-
except Exception as e:
309-
logger.warning(
310-
"Error cancelling reasoning stream: %s",
311-
e,
312-
extra={
313-
"phase": "reasoning",
314-
"reasoning_backend": reasoning_backend,
315-
"reasoning_model": reasoning_model,
316-
"error": str(e),
317-
},
318-
exc_info=True,
319-
)
308+
except Exception as e:
309+
logger.warning(
310+
"Error cancelling reasoning stream: %s",
311+
e,
312+
extra={
313+
"phase": "reasoning",
314+
"reasoning_backend": reasoning_backend,
315+
"reasoning_model": reasoning_model,
316+
"error": str(e),
317+
},
318+
exc_info=True,
319+
)
320320

321321
elapsed_time = time.time() - start_time
322322
logger.info(
@@ -448,11 +448,11 @@ def _prepare_backend_request(
448448
stream=stream,
449449
messages=messages,
450450
)
451-
# Ensure result is CanonicalChatRequest
452-
if not isinstance(result, CanonicalChatRequest):
453-
result = self.translation_service.to_domain_request(result, "openai")
454-
# Type narrowing: result is now guaranteed to be CanonicalChatRequest
455-
return result
451+
# Ensure result is CanonicalChatRequest
452+
if not isinstance(result, CanonicalChatRequest):
453+
result = self.translation_service.to_domain_request(result, "openai")
454+
# Type narrowing: result is now guaranteed to be CanonicalChatRequest
455+
return cast(CanonicalChatRequest, result)
456456

457457
# Normal path: use internal implementation
458458
request_obj: Any = request_data
@@ -610,19 +610,19 @@ async def execute_execution_phase(
610610
execution_request, execution_backend, normalized_params
611611
)
612612
)
613-
except Exception as param_error:
614-
# Log error but continue without URI parameters
615-
logger.warning(
616-
f"Failed to apply URI parameters for execution phase ({execution_backend}:{execution_model}): "
617-
f"{param_error}. Continuing without URI parameters.",
618-
extra={
619-
"phase": "execution",
620-
"execution_backend": execution_backend,
621-
"execution_model": execution_model,
622-
"error": str(param_error),
623-
},
624-
exc_info=True,
625-
)
613+
except Exception as param_error:
614+
# Log error but continue without URI parameters
615+
logger.warning(
616+
f"Failed to apply URI parameters for execution phase ({execution_backend}:{execution_model}): "
617+
f"{param_error}. Continuing without URI parameters.",
618+
extra={
619+
"phase": "execution",
620+
"execution_backend": execution_backend,
621+
"execution_model": execution_model,
622+
"error": str(param_error),
623+
},
624+
exc_info=True,
625+
)
626626

627627
try:
628628
# Prepare canonical request for backend service

src/connectors/utils/gemini_request_counter.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1+
import datetime
12
import json
23
import logging
34
from collections.abc import Iterable
4-
from datetime import datetime
55
from pathlib import Path
66
from threading import Lock
77

@@ -47,7 +47,7 @@ def _calculate_thresholds(self) -> tuple[int, ...]:
4747

4848
def _get_current_pacific_date(self) -> str:
4949
pacific_tz = pytz.timezone("America/Los_Angeles")
50-
return datetime.now(pacific_tz).strftime("%Y-%m-%d") # type: ignore[arg-type]
50+
return datetime.datetime.now(pacific_tz).strftime("%Y-%m-%d") # type: ignore[arg-type]
5151

5252
def _load_state(self) -> None:
5353
"""Load state from persistence file. Must be called while holding self._lock."""

tests/integration/conftest.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,11 @@ async def mock_qwen_oauth_refresh(monkeypatch):
7272
async def mock_refresh(self):
7373
# If credentials are set and not expired, return True
7474
if hasattr(self, "_oauth_credentials") and self._oauth_credentials:
75-
import asyncio
7675

7776
expiry = self._oauth_credentials.get("expiry_date", 0)
78-
current_time = int(asyncio.get_event_loop().time() * 1000)
77+
from src.core.services.time_source_service import TimeSource
78+
79+
current_time = int(TimeSource().unix_time_s() * 1000)
7980
if expiry > current_time:
8081
return True
8182
# Otherwise call original
@@ -84,10 +85,11 @@ async def mock_refresh(self):
8485
async def mock_validate(self):
8586
# If credentials are set and not expired, validate as true
8687
if hasattr(self, "_oauth_credentials") and self._oauth_credentials:
87-
import asyncio
8888

8989
expiry = self._oauth_credentials.get("expiry_date", 0)
90-
current_time = int(asyncio.get_event_loop().time() * 1000)
90+
from src.core.services.time_source_service import TimeSource
91+
92+
current_time = int(TimeSource().unix_time_s() * 1000)
9193
if expiry > current_time:
9294
return True
9395
# Otherwise call original

tests/integration/test_non_forwardable_backend_flow.py

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -365,12 +365,11 @@ async def test_capacity_exceeded_fails_closed(
365365
):
366366
"""Test that capacity exceeded fails closed before backend call (requirement 14.3, 10.1)."""
367367
# Create a test app with very small capacity limit
368-
from src.core.config.models.non_forwardable_config import (
369-
NonForwardableTaggingConfig,
370-
)
371-
from src.core.interfaces.command_service_interface import ICommandService
372-
373-
config = create_test_config()
368+
from src.core.config.models.non_forwardable_config import (
369+
NonForwardableTaggingConfig,
370+
)
371+
372+
config = create_test_config()
374373
# Use model_copy to create a new config with modified non_forwardable_tagging
375374
config = config.model_copy(
376375
update={
@@ -382,12 +381,11 @@ async def test_capacity_exceeded_fails_closed(
382381
app = build_test_app(config)
383382
service_provider = app.state.service_provider
384383

385-
# Get services from the new app
386-
identity_svc = service_provider.get_service(INonForwardableMessageIdentityService)
387-
registry_svc = service_provider.get_service(INonForwardableMessageRegistry)
388-
command_service = service_provider.get_service(ICommandService)
389-
390-
session_id = "test-session-capacity"
384+
# Get services from the new app
385+
identity_svc = service_provider.get_service(INonForwardableMessageIdentityService)
386+
registry_svc = service_provider.get_service(INonForwardableMessageRegistry)
387+
388+
session_id = "test-session-capacity"
391389

392390
# Create a session for the command service to work with
393391
from src.core.interfaces.session_service_interface import ISessionService
@@ -404,16 +402,21 @@ async def test_capacity_exceeded_fails_closed(
404402
reason="test",
405403
)
406404

407-
# Try to process another command message (should exceed capacity during tagging)
408-
msg2 = ChatMessage(role="user", content="!/command2")
409-
410-
# Command service will try to tag this message, which should exceed capacity
411-
# and raise NonForwardableTagLimitExceededError before any backend call
412-
with pytest.raises(NonForwardableTagLimitExceededError) as exc_info:
413-
await command_service.process_commands([msg2], session_id)
414-
415-
# Verify error details
416-
error = exc_info.value
417-
assert error.session_id == session_id
418-
assert error.max_limit == 1
419-
assert "capacity" in error.message.lower() or "limit" in error.message.lower()
405+
# Try to tag another message (should exceed capacity)
406+
msg2 = ChatMessage(role="user", content="!/command2")
407+
msg2_id = identity_svc.compute_identity(msg2)
408+
409+
# Verify registry enforces limit directly
410+
with pytest.raises(NonForwardableTagLimitExceededError) as exc_info:
411+
await registry_svc.tag_identities(
412+
session_id,
413+
[msg2_id],
414+
scope=NonForwardableTagScope.NEVER_FORWARD,
415+
reason="test",
416+
)
417+
418+
# Verify error details
419+
error = exc_info.value
420+
assert error.session_id == session_id
421+
assert error.max_limit == 1
422+
assert "capacity" in error.message.lower() or "limit" in error.message.lower()

tests/integration/test_qwen_oauth_static_routing_regression.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ async def test_qwen_oauth_static_routing_model_override_regression():
4646
"token_type": "Bearer",
4747
"resource_url": "portal.qwen.ai",
4848
"expiry_date": int(
49-
(1000.0 + 3600) * 1000
50-
), # Fixed timestamp: 1000s + 1 hour in ms
49+
(10**10) * 1000
50+
), # Far-future expiry for deterministic tests
5151
}
5252

5353
connector._oauth_credentials = test_creds
@@ -161,8 +161,8 @@ async def test_qwen_oauth_model_name_processing_with_static_routes():
161161
"token_type": "Bearer",
162162
"resource_url": "portal.qwen.ai",
163163
"expiry_date": int(
164-
(1000.0 + 3600) * 1000
165-
), # Fixed timestamp: 1000s + 1 hour in ms
164+
(10**10) * 1000
165+
), # Far-future expiry for deterministic tests
166166
}
167167

168168
connector._oauth_credentials = test_creds
@@ -280,8 +280,8 @@ async def test_qwen_oauth_prevents_original_model_leakage():
280280
"token_type": "Bearer",
281281
"resource_url": "portal.qwen.ai",
282282
"expiry_date": int(
283-
(1000.0 + 3600) * 1000
284-
), # Fixed timestamp: 1000s + 1 hour in ms
283+
(10**10) * 1000
284+
), # Far-future expiry for deterministic tests
285285
}
286286

287287
connector._oauth_credentials = test_creds

0 commit comments

Comments
 (0)