diff --git a/README.md b/README.md index 7db0a35..eeb47cf 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,7 @@ You can also configure the email server using environment variables, which is pa | `MCP_EMAIL_SERVER_SMTP_START_SSL` | Enable STARTTLS | `false` | No | | `MCP_EMAIL_SERVER_SMTP_VERIFY_SSL` | Verify SSL certificates (disable for self-signed) | `true` | No | | `MCP_EMAIL_SERVER_ENABLE_ATTACHMENT_DOWNLOAD` | Enable attachment download | `false` | No | +| `MCP_EMAIL_SERVER_ENABLE_FOLDER_MANAGEMENT` | Enable folder management tools | `false` | No | | `MCP_EMAIL_SERVER_SAVE_TO_SENT` | Save sent emails to IMAP Sent folder | `true` | No | | `MCP_EMAIL_SERVER_SENT_FOLDER_NAME` | Custom Sent folder name (auto-detect if not set) | - | No | @@ -115,6 +116,39 @@ enable_attachment_download = true Once enabled, you can use the `download_attachment` tool to save email attachments to a specified path. +### Enabling Folder Management + +By default, folder management tools (list, create, delete, rename folders, and move/copy emails) are disabled for security reasons. To enable these features: + +**Option 1: Environment Variable** + +```json +{ + "mcpServers": { + "zerolib-email": { + "command": "uvx", + "args": ["mcp-email-server@latest", "stdio"], + "env": { + "MCP_EMAIL_SERVER_ENABLE_FOLDER_MANAGEMENT": "true" + } + } + } +} +``` + +**Option 2: TOML Configuration** + +Add `enable_folder_management = true` to your TOML configuration file (`~/.config/zerolib/mcp_email_server/config.toml`): + +```toml +enable_folder_management = true + +[[emails]] +# ... your email configuration +``` + +Once enabled, you can use the folder management tools: `list_folders`, `create_folder`, `delete_folder`, `rename_folder`, `move_emails`, and `copy_emails`. + ### Saving Sent Emails to IMAP Sent Folder By default, sent emails are automatically saved to your IMAP Sent folder. This ensures that emails sent via the MCP server appear in your email client (Thunderbird, webmail, etc.). @@ -247,6 +281,50 @@ await send_email( The `in_reply_to` parameter sets the `In-Reply-To` header, and `references` sets the `References` header. Both are used by email clients to thread conversations properly. +### Managing Folders + +You can list, create, delete, and rename email folders: + +```python +# List all folders +folders = await list_folders(account_name="work") +for folder in folders.folders: + print(f"{folder.name} (flags: {folder.flags})") + +# Create a new folder +await create_folder(account_name="work", folder_name="Projects/2024") + +# Rename a folder +await rename_folder(account_name="work", old_name="Old Name", new_name="New Name") + +# Delete a folder (must be empty on most servers) +await delete_folder(account_name="work", folder_name="Old Folder") +``` + +### Moving and Copying Emails + +Move or copy emails between folders: + +```python +# Move emails to a different folder +await move_emails( + account_name="work", + email_ids=["123", "456"], + destination_folder="Archive", + source_mailbox="INBOX" +) + +# Copy emails (preserves original) - useful for applying labels +await copy_emails( + account_name="work", + email_ids=["123"], + destination_folder="Labels/Important", + source_mailbox="INBOX" +) +``` + +**Note for Proton Mail users:** Proton Mail Bridge exposes labels as folders under `Labels/`. Copying an email to a label folder effectively applies that label while keeping the original in place. + ## Development This project is managed using [uv](https://github.com/ai-zerolab/uv). diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index de9f95a..bf8eee8 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -14,7 +14,13 @@ from mcp_email_server.emails.models import ( AttachmentDownloadResponse, EmailContentBatchResponse, + EmailLabelsResponse, + EmailMarkResponse, EmailMetadataPageResponse, + EmailMoveResponse, + FolderListResponse, + FolderOperationResponse, + LabelListResponse, ) mcp = FastMCP("email") @@ -68,7 +74,13 @@ async def list_emails_metadata( Literal["asc", "desc"], Field(default=None, description="Order emails by field. `asc` or `desc`."), ] = "desc", - mailbox: Annotated[str, Field(default="INBOX", description="The mailbox to search.")] = "INBOX", + mailbox: Annotated[ + str, + Field( + default="INBOX", + description="IMAP folder path. Standard: INBOX, Sent, Drafts, Trash. Provider-specific: Gmail uses '[Gmail]/...' prefix (e.g., '[Gmail]/Sent Mail'); ProtonMail Bridge exposes folders as 'Folders/' and labels as 'Labels/'.", + ), + ] = "INBOX", seen: Annotated[ bool | None, Field(default=None, description="Filter by read status: True=read, False=unread, None=all."), @@ -111,7 +123,13 @@ async def get_emails_content( description="List of email_id to retrieve (obtained from list_emails_metadata). Can be a single email_id or multiple email_ids." ), ], - mailbox: Annotated[str, Field(default="INBOX", description="The mailbox to retrieve emails from.")] = "INBOX", + mailbox: Annotated[ + str, + Field( + default="INBOX", + description="IMAP folder path. Standard: INBOX, Sent, Drafts, Trash. Provider-specific: Gmail uses '[Gmail]/...' prefix; ProtonMail Bridge uses 'Folders/' and 'Labels/'.", + ), + ] = "INBOX", ) -> EmailContentBatchResponse: handler = dispatch_handler(account_name) return await handler.get_emails_content(email_ids, mailbox) @@ -185,7 +203,13 @@ async def delete_emails( list[str], Field(description="List of email_id to delete (obtained from list_emails_metadata)."), ], - mailbox: Annotated[str, Field(default="INBOX", description="The mailbox to delete emails from.")] = "INBOX", + mailbox: Annotated[ + str, + Field( + default="INBOX", + description="IMAP folder path. Standard: INBOX, Sent, Drafts, Trash. Provider-specific: Gmail uses '[Gmail]/...' prefix; ProtonMail Bridge uses 'Folders/' and 'Labels/'.", + ), + ] = "INBOX", ) -> str: handler = dispatch_handler(account_name) deleted_ids, failed_ids = await handler.delete_emails(email_ids, mailbox) @@ -196,6 +220,29 @@ async def delete_emails( return result +@mcp.tool(description="Mark one or more emails as read or unread. Use list_emails_metadata first to get the email_id.") +async def mark_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to mark (obtained from list_emails_metadata)."), + ], + mark_as: Annotated[ + Literal["read", "unread"], + Field(description="Mark emails as 'read' or 'unread'."), + ], + mailbox: Annotated[ + str, + Field( + default="INBOX", + description="IMAP folder path. Standard: INBOX, Sent, Drafts, Trash. Provider-specific: Gmail uses '[Gmail]/...' prefix; ProtonMail Bridge uses 'Folders/' and 'Labels/'.", + ), + ] = "INBOX", +) -> EmailMarkResponse: + handler = dispatch_handler(account_name) + return await handler.mark_emails(email_ids, mark_as, mailbox) + + @mcp.tool( description="Download an email attachment and save it to the specified path. This feature must be explicitly enabled in settings (enable_attachment_download=true) due to security considerations.", ) @@ -208,7 +255,13 @@ async def download_attachment( str, Field(description="The name of the attachment to download (as shown in the attachments list).") ], save_path: Annotated[str, Field(description="The absolute path where the attachment should be saved.")], - mailbox: Annotated[str, Field(description="The mailbox to search in (default: INBOX).")] = "INBOX", + mailbox: Annotated[ + str, + Field( + default="INBOX", + description="IMAP folder path. Standard: INBOX, Sent, Drafts, Trash. Provider-specific: Gmail uses '[Gmail]/...' prefix; ProtonMail Bridge uses 'Folders/' and 'Labels/'.", + ), + ] = "INBOX", ) -> AttachmentDownloadResponse: settings = get_settings() if not settings.enable_attachment_download: @@ -219,3 +272,177 @@ async def download_attachment( handler = dispatch_handler(account_name) return await handler.download_attachment(email_id, attachment_name, save_path, mailbox) + + +def _check_folder_management_enabled() -> None: + """Check if folder management is enabled, raise PermissionError if not.""" + settings = get_settings() + if not settings.enable_folder_management: + msg = ( + "Folder management is disabled. Set 'enable_folder_management=true' in settings " + "or 'MCP_EMAIL_SERVER_ENABLE_FOLDER_MANAGEMENT=true' environment variable to enable this feature." + ) + raise PermissionError(msg) + + +@mcp.tool( + description="List all folders/mailboxes for an email account. Returns folder names, hierarchy delimiters, and IMAP flags. Requires enable_folder_management=true.", +) +async def list_folders( + account_name: Annotated[str, Field(description="The name of the email account.")], +) -> FolderListResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.list_folders() + + +@mcp.tool( + description="Move one or more emails to a different folder (removes from source). Use this to clear emails from INBOX. Uses IMAP MOVE command if supported, otherwise falls back to COPY + DELETE. Requires enable_folder_management=true.", +) +async def move_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to move (obtained from list_emails_metadata)."), + ], + destination_folder: Annotated[str, Field(description="The destination folder name.")], + source_mailbox: Annotated[ + str, Field(default="INBOX", description="The source mailbox to move emails from.") + ] = "INBOX", +) -> EmailMoveResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.move_emails(email_ids, destination_folder, source_mailbox) + + +@mcp.tool( + description="Copy one or more emails to a different folder. The original emails remain in the source folder. Useful for applying labels in providers like Proton Mail. Requires enable_folder_management=true.", +) +async def copy_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to copy (obtained from list_emails_metadata)."), + ], + destination_folder: Annotated[str, Field(description="The destination folder name.")], + source_mailbox: Annotated[ + str, Field(default="INBOX", description="The source mailbox to copy emails from.") + ] = "INBOX", +) -> EmailMoveResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.copy_emails(email_ids, destination_folder, source_mailbox) + + +@mcp.tool(description="Create a new folder/mailbox. Requires enable_folder_management=true.") +async def create_folder( + account_name: Annotated[str, Field(description="The name of the email account.")], + folder_name: Annotated[str, Field(description="The name of the folder to create.")], +) -> FolderOperationResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.create_folder(folder_name) + + +@mcp.tool( + description="Delete a folder/mailbox. The folder must be empty on most IMAP servers. Requires enable_folder_management=true." +) +async def delete_folder( + account_name: Annotated[str, Field(description="The name of the email account.")], + folder_name: Annotated[str, Field(description="The name of the folder to delete.")], +) -> FolderOperationResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.delete_folder(folder_name) + + +@mcp.tool(description="Rename a folder/mailbox. Requires enable_folder_management=true.") +async def rename_folder( + account_name: Annotated[str, Field(description="The name of the email account.")], + old_name: Annotated[str, Field(description="The current folder name.")], + new_name: Annotated[str, Field(description="The new folder name.")], +) -> FolderOperationResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.rename_folder(old_name, new_name) + + +@mcp.tool( + description="List all labels for an email account (ProtonMail: folders under Labels/ prefix). Requires enable_folder_management=true." +) +async def list_labels( + account_name: Annotated[str, Field(description="The name of the email account.")], +) -> LabelListResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.list_labels() + + +@mcp.tool( + description="Apply a label to one or more emails. NOTE: This only tags emails - originals stay in INBOX. To remove from INBOX, use move_emails instead. Requires enable_folder_management=true." +) +async def apply_label( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to label (obtained from list_emails_metadata)."), + ], + label_name: Annotated[str, Field(description="The label name (without Labels/ prefix).")], + source_mailbox: Annotated[ + str, Field(default="INBOX", description="The source mailbox containing the emails.") + ] = "INBOX", +) -> EmailMoveResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.apply_label(email_ids, label_name, source_mailbox) + + +@mcp.tool( + description="Remove a label from one or more emails. Deletes from label folder while preserving original emails. Requires enable_folder_management=true." +) +async def remove_label( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to unlabel (obtained from list_emails_metadata)."), + ], + label_name: Annotated[str, Field(description="The label name (without Labels/ prefix).")], +) -> EmailMoveResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.remove_label(email_ids, label_name) + + +@mcp.tool(description="Get all labels applied to a specific email. Requires enable_folder_management=true.") +async def get_email_labels( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_id: Annotated[str, Field(description="The email_id to check (obtained from list_emails_metadata).")], + source_mailbox: Annotated[ + str, Field(default="INBOX", description="The source mailbox containing the email.") + ] = "INBOX", +) -> EmailLabelsResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.get_email_labels(email_id, source_mailbox) + + +@mcp.tool(description="Create a new label (creates Labels/name folder). Requires enable_folder_management=true.") +async def create_label( + account_name: Annotated[str, Field(description="The name of the email account.")], + label_name: Annotated[str, Field(description="The label name to create (without Labels/ prefix).")], +) -> FolderOperationResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.create_label(label_name) + + +@mcp.tool( + description="Delete a label (deletes Labels/name folder). The label must be empty on most IMAP servers. Requires enable_folder_management=true." +) +async def delete_label( + account_name: Annotated[str, Field(description="The name of the email account.")], + label_name: Annotated[str, Field(description="The label name to delete (without Labels/ prefix).")], +) -> FolderOperationResponse: + _check_folder_management_enabled() + handler = dispatch_handler(account_name) + return await handler.delete_label(label_name) diff --git a/mcp_email_server/config.py b/mcp_email_server/config.py index fbbcabc..a72932b 100644 --- a/mcp_email_server/config.py +++ b/mcp_email_server/config.py @@ -228,6 +228,7 @@ class Settings(BaseSettings): providers: list[ProviderSettings] = [] db_location: str = CONFIG_PATH.with_name("db.sqlite3").as_posix() enable_attachment_download: bool = False + enable_folder_management: bool = False model_config = SettingsConfigDict(toml_file=CONFIG_PATH, validate_assignment=True, revalidate_instances="always") @@ -241,6 +242,12 @@ def __init__(self, **data: Any) -> None: self.enable_attachment_download = _parse_bool_env(env_enable_attachment, False) logger.info(f"Set enable_attachment_download={self.enable_attachment_download} from environment variable") + # Check for enable_folder_management from environment variable + env_enable_folder = os.getenv("MCP_EMAIL_SERVER_ENABLE_FOLDER_MANAGEMENT") + if env_enable_folder is not None: + self.enable_folder_management = _parse_bool_env(env_enable_folder, False) + logger.info(f"Set enable_folder_management={self.enable_folder_management} from environment variable") + # Check for email configuration from environment variables env_email = EmailSettings.from_env() if env_email: diff --git a/mcp_email_server/emails/__init__.py b/mcp_email_server/emails/__init__.py index 251e9bc..d865638 100644 --- a/mcp_email_server/emails/__init__.py +++ b/mcp_email_server/emails/__init__.py @@ -6,7 +6,13 @@ from mcp_email_server.emails.models import ( AttachmentDownloadResponse, EmailContentBatchResponse, + EmailLabelsResponse, + EmailMarkResponse, EmailMetadataPageResponse, + EmailMoveResponse, + FolderListResponse, + FolderOperationResponse, + LabelListResponse, ) @@ -92,3 +98,192 @@ async def download_attachment( Returns: AttachmentDownloadResponse with download result information. """ + + @abc.abstractmethod + async def list_folders(self) -> "FolderListResponse": + """ + List all folders/mailboxes for the account. + + Returns: + FolderListResponse with list of folders and their metadata. + """ + + @abc.abstractmethod + async def move_emails( + self, + email_ids: list[str], + destination_folder: str, + source_mailbox: str = "INBOX", + ) -> "EmailMoveResponse": + """ + Move emails to a destination folder. + + Args: + email_ids: List of email UIDs to move. + destination_folder: The target folder name. + source_mailbox: The source mailbox (default: "INBOX"). + + Returns: + EmailMoveResponse with operation results. + """ + + @abc.abstractmethod + async def copy_emails( + self, + email_ids: list[str], + destination_folder: str, + source_mailbox: str = "INBOX", + ) -> "EmailMoveResponse": + """ + Copy emails to a destination folder (preserves original). + + Args: + email_ids: List of email UIDs to copy. + destination_folder: The target folder name. + source_mailbox: The source mailbox (default: "INBOX"). + + Returns: + EmailMoveResponse with operation results. + """ + + @abc.abstractmethod + async def create_folder(self, folder_name: str) -> "FolderOperationResponse": + """ + Create a new folder/mailbox. + + Args: + folder_name: The name of the folder to create. + + Returns: + FolderOperationResponse with operation result. + """ + + @abc.abstractmethod + async def delete_folder(self, folder_name: str) -> "FolderOperationResponse": + """ + Delete a folder/mailbox. + + Args: + folder_name: The name of the folder to delete. + + Returns: + FolderOperationResponse with operation result. + """ + + @abc.abstractmethod + async def rename_folder(self, old_name: str, new_name: str) -> "FolderOperationResponse": + """ + Rename a folder/mailbox. + + Args: + old_name: The current folder name. + new_name: The new folder name. + + Returns: + FolderOperationResponse with operation result. + """ + + @abc.abstractmethod + async def list_labels(self) -> "LabelListResponse": + """ + List all labels (ProtonMail: folders under Labels/ prefix). + + Returns: + LabelListResponse with list of labels. + """ + + @abc.abstractmethod + async def apply_label( + self, + email_ids: list[str], + label_name: str, + source_mailbox: str = "INBOX", + ) -> "EmailMoveResponse": + """ + Apply a label to emails by copying to the label folder. + + Args: + email_ids: List of email UIDs to label. + label_name: The label name (without Labels/ prefix). + source_mailbox: The source mailbox (default: "INBOX"). + + Returns: + EmailMoveResponse with operation results. + """ + + @abc.abstractmethod + async def remove_label( + self, + email_ids: list[str], + label_name: str, + ) -> "EmailMoveResponse": + """ + Remove a label from emails by deleting from the label folder. + + Args: + email_ids: List of email UIDs to unlabel. + label_name: The label name (without Labels/ prefix). + + Returns: + EmailMoveResponse with operation results. + """ + + @abc.abstractmethod + async def get_email_labels( + self, + email_id: str, + source_mailbox: str = "INBOX", + ) -> "EmailLabelsResponse": + """ + Get all labels applied to a specific email. + + Args: + email_id: The email UID to check. + source_mailbox: The source mailbox (default: "INBOX"). + + Returns: + EmailLabelsResponse with list of label names. + """ + + @abc.abstractmethod + async def create_label(self, label_name: str) -> "FolderOperationResponse": + """ + Create a new label (creates Labels/name folder). + + Args: + label_name: The label name (without Labels/ prefix). + + Returns: + FolderOperationResponse with operation result. + """ + + @abc.abstractmethod + async def delete_label(self, label_name: str) -> "FolderOperationResponse": + """ + Delete a label (deletes Labels/name folder). + + Args: + label_name: The label name (without Labels/ prefix). + + Returns: + FolderOperationResponse with operation result. + """ + + @abc.abstractmethod + async def mark_emails( + self, + email_ids: list[str], + mark_as: str, + mailbox: str = "INBOX", + ) -> "EmailMarkResponse": + """ + Mark emails as read or unread. + + Args: + email_ids: List of email UIDs to mark. + mark_as: Either "read" or "unread". + mailbox: The mailbox containing the emails (default: "INBOX"). + + Returns: + EmailMarkResponse with operation results. + """ diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 7c643cd..701d082 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -24,8 +24,16 @@ AttachmentDownloadResponse, EmailBodyResponse, EmailContentBatchResponse, + EmailLabelsResponse, + EmailMarkResponse, EmailMetadata, EmailMetadataPageResponse, + EmailMoveResponse, + Folder, + FolderListResponse, + FolderOperationResponse, + Label, + LabelListResponse, ) from mcp_email_server.log import logger @@ -912,6 +920,388 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t return deleted_ids, failed_ids + def _parse_list_response(self, folder_data: bytes | str) -> Folder | None: + """Parse a single IMAP LIST response line into a Folder object. + + IMAP LIST response format: (flags) "delimiter" "name" + Example: (\\HasNoChildren \\Sent) "/" "Sent" + """ + folder_str = folder_data.decode("utf-8") if isinstance(folder_data, bytes) else str(folder_data) + + # Skip empty or invalid responses + if not folder_str or folder_str == "LIST completed.": + return None + + try: + # Extract flags (content between first set of parentheses) + flags_start = folder_str.find("(") + flags_end = folder_str.find(")") + if flags_start == -1 or flags_end == -1: + return None + + flags_str = folder_str[flags_start + 1 : flags_end] + flags = [f.strip() for f in flags_str.split() if f.strip()] + + # Extract delimiter and name from the rest + # Format after flags: "delimiter" "name" + rest = folder_str[flags_end + 1 :].strip() + parts = rest.split('"') + # parts should be like: ['', '/', ' ', 'INBOX', ''] + if len(parts) >= 4: + delimiter = parts[1] + folder_name = parts[3] + return Folder(name=folder_name, delimiter=delimiter, flags=flags) + + except Exception as e: + logger.debug(f"Error parsing folder response '{folder_str}': {e}") + + return None + + async def list_folders(self) -> list[Folder]: + """List all folders/mailboxes.""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + folders = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + # List all folders + _, folder_data = await imap.list('""', "*") + + for item in folder_data: + folder = self._parse_list_response(item) + if folder: + folders.append(folder) + + logger.info(f"Found {len(folders)} folders") + return folders + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + async def copy_emails( + self, + email_ids: list[str], + destination_folder: str, + source_mailbox: str = "INBOX", + ) -> tuple[list[str], list[str]]: + """Copy emails to a destination folder. Returns (copied_ids, failed_ids).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + copied_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(source_mailbox)) + + for email_id in email_ids: + try: + result = await imap.uid("copy", email_id, _quote_mailbox(destination_folder)) + status = result[0] if isinstance(result, tuple) else result + if str(status).upper() == "OK": + copied_ids.append(email_id) + logger.debug(f"Copied email {email_id} to {destination_folder}") + else: + logger.error(f"Failed to copy email {email_id}: {status}") + failed_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to copy email {email_id}: {e}") + failed_ids.append(email_id) + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return copied_ids, failed_ids + + async def move_emails( + self, + email_ids: list[str], + destination_folder: str, + source_mailbox: str = "INBOX", + ) -> tuple[list[str], list[str]]: + """Move emails to a destination folder. Returns (moved_ids, failed_ids). + + Attempts to use MOVE command first (RFC 6851), falls back to COPY + DELETE. + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + moved_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(source_mailbox)) + + for email_id in email_ids: + try: + # Try MOVE command first (RFC 6851) + try: + result = await imap.uid("move", email_id, _quote_mailbox(destination_folder)) + status = result[0] if isinstance(result, tuple) else result + if str(status).upper() == "OK": + moved_ids.append(email_id) + logger.debug(f"Moved email {email_id} to {destination_folder} using MOVE") + continue + except Exception as move_error: + logger.debug(f"MOVE command failed, falling back to COPY+DELETE: {move_error}") + + # Fallback: COPY + mark as deleted + copy_result = await imap.uid("copy", email_id, _quote_mailbox(destination_folder)) + copy_status = copy_result[0] if isinstance(copy_result, tuple) else copy_result + if str(copy_status).upper() == "OK": + await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)") + moved_ids.append(email_id) + logger.debug(f"Moved email {email_id} to {destination_folder} using COPY+DELETE") + else: + logger.error(f"Failed to copy email {email_id}: {copy_status}") + failed_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to move email {email_id}: {e}") + failed_ids.append(email_id) + + # Expunge deleted messages + if moved_ids: + await imap.expunge() + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return moved_ids, failed_ids + + async def create_folder(self, folder_name: str) -> tuple[bool, str]: + """Create a new folder. Returns (success, message).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + result = await imap.create(_quote_mailbox(folder_name)) + status = result[0] if isinstance(result, tuple) else result + if str(status).upper() == "OK": + logger.info(f"Created folder: {folder_name}") + return True, f"Folder '{folder_name}' created successfully" + else: + logger.error(f"Failed to create folder {folder_name}: {status}") + return False, f"Failed to create folder: {status}" + + except Exception as e: + logger.error(f"Error creating folder {folder_name}: {e}") + return False, f"Error creating folder: {e}" + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + async def delete_folder(self, folder_name: str) -> tuple[bool, str]: + """Delete a folder. Returns (success, message).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + result = await imap.delete(_quote_mailbox(folder_name)) + status = result[0] if isinstance(result, tuple) else result + if str(status).upper() == "OK": + logger.info(f"Deleted folder: {folder_name}") + return True, f"Folder '{folder_name}' deleted successfully" + else: + logger.error(f"Failed to delete folder {folder_name}: {status}") + return False, f"Failed to delete folder: {status}" + + except Exception as e: + logger.error(f"Error deleting folder {folder_name}: {e}") + return False, f"Error deleting folder: {e}" + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + async def rename_folder(self, old_name: str, new_name: str) -> tuple[bool, str]: + """Rename a folder. Returns (success, message).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + result = await imap.rename(_quote_mailbox(old_name), _quote_mailbox(new_name)) + status = result[0] if isinstance(result, tuple) else result + if str(status).upper() == "OK": + logger.info(f"Renamed folder '{old_name}' to '{new_name}'") + return True, f"Folder renamed from '{old_name}' to '{new_name}'" + else: + logger.error(f"Failed to rename folder {old_name}: {status}") + return False, f"Failed to rename folder: {status}" + + except Exception as e: + logger.error(f"Error renaming folder {old_name}: {e}") + return False, f"Error renaming folder: {e}" + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + async def list_labels(self) -> list[Label]: + """List all labels (folders under Labels/ prefix).""" + folders = await self.list_folders() + labels = [] + for folder in folders: + if folder.name.startswith("Labels/"): + # Extract label name without prefix + label_name = folder.name[7:] # Remove "Labels/" prefix + if label_name: # Skip if just "Labels/" with no name + labels.append( + Label( + name=label_name, + full_path=folder.name, + delimiter=folder.delimiter, + flags=folder.flags, + ) + ) + return labels + + async def get_email_message_id(self, email_id: str, mailbox: str = "INBOX") -> str | None: + """Get the Message-ID header for an email.""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + _, data = await imap.uid("fetch", email_id, "BODY.PEEK[HEADER.FIELDS (MESSAGE-ID)]") + + for item in data: + if isinstance(item, bytearray): + header_str = bytes(item).decode("utf-8", errors="replace").strip() + if header_str.lower().startswith("message-id:"): + return header_str[11:].strip() + + return None + + except Exception as e: + logger.error(f"Error getting Message-ID for email {email_id}: {e}") + return None + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + async def search_by_message_id(self, message_id: str, mailbox: str) -> str | None: + """Search for an email by Message-ID in a specific mailbox. Returns email UID or None.""" + import re + + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + # Search by Message-ID header (returns sequence numbers, not UIDs) + _, data = await imap.search(f'HEADER MESSAGE-ID "{message_id}"') + + # data[0] contains space-separated sequence numbers + if data and data[0]: + seq_nums = data[0].decode("utf-8") if isinstance(data[0], bytes) else str(data[0]) + seq_list = seq_nums.split() + if seq_list: + # Fetch the UID for this sequence number + _, fetch_data = await imap.fetch(seq_list[0], "(UID)") + for item in fetch_data: + if isinstance(item, bytes): + item_str = item.decode("utf-8", errors="replace") + uid_match = re.search(r"UID\s+(\d+)", item_str) + if uid_match: + return uid_match.group(1) + + return None + + except Exception as e: + logger.debug(f"Error searching for Message-ID in {mailbox}: {e}") + return None + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + async def mark_emails( + self, email_ids: list[str], mark_as: str, mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Mark emails as read or unread. Returns (marked_ids, failed_ids).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + marked_ids = [] + failed_ids = [] + + # Determine flag operation: +FLAGS for read, -FLAGS for unread + if mark_as == "read": + flag_op = "+FLAGS" + elif mark_as == "unread": + flag_op = "-FLAGS" + else: + raise ValueError(f"Invalid mark_as value: {mark_as}. Must be 'read' or 'unread'.") + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + for email_id in email_ids: + try: + await imap.uid("store", email_id, flag_op, r"(\Seen)") + marked_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to mark email {email_id} as {mark_as}: {e}") + failed_ids.append(email_id) + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return marked_ids, failed_ids + + async def delete_from_folder(self, email_ids: list[str], folder: str) -> tuple[list[str], list[str]]: + """Delete emails from a specific folder. Returns (deleted_ids, failed_ids).""" + return await self.delete_emails(email_ids, folder) + class ClassicEmailHandler(EmailHandler): def __init__(self, email_settings: EmailSettings): @@ -1067,3 +1457,195 @@ async def download_attachment( size=result["size"], saved_path=result["saved_path"], ) + + async def list_folders(self) -> FolderListResponse: + """List all folders/mailboxes for the account.""" + folders = await self.incoming_client.list_folders() + return FolderListResponse(folders=folders, total=len(folders)) + + async def move_emails( + self, + email_ids: list[str], + destination_folder: str, + source_mailbox: str = "INBOX", + ) -> EmailMoveResponse: + """Move emails to a destination folder.""" + moved_ids, failed_ids = await self.incoming_client.move_emails(email_ids, destination_folder, source_mailbox) + return EmailMoveResponse( + success=len(failed_ids) == 0, + moved_ids=moved_ids, + failed_ids=failed_ids, + source_mailbox=source_mailbox, + destination_folder=destination_folder, + ) + + async def copy_emails( + self, + email_ids: list[str], + destination_folder: str, + source_mailbox: str = "INBOX", + ) -> EmailMoveResponse: + """Copy emails to a destination folder (preserves original).""" + copied_ids, failed_ids = await self.incoming_client.copy_emails(email_ids, destination_folder, source_mailbox) + return EmailMoveResponse( + success=len(failed_ids) == 0, + moved_ids=copied_ids, + failed_ids=failed_ids, + source_mailbox=source_mailbox, + destination_folder=destination_folder, + ) + + async def create_folder(self, folder_name: str) -> FolderOperationResponse: + """Create a new folder/mailbox.""" + success, message = await self.incoming_client.create_folder(folder_name) + return FolderOperationResponse( + success=success, + folder_name=folder_name, + message=message, + ) + + async def delete_folder(self, folder_name: str) -> FolderOperationResponse: + """Delete a folder/mailbox.""" + success, message = await self.incoming_client.delete_folder(folder_name) + return FolderOperationResponse( + success=success, + folder_name=folder_name, + message=message, + ) + + async def rename_folder(self, old_name: str, new_name: str) -> FolderOperationResponse: + """Rename a folder/mailbox.""" + success, message = await self.incoming_client.rename_folder(old_name, new_name) + return FolderOperationResponse( + success=success, + folder_name=new_name, + message=message, + ) + + async def list_labels(self) -> LabelListResponse: + """List all labels (ProtonMail: folders under Labels/ prefix).""" + labels = await self.incoming_client.list_labels() + return LabelListResponse(labels=labels, total=len(labels)) + + async def apply_label( + self, + email_ids: list[str], + label_name: str, + source_mailbox: str = "INBOX", + ) -> EmailMoveResponse: + """Apply a label to emails by copying to the label folder.""" + label_folder = f"Labels/{label_name}" + copied_ids, failed_ids = await self.incoming_client.copy_emails(email_ids, label_folder, source_mailbox) + return EmailMoveResponse( + success=len(failed_ids) == 0, + moved_ids=copied_ids, + failed_ids=failed_ids, + source_mailbox=source_mailbox, + destination_folder=label_folder, + ) + + async def remove_label( + self, + email_ids: list[str], + label_name: str, + ) -> EmailMoveResponse: + """Remove a label from emails by deleting from the label folder. + + This finds the emails in the label folder by their Message-ID and deletes them. + The original emails in other folders are preserved. + """ + label_folder = f"Labels/{label_name}" + removed_ids = [] + failed_ids = [] + + for email_id in email_ids: + # Get the Message-ID from the source email + # Note: We need to find this email in the label folder + # The email_id provided is from the source mailbox, not the label folder + # We need to search by Message-ID to find the copy in the label folder + message_id = await self.incoming_client.get_email_message_id(email_id, "INBOX") + if not message_id: + logger.warning(f"Could not get Message-ID for email {email_id}") + failed_ids.append(email_id) + continue + + # Find the email in the label folder + label_uid = await self.incoming_client.search_by_message_id(message_id, label_folder) + if not label_uid: + logger.warning(f"Email {email_id} not found in label {label_name}") + failed_ids.append(email_id) + continue + + # Delete from label folder + deleted, _failed = await self.incoming_client.delete_from_folder([label_uid], label_folder) + if deleted: + removed_ids.append(email_id) + else: + failed_ids.append(email_id) + + return EmailMoveResponse( + success=len(failed_ids) == 0, + moved_ids=removed_ids, + failed_ids=failed_ids, + source_mailbox=label_folder, + destination_folder="", + ) + + async def get_email_labels( + self, + email_id: str, + source_mailbox: str = "INBOX", + ) -> EmailLabelsResponse: + """Get all labels applied to a specific email.""" + # Get Message-ID from the source email + message_id = await self.incoming_client.get_email_message_id(email_id, source_mailbox) + if not message_id: + return EmailLabelsResponse(email_id=email_id, labels=[]) + + # Get all labels + labels = await self.incoming_client.list_labels() + applied_labels = [] + + # Search each label folder for this email + for label in labels: + found_uid = await self.incoming_client.search_by_message_id(message_id, label.full_path) + if found_uid: + applied_labels.append(label.name) + + return EmailLabelsResponse(email_id=email_id, labels=applied_labels) + + async def create_label(self, label_name: str) -> FolderOperationResponse: + """Create a new label (creates Labels/name folder).""" + label_folder = f"Labels/{label_name}" + success, message = await self.incoming_client.create_folder(label_folder) + return FolderOperationResponse( + success=success, + folder_name=label_name, + message=message.replace(label_folder, label_name) if success else message, + ) + + async def delete_label(self, label_name: str) -> FolderOperationResponse: + """Delete a label (deletes Labels/name folder).""" + label_folder = f"Labels/{label_name}" + success, message = await self.incoming_client.delete_folder(label_folder) + return FolderOperationResponse( + success=success, + folder_name=label_name, + message=message.replace(label_folder, label_name) if success else message, + ) + + async def mark_emails( + self, + email_ids: list[str], + mark_as: str, + mailbox: str = "INBOX", + ) -> EmailMarkResponse: + """Mark emails as read or unread.""" + marked_ids, failed_ids = await self.incoming_client.mark_emails(email_ids, mark_as, mailbox) + return EmailMarkResponse( + success=len(failed_ids) == 0, + marked_ids=marked_ids, + failed_ids=failed_ids, + mailbox=mailbox, + marked_as=mark_as, + ) diff --git a/mcp_email_server/emails/models.py b/mcp_email_server/emails/models.py index 8c1ee4a..44041cc 100644 --- a/mcp_email_server/emails/models.py +++ b/mcp_email_server/emails/models.py @@ -70,3 +70,69 @@ class AttachmentDownloadResponse(BaseModel): mime_type: str size: int saved_path: str + + +class Folder(BaseModel): + """IMAP folder/mailbox information""" + + name: str + delimiter: str + flags: list[str] + + +class FolderListResponse(BaseModel): + """Response for list_folders operation""" + + folders: list[Folder] + total: int + + +class FolderOperationResponse(BaseModel): + """Response for folder operations (create, delete, rename)""" + + success: bool + folder_name: str + message: str + + +class EmailMoveResponse(BaseModel): + """Response for move/copy email operations""" + + success: bool + moved_ids: list[str] + failed_ids: list[str] + source_mailbox: str + destination_folder: str + + +class Label(BaseModel): + """ProtonMail label information""" + + name: str # Label name without prefix (e.g., "Important" not "Labels/Important") + full_path: str # Full IMAP path (e.g., "Labels/Important") + delimiter: str + flags: list[str] + + +class LabelListResponse(BaseModel): + """Response for list_labels operation""" + + labels: list[Label] + total: int + + +class EmailLabelsResponse(BaseModel): + """Response for get_email_labels operation""" + + email_id: str + labels: list[str] # List of label names (without prefix) + + +class EmailMarkResponse(BaseModel): + """Response for mark_emails (read/unread) operations""" + + success: bool + marked_ids: list[str] + failed_ids: list[str] + mailbox: str + marked_as: str # "read" or "unread" diff --git a/tests/test_classic_handler.py b/tests/test_classic_handler.py index 3245a6a..69b21ca 100644 --- a/tests/test_classic_handler.py +++ b/tests/test_classic_handler.py @@ -274,6 +274,42 @@ async def test_delete_emails_custom_mailbox(self, classic_handler): assert failed_ids == [] mock_delete.assert_called_once_with(["789"], "Archive") + @pytest.mark.asyncio + async def test_mark_emails(self, classic_handler): + """Test mark_emails method.""" + mock_mark = AsyncMock(return_value=(["123", "456"], [])) + + with patch.object(classic_handler.incoming_client, "mark_emails", mock_mark): + result = await classic_handler.mark_emails( + email_ids=["123", "456"], + mark_as="read", + mailbox="INBOX", + ) + + assert result.success is True + assert result.marked_ids == ["123", "456"] + assert result.failed_ids == [] + assert result.mailbox == "INBOX" + assert result.marked_as == "read" + mock_mark.assert_called_once_with(["123", "456"], "read", "INBOX") + + @pytest.mark.asyncio + async def test_mark_emails_with_failures(self, classic_handler): + """Test mark_emails method with some failures.""" + mock_mark = AsyncMock(return_value=(["123"], ["456"])) + + with patch.object(classic_handler.incoming_client, "mark_emails", mock_mark): + result = await classic_handler.mark_emails( + email_ids=["123", "456"], + mark_as="unread", + mailbox="INBOX", + ) + + assert result.success is False + assert result.marked_ids == ["123"] + assert result.failed_ids == ["456"] + assert result.marked_as == "unread" + @pytest.mark.asyncio async def test_download_attachment(self, classic_handler, tmp_path): """Test download_attachment method.""" @@ -365,110 +401,40 @@ async def test_get_emails_content_includes_message_id(self, classic_handler): # Verify the client method was called correctly mock_get_body.assert_called_once_with("123", "INBOX") - -class TestEmailClientBatchMethods: - """Test batch fetch methods for performance optimization.""" - - @pytest.fixture - def email_client(self, email_settings): - return EmailClient(email_settings.incoming) - - def test_parse_headers(self, email_client): - """Test _parse_headers method parses email headers correctly.""" - raw_headers = b"""From: sender@example.com -To: recipient@example.com -Cc: cc@example.com -Subject: Test Subject -Date: Mon, 20 Jan 2025 10:30:00 +0000 - -""" - result = email_client._parse_headers("123", raw_headers) - - assert result is not None - assert result["email_id"] == "123" - assert result["subject"] == "Test Subject" - assert result["from"] == "sender@example.com" - assert "recipient@example.com" in result["to"] - assert "cc@example.com" in result["to"] - assert result["attachments"] == [] - - def test_parse_headers_with_invalid_data(self, email_client): - """Test _parse_headers handles malformed headers gracefully.""" - # Completely broken data that can't be parsed - raw_headers = b"\xff\xfe\x00\x00" - result = email_client._parse_headers("123", raw_headers) - - # Should return None or a valid dict with fallback values - # The implementation catches exceptions and returns None - assert result is None or isinstance(result, dict) - - def test_parse_headers_missing_date(self, email_client): - """Test _parse_headers handles missing date with fallback.""" - raw_headers = b"""From: sender@example.com -To: recipient@example.com -Subject: No Date Email - -""" - result = email_client._parse_headers("123", raw_headers) - - assert result is not None - assert result["email_id"] == "123" - assert result["date"] is not None # Should have fallback to now() - @pytest.mark.asyncio - async def test_batch_fetch_dates_empty_list(self, email_client): - """Test _batch_fetch_dates with empty list returns empty dict.""" - mock_imap = AsyncMock() - result = await email_client._batch_fetch_dates(mock_imap, []) - - assert result == {} - mock_imap.uid.assert_not_called() + async def test_get_emails_content_returns_none(self, classic_handler): + """Test get_emails_content handles None response (covers 1107-1108).""" + # Mock the get_email_body_by_id method to return None + mock_get_body = AsyncMock(return_value=None) - @pytest.mark.asyncio - async def test_batch_fetch_headers_empty_list(self, email_client): - """Test _batch_fetch_headers with empty list returns empty dict.""" - mock_imap = AsyncMock() - result = await email_client._batch_fetch_headers(mock_imap, []) + with patch.object(classic_handler.incoming_client, "get_email_body_by_id", mock_get_body): + result = await classic_handler.get_emails_content( + email_ids=["123"], + mailbox="INBOX", + ) - assert result == {} - mock_imap.uid.assert_not_called() + # Verify the result + assert isinstance(result, EmailContentBatchResponse) + assert len(result.emails) == 0 + assert result.requested_count == 1 + assert result.retrieved_count == 0 + assert result.failed_ids == ["123"] @pytest.mark.asyncio - async def test_batch_fetch_dates_parses_response(self, email_client): - """Test _batch_fetch_dates correctly parses IMAP INTERNALDATE response.""" - mock_imap = AsyncMock() - # Simulate IMAP response format for INTERNALDATE - mock_imap.uid.return_value = ( - "OK", - [ - b'1 FETCH (UID 100 INTERNALDATE "20-Jan-2025 10:30:00 +0000")', - b'2 FETCH (UID 101 INTERNALDATE "21-Jan-2025 11:00:00 +0000")', - ], - ) - - result = await email_client._batch_fetch_dates(mock_imap, [b"100", b"101"]) - - assert "100" in result - assert "101" in result - assert result["100"].day == 20 - assert result["101"].day == 21 + async def test_get_emails_content_exception(self, classic_handler): + """Test get_emails_content handles exception (covers 1109-1111).""" + # Mock the get_email_body_by_id method to raise an exception + mock_get_body = AsyncMock(side_effect=Exception("Connection error")) - @pytest.mark.asyncio - async def test_batch_fetch_headers_parses_response(self, email_client): - """Test _batch_fetch_headers correctly parses IMAP BODY[HEADER] response.""" - mock_imap = AsyncMock() - # Simulate IMAP response format for BODY[HEADER] - mock_imap.uid.return_value = ( - "OK", - [ - b"1 FETCH (UID 100 BODY[HEADER] {100}", - bytearray(b"From: sender@example.com\r\nTo: recipient@example.com\r\nSubject: Test\r\n\r\n"), - b")", - ], - ) - - result = await email_client._batch_fetch_headers(mock_imap, ["100"]) - - assert "100" in result - assert result["100"]["subject"] == "Test" - assert result["100"]["from"] == "sender@example.com" + with patch.object(classic_handler.incoming_client, "get_email_body_by_id", mock_get_body): + result = await classic_handler.get_emails_content( + email_ids=["123", "456"], + mailbox="INBOX", + ) + + # Verify the result - both emails should fail + assert isinstance(result, EmailContentBatchResponse) + assert len(result.emails) == 0 + assert result.requested_count == 2 + assert result.retrieved_count == 0 + assert result.failed_ids == ["123", "456"] diff --git a/tests/test_email_client.py b/tests/test_email_client.py index 20343c5..73962ef 100644 --- a/tests/test_email_client.py +++ b/tests/test_email_client.py @@ -640,3 +640,217 @@ async def test_batch_fetch_headers_preserves_uid_mapping(self, email_client): assert len(result) == 2 assert result["100"]["subject"] == "First" assert result["200"]["subject"] == "Second" + + +class TestDeleteEmails: + """Tests for delete_emails functionality.""" + + @pytest.mark.asyncio + async def test_delete_emails_success(self, email_client): + """Test successful deletion of emails.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=(None, None)) + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + deleted_ids, failed_ids = await email_client.delete_emails(["123", "456"]) + assert deleted_ids == ["123", "456"] + assert failed_ids == [] + mock_imap.expunge.assert_called_once() + + @pytest.mark.asyncio + async def test_delete_emails_partial_failure(self, email_client): + """Test delete_emails with some failures.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock() + + call_count = [0] + + def uid_side_effect(*args): + call_count[0] += 1 + if call_count[0] == 1: + return (None, None) + else: + raise OSError("IMAP error") + + mock_imap.uid = AsyncMock(side_effect=uid_side_effect) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + deleted_ids, failed_ids = await email_client.delete_emails(["123", "456"]) + assert deleted_ids == ["123"] + assert failed_ids == ["456"] + + @pytest.mark.asyncio + async def test_delete_emails_logout_error(self, email_client): + """Test delete_emails handles logout errors gracefully.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=(None, None)) + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock(side_effect=OSError("Connection closed")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + deleted_ids, failed_ids = await email_client.delete_emails(["123"]) + assert deleted_ids == ["123"] + assert failed_ids == [] + + +class TestMarkEmails: + """Tests for mark_emails method.""" + + @pytest.mark.asyncio + async def test_mark_emails_as_read_success(self, email_client): + """Test marking emails as read successfully.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=(None, None)) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + marked_ids, failed_ids = await email_client.mark_emails( + email_ids=["123", "456"], + mark_as="read", + mailbox="INBOX", + ) + + assert marked_ids == ["123", "456"] + assert failed_ids == [] + # Verify +FLAGS was used for marking as read + calls = mock_imap.uid.call_args_list + assert len(calls) == 2 + assert calls[0][0] == ("store", "123", "+FLAGS", r"(\Seen)") + assert calls[1][0] == ("store", "456", "+FLAGS", r"(\Seen)") + + @pytest.mark.asyncio + async def test_mark_emails_as_unread_success(self, email_client): + """Test marking emails as unread successfully.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=(None, None)) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + marked_ids, failed_ids = await email_client.mark_emails( + email_ids=["123", "456"], + mark_as="unread", + mailbox="INBOX", + ) + + assert marked_ids == ["123", "456"] + assert failed_ids == [] + # Verify -FLAGS was used for marking as unread + calls = mock_imap.uid.call_args_list + assert len(calls) == 2 + assert calls[0][0] == ("store", "123", "-FLAGS", r"(\Seen)") + assert calls[1][0] == ("store", "456", "-FLAGS", r"(\Seen)") + + @pytest.mark.asyncio + async def test_mark_emails_partial_failure(self, email_client): + """Test marking emails with some failures.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + # First call succeeds, second raises exception + mock_imap.uid = AsyncMock(side_effect=[None, Exception("Email not found")]) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + marked_ids, failed_ids = await email_client.mark_emails( + email_ids=["123", "456"], + mark_as="read", + mailbox="INBOX", + ) + + assert marked_ids == ["123"] + assert failed_ids == ["456"] + + @pytest.mark.asyncio + async def test_mark_emails_invalid_mark_as_value(self, email_client): + """Test that invalid mark_as value raises ValueError.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + with pytest.raises(ValueError) as exc_info: + await email_client.mark_emails( + email_ids=["123"], + mark_as="invalid", + mailbox="INBOX", + ) + assert "Invalid mark_as value" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_mark_emails_custom_mailbox(self, email_client): + """Test marking emails in a custom mailbox.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=(None, None)) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + await email_client.mark_emails( + email_ids=["123"], + mark_as="read", + mailbox="[Gmail]/All Mail", + ) + + # Verify custom mailbox was selected (quoted) + mock_imap.select.assert_called_once_with('"[Gmail]/All Mail"') + + @pytest.mark.asyncio + async def test_mark_emails_logout_error(self, email_client): + """Test mark_emails handles logout errors gracefully (covers logout exception handler).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=(None, None)) + mock_imap.logout = AsyncMock(side_effect=OSError("Connection closed")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + # Should complete successfully despite logout error + marked_ids, failed_ids = await email_client.mark_emails( + email_ids=["123"], + mark_as="read", + mailbox="INBOX", + ) + assert marked_ids == ["123"] + assert failed_ids == [] diff --git a/tests/test_folder_management.py b/tests/test_folder_management.py new file mode 100644 index 0000000..27a3c3a --- /dev/null +++ b/tests/test_folder_management.py @@ -0,0 +1,1200 @@ +"""Tests for folder management functionality.""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from mcp_email_server.app import ( + copy_emails, + create_folder, + delete_folder, + list_folders, + move_emails, + rename_folder, +) +from mcp_email_server.config import EmailServer, EmailSettings +from mcp_email_server.emails.classic import ClassicEmailHandler, EmailClient +from mcp_email_server.emails.models import ( + EmailMoveResponse, + Folder, + FolderListResponse, + FolderOperationResponse, +) + +# ============================================================================ +# MCP Tool Tests - Permission Checks +# ============================================================================ + + +class TestFolderManagementDisabled: + """Test that folder management tools raise PermissionError when disabled.""" + + @pytest.mark.asyncio + async def test_list_folders_disabled(self): + """Test list_folders raises PermissionError when disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await list_folders(account_name="test_account") + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_move_emails_disabled(self): + """Test move_emails raises PermissionError when disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await move_emails( + account_name="test_account", + email_ids=["123"], + destination_folder="Archive", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_copy_emails_disabled(self): + """Test copy_emails raises PermissionError when disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await copy_emails( + account_name="test_account", + email_ids=["123"], + destination_folder="Archive", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_create_folder_disabled(self): + """Test create_folder raises PermissionError when disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await create_folder(account_name="test_account", folder_name="NewFolder") + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_delete_folder_disabled(self): + """Test delete_folder raises PermissionError when disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await delete_folder(account_name="test_account", folder_name="OldFolder") + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_rename_folder_disabled(self): + """Test rename_folder raises PermissionError when disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await rename_folder( + account_name="test_account", + old_name="OldName", + new_name="NewName", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + +class TestFolderManagementEnabled: + """Test that folder management tools work when enabled.""" + + @pytest.mark.asyncio + async def test_list_folders_enabled(self): + """Test list_folders works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + folder_response = FolderListResponse( + folders=[ + Folder(name="INBOX", delimiter="/", flags=["\\HasNoChildren"]), + Folder(name="Sent", delimiter="/", flags=["\\HasNoChildren", "\\Sent"]), + Folder(name="Archive", delimiter="/", flags=["\\HasNoChildren"]), + ], + total=3, + ) + + mock_handler = AsyncMock() + mock_handler.list_folders.return_value = folder_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await list_folders(account_name="test_account") + + assert result == folder_response + assert len(result.folders) == 3 + assert result.folders[0].name == "INBOX" + mock_handler.list_folders.assert_called_once() + + @pytest.mark.asyncio + async def test_move_emails_enabled(self): + """Test move_emails works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + move_response = EmailMoveResponse( + success=True, + moved_ids=["123", "456"], + failed_ids=[], + source_mailbox="INBOX", + destination_folder="Archive", + ) + + mock_handler = AsyncMock() + mock_handler.move_emails.return_value = move_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await move_emails( + account_name="test_account", + email_ids=["123", "456"], + destination_folder="Archive", + ) + + assert result == move_response + assert result.success is True + assert result.moved_ids == ["123", "456"] + mock_handler.move_emails.assert_called_once_with(["123", "456"], "Archive", "INBOX") + + @pytest.mark.asyncio + async def test_copy_emails_enabled(self): + """Test copy_emails works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + copy_response = EmailMoveResponse( + success=True, + moved_ids=["123"], + failed_ids=[], + source_mailbox="INBOX", + destination_folder="Labels/Important", + ) + + mock_handler = AsyncMock() + mock_handler.copy_emails.return_value = copy_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await copy_emails( + account_name="test_account", + email_ids=["123"], + destination_folder="Labels/Important", + ) + + assert result == copy_response + assert result.success is True + mock_handler.copy_emails.assert_called_once_with(["123"], "Labels/Important", "INBOX") + + @pytest.mark.asyncio + async def test_create_folder_enabled(self): + """Test create_folder works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + create_response = FolderOperationResponse( + success=True, + folder_name="Projects/2024", + message="Folder 'Projects/2024' created successfully", + ) + + mock_handler = AsyncMock() + mock_handler.create_folder.return_value = create_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await create_folder( + account_name="test_account", + folder_name="Projects/2024", + ) + + assert result == create_response + assert result.success is True + mock_handler.create_folder.assert_called_once_with("Projects/2024") + + @pytest.mark.asyncio + async def test_delete_folder_enabled(self): + """Test delete_folder works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + delete_response = FolderOperationResponse( + success=True, + folder_name="OldFolder", + message="Folder 'OldFolder' deleted successfully", + ) + + mock_handler = AsyncMock() + mock_handler.delete_folder.return_value = delete_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await delete_folder( + account_name="test_account", + folder_name="OldFolder", + ) + + assert result == delete_response + assert result.success is True + mock_handler.delete_folder.assert_called_once_with("OldFolder") + + @pytest.mark.asyncio + async def test_rename_folder_enabled(self): + """Test rename_folder works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + rename_response = FolderOperationResponse( + success=True, + folder_name="NewName", + message="Folder renamed from 'OldName' to 'NewName'", + ) + + mock_handler = AsyncMock() + mock_handler.rename_folder.return_value = rename_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await rename_folder( + account_name="test_account", + old_name="OldName", + new_name="NewName", + ) + + assert result == rename_response + assert result.success is True + mock_handler.rename_folder.assert_called_once_with("OldName", "NewName") + + +# ============================================================================ +# Handler Tests +# ============================================================================ + + +@pytest.fixture +def email_settings(): + return EmailSettings( + account_name="test_account", + full_name="Test User", + email_address="test@example.com", + incoming=EmailServer( + user_name="test_user", + password="test_password", + host="imap.example.com", + port=993, + use_ssl=True, + ), + outgoing=EmailServer( + user_name="test_user", + password="test_password", + host="smtp.example.com", + port=465, + use_ssl=True, + ), + ) + + +@pytest.fixture +def classic_handler(email_settings): + return ClassicEmailHandler(email_settings) + + +class TestClassicEmailHandlerFolders: + """Test ClassicEmailHandler folder operations.""" + + @pytest.mark.asyncio + async def test_list_folders(self, classic_handler): + """Test list_folders handler method.""" + # EmailClient.list_folders returns list[Folder], not FolderListResponse + mock_folders = [ + Folder(name="INBOX", delimiter="/", flags=["\\HasNoChildren"]), + Folder(name="Sent", delimiter="/", flags=["\\Sent"]), + ] + + mock_list = AsyncMock(return_value=mock_folders) + + with patch.object(classic_handler.incoming_client, "list_folders", mock_list): + result = await classic_handler.list_folders() + + assert isinstance(result, FolderListResponse) + assert len(result.folders) == 2 + assert result.total == 2 + mock_list.assert_called_once() + + @pytest.mark.asyncio + async def test_move_emails(self, classic_handler): + """Test move_emails handler method.""" + # EmailClient.move_emails returns (moved_ids, failed_ids) tuple + mock_move = AsyncMock(return_value=(["123"], [])) + + with patch.object(classic_handler.incoming_client, "move_emails", mock_move): + result = await classic_handler.move_emails( + email_ids=["123"], + destination_folder="Archive", + source_mailbox="INBOX", + ) + + assert isinstance(result, EmailMoveResponse) + assert result.success is True + assert result.moved_ids == ["123"] + assert result.failed_ids == [] + mock_move.assert_called_once_with(["123"], "Archive", "INBOX") + + @pytest.mark.asyncio + async def test_copy_emails(self, classic_handler): + """Test copy_emails handler method.""" + # EmailClient.copy_emails returns (copied_ids, failed_ids) tuple + mock_copy = AsyncMock(return_value=(["123"], [])) + + with patch.object(classic_handler.incoming_client, "copy_emails", mock_copy): + result = await classic_handler.copy_emails( + email_ids=["123"], + destination_folder="Backup", + source_mailbox="INBOX", + ) + + assert isinstance(result, EmailMoveResponse) + assert result.success is True + assert result.moved_ids == ["123"] + mock_copy.assert_called_once_with(["123"], "Backup", "INBOX") + + @pytest.mark.asyncio + async def test_create_folder(self, classic_handler): + """Test create_folder handler method.""" + # EmailClient.create_folder returns (success, message) tuple + mock_create = AsyncMock(return_value=(True, "Folder created")) + + with patch.object(classic_handler.incoming_client, "create_folder", mock_create): + result = await classic_handler.create_folder("NewFolder") + + assert isinstance(result, FolderOperationResponse) + assert result.success is True + assert result.folder_name == "NewFolder" + mock_create.assert_called_once_with("NewFolder") + + @pytest.mark.asyncio + async def test_delete_folder(self, classic_handler): + """Test delete_folder handler method.""" + # EmailClient.delete_folder returns (success, message) tuple + mock_delete = AsyncMock(return_value=(True, "Folder deleted")) + + with patch.object(classic_handler.incoming_client, "delete_folder", mock_delete): + result = await classic_handler.delete_folder("OldFolder") + + assert isinstance(result, FolderOperationResponse) + assert result.success is True + assert result.folder_name == "OldFolder" + mock_delete.assert_called_once_with("OldFolder") + + @pytest.mark.asyncio + async def test_rename_folder(self, classic_handler): + """Test rename_folder handler method.""" + # EmailClient.rename_folder returns (success, message) tuple + mock_rename = AsyncMock(return_value=(True, "Folder renamed")) + + with patch.object(classic_handler.incoming_client, "rename_folder", mock_rename): + result = await classic_handler.rename_folder("OldName", "NewName") + + assert isinstance(result, FolderOperationResponse) + assert result.success is True + assert result.folder_name == "NewName" + mock_rename.assert_called_once_with("OldName", "NewName") + + +# ============================================================================ +# EmailClient Tests +# ============================================================================ + + +@pytest.fixture +def email_server(): + return EmailServer( + user_name="test_user", + password="test_password", + host="imap.example.com", + port=993, + use_ssl=True, + ) + + +@pytest.fixture +def email_client(email_server): + return EmailClient(email_server, sender="Test User ") + + +class TestEmailClientFolders: + """Test EmailClient folder operations.""" + + @pytest.mark.asyncio + async def test_list_folders(self, email_client): + """Test list_folders IMAP operation.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.list = AsyncMock( + return_value=( + "OK", + [ + b'(\\HasNoChildren) "/" "INBOX"', + b'(\\HasNoChildren \\Sent) "/" "Sent"', + b'(\\HasChildren) "/" "Folders"', + ], + ) + ) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + result = await email_client.list_folders() + + # EmailClient.list_folders returns list[Folder] + assert isinstance(result, list) + assert len(result) == 3 + assert result[0].name == "INBOX" + assert result[1].name == "Sent" + assert "\\Sent" in result[1].flags + mock_imap.list.assert_called_once_with('""', "*") + + @pytest.mark.asyncio + async def test_copy_emails(self, email_client): + """Test copy_emails IMAP operation.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=("OK", [b"[COPYUID 1234 1:2 100:101]"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + copied_ids, failed_ids = await email_client.copy_emails(["123", "456"], "Archive", "INBOX") + + # EmailClient.copy_emails returns (copied_ids, failed_ids) tuple + assert copied_ids == ["123", "456"] + assert failed_ids == [] + mock_imap.select.assert_called_once_with('"INBOX"') + + @pytest.mark.asyncio + async def test_move_emails_with_move_command(self, email_client): + """Test move_emails using MOVE command.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + # First call (MOVE) succeeds + mock_imap.uid = AsyncMock(return_value=("OK", [b"OK"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + + # EmailClient.move_emails returns (moved_ids, failed_ids) tuple + assert moved_ids == ["123"] + assert failed_ids == [] + + @pytest.mark.asyncio + async def test_create_folder(self, email_client): + """Test create_folder IMAP operation.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.create = AsyncMock(return_value=("OK", [b"CREATE completed"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.create_folder("NewFolder") + + # EmailClient.create_folder returns (success, message) tuple + assert success is True + assert "NewFolder" in message + mock_imap.create.assert_called_once_with('"NewFolder"') + + @pytest.mark.asyncio + async def test_delete_folder(self, email_client): + """Test delete_folder IMAP operation.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.delete = AsyncMock(return_value=("OK", [b"DELETE completed"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.delete_folder("OldFolder") + + # EmailClient.delete_folder returns (success, message) tuple + assert success is True + assert "OldFolder" in message + mock_imap.delete.assert_called_once_with('"OldFolder"') + + @pytest.mark.asyncio + async def test_rename_folder(self, email_client): + """Test rename_folder IMAP operation.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.rename = AsyncMock(return_value=("OK", [b"RENAME completed"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, _message = await email_client.rename_folder("OldName", "NewName") + + # EmailClient.rename_folder returns (success, message) tuple + assert success is True + mock_imap.rename.assert_called_once_with('"OldName"', '"NewName"') + + +class TestEmailClientFolderEdgeCases: + """Test edge cases for EmailClient folder operations.""" + + @pytest.mark.asyncio + async def test_copy_emails_partial_failure(self, email_client): + """Test copy_emails with partial failures.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + # Simulate failure for one email + mock_imap.uid = AsyncMock(side_effect=[("OK", []), ("NO", [b"Message not found"])]) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + copied_ids, failed_ids = await email_client.copy_emails(["123", "456"], "Archive", "INBOX") + + # First email succeeds, second fails with NO status + assert copied_ids == ["123"] + assert failed_ids == ["456"] + + @pytest.mark.asyncio + async def test_create_folder_failure(self, email_client): + """Test create_folder when it fails.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.create = AsyncMock(return_value=("NO", [b"Folder already exists"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, _message = await email_client.create_folder("ExistingFolder") + + # EmailClient.create_folder returns (success, message) tuple + assert success is False + + @pytest.mark.asyncio + async def test_list_folders_special_characters(self, email_client): + """Test list_folders with special characters in folder names.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.list = AsyncMock( + return_value=( + "OK", + [ + b'(\\HasNoChildren) "/" "INBOX"', + b'(\\HasNoChildren) "/" "Folders/My Folder"', + b'(\\HasNoChildren) "/" "[Gmail]/Sent Mail"', + ], + ) + ) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + result = await email_client.list_folders() + + # EmailClient.list_folders returns list[Folder] + assert len(result) == 3 + assert result[1].name == "Folders/My Folder" + assert result[2].name == "[Gmail]/Sent Mail" + + @pytest.mark.asyncio + async def test_move_emails_move_returns_non_ok(self, email_client): + """Test move_emails fallback when MOVE returns non-OK status (covers 927->935).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock() + + # MOVE returns non-OK status (not exception), then COPY succeeds + mock_imap.uid = AsyncMock( + side_effect=[ + ("NO", [b"MOVE not supported"]), # MOVE returns NO status + ("OK", [b"[COPYUID 1234 1 100]"]), # COPY succeeds + ("OK", []), # STORE \\Deleted succeeds + ] + ) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + + assert moved_ids == ["123"] + assert failed_ids == [] + # Should have called uid 3 times: move (NO), copy, store + assert mock_imap.uid.call_count == 3 + mock_imap.expunge.assert_called_once() + + @pytest.mark.asyncio + async def test_move_emails_fallback_to_copy_delete(self, email_client): + """Test move_emails falls back to COPY+DELETE when MOVE fails (covers 931-940).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock() + + # First call (MOVE) fails with exception, second call (COPY) succeeds + mock_imap.uid = AsyncMock( + side_effect=[ + Exception("MOVE not supported"), # MOVE fails + ("OK", [b"[COPYUID 1234 1 100]"]), # COPY succeeds + ("OK", []), # STORE \\Deleted succeeds + ] + ) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + + assert moved_ids == ["123"] + assert failed_ids == [] + # Should have called uid 3 times: move, copy, store + assert mock_imap.uid.call_count == 3 + mock_imap.expunge.assert_called_once() + + @pytest.mark.asyncio + async def test_move_emails_copy_failure_after_move_failure(self, email_client): + """Test move_emails when both MOVE and COPY fail (covers 941-946).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock() + + # MOVE fails, then COPY also fails + mock_imap.uid = AsyncMock( + side_effect=[ + Exception("MOVE not supported"), # MOVE fails + ("NO", [b"COPY failed"]), # COPY fails with NO status + ] + ) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + + assert moved_ids == [] + assert failed_ids == ["123"] + + @pytest.mark.asyncio + async def test_move_emails_exception_during_operation(self, email_client): + """Test move_emails when an exception occurs during processing (covers 944-946).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock() + + # MOVE fails, then COPY raises unexpected exception + mock_imap.uid = AsyncMock( + side_effect=[ + Exception("MOVE not supported"), # MOVE fails + Exception("Connection lost during COPY"), # COPY fails with exception + ] + ) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + + assert moved_ids == [] + assert failed_ids == ["123"] + + @pytest.mark.asyncio + async def test_move_emails_logout_error(self, email_client): + """Test move_emails handles logout error gracefully (covers 955-956).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=("OK", [])) + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock(side_effect=Exception("Logout error")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + + # Should complete despite logout error + assert moved_ids == ["123"] + assert failed_ids == [] + + @pytest.mark.asyncio + async def test_create_folder_exception(self, email_client): + """Test create_folder handles exceptions (covers 979-981).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.create = AsyncMock(side_effect=Exception("Connection lost")) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.create_folder("NewFolder") + + assert success is False + assert "Error creating folder" in message + + @pytest.mark.asyncio + async def test_create_folder_logout_error(self, email_client): + """Test create_folder handles logout error (covers 985-986).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.create = AsyncMock(return_value=("OK", [])) + mock_imap.logout = AsyncMock(side_effect=Exception("Logout error")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, _message = await email_client.create_folder("NewFolder") + + # Should complete despite logout error + assert success is True + + @pytest.mark.asyncio + async def test_delete_folder_failure(self, email_client): + """Test delete_folder with NO status (covers 1004-1005).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.delete = AsyncMock(return_value=("NO", [b"Folder not empty"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.delete_folder("NonEmptyFolder") + + assert success is False + assert "Failed to delete folder" in message + + @pytest.mark.asyncio + async def test_delete_folder_exception(self, email_client): + """Test delete_folder handles exceptions (covers 1007-1009).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.delete = AsyncMock(side_effect=Exception("Connection lost")) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.delete_folder("SomeFolder") + + assert success is False + assert "Error deleting folder" in message + + @pytest.mark.asyncio + async def test_delete_folder_logout_error(self, email_client): + """Test delete_folder handles logout error (covers 1013-1014).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.delete = AsyncMock(return_value=("OK", [])) + mock_imap.logout = AsyncMock(side_effect=Exception("Logout error")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, _message = await email_client.delete_folder("SomeFolder") + + # Should complete despite logout error + assert success is True + + @pytest.mark.asyncio + async def test_rename_folder_failure(self, email_client): + """Test rename_folder with NO status (covers 1032-1033).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.rename = AsyncMock(return_value=("NO", [b"Folder does not exist"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.rename_folder("OldName", "NewName") + + assert success is False + assert "Failed to rename folder" in message + + @pytest.mark.asyncio + async def test_rename_folder_exception(self, email_client): + """Test rename_folder handles exceptions (covers 1035-1037).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.rename = AsyncMock(side_effect=Exception("Connection lost")) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.rename_folder("OldName", "NewName") + + assert success is False + assert "Error renaming folder" in message + + @pytest.mark.asyncio + async def test_rename_folder_logout_error(self, email_client): + """Test rename_folder handles logout error (covers 1041-1042).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.rename = AsyncMock(return_value=("OK", [])) + mock_imap.logout = AsyncMock(side_effect=Exception("Logout error")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, _message = await email_client.rename_folder("OldName", "NewName") + + # Should complete despite logout error + assert success is True + + def test_parse_list_response_empty_string(self, email_client): + """Test _parse_list_response with empty string (covers 804-805).""" + result = email_client._parse_list_response("") + assert result is None + + def test_parse_list_response_list_completed(self, email_client): + """Test _parse_list_response with 'LIST completed.' (covers 804-805).""" + result = email_client._parse_list_response("LIST completed.") + assert result is None + + def test_parse_list_response_no_parentheses(self, email_client): + """Test _parse_list_response with no parentheses (covers 811-812).""" + result = email_client._parse_list_response("invalid folder response") + assert result is None + + def test_parse_list_response_malformed_parts(self, email_client): + """Test _parse_list_response with malformed parts (covers 827-830).""" + # Has parentheses but not enough quoted parts + result = email_client._parse_list_response(b'(\\HasNoChildren) "/"') + assert result is None + + def test_parse_list_response_empty_flags(self, email_client): + """Test _parse_list_response with empty flags.""" + # Empty flags but valid format + result = email_client._parse_list_response(b'() "/" "test"') + assert result is not None + assert result.name == "test" + assert result.flags == [] + + def test_parse_list_response_folder_constructor_exception(self, email_client): + """Test _parse_list_response handles Folder constructor exception (covers 827-828).""" + + # Mock Folder to raise an exception + with patch("mcp_email_server.emails.classic.Folder", side_effect=ValueError("Test error")): + result = email_client._parse_list_response(b'(\\HasNoChildren) "/" "INBOX"') + assert result is None + + @pytest.mark.asyncio + async def test_list_folders_logout_error(self, email_client): + """Test list_folders handles logout error (covers 857-858).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.list = AsyncMock( + return_value=( + "OK", + [b'(\\HasNoChildren) "/" "INBOX"'], + ) + ) + mock_imap.logout = AsyncMock(side_effect=Exception("Logout error")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + result = await email_client.list_folders() + + # Should complete despite logout error + assert len(result) == 1 + assert result[0].name == "INBOX" + + @pytest.mark.asyncio + async def test_list_folders_with_invalid_items(self, email_client): + """Test list_folders filters out invalid/unparseable items (covers 848->846).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.list = AsyncMock( + return_value=( + "OK", + [ + b'(\\HasNoChildren) "/" "INBOX"', + b"", # Empty string - should be filtered out + b"LIST completed.", # Invalid - should be filtered out + b'(\\HasNoChildren) "/" "Sent"', + b"invalid folder response", # No parentheses - should be filtered out + ], + ) + ) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + result = await email_client.list_folders() + + # Only valid folders should be returned + assert len(result) == 2 + assert result[0].name == "INBOX" + assert result[1].name == "Sent" + + @pytest.mark.asyncio + async def test_copy_emails_exception(self, email_client): + """Test copy_emails handles exception during copy (covers 888-890).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(side_effect=Exception("Connection lost")) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + copied_ids, failed_ids = await email_client.copy_emails(["123"], "Archive", "INBOX") + + assert copied_ids == [] + assert failed_ids == ["123"] + + @pytest.mark.asyncio + async def test_copy_emails_logout_error(self, email_client): + """Test copy_emails handles logout error (covers 895-896).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock(return_value=("OK", [])) + mock_imap.logout = AsyncMock(side_effect=Exception("Logout error")) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + copied_ids, failed_ids = await email_client.copy_emails(["123"], "Archive", "INBOX") + + # Should complete despite logout error + assert copied_ids == ["123"] + assert failed_ids == [] + + @pytest.mark.asyncio + async def test_copy_emails_non_tuple_result(self, email_client): + """Test copy_emails when IMAP returns non-tuple result (covers ternary else branch).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + # Return a string instead of tuple + mock_imap.uid = AsyncMock(return_value="OK") + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + copied_ids, failed_ids = await email_client.copy_emails(["123"], "Archive", "INBOX") + assert copied_ids == ["123"] + assert failed_ids == [] + + @pytest.mark.asyncio + async def test_move_emails_non_tuple_move_result(self, email_client): + """Test move_emails when MOVE returns non-tuple result (covers ternary else branch).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.expunge = AsyncMock() + # Return a string instead of tuple for MOVE + mock_imap.uid = AsyncMock(return_value="OK") + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + assert moved_ids == ["123"] + assert failed_ids == [] + + @pytest.mark.asyncio + async def test_move_emails_non_tuple_copy_result(self, email_client): + """Test move_emails when COPY fallback returns non-tuple result (covers ternary else branch).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.expunge = AsyncMock() + mock_imap.logout = AsyncMock() + + # MOVE fails, then COPY returns non-tuple string + mock_imap.uid = AsyncMock( + side_effect=[ + Exception("MOVE not supported"), # MOVE fails + "OK", # COPY returns string instead of tuple + "OK", # STORE returns string + ] + ) + + with patch.object(email_client, "imap_class", return_value=mock_imap): + moved_ids, failed_ids = await email_client.move_emails(["123"], "Archive", "INBOX") + assert moved_ids == ["123"] + assert failed_ids == [] + + @pytest.mark.asyncio + async def test_create_folder_non_tuple_result(self, email_client): + """Test create_folder when IMAP returns non-tuple result (covers ternary else branch).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + # Return a string instead of tuple + mock_imap.create = AsyncMock(return_value="OK") + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.create_folder("NewFolder") + assert success is True + assert "NewFolder" in message + + @pytest.mark.asyncio + async def test_delete_folder_non_tuple_result(self, email_client): + """Test delete_folder when IMAP returns non-tuple result (covers ternary else branch).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + # Return a string instead of tuple + mock_imap.delete = AsyncMock(return_value="OK") + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, message = await email_client.delete_folder("OldFolder") + assert success is True + assert "OldFolder" in message + + @pytest.mark.asyncio + async def test_rename_folder_non_tuple_result(self, email_client): + """Test rename_folder when IMAP returns non-tuple result (covers ternary else branch).""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + # Return a string instead of tuple + mock_imap.rename = AsyncMock(return_value="OK") + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + success, _message = await email_client.rename_folder("OldName", "NewName") + assert success is True + + +# ============================================================================ +# Config Tests for enable_folder_management +# ============================================================================ + + +class TestFolderManagementConfig: + """Test configuration for folder management.""" + + def test_default_disabled(self): + """Test that folder management is disabled by default.""" + from mcp_email_server.config import Settings + + with patch("mcp_email_server.config.CONFIG_PATH") as mock_path: + mock_path.exists.return_value = False + mock_path.with_name.return_value = mock_path + + # Create settings without any config + with patch.object(Settings, "settings_customise_sources", return_value=()): + settings = Settings() + assert settings.enable_folder_management is False + + def test_env_variable_enables(self): + """Test that environment variable can enable folder management.""" + import os + + from mcp_email_server.config import Settings + + with patch.dict(os.environ, {"MCP_EMAIL_SERVER_ENABLE_FOLDER_MANAGEMENT": "true"}): + with patch("mcp_email_server.config.CONFIG_PATH") as mock_path: + mock_path.exists.return_value = False + mock_path.with_name.return_value = mock_path + + with patch.object(Settings, "settings_customise_sources", return_value=()): + settings = Settings() + assert settings.enable_folder_management is True + + def test_env_variable_disables(self): + """Test that environment variable can explicitly disable folder management.""" + import os + + from mcp_email_server.config import Settings + + with patch.dict(os.environ, {"MCP_EMAIL_SERVER_ENABLE_FOLDER_MANAGEMENT": "false"}): + with patch("mcp_email_server.config.CONFIG_PATH") as mock_path: + mock_path.exists.return_value = False + mock_path.with_name.return_value = mock_path + + with patch.object(Settings, "settings_customise_sources", return_value=()): + settings = Settings() + assert settings.enable_folder_management is False diff --git a/tests/test_label_management.py b/tests/test_label_management.py new file mode 100644 index 0000000..83712ad --- /dev/null +++ b/tests/test_label_management.py @@ -0,0 +1,609 @@ +"""Tests for ProtonMail label management functionality.""" + +import asyncio +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from mcp_email_server.app import ( + apply_label, + create_label, + delete_label, + get_email_labels, + list_labels, + remove_label, +) +from mcp_email_server.config import EmailServer, EmailSettings +from mcp_email_server.emails.classic import ClassicEmailHandler, EmailClient +from mcp_email_server.emails.models import ( + EmailLabelsResponse, + EmailMoveResponse, + Folder, + FolderOperationResponse, + Label, + LabelListResponse, +) + +# ============================================================================ +# MCP Tool Tests - Permission Checks +# ============================================================================ + + +class TestLabelManagementDisabled: + """Test that label management tools raise PermissionError when folder management is disabled.""" + + @pytest.mark.asyncio + async def test_list_labels_disabled(self): + """Test list_labels raises PermissionError when folder management is disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await list_labels(account_name="test_account") + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_apply_label_disabled(self): + """Test apply_label raises PermissionError when folder management is disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await apply_label( + account_name="test_account", + email_ids=["123"], + label_name="Important", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_remove_label_disabled(self): + """Test remove_label raises PermissionError when folder management is disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await remove_label( + account_name="test_account", + email_ids=["123"], + label_name="Important", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_get_email_labels_disabled(self): + """Test get_email_labels raises PermissionError when folder management is disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await get_email_labels( + account_name="test_account", + email_id="123", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_create_label_disabled(self): + """Test create_label raises PermissionError when folder management is disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await create_label( + account_name="test_account", + label_name="NewLabel", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + @pytest.mark.asyncio + async def test_delete_label_disabled(self): + """Test delete_label raises PermissionError when folder management is disabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = False + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with pytest.raises(PermissionError) as exc_info: + await delete_label( + account_name="test_account", + label_name="OldLabel", + ) + + assert "Folder management is disabled" in str(exc_info.value) + + +class TestLabelManagementEnabled: + """Test that label management tools work when folder management is enabled.""" + + @pytest.mark.asyncio + async def test_list_labels_enabled(self): + """Test list_labels works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + label_response = LabelListResponse( + labels=[ + Label(name="Important", full_path="Labels/Important", delimiter="/", flags=[]), + Label(name="Work", full_path="Labels/Work", delimiter="/", flags=[]), + ], + total=2, + ) + + mock_handler = AsyncMock() + mock_handler.list_labels.return_value = label_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await list_labels(account_name="test_account") + + assert result == label_response + assert len(result.labels) == 2 + assert result.labels[0].name == "Important" + mock_handler.list_labels.assert_called_once() + + @pytest.mark.asyncio + async def test_apply_label_enabled(self): + """Test apply_label works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + apply_response = EmailMoveResponse( + success=True, + moved_ids=["123"], + failed_ids=[], + source_mailbox="INBOX", + destination_folder="Labels/Important", + ) + + mock_handler = AsyncMock() + mock_handler.apply_label.return_value = apply_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await apply_label( + account_name="test_account", + email_ids=["123"], + label_name="Important", + ) + + assert result == apply_response + assert result.success is True + mock_handler.apply_label.assert_called_once_with(["123"], "Important", "INBOX") + + @pytest.mark.asyncio + async def test_remove_label_enabled(self): + """Test remove_label works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + remove_response = EmailMoveResponse( + success=True, + moved_ids=["123"], + failed_ids=[], + source_mailbox="Labels/Important", + destination_folder="", + ) + + mock_handler = AsyncMock() + mock_handler.remove_label.return_value = remove_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await remove_label( + account_name="test_account", + email_ids=["123"], + label_name="Important", + ) + + assert result == remove_response + assert result.success is True + mock_handler.remove_label.assert_called_once_with(["123"], "Important") + + @pytest.mark.asyncio + async def test_get_email_labels_enabled(self): + """Test get_email_labels works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + labels_response = EmailLabelsResponse( + email_id="123", + labels=["Important", "Work"], + ) + + mock_handler = AsyncMock() + mock_handler.get_email_labels.return_value = labels_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await get_email_labels( + account_name="test_account", + email_id="123", + ) + + assert result == labels_response + assert result.labels == ["Important", "Work"] + mock_handler.get_email_labels.assert_called_once_with("123", "INBOX") + + @pytest.mark.asyncio + async def test_create_label_enabled(self): + """Test create_label works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + create_response = FolderOperationResponse( + success=True, + folder_name="NewLabel", + message="Label 'NewLabel' created successfully", + ) + + mock_handler = AsyncMock() + mock_handler.create_label.return_value = create_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await create_label( + account_name="test_account", + label_name="NewLabel", + ) + + assert result == create_response + assert result.success is True + mock_handler.create_label.assert_called_once_with("NewLabel") + + @pytest.mark.asyncio + async def test_delete_label_enabled(self): + """Test delete_label works when enabled.""" + mock_settings = MagicMock() + mock_settings.enable_folder_management = True + + delete_response = FolderOperationResponse( + success=True, + folder_name="OldLabel", + message="Label 'OldLabel' deleted successfully", + ) + + mock_handler = AsyncMock() + mock_handler.delete_label.return_value = delete_response + + with patch("mcp_email_server.app.get_settings", return_value=mock_settings): + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await delete_label( + account_name="test_account", + label_name="OldLabel", + ) + + assert result == delete_response + assert result.success is True + mock_handler.delete_label.assert_called_once_with("OldLabel") + + +# ============================================================================ +# Handler Tests +# ============================================================================ + + +@pytest.fixture +def email_settings(): + return EmailSettings( + account_name="test_account", + full_name="Test User", + email_address="test@example.com", + incoming=EmailServer( + user_name="test_user", + password="test_password", + host="imap.example.com", + port=993, + use_ssl=True, + ), + outgoing=EmailServer( + user_name="test_user", + password="test_password", + host="smtp.example.com", + port=465, + use_ssl=True, + ), + ) + + +@pytest.fixture +def classic_handler(email_settings): + return ClassicEmailHandler(email_settings) + + +class TestClassicEmailHandlerLabels: + """Test ClassicEmailHandler label operations.""" + + @pytest.mark.asyncio + async def test_list_labels(self, classic_handler): + """Test list_labels handler method.""" + mock_labels = [ + Label(name="Important", full_path="Labels/Important", delimiter="/", flags=[]), + Label(name="Work", full_path="Labels/Work", delimiter="/", flags=[]), + ] + + mock_list = AsyncMock(return_value=mock_labels) + + with patch.object(classic_handler.incoming_client, "list_labels", mock_list): + result = await classic_handler.list_labels() + + assert isinstance(result, LabelListResponse) + assert len(result.labels) == 2 + assert result.total == 2 + mock_list.assert_called_once() + + @pytest.mark.asyncio + async def test_apply_label(self, classic_handler): + """Test apply_label handler method.""" + mock_copy = AsyncMock(return_value=(["123"], [])) + + with patch.object(classic_handler.incoming_client, "copy_emails", mock_copy): + result = await classic_handler.apply_label( + email_ids=["123"], + label_name="Important", + source_mailbox="INBOX", + ) + + assert isinstance(result, EmailMoveResponse) + assert result.success is True + assert result.moved_ids == ["123"] + assert result.destination_folder == "Labels/Important" + mock_copy.assert_called_once_with(["123"], "Labels/Important", "INBOX") + + @pytest.mark.asyncio + async def test_remove_label(self, classic_handler): + """Test remove_label handler method.""" + mock_get_message_id = AsyncMock(return_value="") + mock_search = AsyncMock(return_value="456") + mock_delete = AsyncMock(return_value=(["456"], [])) + + with patch.object(classic_handler.incoming_client, "get_email_message_id", mock_get_message_id): + with patch.object(classic_handler.incoming_client, "search_by_message_id", mock_search): + with patch.object(classic_handler.incoming_client, "delete_from_folder", mock_delete): + result = await classic_handler.remove_label( + email_ids=["123"], + label_name="Important", + ) + + assert isinstance(result, EmailMoveResponse) + assert result.success is True + assert result.moved_ids == ["123"] + mock_get_message_id.assert_called_once_with("123", "INBOX") + mock_search.assert_called_once_with("", "Labels/Important") + mock_delete.assert_called_once_with(["456"], "Labels/Important") + + @pytest.mark.asyncio + async def test_get_email_labels(self, classic_handler): + """Test get_email_labels handler method.""" + mock_labels = [ + Label(name="Important", full_path="Labels/Important", delimiter="/", flags=[]), + Label(name="Work", full_path="Labels/Work", delimiter="/", flags=[]), + ] + mock_get_message_id = AsyncMock(return_value="") + mock_list_labels = AsyncMock(return_value=mock_labels) + # Email found in Important but not Work + mock_search = AsyncMock(side_effect=["789", None]) + + with patch.object(classic_handler.incoming_client, "get_email_message_id", mock_get_message_id): + with patch.object(classic_handler.incoming_client, "list_labels", mock_list_labels): + with patch.object(classic_handler.incoming_client, "search_by_message_id", mock_search): + result = await classic_handler.get_email_labels( + email_id="123", + source_mailbox="INBOX", + ) + + assert isinstance(result, EmailLabelsResponse) + assert result.email_id == "123" + assert result.labels == ["Important"] + + @pytest.mark.asyncio + async def test_create_label(self, classic_handler): + """Test create_label handler method.""" + mock_create = AsyncMock(return_value=(True, "Folder 'Labels/NewLabel' created successfully")) + + with patch.object(classic_handler.incoming_client, "create_folder", mock_create): + result = await classic_handler.create_label("NewLabel") + + assert isinstance(result, FolderOperationResponse) + assert result.success is True + assert result.folder_name == "NewLabel" + mock_create.assert_called_once_with("Labels/NewLabel") + + @pytest.mark.asyncio + async def test_delete_label(self, classic_handler): + """Test delete_label handler method.""" + mock_delete = AsyncMock(return_value=(True, "Folder 'Labels/OldLabel' deleted successfully")) + + with patch.object(classic_handler.incoming_client, "delete_folder", mock_delete): + result = await classic_handler.delete_label("OldLabel") + + assert isinstance(result, FolderOperationResponse) + assert result.success is True + assert result.folder_name == "OldLabel" + mock_delete.assert_called_once_with("Labels/OldLabel") + + +# ============================================================================ +# EmailClient Tests +# ============================================================================ + + +@pytest.fixture +def email_server(): + return EmailServer( + user_name="test_user", + password="test_password", + host="imap.example.com", + port=993, + use_ssl=True, + ) + + +@pytest.fixture +def email_client(email_server): + return EmailClient(email_server, sender="Test User ") + + +class TestEmailClientLabels: + """Test EmailClient label operations.""" + + @pytest.mark.asyncio + async def test_list_labels_filters_labels_prefix(self, email_client): + """Test list_labels filters only Labels/ prefix folders.""" + mock_folders = [ + Folder(name="INBOX", delimiter="/", flags=[]), + Folder(name="Sent", delimiter="/", flags=[]), + Folder(name="Labels/Important", delimiter="/", flags=[]), + Folder(name="Labels/Work", delimiter="/", flags=[]), + Folder(name="Folders/Archive", delimiter="/", flags=[]), + ] + + mock_list = AsyncMock(return_value=mock_folders) + + with patch.object(email_client, "list_folders", mock_list): + result = await email_client.list_labels() + + assert len(result) == 2 + assert result[0].name == "Important" + assert result[0].full_path == "Labels/Important" + assert result[1].name == "Work" + assert result[1].full_path == "Labels/Work" + + @pytest.mark.asyncio + async def test_list_labels_empty(self, email_client): + """Test list_labels returns empty list when no labels exist.""" + mock_folders = [ + Folder(name="INBOX", delimiter="/", flags=[]), + Folder(name="Sent", delimiter="/", flags=[]), + ] + + mock_list = AsyncMock(return_value=mock_folders) + + with patch.object(email_client, "list_folders", mock_list): + result = await email_client.list_labels() + + assert len(result) == 0 + + @pytest.mark.asyncio + async def test_get_email_message_id(self, email_client): + """Test get_email_message_id IMAP operation.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + mock_imap.uid = AsyncMock( + return_value=( + "OK", + [bytearray(b"Message-ID: ")], + ) + ) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + result = await email_client.get_email_message_id("123", "INBOX") + + assert result == "" + + @pytest.mark.asyncio + async def test_search_by_message_id(self, email_client): + """Test search_by_message_id IMAP operation.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + # search returns sequence numbers + mock_imap.search = AsyncMock(return_value=("OK", [b"1"])) + # fetch returns UID for the sequence number + mock_imap.fetch = AsyncMock(return_value=("OK", [b"1 FETCH (UID 456)"])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + result = await email_client.search_by_message_id("", "Labels/Important") + + assert result == "456" + mock_imap.select.assert_called_once_with('"Labels/Important"') + + @pytest.mark.asyncio + async def test_search_by_message_id_not_found(self, email_client): + """Test search_by_message_id returns None when email not found.""" + mock_imap = AsyncMock() + mock_imap._client_task = asyncio.Future() + mock_imap._client_task.set_result(None) + mock_imap.wait_hello_from_server = AsyncMock() + mock_imap.login = AsyncMock() + mock_imap.select = AsyncMock() + # search returns empty when not found + mock_imap.search = AsyncMock(return_value=("OK", [b""])) + mock_imap.logout = AsyncMock() + + with patch.object(email_client, "imap_class", return_value=mock_imap): + result = await email_client.search_by_message_id("", "Labels/Work") + + assert result is None + + +class TestEmailClientLabelEdgeCases: + """Test edge cases for EmailClient label operations.""" + + @pytest.mark.asyncio + async def test_list_labels_skips_labels_folder_itself(self, email_client): + """Test that list_labels skips the 'Labels' folder itself if it exists.""" + mock_folders = [ + Folder(name="Labels", delimiter="/", flags=["\\HasChildren"]), + Folder(name="Labels/Important", delimiter="/", flags=[]), + ] + + mock_list = AsyncMock(return_value=mock_folders) + + with patch.object(email_client, "list_folders", mock_list): + result = await email_client.list_labels() + + # Should only include "Important", not the "Labels" folder itself + assert len(result) == 1 + assert result[0].name == "Important" + + @pytest.mark.asyncio + async def test_remove_label_email_not_in_label(self, classic_handler): + """Test remove_label when email is not in the label folder.""" + mock_get_message_id = AsyncMock(return_value="") + mock_search = AsyncMock(return_value=None) # Email not found in label + + with patch.object(classic_handler.incoming_client, "get_email_message_id", mock_get_message_id): + with patch.object(classic_handler.incoming_client, "search_by_message_id", mock_search): + result = await classic_handler.remove_label( + email_ids=["123"], + label_name="Important", + ) + + assert isinstance(result, EmailMoveResponse) + assert result.success is False + assert result.failed_ids == ["123"] + assert result.moved_ids == [] + + @pytest.mark.asyncio + async def test_get_email_labels_no_message_id(self, classic_handler): + """Test get_email_labels when email has no Message-ID.""" + mock_get_message_id = AsyncMock(return_value=None) + + with patch.object(classic_handler.incoming_client, "get_email_message_id", mock_get_message_id): + result = await classic_handler.get_email_labels( + email_id="123", + source_mailbox="INBOX", + ) + + assert isinstance(result, EmailLabelsResponse) + assert result.email_id == "123" + assert result.labels == [] diff --git a/tests/test_mcp_tools.py b/tests/test_mcp_tools.py index 31cca24..fbf2293 100644 --- a/tests/test_mcp_tools.py +++ b/tests/test_mcp_tools.py @@ -10,6 +10,7 @@ get_emails_content, list_available_accounts, list_emails_metadata, + mark_emails, send_email, ) from mcp_email_server.config import EmailServer, EmailSettings, ProviderSettings @@ -17,6 +18,7 @@ AttachmentDownloadResponse, EmailBodyResponse, EmailContentBatchResponse, + EmailMarkResponse, EmailMetadata, EmailMetadataPageResponse, ) @@ -436,6 +438,102 @@ async def test_delete_emails_with_mailbox(self): assert result == "Successfully deleted 1 email(s)" mock_handler.delete_emails.assert_called_once_with(["12345"], "Trash") + @pytest.mark.asyncio + async def test_mark_emails_as_read(self): + """Test mark_emails MCP tool for marking as read.""" + mock_response = EmailMarkResponse( + success=True, + marked_ids=["12345", "12346"], + failed_ids=[], + mailbox="INBOX", + marked_as="read", + ) + mock_handler = AsyncMock() + mock_handler.mark_emails.return_value = mock_response + + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await mark_emails( + account_name="test_account", + email_ids=["12345", "12346"], + mark_as="read", + ) + + assert result.success is True + assert result.marked_ids == ["12345", "12346"] + assert result.marked_as == "read" + mock_handler.mark_emails.assert_called_once_with(["12345", "12346"], "read", "INBOX") + + @pytest.mark.asyncio + async def test_mark_emails_as_unread(self): + """Test mark_emails MCP tool for marking as unread.""" + mock_response = EmailMarkResponse( + success=True, + marked_ids=["12345"], + failed_ids=[], + mailbox="INBOX", + marked_as="unread", + ) + mock_handler = AsyncMock() + mock_handler.mark_emails.return_value = mock_response + + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await mark_emails( + account_name="test_account", + email_ids=["12345"], + mark_as="unread", + ) + + assert result.success is True + assert result.marked_as == "unread" + + @pytest.mark.asyncio + async def test_mark_emails_with_failures(self): + """Test mark_emails MCP tool with some failures.""" + mock_response = EmailMarkResponse( + success=False, + marked_ids=["12345"], + failed_ids=["12346"], + mailbox="INBOX", + marked_as="read", + ) + mock_handler = AsyncMock() + mock_handler.mark_emails.return_value = mock_response + + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await mark_emails( + account_name="test_account", + email_ids=["12345", "12346"], + mark_as="read", + ) + + assert result.success is False + assert result.marked_ids == ["12345"] + assert result.failed_ids == ["12346"] + + @pytest.mark.asyncio + async def test_mark_emails_custom_mailbox(self): + """Test mark_emails MCP tool with custom mailbox.""" + mock_response = EmailMarkResponse( + success=True, + marked_ids=["12345"], + failed_ids=[], + mailbox="[Gmail]/All Mail", + marked_as="read", + ) + mock_handler = AsyncMock() + mock_handler.mark_emails.return_value = mock_response + + with patch("mcp_email_server.app.dispatch_handler", return_value=mock_handler): + result = await mark_emails( + account_name="test_account", + email_ids=["12345"], + mark_as="read", + mailbox="[Gmail]/All Mail", + ) + + assert result.mailbox == "[Gmail]/All Mail" + mock_handler.mark_emails.assert_called_once_with(["12345"], "read", "[Gmail]/All Mail") + @pytest.mark.asyncio async def test_download_attachment_disabled(self): """Test download_attachment MCP tool when feature is disabled."""