diff --git a/docs/EMAIL_SEARCH_PERFORMANCE.md b/docs/EMAIL_SEARCH_PERFORMANCE.md new file mode 100644 index 0000000..4c8fadd --- /dev/null +++ b/docs/EMAIL_SEARCH_PERFORMANCE.md @@ -0,0 +1,206 @@ +# Email Search Performance & Filter Requirements + +## Overview + +The `list_emails_metadata()` method requires at least one search filter to prevent expensive mailbox scans that can timeout on large mailboxes. This document explains the reasoning and best practices. + +## Why Filters Are Required + +### The Problem: Unfiltered Searches + +When calling `list_emails_metadata()` without any filters: + +```python +# ❌ BAD: No filters +list_emails_metadata(account_name="Galaxia", page=1, page_size=5) +``` + +Despite requesting only 5 emails per page, the internal flow is: + +``` +1. IMAP uid_search("ALL") ← Scans entire mailbox +2. Fetch dates for ALL emails ← Could be 100,000+ emails +3. Sort ALL emails ← Memory intensive +4. Then paginate to get 5 emails ← Finally! +``` + +**Pagination only applies AFTER the expensive operations.** This means: + +- On a mailbox with 10,000 emails: seconds of delay +- On a mailbox with 100,000+ emails: minutes or timeout +- On enterprise mailboxes: can hang indefinitely + +### IMAP Protocol Limitation + +IMAP doesn't support "give me the first N emails" queries. The protocol requires: + +1. **SEARCH** - Define criteria and get matching UIDs (returns ALL matches) +2. **FETCH** - Get data for specific UIDs +3. **SORT** - Order results (optional, server-dependent) + +There's no built-in way to limit results before the search phase. + +## Best Practices + +### ✅ Fast Searches + +#### 1. Date Range (Fastest) + +```python +# Get last 30 days of emails +from datetime import datetime, timedelta +since = datetime.now() - timedelta(days=30) +result = list_emails_metadata(account_name="Galaxia", since=since) +``` + +**Why:** IMAP servers heavily index by date. Returns only recent emails. + +#### 2. 
Text Search (Medium Speed) + +```python +# Search for specific sender +result = list_emails_metadata( + account_name="Galaxia", + from_address="boss@company.com" +) +``` + +**Why:** Text searches use server indices, but could match many emails. + +#### 3. Combined Filters (Fastest & Best) + +```python +# Search for work emails from last month +since = datetime.now() - timedelta(days=30) +result = list_emails_metadata( + account_name="Galaxia", + subject="project", + from_address="team@company.com", + since=since +) +``` + +**Why:** Narrows search space at IMAP level (most efficient). + +#### 4. Flag-Based Search + +```python +# Get unread emails from the last 7 days +since = datetime.now() - timedelta(days=7) +result = list_emails_metadata( + account_name="Galaxia", + seen=False,  # Unread emails + since=since +) +``` + +**Why:** Flag searches are fast; combining with date range is best. + +## Performance Comparison + +| Query | Mailbox Size | Time | +| ---------------------- | ---------------- | ------------ | +| `SEARCH ALL` | 10,000 emails | ~1 second | +| `SEARCH ALL` | 100,000 emails | ~10+ seconds | +| `SEARCH ALL` | 1,000,000 emails | **TIMEOUT** | +| `SEARCH SINCE <date>` | Any size | ~100ms | +| `SEARCH FROM "sender"` | 100,000 emails | ~500ms | +| `SEARCH SINCE + FROM` | 100,000 emails | ~100ms | + +## Error Message Explanation + +When no filters are provided: + +``` +ValueError: At least one filter is required to prevent expensive searches +on large mailboxes. Recommended: combine a date range (since/before) with +optional text filters (subject/from/to). +Example: since=datetime(2026, 1, 1) or subject='work' + since=datetime(2025, 1, 1) +``` + +This error prevents: + +- Silent performance degradation +- Unexplained timeouts +- User frustration with "why is this so slow?" 
+ +## Available Filters + +All filters prevent full mailbox scans: + +- **`since`** (datetime) - Emails after date (fastest) +- **`before`** (datetime) - Emails before date (fastest) +- **`subject`** (string) - Subject line text search +- **`from_address`** (string) - Sender email address +- **`to_address`** (string) - Recipient email address +- **`seen`** (bool) - Read/unread emails +- **`flagged`** (bool) - Starred/flagged emails +- **`answered`** (bool) - Emails with replies + +## Recommendations + +### For Applications + +1. **Always provide a date range** - This is the fastest and most predictable +2. **Combine with text filters** - Narrow results further +3. **Handle pagination** - Combine with `page` and `page_size` +4. **Cache results** - Don't re-query immediately + +### For Users + +1. **Start with recent emails** - Last 30-90 days is usually sufficient +2. **Use specific searches** - If looking for something, add subject/from filters +3. **Be explicit** - Don't rely on defaults; always specify your intent + +## Migration Guide + +If you were using unfiltered searches before: + +### Before (Would fail now) + +```python +result = list_emails_metadata(account_name="Galaxia") +``` + +### After + +```python +from datetime import datetime, timedelta + +# Option 1: Last 30 days +since = datetime.now() - timedelta(days=30) +result = list_emails_metadata(account_name="Galaxia", since=since) + +# Option 2: Search for specific sender +result = list_emails_metadata( + account_name="Galaxia", + from_address="colleague@company.com" +) + +# Option 3: Combine filters (recommended) +since = datetime.now() - timedelta(days=90) +result = list_emails_metadata( + account_name="Galaxia", + subject="project", + since=since +) +``` + +## FAQ + +**Q: Can I see all my emails?** +A: Yes, use a large date range: `since=datetime(2000, 1, 1)`. On large mailboxes, this may take several seconds or timeout depending on server capacity. 
+ +**Q: Why is pagination alone not enough?** +A: IMAP requires a full search before paginating. Pagination only applies after results are returned, so it doesn't prevent the initial expensive scan. + +**Q: What if my IMAP server is fast?** +A: Even fast servers struggle with "ALL" searches on mailboxes with 100,000+ emails. Date range filters are always safer. + +**Q: Can I search my entire mailbox?** +A: Technically yes, but it's not recommended for mailboxes > 50,000 emails. Use: `since=datetime(2000, 1, 1)` and be patient. Consider pagination with small `page_size` values. + +## See Also + +- [IMAP RFC 3501](https://tools.ietf.org/html/rfc3501) - IMAP Protocol Specification +- [mcp-email-server Repository](https://github.com/ai-zerolab/mcp-email-server) - Main project diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index de9f95a..de17a06 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -32,6 +32,35 @@ async def list_available_accounts() -> list[AccountAttributes]: return [account.masked() for account in settings.get_accounts()] +@mcp.tool( + description="List all mailboxes (folders) in an email account. Use this to discover available folders like Archive, Sent, Trash, etc." +) +async def list_mailboxes( + account_name: Annotated[str, Field(description="The name of the email account.")], +) -> list[dict]: + handler = dispatch_handler(account_name) + return await handler.list_mailboxes() + + +@mcp.tool( + description="Search emails using server-side IMAP search. Fast even with thousands of emails. " + "Searches in subject, body, and headers by default." 
+) +async def search_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + query: Annotated[str, Field(description="Text to search for in emails.")], + mailbox: Annotated[str, Field(default="INBOX", description="Mailbox to search in.")] = "INBOX", + search_in: Annotated[ + Literal["all", "subject", "body", "from"], + Field(default="all", description="Where to search: 'all' (headers+body), 'subject', 'body', or 'from'."), + ] = "all", + page: Annotated[int, Field(default=1, description="Page number (starting from 1).")] = 1, + page_size: Annotated[int, Field(default=20, description="Number of results per page.")] = 20, +) -> dict: + handler = dispatch_handler(account_name) + return await handler.search_emails(query, mailbox, search_in, page, page_size) + + @mcp.tool(description="Add a new email account configuration to the settings.") async def add_email_account(email: EmailSettings) -> str: settings = get_settings() @@ -196,6 +225,50 @@ async def delete_emails( return result +@mcp.tool(description="Mark one or more emails as read or unread. 
Use list_emails_metadata first to get the email_id.") +async def mark_emails_as_read( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to mark (obtained from list_emails_metadata)."), + ], + mailbox: Annotated[str, Field(default="INBOX", description="The mailbox containing the emails.")] = "INBOX", + read: Annotated[bool, Field(default=True, description="True to mark as read, False to mark as unread.")] = True, +) -> str: + handler = dispatch_handler(account_name) + success_ids, failed_ids = await handler.mark_emails_as_read(email_ids, mailbox, read) + + status = "read" if read else "unread" + result = f"Successfully marked {len(success_ids)} email(s) as {status}" + if failed_ids: + result += f", failed to mark {len(failed_ids)} email(s): {', '.join(failed_ids)}" + return result + + +@mcp.tool( + description="Move one or more emails to a different mailbox/folder. Common destinations: 'Archive', 'Trash', 'Spam'. Use list_emails_metadata first to get the email_id." 
+) +async def move_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to move (obtained from list_emails_metadata)."), + ], + destination_mailbox: Annotated[ + str, + Field(description="Target mailbox name (e.g., 'Archive', 'Trash', 'Spam', '[Gmail]/All Mail')."), + ], + source_mailbox: Annotated[str, Field(default="INBOX", description="Source mailbox.")] = "INBOX", +) -> str: + handler = dispatch_handler(account_name) + moved_ids, failed_ids = await handler.move_emails(email_ids, destination_mailbox, source_mailbox) + + result = f"Successfully moved {len(moved_ids)} email(s) to '{destination_mailbox}'" + if failed_ids: + result += f", failed to move {len(failed_ids)} email(s): {', '.join(failed_ids)}" + return result + + @mcp.tool( description="Download an email attachment and save it to the specified path. This feature must be explicitly enabled in settings (enable_attachment_download=true) due to security considerations.", ) diff --git a/mcp_email_server/emails/__init__.py b/mcp_email_server/emails/__init__.py index 8284b83..e9eef11 100644 --- a/mcp_email_server/emails/__init__.py +++ b/mcp_email_server/emails/__init__.py @@ -79,12 +79,70 @@ async def send_email( references: Space-separated Message-IDs for the thread chain. """ + @abc.abstractmethod + async def list_mailboxes(self) -> list[dict]: + """ + List all mailboxes (folders) in the email account. + + Returns: + List of dictionaries with mailbox info (name, flags, delimiter). + """ + + @abc.abstractmethod + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict: + """ + Search emails using server-side IMAP SEARCH. + + Args: + query: Text to search for. + mailbox: Mailbox to search in (default: "INBOX"). + search_in: Where to search - "all", "subject", "body", "from". 
+ page: Page number (starting from 1). + page_size: Number of results per page. + + Returns: + Dictionary with query, total, page, and emails list. + """ + @abc.abstractmethod async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> tuple[list[str], list[str]]: """ Delete emails by their IDs. Returns (deleted_ids, failed_ids) """ + @abc.abstractmethod + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """ + Mark emails as read or unread. Returns (success_ids, failed_ids) + + Args: + email_ids: List of email IDs to mark. + mailbox: The mailbox containing the emails (default: "INBOX"). + read: True to mark as read, False to mark as unread. + """ + + @abc.abstractmethod + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """ + Move emails to another mailbox. Returns (moved_ids, failed_ids) + + Args: + email_ids: List of email IDs to move. + destination_mailbox: Target mailbox name (e.g., "Archive", "Trash"). + source_mailbox: Source mailbox (default: "INBOX"). 
+ """ + @abc.abstractmethod async def download_attachment( self, diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 618c959..8f07a05 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -104,6 +104,9 @@ def __init__(self, email_server: EmailServer, sender: str | None = None): self.smtp_start_tls = self.email_server.start_ssl self.smtp_verify_ssl = self.email_server.verify_ssl + # Cache for last search total to avoid repeated searches + self._last_search_total = None + def _get_smtp_ssl_context(self) -> ssl.SSLContext | None: """Get SSL context for SMTP connections based on verify_ssl setting.""" return _create_smtp_ssl_context(self.smtp_verify_ssl) @@ -230,6 +233,39 @@ def _strip_html(html: str) -> str: "attachments": attachments, } + @staticmethod + def _parse_search_response(messages: list) -> list[bytes]: + """Parse UIDs from IMAP SEARCH response. + + IMAP SEARCH responses can include: + - Actual UIDs as space-separated numbers + - Status messages like "SEARCH completed (took 5 ms)" + + This method filters out status messages and returns only valid UIDs. + Status messages are identified by containing non-numeric words like + "SEARCH", "completed", "took", etc. 
+ """ + if not messages or not messages[0]: + return [] + + response = messages[0] + response_str = response.decode("utf-8", errors="replace") if isinstance(response, bytes) else str(response) + + # Check if this looks like a status message rather than UIDs + # Status messages contain keywords like "SEARCH", "completed", "took" + status_keywords = ["SEARCH", "completed", "took", "OK", "BAD", "NO"] + if any(keyword in response_str for keyword in status_keywords): + return [] + + # Split and filter to only numeric values + parts = response_str.split() + email_ids = [] + for part in parts: + if part.isdigit(): + email_ids.append(part.encode() if isinstance(part, str) else part) + + return email_ids + @staticmethod def _build_search_criteria( before: datetime | None = None, @@ -391,6 +427,150 @@ async def _batch_fetch_headers( return results + async def list_mailboxes(self) -> list[dict[str, Any]]: + """List all mailboxes (folders) in the email account. + + Returns a list of dictionaries with mailbox information: + - name: The mailbox name + - flags: List of flags (e.g., \\Noselect, \\HasChildren) + - delimiter: The hierarchy delimiter (usually "/" or ".") + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + mailboxes = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + # List all mailboxes + _, data = await imap.list('""', "*") + + for item in data: + if not isinstance(item, bytes): + continue + + # Parse LIST response: (flags) "delimiter" "name" + # Example: (\HasNoChildren) "/" "Archive" + item_str = item.decode("utf-8", errors="replace") + + # Skip status messages + if "LIST completed" in item_str or item_str.startswith("OK"): + continue + + # Parse the response + import re + + match = re.match(r'\(([^)]*)\)\s+"([^"]+)"\s+"?([^"]+)"?', item_str) + if match: + flags_str, delimiter, name = match.groups() + 
flags = [f.strip() for f in flags_str.split() if f.strip()] + # Remove trailing quote if present + name = name.rstrip('"') + mailboxes.append({ + "name": name, + "flags": flags, + "delimiter": delimiter, + }) + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return mailboxes + + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict[str, Any]: + """Search emails using server-side IMAP SEARCH. + + Args: + query: Text to search for. + mailbox: Mailbox to search in (default: "INBOX"). + search_in: Where to search - "all" (TEXT), "subject", "body", "from". + page: Page number (starting from 1). + page_size: Number of results per page. + + Returns: + Dictionary with query, total count, page, and list of email metadata. + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + # Build IMAP search criteria based on search_in parameter + if search_in == "subject": + search_criteria = ["SUBJECT", query] + elif search_in == "body": + search_criteria = ["BODY", query] + elif search_in == "from": + search_criteria = ["FROM", query] + else: # "all" - searches in headers and body + search_criteria = ["TEXT", query] + + logger.info(f"Search: {search_criteria} in {mailbox}") + + # Execute server-side search + _, messages = await imap.uid_search(*search_criteria) + email_ids = self._parse_search_response(messages) + + total = len(email_ids) + logger.info(f"Search found {total} matching emails") + + if not email_ids: + return { + "query": query, + "search_in": search_in, + "mailbox": mailbox, + "total": 0, + "page": page, + "page_size": page_size, + "emails": [], + } + + # Paginate 
(most recent first - from end of UID list) + start_idx = max(0, total - (page * page_size)) + end_idx = total - ((page - 1) * page_size) + page_ids = email_ids[start_idx:end_idx] + page_ids = list(reversed([uid.decode() if isinstance(uid, bytes) else uid for uid in page_ids])) + + # Fetch headers for the page + metadata_by_uid = await self._batch_fetch_headers(imap, page_ids) + + emails = [] + for uid in page_ids: + if uid in metadata_by_uid: + emails.append(metadata_by_uid[uid]) + + return { + "query": query, + "search_in": search_in, + "mailbox": mailbox, + "total": total, + "page": page, + "page_size": page_size, + "emails": emails, + } + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + async def get_email_count( self, before: datetime | None = None, @@ -426,7 +606,11 @@ async def get_email_count( logger.info(f"Count: Search criteria: {search_criteria}") # Search for messages and count them - use UID SEARCH for consistency _, messages = await imap.uid_search(*search_criteria) - return len(messages[0].split()) + # Parse UIDs from SEARCH response + # Valid responses contain UIDs as space-separated numbers + # Status messages like "SEARCH completed (took 5 ms)" should be ignored + email_ids = self._parse_search_response(messages) + return len(email_ids) finally: # Ensure we logout properly try: @@ -475,41 +659,47 @@ async def get_emails_metadata_stream( # Search for messages - use UID SEARCH for better compatibility _, messages = await imap.uid_search(*search_criteria) - # Handle empty or None responses - if not messages or not messages[0]: - logger.warning("No messages returned from search") - return - - email_ids = messages[0].split() - logger.info(f"Found {len(email_ids)} email IDs") + # Parse UIDs from SEARCH response + email_ids = self._parse_search_response(messages) - # Phase 1: Batch fetch INTERNALDATE for sorting (parallel chunks) - fetch_dates_start = time.perf_counter() - uid_dates = await 
self._batch_fetch_dates(imap, email_ids) - fetch_dates_elapsed = time.perf_counter() - fetch_dates_start - - # Sort by INTERNALDATE - sorted_uids = sorted(uid_dates.items(), key=lambda x: x[1], reverse=(order == "desc")) + if not email_ids: + logger.info("No matching emails found") + return - # Paginate - start = (page - 1) * page_size - page_uids = [uid for uid, _ in sorted_uids[start : start + page_size]] + total = len(email_ids) + logger.info(f"Found {total} email IDs") + + # Cache the search result for this request (to avoid duplicate search in get_email_count) + self._last_search_total = total + + # OPTIMIZED: Use UID ordering directly instead of fetching all dates + # UIDs are strictly ascending as messages are added to the mailbox + # This avoids fetching INTERNALDATE for potentially thousands of emails + if order == "desc": + # For descending: take from the end (most recent in this folder) + start_idx = max(0, total - (page * page_size)) + end_idx = total - ((page - 1) * page_size) + page_uids = email_ids[start_idx:end_idx] + # Reverse to get most recent first + page_uids = list(reversed([uid.decode() if isinstance(uid, bytes) else uid for uid in page_uids])) + else: + # For ascending: take from the beginning + start_idx = (page - 1) * page_size + end_idx = min(start_idx + page_size, total) + page_uids = [uid.decode() if isinstance(uid, bytes) else uid for uid in email_ids[start_idx:end_idx]] if not page_uids: - logger.info(f"Phase 1 (dates): {len(uid_dates)} UIDs in {fetch_dates_elapsed:.2f}s, page {page} empty") + logger.info(f"Page {page} is empty (total: {total})") return - # Phase 2: Batch fetch headers for requested page only - fetch_headers_start = time.perf_counter() + # Fetch headers only for the requested page + fetch_start = time.perf_counter() metadata_by_uid = await self._batch_fetch_headers(imap, page_uids) - fetch_headers_elapsed = time.perf_counter() - fetch_headers_start + fetch_elapsed = time.perf_counter() - fetch_start - logger.info( - 
f"Fetched page {page}: {fetch_dates_elapsed:.2f}s dates ({len(uid_dates)} UIDs), " - f"{fetch_headers_elapsed:.2f}s headers ({len(page_uids)} UIDs)" - ) + logger.info(f"Fetched page {page}: {fetch_elapsed:.2f}s for {len(page_uids)} emails (total: {total})") - # Yield in sorted order + # Yield in page order for uid in page_uids: if uid in metadata_by_uid: yield metadata_by_uid[uid] @@ -956,6 +1146,92 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t return deleted_ids, failed_ids + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """Mark emails as read or unread by their UIDs. Returns (success_ids, failed_ids).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + success_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + flag_action = "+FLAGS" if read else "-FLAGS" + for email_id in email_ids: + try: + await imap.uid("store", email_id, flag_action, r"(\Seen)") + success_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to mark email {email_id} as {'read' if read else 'unread'}: {e}") + failed_ids.append(email_id) + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return success_ids, failed_ids + + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids). + + Uses IMAP MOVE command (RFC 6851) if supported, otherwise falls back to COPY + DELETE. 
+ """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + moved_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(source_mailbox)) + + # Check if MOVE is supported (RFC 6851) via protocol capabilities + has_move = hasattr(imap.protocol, "capabilities") and b"MOVE" in imap.protocol.capabilities + + for email_id in email_ids: + try: + if has_move: + # Use MOVE command directly + result = await imap.uid("move", email_id, _quote_mailbox(destination_mailbox)) + if result[0] == "OK": + moved_ids.append(email_id) + else: + logger.error(f"MOVE failed for {email_id}: {result}") + failed_ids.append(email_id) + else: + # Fallback: COPY then DELETE + copy_result = await imap.uid("copy", email_id, _quote_mailbox(destination_mailbox)) + if copy_result[0] == "OK": + await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)") + moved_ids.append(email_id) + else: + logger.error(f"COPY failed for {email_id}: {copy_result}") + failed_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to move email {email_id}: {e}") + failed_ids.append(email_id) + + # Expunge deleted messages (only needed for COPY+DELETE fallback) + if moved_ids and not has_move: + await imap.expunge() + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return moved_ids, failed_ids + class ClassicEmailHandler(EmailHandler): def __init__(self, email_settings: EmailSettings): @@ -968,6 +1244,21 @@ def __init__(self, email_settings: EmailSettings): self.save_to_sent = email_settings.save_to_sent self.sent_folder_name = email_settings.sent_folder_name + async def list_mailboxes(self) -> list[dict]: + """List all mailboxes (folders) in the email account.""" + return await self.incoming_client.list_mailboxes() + + async def search_emails( 
+ self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict: + """Search emails using server-side IMAP SEARCH.""" + return await self.incoming_client.search_emails(query, mailbox, search_in, page, page_size) + async def get_emails_metadata( self, page: int = 1, @@ -983,6 +1274,25 @@ async def get_emails_metadata( flagged: bool | None = None, answered: bool | None = None, ) -> EmailMetadataPageResponse: + # Require at least one filter to prevent expensive "ALL" searches on large mailboxes + has_filter = any([ + before, + since, + subject, + from_address, + to_address, + seen is not None, + flagged is not None, + answered is not None, + ]) + if not has_filter: + msg = ( + "At least one filter is required to prevent expensive searches on large mailboxes. " + "Recommended: combine a date range (since/before) with optional text filters (subject/from/to). " + "Example: since=datetime(2026, 1, 1) or subject='work' + since=datetime(2025, 1, 1)" + ) + raise ValueError(msg) + emails = [] async for email_data in self.incoming_client.get_emails_metadata_stream( page, @@ -999,17 +1309,10 @@ async def get_emails_metadata( answered, ): emails.append(EmailMetadata.from_email(email_data)) - total = await self.incoming_client.get_email_count( - before, - since, - subject, - from_address=from_address, - to_address=to_address, - mailbox=mailbox, - seen=seen, - flagged=flagged, - answered=answered, - ) + + # Use the cached total from the stream search instead of doing a separate search + total = self.incoming_client._last_search_total or 0 + return EmailMetadataPageResponse( page=page, page_size=page_size, @@ -1085,6 +1388,18 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t """Delete emails by their UIDs. 
Returns (deleted_ids, failed_ids).""" return await self.incoming_client.delete_emails(email_ids, mailbox) + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """Mark emails as read or unread. Returns (success_ids, failed_ids).""" + return await self.incoming_client.mark_emails_as_read(email_ids, mailbox, read) + + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids).""" + return await self.incoming_client.move_emails(email_ids, destination_mailbox, source_mailbox) + async def download_attachment( self, email_id: str, diff --git a/mkdocs.yml b/mkdocs.yml index d22bfb2..6c50555 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -9,6 +9,7 @@ copyright: Maintained by ai-zerolab. nav: - Home: index.md + - Email Search Performance: EMAIL_SEARCH_PERFORMANCE.md - Modules: modules.md plugins: - search diff --git a/tests/test_classic_handler.py b/tests/test_classic_handler.py index 3245a6a..9f44aff 100644 --- a/tests/test_classic_handler.py +++ b/tests/test_classic_handler.py @@ -74,53 +74,41 @@ async def test_get_emails(self, classic_handler): mock_stream = AsyncMock() mock_stream.__aiter__.return_value = [email_data] - # Mock the get_email_count method - mock_count = AsyncMock(return_value=1) - # Apply the mocks with patch.object(classic_handler.incoming_client, "get_emails_metadata_stream", return_value=mock_stream): - with patch.object(classic_handler.incoming_client, "get_email_count", mock_count): - # Call the method - result = await classic_handler.get_emails_metadata( - page=1, - page_size=10, - before=now, - since=None, - subject="Test", - from_address="sender@example.com", - to_address=None, - ) - - # Verify the result - assert isinstance(result, EmailMetadataPageResponse) - assert result.page == 1 - assert result.page_size 
== 10 - assert result.before == now - assert result.since is None - assert result.subject == "Test" - assert len(result.emails) == 1 - assert isinstance(result.emails[0], EmailMetadata) - assert result.emails[0].subject == "Test Subject" - assert result.emails[0].sender == "sender@example.com" - assert result.emails[0].date == now - assert result.emails[0].attachments == [] - assert result.total == 1 - - # Verify the client methods were called correctly - classic_handler.incoming_client.get_emails_metadata_stream.assert_called_once_with( - 1, 10, now, None, "Test", "sender@example.com", None, "desc", "INBOX", None, None, None - ) - mock_count.assert_called_once_with( - now, - None, - "Test", - from_address="sender@example.com", - to_address=None, - mailbox="INBOX", - seen=None, - flagged=None, - answered=None, - ) + # Mock the _last_search_total to simulate the cache set by get_emails_metadata_stream + classic_handler.incoming_client._last_search_total = 1 + + # Call the method + result = await classic_handler.get_emails_metadata( + page=1, + page_size=10, + before=now, + since=None, + subject="Test", + from_address="sender@example.com", + to_address=None, + ) + + # Verify the result + assert isinstance(result, EmailMetadataPageResponse) + assert result.page == 1 + assert result.page_size == 10 + assert result.before == now + assert result.since is None + assert result.subject == "Test" + assert len(result.emails) == 1 + assert isinstance(result.emails[0], EmailMetadata) + assert result.emails[0].subject == "Test Subject" + assert result.emails[0].sender == "sender@example.com" + assert result.emails[0].date == now + assert result.emails[0].attachments == [] + assert result.total == 1 + + # Verify the client methods were called correctly + classic_handler.incoming_client.get_emails_metadata_stream.assert_called_once_with( + 1, 10, now, None, "Test", "sender@example.com", None, "desc", "INBOX", None, None, None + ) @pytest.mark.asyncio async def 
test_get_emails_with_mailbox(self, classic_handler): @@ -137,34 +125,41 @@ async def test_get_emails_with_mailbox(self, classic_handler): mock_stream = AsyncMock() mock_stream.__aiter__.return_value = [email_data] - mock_count = AsyncMock(return_value=1) with patch.object(classic_handler.incoming_client, "get_emails_metadata_stream", return_value=mock_stream): - with patch.object(classic_handler.incoming_client, "get_email_count", mock_count): - result = await classic_handler.get_emails_metadata( - page=1, - page_size=10, - mailbox="Sent", - ) - - assert isinstance(result, EmailMetadataPageResponse) - assert len(result.emails) == 1 - - # Verify mailbox parameter was passed correctly - classic_handler.incoming_client.get_emails_metadata_stream.assert_called_once_with( - 1, 10, None, None, None, None, None, "desc", "Sent", None, None, None - ) - mock_count.assert_called_once_with( - None, - None, - None, - from_address=None, - to_address=None, - mailbox="Sent", - seen=None, - flagged=None, - answered=None, - ) + # Mock the _last_search_total to simulate the cache set by get_emails_metadata_stream + classic_handler.incoming_client._last_search_total = 1 + + # Add a before filter to satisfy the new validation requirement + result = await classic_handler.get_emails_metadata( + page=1, + page_size=10, + before=now, + mailbox="Sent", + ) + + assert isinstance(result, EmailMetadataPageResponse) + assert len(result.emails) == 1 + + # Verify mailbox parameter was passed correctly + classic_handler.incoming_client.get_emails_metadata_stream.assert_called_once_with( + 1, 10, now, None, None, None, None, "desc", "Sent", None, None, None + ) + + @pytest.mark.asyncio + async def test_get_emails_requires_filter(self, classic_handler): + """Test that get_emails_metadata requires at least one filter.""" + # Call the method without any filters + with pytest.raises(ValueError) as exc_info: + await classic_handler.get_emails_metadata( + page=1, + page_size=10, + ) + + # Verify the 
error message + assert "At least one filter is required" in str(exc_info.value) + assert "since" in str(exc_info.value).lower() + assert "date" in str(exc_info.value).lower() @pytest.mark.asyncio async def test_send_email(self, classic_handler): @@ -365,6 +360,110 @@ async def test_get_emails_content_includes_message_id(self, classic_handler): # Verify the client method was called correctly mock_get_body.assert_called_once_with("123", "INBOX") + @pytest.mark.asyncio + async def test_list_mailboxes(self, classic_handler): + """Test list_mailboxes method.""" + mock_mailboxes = [ + {"name": "INBOX", "flags": ["\\HasNoChildren"], "delimiter": "/"}, + {"name": "Archive", "flags": ["\\HasNoChildren"], "delimiter": "/"}, + {"name": "Sent", "flags": ["\\Sent"], "delimiter": "/"}, + ] + mock_list = AsyncMock(return_value=mock_mailboxes) + + with patch.object(classic_handler.incoming_client, "list_mailboxes", mock_list): + result = await classic_handler.list_mailboxes() + + assert len(result) == 3 + assert result[0]["name"] == "INBOX" + assert result[1]["name"] == "Archive" + mock_list.assert_called_once() + + @pytest.mark.asyncio + async def test_search_emails(self, classic_handler): + """Test search_emails method.""" + mock_result = { + "query": "test", + "total": 2, + "page": 1, + "emails": [ + {"email_id": "123", "subject": "Test email 1"}, + {"email_id": "456", "subject": "Test email 2"}, + ], + } + mock_search = AsyncMock(return_value=mock_result) + + with patch.object(classic_handler.incoming_client, "search_emails", mock_search): + result = await classic_handler.search_emails("test", "INBOX", "all", 1, 20) + + assert result["query"] == "test" + assert result["total"] == 2 + assert len(result["emails"]) == 2 + mock_search.assert_called_once_with("test", "INBOX", "all", 1, 20) + + @pytest.mark.asyncio + async def test_mark_emails_as_read(self, classic_handler): + """Test mark_emails_as_read method.""" + mock_mark = AsyncMock(return_value=(["123", "456"], [])) + + with 
patch.object(classic_handler.incoming_client, "mark_emails_as_read", mock_mark): + success_ids, failed_ids = await classic_handler.mark_emails_as_read( + email_ids=["123", "456"], + mailbox="INBOX", + read=True, + ) + + assert success_ids == ["123", "456"] + assert failed_ids == [] + mock_mark.assert_called_once_with(["123", "456"], "INBOX", True) + + @pytest.mark.asyncio + async def test_mark_emails_as_unread(self, classic_handler): + """Test mark_emails_as_read method with read=False.""" + mock_mark = AsyncMock(return_value=(["789"], [])) + + with patch.object(classic_handler.incoming_client, "mark_emails_as_read", mock_mark): + success_ids, failed_ids = await classic_handler.mark_emails_as_read( + email_ids=["789"], + mailbox="INBOX", + read=False, + ) + + assert success_ids == ["789"] + assert failed_ids == [] + mock_mark.assert_called_once_with(["789"], "INBOX", False) + + @pytest.mark.asyncio + async def test_move_emails(self, classic_handler): + """Test move_emails method.""" + mock_move = AsyncMock(return_value=(["123", "456"], [])) + + with patch.object(classic_handler.incoming_client, "move_emails", mock_move): + moved_ids, failed_ids = await classic_handler.move_emails( + email_ids=["123", "456"], + destination_mailbox="Archive", + source_mailbox="INBOX", + ) + + assert moved_ids == ["123", "456"] + assert failed_ids == [] + mock_move.assert_called_once_with(["123", "456"], "Archive", "INBOX") + + @pytest.mark.asyncio + async def test_move_emails_with_failures(self, classic_handler): + """Test move_emails method with some failures.""" + mock_move = AsyncMock(return_value=(["123"], ["456"])) + + with patch.object(classic_handler.incoming_client, "move_emails", mock_move): + moved_ids, failed_ids = await classic_handler.move_emails( + email_ids=["123", "456"], + destination_mailbox="Trash", + source_mailbox="INBOX", + ) + + assert moved_ids == ["123"] + assert failed_ids == ["456"] + mock_move.assert_called_once_with(["123", "456"], "Trash", "INBOX") + 
class TestEmailClientBatchMethods: """Test batch fetch methods for performance optimization.""" diff --git a/tests/test_email_client.py b/tests/test_email_client.py index bf1f02e..263b161 100644 --- a/tests/test_email_client.py +++ b/tests/test_email_client.py @@ -192,7 +192,11 @@ def test_build_search_criteria(self): @pytest.mark.asyncio async def test_get_emails_stream(self, email_client): - """Test getting emails stream returns sorted, paginated results.""" + """Test getting emails stream returns paginated results using UID ordering. + + With the optimized pagination, we use UID ordering directly (UIDs are + ascending by date added to mailbox) instead of fetching all dates first. + """ mock_imap = AsyncMock() mock_imap._client_task = asyncio.Future() mock_imap._client_task.set_result(None) @@ -202,19 +206,15 @@ async def test_get_emails_stream(self, email_client): mock_imap.uid_search = AsyncMock(return_value=(None, [b"1 2 3"])) mock_imap.logout = AsyncMock() - # Mock at the helper level - test behavior, not implementation - mock_dates = { - "1": datetime(2024, 1, 1, tzinfo=timezone.utc), - "2": datetime(2024, 1, 2, tzinfo=timezone.utc), - "3": datetime(2024, 1, 3, tzinfo=timezone.utc), - } + # Mock metadata returned by _batch_fetch_headers + # For desc order, UIDs are reversed: 3, 2, 1 mock_metadata = { - "1": { - "email_id": "1", - "subject": "Subject 1", - "from": "a@test.com", + "3": { + "email_id": "3", + "subject": "Subject 3", + "from": "c@test.com", "to": [], - "date": datetime(2024, 1, 1, tzinfo=timezone.utc), + "date": datetime(2024, 1, 3, tzinfo=timezone.utc), "attachments": [], }, "2": { @@ -225,38 +225,33 @@ async def test_get_emails_stream(self, email_client): "date": datetime(2024, 1, 2, tzinfo=timezone.utc), "attachments": [], }, - "3": { - "email_id": "3", - "subject": "Subject 3", - "from": "c@test.com", + "1": { + "email_id": "1", + "subject": "Subject 1", + "from": "a@test.com", "to": [], - "date": datetime(2024, 1, 3, tzinfo=timezone.utc), + 
"date": datetime(2024, 1, 1, tzinfo=timezone.utc), "attachments": [], }, } with patch.object(email_client, "imap_class", return_value=mock_imap): - with patch.object(email_client, "_batch_fetch_dates", return_value=mock_dates) as mock_fetch_dates: - with patch.object( - email_client, "_batch_fetch_headers", return_value=mock_metadata - ) as mock_fetch_headers: - emails = [] - async for email_data in email_client.get_emails_metadata_stream(page=1, page_size=10): - emails.append(email_data) - - # Behavior: returns emails sorted by date desc (newest first) - assert len(emails) == 3 - assert emails[0]["subject"] == "Subject 3" - assert emails[1]["subject"] == "Subject 2" - assert emails[2]["subject"] == "Subject 1" - - mock_imap.login.assert_called_once() - mock_imap.logout.assert_called_once() - - # Verify helpers called with correct arguments - mock_fetch_dates.assert_called_once_with(mock_imap, [b"1", b"2", b"3"]) - # Headers fetched for page UIDs in sorted order (desc by date) - mock_fetch_headers.assert_called_once_with(mock_imap, ["3", "2", "1"]) + with patch.object(email_client, "_batch_fetch_headers", return_value=mock_metadata) as mock_fetch_headers: + emails = [] + async for email_data in email_client.get_emails_metadata_stream(page=1, page_size=10): + emails.append(email_data) + + # Behavior: returns emails in UID desc order (newest UIDs first) + assert len(emails) == 3 + assert emails[0]["subject"] == "Subject 3" + assert emails[1]["subject"] == "Subject 2" + assert emails[2]["subject"] == "Subject 1" + + mock_imap.login.assert_called_once() + mock_imap.logout.assert_called_once() + + # Headers fetched for page UIDs in desc order + mock_fetch_headers.assert_called_once_with(mock_imap, ["3", "2", "1"]) @pytest.mark.asyncio async def test_get_email_count(self, email_client):