-
Notifications
You must be signed in to change notification settings - Fork 70
Expand file tree
/
Copy pathtest_classic_handler.py
More file actions
477 lines (405 loc) · 17.8 KB
/
test_classic_handler.py
File metadata and controls
477 lines (405 loc) · 17.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
from datetime import datetime, timezone
from unittest.mock import AsyncMock, patch
import pytest
from mcp_email_server.config import EmailServer, EmailSettings
from mcp_email_server.emails.classic import ClassicEmailHandler, EmailClient
from mcp_email_server.emails.models import (
AttachmentDownloadResponse,
EmailBodyResponse,
EmailContentBatchResponse,
EmailMetadata,
EmailMetadataPageResponse,
)
@pytest.fixture
def email_settings():
return EmailSettings(
account_name="test_account",
full_name="Test User",
email_address="test@example.com",
incoming=EmailServer(
user_name="test_user",
password="test_password",
host="imap.example.com",
port=993,
use_ssl=True,
),
outgoing=EmailServer(
user_name="test_user",
password="test_password",
host="smtp.example.com",
port=465,
use_ssl=True,
),
)
@pytest.fixture
def classic_handler(email_settings):
return ClassicEmailHandler(email_settings)
class TestClassicEmailHandler:
def test_init(self, email_settings):
"""Test initialization of ClassicEmailHandler."""
handler = ClassicEmailHandler(email_settings)
assert handler.email_settings == email_settings
assert isinstance(handler.incoming_client, EmailClient)
assert isinstance(handler.outgoing_client, EmailClient)
# Check that clients are initialized correctly
assert handler.incoming_client.email_server == email_settings.incoming
assert handler.outgoing_client.email_server == email_settings.outgoing
assert handler.outgoing_client.sender == f"{email_settings.full_name} <{email_settings.email_address}>"
@pytest.mark.asyncio
async def test_get_emails(self, classic_handler):
"""Test get_emails method."""
# Create test data
now = datetime.now(timezone.utc)
email_data = {
"email_id": "123",
"subject": "Test Subject",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"date": now,
"attachments": [],
}
# Mock the get_emails_stream method to yield our test data
mock_stream = AsyncMock()
mock_stream.__aiter__.return_value = [email_data]
# Mock the get_email_count method
mock_count = AsyncMock(return_value=1)
# Apply the mocks
with patch.object(classic_handler.incoming_client, "get_emails_metadata_stream", return_value=mock_stream):
with patch.object(classic_handler.incoming_client, "get_email_count", mock_count):
# Call the method
result = await classic_handler.get_emails_metadata(
page=1,
page_size=10,
before=now,
since=None,
subject="Test",
from_address="sender@example.com",
to_address=None,
)
# Verify the result
assert isinstance(result, EmailMetadataPageResponse)
assert result.page == 1
assert result.page_size == 10
assert result.before == now
assert result.since is None
assert result.subject == "Test"
assert len(result.emails) == 1
assert isinstance(result.emails[0], EmailMetadata)
assert result.emails[0].subject == "Test Subject"
assert result.emails[0].sender == "sender@example.com"
assert result.emails[0].date == now
assert result.emails[0].attachments == []
assert result.total == 1
# Verify the client methods were called correctly
classic_handler.incoming_client.get_emails_metadata_stream.assert_called_once_with(
1, 10, now, None, "Test", "sender@example.com", None, "desc", "INBOX", None, None, None
)
mock_count.assert_called_once_with(
now,
None,
"Test",
from_address="sender@example.com",
to_address=None,
mailbox="INBOX",
seen=None,
flagged=None,
answered=None,
)
@pytest.mark.asyncio
async def test_get_emails_with_mailbox(self, classic_handler):
"""Test get_emails method with custom mailbox."""
now = datetime.now(timezone.utc)
email_data = {
"email_id": "456",
"subject": "Sent Mail Subject",
"from": "me@example.com",
"to": ["recipient@example.com"],
"date": now,
"attachments": [],
}
mock_stream = AsyncMock()
mock_stream.__aiter__.return_value = [email_data]
mock_count = AsyncMock(return_value=1)
with patch.object(classic_handler.incoming_client, "get_emails_metadata_stream", return_value=mock_stream):
with patch.object(classic_handler.incoming_client, "get_email_count", mock_count):
result = await classic_handler.get_emails_metadata(
page=1,
page_size=10,
mailbox="Sent",
)
assert isinstance(result, EmailMetadataPageResponse)
assert len(result.emails) == 1
# Verify mailbox parameter was passed correctly
classic_handler.incoming_client.get_emails_metadata_stream.assert_called_once_with(
1, 10, None, None, None, None, None, "desc", "Sent", None, None, None
)
mock_count.assert_called_once_with(
None,
None,
None,
from_address=None,
to_address=None,
mailbox="Sent",
seen=None,
flagged=None,
answered=None,
)
@pytest.mark.asyncio
async def test_send_email(self, classic_handler):
"""Test send_email method."""
# Mock the outgoing_client.send_email method
mock_send = AsyncMock()
# Apply the mock
with patch.object(classic_handler.outgoing_client, "send_email", mock_send):
# Call the method
await classic_handler.send_email(
recipients=["recipient@example.com"],
subject="Test Subject",
body="Test Body",
cc=["cc@example.com"],
bcc=["bcc@example.com"],
)
# Verify the client method was called correctly
mock_send.assert_called_once_with(
["recipient@example.com"],
"Test Subject",
"Test Body",
["cc@example.com"],
["bcc@example.com"],
False,
None,
None,
None,
)
@pytest.mark.asyncio
async def test_send_email_with_attachments(self, classic_handler, tmp_path):
"""Test send_email method with attachments."""
# Create a temporary test file
test_file = tmp_path / "test_attachment.txt"
test_file.write_text("This is a test attachment")
# Mock the outgoing_client.send_email method
mock_send = AsyncMock()
# Apply the mock
with patch.object(classic_handler.outgoing_client, "send_email", mock_send):
# Call the method with attachments
await classic_handler.send_email(
recipients=["recipient@example.com"],
subject="Test Subject",
body="Test Body with attachment",
attachments=[str(test_file)],
)
# Verify the client method was called correctly with attachments
mock_send.assert_called_once_with(
["recipient@example.com"],
"Test Subject",
"Test Body with attachment",
None,
None,
False,
[str(test_file)],
None,
None,
)
@pytest.mark.asyncio
async def test_delete_emails(self, classic_handler):
"""Test delete_emails method."""
mock_delete = AsyncMock(return_value=(["123", "456"], []))
with patch.object(classic_handler.incoming_client, "delete_emails", mock_delete):
deleted_ids, failed_ids = await classic_handler.delete_emails(
email_ids=["123", "456"],
mailbox="INBOX",
)
assert deleted_ids == ["123", "456"]
assert failed_ids == []
mock_delete.assert_called_once_with(["123", "456"], "INBOX")
@pytest.mark.asyncio
async def test_delete_emails_with_failures(self, classic_handler):
"""Test delete_emails method with some failures."""
mock_delete = AsyncMock(return_value=(["123"], ["456"]))
with patch.object(classic_handler.incoming_client, "delete_emails", mock_delete):
deleted_ids, failed_ids = await classic_handler.delete_emails(
email_ids=["123", "456"],
mailbox="Trash",
)
assert deleted_ids == ["123"]
assert failed_ids == ["456"]
mock_delete.assert_called_once_with(["123", "456"], "Trash")
@pytest.mark.asyncio
async def test_delete_emails_custom_mailbox(self, classic_handler):
"""Test delete_emails method with custom mailbox."""
mock_delete = AsyncMock(return_value=(["789"], []))
with patch.object(classic_handler.incoming_client, "delete_emails", mock_delete):
deleted_ids, failed_ids = await classic_handler.delete_emails(
email_ids=["789"],
mailbox="Archive",
)
assert deleted_ids == ["789"]
assert failed_ids == []
mock_delete.assert_called_once_with(["789"], "Archive")
@pytest.mark.asyncio
async def test_download_attachment(self, classic_handler, tmp_path):
"""Test download_attachment method."""
save_path = str(tmp_path / "downloaded_attachment.pdf")
mock_result = {
"email_id": "123",
"attachment_name": "document.pdf",
"mime_type": "application/pdf",
"size": 1024,
"saved_path": save_path,
}
mock_download = AsyncMock(return_value=mock_result)
with patch.object(classic_handler.incoming_client, "download_attachment", mock_download):
result = await classic_handler.download_attachment(
email_id="123",
attachment_name="document.pdf",
save_path=save_path,
)
assert isinstance(result, AttachmentDownloadResponse)
assert result.email_id == "123"
assert result.attachment_name == "document.pdf"
assert result.mime_type == "application/pdf"
assert result.size == 1024
assert result.saved_path == save_path
mock_download.assert_called_once_with("123", "document.pdf", save_path, "INBOX")
@pytest.mark.asyncio
async def test_send_email_with_reply_headers(self, classic_handler):
"""Test sending email with reply headers."""
mock_smtp = AsyncMock()
mock_smtp.__aenter__.return_value = mock_smtp
mock_smtp.__aexit__.return_value = None
mock_smtp.login = AsyncMock()
mock_smtp.send_message = AsyncMock()
with patch("aiosmtplib.SMTP", return_value=mock_smtp):
await classic_handler.send_email(
recipients=["recipient@example.com"],
subject="Re: Test",
body="Reply body",
in_reply_to="<original@example.com>",
references="<original@example.com>",
)
call_args = mock_smtp.send_message.call_args
msg = call_args[0][0]
assert msg["In-Reply-To"] == "<original@example.com>"
assert msg["References"] == "<original@example.com>"
@pytest.mark.asyncio
async def test_get_emails_content_includes_message_id(self, classic_handler):
"""Test that get_emails_content returns message_id from parsed email data."""
now = datetime.now(timezone.utc)
email_data = {
"email_id": "123",
"message_id": "<test-message-id@example.com>",
"subject": "Test Subject",
"from": "sender@example.com",
"to": ["recipient@example.com"],
"date": now,
"body": "Test email body",
"attachments": [],
}
# Mock the get_email_body_by_id method to return our test data
mock_get_body = AsyncMock(return_value=email_data)
with patch.object(classic_handler.incoming_client, "get_email_body_by_id", mock_get_body):
result = await classic_handler.get_emails_content(
email_ids=["123"],
mailbox="INBOX",
)
# Verify the result
assert isinstance(result, EmailContentBatchResponse)
assert len(result.emails) == 1
assert isinstance(result.emails[0], EmailBodyResponse)
assert result.emails[0].email_id == "123"
assert result.emails[0].message_id == "<test-message-id@example.com>"
assert result.emails[0].subject == "Test Subject"
assert result.emails[0].sender == "sender@example.com"
assert result.emails[0].body == "Test email body"
# Verify the client method was called correctly
mock_get_body.assert_called_once_with("123", "INBOX")
class TestEmailClientBatchMethods:
"""Test batch fetch methods for performance optimization."""
@pytest.fixture
def email_client(self, email_settings):
return EmailClient(email_settings.incoming)
def test_parse_headers(self, email_client):
"""Test _parse_headers method parses email headers correctly."""
raw_headers = b"""From: sender@example.com
To: recipient@example.com
Cc: cc@example.com
Subject: Test Subject
Date: Mon, 20 Jan 2025 10:30:00 +0000
"""
result = email_client._parse_headers("123", raw_headers)
assert result is not None
assert result["email_id"] == "123"
assert result["subject"] == "Test Subject"
assert result["from"] == "sender@example.com"
assert "recipient@example.com" in result["to"]
assert "cc@example.com" in result["to"]
assert result["attachments"] == []
def test_parse_headers_with_invalid_data(self, email_client):
"""Test _parse_headers handles malformed headers gracefully."""
# Completely broken data that can't be parsed
raw_headers = b"\xff\xfe\x00\x00"
result = email_client._parse_headers("123", raw_headers)
# Should return None or a valid dict with fallback values
# The implementation catches exceptions and returns None
assert result is None or isinstance(result, dict)
def test_parse_headers_missing_date(self, email_client):
"""Test _parse_headers handles missing date with fallback."""
raw_headers = b"""From: sender@example.com
To: recipient@example.com
Subject: No Date Email
"""
result = email_client._parse_headers("123", raw_headers)
assert result is not None
assert result["email_id"] == "123"
assert result["date"] is not None # Should have fallback to now()
@pytest.mark.asyncio
async def test_batch_fetch_dates_empty_list(self, email_client):
"""Test _batch_fetch_dates with empty list returns empty dict."""
mock_imap = AsyncMock()
result = await email_client._batch_fetch_dates(mock_imap, [])
assert result == {}
mock_imap.uid.assert_not_called()
@pytest.mark.asyncio
async def test_batch_fetch_headers_empty_list(self, email_client):
"""Test _batch_fetch_headers with empty list returns empty dict."""
mock_imap = AsyncMock()
result = await email_client._batch_fetch_headers(mock_imap, [])
assert result == {}
mock_imap.uid.assert_not_called()
@pytest.mark.asyncio
async def test_batch_fetch_dates_parses_response(self, email_client):
"""Test _batch_fetch_dates correctly parses IMAP INTERNALDATE response."""
mock_imap = AsyncMock()
# Simulate IMAP response format for INTERNALDATE
mock_imap.uid.return_value = (
"OK",
[
b'1 FETCH (UID 100 INTERNALDATE "20-Jan-2025 10:30:00 +0000")',
b'2 FETCH (UID 101 INTERNALDATE "21-Jan-2025 11:00:00 +0000")',
],
)
result = await email_client._batch_fetch_dates(mock_imap, [b"100", b"101"])
assert "100" in result
assert "101" in result
assert result["100"].day == 20
assert result["101"].day == 21
@pytest.mark.asyncio
async def test_batch_fetch_headers_parses_response(self, email_client):
"""Test _batch_fetch_headers correctly parses IMAP BODY[HEADER] response."""
mock_imap = AsyncMock()
# aioimaplib returns FETCH response in 3 parts:
# - BODY[HEADER] line (no UID)
# - header content as bytearray
# - UID line
mock_imap.uid.return_value = (
"OK",
[
b"1 FETCH (BODY[HEADER] {100}",
bytearray(b"From: sender@example.com\r\nTo: recipient@example.com\r\nSubject: Test\r\n\r\n"),
b" UID 100)",
],
)
result = await email_client._batch_fetch_headers(mock_imap, ["100"])
assert "100" in result
assert result["100"]["subject"] == "Test"
assert result["100"]["from"] == "sender@example.com"