diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index de9f95a..de17a06 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -32,6 +32,35 @@ async def list_available_accounts() -> list[AccountAttributes]: return [account.masked() for account in settings.get_accounts()] +@mcp.tool( + description="List all mailboxes (folders) in an email account. Use this to discover available folders like Archive, Sent, Trash, etc." +) +async def list_mailboxes( + account_name: Annotated[str, Field(description="The name of the email account.")], +) -> list[dict]: + handler = dispatch_handler(account_name) + return await handler.list_mailboxes() + + +@mcp.tool( + description="Search emails using server-side IMAP search. Fast even with thousands of emails. " + "Searches in subject, body, and headers by default." +) +async def search_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + query: Annotated[str, Field(description="Text to search for in emails.")], + mailbox: Annotated[str, Field(default="INBOX", description="Mailbox to search in.")] = "INBOX", + search_in: Annotated[ + Literal["all", "subject", "body", "from"], + Field(default="all", description="Where to search: 'all' (headers+body), 'subject', 'body', or 'from'."), + ] = "all", + page: Annotated[int, Field(default=1, description="Page number (starting from 1).")] = 1, + page_size: Annotated[int, Field(default=20, description="Number of results per page.")] = 20, +) -> dict: + handler = dispatch_handler(account_name) + return await handler.search_emails(query, mailbox, search_in, page, page_size) + + @mcp.tool(description="Add a new email account configuration to the settings.") async def add_email_account(email: EmailSettings) -> str: settings = get_settings() @@ -196,6 +225,50 @@ async def delete_emails( return result +@mcp.tool(description="Mark one or more emails as read or unread. Use list_emails_metadata first to get the email_id.") +async def mark_emails_as_read( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to mark (obtained from list_emails_metadata)."), + ], + mailbox: Annotated[str, Field(default="INBOX", description="The mailbox containing the emails.")] = "INBOX", + read: Annotated[bool, Field(default=True, description="True to mark as read, False to mark as unread.")] = True, +) -> str: + handler = dispatch_handler(account_name) + success_ids, failed_ids = await handler.mark_emails_as_read(email_ids, mailbox, read) + + status = "read" if read else "unread" + result = f"Successfully marked {len(success_ids)} email(s) as {status}" + if failed_ids: + result += f", failed to mark {len(failed_ids)} email(s): {', '.join(failed_ids)}" + return result + + +@mcp.tool( + description="Move one or more emails to a different mailbox/folder. Common destinations: 'Archive', 'Trash', 'Spam'. Use list_emails_metadata first to get the email_id." +) +async def move_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to move (obtained from list_emails_metadata)."), + ], + destination_mailbox: Annotated[ + str, + Field(description="Target mailbox name (e.g., 'Archive', 'Trash', 'Spam', '[Gmail]/All Mail')."), + ], + source_mailbox: Annotated[str, Field(default="INBOX", description="Source mailbox.")] = "INBOX", +) -> str: + handler = dispatch_handler(account_name) + moved_ids, failed_ids = await handler.move_emails(email_ids, destination_mailbox, source_mailbox) + + result = f"Successfully moved {len(moved_ids)} email(s) to '{destination_mailbox}'" + if failed_ids: + result += f", failed to move {len(failed_ids)} email(s): {', '.join(failed_ids)}" + return result + + @mcp.tool( description="Download an email attachment and save it to the specified path. This feature must be explicitly enabled in settings (enable_attachment_download=true) due to security considerations.", ) diff --git a/mcp_email_server/emails/__init__.py b/mcp_email_server/emails/__init__.py index 251e9bc..cb7af4f 100644 --- a/mcp_email_server/emails/__init__.py +++ b/mcp_email_server/emails/__init__.py @@ -66,12 +66,70 @@ async def send_email( Send email """ + @abc.abstractmethod + async def list_mailboxes(self) -> list[dict]: + """ + List all mailboxes (folders) in the email account. + + Returns: + List of dictionaries with mailbox info (name, flags, delimiter). + """ + + @abc.abstractmethod + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict: + """ + Search emails using server-side IMAP SEARCH. + + Args: + query: Text to search for. + mailbox: Mailbox to search in (default: "INBOX"). + search_in: Where to search - "all", "subject", "body", "from". + page: Page number (starting from 1). + page_size: Number of results per page. + + Returns: + Dictionary with query, total, page, and emails list. + """ + @abc.abstractmethod async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> tuple[list[str], list[str]]: """ Delete emails by their IDs. Returns (deleted_ids, failed_ids) """ + @abc.abstractmethod + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """ + Mark emails as read or unread. Returns (success_ids, failed_ids) + + Args: + email_ids: List of email IDs to mark. + mailbox: The mailbox containing the emails (default: "INBOX"). + read: True to mark as read, False to mark as unread. + """ + + @abc.abstractmethod + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """ + Move emails to another mailbox. Returns (moved_ids, failed_ids) + + Args: + email_ids: List of email IDs to move. + destination_mailbox: Target mailbox name (e.g., "Archive", "Trash"). + source_mailbox: Source mailbox (default: "INBOX"). + """ + @abc.abstractmethod async def download_attachment( self, diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 7c643cd..c2d4b30 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -187,6 +187,39 @@ def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> di "attachments": attachments, } + @staticmethod + def _parse_search_response(messages: list) -> list[bytes]: + """Parse UIDs from IMAP SEARCH response. + + IMAP SEARCH responses can include: + - Actual UIDs as space-separated numbers + - Status messages like "SEARCH completed (took 5 ms)" + + This method filters out status messages and returns only valid UIDs. + Status messages are identified by containing non-numeric words like + "SEARCH", "completed", "took", etc. + """ + if not messages or not messages[0]: + return [] + + response = messages[0] + response_str = response.decode("utf-8", errors="replace") if isinstance(response, bytes) else str(response) + + # Check if this looks like a status message rather than UIDs + # Status messages contain keywords like "SEARCH", "completed", "took" + status_keywords = ["SEARCH", "completed", "took", "OK", "BAD", "NO"] + if any(keyword in response_str for keyword in status_keywords): + return [] + + # Split and filter to only numeric values + parts = response_str.split() + email_ids = [] + for part in parts: + if part.isdigit(): + email_ids.append(part.encode() if isinstance(part, str) else part) + + return email_ids + @staticmethod def _build_search_criteria( before: datetime | None = None, @@ -353,6 +386,150 @@ async def _batch_fetch_headers( return results + async def list_mailboxes(self) -> list[dict[str, Any]]: + """List all mailboxes (folders) in the email account. + + Returns a list of dictionaries with mailbox information: + - name: The mailbox name + - flags: List of flags (e.g., \\Noselect, \\HasChildren) + - delimiter: The hierarchy delimiter (usually "/" or ".") + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + mailboxes = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + # List all mailboxes + _, data = await imap.list('""', "*") + + for item in data: + if not isinstance(item, bytes): + continue + + # Parse LIST response: (flags) "delimiter" "name" + # Example: (\HasNoChildren) "/" "Archive" + item_str = item.decode("utf-8", errors="replace") + + # Skip status messages + if "LIST completed" in item_str or item_str.startswith("OK"): + continue + + # Parse the response + import re + + match = re.match(r'\(([^)]*)\)\s+"([^"]+)"\s+"?([^"]+)"?', item_str) + if match: + flags_str, delimiter, name = match.groups() + flags = [f.strip() for f in flags_str.split() if f.strip()] + # Remove trailing quote if present + name = name.rstrip('"') + mailboxes.append({ + "name": name, + "flags": flags, + "delimiter": delimiter, + }) + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return mailboxes + + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict[str, Any]: + """Search emails using server-side IMAP SEARCH. + + Args: + query: Text to search for. + mailbox: Mailbox to search in (default: "INBOX"). + search_in: Where to search - "all" (TEXT), "subject", "body", "from". + page: Page number (starting from 1). + page_size: Number of results per page. + + Returns: + Dictionary with query, total count, page, and list of email metadata. + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + # Build IMAP search criteria based on search_in parameter + if search_in == "subject": + search_criteria = ["SUBJECT", query] + elif search_in == "body": + search_criteria = ["BODY", query] + elif search_in == "from": + search_criteria = ["FROM", query] + else: # "all" - searches in headers and body + search_criteria = ["TEXT", query] + + logger.info(f"Search: {search_criteria} in {mailbox}") + + # Execute server-side search + _, messages = await imap.uid_search(*search_criteria) + email_ids = self._parse_search_response(messages) + + total = len(email_ids) + logger.info(f"Search found {total} matching emails") + + if not email_ids: + return { + "query": query, + "search_in": search_in, + "mailbox": mailbox, + "total": 0, + "page": page, + "page_size": page_size, + "emails": [], + } + + # Paginate (most recent first - from end of UID list) + start_idx = max(0, total - (page * page_size)) + end_idx = total - ((page - 1) * page_size) + page_ids = email_ids[start_idx:end_idx] + page_ids = list(reversed([uid.decode() if isinstance(uid, bytes) else uid for uid in page_ids])) + + # Fetch headers for the page + metadata_by_uid = await self._batch_fetch_headers(imap, page_ids) + + emails = [] + for uid in page_ids: + if uid in metadata_by_uid: + emails.append(metadata_by_uid[uid]) + + return { + "query": query, + "search_in": search_in, + "mailbox": mailbox, + "total": total, + "page": page, + "page_size": page_size, + "emails": emails, + } + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + async def get_email_count( self, before: datetime | None = None, @@ -388,7 +565,11 @@ async def get_email_count( logger.info(f"Count: Search criteria: {search_criteria}") # Search for messages and count them - use UID SEARCH for consistency _, messages = await imap.uid_search(*search_criteria) - return len(messages[0].split()) + # Parse UIDs from SEARCH response + # Valid responses contain UIDs as space-separated numbers + # Status messages like "SEARCH completed (took 5 ms)" should be ignored + email_ids = self._parse_search_response(messages) + return len(email_ids) finally: # Ensure we logout properly try: @@ -437,41 +618,44 @@ async def get_emails_metadata_stream( # Search for messages - use UID SEARCH for better compatibility _, messages = await imap.uid_search(*search_criteria) - # Handle empty or None responses - if not messages or not messages[0]: - logger.warning("No messages returned from search") - return - - email_ids = messages[0].split() - logger.info(f"Found {len(email_ids)} email IDs") - - # Phase 1: Batch fetch INTERNALDATE for sorting (parallel chunks) - fetch_dates_start = time.perf_counter() - uid_dates = await self._batch_fetch_dates(imap, email_ids) - fetch_dates_elapsed = time.perf_counter() - fetch_dates_start + # Parse UIDs from SEARCH response + email_ids = self._parse_search_response(messages) - # Sort by INTERNALDATE - sorted_uids = sorted(uid_dates.items(), key=lambda x: x[1], reverse=(order == "desc")) + if not email_ids: + logger.info("No matching emails found") + return - # Paginate - start = (page - 1) * page_size - page_uids = [uid for uid, _ in sorted_uids[start : start + page_size]] + total = len(email_ids) + logger.info(f"Found {total} email IDs") + + # OPTIMIZED: Use UID ordering directly instead of fetching all dates + # UIDs are strictly ascending as messages are added to the mailbox + # This avoids fetching INTERNALDATE for potentially thousands of emails + if order == "desc": + # For descending: take from the end (most recent in this folder) + start_idx = max(0, total - (page * page_size)) + end_idx = total - ((page - 1) * page_size) + page_uids = email_ids[start_idx:end_idx] + # Reverse to get most recent first + page_uids = list(reversed([uid.decode() if isinstance(uid, bytes) else uid for uid in page_uids])) + else: + # For ascending: take from the beginning + start_idx = (page - 1) * page_size + end_idx = min(start_idx + page_size, total) + page_uids = [uid.decode() if isinstance(uid, bytes) else uid for uid in email_ids[start_idx:end_idx]] if not page_uids: - logger.info(f"Phase 1 (dates): {len(uid_dates)} UIDs in {fetch_dates_elapsed:.2f}s, page {page} empty") + logger.info(f"Page {page} is empty (total: {total})") return - # Phase 2: Batch fetch headers for requested page only - fetch_headers_start = time.perf_counter() + # Fetch headers only for the requested page + fetch_start = time.perf_counter() metadata_by_uid = await self._batch_fetch_headers(imap, page_uids) - fetch_headers_elapsed = time.perf_counter() - fetch_headers_start + fetch_elapsed = time.perf_counter() - fetch_start - logger.info( - f"Fetched page {page}: {fetch_dates_elapsed:.2f}s dates ({len(uid_dates)} UIDs), " - f"{fetch_headers_elapsed:.2f}s headers ({len(page_uids)} UIDs)" - ) + logger.info(f"Fetched page {page}: {fetch_elapsed:.2f}s for {len(page_uids)} emails (total: {total})") - # Yield in sorted order + # Yield in page order for uid in page_uids: if uid in metadata_by_uid: yield metadata_by_uid[uid] @@ -912,6 +1096,92 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t return deleted_ids, failed_ids + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """Mark emails as read or unread by their UIDs. Returns (success_ids, failed_ids).""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + success_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(mailbox)) + + flag_action = "+FLAGS" if read else "-FLAGS" + for email_id in email_ids: + try: + await imap.uid("store", email_id, flag_action, r"(\Seen)") + success_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to mark email {email_id} as {'read' if read else 'unread'}: {e}") + failed_ids.append(email_id) + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return success_ids, failed_ids + + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids). + + Uses IMAP MOVE command (RFC 6851) if supported, otherwise falls back to COPY + DELETE. + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + moved_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(source_mailbox)) + + # Check if MOVE is supported (RFC 6851) via protocol capabilities + has_move = hasattr(imap.protocol, "capabilities") and b"MOVE" in imap.protocol.capabilities + + for email_id in email_ids: + try: + if has_move: + # Use MOVE command directly + result = await imap.uid("move", email_id, _quote_mailbox(destination_mailbox)) + if result[0] == "OK": + moved_ids.append(email_id) + else: + logger.error(f"MOVE failed for {email_id}: {result}") + failed_ids.append(email_id) + else: + # Fallback: COPY then DELETE + copy_result = await imap.uid("copy", email_id, _quote_mailbox(destination_mailbox)) + if copy_result[0] == "OK": + await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)") + moved_ids.append(email_id) + else: + logger.error(f"COPY failed for {email_id}: {copy_result}") + failed_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to move email {email_id}: {e}") + failed_ids.append(email_id) + + # Expunge deleted messages (only needed for COPY+DELETE fallback) + if moved_ids and not has_move: + await imap.expunge() + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return moved_ids, failed_ids + class ClassicEmailHandler(EmailHandler): def __init__(self, email_settings: EmailSettings): @@ -924,6 +1194,21 @@ def __init__(self, email_settings: EmailSettings): self.save_to_sent = email_settings.save_to_sent self.sent_folder_name = email_settings.sent_folder_name + async def list_mailboxes(self) -> list[dict]: + """List all mailboxes (folders) in the email account.""" + return await self.incoming_client.list_mailboxes() + + async def search_emails( + self, + query: str, + mailbox: str = "INBOX", + search_in: str = "all", + page: int = 1, + page_size: int = 20, + ) -> dict: + """Search emails using server-side IMAP SEARCH.""" + return await self.incoming_client.search_emails(query, mailbox, search_in, page, page_size) + async def get_emails_metadata( self, page: int = 1, @@ -1041,6 +1326,18 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t """Delete emails by their UIDs. Returns (deleted_ids, failed_ids).""" return await self.incoming_client.delete_emails(email_ids, mailbox) + async def mark_emails_as_read( + self, email_ids: list[str], mailbox: str = "INBOX", read: bool = True + ) -> tuple[list[str], list[str]]: + """Mark emails as read or unread. Returns (success_ids, failed_ids).""" + return await self.incoming_client.mark_emails_as_read(email_ids, mailbox, read) + + async def move_emails( + self, email_ids: list[str], destination_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids).""" + return await self.incoming_client.move_emails(email_ids, destination_mailbox, source_mailbox) + async def download_attachment( self, email_id: str, diff --git a/tests/test_classic_handler.py b/tests/test_classic_handler.py index 3245a6a..6f294f9 100644 --- a/tests/test_classic_handler.py +++ b/tests/test_classic_handler.py @@ -365,6 +365,110 @@ async def test_get_emails_content_includes_message_id(self, classic_handler): # Verify the client method was called correctly mock_get_body.assert_called_once_with("123", "INBOX") + @pytest.mark.asyncio + async def test_list_mailboxes(self, classic_handler): + """Test list_mailboxes method.""" + mock_mailboxes = [ + {"name": "INBOX", "flags": ["\\HasNoChildren"], "delimiter": "/"}, + {"name": "Archive", "flags": ["\\HasNoChildren"], "delimiter": "/"}, + {"name": "Sent", "flags": ["\\Sent"], "delimiter": "/"}, + ] + mock_list = AsyncMock(return_value=mock_mailboxes) + + with patch.object(classic_handler.incoming_client, "list_mailboxes", mock_list): + result = await classic_handler.list_mailboxes() + + assert len(result) == 3 + assert result[0]["name"] == "INBOX" + assert result[1]["name"] == "Archive" + mock_list.assert_called_once() + + @pytest.mark.asyncio + async def test_search_emails(self, classic_handler): + """Test search_emails method.""" + mock_result = { + "query": "test", + "total": 2, + "page": 1, + "emails": [ + {"email_id": "123", "subject": "Test email 1"}, + {"email_id": "456", "subject": "Test email 2"}, + ], + } + mock_search = AsyncMock(return_value=mock_result) + + with patch.object(classic_handler.incoming_client, "search_emails", mock_search): + result = await classic_handler.search_emails("test", "INBOX", "all", 1, 20) + + assert result["query"] == "test" + assert result["total"] == 2 + assert len(result["emails"]) == 2 + mock_search.assert_called_once_with("test", "INBOX", "all", 1, 20) + + @pytest.mark.asyncio + async def test_mark_emails_as_read(self, classic_handler): + """Test mark_emails_as_read method.""" + mock_mark = AsyncMock(return_value=(["123", "456"], [])) + + with patch.object(classic_handler.incoming_client, "mark_emails_as_read", mock_mark): + success_ids, failed_ids = await classic_handler.mark_emails_as_read( + email_ids=["123", "456"], + mailbox="INBOX", + read=True, + ) + + assert success_ids == ["123", "456"] + assert failed_ids == [] + mock_mark.assert_called_once_with(["123", "456"], "INBOX", True) + + @pytest.mark.asyncio + async def test_mark_emails_as_unread(self, classic_handler): + """Test mark_emails_as_read method with read=False.""" + mock_mark = AsyncMock(return_value=(["789"], [])) + + with patch.object(classic_handler.incoming_client, "mark_emails_as_read", mock_mark): + success_ids, failed_ids = await classic_handler.mark_emails_as_read( + email_ids=["789"], + mailbox="INBOX", + read=False, + ) + + assert success_ids == ["789"] + assert failed_ids == [] + mock_mark.assert_called_once_with(["789"], "INBOX", False) + + @pytest.mark.asyncio + async def test_move_emails(self, classic_handler): + """Test move_emails method.""" + mock_move = AsyncMock(return_value=(["123", "456"], [])) + + with patch.object(classic_handler.incoming_client, "move_emails", mock_move): + moved_ids, failed_ids = await classic_handler.move_emails( + email_ids=["123", "456"], + destination_mailbox="Archive", + source_mailbox="INBOX", + ) + + assert moved_ids == ["123", "456"] + assert failed_ids == [] + mock_move.assert_called_once_with(["123", "456"], "Archive", "INBOX") + + @pytest.mark.asyncio + async def test_move_emails_with_failures(self, classic_handler): + """Test move_emails method with some failures.""" + mock_move = AsyncMock(return_value=(["123"], ["456"])) + + with patch.object(classic_handler.incoming_client, "move_emails", mock_move): + moved_ids, failed_ids = await classic_handler.move_emails( + email_ids=["123", "456"], + destination_mailbox="Trash", + source_mailbox="INBOX", + ) + + assert moved_ids == ["123"] + assert failed_ids == ["456"] + mock_move.assert_called_once_with(["123", "456"], "Trash", "INBOX") + class TestEmailClientBatchMethods: """Test batch fetch methods for performance optimization.""" diff --git a/tests/test_email_client.py b/tests/test_email_client.py index 20343c5..da5b4a9 100644 --- a/tests/test_email_client.py +++ b/tests/test_email_client.py @@ -192,7 +192,11 @@ def test_build_search_criteria(self): @pytest.mark.asyncio async def test_get_emails_stream(self, email_client): - """Test getting emails stream returns sorted, paginated results.""" + """Test getting emails stream returns paginated results using UID ordering. + + With the optimized pagination, we use UID ordering directly (UIDs are + ascending by date added to mailbox) instead of fetching all dates first. + """ mock_imap = AsyncMock() mock_imap._client_task = asyncio.Future() mock_imap._client_task.set_result(None) @@ -202,19 +206,15 @@ async def test_get_emails_stream(self, email_client): mock_imap.uid_search = AsyncMock(return_value=(None, [b"1 2 3"])) mock_imap.logout = AsyncMock() - # Mock at the helper level - test behavior, not implementation - mock_dates = { - "1": datetime(2024, 1, 1, tzinfo=timezone.utc), - "2": datetime(2024, 1, 2, tzinfo=timezone.utc), - "3": datetime(2024, 1, 3, tzinfo=timezone.utc), - } + # Mock metadata returned by _batch_fetch_headers + # For desc order, UIDs are reversed: 3, 2, 1 mock_metadata = { - "1": { - "email_id": "1", - "subject": "Subject 1", - "from": "a@test.com", + "3": { + "email_id": "3", + "subject": "Subject 3", + "from": "c@test.com", "to": [], - "date": datetime(2024, 1, 1, tzinfo=timezone.utc), + "date": datetime(2024, 1, 3, tzinfo=timezone.utc), "attachments": [], }, "2": { @@ -225,38 +225,33 @@ async def test_get_emails_stream(self, email_client): "date": datetime(2024, 1, 2, tzinfo=timezone.utc), "attachments": [], }, - "3": { - "email_id": "3", - "subject": "Subject 3", - "from": "c@test.com", + "1": { + "email_id": "1", + "subject": "Subject 1", + "from": "a@test.com", "to": [], - "date": datetime(2024, 1, 3, tzinfo=timezone.utc), + "date": datetime(2024, 1, 1, tzinfo=timezone.utc), "attachments": [], }, } with patch.object(email_client, "imap_class", return_value=mock_imap): - with patch.object(email_client, "_batch_fetch_dates", return_value=mock_dates) as mock_fetch_dates: - with patch.object( - email_client, "_batch_fetch_headers", return_value=mock_metadata - ) as mock_fetch_headers: - emails = [] - async for email_data in email_client.get_emails_metadata_stream(page=1, page_size=10): - emails.append(email_data) - - # Behavior: returns emails sorted by date desc (newest first) - assert len(emails) == 3 - assert emails[0]["subject"] == "Subject 3" - assert emails[1]["subject"] == "Subject 2" - assert emails[2]["subject"] == "Subject 1" - - mock_imap.login.assert_called_once() - mock_imap.logout.assert_called_once() - - # Verify helpers called with correct arguments - mock_fetch_dates.assert_called_once_with(mock_imap, [b"1", b"2", b"3"]) - # Headers fetched for page UIDs in sorted order (desc by date) - mock_fetch_headers.assert_called_once_with(mock_imap, ["3", "2", "1"]) + with patch.object(email_client, "_batch_fetch_headers", return_value=mock_metadata) as mock_fetch_headers: + emails = [] + async for email_data in email_client.get_emails_metadata_stream(page=1, page_size=10): + emails.append(email_data) + + # Behavior: returns emails in UID desc order (newest UIDs first) + assert len(emails) == 3 + assert emails[0]["subject"] == "Subject 3" + assert emails[1]["subject"] == "Subject 2" + assert emails[2]["subject"] == "Subject 1" + + mock_imap.login.assert_called_once() + mock_imap.logout.assert_called_once() + + # Headers fetched for page UIDs in desc order + mock_fetch_headers.assert_called_once_with(mock_imap, ["3", "2", "1"]) @pytest.mark.asyncio async def test_get_email_count(self, email_client):