From 20a72119eb3d93b06e20c11db2ef294a48975820 Mon Sep 17 00:00:00 2001 From: Anonymous Date: Sun, 8 Feb 2026 22:50:12 +0100 Subject: [PATCH 1/6] feat: add mark_emails_as_read and move_emails tools Add two new MCP tools for email management: - mark_emails_as_read: Mark emails as read/unread using IMAP \Seen flag - move_emails: Move emails between mailboxes using MOVE (RFC 6851) with fallback to COPY+DELETE for older servers Co-Authored-By: Claude Opus 4.5 --- mcp_email_server/app.py | 46 ++++++++++++++ mcp_email_server/emails/__init__.py | 26 ++++++++ mcp_email_server/emails/classic.py | 97 +++++++++++++++++++++++++++++ 3 files changed, 169 insertions(+) diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index de9f95a..8c233a5 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -196,6 +196,52 @@ async def delete_emails( return result +@mcp.tool( + description="Mark one or more emails as read or unread. Use list_emails_metadata first to get the email_id." +) +async def mark_emails_as_read( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to mark (obtained from list_emails_metadata)."), + ], + mailbox: Annotated[str, Field(default="INBOX", description="The mailbox containing the emails.")] = "INBOX", + read: Annotated[bool, Field(default=True, description="True to mark as read, False to mark as unread.")] = True, +) -> str: + handler = dispatch_handler(account_name) + success_ids, failed_ids = await handler.mark_emails_as_read(email_ids, mailbox, read) + + status = "read" if read else "unread" + result = f"Successfully marked {len(success_ids)} email(s) as {status}" + if failed_ids: + result += f", failed to mark {len(failed_ids)} email(s): {', '.join(failed_ids)}" + return result + + +@mcp.tool( + description="Move one or more emails to a different mailbox/folder. Common destinations: 'Archive', 'Trash', 'Spam'. Use list_emails_metadata first to get the email_id." +) +async def move_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to move (obtained from list_emails_metadata)."), + ], + destination_mailbox: Annotated[ + str, + Field(description="Target mailbox name (e.g., 'Archive', 'Trash', 'Spam', '[Gmail]/All Mail')."), + ], + source_mailbox: Annotated[str, Field(default="INBOX", description="Source mailbox.")] = "INBOX", +) -> str: + handler = dispatch_handler(account_name) + moved_ids, failed_ids = await handler.move_emails(email_ids, destination_mailbox, source_mailbox) + + result = f"Successfully moved {len(moved_ids)} email(s) to '{destination_mailbox}'" + if failed_ids: + result += f", failed to move {len(failed_ids)} email(s): {', '.join(failed_ids)}" + return result + + @mcp.tool( description="Download an email attachment and save it to the specified path. This feature must be explicitly enabled in settings (enable_attachment_download=true) due to security considerations.", ) diff --git a/mcp_email_server/emails/__init__.py b/mcp_email_server/emails/__init__.py index 251e9bc..4c784d6 100644 --- a/mcp_email_server/emails/__init__.py +++ b/mcp_email_server/emails/__init__.py @@ -72,6 +72,32 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t Delete emails by their IDs. Returns (deleted_ids, failed_ids) """ + @abc.abstractmethod + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """ + Mark emails as read or unread. Returns (success_ids, failed_ids) + + Args: + email_ids: List of email IDs to mark. + mailbox: The mailbox containing the emails (default: "INBOX"). + read: True to mark as read, False to mark as unread. + """ + + @abc.abstractmethod + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """ + Move emails to another mailbox. Returns (moved_ids, failed_ids) + + Args: + email_ids: List of email IDs to move. + destination_mailbox: Target mailbox name (e.g., "Archive", "Trash"). + source_mailbox: Source mailbox (default: "INBOX"). + """ + @abc.abstractmethod async def download_attachment( self, diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 7c643cd..dce7eab 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -912,6 +912,91 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t return deleted_ids, failed_ids + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """Mark emails as read or unread by their UIDs. Returns (success_ids, failed_ids).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + success_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + flag_action = "+FLAGS" if read else "-FLAGS" + for email_id in email_ids: + try: + await imap.uid("store", email_id, flag_action, r"(\Seen)") + success_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to mark email {email_id} as {'read' if read else 'unread'}: {e}") + failed_ids.append(email_id) + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return success_ids, failed_ids + + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids). + + Uses IMAP MOVE command (RFC 6851) if supported, otherwise falls back to COPY + DELETE. + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + moved_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(source_mailbox)) + + # Check if MOVE is supported (RFC 6851) + capabilities = await imap.capability() + has_move = b"MOVE" in capabilities[1] if capabilities and len(capabilities) > 1 else False + + for email_id in email_ids: + try: + if has_move: + # Use MOVE command directly + result = await imap.uid("move", email_id, _quote_mailbox(destination_mailbox)) + if result[0] == "OK": + moved_ids.append(email_id) + else: + raise Exception(f"MOVE failed: {result}") + else: + # Fallback: COPY then DELETE + copy_result = await imap.uid("copy", email_id, _quote_mailbox(destination_mailbox)) + if copy_result[0] == "OK": + await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)") + moved_ids.append(email_id) + else: + raise Exception(f"COPY failed: {copy_result}") + except Exception as e: + logger.error(f"Failed to move email {email_id}: {e}") + failed_ids.append(email_id) + + # Expunge deleted messages (only needed for COPY+DELETE fallback) + if moved_ids and not has_move: + await imap.expunge() + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return moved_ids, failed_ids + class ClassicEmailHandler(EmailHandler): def __init__(self, email_settings: EmailSettings): @@ -1041,6 +1126,18 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t """Delete emails by their UIDs. Returns (deleted_ids, failed_ids).""" return await self.incoming_client.delete_emails(email_ids, mailbox) + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """Mark emails as read or unread. Returns (success_ids, failed_ids).""" + return await self.incoming_client.mark_emails_as_read(email_ids, mailbox, read) + + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids).""" + return await self.incoming_client.move_emails(email_ids, destination_mailbox, source_mailbox) + async def download_attachment( self, email_id: str, From 14d09a2f0a4c0f46d67358c58fcc1a0346066ff9 Mon Sep 17 00:00:00 2001 From: Anonymous Date: Sun, 8 Feb 2026 22:57:26 +0100 Subject: [PATCH 2/6] fix: properly parse IMAP SEARCH response to avoid false UIDs Fix bug where status messages like "SEARCH completed (took 5 ms)" were incorrectly parsed as email UIDs. The number in the timing info (e.g., "5") was being treated as a valid UID. Add _parse_search_response() method that: - Detects status messages by checking for keywords - Returns empty list for status-only responses - Only returns actual numeric UIDs from valid responses Co-Authored-By: Claude Opus 4.5 --- mcp_email_server/emails/classic.py | 51 +++++++++++++++++++++++++++--- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index dce7eab..35ead55 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -187,6 +187,42 @@ def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> di "attachments": attachments, } + @staticmethod + def _parse_search_response(messages: list) -> list[bytes]: + """Parse UIDs from IMAP SEARCH response. + + IMAP SEARCH responses can include: + - Actual UIDs as space-separated numbers + - Status messages like "SEARCH completed (took 5 ms)" + + This method filters out status messages and returns only valid UIDs. + Status messages are identified by containing non-numeric words like + "SEARCH", "completed", "took", etc. + """ + if not messages or not messages[0]: + return [] + + response = messages[0] + if isinstance(response, bytes): + response_str = response.decode("utf-8", errors="replace") + else: + response_str = str(response) + + # Check if this looks like a status message rather than UIDs + # Status messages contain keywords like "SEARCH", "completed", "took" + status_keywords = ["SEARCH", "completed", "took", "OK", "BAD", "NO"] + if any(keyword in response_str for keyword in status_keywords): + return [] + + # Split and filter to only numeric values + parts = response_str.split() + email_ids = [] + for part in parts: + if part.isdigit(): + email_ids.append(part.encode() if isinstance(part, str) else part) + + return email_ids + @staticmethod def _build_search_criteria( before: datetime | None = None, @@ -388,7 +424,11 @@ async def get_email_count( logger.info(f"Count: Search criteria: {search_criteria}") # Search for messages and count them - use UID SEARCH for consistency _, messages = await imap.uid_search(*search_criteria) - return len(messages[0].split()) + # Parse UIDs from SEARCH response + # Valid responses contain UIDs as space-separated numbers + # Status messages like "SEARCH completed (took 5 ms)" should be ignored + email_ids = self._parse_search_response(messages) + return len(email_ids) finally: # Ensure we logout properly try: @@ -437,12 +477,13 @@ async def get_emails_metadata_stream( # Search for messages - use UID SEARCH for better compatibility _, messages = await imap.uid_search(*search_criteria) - # Handle empty or None responses - if not messages or not messages[0]: - logger.warning("No messages returned from search") + # Parse UIDs from SEARCH response + email_ids = self._parse_search_response(messages) + + if not email_ids: + logger.info("No matching emails found") return - email_ids = messages[0].split() logger.info(f"Found {len(email_ids)} email IDs") # Phase 1: Batch fetch INTERNALDATE for sorting (parallel chunks) From f80dead284724b3ec4de11c3ea579320043b0d52 Mon Sep 17 00:00:00 2001 From: Anonymous Date: Sun, 8 Feb 2026 23:05:50 +0100 Subject: [PATCH 3/6] feat: add list_mailboxes tool to discover available folders Add new MCP tool to list all mailboxes (folders) in an email account. Returns mailbox name, flags, and hierarchy delimiter. Useful for discovering folder names like Archive, Sent, Trash which may vary across email providers (e.g., iCloud uses different names). Co-Authored-By: Claude Opus 4.5 --- mcp_email_server/app.py | 10 +++++ mcp_email_server/emails/__init__.py | 9 +++++ mcp_email_server/emails/classic.py | 58 +++++++++++++++++++++++++++++ 3 files changed, 77 insertions(+) diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index 8c233a5..c739159 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -32,6 +32,16 @@ async def list_available_accounts() -> list[AccountAttributes]: return [account.masked() for account in settings.get_accounts()] +@mcp.tool( + description="List all mailboxes (folders) in an email account. Use this to discover available folders like Archive, Sent, Trash, etc." +) +async def list_mailboxes( + account_name: Annotated[str, Field(description="The name of the email account.")], +) -> list[dict]: + handler = dispatch_handler(account_name) + return await handler.list_mailboxes() + + @mcp.tool(description="Add a new email account configuration to the settings.") async def add_email_account(email: EmailSettings) -> str: settings = get_settings() diff --git a/mcp_email_server/emails/__init__.py b/mcp_email_server/emails/__init__.py index 4c784d6..e6588de 100644 --- a/mcp_email_server/emails/__init__.py +++ b/mcp_email_server/emails/__init__.py @@ -66,6 +66,15 @@ async def send_email( Send email """ + @abc.abstractmethod + async def list_mailboxes(self) -> list[dict]: + """ + List all mailboxes (folders) in the email account. + + Returns: + List of dictionaries with mailbox info (name, flags, delimiter). + """ + @abc.abstractmethod async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> tuple[list[str], list[str]]: """ diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 35ead55..bb169bc 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -389,6 +389,60 @@ async def _batch_fetch_headers( return results + async def list_mailboxes(self) -> list[dict[str, Any]]: + """List all mailboxes (folders) in the email account. + + Returns a list of dictionaries with mailbox information: + - name: The mailbox name + - flags: List of flags (e.g., \\Noselect, \\HasChildren) + - delimiter: The hierarchy delimiter (usually "/" or ".") + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + mailboxes = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + # List all mailboxes + _, data = await imap.list('""', "*") + + for item in data: + if not isinstance(item, bytes): + continue + + # Parse LIST response: (flags) "delimiter" "name" + # Example: (\HasNoChildren) "/" "Archive" + item_str = item.decode("utf-8", errors="replace") + + # Skip status messages + if "LIST completed" in item_str or item_str.startswith("OK"): + continue + + # Parse the response + import re + match = re.match(r'\(([^)]*)\)\s+"([^"]+)"\s+"?([^"]+)"?', item_str) + if match: + flags_str, delimiter, name = match.groups() + flags = [f.strip() for f in flags_str.split() if f.strip()] + # Remove trailing quote if present + name = name.rstrip('"') + mailboxes.append({ + "name": name, + "flags": flags, + "delimiter": delimiter, + }) + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return mailboxes + async def get_email_count( self, before: datetime | None = None, @@ -1050,6 +1104,10 @@ def __init__(self, email_settings: EmailSettings): self.save_to_sent = email_settings.save_to_sent self.sent_folder_name = email_settings.sent_folder_name + async def list_mailboxes(self) -> list[dict]: + """List all mailboxes (folders) in the email account.""" + return await self.incoming_client.list_mailboxes() + async def get_emails_metadata( self, page: int = 1, From 56ba27e8e446cc5d3cbe3d112d15d178cd96d2dd Mon Sep 17 00:00:00 2001 From: Anonymous Date: Sun, 8 Feb 2026 23:14:59 +0100 Subject: [PATCH 4/6] perf: optimize pagination + add search_emails tool Performance optimization: - Don't fetch INTERNALDATE for all emails when paginating - Use UID ordering directly (UIDs are ascending by add date) - Only fetch headers for the requested page New search_emails tool: - Server-side IMAP SEARCH (fast even with thousands of emails) - Search in: all (TEXT), subject, body, or from - Paginated results Co-Authored-By: Claude Opus 4.5 --- mcp_email_server/app.py | 19 ++++ mcp_email_server/emails/__init__.py | 23 +++++ mcp_email_server/emails/classic.py | 146 +++++++++++++++++++++++----- 3 files changed, 166 insertions(+), 22 deletions(-) diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index c739159..8b56160 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -42,6 +42,25 @@ async def list_mailboxes( return await handler.list_mailboxes() +@mcp.tool( + description="Search emails using server-side IMAP search. Fast even with thousands of emails. " + "Searches in subject, body, and headers by default." +) +async def search_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + query: Annotated[str, Field(description="Text to search for in emails.")], + mailbox: Annotated[str, Field(default="INBOX", description="Mailbox to search in.")] = "INBOX", + search_in: Annotated[ + Literal["all", "subject", "body", "from"], + Field(default="all", description="Where to search: 'all' (headers+body), 'subject', 'body', or 'from'."), + ] = "all", + page: Annotated[int, Field(default=1, description="Page number (starting from 1).")] = 1, + page_size: Annotated[int, Field(default=20, description="Number of results per page.")] = 20, +) -> dict: + handler = dispatch_handler(account_name) + return await handler.search_emails(query, mailbox, search_in, page, page_size) + + @mcp.tool(description="Add a new email account configuration to the settings.") async def add_email_account(email: EmailSettings) -> str: settings = get_settings() diff --git a/mcp_email_server/emails/__init__.py b/mcp_email_server/emails/__init__.py index e6588de..cb7af4f 100644 --- a/mcp_email_server/emails/__init__.py +++ b/mcp_email_server/emails/__init__.py @@ -75,6 +75,29 @@ async def list_mailboxes(self) -> list[dict]: List of dictionaries with mailbox info (name, flags, delimiter). """ + @abc.abstractmethod + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict: + """ + Search emails using server-side IMAP SEARCH. + + Args: + query: Text to search for. + mailbox: Mailbox to search in (default: "INBOX"). + search_in: Where to search - "all", "subject", "body", "from". + page: Page number (starting from 1). + page_size: Number of results per page. + + Returns: + Dictionary with query, total, page, and emails list. + """ + @abc.abstractmethod async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> tuple[list[str], list[str]]: """ diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index bb169bc..ba74fac 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -443,6 +443,95 @@ async def list_mailboxes(self) -> list[dict[str, Any]]: return mailboxes + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict[str, Any]: + """Search emails using server-side IMAP SEARCH. + + Args: + query: Text to search for. + mailbox: Mailbox to search in (default: "INBOX"). + search_in: Where to search - "all" (TEXT), "subject", "body", "from". + page: Page number (starting from 1). + page_size: Number of results per page. + + Returns: + Dictionary with query, total count, page, and list of email metadata. + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + # Build IMAP search criteria based on search_in parameter + if search_in == "subject": + search_criteria = ["SUBJECT", query] + elif search_in == "body": + search_criteria = ["BODY", query] + elif search_in == "from": + search_criteria = ["FROM", query] + else: # "all" - searches in headers and body + search_criteria = ["TEXT", query] + + logger.info(f"Search: {search_criteria} in {mailbox}") + + # Execute server-side search + _, messages = await imap.uid_search(*search_criteria) + email_ids = self._parse_search_response(messages) + + total = len(email_ids) + logger.info(f"Search found {total} matching emails") + + if not email_ids: + return { + "query": query, + "search_in": search_in, + "mailbox": mailbox, + "total": 0, + "page": page, + "page_size": page_size, + "emails": [], + } + + # Paginate (most recent first - from end of UID list) + start_idx = max(0, total - (page * page_size)) + end_idx = total - ((page - 1) * page_size) + page_ids = email_ids[start_idx:end_idx] + page_ids = list(reversed([uid.decode() if isinstance(uid, bytes) else uid for uid in page_ids])) + + # Fetch headers for the page + metadata_by_uid = await self._batch_fetch_headers(imap, page_ids) + + emails = [] + for uid in page_ids: + if uid in metadata_by_uid: + emails.append(metadata_by_uid[uid]) + + return { + "query": query, + "search_in": search_in, + "mailbox": mailbox, + "total": total, + "page": page, + "page_size": page_size, + "emails": emails, + } + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + async def get_email_count( self, before: datetime | None = None, @@ -538,35 +627,37 @@ async def get_emails_metadata_stream( logger.info("No matching emails found") return - logger.info(f"Found {len(email_ids)} email IDs") - - # Phase 1: Batch fetch INTERNALDATE for sorting (parallel chunks) - fetch_dates_start = time.perf_counter() - uid_dates = await self._batch_fetch_dates(imap, email_ids) - fetch_dates_elapsed = time.perf_counter() - fetch_dates_start - - # Sort by INTERNALDATE - sorted_uids = sorted(uid_dates.items(), key=lambda x: x[1], reverse=(order == "desc")) - - # Paginate - start = (page - 1) * page_size - page_uids = [uid for uid, _ in sorted_uids[start : start + page_size]] + total = len(email_ids) + logger.info(f"Found {total} email IDs") + + # OPTIMIZED: Use UID ordering directly instead of fetching all dates + # UIDs are strictly ascending as messages are added to the mailbox + # This avoids fetching INTERNALDATE for potentially thousands of emails + if order == "desc": + # For descending: take from the end (most recent in this folder) + start_idx = max(0, total - (page * page_size)) + end_idx = total - ((page - 1) * page_size) + page_uids = email_ids[start_idx:end_idx] + # Reverse to get most recent first + page_uids = list(reversed([uid.decode() if isinstance(uid, bytes) else uid for uid in page_uids])) + else: + # For ascending: take from the beginning + start_idx = (page - 1) * page_size + end_idx = min(start_idx + page_size, total) + page_uids = [uid.decode() if isinstance(uid, bytes) else uid for uid in email_ids[start_idx:end_idx]] if not page_uids: - logger.info(f"Phase 1 (dates): {len(uid_dates)} UIDs in {fetch_dates_elapsed:.2f}s, page {page} empty") + logger.info(f"Page {page} is empty (total: {total})") return - # Phase 2: Batch fetch headers for requested page only - fetch_headers_start = time.perf_counter() + # Fetch headers only for the requested page + fetch_start = time.perf_counter() metadata_by_uid = await self._batch_fetch_headers(imap, page_uids) - fetch_headers_elapsed = time.perf_counter() - fetch_headers_start + fetch_elapsed = time.perf_counter() - fetch_start - logger.info( - f"Fetched page {page}: {fetch_dates_elapsed:.2f}s dates ({len(uid_dates)} UIDs), " - f"{fetch_headers_elapsed:.2f}s headers ({len(page_uids)} UIDs)" - ) + logger.info(f"Fetched page {page}: {fetch_elapsed:.2f}s for {len(page_uids)} emails (total: {total})") - # Yield in sorted order + # Yield in page order for uid in page_uids: if uid in metadata_by_uid: yield metadata_by_uid[uid] @@ -1108,6 +1199,17 @@ async def list_mailboxes(self) -> list[dict]: """List all mailboxes (folders) in the email account.""" return await self.incoming_client.list_mailboxes() + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict: + """Search emails using server-side IMAP SEARCH.""" + return await self.incoming_client.search_emails(query, mailbox, search_in, page, page_size) + async def get_emails_metadata( self, page: int = 1, From 328d00cde7f929346cbabf068b03aebae00b8a7f Mon Sep 17 00:00:00 2001 From: Anonymous Date: Sun, 8 Feb 2026 23:16:56 +0100 Subject: [PATCH 5/6] fix: use protocol.capabilities instead of capability() method --- mcp_email_server/emails/classic.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index ba74fac..7a678da 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -1147,9 +1147,8 @@ async def move_emails( await _send_imap_id(imap) await imap.select(_quote_mailbox(source_mailbox)) - # Check if MOVE is supported (RFC 6851) - capabilities = await imap.capability() - has_move = b"MOVE" in capabilities[1] if capabilities and len(capabilities) > 1 else False + # Check if MOVE is supported (RFC 6851) via protocol capabilities + has_move = hasattr(imap.protocol, 'capabilities') and b"MOVE" in imap.protocol.capabilities for email_id in email_ids: try: From 478729df962e35c605e16cf49d8bb00edd4b11a9 Mon Sep 17 00:00:00 2001 From: Anonymous Date: Sun, 8 Feb 2026 23:25:48 +0100 Subject: [PATCH 6/6] style: fix linting errors and add tests for new email tools - Use ternary operator in _parse_search_response (SIM108) - Replace raise Exception with logging + failed_ids (TRY301) - Add tests for list_mailboxes, search_emails, mark_emails_as_read, move_emails - Update test_get_emails_stream to match optimized pagination behavior Co-Authored-By: Claude Opus 4.5 --- mcp_email_server/app.py | 4 +- mcp_email_server/emails/classic.py | 14 ++-- tests/test_classic_handler.py | 104 +++++++++++++++++++++++++++++ tests/test_email_client.py | 71 +++++++++----------- 4 files changed, 145 insertions(+), 48 deletions(-) diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index 8b56160..de17a06 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -225,9 +225,7 @@ async def delete_emails( return result -@mcp.tool( - description="Mark one or more emails as read or unread. Use list_emails_metadata first to get the email_id." -) +@mcp.tool(description="Mark one or more emails as read or unread. Use list_emails_metadata first to get the email_id.") async def mark_emails_as_read( account_name: Annotated[str, Field(description="The name of the email account.")], email_ids: Annotated[ diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 7a678da..c2d4b30 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -203,10 +203,7 @@ def _parse_search_response(messages: list) -> list[bytes]: return [] response = messages[0] - if isinstance(response, bytes): - response_str = response.decode("utf-8", errors="replace") - else: - response_str = str(response) + response_str = response.decode("utf-8", errors="replace") if isinstance(response, bytes) else str(response) # Check if this looks like a status message rather than UIDs # Status messages contain keywords like "SEARCH", "completed", "took" @@ -423,6 +420,7 @@ async def list_mailboxes(self) -> list[dict[str, Any]]: # Parse the response import re + match = re.match(r'\(([^)]*)\)\s+"([^"]+)"\s+"?([^"]+)"?', item_str) if match: flags_str, delimiter, name = match.groups() @@ -1148,7 +1146,7 @@ async def move_emails( await imap.select(_quote_mailbox(source_mailbox)) # Check if MOVE is supported (RFC 6851) via protocol capabilities - has_move = hasattr(imap.protocol, 'capabilities') and b"MOVE" in imap.protocol.capabilities + has_move = hasattr(imap.protocol, "capabilities") and b"MOVE" in imap.protocol.capabilities for email_id in email_ids: try: @@ -1158,7 +1156,8 @@ async def move_emails( if result[0] == "OK": moved_ids.append(email_id) else: - raise Exception(f"MOVE failed: {result}") + logger.error(f"MOVE failed for {email_id}: {result}") + failed_ids.append(email_id) else: # Fallback: COPY then DELETE copy_result = await imap.uid("copy", email_id, _quote_mailbox(destination_mailbox)) @@ -1166,7 +1165,8 @@ async def move_emails( await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)") moved_ids.append(email_id) else: - raise Exception(f"COPY failed: {copy_result}") + logger.error(f"COPY failed for {email_id}: {copy_result}") + failed_ids.append(email_id) except Exception as e: logger.error(f"Failed to move email {email_id}: {e}") failed_ids.append(email_id) diff --git a/tests/test_classic_handler.py b/tests/test_classic_handler.py index 3245a6a..6f294f9 100644 --- a/tests/test_classic_handler.py +++ b/tests/test_classic_handler.py @@ -365,6 +365,110 @@ async def test_get_emails_content_includes_message_id(self, classic_handler): # Verify the client method was called correctly mock_get_body.assert_called_once_with("123", "INBOX") + @pytest.mark.asyncio + async def test_list_mailboxes(self, classic_handler): + """Test list_mailboxes method.""" + mock_mailboxes = [ + {"name": "INBOX", "flags": ["\\HasNoChildren"], "delimiter": "/"}, + {"name": "Archive", "flags": ["\\HasNoChildren"], "delimiter": "/"}, + {"name": "Sent", "flags": ["\\Sent"], "delimiter": "/"}, + ] + mock_list = AsyncMock(return_value=mock_mailboxes) + + with patch.object(classic_handler.incoming_client, "list_mailboxes", mock_list): + result = await classic_handler.list_mailboxes() + + assert len(result) == 3 + assert result[0]["name"] == "INBOX" + assert result[1]["name"] == "Archive" + mock_list.assert_called_once() + + @pytest.mark.asyncio + async def test_search_emails(self, classic_handler): + """Test search_emails method.""" + mock_result = { + "query": "test", + "total": 2, + "page": 1, + "emails": [ + {"email_id": "123", "subject": "Test email 1"}, + {"email_id": "456", "subject": "Test email 2"}, + ], + } + mock_search = AsyncMock(return_value=mock_result) + + with patch.object(classic_handler.incoming_client, "search_emails", mock_search): + result = await classic_handler.search_emails("test", "INBOX", "all", 1, 20) + + assert result["query"] == "test" + assert result["total"] == 2 + assert len(result["emails"]) == 2 + mock_search.assert_called_once_with("test", "INBOX", "all", 1, 20) + + @pytest.mark.asyncio + async def test_mark_emails_as_read(self, classic_handler): + """Test mark_emails_as_read method.""" + mock_mark = AsyncMock(return_value=(["123", "456"], [])) + + with patch.object(classic_handler.incoming_client, "mark_emails_as_read", mock_mark): + success_ids, failed_ids = await classic_handler.mark_emails_as_read( + email_ids=["123", "456"], + mailbox="INBOX", + read=True, + ) + + assert success_ids == ["123", "456"] + assert failed_ids == [] + mock_mark.assert_called_once_with(["123", "456"], "INBOX", True) + + @pytest.mark.asyncio + async def test_mark_emails_as_unread(self, classic_handler): + """Test mark_emails_as_read method with read=False.""" + mock_mark = AsyncMock(return_value=(["789"], [])) + + with patch.object(classic_handler.incoming_client, "mark_emails_as_read", mock_mark): + success_ids, failed_ids = await classic_handler.mark_emails_as_read( + email_ids=["789"], + mailbox="INBOX", + read=False, + ) + + assert success_ids == ["789"] + assert failed_ids == [] + mock_mark.assert_called_once_with(["789"], "INBOX", False) + + @pytest.mark.asyncio + async def test_move_emails(self, classic_handler): + """Test move_emails method.""" + mock_move = AsyncMock(return_value=(["123", "456"], [])) + + with patch.object(classic_handler.incoming_client, "move_emails", mock_move): + moved_ids, failed_ids = await classic_handler.move_emails( + email_ids=["123", "456"], + destination_mailbox="Archive", + source_mailbox="INBOX", + ) + + assert moved_ids == ["123", "456"] + assert failed_ids == [] + mock_move.assert_called_once_with(["123", "456"], "Archive", "INBOX") + + @pytest.mark.asyncio + async def test_move_emails_with_failures(self, classic_handler): + """Test move_emails method with some failures.""" + mock_move = AsyncMock(return_value=(["123"], ["456"])) + + with patch.object(classic_handler.incoming_client, "move_emails", mock_move): + moved_ids, failed_ids = await classic_handler.move_emails( + email_ids=["123", "456"], + destination_mailbox="Trash", + source_mailbox="INBOX", + ) + + assert moved_ids == ["123"] + assert failed_ids == ["456"] + mock_move.assert_called_once_with(["123", "456"], "Trash", "INBOX") + class TestEmailClientBatchMethods: """Test batch fetch methods for performance optimization.""" diff --git a/tests/test_email_client.py b/tests/test_email_client.py index 20343c5..da5b4a9 100644 --- a/tests/test_email_client.py +++ b/tests/test_email_client.py @@ -192,7 +192,11 @@ def test_build_search_criteria(self): @pytest.mark.asyncio async def test_get_emails_stream(self, email_client): - """Test getting emails stream returns sorted, paginated results.""" + """Test getting emails stream returns paginated results using UID ordering. + + With the optimized pagination, we use UID ordering directly (UIDs are + ascending by date added to mailbox) instead of fetching all dates first. + """ mock_imap = AsyncMock() mock_imap._client_task = asyncio.Future() mock_imap._client_task.set_result(None) @@ -202,19 +206,15 @@ async def test_get_emails_stream(self, email_client): mock_imap.uid_search = AsyncMock(return_value=(None, [b"1 2 3"])) mock_imap.logout = AsyncMock() - # Mock at the helper level - test behavior, not implementation - mock_dates = { - "1": datetime(2024, 1, 1, tzinfo=timezone.utc), - "2": datetime(2024, 1, 2, tzinfo=timezone.utc), - "3": datetime(2024, 1, 3, tzinfo=timezone.utc), - } + # Mock metadata returned by _batch_fetch_headers + # For desc order, UIDs are reversed: 3, 2, 1 mock_metadata = { - "1": { - "email_id": "1", - "subject": "Subject 1", - "from": "a@test.com", + "3": { + "email_id": "3", + "subject": "Subject 3", + "from": "c@test.com", "to": [], - "date": datetime(2024, 1, 1, tzinfo=timezone.utc), + "date": datetime(2024, 1, 3, tzinfo=timezone.utc), "attachments": [], }, "2": { @@ -225,38 +225,33 @@ async def test_get_emails_stream(self, email_client): "date": datetime(2024, 1, 2, tzinfo=timezone.utc), "attachments": [], }, - "3": { - "email_id": "3", - "subject": "Subject 3", - "from": "c@test.com", + "1": { + "email_id": "1", + "subject": "Subject 1", + "from": "a@test.com", "to": [], - "date": datetime(2024, 1, 3, tzinfo=timezone.utc), + "date": datetime(2024, 1, 1, tzinfo=timezone.utc), "attachments": [], }, } with patch.object(email_client, "imap_class", return_value=mock_imap): - with patch.object(email_client, "_batch_fetch_dates", return_value=mock_dates) as mock_fetch_dates: - with patch.object( - email_client, "_batch_fetch_headers", return_value=mock_metadata - ) as mock_fetch_headers: - emails = [] - async for email_data in email_client.get_emails_metadata_stream(page=1, page_size=10): - emails.append(email_data) - - # Behavior: returns emails sorted by date desc (newest first) - assert len(emails) == 3 - assert emails[0]["subject"] == "Subject 3" - assert emails[1]["subject"] == "Subject 2" - assert emails[2]["subject"] == "Subject 1" - - mock_imap.login.assert_called_once() - mock_imap.logout.assert_called_once() - - # Verify helpers called with correct arguments - mock_fetch_dates.assert_called_once_with(mock_imap, [b"1", b"2", b"3"]) - # Headers fetched for page UIDs in sorted order (desc by date) - mock_fetch_headers.assert_called_once_with(mock_imap, ["3", "2", "1"]) + with patch.object(email_client, "_batch_fetch_headers", return_value=mock_metadata) as mock_fetch_headers: + emails = [] + async for email_data in email_client.get_emails_metadata_stream(page=1, page_size=10): + emails.append(email_data) + + # Behavior: returns emails in UID desc order (newest UIDs first) + assert len(emails) == 3 + assert emails[0]["subject"] == "Subject 3" + assert emails[1]["subject"] == "Subject 2" + assert emails[2]["subject"] == "Subject 1" + + mock_imap.login.assert_called_once() + mock_imap.logout.assert_called_once() + + # Headers fetched for page UIDs in desc order + mock_fetch_headers.assert_called_once_with(mock_imap, ["3", "2", "1"]) @pytest.mark.asyncio async def test_get_email_count(self, email_client):