Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 15 additions & 21 deletions mcp_email_server/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@

from mcp_email_server.log import logger

DEFAILT_CONFIG_PATH = "~/.config/zerolib/mcp_email_server/config.toml"
DEFAULT_CONFIG_PATH = "~/.config/zerolib/mcp_email_server/config.toml"

CONFIG_PATH = Path(os.getenv("MCP_EMAIL_SERVER_CONFIG_PATH", DEFAILT_CONFIG_PATH)).expanduser().resolve()

def _parse_bool_env(value: str | None, default: bool = False) -> bool:
"""Parse boolean value from environment variable."""
if value is None:
return default
return value.lower() in ("true", "1", "yes", "on")


CONFIG_PATH = Path(os.getenv("MCP_EMAIL_SERVER_CONFIG_PATH", DEFAULT_CONFIG_PATH)).expanduser().resolve()


class EmailServer(BaseModel):
Expand Down Expand Up @@ -155,12 +163,6 @@ def from_env(cls) -> EmailSettings | None:
if not email_address or not password:
return None

# Parse boolean values
def parse_bool(value: str | None, default: bool = True) -> bool:
if value is None:
return default
return value.lower() in ("true", "1", "yes", "on")

# Get all environment variables with defaults
account_name = os.getenv("MCP_EMAIL_SERVER_ACCOUNT_NAME", "default")
full_name = os.getenv("MCP_EMAIL_SERVER_FULL_NAME", email_address.split("@")[0])
Expand All @@ -182,17 +184,17 @@ def parse_bool(value: str | None, default: bool = True) -> bool:
password=password,
imap_host=imap_host,
imap_port=int(os.getenv("MCP_EMAIL_SERVER_IMAP_PORT", "993")),
imap_ssl=parse_bool(os.getenv("MCP_EMAIL_SERVER_IMAP_SSL"), True),
imap_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_IMAP_SSL"), True),
smtp_host=smtp_host,
smtp_port=int(os.getenv("MCP_EMAIL_SERVER_SMTP_PORT", "465")),
smtp_ssl=parse_bool(os.getenv("MCP_EMAIL_SERVER_SMTP_SSL"), True),
smtp_start_ssl=parse_bool(os.getenv("MCP_EMAIL_SERVER_SMTP_START_SSL"), False),
smtp_verify_ssl=parse_bool(os.getenv("MCP_EMAIL_SERVER_SMTP_VERIFY_SSL"), True),
smtp_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SMTP_SSL"), True),
smtp_start_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SMTP_START_SSL"), False),
smtp_verify_ssl=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SMTP_VERIFY_SSL"), True),
smtp_user_name=os.getenv("MCP_EMAIL_SERVER_SMTP_USER_NAME", user_name),
smtp_password=os.getenv("MCP_EMAIL_SERVER_SMTP_PASSWORD", password),
imap_user_name=os.getenv("MCP_EMAIL_SERVER_IMAP_USER_NAME", user_name),
imap_password=os.getenv("MCP_EMAIL_SERVER_IMAP_PASSWORD", password),
save_to_sent=parse_bool(os.getenv("MCP_EMAIL_SERVER_SAVE_TO_SENT"), True),
save_to_sent=_parse_bool_env(os.getenv("MCP_EMAIL_SERVER_SAVE_TO_SENT"), True),
sent_folder_name=os.getenv("MCP_EMAIL_SERVER_SENT_FOLDER_NAME"),
)
except (ValueError, TypeError) as e:
Expand All @@ -216,13 +218,6 @@ def masked(self) -> AccountAttributes:
return self.model_copy(update={"api_key": "********"})


def _parse_bool_env(value: str | None, default: bool = False) -> bool:
"""Parse boolean value from environment variable."""
if value is None:
return default
return value.lower() in ("true", "1", "yes", "on")


class Settings(BaseSettings):
emails: list[EmailSettings] = []
providers: list[ProviderSettings] = []
Expand Down Expand Up @@ -343,7 +338,6 @@ def store_settings(settings: Settings | None = None) -> None:
if not settings:
settings = get_settings()
settings.store()
return


def delete_settings() -> None:
Expand Down
13 changes: 13 additions & 0 deletions mcp_email_server/emails/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,22 @@ async def send_email(
bcc: list[str] | None = None,
html: bool = False,
attachments: list[str] | None = None,
in_reply_to: str | None = None,
references: str | None = None,
) -> None:
"""
Send email

Args:
recipients: List of recipient email addresses.
subject: Email subject.
body: Email body content.
cc: List of CC email addresses.
bcc: List of BCC email addresses.
html: Whether to send as HTML (True) or plain text (False).
attachments: List of file paths to attach.
in_reply_to: Message-ID of the email being replied to (for threading).
references: Space-separated Message-IDs for the thread chain.
"""

@abc.abstractmethod
Expand Down
77 changes: 34 additions & 43 deletions mcp_email_server/emails/classic.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
)
from mcp_email_server.log import logger

