Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions mcp_email_server/emails/classic.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,15 @@ async def _batch_fetch_headers(
for i, item in enumerate(data):
if not isinstance(item, bytes) or b"BODY[HEADER]" not in item:
continue
uid_match = re.search(rb"UID (\d+)", item)
if uid_match and i + 1 < len(data) and isinstance(data[i + 1], bytearray):
# aioimaplib returns FETCH response in 3 parts:
# i: b'N FETCH (BODY[HEADER] {size}' - contains BODY[HEADER]
# i+1: bytearray(...) - raw header content
# i+2: b' UID N)' - contains UID
if i + 2 >= len(data) or not isinstance(data[i + 1], bytearray):
continue
uid_item = data[i + 2] if isinstance(data[i + 2], bytes) else None
uid_match = re.search(rb"UID (\d+)", uid_item) if uid_item else None
if uid_match:
uid = uid_match.group(1).decode()
raw_headers = bytes(data[i + 1])
metadata = self._parse_headers(uid, raw_headers)
Expand Down
9 changes: 6 additions & 3 deletions tests/test_classic_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -457,13 +457,16 @@ async def test_batch_fetch_dates_parses_response(self, email_client):
async def test_batch_fetch_headers_parses_response(self, email_client):
"""Test _batch_fetch_headers correctly parses IMAP BODY[HEADER] response."""
mock_imap = AsyncMock()
# Simulate IMAP response format for BODY[HEADER]
# aioimaplib returns FETCH response in 3 parts:
# - BODY[HEADER] line (no UID)
# - header content as bytearray
# - UID line
mock_imap.uid.return_value = (
"OK",
[
b"1 FETCH (UID 100 BODY[HEADER] {100}",
b"1 FETCH (BODY[HEADER] {100}",
bytearray(b"From: sender@example.com\r\nTo: recipient@example.com\r\nSubject: Test\r\n\r\n"),
b")",
b" UID 100)",
],
)

Expand Down
165 changes: 159 additions & 6 deletions tests/test_email_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,13 +590,17 @@ class TestBatchFetchHeaders:
async def test_batch_fetch_headers_parses_imap_response(self, email_client):
"""Test that _batch_fetch_headers correctly parses IMAP header responses."""
mock_imap = AsyncMock()
# aioimaplib returns FETCH response in 3 parts:
# - BODY[HEADER] line (no UID)
# - header content as bytearray
# - UID line
mock_imap.uid = AsyncMock(
return_value=(
None,
[
b"1 FETCH (UID 100 BODY[HEADER] {50}",
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: a@test.com\r\nSubject: Test\r\n\r\n"),
b")",
b" UID 100)",
b"FETCH completed",
],
)
Expand All @@ -620,16 +624,17 @@ async def test_batch_fetch_headers_empty_input(self, email_client):
async def test_batch_fetch_headers_preserves_uid_mapping(self, email_client):
"""Test that _batch_fetch_headers returns dict keyed by UID."""
mock_imap = AsyncMock()
# aioimaplib returns each email as 3 items: BODY[HEADER] line, content, UID line
mock_imap.uid = AsyncMock(
return_value=(
None,
[
b"1 FETCH (UID 100 BODY[HEADER] {50}",
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: a@test.com\r\nSubject: First\r\n\r\n"),
b")",
b"2 FETCH (UID 200 BODY[HEADER] {50}",
b" UID 100)",
b"2 FETCH (BODY[HEADER] {50}",
bytearray(b"From: b@test.com\r\nSubject: Second\r\n\r\n"),
b")",
b" UID 200)",
b"FETCH completed",
],
)
Expand All @@ -640,3 +645,151 @@ async def test_batch_fetch_headers_preserves_uid_mapping(self, email_client):
assert len(result) == 2
assert result["100"]["subject"] == "First"
assert result["200"]["subject"] == "Second"

@pytest.mark.asyncio
async def test_batch_fetch_headers_skips_non_bytes_items(self, email_client):
"""Test that _batch_fetch_headers skips non-bytes items in response."""
mock_imap = AsyncMock()
mock_imap.uid = AsyncMock(
return_value=(
None,
[
"not bytes", # Should be skipped
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: a@test.com\r\nSubject: Test\r\n\r\n"),
b" UID 100)",
],
)
)

result = await email_client._batch_fetch_headers(mock_imap, ["100"])

assert "100" in result
assert result["100"]["subject"] == "Test"

@pytest.mark.asyncio
async def test_batch_fetch_headers_skips_items_without_body_header(self, email_client):
"""Test that _batch_fetch_headers skips bytes without BODY[HEADER]."""
mock_imap = AsyncMock()
mock_imap.uid = AsyncMock(
return_value=(
None,
[
b"some other data", # No BODY[HEADER], should be skipped
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: a@test.com\r\nSubject: Test\r\n\r\n"),
b" UID 100)",
],
)
)

result = await email_client._batch_fetch_headers(mock_imap, ["100"])

assert "100" in result
assert result["100"]["subject"] == "Test"

@pytest.mark.asyncio
async def test_batch_fetch_headers_skips_truncated_response(self, email_client):
"""Test that _batch_fetch_headers skips when i+2 >= len(data)."""
mock_imap = AsyncMock()
mock_imap.uid = AsyncMock(
return_value=(
None,
[
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: a@test.com\r\nSubject: Test\r\n\r\n"),
# Missing UID line (i+2 doesn't exist)
],
)
)

result = await email_client._batch_fetch_headers(mock_imap, ["100"])

# Should return empty dict since response is truncated
assert result == {}

@pytest.mark.asyncio
async def test_batch_fetch_headers_skips_non_bytearray_content(self, email_client):
"""Test that _batch_fetch_headers skips when data[i+1] is not bytearray."""
mock_imap = AsyncMock()
mock_imap.uid = AsyncMock(
return_value=(
None,
[
b"1 FETCH (BODY[HEADER] {50}",
b"not a bytearray", # Should be bytearray, not bytes
b" UID 100)",
],
)
)

result = await email_client._batch_fetch_headers(mock_imap, ["100"])

# Should return empty dict since content is not bytearray
assert result == {}

@pytest.mark.asyncio
async def test_batch_fetch_headers_skips_non_bytes_uid_item(self, email_client):
"""Test that _batch_fetch_headers skips when data[i+2] is not bytes."""
mock_imap = AsyncMock()
mock_imap.uid = AsyncMock(
return_value=(
None,
[
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: a@test.com\r\nSubject: Test\r\n\r\n"),
12345, # Not bytes, should result in uid_item = None
],
)
)

result = await email_client._batch_fetch_headers(mock_imap, ["100"])

# Should return empty dict since UID item is not bytes
assert result == {}

@pytest.mark.asyncio
async def test_batch_fetch_headers_skips_missing_uid_in_response(self, email_client):
"""Test that _batch_fetch_headers skips when UID regex doesn't match."""
mock_imap = AsyncMock()
mock_imap.uid = AsyncMock(
return_value=(
None,
[
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: a@test.com\r\nSubject: Test\r\n\r\n"),
b" NO_UID_HERE)", # No UID in this line
],
)
)

result = await email_client._batch_fetch_headers(mock_imap, ["100"])

# Should return empty dict since UID regex doesn't match
assert result == {}

@pytest.mark.asyncio
async def test_batch_fetch_headers_handles_mixed_valid_invalid(self, email_client):
"""Test that _batch_fetch_headers processes valid items and skips invalid ones."""
mock_imap = AsyncMock()
mock_imap.uid = AsyncMock(
return_value=(
None,
[
# Invalid: truncated (no UID line)
b"1 FETCH (BODY[HEADER] {50}",
bytearray(b"From: bad@test.com\r\nSubject: Bad\r\n\r\n"),
# Valid email
b"2 FETCH (BODY[HEADER] {50}",
bytearray(b"From: good@test.com\r\nSubject: Good\r\n\r\n"),
b" UID 200)",
],
)
)

result = await email_client._batch_fetch_headers(mock_imap, ["100", "200"])

# Only the valid email should be in results
assert len(result) == 1
assert "200" in result
assert result["200"]["subject"] == "Good"
Loading