Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions mcp_email_server/emails/classic.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,28 @@ def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> di

# Get body content
body = ""
html_body = "" # Fallback if no text/plain
attachments = []

def _strip_html(html: str) -> str:
"""Simple HTML to text conversion."""
import re

# Remove script and style elements
text = re.sub(r"<(script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
# Convert common block elements to newlines
text = re.sub(r"<(br|p|div|tr|li)[^>]*/?>", "\n", text, flags=re.IGNORECASE)
# Remove all remaining HTML tags
text = re.sub(r"<[^>]+>", "", text)
# Decode common HTML entities
text = text.replace("&nbsp;", " ").replace("&amp;", "&")
text = text.replace("&lt;", "<").replace("&gt;", ">")
text = text.replace("&quot;", '"').replace("&#39;", "'")
# Collapse multiple newlines and whitespace
text = re.sub(r"\n\s*\n", "\n\n", text)
text = re.sub(r" +", " ", text)
return text.strip()

if email_message.is_multipart():
for part in email_message.walk():
content_type = part.get_content_type()
Expand All @@ -155,7 +175,7 @@ def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> di
filename = part.get_filename()
if filename:
attachments.append(filename)
# Handle text parts
# Handle text parts - prefer text/plain
elif content_type == "text/plain":
body_part = part.get_payload(decode=True)
if body_part:
Expand All @@ -164,15 +184,31 @@ def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> di
body += body_part.decode(charset)
except UnicodeDecodeError:
body += body_part.decode("utf-8", errors="replace")
# Collect HTML as fallback
elif content_type == "text/html" and not body:
html_part = part.get_payload(decode=True)
if html_part:
charset = part.get_content_charset("utf-8")
try:
html_body += html_part.decode(charset)
except UnicodeDecodeError:
html_body += html_part.decode("utf-8", errors="replace")

# Fall back to HTML if no plain text found
if not body and html_body:
body = _strip_html(html_body)
else:
# Handle plain text emails
# Handle single-part emails
content_type = email_message.get_content_type()
payload = email_message.get_payload(decode=True)
if payload:
charset = email_message.get_content_charset("utf-8")
try:
body = payload.decode(charset)
text = payload.decode(charset)
except UnicodeDecodeError:
body = payload.decode("utf-8", errors="replace")
text = payload.decode("utf-8", errors="replace")

body = _strip_html(text) if content_type == "text/html" else text
# TODO: Allow retrieving full email body
if body and len(body) > 20000:
body = body[:20000] + "...[TRUNCATED]"
Expand Down
Loading