|
1 | | -"""Captcha verification service for the SSO login flow.""" |
2 | | - |
3 | | -from __future__ import annotations |
4 | | - |
5 | | -import logging |
6 | | -from dataclasses import dataclass, field |
7 | | -from typing import Any |
8 | | - |
9 | | -import httpx |
10 | | - |
11 | | -from src.core.auth.sso.config import CaptchaConfig |
12 | | -from src.core.auth.sso.exceptions import AuthenticationError, ConfigurationError |
13 | | - |
14 | | -logger = logging.getLogger(__name__) |
15 | | - |
16 | | - |
17 | | -@dataclass |
18 | | -class CaptchaVerificationResult: |
19 | | - """Result of a captcha verification attempt.""" |
20 | | - |
21 | | - success: bool |
22 | | - error_codes: list[str] = field(default_factory=list) |
23 | | - action: str | None = None |
24 | | - ray_id: str | None = None |
25 | | - |
26 | | - |
27 | | -class CaptchaService: |
28 | | - """Service that validates captcha challenges with a hosted provider.""" |
29 | | - |
30 | | - def __init__(self, config: CaptchaConfig | None): |
31 | | - """ |
32 | | - Initialize captcha service. |
33 | | -
|
34 | | - Args: |
35 | | - config: Captcha configuration or None if disabled |
36 | | - """ |
37 | | - self.config = config |
38 | | - |
39 | | - @property |
40 | | - def is_enabled(self) -> bool: |
41 | | - """Return True when captcha verification is required.""" |
42 | | - return bool(self.config and self.config.enabled) |
43 | | - |
44 | | - async def verify( |
45 | | - self, captcha_token: str | None, remote_ip: str | None = None |
46 | | - ) -> CaptchaVerificationResult: |
47 | | - """ |
48 | | - Validate a captcha token against the configured provider. |
49 | | -
|
50 | | - Args: |
51 | | - captcha_token: Token returned by the captcha widget |
52 | | - remote_ip: Optional client IP for provider telemetry |
53 | | -
|
54 | | - Returns: |
55 | | - CaptchaVerificationResult describing verification outcome |
56 | | -
|
57 | | - Raises: |
58 | | - ConfigurationError: If required secrets are missing |
59 | | - AuthenticationError: If the provider cannot be reached or returns bad data |
60 | | - """ |
61 | | - if not self.is_enabled: |
62 | | - return CaptchaVerificationResult(success=True) |
63 | | - |
64 | | - if not self.config.site_key or not self.config.secret_key: |
65 | | - raise ConfigurationError( |
66 | | - "Captcha is enabled but site_key or secret_key is not configured", |
67 | | - details={"provider": self.config.provider}, |
68 | | - ) |
69 | | - |
70 | | - if not captcha_token: |
71 | | - return CaptchaVerificationResult( |
72 | | - success=False, error_codes=["missing-token"] |
73 | | - ) |
74 | | - |
75 | | - payload: dict[str, Any] = { |
76 | | - "secret": self.config.secret_key, |
77 | | - "response": captcha_token, |
78 | | - } |
79 | | - if remote_ip: |
80 | | - payload["remoteip"] = remote_ip |
81 | | - |
82 | | - try: |
83 | | - async with httpx.AsyncClient(timeout=self.config.timeout_seconds) as client: |
84 | | - response = await client.post(self.config.verify_url, data=payload) |
85 | | - response.raise_for_status() |
86 | | - except httpx.HTTPStatusError as exc: |
87 | | - logger.warning( |
88 | | - "Captcha provider returned HTTP error", |
89 | | - extra={ |
90 | | - "status": exc.response.status_code, |
91 | | - "provider": self.config.provider, |
92 | | - }, |
93 | | - ) |
94 | | - raise AuthenticationError( |
95 | | - "Captcha verification failed", |
96 | | - details={ |
97 | | - "reason": "provider_http_error", |
98 | | - "status_code": exc.response.status_code, |
99 | | - }, |
100 | | - ) from exc |
101 | | - except httpx.HTTPError as exc: |
102 | | - logger.error( |
103 | | - "Captcha verification failed due to network error", |
104 | | - exc_info=True, |
105 | | - extra={"provider": self.config.provider}, |
106 | | - ) |
107 | | - raise AuthenticationError( |
108 | | - "Captcha verification failed", |
109 | | - details={"reason": "provider_unreachable"}, |
110 | | - ) from exc |
111 | | - |
112 | | - try: |
113 | | - result_json = response.json() |
114 | | - except ValueError as exc: |
115 | | - raise AuthenticationError( |
116 | | - "Captcha verification failed", |
117 | | - details={"reason": "invalid_response"}, |
118 | | - ) from exc |
119 | | - |
120 | | - success = bool(result_json.get("success")) |
121 | | - error_codes = [str(code) for code in result_json.get("error-codes", []) if code] |
122 | | - return CaptchaVerificationResult( |
123 | | - success=success, |
124 | | - error_codes=error_codes, |
125 | | - action=result_json.get("action"), |
126 | | - ray_id=result_json.get("ray_id") |
127 | | - or result_json.get("cdata") |
128 | | - or result_json.get("rayid"), |
129 | | - ) |
| 1 | +"""Captcha verification service for the SSO login flow.""" |
| 2 | + |
| 3 | +from __future__ import annotations |
| 4 | + |
| 5 | +import logging |
| 6 | +from dataclasses import dataclass, field |
| 7 | +from typing import Any |
| 8 | + |
| 9 | +import httpx |
| 10 | + |
| 11 | +from src.core.auth.sso.config import CaptchaConfig |
| 12 | +from src.core.auth.sso.exceptions import AuthenticationError, ConfigurationError |
| 13 | + |
| 14 | +logger = logging.getLogger(__name__) |
| 15 | + |
| 16 | + |
| 17 | +@dataclass |
| 18 | +class CaptchaVerificationResult: |
| 19 | + """Result of a captcha verification attempt.""" |
| 20 | + |
| 21 | + success: bool |
| 22 | + error_codes: list[str] = field(default_factory=list) |
| 23 | + action: str | None = None |
| 24 | + ray_id: str | None = None |
| 25 | + |
| 26 | + |
| 27 | +class CaptchaService: |
| 28 | + """Service that validates captcha challenges with a hosted provider.""" |
| 29 | + |
| 30 | + def __init__(self, config: CaptchaConfig | None): |
| 31 | + """ |
| 32 | + Initialize captcha service. |
| 33 | +
|
| 34 | + Args: |
| 35 | + config: Captcha configuration or None if disabled |
| 36 | + """ |
| 37 | + self.config = config |
| 38 | + |
| 39 | + @property |
| 40 | + def is_enabled(self) -> bool: |
| 41 | + """Return True when captcha verification is required.""" |
| 42 | + return bool(self.config and self.config.enabled) |
| 43 | + |
| 44 | + async def verify( |
| 45 | + self, captcha_token: str | None, remote_ip: str | None = None |
| 46 | + ) -> CaptchaVerificationResult: |
| 47 | + """ |
| 48 | + Validate a captcha token against the configured provider. |
| 49 | +
|
| 50 | + Args: |
| 51 | + captcha_token: Token returned by the captcha widget |
| 52 | + remote_ip: Optional client IP for provider telemetry |
| 53 | +
|
| 54 | + Returns: |
| 55 | + CaptchaVerificationResult describing verification outcome |
| 56 | +
|
| 57 | + Raises: |
| 58 | + ConfigurationError: If required secrets are missing |
| 59 | + AuthenticationError: If the provider cannot be reached or returns bad data |
| 60 | + """ |
| 61 | + if not self.is_enabled: |
| 62 | + return CaptchaVerificationResult(success=True) |
| 63 | + |
| 64 | + # At this point, self.config must be non-None because is_enabled checks it |
| 65 | + assert self.config is not None, "Config must be set when captcha is enabled" |
| 66 | + config = self.config |
| 67 | + |
| 68 | + if not config.site_key or not config.secret_key: |
| 69 | + raise ConfigurationError( |
| 70 | + "Captcha is enabled but site_key or secret_key is not configured", |
| 71 | + details={"provider": config.provider}, |
| 72 | + ) |
| 73 | + |
| 74 | + if not captcha_token: |
| 75 | + return CaptchaVerificationResult( |
| 76 | + success=False, error_codes=["missing-token"] |
| 77 | + ) |
| 78 | + |
| 79 | + payload: dict[str, Any] = { |
| 80 | + "secret": config.secret_key, |
| 81 | + "response": captcha_token, |
| 82 | + } |
| 83 | + if remote_ip: |
| 84 | + payload["remoteip"] = remote_ip |
| 85 | + |
| 86 | + try: |
| 87 | + async with httpx.AsyncClient(timeout=config.timeout_seconds) as client: |
| 88 | + response = await client.post(config.verify_url, data=payload) |
| 89 | + response.raise_for_status() |
| 90 | + except httpx.HTTPStatusError as exc: |
| 91 | + logger.warning( |
| 92 | + "Captcha provider returned HTTP error", |
| 93 | + extra={ |
| 94 | + "status": exc.response.status_code, |
| 95 | + "provider": config.provider, |
| 96 | + }, |
| 97 | + ) |
| 98 | + raise AuthenticationError( |
| 99 | + "Captcha verification failed", |
| 100 | + details={ |
| 101 | + "reason": "provider_http_error", |
| 102 | + "status_code": exc.response.status_code, |
| 103 | + }, |
| 104 | + ) from exc |
| 105 | + except httpx.HTTPError as exc: |
| 106 | + logger.error( |
| 107 | + "Captcha verification failed due to network error", |
| 108 | + exc_info=True, |
| 109 | + extra={"provider": config.provider}, |
| 110 | + ) |
| 111 | + raise AuthenticationError( |
| 112 | + "Captcha verification failed", |
| 113 | + details={"reason": "provider_unreachable"}, |
| 114 | + ) from exc |
| 115 | + |
| 116 | + try: |
| 117 | + result_json = response.json() |
| 118 | + except ValueError as exc: |
| 119 | + raise AuthenticationError( |
| 120 | + "Captcha verification failed", |
| 121 | + details={"reason": "invalid_response"}, |
| 122 | + ) from exc |
| 123 | + |
| 124 | + success = bool(result_json.get("success")) |
| 125 | + error_codes = [str(code) for code in result_json.get("error-codes", []) if code] |
| 126 | + return CaptchaVerificationResult( |
| 127 | + success=success, |
| 128 | + error_codes=error_codes, |
| 129 | + action=result_json.get("action"), |
| 130 | + ray_id=result_json.get("ray_id") |
| 131 | + or result_json.get("cdata") |
| 132 | + or result_json.get("rayid"), |
| 133 | + ) |
0 commit comments