# Maximum body length before truncation (characters)
MAX_BODY_LENGTH = 20000


def _quote_mailbox(mailbox: str) -> str:
"""Quote mailbox name for IMAP compatibility.
Expand Down Expand Up @@ -105,6 +108,29 @@ def _get_smtp_ssl_context(self) -> ssl.SSLContext | None:
"""Get SSL context for SMTP connections based on verify_ssl setting."""
return _create_smtp_ssl_context(self.smtp_verify_ssl)

@staticmethod
def _parse_recipients(email_message) -> list[str]:
"""Extract recipient addresses from To and Cc headers."""
recipients = []
to_header = email_message.get("To", "")
if to_header:
recipients = [addr.strip() for addr in to_header.split(",")]
cc_header = email_message.get("Cc", "")
if cc_header:
recipients.extend([addr.strip() for addr in cc_header.split(",")])
return recipients

@staticmethod
def _parse_date(date_str: str) -> datetime:
"""Parse email date string to datetime, with fallback to current time."""
try:
date_tuple = email.utils.parsedate_tz(date_str)
if date_tuple:
return datetime.fromtimestamp(email.utils.mktime_tz(date_tuple), tz=timezone.utc)
return datetime.now(timezone.utc)
except Exception:
return datetime.now(timezone.utc)

def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> dict[str, Any]: # noqa: C901
"""Parse raw email data into a structured dictionary."""
parser = BytesParser(policy=default)
Expand All @@ -118,28 +144,9 @@ def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> di
# Extract Message-ID for reply threading
message_id = email_message.get("Message-ID")

# Extract recipients
to_addresses = []
to_header = email_message.get("To", "")
if to_header:
# Simple parsing - split by comma and strip whitespace
to_addresses = [addr.strip() for addr in to_header.split(",")]

# Also check CC recipients
cc_header = email_message.get("Cc", "")
if cc_header:
to_addresses.extend([addr.strip() for addr in cc_header.split(",")])

# Parse date
try:
date_tuple = email.utils.parsedate_tz(date_str)
date = (
datetime.fromtimestamp(email.utils.mktime_tz(date_tuple), tz=timezone.utc)
if date_tuple
else datetime.now(timezone.utc)
)
except Exception:
date = datetime.now(timezone.utc)
# Extract recipients and parse date
to_addresses = self._parse_recipients(email_message)
date = self._parse_date(date_str)

# Get body content
body = ""
Expand Down Expand Up @@ -174,8 +181,8 @@ def _parse_email_data(self, raw_email: bytes, email_id: str | None = None) -> di
except UnicodeDecodeError:
body = payload.decode("utf-8", errors="replace")
# TODO: Allow retrieving full email body
if body and len(body) > 20000:
body = body[:20000] + "...[TRUNCATED]"
if body and len(body) > MAX_BODY_LENGTH:
body = body[:MAX_BODY_LENGTH] + "...[TRUNCATED]"
return {
"email_id": email_id or "",
"message_id": message_id,
Expand All @@ -199,7 +206,7 @@ def _build_search_criteria(
seen: bool | None = None,
flagged: bool | None = None,
answered: bool | None = None,
):
) -> list[str]:
search_criteria = []
if before:
search_criteria.extend(["BEFORE", before.strftime("%d-%b-%Y").upper()])
Expand Down Expand Up @@ -238,24 +245,8 @@ def _parse_headers(self, email_id: str, raw_headers: bytes) -> dict[str, Any] |
sender = email_message.get("From", "")
date_str = email_message.get("Date", "")

to_addresses = []
to_header = email_message.get("To", "")
if to_header:
to_addresses = [addr.strip() for addr in to_header.split(",")]

cc_header = email_message.get("Cc", "")
if cc_header:
to_addresses.extend([addr.strip() for addr in cc_header.split(",")])

try:
date_tuple = email.utils.parsedate_tz(date_str)
date = (
datetime.fromtimestamp(email.utils.mktime_tz(date_tuple), tz=timezone.utc)
if date_tuple
else datetime.now(timezone.utc)
)
except Exception:
date = datetime.now(timezone.utc)
to_addresses = self._parse_recipients(email_message)
date = self._parse_date(date_str)

return {
"email_id": email_id,
Expand Down
11 changes: 2 additions & 9 deletions mcp_email_server/emails/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,10 @@ class EmailMetadataPageResponse(BaseModel):
total: int


class EmailBodyResponse(BaseModel):
"""Single email body response"""
class EmailBodyResponse(EmailMetadata):
"""Single email body response - extends EmailMetadata with body content"""

email_id: str # IMAP UID of this email
message_id: str | None = None # RFC 5322 Message-ID header for reply threading
subject: str
sender: str
recipients: list[str]
date: datetime
body: str
attachments: list[str]


class EmailContentBatchResponse(BaseModel):
Expand Down
Loading