diff --git a/mcp_email_server/app.py b/mcp_email_server/app.py index de9f95a..d64fdb8 100644 --- a/mcp_email_server/app.py +++ b/mcp_email_server/app.py @@ -219,3 +219,48 @@ async def download_attachment( handler = dispatch_handler(account_name) return await handler.download_attachment(email_id, attachment_name, save_path, mailbox) + + +@mcp.tool(description="List all available mailboxes/folders for an email account.") +async def list_mailboxes( + account_name: Annotated[str, Field(description="The name of the email account.")], +) -> list[str]: + handler = dispatch_handler(account_name) + return await handler.list_mailboxes() + + +@mcp.tool( + description="Move one or more emails to another mailbox/folder. Use list_emails_metadata first to get email_ids and list_mailboxes to see available folders." +) +async def move_emails( + account_name: Annotated[str, Field(description="The name of the email account.")], + email_ids: Annotated[ + list[str], + Field(description="List of email_id to move (obtained from list_emails_metadata)."), + ], + target_mailbox: Annotated[str, Field(description="The target mailbox/folder to move emails to.")], + source_mailbox: Annotated[ + str, Field(default="INBOX", description="The source mailbox to move emails from.") + ] = "INBOX", +) -> str: + handler = dispatch_handler(account_name) + moved_ids, failed_ids = await handler.move_emails(email_ids, target_mailbox, source_mailbox) + + result = f"Successfully moved {len(moved_ids)} email(s) to '{target_mailbox}'" + if failed_ids: + result += f", failed to move {len(failed_ids)} email(s): {', '.join(failed_ids)}" + return result + + +@mcp.tool(description="Create a new mailbox/folder. Use this to organize emails with custom folders.") +async def create_mailbox( + account_name: Annotated[str, Field(description="The name of the email account.")], + mailbox_name: Annotated[str, Field(description="The name of the mailbox/folder to create.")], +) -> str: + handler = dispatch_handler(account_name) + success = await handler.create_mailbox(mailbox_name) + + if success: + return f"Successfully created mailbox '{mailbox_name}'" + else: + return f"Failed to create mailbox '{mailbox_name}'" diff --git a/mcp_email_server/emails/__init__.py b/mcp_email_server/emails/__init__.py index 251e9bc..da0dca0 100644 --- a/mcp_email_server/emails/__init__.py +++ b/mcp_email_server/emails/__init__.py @@ -92,3 +92,40 @@ async def download_attachment( Returns: AttachmentDownloadResponse with download result information. """ + + @abc.abstractmethod + async def list_mailboxes(self) -> list[str]: + """ + List all available mailboxes/folders. + + Returns: + List of mailbox names. + """ + + @abc.abstractmethod + async def move_emails( + self, email_ids: list[str], target_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """ + Move emails to another mailbox. + + Args: + email_ids: List of email UIDs to move. + target_mailbox: Target mailbox name. + source_mailbox: Source mailbox name (default: "INBOX"). + + Returns: + Tuple of (moved_ids, failed_ids). + """ + + @abc.abstractmethod + async def create_mailbox(self, mailbox_name: str) -> bool: + """ + Create a new mailbox/folder. + + Args: + mailbox_name: The name of the mailbox to create. + + Returns: + True if successfully created, False otherwise. + """ diff --git a/mcp_email_server/emails/classic.py b/mcp_email_server/emails/classic.py index 7c643cd..3556f0e 100644 --- a/mcp_email_server/emails/classic.py +++ b/mcp_email_server/emails/classic.py @@ -912,6 +912,124 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t return deleted_ids, failed_ids + async def list_mailboxes(self) -> list[str]: + """List all available mailboxes/folders.""" + imap = self.imap_class(self.email_server.host, self.email_server.port) + mailboxes = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + # List all folders + _, folders = await imap.list('""', "*") + + for folder in folders: + folder_str = folder.decode("utf-8") if isinstance(folder, bytes) else str(folder) + # IMAP LIST response format: (flags) "delimiter" "name" + # Example: (\HasNoChildren) "/" "INBOX" + parts = folder_str.split('"') + if len(parts) >= 3: + folder_name = parts[-2] # The folder name is the second-to-last quoted part + mailboxes.append(folder_name) + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return mailboxes + + async def create_mailbox(self, mailbox_name: str) -> bool: + """Create a new mailbox/folder. + + Args: + mailbox_name: The name of the mailbox to create. + + Returns: + True if successfully created, False otherwise. + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + + # Create the mailbox + result = await imap.create(_quote_mailbox(mailbox_name)) + status = result[0] if isinstance(result, tuple) else result + + if str(status).upper() == "OK": + logger.info(f"Created mailbox: '{mailbox_name}'") + return True + else: + logger.error(f"Failed to create mailbox '{mailbox_name}': {result}") + return False + + except Exception as e: + logger.error(f"Error creating mailbox '{mailbox_name}': {e}") + return False + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + async def move_emails( + self, email_ids: list[str], target_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids). + + IMAP doesn't have a native MOVE command in older versions, so we use: + 1. COPY to target mailbox + 2. STORE \\Deleted flag + 3. EXPUNGE + """ + imap = self.imap_class(self.email_server.host, self.email_server.port) + moved_ids = [] + failed_ids = [] + + try: + await imap._client_task + await imap.wait_hello_from_server() + await imap.login(self.email_server.user_name, self.email_server.password) + await _send_imap_id(imap) + await imap.select(_quote_mailbox(source_mailbox)) + + for email_id in email_ids: + try: + # Copy to target mailbox + result = await imap.uid("copy", email_id, _quote_mailbox(target_mailbox)) + status = result[0] if isinstance(result, tuple) else result + if str(status).upper() != "OK": + logger.error(f"Failed to copy email {email_id} to {target_mailbox}: {result}") + failed_ids.append(email_id) + continue + + # Mark as deleted in source + await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)") + moved_ids.append(email_id) + except Exception as e: + logger.error(f"Failed to move email {email_id}: {e}") + failed_ids.append(email_id) + + # Expunge deleted messages + if moved_ids: + await imap.expunge() + + finally: + try: + await imap.logout() + except Exception as e: + logger.info(f"Error during logout: {e}") + + return moved_ids, failed_ids + class ClassicEmailHandler(EmailHandler): def __init__(self, email_settings: EmailSettings): @@ -1067,3 +1185,17 @@ async def download_attachment( size=result["size"], saved_path=result["saved_path"], ) + + async def list_mailboxes(self) -> list[str]: + """List all available mailboxes/folders.""" + return await self.incoming_client.list_mailboxes() + + async def move_emails( + self, email_ids: list[str], target_mailbox: str, source_mailbox: str = "INBOX" + ) -> tuple[list[str], list[str]]: + """Move emails to another mailbox. Returns (moved_ids, failed_ids).""" + return await self.incoming_client.move_emails(email_ids, target_mailbox, source_mailbox) + + async def create_mailbox(self, mailbox_name: str) -> bool: + """Create a new mailbox/folder.""" + return await self.incoming_client.create_mailbox(mailbox_name)