Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,21 @@ await send_email(

The `in_reply_to` parameter sets the `In-Reply-To` header, and `references` sets the `References` header. Both are used by email clients to thread conversations properly.

### Archiving Emails

To move emails to your Archive folder:

```python
result = await archive_emails(
account_name="work",
email_ids=["123", "456"],
)
print(f"Archived: {result.archived_ids}")
print(f"Archive folder: {result.archive_folder}")
```

The server auto-detects common Archive folder names: `Archive`, `INBOX.Archive`, `Archives`, `[Gmail]/All Mail`. It also checks for the IMAP `\Archive` flag to find the correct folder.

## Development

This project is managed using [uv](https://github.com/ai-zerolab/uv).
Expand Down
16 changes: 16 additions & 0 deletions mcp_email_server/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
)
from mcp_email_server.emails.dispatcher import dispatch_handler
from mcp_email_server.emails.models import (
ArchiveEmailResponse,
AttachmentDownloadResponse,
EmailContentBatchResponse,
EmailMetadataPageResponse,
Expand Down Expand Up @@ -204,3 +205,18 @@ async def download_attachment(

handler = dispatch_handler(account_name)
return await handler.download_attachment(email_id, attachment_name, save_path, mailbox)


@mcp.tool(
description="Archive emails by moving them to the Archive folder. Use list_emails_metadata first to get the email_id."
)
async def archive_emails(
account_name: Annotated[str, Field(description="The name of the email account.")],
email_ids: Annotated[
list[str],
Field(description="List of email_id to archive (obtained from list_emails_metadata)."),
],
mailbox: Annotated[str, Field(default="INBOX", description="The source mailbox to archive emails from.")] = "INBOX",
) -> ArchiveEmailResponse:
handler = dispatch_handler(account_name)
return await handler.archive_emails(email_ids, mailbox)
18 changes: 18 additions & 0 deletions mcp_email_server/emails/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

if TYPE_CHECKING:
from mcp_email_server.emails.models import (
ArchiveEmailResponse,
AttachmentDownloadResponse,
EmailContentBatchResponse,
EmailMetadataPageResponse,
Expand Down Expand Up @@ -75,3 +76,20 @@ async def download_attachment(
Returns:
AttachmentDownloadResponse with download result information.
"""

@abc.abstractmethod
async def archive_emails(
self,
email_ids: list[str],
mailbox: str = "INBOX",
) -> "ArchiveEmailResponse":
"""
Archive emails by moving them to the Archive folder.

Args:
email_ids: List of email UIDs to archive.
mailbox: The source mailbox (default: "INBOX").

Returns:
ArchiveEmailResponse with archive result information.
"""
138 changes: 138 additions & 0 deletions mcp_email_server/emails/classic.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from mcp_email_server.config import EmailServer, EmailSettings
from mcp_email_server.emails import EmailHandler
from mcp_email_server.emails.models import (
ArchiveEmailResponse,
AttachmentDownloadResponse,
EmailBodyResponse,
EmailContentBatchResponse,
Expand Down Expand Up @@ -676,6 +677,37 @@ async def _find_sent_folder_by_flag(self, imap) -> str | None:

return None

async def _find_archive_folder_by_flag(self, imap) -> str | None:
"""Find the Archive folder by searching for the \\Archive IMAP flag.

Args:
imap: Connected IMAP client

Returns:
The folder name with the \\Archive flag, or None if not found
"""
try:
# List all folders - aioimaplib requires reference_name and mailbox_pattern
_, folders = await imap.list('""', "*")

# Search for folder with \Archive flag
for folder in folders:
folder_str = folder.decode("utf-8") if isinstance(folder, bytes) else str(folder)
# IMAP LIST response format: (flags) "delimiter" "name"
# Example: (\Archive \HasNoChildren) "/" "Archive"
if r"\Archive" in folder_str or "\\Archive" in folder_str:
# Extract folder name from the response
# Split by quotes and get the last quoted part
parts = folder_str.split('"')
if len(parts) >= 3:
folder_name = parts[-2] # The folder name is the second-to-last quoted part
logger.info(f"Found Archive folder by \\Archive flag: '{folder_name}'")
return folder_name
except Exception as e:
logger.debug(f"Error finding Archive folder by flag: {e}")

return None

async def append_to_sent(
self,
msg: MIMEText | MIMEMultipart,
Expand Down Expand Up @@ -795,6 +827,91 @@ async def delete_emails(self, email_ids: list[str], mailbox: str = "INBOX") -> t

return deleted_ids, failed_ids

async def move_to_archive(self, email_ids: list[str], mailbox: str = "INBOX") -> tuple[list[str], list[str], str]:
"""Move emails to the Archive folder.

Args:
email_ids: List of email UIDs to archive.
mailbox: The source mailbox (default: "INBOX").

Returns:
Tuple of (archived_ids, failed_ids, archive_folder_name)
"""
imap = self.imap_class(self.email_server.host, self.email_server.port)
archived_ids = []
failed_ids = []
archive_folder = ""

# Common Archive folder names across different providers
archive_folder_candidates = [
"Archive",
"INBOX.Archive",
"Archives",
"[Gmail]/All Mail",
]

try:
await imap._client_task
await imap.wait_hello_from_server()
await imap.login(self.email_server.user_name, self.email_server.password)
await _send_imap_id(imap)

# Try to find Archive folder by IMAP \Archive flag first
flag_folder = await self._find_archive_folder_by_flag(imap)
if flag_folder and flag_folder not in archive_folder_candidates:
# Add it at the beginning (high priority)
archive_folder_candidates.insert(0, flag_folder)

# Find a valid archive folder
for folder in archive_folder_candidates:
try:
logger.debug(f"Trying Archive folder: '{folder}'")
result = await imap.select(_quote_mailbox(folder))
status = result[0] if isinstance(result, tuple) else result
if str(status).upper() == "OK":
archive_folder = folder
logger.info(f"Found Archive folder: '{folder}'")
break
except Exception as e:
logger.debug(f"Archive folder '{folder}' not available: {e}")
continue

if not archive_folder:
logger.error("Could not find a valid Archive folder")
return [], email_ids, ""

# Select the source mailbox
await imap.select(_quote_mailbox(mailbox))

# Move each email: COPY to archive, then mark as deleted
for email_id in email_ids:
try:
# Copy to archive folder
copy_result = await imap.uid("copy", email_id, _quote_mailbox(archive_folder))
copy_status = copy_result[0] if isinstance(copy_result, tuple) else copy_result
if str(copy_status).upper() != "OK":
logger.error(f"Failed to copy email {email_id} to archive: {copy_status}")
failed_ids.append(email_id)
continue

# Mark as deleted in source mailbox
await imap.uid("store", email_id, "+FLAGS", r"(\Deleted)")
archived_ids.append(email_id)
except Exception as e:
logger.error(f"Failed to archive email {email_id}: {e}")
failed_ids.append(email_id)

# Expunge to remove deleted messages from source
await imap.expunge()

finally:
try:
await imap.logout()
except Exception as e:
logger.info(f"Error during logout: {e}")

return archived_ids, failed_ids, archive_folder


class ClassicEmailHandler(EmailHandler):
def __init__(self, email_settings: EmailSettings):
Expand Down Expand Up @@ -928,3 +1045,24 @@ async def download_attachment(
size=result["size"],
saved_path=result["saved_path"],
)

async def archive_emails(
self,
email_ids: list[str],
mailbox: str = "INBOX",
) -> ArchiveEmailResponse:
"""Archive emails by moving them to the Archive folder.

Args:
email_ids: List of email UIDs to archive.
mailbox: The source mailbox (default: "INBOX").

Returns:
ArchiveEmailResponse with archive result information.
"""
archived_ids, failed_ids, archive_folder = await self.incoming_client.move_to_archive(email_ids, mailbox)
return ArchiveEmailResponse(
archived_ids=archived_ids,
failed_ids=failed_ids,
archive_folder=archive_folder,
)
8 changes: 8 additions & 0 deletions mcp_email_server/emails/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,11 @@ class AttachmentDownloadResponse(BaseModel):
mime_type: str
size: int
saved_path: str


class ArchiveEmailResponse(BaseModel):
"""Archive email response"""

archived_ids: list[str]
failed_ids: list[str]
archive_folder: str
51 changes: 51 additions & 0 deletions tests/test_classic_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from mcp_email_server.config import EmailServer, EmailSettings
from mcp_email_server.emails.classic import ClassicEmailHandler, EmailClient
from mcp_email_server.emails.models import (
ArchiveEmailResponse,
AttachmentDownloadResponse,
EmailBodyResponse,
EmailContentBatchResponse,
Expand Down Expand Up @@ -346,3 +347,53 @@ async def test_get_emails_content_includes_message_id(self, classic_handler):

# Verify the client method was called correctly
mock_get_body.assert_called_once_with("123", "INBOX")

@pytest.mark.asyncio
async def test_archive_emails(self, classic_handler):
"""Test archive_emails method."""
mock_archive = AsyncMock(return_value=(["123", "456"], [], "Archive"))

with patch.object(classic_handler.incoming_client, "move_to_archive", mock_archive):
result = await classic_handler.archive_emails(
email_ids=["123", "456"],
mailbox="INBOX",
)

assert isinstance(result, ArchiveEmailResponse)
assert result.archived_ids == ["123", "456"]
assert result.failed_ids == []
assert result.archive_folder == "Archive"
mock_archive.assert_called_once_with(["123", "456"], "INBOX")

@pytest.mark.asyncio
async def test_archive_emails_with_failures(self, classic_handler):
"""Test archive_emails method with some failures."""
mock_archive = AsyncMock(return_value=(["123"], ["456"], "Archive"))

with patch.object(classic_handler.incoming_client, "move_to_archive", mock_archive):
result = await classic_handler.archive_emails(
email_ids=["123", "456"],
mailbox="INBOX",
)

assert isinstance(result, ArchiveEmailResponse)
assert result.archived_ids == ["123"]
assert result.failed_ids == ["456"]
assert result.archive_folder == "Archive"
mock_archive.assert_called_once_with(["123", "456"], "INBOX")

@pytest.mark.asyncio
async def test_archive_emails_custom_mailbox(self, classic_handler):
"""Test archive_emails method with custom mailbox."""
mock_archive = AsyncMock(return_value=(["789"], [], "Archive"))

with patch.object(classic_handler.incoming_client, "move_to_archive", mock_archive):
result = await classic_handler.archive_emails(
email_ids=["789"],
mailbox="Sent",
)

assert isinstance(result, ArchiveEmailResponse)
assert result.archived_ids == ["789"]
assert result.failed_ids == []
mock_archive.assert_called_once_with(["789"], "Sent")
Loading
Loading