253 changes: 160 additions & 93 deletions mcp_email_server/emails/classic.py
@@ -1,6 +1,9 @@
import asyncio
import email.utils
import mimetypes
import re
import ssl
import time
from collections.abc import AsyncGenerator
from datetime import datetime, timezone
from email.header import Header
@@ -225,6 +228,131 @@ def _build_search_criteria(

return search_criteria or ["ALL"]

def _parse_headers(self, email_id: str, raw_headers: bytes) -> dict[str, Any] | None:
"""Parse raw email headers into metadata dictionary."""
try:
parser = BytesParser(policy=default)
email_message = parser.parsebytes(raw_headers)

subject = email_message.get("Subject", "")
sender = email_message.get("From", "")
date_str = email_message.get("Date", "")

to_addresses = []
to_header = email_message.get("To", "")
if to_header:
to_addresses = [addr.strip() for addr in to_header.split(",")]

cc_header = email_message.get("Cc", "")
if cc_header:
to_addresses.extend([addr.strip() for addr in cc_header.split(",")])

try:
date_tuple = email.utils.parsedate_tz(date_str)
date = (
datetime.fromtimestamp(email.utils.mktime_tz(date_tuple), tz=timezone.utc)
if date_tuple
else datetime.now(timezone.utc)
)
except Exception:
date = datetime.now(timezone.utc)

return {
"email_id": email_id,
"subject": subject,
"from": sender,
"to": to_addresses,
"date": date,
"attachments": [],
}
except Exception as e:
logger.error(f"Error parsing email headers: {e!s}")
return None
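
# Illustrative sketch (not part of this diff): the shape _parse_headers is expected to return
# for a minimal raw header blob; the UID and addresses here are made-up examples.
raw = (
    b"Subject: Hello\r\n"
    b"From: alice@example.com\r\n"
    b"To: bob@example.com, carol@example.com\r\n"
    b"Date: Mon, 01 Jan 2024 12:00:00 +0000\r\n\r\n"
)
# self._parse_headers("42", raw) ->
# {"email_id": "42", "subject": "Hello", "from": "alice@example.com",
#  "to": ["bob@example.com", "carol@example.com"],
#  "date": datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc), "attachments": []}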

async def _fetch_dates_chunk(
self,
imap: aioimaplib.IMAP4_SSL | aioimaplib.IMAP4,
chunk: list[bytes],
chunk_num: int,
total_chunks: int,
) -> dict[str, datetime]:
"""Fetch INTERNALDATE for a single chunk of UIDs."""
uid_list = ",".join(uid.decode() for uid in chunk)
chunk_start = time.perf_counter()
_, data = await imap.uid("fetch", uid_list, "(INTERNALDATE)")
chunk_elapsed = time.perf_counter() - chunk_start

chunk_dates: dict[str, datetime] = {}
for item in data:
if not isinstance(item, bytes) or b"INTERNALDATE" not in item:
continue
uid_match = re.search(rb"UID (\d+)", item)
date_match = re.search(rb'INTERNALDATE "([^"]+)"', item)
if uid_match and date_match:
uid = uid_match.group(1).decode()
date_str = date_match.group(1).decode().strip()
chunk_dates[uid] = datetime.strptime(date_str, "%d-%b-%Y %H:%M:%S %z")

if total_chunks > 1:
logger.info(f"Fetched dates chunk {chunk_num}/{total_chunks}: {len(chunk)} UIDs in {chunk_elapsed:.2f}s")

return chunk_dates
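
# Illustrative sketch (not part of this diff): the regex parsing above applied to one sample
# untagged FETCH item; the UID and timestamp are made-up, and the .strip() covers the leading
# space some servers send for single-digit days in INTERNALDATE.
sample = b'123 FETCH (UID 4567 INTERNALDATE "01-Jan-2024 12:00:00 +0000")'
uid = re.search(rb"UID (\d+)", sample).group(1).decode()  # "4567"
raw_date = re.search(rb'INTERNALDATE "([^"]+)"', sample).group(1).decode().strip()
parsed = datetime.strptime(raw_date, "%d-%b-%Y %H:%M:%S %z")  # 2024-01-01 12:00:00+00:00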

async def _batch_fetch_dates(
self,
imap: aioimaplib.IMAP4_SSL | aioimaplib.IMAP4,
email_ids: list[bytes],
chunk_size: int = 5000,
) -> dict[str, datetime]:
"""Batch fetch INTERNALDATE for all UIDs in parallel chunks."""
if not email_ids:
return {}

# Split into chunks
chunks = [email_ids[i : i + chunk_size] for i in range(0, len(email_ids), chunk_size)]
total_chunks = len(chunks)

# Fetch all chunks in parallel
tasks = [
self._fetch_dates_chunk(imap, chunk, chunk_num, total_chunks) for chunk_num, chunk in enumerate(chunks, 1)
]
results = await asyncio.gather(*tasks)

# Merge results
uid_dates: dict[str, datetime] = {}
for chunk_dates in results:
uid_dates.update(chunk_dates)

return uid_dates
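
# Illustrative sketch (not part of this diff): how the chunking arithmetic above behaves with the
# default chunk_size of 5000 and a hypothetical 12000 matching UIDs.
ids = [str(i).encode() for i in range(1, 12001)]
chunks = [ids[i : i + 5000] for i in range(0, len(ids), 5000)]
# -> 3 chunks of 5000, 5000 and 2000 UIDs; each chunk becomes one UID FETCH (INTERNALDATE)
#    command, and the per-chunk coroutines are awaited together with asyncio.gather.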

async def _batch_fetch_headers(
self,
imap: aioimaplib.IMAP4_SSL | aioimaplib.IMAP4,
email_ids: list[bytes] | list[str],
) -> dict[str, dict[str, Any]]:
"""Batch fetch headers for a list of UIDs."""
if not email_ids:
return {}

# Normalize to list of strings
str_ids = [uid.decode() if isinstance(uid, bytes) else uid for uid in email_ids]
uid_list = ",".join(str_ids)
_, data = await imap.uid("fetch", uid_list, "BODY.PEEK[HEADER]")

results: dict[str, dict[str, Any]] = {}
for i, item in enumerate(data):
if not isinstance(item, bytes) or b"BODY[HEADER]" not in item:
continue
uid_match = re.search(rb"UID (\d+)", item)
if uid_match and i + 1 < len(data) and isinstance(data[i + 1], bytearray):
uid = uid_match.group(1).decode()
raw_headers = bytes(data[i + 1])
metadata = self._parse_headers(uid, raw_headers)
if metadata:
results[uid] = metadata

return results
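
# Illustrative sketch (not part of this diff): the response shape the pairing logic above assumes,
# i.e. the header literal arrives as a bytearray in the element following the FETCH line (the
# literal size, UID and addresses are made-up).
data = [
    b"1 FETCH (UID 4567 BODY[HEADER] {43}",
    bytearray(b"Subject: Hello\r\nFrom: alice@example.com\r\n\r\n"),
    b")",
]
# -> item 0 matches b"BODY[HEADER]" and rb"UID (\d+)", data[1] holds the raw headers,
#    so results["4567"] is whatever _parse_headers returns for those bytes.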

async def get_email_count(
self,
before: datetime | None = None,
@@ -268,7 +396,7 @@ async def get_email_count(
except Exception as e:
logger.info(f"Error during logout: {e}")

async def get_emails_metadata_stream( # noqa: C901
async def get_emails_metadata_stream(
self,
page: int = 1,
page_size: int = 10,
@@ -289,7 +417,7 @@ async def get_emails_metadata_stream( # noqa: C901
await imap._client_task
await imap.wait_hello_from_server()

# Login and select inbox
# Login and select mailbox
await imap.login(self.email_server.user_name, self.email_server.password)
await _send_imap_id(imap)
await imap.select(_quote_mailbox(mailbox))
@@ -312,103 +440,42 @@ async def get_emails_metadata_stream( # noqa: C901
# Handle empty or None responses
if not messages or not messages[0]:
logger.warning("No messages returned from search")
email_ids = []
else:
email_ids = messages[0].split()
logger.info(f"Found {len(email_ids)} email IDs")
# Fetch metadata for all emails first, then sort by date
# (UID order doesn't guarantee chronological order on all IMAP servers)
all_metadata: list[dict[str, Any]] = []

# Fetch each message's metadata only
for _, email_id in enumerate(email_ids):
try:
# Convert email_id from bytes to string
email_id_str = email_id.decode("utf-8")
return

# Fetch only headers to get metadata without body
_, data = await imap.uid("fetch", email_id_str, "BODY.PEEK[HEADER]")
email_ids = messages[0].split()
logger.info(f"Found {len(email_ids)} email IDs")

if not data:
logger.error(f"Failed to fetch headers for UID {email_id_str}")
continue
# Phase 1: Batch fetch INTERNALDATE for sorting (parallel chunks)
fetch_dates_start = time.perf_counter()
uid_dates = await self._batch_fetch_dates(imap, email_ids)
fetch_dates_elapsed = time.perf_counter() - fetch_dates_start

# Find the email headers in the response
raw_headers = None
if len(data) > 1 and isinstance(data[1], bytearray):
raw_headers = bytes(data[1])
else:
# Search through all items for header content
for item in data:
if isinstance(item, bytes | bytearray) and len(item) > 10:
# Skip IMAP protocol responses
if isinstance(item, bytes) and b"FETCH" in item:
continue
# This is likely the header content
raw_headers = bytes(item) if isinstance(item, bytearray) else item
break

if raw_headers:
try:
# Parse headers only
parser = BytesParser(policy=default)
email_message = parser.parsebytes(raw_headers)

# Extract metadata
subject = email_message.get("Subject", "")
sender = email_message.get("From", "")
date_str = email_message.get("Date", "")

# Extract recipients
to_addresses = []
to_header = email_message.get("To", "")
if to_header:
to_addresses = [addr.strip() for addr in to_header.split(",")]

cc_header = email_message.get("Cc", "")
if cc_header:
to_addresses.extend([addr.strip() for addr in cc_header.split(",")])

# Parse date
try:
date_tuple = email.utils.parsedate_tz(date_str)
date = (
datetime.fromtimestamp(email.utils.mktime_tz(date_tuple), tz=timezone.utc)
if date_tuple
else datetime.now(timezone.utc)
)
except Exception:
date = datetime.now(timezone.utc)

# For metadata, we don't fetch attachments to save bandwidth
# We'll mark it as unknown for now
metadata = {
"email_id": email_id_str,
"subject": subject,
"from": sender,
"to": to_addresses,
"date": date,
"attachments": [], # We don't fetch attachment info for metadata
}
all_metadata.append(metadata)
except Exception as e:
# Log error but continue with other emails
logger.error(f"Error parsing email metadata: {e!s}")
else:
logger.error(f"Could not find header data in response for email ID: {email_id_str}")
except Exception as e:
logger.error(f"Error fetching email metadata {email_id}: {e!s}")
# Sort by INTERNALDATE
sorted_uids = sorted(uid_dates.items(), key=lambda x: x[1], reverse=(order == "desc"))

# Sort by date (desc = newest first, asc = oldest first)
all_metadata.sort(key=lambda x: x["date"], reverse=(order == "desc"))

# Apply pagination after sorting
# Paginate
start = (page - 1) * page_size
end = start + page_size
for metadata in all_metadata[start:end]:
yield metadata
page_uids = [uid for uid, _ in sorted_uids[start : start + page_size]]

if not page_uids:
logger.info(f"Phase 1 (dates): {len(uid_dates)} UIDs in {fetch_dates_elapsed:.2f}s, page {page} empty")
return

# Phase 2: Batch fetch headers for requested page only
fetch_headers_start = time.perf_counter()
metadata_by_uid = await self._batch_fetch_headers(imap, page_uids)
fetch_headers_elapsed = time.perf_counter() - fetch_headers_start

logger.info(
f"Fetched page {page}: {fetch_dates_elapsed:.2f}s dates ({len(uid_dates)} UIDs), "
f"{fetch_headers_elapsed:.2f}s headers ({len(page_uids)} UIDs)"
)

# Yield in sorted order
for uid in page_uids:
if uid in metadata_by_uid:
yield metadata_by_uid[uid]
finally:
# Ensure we logout properly
try:
await imap.logout()
except Exception as e:
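For context, a hedged usage sketch of the two-phase flow introduced above: phase 1 fetches only INTERNALDATE for every matching UID so results can be sorted reliably, and phase 2 fetches headers for just the requested page. The consumer code below is illustrative; `handler` stands for an instance of the class in this diff, `page` and `page_size` appear in the signature shown, and `order` is assumed to be a keyword parameter because the body compares it against "desc".

async def list_newest(handler, page: int = 1, page_size: int = 10) -> list[dict]:
    # Collects one page of metadata dicts (email_id, subject, from, to, date, attachments),
    # newest first, without ever downloading message bodies.
    return [m async for m in handler.get_emails_metadata_stream(page=page, page_size=page_size, order="desc")]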