Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions mcp_email_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,48 @@ async def download_attachment(

handler = dispatch_handler(account_name)
return await handler.download_attachment(email_id, attachment_name, save_path, mailbox)


@mcp.tool(description="List all available mailboxes/folders for an email account.")
async def list_mailboxes(
account_name: Annotated[str, Field(description="The name of the email account.")],
) -> list[str]:
handler = dispatch_handler(account_name)
return await handler.list_mailboxes()


@mcp.tool(
description="Move one or more emails to another mailbox/folder. Use list_emails_metadata first to get email_ids and list_mailboxes to see available folders."
)
async def move_emails(
account_name: Annotated[str, Field(description="The name of the email account.")],
email_ids: Annotated[
list[str],
Field(description="List of email_id to move (obtained from list_emails_metadata)."),
],
target_mailbox: Annotated[str, Field(description="The target mailbox/folder to move emails to.")],
source_mailbox: Annotated[
str, Field(default="INBOX", description="The source mailbox to move emails from.")
] = "INBOX",
) -> str:
handler = dispatch_handler(account_name)
moved_ids, failed_ids = await handler.move_emails(email_ids, target_mailbox, source_mailbox)

result = f"Successfully moved {len(moved_ids)} email(s) to '{target_mailbox}'"
if failed_ids:
result += f", failed to move {len(failed_ids)} email(s): {', '.join(failed_ids)}"
return result


@mcp.tool(description="Create a new mailbox/folder. Use this to organize emails with custom folders.")
async def create_mailbox(
account_name: Annotated[str, Field(description="The name of the email account.")],
mailbox_name: Annotated[str, Field(description="The name of the mailbox/folder to create.")],
) -> str:
handler = dispatch_handler(account_name)
success = await handler.create_mailbox(mailbox_name)

if success:
return f"Successfully created mailbox '{mailbox_name}'"
else:
return f"Failed to create mailbox '{mailbox_name}'"
37 changes: 37 additions & 0 deletions mcp_email_server/emails/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,40 @@ async def download_attachment(
Returns:
AttachmentDownloadResponse with download result information.
"""

@abc.abstractmethod
async def list_mailboxes(self) -> list[str]:
"""
List all available mailboxes/folders.

Returns:
List of mailbox names.
"""

@abc.abstractmethod
async def move_emails(
self, email_ids: list[str], target_mailbox: str, source_mailbox: str = "INBOX"
) -> tuple[list[str], list[str]]:
"""
Move emails to another mailbox.

Args:
email_ids: List of email UIDs to move.
target_mailbox: Target mailbox name.
source_mailbox: Source mailbox name (default: "INBOX").

Returns:
Tuple of (moved_ids, failed_ids).
"""

@abc.abstractmethod
async def create_mailbox(self, mailbox_name: str) -> bool:
"""
Create a new mailbox/folder.

Args:
mailbox_name: The name of the mailbox to create.

Returns:
True if successfully created, False otherwise.
"""
132 changes: 132 additions & 0 deletions mcp_email_server/emails/classic.py
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,124 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t

return deleted_ids, failed_ids

async def list_mailboxes(self) -> list[str]:
"""List all available mailboxes/folders."""
imap = self.imap_class(self.email_server.host, self.email_server.port)
mailboxes = []

try:
await imap._client_task
await imap.wait_hello_from_server()
await imap.login(self.email_server.user_name, self.email_server.password)
await _send_imap_id(imap)

# List all folders
_, folders = await imap.list('""', "*")

for folder in folders:
folder_str = folder.decode("utf-8") if isinstance(folder, bytes) else str(folder)
# IMAP LIST response format: (flags) "delimiter" "name"
# Example: (\HasNoChildren) "/" "INBOX"
parts = folder_str.split('"')
if len(parts) >= 3:
folder_name = parts[-2] # The folder name is the second-to-last quoted part
mailboxes.append(folder_name)

finally:
try:
await imap.logout()
except Exception as e:
logger.info(f"Error during logout: {e}")

return mailboxes

async def create_mailbox(self, mailbox_name: str) -> bool:
"""Create a new mailbox/folder.

Args:
mailbox_name: The name of the mailbox to create.

Returns:
True if successfully created, False otherwise.
"""
imap = self.imap_class(self.email_server.host, self.email_server.port)

try:
await imap._client_task
await imap.wait_hello_from_server()
await imap.login(self.email_server.user_name, self.email_server.password)
await _send_imap_id(imap)

# Create the mailbox
result = await imap.create(_quote_mailbox(mailbox_name))
status = result[0] if isinstance(result, tuple) else result

if str(status).upper() == "OK":
logger.info(f"Created mailbox: '{mailbox_name}'")
return True
else:
logger.error(f"Failed to create mailbox '{mailbox_name}': {result}")
return False

except Exception as e:
logger.error(f"Error creating mailbox '{mailbox_name}': {e}")
return False
finally:
try:
await imap.logout()
except Exception as e:
logger.info(f"Error during logout: {e}")

async def move_emails(
self, email_ids: list[str], target_mailbox: str, source_mailbox: str = "INBOX"
) -> tuple[list[str], list[str]]:
"""Move emails to another mailbox. Returns (moved_ids, failed_ids).

IMAP doesn't have a native MOVE command in older versions, so we use:
1. COPY to target mailbox
2. STORE \\Deleted flag
3. EXPUNGE
"""
imap = self.imap_class(self.email_server.host, self.email_server.port)
moved_ids = []
failed_ids = []

try:
await imap._client_task
await imap.wait_hello_from_server()
await imap.login(self.email_server.user_name, self.email_server.password)
await _send_imap_id(imap)
await imap.select(_quote_mailbox(source_mailbox))

for email_id in email_ids:
try:
# Copy to target mailbox
result = await imap.uid("copy", email_id, _quote_mailbox(target_mailbox))
status = result[0] if isinstance(result, tuple) else result
if str(status).upper() != "OK":
logger.error(f"Failed to copy email {email_id} to {target_mailbox}: {result}")
failed_ids.append(email_id)
continue

# Mark as deleted in source
await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)")
moved_ids.append(email_id)
except Exception as e:
logger.error(f"Failed to move email {email_id}: {e}")
failed_ids.append(email_id)

# Expunge deleted messages
if moved_ids:
await imap.expunge()

finally:
try:
await imap.logout()
except Exception as e:
logger.info(f"Error during logout: {e}")

return moved_ids, failed_ids


class ClassicEmailHandler(EmailHandler):
def __init__(self, email_settings: EmailSettings):
Expand Down Expand Up @@ -1067,3 +1185,17 @@ async def download_attachment(
size=result["size"],
saved_path=result["saved_path"],
)

async def list_mailboxes(self) -> list[str]:
"""List all available mailboxes/folders."""
return await self.incoming_client.list_mailboxes()

async def move_emails(
self, email_ids: list[str], target_mailbox: str, source_mailbox: str = "INBOX"
) -> tuple[list[str], list[str]]:
"""Move emails to another mailbox. Returns (moved_ids, failed_ids)."""
return await self.incoming_client.move_emails(email_ids, target_mailbox, source_mailbox)

async def create_mailbox(self, mailbox_name: str) -> bool:
"""Create a new mailbox/folder."""
return await self.incoming_client.create_mailbox(mailbox_name)
Loading