diff --git a/Makefile b/Makefile index b2950a6..4f52eaa 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,21 @@ test: ## Test the code with pytest @echo "๐ Testing code: Running pytest" @uv run python -m pytest --cov --cov-config=pyproject.toml --cov-report=xml -vv -s +.PHONY: test-integration +test-integration: ## Run integration tests (real protocol servers, no Docker) + @echo "๐ Running integration tests" + @uv run --group integration python -m pytest -m integration -o "addopts=" -vv -s + +.PHONY: test-docker +test-docker: ## Run Docker integration tests (requires Docker) + @echo "๐ณ Running Docker integration tests" + @uv run --group docker python -m pytest -m docker -o "addopts=" -vv -s + +.PHONY: test-all +test-all: ## Run all tests (unit + integration + Docker) + @echo "๐ Running all tests" + @uv run --group integration --group docker python -m pytest -o "addopts=" --cov --cov-config=pyproject.toml --cov-report=xml -vv -s + .PHONY: build build: clean-build ## Build wheel file @echo "๐ Creating wheel file" @@ -25,8 +40,25 @@ build: clean-build ## Build wheel file .PHONY: clean-build clean-build: ## Clean build artifacts - @echo "๐ Removing build artifacts" - @uv run python -c "import shutil; import os; shutil.rmtree('dist') if os.path.exists('dist') else None" + @rm -rf dist build *.egg-info + +.PHONY: clean +clean: clean-build ## Remove build, test, and cache artifacts + @find . -type d -name "__pycache__" -exec rm -rf {} + 2>/dev/null || true + @find . -type f -name "*.py[co]" -delete 2>/dev/null || true + @rm -rf .pytest_cache .coverage coverage.xml htmlcov .mypy_cache .ruff_cache .hypothesis + @echo "โจ Clean complete" + +.PHONY: clean-docker +clean-docker: ## Stop and remove Docker test containers and volumes + @echo "๐ณ Cleaning Docker test environment" + @docker compose -f tests/docker/docker-compose.yml down -v --remove-orphans 2>/dev/null || true + @echo "โจ Docker clean complete" + +.PHONY: clean-all +clean-all: clean clean-docker ## Remove all artifacts including tox, docs, and Docker + @rm -rf .tox .nox site + @echo "โจ Deep clean complete" .PHONY: publish publish: ## Publish a release to PyPI. diff --git a/README.md b/README.md index 7db0a35..91dbe99 100644 --- a/README.md +++ b/README.md @@ -63,24 +63,136 @@ You can also configure the email server using environment variables, which is pa #### Available Environment Variables -| Variable | Description | Default | Required | -| --------------------------------------------- | ------------------------------------------------- | ------------- | -------- | -| `MCP_EMAIL_SERVER_ACCOUNT_NAME` | Account identifier | `"default"` | No | -| `MCP_EMAIL_SERVER_FULL_NAME` | Display name | Email prefix | No | -| `MCP_EMAIL_SERVER_EMAIL_ADDRESS` | Email address | - | Yes | -| `MCP_EMAIL_SERVER_USER_NAME` | Login username | Same as email | No | -| `MCP_EMAIL_SERVER_PASSWORD` | Email password | - | Yes | -| `MCP_EMAIL_SERVER_IMAP_HOST` | IMAP server host | - | Yes | -| `MCP_EMAIL_SERVER_IMAP_PORT` | IMAP server port | `993` | No | -| `MCP_EMAIL_SERVER_IMAP_SSL` | Enable IMAP SSL | `true` | No | -| `MCP_EMAIL_SERVER_SMTP_HOST` | SMTP server host | - | Yes | -| `MCP_EMAIL_SERVER_SMTP_PORT` | SMTP server port | `465` | No | -| `MCP_EMAIL_SERVER_SMTP_SSL` | Enable SMTP SSL | `true` | No | -| `MCP_EMAIL_SERVER_SMTP_START_SSL` | Enable STARTTLS | `false` | No | -| `MCP_EMAIL_SERVER_SMTP_VERIFY_SSL` | Verify SSL certificates (disable for self-signed) | `true` | No | -| `MCP_EMAIL_SERVER_ENABLE_ATTACHMENT_DOWNLOAD` | Enable attachment download | `false` | No | -| `MCP_EMAIL_SERVER_SAVE_TO_SENT` | Save sent emails to IMAP Sent folder | `true` | No | -| `MCP_EMAIL_SERVER_SENT_FOLDER_NAME` | Custom Sent folder name (auto-detect if not set) | - | No | +| Variable | TOML Key | Description | Default | Required | +| --------------------------------------------- | ------------------------------ | ------------------------------------------------------ | ------------------------------------------------ | -------- | +| `MCP_EMAIL_SERVER_CONFIG_PATH` | โ | Path to TOML config file | `~/.config/zerolib/mcp_email_server/config.toml` | No | +| `MCP_EMAIL_SERVER_ACCOUNT_NAME` | `emails[].account_name` | Account identifier | `"default"` | No | +| `MCP_EMAIL_SERVER_FULL_NAME` | `emails[].full_name` | Display name | Email prefix | No | +| `MCP_EMAIL_SERVER_EMAIL_ADDRESS` | `emails[].email_address` | Email address | โ | Yes | +| `MCP_EMAIL_SERVER_USER_NAME` | `emails[].incoming.user_name` | Login username (IMAP & SMTP) | Same as email | No | +| `MCP_EMAIL_SERVER_PASSWORD` | `emails[].incoming.password` | Email password (IMAP & SMTP) | โ | Yes | +| `MCP_EMAIL_SERVER_IMAP_HOST` | `emails[].incoming.host` | IMAP server host | โ | Yes | +| `MCP_EMAIL_SERVER_IMAP_PORT` | `emails[].incoming.port` | IMAP server port | `993` | No | +| `MCP_EMAIL_SERVER_IMAP_SECURITY` | `emails[].incoming.security` | IMAP connection security: `tls`, `starttls`, or `none` | `tls` | No | +| `MCP_EMAIL_SERVER_IMAP_VERIFY_SSL` | `emails[].incoming.verify_ssl` | Verify IMAP SSL certificates | `true` | No | +| `MCP_EMAIL_SERVER_IMAP_USER_NAME` | `emails[].incoming.user_name` | IMAP-specific username (overrides `USER_NAME`) | Same as `USER_NAME` | No | +| `MCP_EMAIL_SERVER_IMAP_PASSWORD` | `emails[].incoming.password` | IMAP-specific password (overrides `PASSWORD`) | Same as `PASSWORD` | No | +| `MCP_EMAIL_SERVER_SMTP_HOST` | `emails[].outgoing.host` | SMTP server host | โ | Yes | +| `MCP_EMAIL_SERVER_SMTP_PORT` | `emails[].outgoing.port` | SMTP server port | `465` | No | +| `MCP_EMAIL_SERVER_SMTP_SECURITY` | `emails[].outgoing.security` | SMTP connection security: `tls`, `starttls`, or `none` | `tls` | No | +| `MCP_EMAIL_SERVER_SMTP_VERIFY_SSL` | `emails[].outgoing.verify_ssl` | Verify SMTP SSL certificates | `true` | No | +| `MCP_EMAIL_SERVER_SMTP_USER_NAME` | `emails[].outgoing.user_name` | SMTP-specific username (overrides `USER_NAME`) | Same as `USER_NAME` | No | +| `MCP_EMAIL_SERVER_SMTP_PASSWORD` | `emails[].outgoing.password` | SMTP-specific password (overrides `PASSWORD`) | Same as `PASSWORD` | No | +| `MCP_EMAIL_SERVER_SAVE_TO_SENT` | `emails[].save_to_sent` | Save sent emails to IMAP Sent folder | `true` | No | +| `MCP_EMAIL_SERVER_SENT_FOLDER_NAME` | `emails[].sent_folder_name` | Custom Sent folder name (auto-detect if not set) | โ | No | +| `MCP_EMAIL_SERVER_ENABLE_ATTACHMENT_DOWNLOAD` | `enable_attachment_download` | Enable attachment download | `false` | No | + +> **Note:** The `emails[].` prefix corresponds to `[[emails]]` in TOML (array of tables). See [TOML Configuration Reference](#toml-configuration-reference) for the full config structure. + +> **Deprecated:** `MCP_EMAIL_SERVER_IMAP_SSL`, `MCP_EMAIL_SERVER_SMTP_SSL`, and `MCP_EMAIL_SERVER_SMTP_START_SSL` still work for backward compatibility but are superseded by the `*_SECURITY` variables above. + +#### Connection Security Modes + +The `security` field (or `*_SECURITY` env var) controls how the connection to the mail server is encrypted, per [RFC 8314](https://tools.ietf.org/html/rfc8314): + +| Mode | Description | IMAP Port | SMTP Port | +| ---------- | ----------------------------------------------------- | --------- | --------- | +| `tls` | **Implicit TLS** โ encrypted from the first byte | 993 | 465 | +| `starttls` | **STARTTLS** โ connect plaintext, then upgrade to TLS | 143 | 587 | +| `none` | **No encryption** โ plaintext only (not recommended) | 143 | 25 | + +### TOML Configuration Reference + +The configuration file is located at `~/.config/zerolib/mcp_email_server/config.toml` by default (override with `MCP_EMAIL_SERVER_CONFIG_PATH`). You can also configure settings via the UI: `uvx mcp-email-server@latest ui` + +#### Single Account + +```toml +# Global settings +enable_attachment_download = false # Set to true to allow attachment downloads + +# Email account โ use [[emails]] for each account +[[emails]] +account_name = "work" +full_name = "John Doe" +email_address = "john@example.com" +save_to_sent = true # Save sent emails to IMAP Sent folder +# sent_folder_name = "Sent" # Optional: override auto-detected Sent folder name + +# IMAP (incoming mail) +[emails.incoming] +host = "imap.gmail.com" +port = 993 +user_name = "john@example.com" +password = "your-app-password" +security = "tls" # "tls" (default) | "starttls" | "none" +verify_ssl = true # Set to false for self-signed certificates + +# SMTP (outgoing mail) +[emails.outgoing] +host = "smtp.gmail.com" +port = 465 +user_name = "john@example.com" +password = "your-app-password" +security = "tls" # "tls" (default) | "starttls" | "none" +verify_ssl = true # Set to false for self-signed certificates +``` + +#### Multiple Accounts + +Add multiple `[[emails]]` blocks to configure several email accounts. Each account has its own IMAP/SMTP settings and can use different security modes: + +```toml +enable_attachment_download = true + +# Account 1: Work Gmail (Implicit TLS) +[[emails]] +account_name = "work" +full_name = "John Doe" +email_address = "john@company.com" +save_to_sent = true + +[emails.incoming] +host = "imap.gmail.com" +port = 993 +user_name = "john@company.com" +password = "gmail-app-password" +security = "tls" +verify_ssl = true + +[emails.outgoing] +host = "smtp.gmail.com" +port = 465 +user_name = "john@company.com" +password = "gmail-app-password" +security = "tls" +verify_ssl = true + +# Account 2: Personal ProtonMail via Bridge (STARTTLS + self-signed certs) +[[emails]] +account_name = "personal" +full_name = "John Doe" +email_address = "john@proton.me" +save_to_sent = true + +[emails.incoming] +host = "127.0.0.1" +port = 1143 +user_name = "john@proton.me" +password = "bridge-password" +security = "starttls" +verify_ssl = false + +[emails.outgoing] +host = "127.0.0.1" +port = 1025 +user_name = "john@proton.me" +password = "bridge-password" +security = "starttls" +verify_ssl = false +``` + +> **Tip:** Use `account_name` to distinguish accounts when calling MCP tools (e.g., `get_emails(account_name="work")`). ### Enabling Attachment Downloads @@ -153,7 +265,7 @@ sent_folder_name = "INBOX.Sent" ### Self-Signed Certificates (e.g., ProtonMail Bridge) -If you're using a local mail server with self-signed certificates (like ProtonMail Bridge), you'll need to disable SSL certificate verification: +If you're using a local mail server with self-signed certificates (like ProtonMail Bridge), you'll need to disable SSL certificate verification for both IMAP and SMTP: ```json { @@ -162,6 +274,7 @@ If you're using a local mail server with self-signed certificates (like ProtonMa "command": "uvx", "args": ["mcp-email-server@latest", "stdio"], "env": { + "MCP_EMAIL_SERVER_IMAP_VERIFY_SSL": "false", "MCP_EMAIL_SERVER_SMTP_VERIFY_SSL": "false" } } @@ -176,7 +289,37 @@ Or in TOML configuration: account_name = "protonmail" # ... other settings ... +[emails.incoming] +verify_ssl = false + +[emails.outgoing] +verify_ssl = false +``` + +#### ProtonMail Bridge Example + +ProtonMail Bridge uses STARTTLS on local ports with self-signed certificates: + +```toml +[[emails]] +account_name = "protonmail" +full_name = "Your Name" +email_address = "you@proton.me" + +[emails.incoming] +host = "127.0.0.1" +port = 1143 +user_name = "you@proton.me" +password = "your-bridge-password" +security = "starttls" +verify_ssl = false + [emails.outgoing] +host = "127.0.0.1" +port = 1025 +user_name = "you@proton.me" +password = "your-bridge-password" +security = "starttls" verify_ssl = false ``` diff --git a/mcp_email_server/config.py b/mcp_email_server/config.py index 879f35f..b2b3434 100644 --- a/mcp_email_server/config.py +++ b/mcp_email_server/config.py @@ -2,6 +2,7 @@ import datetime import os +from enum import Enum from pathlib import Path from typing import Any from zoneinfo import ZoneInfo @@ -30,15 +31,79 @@ def _parse_bool_env(value: str | None, default: bool = False) -> bool: CONFIG_PATH = Path(os.getenv("MCP_EMAIL_SERVER_CONFIG_PATH", DEFAULT_CONFIG_PATH)).expanduser().resolve() +class ConnectionSecurity(str, Enum): + """Connection security mode per RFC 8314. + + - TLS: Implicit TLS โ encrypted from the first byte (IMAP port 993, SMTP port 465) + - STARTTLS: Connect plaintext, then upgrade via STARTTLS command (IMAP port 143, SMTP port 587) + - NONE: No encryption (not recommended, only for trusted local connections) + """ + + TLS = "tls" + STARTTLS = "starttls" + NONE = "none" + + +def _parse_security_env(value: str | None, default: ConnectionSecurity | None = None) -> ConnectionSecurity | None: + """Parse ConnectionSecurity from environment variable string.""" + if value is None: + return default + try: + return ConnectionSecurity(value.lower()) + except ValueError: + logger.warning(f"Invalid security value '{value}', using default") + return default + + class EmailServer(BaseModel): user_name: str password: str host: str port: int - use_ssl: bool = True # Usually port 465 - start_ssl: bool = False # Usually port 587 + security: ConnectionSecurity = ConnectionSecurity.TLS verify_ssl: bool = True # Set to False for self-signed certificates (e.g., ProtonMail Bridge) + # Deprecated: use `security` instead. Kept for backward compatibility with existing configs. + use_ssl: bool | None = None + start_ssl: bool | None = None + + @model_validator(mode="before") + @classmethod + def resolve_security_from_legacy(cls, data: Any) -> Any: + """Derive `security` from deprecated `use_ssl`/`start_ssl` for backward compatibility. + + If the new `security` field is explicitly set, it takes precedence. + If only legacy fields are set, `security` is derived from them. + """ + if not isinstance(data, dict): + return data + + has_security = "security" in data + use_ssl = data.get("use_ssl") + start_ssl = data.get("start_ssl") + + # Only derive from legacy fields when `security` was NOT explicitly provided + if not has_security and (use_ssl is not None or start_ssl is not None): + use_ssl_val = use_ssl if use_ssl is not None else False + start_ssl_val = start_ssl if start_ssl is not None else False + + if use_ssl_val and start_ssl_val: + raise ValueError( + "Invalid configuration: 'use_ssl' and 'start_ssl' cannot both be true. " + "Use 'security = \"tls\"' for implicit TLS or 'security = \"starttls\"' for STARTTLS." + ) + + if use_ssl_val: + data["security"] = ConnectionSecurity.TLS + elif start_ssl_val: + data["security"] = ConnectionSecurity.STARTTLS + elif use_ssl is False and start_ssl is False: + # Only disable encryption when BOTH are explicitly set to False + data["security"] = ConnectionSecurity.NONE + # Otherwise, leave security at its default (TLS) for safety + + return data + def masked(self) -> EmailServer: return self.model_copy(update={"password": "********"}) @@ -101,36 +166,57 @@ def init( imap_user_name: str | None = None, imap_password: str | None = None, imap_port: int = 993, - imap_ssl: bool = True, + imap_security: ConnectionSecurity = ConnectionSecurity.TLS, + imap_verify_ssl: bool = True, smtp_port: int = 465, - smtp_ssl: bool = True, - smtp_start_ssl: bool = False, + smtp_security: ConnectionSecurity = ConnectionSecurity.TLS, smtp_verify_ssl: bool = True, smtp_user_name: str | None = None, smtp_password: str | None = None, save_to_sent: bool = True, sent_folder_name: str | None = None, + # Deprecated parameters for backward compatibility + imap_ssl: bool | None = None, + smtp_ssl: bool | None = None, + smtp_start_ssl: bool | None = None, ) -> EmailSettings: + # Build incoming server config + incoming_kwargs: dict[str, Any] = { + "user_name": imap_user_name or user_name, + "password": imap_password or password, + "host": imap_host, + "port": imap_port, + "verify_ssl": imap_verify_ssl, + } + if imap_ssl is not None: + # Legacy path: use_ssl was explicitly passed + incoming_kwargs["use_ssl"] = imap_ssl + else: + incoming_kwargs["security"] = imap_security + + # Build outgoing server config + outgoing_kwargs: dict[str, Any] = { + "user_name": smtp_user_name or user_name, + "password": smtp_password or password, + "host": smtp_host, + "port": smtp_port, + "verify_ssl": smtp_verify_ssl, + } + if smtp_ssl is not None or smtp_start_ssl is not None: + # Legacy path: use_ssl/start_ssl were explicitly passed + if smtp_ssl is not None: + outgoing_kwargs["use_ssl"] = smtp_ssl + if smtp_start_ssl is not None: + outgoing_kwargs["start_ssl"] = smtp_start_ssl + else: + outgoing_kwargs["security"] = smtp_security + return cls( account_name=account_name, full_name=full_name, email_address=email_address, - incoming=EmailServer( - user_name=imap_user_name or user_name, - password=imap_password or password, - host=imap_host, - port=imap_port, - use_ssl=imap_ssl, - ), - outgoing=EmailServer( - user_name=smtp_user_name or user_name, - password=smtp_password or password, - host=smtp_host, - port=smtp_port, - use_ssl=smtp_ssl, - start_ssl=smtp_start_ssl, - verify_ssl=smtp_verify_ssl, - ), + incoming=EmailServer(**incoming_kwargs), + outgoing=EmailServer(**outgoing_kwargs), save_to_sent=save_to_sent, sent_folder_name=sent_folder_name, ) @@ -147,14 +233,19 @@ def from_env(cls) -> EmailSettings | None: - MCP_EMAIL_SERVER_PASSWORD - MCP_EMAIL_SERVER_IMAP_HOST - MCP_EMAIL_SERVER_IMAP_PORT (default: 993) - - MCP_EMAIL_SERVER_IMAP_SSL (default: true) + - MCP_EMAIL_SERVER_IMAP_SECURITY (default: "tls") โ "tls", "starttls", or "none" + - MCP_EMAIL_SERVER_IMAP_VERIFY_SSL (default: true) - MCP_EMAIL_SERVER_SMTP_HOST - MCP_EMAIL_SERVER_SMTP_PORT (default: 465) - - MCP_EMAIL_SERVER_SMTP_SSL (default: true) - - MCP_EMAIL_SERVER_SMTP_START_SSL (default: false) + - MCP_EMAIL_SERVER_SMTP_SECURITY (default: "tls") โ "tls", "starttls", or "none" - MCP_EMAIL_SERVER_SMTP_VERIFY_SSL (default: true) - MCP_EMAIL_SERVER_SAVE_TO_SENT (default: true) - MCP_EMAIL_SERVER_SENT_FOLDER_NAME (default: auto-detect) + + Deprecated (still supported for backward compatibility): + - MCP_EMAIL_SERVER_IMAP_SSL โ use MCP_EMAIL_SERVER_IMAP_SECURITY instead + - MCP_EMAIL_SERVER_SMTP_SSL โ use MCP_EMAIL_SERVER_SMTP_SECURITY instead + - MCP_EMAIL_SERVER_SMTP_START_SSL โ use MCP_EMAIL_SERVER_SMTP_SECURITY instead """ # Check if minimum required environment variables are set email_address = os.getenv("MCP_EMAIL_SERVER_EMAIL_ADDRESS") @@ -176,31 +267,67 @@ def from_env(cls) -> EmailSettings | None: return None try: - return cls.init( - account_name=account_name, - full_name=full_name, - email_address=email_address, - user_name=user_name, - password=password, - imap_host=imap_host, - imap_port=int(os.getenv("MCP_EMAIL_SERVER_IMAP_PORT", "993")), - imap_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_IMAP_SSL"), True), - smtp_host=smtp_host, - smtp_port=int(os.getenv("MCP_EMAIL_SERVER_SMTP_PORT", "465")), - smtp_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SMTP_SSL"), True), - smtp_start_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SMTP_START_SSL"), False), - smtp_verify_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SMTP_VERIFY_SSL"), True), - smtp_user_name=os.getenv("MCP_EMAIL_SERVER_SMTP_USER_NAME", user_name), - smtp_password=os.getenv("MCP_EMAIL_SERVER_SMTP_PASSWORD", password), - imap_user_name=os.getenv("MCP_EMAIL_SERVER_IMAP_USER_NAME", user_name), - imap_password=os.getenv("MCP_EMAIL_SERVER_IMAP_PASSWORD", password), - save_to_sent=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SAVE_TO_SENT"), True), - sent_folder_name=os.getenv("MCP_EMAIL_SERVER_SENT_FOLDER_NAME"), - ) + imap_port = int(os.getenv("MCP_EMAIL_SERVER_IMAP_PORT", "993")) + smtp_port = int(os.getenv("MCP_EMAIL_SERVER_SMTP_PORT", "465")) + except ValueError as e: + logger.error(f"Invalid port configuration: {e}") + return None + + init_kwargs: dict[str, Any] = { + "account_name": account_name, + "full_name": full_name, + "email_address": email_address, + "user_name": user_name, + "password": password, + "imap_host": imap_host, + "imap_port": imap_port, + "imap_verify_ssl": _parse_bool_env(os.getenv("MCP_EMAIL_SERVER_IMAP_VERIFY_SSL"), True), + "smtp_host": smtp_host, + "smtp_port": smtp_port, + "smtp_verify_ssl": _parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SMTP_VERIFY_SSL"), True), + "smtp_user_name": os.getenv("MCP_EMAIL_SERVER_SMTP_USER_NAME", user_name), + "smtp_password": os.getenv("MCP_EMAIL_SERVER_SMTP_PASSWORD", password), + "imap_user_name": os.getenv("MCP_EMAIL_SERVER_IMAP_USER_NAME", user_name), + "imap_password": os.getenv("MCP_EMAIL_SERVER_IMAP_PASSWORD", password), + "save_to_sent": _parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SAVE_TO_SENT"), True), + "sent_folder_name": os.getenv("MCP_EMAIL_SERVER_SENT_FOLDER_NAME"), + } + + cls._resolve_security_env(init_kwargs) + + try: + return cls.init(**init_kwargs) except (ValueError, TypeError) as e: logger.error(f"Failed to create email settings from environment variables: {e}") return None + @staticmethod + def _resolve_security_env(init_kwargs: dict[str, Any]) -> None: + """Resolve IMAP/SMTP security from env vars, preferring new over legacy.""" + imap_security_env = os.getenv("MCP_EMAIL_SERVER_IMAP_SECURITY") + smtp_security_env = os.getenv("MCP_EMAIL_SERVER_SMTP_SECURITY") + + if imap_security_env is not None: + security = _parse_security_env(imap_security_env) + if security is not None: + init_kwargs["imap_security"] = security + else: + imap_ssl_env = os.getenv("MCP_EMAIL_SERVER_IMAP_SSL") + if imap_ssl_env is not None: + init_kwargs["imap_ssl"] = _parse_bool_env(imap_ssl_env, True) + + if smtp_security_env is not None: + security = _parse_security_env(smtp_security_env) + if security is not None: + init_kwargs["smtp_security"] = security + else: + smtp_ssl_env = os.getenv("MCP_EMAIL_SERVER_SMTP_SSL") + smtp_start_ssl_env = os.getenv("MCP_EMAIL_SERVER_SMTP_START_SSL") + if smtp_ssl_env is not None: + init_kwargs["smtp_ssl"] = _parse_bool_env(smtp_ssl_env, True) + if smtp_start_ssl_env is not None: + init_kwargs["smtp_start_ssl"] = _parse_bool_env(smtp_start_ssl_env, False) + def masked(self) -> EmailSettings: return self.model_copy( update={ @@ -259,6 +386,10 @@ def add_email(self, email: EmailSettings) -> None: """Use re-assigned for validation to work.""" self.emails = [email, *self.emails] + def update_email(self, email: EmailSettings) -> None: + """Replace an existing email account by account_name (delete + add).""" + self.emails = [email if e.account_name == email.account_name else e for e in self.emails] + def add_provider(self, provider: ProviderSettings) -> None: """Use re-assigned for validation to work.""" self.providers = [provider, *self.providers] diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 618c959..f31db49 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import email.utils import mimetypes import re @@ -18,7 +19,7 @@ import aioimaplib import aiosmtplib -from mcp_email_server.config import EmailServer, EmailSettings +from mcp_email_server.config import ConnectionSecurity, EmailServer, EmailSettings from mcp_email_server.emails import EmailHandler from mcp_email_server.emails.models import ( AttachmentDownloadResponse, @@ -93,17 +94,176 @@ def _create_smtp_ssl_context(verify_ssl: bool) -> ssl.SSLContext | None: return ctx +def _create_imap_ssl_context(verify_ssl: bool) -> ssl.SSLContext: + """Create SSL context for IMAP connections (Implicit TLS and STARTTLS). + + Returns a default context for verified connections, or a permissive context + for self-signed certificates when verify_ssl=False (e.g., ProtonMail Bridge). + """ + ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) + if not verify_ssl: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + return ctx + + +async def _imap_starttls(imap: aioimaplib.IMAP4, ssl_context: ssl.SSLContext, host: str) -> None: + """Upgrade an IMAP connection to TLS via STARTTLS (RFC 3501 ยง6.2.1). + + Sends the STARTTLS command and upgrades the underlying transport to TLS + using asyncio.loop.start_tls(). After upgrade, re-fetches capabilities + as they may change post-TLS. + """ + # Verify server supports STARTTLS + if "STARTTLS" not in imap.protocol.capabilities: + raise OSError("IMAP server does not advertise STARTTLS capability") + + # Send STARTTLS command + response = await imap.protocol.execute( + aioimaplib.Command("STARTTLS", imap.protocol.new_tag(), loop=imap.protocol.loop) + ) + if response.result != "OK": + raise OSError(f"STARTTLS command failed: {response.result}") + + # Upgrade the transport to TLS + loop = asyncio.get_running_loop() + tls_transport = await loop.start_tls( + imap.protocol.transport, + imap.protocol, + ssl_context, + server_hostname=host, + ) + imap.protocol.transport = tls_transport + + # Re-fetch capabilities (may change after TLS upgrade per RFC 3501) + await imap.protocol.capability() + + +async def _create_imap_connection( + server: EmailServer, +) -> aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL: + """Create an IMAP connection with the appropriate security mode. + + Handles Implicit TLS, STARTTLS, and plaintext connections based on the + server's security configuration. + """ + if server.security == ConnectionSecurity.TLS: + ssl_context = _create_imap_ssl_context(server.verify_ssl) + imap = aioimaplib.IMAP4_SSL(server.host, server.port, ssl_context=ssl_context) + await imap._client_task + await imap.wait_hello_from_server() + return imap + + # For STARTTLS and plaintext, start with a plain connection + imap = aioimaplib.IMAP4(server.host, server.port) + await imap._client_task + await imap.wait_hello_from_server() + + if server.security == ConnectionSecurity.STARTTLS: + ssl_context = _create_imap_ssl_context(server.verify_ssl) + await _imap_starttls(imap, ssl_context, server.host) + + return imap + + +def _force_close_imap(imap: aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL) -> None: + """Force-close an IMAP connection's transport and cancel internal tasks. + + Defensive cleanup for aioimaplib connections that may not shut down cleanly + after logout(). aioimaplib's _client_task and transport do not support + asyncio cancellation properly. + See: https://github.com/iroco-co/aioimaplib/issues/128 + """ + with contextlib.suppress(Exception): + if ( + hasattr(imap, "protocol") + and imap.protocol + and hasattr(imap.protocol, "transport") + and imap.protocol.transport + ): + imap.protocol.transport.close() + with contextlib.suppress(Exception): + if hasattr(imap, "_client_task") and not imap._client_task.done(): + imap._client_task.cancel() + + +async def test_imap_connection(server: EmailServer, timeout: int = 10) -> str: + """Test IMAP connection and login. Returns a user-friendly status message.""" + imap = None + try: + imap = await asyncio.wait_for(_create_imap_connection(server), timeout=timeout) + try: + response = await asyncio.wait_for(imap.login(server.user_name, server.password), timeout=timeout) + if response.result != "OK": + return f"โ IMAP authentication failed: {response.result}" + return f"โ IMAP connection successful ({server.host}:{server.port}, security: {server.security.value})" + finally: + with contextlib.suppress(Exception): + await imap.logout() + except ssl.SSLCertVerificationError: + return "โ SSL certificate verification failed. Disable 'Verify SSL Certificate' or check your certificate." + except ConnectionRefusedError: + return f"โ Connection refused at {server.host}:{server.port}. Check host and port." + except (TimeoutError, asyncio.TimeoutError): + return f"โ Connection timed out ({server.host}:{server.port}). Check host, port, and firewall." + except OSError as e: + if "STARTTLS" in str(e): + return "โ Server does not support STARTTLS. Try 'tls' or 'none' security mode." + return f"โ IMAP error: {e}" + except Exception as e: + return f"โ IMAP error: {e}" + finally: + if imap is not None: + _force_close_imap(imap) + + +async def test_smtp_connection(server: EmailServer, timeout: int = 10) -> str: + """Test SMTP connection and login. Returns a user-friendly status message.""" + use_tls = server.security == ConnectionSecurity.TLS + start_tls = server.security == ConnectionSecurity.STARTTLS + ssl_context = _create_smtp_ssl_context(server.verify_ssl) + + try: + smtp = aiosmtplib.SMTP( + hostname=server.host, + port=server.port, + use_tls=use_tls, + start_tls=start_tls, + tls_context=ssl_context, + timeout=timeout, + ) + await asyncio.wait_for(smtp.connect(), timeout=timeout) + try: + await asyncio.wait_for(smtp.login(server.user_name, server.password), timeout=timeout) + return f"โ SMTP connection successful ({server.host}:{server.port}, security: {server.security.value})" + finally: + with contextlib.suppress(Exception): + await smtp.quit() + except ssl.SSLCertVerificationError: + return "โ SSL certificate verification failed. Disable 'Verify SSL Certificate' or check your certificate." + except ConnectionRefusedError: + return f"โ Connection refused at {server.host}:{server.port}. Check host and port." + except (TimeoutError, asyncio.TimeoutError): + return f"โ Connection timed out ({server.host}:{server.port}). Check host, port, and firewall." + except aiosmtplib.SMTPAuthenticationError as e: + return f"โ SMTP authentication failed: {e.message}" + except Exception as e: + return f"โ SMTP error: {e}" + + class EmailClient: def __init__(self, email_server: EmailServer, sender: str | None = None): self.email_server = email_server self.sender = sender or email_server.user_name - self.imap_class = aioimaplib.IMAP4_SSL if self.email_server.use_ssl else aioimaplib.IMAP4 - - self.smtp_use_tls = self.email_server.use_ssl - self.smtp_start_tls = self.email_server.start_ssl + self.smtp_use_tls = email_server.security == ConnectionSecurity.TLS + self.smtp_start_tls = email_server.security == ConnectionSecurity.STARTTLS self.smtp_verify_ssl = self.email_server.verify_ssl + async def _connect_imap(self) -> aioimaplib.IMAP4 | aioimaplib.IMAP4_SSL: + """Create and return an IMAP connection with proper security.""" + return await _create_imap_connection(self.email_server) + def _get_smtp_ssl_context(self) -> ssl.SSLContext | None: """Get SSL context for SMTP connections based on verify_ssl setting.""" return _create_smtp_ssl_context(self.smtp_verify_ssl) @@ -403,12 +563,8 @@ async def get_email_count( flagged: bool | None = None, answered: bool | None = None, ) -> int: - imap = self.imap_class(self.email_server.host, self.email_server.port) + imap = await self._connect_imap() try: - # Wait for the connection to be established - await imap._client_task - await imap.wait_hello_from_server() - # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) await _send_imap_id(imap) @@ -449,12 +605,8 @@ async def get_emails_metadata_stream( flagged: bool | None = None, answered: bool | None = None, ) -> AsyncGenerator[dict[str, Any], None]: - imap = self.imap_class(self.email_server.host, self.email_server.port) + imap = await self._connect_imap() try: - # Wait for the connection to be established - await imap._client_task - await imap.wait_hello_from_server() - # Login and select mailbox await imap.login(self.email_server.user_name, self.email_server.password) await _send_imap_id(imap) @@ -563,12 +715,8 @@ async def _fetch_email_with_formats(self, imap, email_id: str) -> list | None: return None async def get_email_body_by_id(self, email_id: str, mailbox: str = "INBOX") -> dict[str, Any] | None: - imap = self.imap_class(self.email_server.host, self.email_server.port) + imap = await self._connect_imap() try: - # Wait for the connection to be established - await imap._client_task - await imap.wait_hello_from_server() - # Login and select inbox await imap.login(self.email_server.user_name, self.email_server.password) await _send_imap_id(imap) @@ -618,11 +766,8 @@ async def download_attachment( Returns: A dictionary with download result information. """ - imap = self.imap_class(self.email_server.host, self.email_server.port) + imap = await self._connect_imap() try: - await imap._client_task - await imap.wait_hello_from_server() - await imap.login(self.email_server.user_name, self.email_server.password) await _send_imap_id(imap) await imap.select(_quote_mailbox(mailbox)) @@ -853,8 +998,7 @@ async def append_to_sent( Returns: True if successfully saved, False otherwise """ - imap_class = aioimaplib.IMAP4_SSL if incoming_server.use_ssl else aioimaplib.IMAP4 - imap = imap_class(incoming_server.host, incoming_server.port) + imap = await _create_imap_connection(incoming_server) # Common Sent folder names across different providers sent_folder_candidates = [ @@ -870,8 +1014,6 @@ async def append_to_sent( sent_folder_candidates = [f for f in sent_folder_candidates if f] try: - await imap._client_task - await imap.wait_hello_from_server() await imap.login(incoming_server.user_name, incoming_server.password) await _send_imap_id(imap) @@ -928,13 +1070,11 @@ async def append_to_sent( async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> tuple[list[str], list[str]]: """Delete emails by their UIDs. Returns (deleted_ids, failed_ids).""" - imap = self.imap_class(self.email_server.host, self.email_server.port) + imap = await self._connect_imap() deleted_ids = [] failed_ids = [] try: - await imap._client_task - await imap.wait_hello_from_server() await imap.login(self.email_server.user_name, self.email_server.password) await _send_imap_id(imap) await imap.select(_quote_mailbox(mailbox)) diff --git a/mcp_email_server/ui.py b/mcp_email_server/ui.py index 23374d0..afca0cc 100644 --- a/mcp_email_server/ui.py +++ b/mcp_email_server/ui.py @@ -1,46 +1,70 @@ +import asyncio + import gradio as gr -from mcp_email_server.config import EmailSettings, get_settings, store_settings +from mcp_email_server.config import ConnectionSecurity, EmailServer, EmailSettings, get_settings, store_settings +from mcp_email_server.emails.classic import test_imap_connection, test_smtp_connection from mcp_email_server.tools.installer import install_claude_desktop, is_installed, need_update, uninstall_claude_desktop +PASSWORD_PLACEHOLDER = "********" # noqa: S105 +IMAP_PORT_MAP = {"tls": 993, "starttls": 143, "none": 143} +SMTP_PORT_MAP = {"tls": 465, "starttls": 587, "none": 25} + + +def _get_form_defaults(): + """Return default values for all form fields.""" + return ( + "", # account_name + "", # full_name + "", # email_address + "", # user_name + "", # password + "", # imap_host + 993, # imap_port + "tls", # imap_security + True, # imap_verify_ssl + "", # imap_user_name + "", # imap_password + "", # smtp_host + 465, # smtp_port + "tls", # smtp_security + True, # smtp_verify_ssl + "", # smtp_user_name + "", # smtp_password + True, # same_security + ) + def create_ui(): # noqa: C901 - # Create a Gradio interface with gr.Blocks(title="Email Settings Configuration") as app: gr.Markdown("# Email Settings Configuration") - # Function to get current accounts + # Hidden state to track edit mode + editing_account = gr.State(value=None) + def get_current_accounts(): settings = get_settings(reload=True) - email_accounts = [email.account_name for email in settings.emails] - return email_accounts + return [email.account_name for email in settings.emails] - # Function to update account list display def update_account_list(): settings = get_settings(reload=True) email_accounts = [email.account_name for email in settings.emails] if email_accounts: - # Create a detailed list of accounts with more information accounts_details = [] - for email in settings.emails: + for email_cfg in settings.emails: details = [ - f"**Account Name:** {email.account_name}", - f"**Full Name:** {email.full_name}", - f"**Email Address:** {email.email_address}", + f"**Account Name:** {email_cfg.account_name}", + f"**Full Name:** {email_cfg.full_name}", + f"**Email Address:** {email_cfg.email_address}", ] - - if hasattr(email, "description") and email.description: - details.append(f"**Description:** {email.description}") - - # Add IMAP/SMTP provider info if available - if hasattr(email, "incoming") and hasattr(email.incoming, "host"): - details.append(f"**IMAP Provider:** {email.incoming.host}") - - if hasattr(email, "outgoing") and hasattr(email.outgoing, "host"): - details.append(f"**SMTP Provider:** {email.outgoing.host}") - - accounts_details.append("### " + email.account_name + "\n" + "\n".join(details) + "\n") + if hasattr(email_cfg, "description") and email_cfg.description: + details.append(f"**Description:** {email_cfg.description}") + if hasattr(email_cfg, "incoming") and hasattr(email_cfg.incoming, "host"): + details.append(f"**IMAP Provider:** {email_cfg.incoming.host}") + if hasattr(email_cfg, "outgoing") and hasattr(email_cfg.outgoing, "host"): + details.append(f"**SMTP Provider:** {email_cfg.outgoing.host}") + accounts_details.append("### " + email_cfg.account_name + "\n" + "\n".join(details) + "\n") accounts_md = "\n".join(accounts_details) return ( @@ -55,57 +79,43 @@ def update_account_list(): gr.update(visible=False), ) - # Display current email accounts and allow deletion + # --- Current Email Accounts --- with gr.Accordion("Current Email Accounts", open=True): - # Display the list of accounts accounts_display = gr.Markdown("") - - # Create a dropdown to select account to delete account_to_delete = gr.Dropdown(choices=[], label="Select Account to Delete", interactive=True) - - # Status message for deletion delete_status = gr.Markdown("") - - # Delete button delete_btn = gr.Button("Delete Selected Account") - # Function to delete an account def delete_email_account(account_name): if not account_name: return "Error: Please select an account to delete.", *update_account_list() - try: - # Get current settings settings = get_settings() - - # Delete the account settings.delete_email(account_name) - - # Store settings store_settings(settings) - - # Return success message and update the UI return f"Success: Email account '{account_name}' has been deleted.", *update_account_list() except Exception as e: return f"Error: {e!s}", *update_account_list() - # Connect the delete button to the delete function delete_btn.click( fn=delete_email_account, inputs=[account_to_delete], outputs=[delete_status, accounts_display, account_to_delete, delete_btn], ) - - # Initialize the account list - app.load( - fn=update_account_list, - inputs=None, - outputs=[accounts_display, account_to_delete, delete_btn], + app.load(fn=update_account_list, inputs=None, outputs=[accounts_display, account_to_delete, delete_btn]) + + # --- Add / Edit Email Account --- + with gr.Accordion("Add / Edit Email Account", open=True): + gr.Markdown("### Email Account Settings") + + # Edit existing account dropdown + edit_account = gr.Dropdown( + choices=[], + label="Edit Existing Account", + interactive=True, + info="Select an account to edit, or leave empty to create a new one", ) - - # Form for adding a new email account - with gr.Accordion("Add New Email Account", open=True): - gr.Markdown("### Add New Email Account") + clear_btn = gr.Button("Clear / New Account", size="sm") # Basic account information account_name = gr.Textbox(label="Account Name", placeholder="e.g. work_email") @@ -116,13 +126,19 @@ def delete_email_account(account_name): user_name = gr.Textbox(label="Username", placeholder="e.g. john@example.com") password = gr.Textbox(label="Password", type="password") - # IMAP settings + # IMAP and SMTP settings with gr.Row(): with gr.Column(): - gr.Markdown("### IMAP Settings") + gr.Markdown("### IMAP Settings (Incoming)") imap_host = gr.Textbox(label="IMAP Host", placeholder="e.g. imap.example.com") imap_port = gr.Number(label="IMAP Port", value=993) - imap_ssl = gr.Checkbox(label="Use SSL", value=True) + imap_security = gr.Dropdown( + label="Connection Security", + choices=["tls", "starttls", "none"], + value="tls", + info="TLS (port 993) | STARTTLS (port 143) | None (not recommended)", + ) + imap_verify_ssl = gr.Checkbox(label="Verify SSL Certificate", value=True) imap_user_name = gr.Textbox( label="IMAP Username (optional)", placeholder="Leave empty to use the same as above" ) @@ -132,13 +148,23 @@ def delete_email_account(account_name): placeholder="Leave empty to use the same as above", ) - # SMTP settings with gr.Column(): - gr.Markdown("### SMTP Settings") + gr.Markdown("### SMTP Settings (Outgoing)") smtp_host = gr.Textbox(label="SMTP Host", placeholder="e.g. smtp.example.com") smtp_port = gr.Number(label="SMTP Port", value=465) - smtp_ssl = gr.Checkbox(label="Use SSL", value=True) - smtp_start_ssl = gr.Checkbox(label="Start SSL", value=False) + same_security = gr.Checkbox( + label="Use same security settings as IMAP", + value=True, + info="When checked, SMTP security mirrors IMAP settings", + ) + smtp_security = gr.Dropdown( + label="Connection Security", + choices=["tls", "starttls", "none"], + value="tls", + info="TLS (port 465) | STARTTLS (port 587) | None (not recommended)", + interactive=False, + ) + smtp_verify_ssl = gr.Checkbox(label="Verify SSL Certificate", value=True, interactive=False) smtp_user_name = gr.Textbox( label="SMTP Username (optional)", placeholder="Leave empty to use the same as above" ) @@ -148,14 +174,268 @@ def delete_email_account(account_name): placeholder="Leave empty to use the same as above", ) - # Status message + # --- Auto-update port on security change --- + def update_imap_port(security): + return IMAP_PORT_MAP.get(security, 993) + + def on_imap_security_change(security, same_sec): + """Update IMAP port, and mirror to SMTP if same_security is checked.""" + imap_p = IMAP_PORT_MAP.get(security, 993) + if same_sec: + smtp_p = SMTP_PORT_MAP.get(security, 465) + return imap_p, security, True, smtp_p + return imap_p, gr.update(), gr.update(), gr.update() + + imap_security.change( + fn=on_imap_security_change, + inputs=[imap_security, same_security], + outputs=[imap_port, smtp_security, smtp_verify_ssl, smtp_port], + ) + + def on_imap_verify_ssl_change(verify, same_sec): + if same_sec: + return verify + return gr.update() + + imap_verify_ssl.change( + fn=on_imap_verify_ssl_change, + inputs=[imap_verify_ssl, same_security], + outputs=[smtp_verify_ssl], + ) + + def on_same_security_toggle(same_sec, imap_sec, imap_verify): + """Toggle SMTP security fields interactive state and sync values.""" + if same_sec: + smtp_p = SMTP_PORT_MAP.get(imap_sec, 465) + return ( + gr.update(value=imap_sec, interactive=False), + gr.update(value=imap_verify, interactive=False), + smtp_p, + ) + return ( + gr.update(interactive=True), + gr.update(interactive=True), + gr.update(), + ) + + same_security.change( + fn=on_same_security_toggle, + inputs=[same_security, imap_security, imap_verify_ssl], + outputs=[smtp_security, smtp_verify_ssl, smtp_port], + ) + + def update_smtp_port(security): + return SMTP_PORT_MAP.get(security, 465) + + smtp_security.change(fn=update_smtp_port, inputs=[smtp_security], outputs=[smtp_port]) + + # --- Test Connection Buttons --- + with gr.Row(): + test_imap_btn = gr.Button("๐ Test IMAP Connection", size="sm") + test_smtp_btn = gr.Button("๐ Test SMTP Connection", size="sm") + test_status = gr.Markdown("") + + def _build_server(host, port, user, pwd, security, verify_ssl): + return EmailServer( + user_name=user, + password=pwd, + host=host, + port=int(port), + security=security, + verify_ssl=verify_ssl, + ) + + def run_imap_test( + imap_host, + imap_port, + imap_security, + imap_verify_ssl, + user_name, + password, + imap_user_name, + imap_password, + ): + if not imap_host: + return "โ Please enter an IMAP host." + effective_user = imap_user_name if imap_user_name else user_name + effective_pass = imap_password if imap_password else password + if not effective_user or not effective_pass: + return "โ Please enter username and password." + try: + server = _build_server( + imap_host, + imap_port, + effective_user, + effective_pass, + imap_security, + imap_verify_ssl, + ) + return asyncio.run(test_imap_connection(server)) + except Exception as e: + return f"โ Error: {e}" + + def run_smtp_test( + smtp_host, + smtp_port, + smtp_security, + smtp_verify_ssl, + user_name, + password, + smtp_user_name, + smtp_password, + ): + if not smtp_host: + return "โ Please enter an SMTP host." + effective_user = smtp_user_name if smtp_user_name else user_name + effective_pass = smtp_password if smtp_password else password + if not effective_user or not effective_pass: + return "โ Please enter username and password." + try: + server = _build_server( + smtp_host, + smtp_port, + effective_user, + effective_pass, + smtp_security, + smtp_verify_ssl, + ) + return asyncio.run(test_smtp_connection(server)) + except Exception as e: + return f"โ Error: {e}" + + test_imap_btn.click( + fn=run_imap_test, + inputs=[ + imap_host, + imap_port, + imap_security, + imap_verify_ssl, + user_name, + password, + imap_user_name, + imap_password, + ], + outputs=[test_status], + ) + test_smtp_btn.click( + fn=run_smtp_test, + inputs=[ + smtp_host, + smtp_port, + smtp_security, + smtp_verify_ssl, + user_name, + password, + smtp_user_name, + smtp_password, + ], + outputs=[test_status], + ) + + # --- Load existing account for editing --- + all_form_fields = [ + account_name, + full_name, + email_address, + user_name, + password, + imap_host, + imap_port, + imap_security, + imap_verify_ssl, + imap_user_name, + imap_password, + smtp_host, + smtp_port, + smtp_security, + smtp_verify_ssl, + smtp_user_name, + smtp_password, + same_security, + ] + + def load_account(selected_account): + """Load an existing account's settings into the form.""" + if not selected_account: + return (None, *_get_form_defaults()) + + settings = get_settings(reload=True) + for email_cfg in settings.emails: + if email_cfg.account_name == selected_account: + inc = email_cfg.incoming + out = email_cfg.outgoing + same_sec = inc.security == out.security and inc.verify_ssl == out.verify_ssl + return ( + selected_account, # editing_account state + email_cfg.account_name, + email_cfg.full_name, + email_cfg.email_address, + inc.user_name, + PASSWORD_PLACEHOLDER, + inc.host, + inc.port, + inc.security.value, + inc.verify_ssl, + "" if inc.user_name == out.user_name else out.user_name, + "", + out.host, + out.port, + out.security.value, + out.verify_ssl, + "" if out.user_name == inc.user_name else out.user_name, + "", + same_sec, + ) + + return (None, *_get_form_defaults()) + + edit_account.change( + fn=load_account, + inputs=[edit_account], + outputs=[editing_account, *all_form_fields], + ) + + def clear_form(): + return (None, gr.update(value=None), *_get_form_defaults()) + + clear_btn.click( + fn=clear_form, + inputs=[], + outputs=[editing_account, edit_account, *all_form_fields], + ) + + # --- Status and Save --- status_message = gr.Markdown("") + save_btn = gr.Button("Save Email Settings", variant="primary") + + def _make_result(msg, form_values): + account_md, account_choices, btn_visible = update_account_list() + accounts = get_current_accounts() + return ( + msg, + account_md, + account_choices, + btn_visible, + gr.update(choices=accounts), + *form_values, + ) + + def _resolve_passwords(editing, password, imap_password, smtp_password, settings): + """Resolve effective passwords, keeping existing ones if placeholder.""" + effective_password = password + effective_imap_password = imap_password if imap_password else None + effective_smtp_password = smtp_password if smtp_password else None - # Save button - save_btn = gr.Button("Save Email Settings") + if editing and password == PASSWORD_PLACEHOLDER: + for email_cfg in settings.emails: + if email_cfg.account_name == editing: + effective_password = email_cfg.incoming.password + break + + return effective_password, effective_imap_password, effective_smtp_password - # Function to save settings def save_email_settings( + editing, account_name, full_name, email_address, @@ -163,224 +443,119 @@ def save_email_settings( password, imap_host, imap_port, - imap_ssl, + imap_security, + imap_verify_ssl, imap_user_name, imap_password, smtp_host, smtp_port, - smtp_ssl, - smtp_start_ssl, + smtp_security, + smtp_verify_ssl, smtp_user_name, smtp_password, + same_security_checked, ): + form_vals = ( + account_name, + full_name, + email_address, + user_name, + password, + imap_host, + imap_port, + imap_security, + imap_verify_ssl, + imap_user_name, + imap_password, + smtp_host, + smtp_port, + smtp_security, + smtp_verify_ssl, + smtp_user_name, + smtp_password, + same_security_checked, + ) try: - # Validate required fields - if not account_name or not full_name or not email_address or not user_name or not password: - # Get account list update - account_md, account_choices, btn_visible = update_account_list() - return ( - "Error: Please fill in all required fields.", - account_md, - account_choices, - btn_visible, - account_name, - full_name, - email_address, - user_name, - password, - imap_host, - imap_port, - imap_ssl, - imap_user_name, - imap_password, - smtp_host, - smtp_port, - smtp_ssl, - smtp_start_ssl, - smtp_user_name, - smtp_password, - ) + if not account_name or not full_name or not email_address or not user_name: + return _make_result("Error: Please fill in all required fields.", form_vals) + + is_editing = editing is not None + if not is_editing and not password: + return _make_result("Error: Password is required.", form_vals) if not imap_host or not smtp_host: - # Get account list update - account_md, account_choices, btn_visible = update_account_list() - return ( - "Error: IMAP and SMTP hosts are required.", - account_md, - account_choices, - btn_visible, - account_name, - full_name, - email_address, - user_name, - password, - imap_host, - imap_port, - imap_ssl, - imap_user_name, - imap_password, - smtp_host, - smtp_port, - smtp_ssl, - smtp_start_ssl, - smtp_user_name, - smtp_password, - ) + return _make_result("Error: IMAP and SMTP hosts are required.", form_vals) - # Get current settings settings = get_settings() - # Check if account name already exists - for email in settings.emails: - if email.account_name == account_name: - # Get account list update - account_md, account_choices, btn_visible = update_account_list() - return ( - f"Error: Account name '{account_name}' already exists.", - account_md, - account_choices, - btn_visible, - account_name, - full_name, - email_address, - user_name, - password, - imap_host, - imap_port, - imap_ssl, - imap_user_name, - imap_password, - smtp_host, - smtp_port, - smtp_ssl, - smtp_start_ssl, - smtp_user_name, - smtp_password, - ) - - # Create new email settings + if not is_editing: + for email_cfg in settings.emails: + if email_cfg.account_name == account_name: + return _make_result(f"Error: Account name '{account_name}' already exists.", form_vals) + + effective_password, effective_imap_password, effective_smtp_password = _resolve_passwords( + editing, password, imap_password, smtp_password, settings + ) + + effective_smtp_security = imap_security if same_security_checked else smtp_security + effective_smtp_verify_ssl = imap_verify_ssl if same_security_checked else smtp_verify_ssl + email_settings = EmailSettings.init( account_name=account_name, full_name=full_name, email_address=email_address, user_name=user_name, - password=password, + password=effective_password, imap_host=imap_host, smtp_host=smtp_host, imap_port=int(imap_port), - imap_ssl=imap_ssl, + imap_security=ConnectionSecurity(imap_security), + imap_verify_ssl=imap_verify_ssl, smtp_port=int(smtp_port), - smtp_ssl=smtp_ssl, - smtp_start_ssl=smtp_start_ssl, + smtp_security=ConnectionSecurity(effective_smtp_security), + smtp_verify_ssl=effective_smtp_verify_ssl, imap_user_name=imap_user_name if imap_user_name else None, - imap_password=imap_password if imap_password else None, + imap_password=effective_imap_password, smtp_user_name=smtp_user_name if smtp_user_name else None, - smtp_password=smtp_password if smtp_password else None, + smtp_password=effective_smtp_password, ) - # Add to settings - settings.add_email(email_settings) + if is_editing: + settings.update_email(email_settings) + action = "updated" + else: + settings.add_email(email_settings) + action = "added" - # Store settings store_settings(settings) - # Get account list update - account_md, account_choices, btn_visible = update_account_list() - - # Return success message, update the UI, and clear form fields - return ( - f"Success: Email account '{account_name}' has been added.", - account_md, - account_choices, - btn_visible, - "", # Clear account_name - "", # Clear full_name - "", # Clear email_address - "", # Clear user_name - "", # Clear password - "", # Clear imap_host - 993, # Reset imap_port - True, # Reset imap_ssl - "", # Clear imap_user_name - "", # Clear imap_password - "", # Clear smtp_host - 465, # Reset smtp_port - True, # Reset smtp_ssl - False, # Reset smtp_start_ssl - "", # Clear smtp_user_name - "", # Clear smtp_password + return _make_result( + f"Success: Email account '{account_name}' has been {action}.", + _get_form_defaults(), ) except Exception as e: - # Get account list update - account_md, account_choices, btn_visible = update_account_list() - return ( - f"Error: {e!s}", - account_md, - account_choices, - btn_visible, - account_name, - full_name, - email_address, - user_name, - password, - imap_host, - imap_port, - imap_ssl, - imap_user_name, - imap_password, - smtp_host, - smtp_port, - smtp_ssl, - smtp_start_ssl, - smtp_user_name, - smtp_password, - ) + return _make_result(f"Error: {e!s}", form_vals) - # Connect the save button to the save function save_btn.click( fn=save_email_settings, - inputs=[ - account_name, - full_name, - email_address, - user_name, - password, - imap_host, - imap_port, - imap_ssl, - imap_user_name, - imap_password, - smtp_host, - smtp_port, - smtp_ssl, - smtp_start_ssl, - smtp_user_name, - smtp_password, - ], + inputs=[editing_account, *all_form_fields], outputs=[ status_message, accounts_display, account_to_delete, delete_btn, - account_name, - full_name, - email_address, - user_name, - password, - imap_host, - imap_port, - imap_ssl, - imap_user_name, - imap_password, - smtp_host, - smtp_port, - smtp_ssl, - smtp_start_ssl, - smtp_user_name, - smtp_password, + edit_account, + *all_form_fields, ], ) + # Initialize edit dropdown with current accounts + def init_edit_dropdown(): + accounts = get_current_accounts() + return gr.update(choices=accounts) + + app.load(fn=init_edit_dropdown, inputs=None, outputs=[edit_account]) + # Claude Desktop Integration with gr.Accordion("Claude Desktop Integration", open=True): gr.Markdown("### Claude Desktop Integration") diff --git a/pyproject.toml b/pyproject.toml index 839ffb9..b586607 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,14 @@ dev = [ "mkdocs-material>=8.5.10", "mkdocstrings[python]>=0.26.1", ] +integration = [ + "pytest-localserver>=0.10.0", + "aiosmtpd>=1.4.6", + "cryptography>=44.0.0", +] +docker = [ + "pytest-docker>=3.2.5", +] [build-system] requires = ["hatchling"] @@ -114,6 +122,7 @@ ignore = [ [tool.ruff.lint.per-file-ignores] "tests/*" = ["S101", "S106", "SIM117"] +"tests/**/*" = ["S101", "S106", "SIM117"] "tests/conftest.py" = ["E402"] [tool.ruff.format] diff --git a/pytest.ini b/pytest.ini index 521546f..7ce1ee1 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,4 +1,8 @@ # pytest.ini [pytest] asyncio_mode = auto -# asyncio_default_fixture_loop_scope = +# Default: only unit tests (integration/docker excluded) +addopts = -m "not integration and not docker" +markers = + integration: tests against real protocol servers (pytest-localserver + MockImapServer, no Docker) + docker: tests requiring Docker (GreenMail container, full SMTPโIMAP roundtrip) diff --git a/tests/docker/__init__.py b/tests/docker/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/docker/conftest.py b/tests/docker/conftest.py new file mode 100644 index 0000000..210852e --- /dev/null +++ b/tests/docker/conftest.py @@ -0,0 +1,234 @@ +"""Fixtures for Docker-based integration tests (Tier 2). + +GreenMail provides a real SMTP+IMAP server for full send/read roundtrip tests. +Tests auto-skip if Docker is not available. + +Security modes tested: +- NONE: SMTP port 3025, IMAP port 3143 (plaintext) +- TLS: SMTPS port 3465, IMAPS port 3993 (Implicit TLS, self-signed) +- STARTTLS: skipped (greenmail#135 โ GreenMail does not support STARTTLS) +""" + +from __future__ import annotations + +import shutil +import socket + +import pytest + +from mcp_email_server.config import ConnectionSecurity, EmailServer +from mcp_email_server.emails.classic import EmailClient + +# GreenMail accepts any credentials when auth is disabled +GREENMAIL_USER = "testuser@localhost" +GREENMAIL_PASSWORD = "testpass" # noqa: S105 + +GREENMAIL_SMTP_PORT = 3025 +GREENMAIL_IMAP_PORT = 3143 +GREENMAIL_SMTPS_PORT = 3465 +GREENMAIL_IMAPS_PORT = 3993 + + +def pytest_collection_modifyitems(config, items): + """Auto-skip all docker tests if Docker is not available.""" + if not shutil.which("docker"): + skip = pytest.mark.skip(reason="Docker not available") + for item in items: + item.add_marker(skip) + + +@pytest.fixture(scope="session") +def docker_compose_file(): + """Point pytest-docker to our docker-compose.yml.""" + import pathlib + + return str(pathlib.Path(__file__).parent / "docker-compose.yml") + + +@pytest.fixture(scope="session") +def docker_setup(): + """Override default 'up --build --wait' to avoid healthcheck requirement.""" + return ["up --build -d"] + + +def _greenmail_is_ready() -> bool: + """Check if GreenMail SMTP and IMAP services are actually ready. + + A simple socket connection is insufficient โ GreenMail may accept TCP + connections before the SMTP/IMAP handlers are initialized. + """ + import smtplib + + try: + # Test SMTP with a real EHLO handshake + with smtplib.SMTP("127.0.0.1", GREENMAIL_SMTP_PORT, timeout=3) as smtp: + smtp.ehlo() + # Test IMAP socket + with socket.create_connection(("127.0.0.1", GREENMAIL_IMAP_PORT), timeout=3): + pass + # Test SMTPS socket (TLS port) + with socket.create_connection(("127.0.0.1", GREENMAIL_SMTPS_PORT), timeout=3): + pass + # Test IMAPS socket (TLS port) + with socket.create_connection(("127.0.0.1", GREENMAIL_IMAPS_PORT), timeout=3): + pass + return True + except Exception: + return False + + +@pytest.fixture(scope="session") +def greenmail(docker_services): + """Wait for GreenMail to be healthy and return connection details.""" + docker_services.wait_until_responsive( + timeout=60.0, + pause=1.0, + check=_greenmail_is_ready, + ) + return { + "smtp_host": "127.0.0.1", + "smtp_port": GREENMAIL_SMTP_PORT, + "imap_host": "127.0.0.1", + "imap_port": GREENMAIL_IMAP_PORT, + "smtps_port": GREENMAIL_SMTPS_PORT, + "imaps_port": GREENMAIL_IMAPS_PORT, + "user": GREENMAIL_USER, + "password": GREENMAIL_PASSWORD, + } + + +# --------------------------------------------------------------------------- +# Plaintext (NONE) fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def greenmail_smtp_server(greenmail) -> EmailServer: + """EmailServer config for GreenMail SMTP (plaintext).""" + return EmailServer( + host=greenmail["smtp_host"], + port=greenmail["smtp_port"], + user_name=greenmail["user"], + password=greenmail["password"], + security=ConnectionSecurity.NONE, + verify_ssl=False, + ) + + +@pytest.fixture() +def greenmail_imap_server(greenmail) -> EmailServer: + """EmailServer config for GreenMail IMAP (plaintext).""" + return EmailServer( + host=greenmail["imap_host"], + port=greenmail["imap_port"], + user_name=greenmail["user"], + password=greenmail["password"], + security=ConnectionSecurity.NONE, + verify_ssl=False, + ) + + +@pytest.fixture() +def greenmail_smtp_client(greenmail_smtp_server) -> EmailClient: + """EmailClient wired to GreenMail SMTP (plaintext).""" + return EmailClient(greenmail_smtp_server, sender=GREENMAIL_USER) + + +@pytest.fixture() +def greenmail_imap_client(greenmail_imap_server) -> EmailClient: + """EmailClient wired to GreenMail IMAP (plaintext).""" + return EmailClient(greenmail_imap_server, sender=GREENMAIL_USER) + + +# --------------------------------------------------------------------------- +# Implicit TLS fixtures (SMTPS / IMAPS) +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def greenmail_smtps_server(greenmail) -> EmailServer: + """EmailServer config for GreenMail SMTPS (Implicit TLS).""" + return EmailServer( + host=greenmail["smtp_host"], + port=greenmail["smtps_port"], + user_name=greenmail["user"], + password=greenmail["password"], + security=ConnectionSecurity.TLS, + verify_ssl=False, + ) + + +@pytest.fixture() +def greenmail_imaps_server(greenmail) -> EmailServer: + """EmailServer config for GreenMail IMAPS (Implicit TLS).""" + return EmailServer( + host=greenmail["imap_host"], + port=greenmail["imaps_port"], + user_name=greenmail["user"], + password=greenmail["password"], + security=ConnectionSecurity.TLS, + verify_ssl=False, + ) + + +@pytest.fixture() +def greenmail_smtps_client(greenmail_smtps_server) -> EmailClient: + """EmailClient wired to GreenMail SMTPS (Implicit TLS).""" + return EmailClient(greenmail_smtps_server, sender=GREENMAIL_USER) + + +@pytest.fixture() +def greenmail_imaps_client(greenmail_imaps_server) -> EmailClient: + """EmailClient wired to GreenMail IMAPS (Implicit TLS).""" + return EmailClient(greenmail_imaps_server, sender=GREENMAIL_USER) + + +# Note: GreenMail does not support STARTTLS on plaintext ports. +# See https://github.com/greenmail-mail-test/greenmail/issues/135 +# STARTTLS is tested at Tier 1 (SMTP via aiosmtpd) and unit tests (IMAP). + + +# --------------------------------------------------------------------------- +# STARTTLS fixtures (upgrade on plaintext ports) +# TODO(greenmail#135): GreenMail does not advertise STARTTLS on plaintext ports. +# These fixtures are kept for when GreenMail adds STARTTLS support. +# See: https://github.com/greenmail-mail-test/greenmail/issues/135 +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def greenmail_smtp_starttls_server(greenmail) -> EmailServer: + """EmailServer config for GreenMail SMTP with STARTTLS.""" + return EmailServer( + host=greenmail["smtp_host"], + port=greenmail["smtp_port"], + user_name=greenmail["user"], + password=greenmail["password"], + security=ConnectionSecurity.STARTTLS, + verify_ssl=False, + ) + + +@pytest.fixture() +def greenmail_imap_starttls_server(greenmail) -> EmailServer: + """EmailServer config for GreenMail IMAP with STARTTLS.""" + return EmailServer( + host=greenmail["imap_host"], + port=greenmail["imap_port"], + user_name=greenmail["user"], + password=greenmail["password"], + security=ConnectionSecurity.STARTTLS, + verify_ssl=False, + ) + + +@pytest.fixture() +def greenmail_smtp_starttls_client(greenmail_smtp_starttls_server) -> EmailClient: + """EmailClient wired to GreenMail SMTP with STARTTLS.""" + return EmailClient(greenmail_smtp_starttls_server, sender=GREENMAIL_USER) + + +@pytest.fixture() +def greenmail_imap_starttls_client(greenmail_imap_starttls_server) -> EmailClient: + """EmailClient wired to GreenMail IMAP with STARTTLS.""" + return EmailClient(greenmail_imap_starttls_server, sender=GREENMAIL_USER) diff --git a/tests/docker/docker-compose.yml b/tests/docker/docker-compose.yml new file mode 100644 index 0000000..492df93 --- /dev/null +++ b/tests/docker/docker-compose.yml @@ -0,0 +1,14 @@ +services: + greenmail: + image: greenmail/standalone:2.1.8 + ports: + - "3025:3025" # SMTP + - "3143:3143" # IMAP + - "3465:3465" # SMTPS + - "3993:3993" # IMAPS + - "8080:8080" # REST API + environment: + GREENMAIL_OPTS: >- + -Dgreenmail.setup.test.all + -Dgreenmail.hostname=0.0.0.0 + -Dgreenmail.auth.disabled diff --git a/tests/docker/test_roundtrip.py b/tests/docker/test_roundtrip.py new file mode 100644 index 0000000..5caa4c1 --- /dev/null +++ b/tests/docker/test_roundtrip.py @@ -0,0 +1,170 @@ +"""Docker integration tests: full SMTP/IMAP roundtrip via GreenMail. + +Tests the complete email pipeline end-to-end across security modes: +- NONE: plaintext SMTP (3025) / IMAP (3143) +- TLS: SMTPS (3465) / IMAPS (3993) +- STARTTLS: skipped (greenmail#135 โ GreenMail does not support STARTTLS) + +Run: make test-docker +Requires: Docker (auto-skips if not available) +""" + +from __future__ import annotations + +import asyncio + +import pytest + +from mcp_email_server.emails.classic import ( + test_imap_connection as check_imap_connection, +) +from mcp_email_server.emails.classic import ( + test_smtp_connection as check_smtp_connection, +) + +from .conftest import GREENMAIL_USER + +pytestmark = pytest.mark.docker + + +async def _get_first_uid(imap_client) -> str: + """Helper to get the first email UID from INBOX via metadata stream.""" + async for email_data in imap_client.get_emails_metadata_stream(mailbox="INBOX", page=1, page_size=1): + return email_data.get("email_id") or email_data.get("uid") + msg = "No emails found in INBOX" + raise AssertionError(msg) + + +# --------------------------------------------------------------------------- +# Plaintext (NONE) roundtrip +# --------------------------------------------------------------------------- + + +class TestRoundtripPlaintext: + """Send via SMTP, read via IMAP in GreenMail (plaintext).""" + + async def test_send_and_count(self, greenmail_smtp_client, greenmail_imap_client): + """Send an email and verify the count increases.""" + await greenmail_smtp_client.send_email( + recipients=[GREENMAIL_USER], + subject="Roundtrip Count Test", + body="Hello from Docker integration test!", + ) + await asyncio.sleep(1) + + count = await greenmail_imap_client.get_email_count(mailbox="INBOX") + assert count >= 1 + + async def test_send_and_read_body(self, greenmail_smtp_client, greenmail_imap_client): + """Send an email and verify the body content via IMAP.""" + await greenmail_smtp_client.send_email( + recipients=[GREENMAIL_USER], + subject="Body Roundtrip", + body="Expected body content here.", + ) + await asyncio.sleep(1) + + uid = await _get_first_uid(greenmail_imap_client) + body_result = await greenmail_imap_client.get_email_body_by_id(uid, mailbox="INBOX") + assert body_result is not None + assert "body" in body_result + + async def test_send_and_delete(self, greenmail_smtp_client, greenmail_imap_client): + """Send, read, delete, and verify the email is gone.""" + await greenmail_smtp_client.send_email( + recipients=[GREENMAIL_USER], + subject="Delete Roundtrip", + body="This will be deleted.", + ) + await asyncio.sleep(1) + + count_before = await greenmail_imap_client.get_email_count(mailbox="INBOX") + assert count_before >= 1 + + uid = await _get_first_uid(greenmail_imap_client) + deleted, _failed = await greenmail_imap_client.delete_emails([uid], mailbox="INBOX") + assert uid in deleted + + +# --------------------------------------------------------------------------- +# Implicit TLS (SMTPS/IMAPS) roundtrip +# --------------------------------------------------------------------------- + + +class TestRoundtripTLS: + """Send via SMTPS, read via IMAPS in GreenMail (Implicit TLS).""" + + async def test_smtp_tls_connection(self, greenmail_smtps_server): + """Verify SMTPS connection succeeds.""" + result = await check_smtp_connection(greenmail_smtps_server, timeout=10) + assert result.startswith("\u2705") + assert "tls" in result + + async def test_imap_tls_connection(self, greenmail_imaps_server): + """Verify IMAPS connection succeeds.""" + result = await check_imap_connection(greenmail_imaps_server, timeout=10) + assert result.startswith("\u2705") + assert "tls" in result + + async def test_send_tls_read_tls(self, greenmail_smtps_client, greenmail_imaps_client): + """Full TLS roundtrip: send via SMTPS, read via IMAPS.""" + await greenmail_smtps_client.send_email( + recipients=[GREENMAIL_USER], + subject="TLS Roundtrip", + body="Encrypted end-to-end!", + ) + await asyncio.sleep(1) + + count = await greenmail_imaps_client.get_email_count(mailbox="INBOX") + assert count >= 1 + + uid = await _get_first_uid(greenmail_imaps_client) + body_result = await greenmail_imaps_client.get_email_body_by_id(uid, mailbox="INBOX") + assert body_result is not None + + +# --------------------------------------------------------------------------- +# STARTTLS roundtrip +# TODO(greenmail#135): GreenMail does not advertise STARTTLS on plaintext ports. +# The SMTP EHLO response on port 3025 omits the STARTTLS extension, and the +# IMAP CAPABILITY on port 3143 does not include STARTTLS. +# STARTTLS is tested at Tier 1: SMTP via aiosmtpd, IMAP via unit tests. +# See: https://github.com/greenmail-mail-test/greenmail/issues/135 +# +# Uncomment when GreenMail adds STARTTLS support on plaintext ports. +# --------------------------------------------------------------------------- +# +# class TestRoundtripSTARTTLS: +# """Send via SMTP+STARTTLS, read via IMAP+STARTTLS in GreenMail.""" +# +# async def test_smtp_starttls_connection(self, greenmail_smtp_starttls_server): +# """Verify SMTP STARTTLS connection succeeds.""" +# result = await check_smtp_connection(greenmail_smtp_starttls_server, timeout=10) +# assert result.startswith("\u2705") +# assert "starttls" in result +# +# async def test_imap_starttls_connection(self, greenmail_imap_starttls_server): +# """Verify IMAP STARTTLS connection succeeds.""" +# result = await check_imap_connection(greenmail_imap_starttls_server, timeout=10) +# assert result.startswith("\u2705") +# assert "starttls" in result +# +# async def test_send_starttls_read_starttls( +# self, greenmail_smtp_starttls_client, greenmail_imap_starttls_client +# ): +# """Full STARTTLS roundtrip: send + read with STARTTLS on both sides.""" +# await greenmail_smtp_starttls_client.send_email( +# recipients=[GREENMAIL_USER], +# subject="STARTTLS Roundtrip", +# body="Upgraded to TLS on both sides!", +# ) +# await asyncio.sleep(1) +# +# count = await greenmail_imap_starttls_client.get_email_count(mailbox="INBOX") +# assert count >= 1 +# +# uid = await _get_first_uid(greenmail_imap_starttls_client) +# body_result = await greenmail_imap_starttls_client.get_email_body_by_id( +# uid, mailbox="INBOX" +# ) +# assert body_result is not None diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py new file mode 100644 index 0000000..34a4d85 --- /dev/null +++ b/tests/integration/conftest.py @@ -0,0 +1,419 @@ +"""Fixtures for integration tests against real protocol servers. + +Tier 1: Pure Python โ no Docker required. +- MockImapServer from aioimaplib (ships with runtime dependency) +- pytest-localserver SMTP server (dev dependency group: integration) + +Provides fixtures for all three ConnectionSecurity modes: +- NONE: plaintext IMAP + SMTP (default fixtures) +- TLS: Implicit TLS IMAP + SMTP via self-signed certs +- STARTTLS: tested at Docker tier (MockImapServer lacks STARTTLS support) +""" + +from __future__ import annotations + +import asyncio +import contextlib +import ipaddress +import socket +import ssl +import tempfile +from email.mime.application import MIMEApplication +from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText + +import pytest +from aioimaplib.imap_testing_server import Mail, MockImapServer +from aiosmtpd.smtp import AuthResult +from pytest_localserver.smtp import Handler, Server + +from mcp_email_server.config import ConnectionSecurity, EmailServer +from mcp_email_server.emails.classic import EmailClient + +# --------------------------------------------------------------------------- +# Workaround: aioimaplib#128 โ dangling asyncio tasks after IMAP connections +# aioimaplib's _client_task does not support asyncio cancellation and can +# linger after imap.logout(), preventing clean event-loop shutdown. +# This autouse fixture cancels all dangling tasks after each test. +# See: https://github.com/iroco-co/aioimaplib/issues/128 +# --------------------------------------------------------------------------- + + +@pytest.fixture(autouse=True) +async def _cleanup_dangling_imap_tasks(): + """Cancel lingering aioimaplib tasks after each test to prevent hangs. + + Workaround for https://github.com/iroco-co/aioimaplib/issues/128 + aioimaplib's _client_task spawns asyncio tasks (create_connection, etc.) + that do not support cancellation and linger after imap.logout(). + """ + tasks_before = set(asyncio.all_tasks()) + yield + await asyncio.sleep(0.05) + for task in asyncio.all_tasks() - tasks_before: + if task.done() or task == asyncio.current_task(): + continue + task.cancel() + with contextlib.suppress(asyncio.CancelledError, Exception): + await task + + +TEST_USER = "testuser@localhost" +TEST_PASSWORD = "testpass" # noqa: S105 + +# MockImapServer stores mailbox names verbatim. The production code +# quotes mailbox names for RFC 3501 compatibility (_quote_mailbox wraps +# them in double-quotes, e.g. '"INBOX"'). We must inject mail into +# the *quoted* mailbox so it is visible to the production EmailClient. +QUOTED_INBOX = '"INBOX"' + + +def _accept_any(server, session, envelope, mechanism, auth_data): + """Authenticator that accepts any credentials (for testing).""" + return AuthResult(success=True) + + +class AuthSmtpServer(Server): + """SMTP server that accepts AUTH LOGIN/PLAIN with any credentials.""" + + def __init__(self, host="localhost", port=0, ssl_context=None): + from aiosmtpd.controller import Controller + + kwargs = { + "authenticator": _accept_any, + "auth_require_tls": False, + } + if ssl_context is not None: + kwargs["tls_context"] = ssl_context + Controller.__init__( + self, + Handler(), + hostname=host, + port=port, + **kwargs, + ) + + +class AuthSmtpServerTLS(Server): + """SMTP server that speaks Implicit TLS (wraps socket in TLS immediately).""" + + def __init__(self, host="localhost", port=0, ssl_context=None): + from aiosmtpd.controller import Controller + + Controller.__init__( + self, + Handler(), + hostname=host, + port=port, + authenticator=_accept_any, + auth_require_tls=False, + ssl_context=ssl_context, + ) + + +def _free_port() -> int: + """Find a free TCP port on localhost.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +def _generate_self_signed_cert(): + """Generate a self-signed certificate and key for testing TLS. + + Returns (certfile_path, keyfile_path) as temporary files. + Uses the cryptography library which ships with most Python environments. + """ + import datetime + + from cryptography import x509 + from cryptography.hazmat.primitives import hashes, serialization + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.x509.oid import NameOID + + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "localhost")]) + + cert = ( + x509 + .CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(datetime.datetime.now(datetime.timezone.utc)) + .not_valid_after(datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1)) + .add_extension( + x509.SubjectAlternativeName([ + x509.DNSName("localhost"), + x509.IPAddress(ipaddress.IPv4Address("127.0.0.1")), + ]), + critical=False, + ) + .sign(key, hashes.SHA256()) + ) + + certfile = tempfile.NamedTemporaryFile(suffix=".pem", delete=False) # noqa: SIM115 + certfile.write(cert.public_bytes(serialization.Encoding.PEM)) + certfile.close() + + keyfile = tempfile.NamedTemporaryFile(suffix=".pem", delete=False) # noqa: SIM115 + keyfile.write( + key.private_bytes( + serialization.Encoding.PEM, serialization.PrivateFormat.TraditionalOpenSSL, serialization.NoEncryption() + ) + ) + keyfile.close() + + return certfile.name, keyfile.name + + +@pytest.fixture(scope="session") +def self_signed_cert(): + """Session-scoped self-signed TLS certificate for testing. + + Yields (certfile, keyfile, server_ssl_context, client_ssl_context). + """ + import os + + certfile, keyfile = _generate_self_signed_cert() + + # Server-side context + server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) + server_ctx.load_cert_chain(certfile, keyfile) + + # Client-side context (trusts the self-signed cert) + client_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) + client_ctx.load_verify_locations(certfile) + + yield certfile, keyfile, server_ctx, client_ctx + + os.unlink(certfile) + os.unlink(keyfile) + + +# --------------------------------------------------------------------------- +# Plaintext (NONE) fixtures โ default +# --------------------------------------------------------------------------- + + +@pytest.fixture() +async def imap_server_and_port(): + """Start a MockImapServer on a random port for each test. + + The server shares the test's event loop (required by aioimaplib). + Yields (server, port) so tests can inject mail and create clients. + """ + port = _free_port() + loop = asyncio.get_running_loop() + server = MockImapServer(loop=loop) + real_server = await server.run_server(host="127.0.0.1", port=port) + + yield server, port + + server.reset() + real_server.close() + # wait_closed() can hang if aioimaplib left dangling connections (aioimaplib#128) + with contextlib.suppress(TimeoutError, asyncio.TimeoutError): + await asyncio.wait_for(real_server.wait_closed(), timeout=1.0) + + +@pytest.fixture() +def imap_email_server(imap_server_and_port) -> EmailServer: + """Create an EmailServer config pointing at the MockImapServer.""" + _, port = imap_server_and_port + return EmailServer( + host="127.0.0.1", + port=port, + user_name=TEST_USER, + password=TEST_PASSWORD, + security=ConnectionSecurity.NONE, + verify_ssl=False, + ) + + +@pytest.fixture() +def imap_client(imap_email_server) -> EmailClient: + """Create an EmailClient wired to the MockImapServer.""" + return EmailClient(imap_email_server, sender=TEST_USER) + + +@pytest.fixture() +def smtpserver(request): + """SMTP server fixture that supports AUTH (unlike pytest-localserver default).""" + server = AuthSmtpServer() + server.start() + request.addfinalizer(server.stop) + return server + + +@pytest.fixture() +def smtp_email_server(smtpserver) -> EmailServer: + """Create an EmailServer config pointing at our auth-enabled SMTP server.""" + host, port = smtpserver.addr + return EmailServer( + host=host, + port=port, + user_name=TEST_USER, + password=TEST_PASSWORD, + security=ConnectionSecurity.NONE, + verify_ssl=False, + ) + + +@pytest.fixture() +def smtp_client(smtp_email_server) -> EmailClient: + """Create an EmailClient wired to the pytest-localserver SMTP server.""" + return EmailClient(smtp_email_server, sender=TEST_USER) + + +# --------------------------------------------------------------------------- +# Implicit TLS fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture() +async def imap_tls_server_and_port(self_signed_cert): + """Start a MockImapServer with Implicit TLS on a random port.""" + _, _, server_ctx, _ = self_signed_cert + port = _free_port() + loop = asyncio.get_running_loop() + server = MockImapServer(loop=loop) + real_server = await server.run_server(host="127.0.0.1", port=port, ssl_context=server_ctx) + + yield server, port + + server.reset() + real_server.close() + # wait_closed() can hang if aioimaplib left dangling connections (aioimaplib#128) + with contextlib.suppress(TimeoutError, asyncio.TimeoutError): + await asyncio.wait_for(real_server.wait_closed(), timeout=1.0) + + +@pytest.fixture() +def imap_tls_email_server(imap_tls_server_and_port) -> EmailServer: + """EmailServer config for Implicit TLS IMAP (verify_ssl=False for self-signed).""" + _, port = imap_tls_server_and_port + return EmailServer( + host="127.0.0.1", + port=port, + user_name=TEST_USER, + password=TEST_PASSWORD, + security=ConnectionSecurity.TLS, + verify_ssl=False, + ) + + +@pytest.fixture() +def imap_tls_client(imap_tls_email_server) -> EmailClient: + """EmailClient wired to MockImapServer over Implicit TLS.""" + return EmailClient(imap_tls_email_server, sender=TEST_USER) + + +@pytest.fixture() +def smtpserver_tls(request, self_signed_cert): + """SMTP server with Implicit TLS (wraps socket in TLS on connect).""" + _, _, server_ctx, _ = self_signed_cert + server = AuthSmtpServerTLS(ssl_context=server_ctx) + server.start() + request.addfinalizer(server.stop) + return server + + +@pytest.fixture() +def smtp_tls_email_server(smtpserver_tls) -> EmailServer: + """EmailServer config for Implicit TLS SMTP.""" + host, port = smtpserver_tls.addr + return EmailServer( + host=host, + port=port, + user_name=TEST_USER, + password=TEST_PASSWORD, + security=ConnectionSecurity.TLS, + verify_ssl=False, + ) + + +@pytest.fixture() +def smtp_tls_client(smtp_tls_email_server) -> EmailClient: + """EmailClient wired to SMTP over Implicit TLS.""" + return EmailClient(smtp_tls_email_server, sender=TEST_USER) + + +# --------------------------------------------------------------------------- +# STARTTLS SMTP fixture (MockImapServer lacks STARTTLS protocol support) +# --------------------------------------------------------------------------- + + +@pytest.fixture() +def smtpserver_starttls(request, self_signed_cert): + """SMTP server that supports STARTTLS upgrade.""" + _, _, server_ctx, _ = self_signed_cert + server = AuthSmtpServer(ssl_context=server_ctx) + server.start() + request.addfinalizer(server.stop) + return server + + +@pytest.fixture() +def smtp_starttls_email_server(smtpserver_starttls) -> EmailServer: + """EmailServer config for SMTP STARTTLS.""" + host, port = smtpserver_starttls.addr + return EmailServer( + host=host, + port=port, + user_name=TEST_USER, + password=TEST_PASSWORD, + security=ConnectionSecurity.STARTTLS, + verify_ssl=False, + ) + + +@pytest.fixture() +def smtp_starttls_client(smtp_starttls_email_server) -> EmailClient: + """EmailClient wired to SMTP with STARTTLS upgrade.""" + return EmailClient(smtp_starttls_email_server, sender=TEST_USER) + + +# --------------------------------------------------------------------------- +# Mail helpers +# --------------------------------------------------------------------------- + + +def make_test_mail( + to: str = TEST_USER, + subject: str = "Test Subject", + body: str = "Hello, World!", + mail_from: str = "sender@localhost", + **kwargs, +) -> Mail: + """Create a Mail object for injection into MockImapServer.""" + return Mail.create( + to=[to], + mail_from=mail_from, + subject=subject, + content=body, + **kwargs, + ) + + +def make_multipart_mail( + to: str = TEST_USER, + subject: str = "Test with Attachment", + body: str = "See attached.", + mail_from: str = "sender@localhost", + attachment_name: str = "test.txt", + attachment_content: bytes = b"attachment content", +) -> Mail: + """Create a multipart Mail with an attachment for MockImapServer injection.""" + msg = MIMEMultipart() + msg["To"] = to + msg["From"] = mail_from + msg["Subject"] = subject + msg.attach(MIMEText(body, "plain", "utf-8")) + + part = MIMEApplication(attachment_content, Name=attachment_name) + part["Content-Disposition"] = f'attachment; filename="{attachment_name}"' + msg.attach(part) + + return Mail(msg) diff --git a/tests/integration/test_imap_live.py b/tests/integration/test_imap_live.py new file mode 100644 index 0000000..46604ef --- /dev/null +++ b/tests/integration/test_imap_live.py @@ -0,0 +1,136 @@ +"""Integration tests for IMAP operations against MockImapServer. + +These tests exercise the real EmailClient methods against a live IMAP server +running in-process. No mocks โ every byte goes over a TCP socket. + +Note: MockImapServer does NOT support INTERNALDATE or BODY.PEEK[HEADER], +so get_emails_metadata_stream() cannot be tested here. Use GreenMail +Docker tests (Tier 2) for full metadata/roundtrip coverage. + +Run: make test-integration +""" + +from __future__ import annotations + +from email.mime.text import MIMEText + +import pytest + +from mcp_email_server.config import ConnectionSecurity, EmailServer +from mcp_email_server.emails.classic import test_imap_connection as check_imap_connection + +from .conftest import QUOTED_INBOX, TEST_USER, make_multipart_mail, make_test_mail + +pytestmark = pytest.mark.integration + + +class TestImapConnectionLive: + """Test IMAP connection against a real server.""" + + async def test_connection_success(self, imap_server_and_port, imap_email_server): + result = await check_imap_connection(imap_email_server, timeout=5) + assert result.startswith("โ ") + assert "successful" in result + + async def test_connection_wrong_port(self): + server = EmailServer( + host="127.0.0.1", + port=1, + user_name="x", + password="x", + security=ConnectionSecurity.NONE, + verify_ssl=False, + ) + result = await check_imap_connection(server, timeout=2) + assert result.startswith("โ") + + +class TestGetEmailCount: + """Test get_email_count against MockImapServer.""" + + async def test_empty_inbox(self, imap_server_and_port, imap_client): + count = await imap_client.get_email_count(mailbox="INBOX") + assert count == 0 + + async def test_with_injected_emails(self, imap_server_and_port, imap_client): + server, _ = imap_server_and_port + server.receive(make_test_mail(subject="Email 1"), imap_user=TEST_USER, mailbox=QUOTED_INBOX) + server.receive(make_test_mail(subject="Email 2"), imap_user=TEST_USER, mailbox=QUOTED_INBOX) + server.receive(make_test_mail(subject="Email 3"), imap_user=TEST_USER, mailbox=QUOTED_INBOX) + + count = await imap_client.get_email_count(mailbox="INBOX") + assert count == 3 + + +class TestGetEmailBody: + """Test get_email_body_by_id against MockImapServer.""" + + async def test_body_extraction(self, imap_server_and_port, imap_client): + server, _ = imap_server_and_port + server.receive( + make_test_mail(subject="Body Test", body="This is the body content."), + imap_user=TEST_USER, + mailbox=QUOTED_INBOX, + ) + + body_result = await imap_client.get_email_body_by_id("1", mailbox="INBOX") + assert body_result is not None + assert "body" in body_result + + +class TestDownloadAttachment: + """Test download_attachment against MockImapServer.""" + + async def test_attachment_download(self, imap_server_and_port, imap_client, tmp_path): + server, _ = imap_server_and_port + attachment_content = b"Hello from attachment!" + server.receive( + make_multipart_mail( + subject="Attachment Test", + attachment_name="notes.txt", + attachment_content=attachment_content, + ), + imap_user=TEST_USER, + mailbox=QUOTED_INBOX, + ) + + save_path = str(tmp_path / "notes.txt") + download = await imap_client.download_attachment("1", "notes.txt", save_path, mailbox="INBOX") + + assert download["attachment_name"] == "notes.txt" + assert download["size"] == len(attachment_content) + assert (tmp_path / "notes.txt").read_bytes() == attachment_content + + +class TestDeleteEmails: + """Test delete_emails against MockImapServer.""" + + async def test_delete_and_verify_gone(self, imap_server_and_port, imap_client): + server, _ = imap_server_and_port + server.receive(make_test_mail(subject="To Delete"), imap_user=TEST_USER, mailbox=QUOTED_INBOX) + + # Verify exists + count_before = await imap_client.get_email_count(mailbox="INBOX") + assert count_before == 1 + + # Delete UID 1 + deleted, failed = await imap_client.delete_emails(["1"], mailbox="INBOX") + assert "1" in deleted + assert len(failed) == 0 + + # Verify gone + count_after = await imap_client.get_email_count(mailbox="INBOX") + assert count_after == 0 + + +class TestAppendToSent: + """Test append_to_sent against MockImapServer.""" + + async def test_append_message(self, imap_server_and_port, imap_client, imap_email_server): + msg = MIMEText("Sent message body", "plain", "utf-8") + msg["Subject"] = "Sent Test" + msg["From"] = TEST_USER + msg["To"] = "recipient@localhost" + + result = await imap_client.append_to_sent(msg, imap_email_server) + assert result is True diff --git a/tests/integration/test_imap_security.py b/tests/integration/test_imap_security.py new file mode 100644 index 0000000..a6e7dd6 --- /dev/null +++ b/tests/integration/test_imap_security.py @@ -0,0 +1,141 @@ +"""Integration tests for IMAP with different security settings. + +Tests exercise EmailClient against MockImapServer in multiple security modes: +- ConnectionSecurity.NONE (plaintext โ covered in test_imap_live.py) +- ConnectionSecurity.TLS (Implicit TLS via self-signed cert) +- Connection test diagnostics with wrong security settings + +MockImapServer does NOT support STARTTLS protocol, so STARTTLS IMAP is +tested only at the Docker tier (GreenMail). + +Run: make test-integration +""" + +from __future__ import annotations + +import pytest + +from mcp_email_server.config import ConnectionSecurity, EmailServer +from mcp_email_server.emails.classic import test_imap_connection as check_imap_connection + +from .conftest import QUOTED_INBOX, TEST_PASSWORD, TEST_USER, make_test_mail + +pytestmark = pytest.mark.integration + + +class TestImapImplicitTLS: + """IMAP operations over Implicit TLS (ConnectionSecurity.TLS).""" + + async def test_connection_success(self, imap_tls_server_and_port, imap_tls_email_server): + """Verify TLS connection + login succeeds with self-signed cert.""" + result = await check_imap_connection(imap_tls_email_server, timeout=5) + assert result.startswith("โ ") + assert "tls" in result + + async def test_email_count(self, imap_tls_server_and_port, imap_tls_client): + """Verify email count works over TLS.""" + server, _ = imap_tls_server_and_port + server.receive(make_test_mail(subject="TLS Email"), imap_user=TEST_USER, mailbox=QUOTED_INBOX) + + count = await imap_tls_client.get_email_count(mailbox="INBOX") + assert count == 1 + + async def test_email_body(self, imap_tls_server_and_port, imap_tls_client): + """Verify body extraction works over TLS.""" + server, _ = imap_tls_server_and_port + server.receive( + make_test_mail(subject="TLS Body", body="Encrypted body content."), + imap_user=TEST_USER, + mailbox=QUOTED_INBOX, + ) + + result = await imap_tls_client.get_email_body_by_id("1", mailbox="INBOX") + assert result is not None + assert "body" in result + + async def test_delete_over_tls(self, imap_tls_server_and_port, imap_tls_client): + """Verify delete works over TLS.""" + server, _ = imap_tls_server_and_port + server.receive(make_test_mail(subject="TLS Delete"), imap_user=TEST_USER, mailbox=QUOTED_INBOX) + + deleted, failed = await imap_tls_client.delete_emails(["1"], mailbox="INBOX") + assert "1" in deleted + assert len(failed) == 0 + + +class TestImapSecurityMismatch: + """Test that wrong security settings produce clear error messages.""" + + async def test_tls_on_plaintext_server(self, imap_server_and_port): + """TLS client against plaintext server should fail with clear error.""" + _, port = imap_server_and_port + server = EmailServer( + host="127.0.0.1", + port=port, + user_name=TEST_USER, + password=TEST_PASSWORD, + security=ConnectionSecurity.TLS, + verify_ssl=False, + ) + result = await check_imap_connection(server, timeout=3) + assert result.startswith("โ") + + async def test_starttls_on_plaintext_server(self, imap_server_and_port): + """STARTTLS against a server without STARTTLS capability should fail clearly.""" + _, port = imap_server_and_port + server = EmailServer( + host="127.0.0.1", + port=port, + user_name=TEST_USER, + password=TEST_PASSWORD, + security=ConnectionSecurity.STARTTLS, + verify_ssl=False, + ) + result = await check_imap_connection(server, timeout=3) + assert result.startswith("โ") + assert "STARTTLS" in result + + +# --------------------------------------------------------------------------- +# Blocked IMAP mismatch tests +# TODO(aioimaplib#128): aioimaplib's internal tasks do not support asyncio +# cancellation. Connecting with the wrong security mode (e.g. plaintext client +# to a TLS server, or verify_ssl=True with a self-signed cert) causes +# test_imap_connection() to hang indefinitely โ the internal _client_task +# holds transport references that prevent cleanup. +# See: https://github.com/iroco-co/aioimaplib/issues/128 +# +# Uncomment when aioimaplib#128 is resolved upstream. +# --------------------------------------------------------------------------- +# +# class TestImapSecurityMismatchBlocked: +# """Tests blocked by aioimaplib#128 (connection hangs on security mismatch).""" +# +# async def test_plaintext_on_tls_server(self, imap_tls_server_and_port): +# """Plaintext client against TLS server should time out with clear error.""" +# _, port = imap_tls_server_and_port +# server = EmailServer( +# host="127.0.0.1", +# port=port, +# user_name=TEST_USER, +# password=TEST_PASSWORD, +# security=ConnectionSecurity.NONE, +# verify_ssl=False, +# ) +# result = await check_imap_connection(server, timeout=3) +# assert result.startswith("โ") +# +# async def test_verify_ssl_true_rejects_self_signed(self, imap_tls_server_and_port): +# """verify_ssl=True should reject our self-signed certificate.""" +# _, port = imap_tls_server_and_port +# server = EmailServer( +# host="127.0.0.1", +# port=port, +# user_name=TEST_USER, +# password=TEST_PASSWORD, +# security=ConnectionSecurity.TLS, +# verify_ssl=True, +# ) +# result = await check_imap_connection(server, timeout=3) +# assert result.startswith("โ") +# assert "SSL" in result or "certificate" in result.lower() or "timed out" in result.lower() diff --git a/tests/integration/test_smtp_live.py b/tests/integration/test_smtp_live.py new file mode 100644 index 0000000..e0c0cee --- /dev/null +++ b/tests/integration/test_smtp_live.py @@ -0,0 +1,152 @@ +"""Integration tests for SMTP operations against pytest-localserver. + +These tests exercise the real EmailClient.send_email() method against a live +SMTP server. Sent messages are captured in smtpserver.outbox for inspection. + +Run: make test-integration +""" + +from __future__ import annotations + +import pytest + +from mcp_email_server.emails.classic import test_smtp_connection as check_smtp_connection + +from .conftest import TEST_USER + +pytestmark = pytest.mark.integration + + +class TestSmtpConnectionLive: + """Test SMTP connection against a real server.""" + + async def test_connection_success(self, smtp_email_server): + result = await check_smtp_connection(smtp_email_server, timeout=5) + assert result.startswith("โ ") + assert "successful" in result + + +class TestSendEmail: + """Test send_email against pytest-localserver SMTP.""" + + async def test_send_basic_email(self, smtp_client, smtpserver): + await smtp_client.send_email( + recipients=["recipient@localhost"], + subject="Basic Test", + body="Hello from integration test!", + ) + + assert len(smtpserver.outbox) == 1 + msg = smtpserver.outbox[0] + assert msg["Subject"] == "Basic Test" + assert msg["To"] == "recipient@localhost" + assert msg["From"] == TEST_USER + + async def test_send_email_with_cc(self, smtp_client, smtpserver): + await smtp_client.send_email( + recipients=["to@localhost"], + subject="CC Test", + body="With CC", + cc=["cc1@localhost", "cc2@localhost"], + ) + + assert len(smtpserver.outbox) == 1 + msg = smtpserver.outbox[0] + assert msg["Cc"] == "cc1@localhost, cc2@localhost" + + async def test_send_email_with_bcc(self, smtp_client, smtpserver): + await smtp_client.send_email( + recipients=["to@localhost"], + subject="BCC Test", + body="With BCC", + bcc=["secret@localhost"], + ) + + assert len(smtpserver.outbox) == 1 + msg = smtpserver.outbox[0] + # BCC should NOT appear in headers + assert msg.get("Bcc") is None + + async def test_send_html_email(self, smtp_client, smtpserver): + await smtp_client.send_email( + recipients=["to@localhost"], + subject="HTML Test", + body="