1+ import asyncio
12import email .utils
23import mimetypes
4+ import re
35import ssl
6+ import time
47from collections .abc import AsyncGenerator
58from datetime import datetime , timezone
69from email .header import Header
@@ -225,6 +228,131 @@ def _build_search_criteria(
225228
226229 return search_criteria or ["ALL" ]
227230
231+ def _parse_headers (self , email_id : str , raw_headers : bytes ) -> dict [str , Any ] | None :
232+ """Parse raw email headers into metadata dictionary."""
233+ try :
234+ parser = BytesParser (policy = default )
235+ email_message = parser .parsebytes (raw_headers )
236+
237+ subject = email_message .get ("Subject" , "" )
238+ sender = email_message .get ("From" , "" )
239+ date_str = email_message .get ("Date" , "" )
240+
241+ to_addresses = []
242+ to_header = email_message .get ("To" , "" )
243+ if to_header :
244+ to_addresses = [addr .strip () for addr in to_header .split ("," )]
245+
246+ cc_header = email_message .get ("Cc" , "" )
247+ if cc_header :
248+ to_addresses .extend ([addr .strip () for addr in cc_header .split ("," )])
249+
250+ try :
251+ date_tuple = email .utils .parsedate_tz (date_str )
252+ date = (
253+ datetime .fromtimestamp (email .utils .mktime_tz (date_tuple ), tz = timezone .utc )
254+ if date_tuple
255+ else datetime .now (timezone .utc )
256+ )
257+ except Exception :
258+ date = datetime .now (timezone .utc )
259+
260+ return {
261+ "email_id" : email_id ,
262+ "subject" : subject ,
263+ "from" : sender ,
264+ "to" : to_addresses ,
265+ "date" : date ,
266+ "attachments" : [],
267+ }
268+ except Exception as e :
269+ logger .error (f"Error parsing email headers: { e !s} " )
270+ return None
271+
272+ async def _fetch_dates_chunk (
273+ self ,
274+ imap : aioimaplib .IMAP4_SSL | aioimaplib .IMAP4 ,
275+ chunk : list [bytes ],
276+ chunk_num : int ,
277+ total_chunks : int ,
278+ ) -> dict [str , datetime ]:
279+ """Fetch INTERNALDATE for a single chunk of UIDs."""
280+ uid_list = "," .join (uid .decode () for uid in chunk )
281+ chunk_start = time .perf_counter ()
282+ _ , data = await imap .uid ("fetch" , uid_list , "(INTERNALDATE)" )
283+ chunk_elapsed = time .perf_counter () - chunk_start
284+
285+ chunk_dates : dict [str , datetime ] = {}
286+ for item in data :
287+ if not isinstance (item , bytes ) or b"INTERNALDATE" not in item :
288+ continue
289+ uid_match = re .search (rb"UID (\d+)" , item )
290+ date_match = re .search (rb'INTERNALDATE "([^"]+)"' , item )
291+ if uid_match and date_match :
292+ uid = uid_match .group (1 ).decode ()
293+ date_str = date_match .group (1 ).decode ().strip ()
294+ chunk_dates [uid ] = datetime .strptime (date_str , "%d-%b-%Y %H:%M:%S %z" )
295+
296+ if total_chunks > 1 :
297+ logger .info (f"Fetched dates chunk { chunk_num } /{ total_chunks } : { len (chunk )} UIDs in { chunk_elapsed :.2f} s" )
298+
299+ return chunk_dates
300+
301+ async def _batch_fetch_dates (
302+ self ,
303+ imap : aioimaplib .IMAP4_SSL | aioimaplib .IMAP4 ,
304+ email_ids : list [bytes ],
305+ chunk_size : int = 5000 ,
306+ ) -> dict [str , datetime ]:
307+ """Batch fetch INTERNALDATE for all UIDs in parallel chunks."""
308+ if not email_ids :
309+ return {}
310+
311+ # Split into chunks
312+ chunks = [email_ids [i : i + chunk_size ] for i in range (0 , len (email_ids ), chunk_size )]
313+ total_chunks = len (chunks )
314+
315+ # Fetch all chunks in parallel
316+ tasks = [
317+ self ._fetch_dates_chunk (imap , chunk , chunk_num , total_chunks ) for chunk_num , chunk in enumerate (chunks , 1 )
318+ ]
319+ results = await asyncio .gather (* tasks )
320+
321+ # Merge results
322+ uid_dates : dict [str , datetime ] = {}
323+ for chunk_dates in results :
324+ uid_dates .update (chunk_dates )
325+
326+ return uid_dates
327+
328+ async def _batch_fetch_headers (
329+ self ,
330+ imap : aioimaplib .IMAP4_SSL | aioimaplib .IMAP4 ,
331+ email_ids : list [bytes ] | list [str ],
332+ ) -> dict [str , dict [str , Any ]]:
333+ """Batch fetch headers for a list of UIDs."""
334+ if not email_ids :
335+ return {}
336+
337+ # Normalize to list of strings
338+ str_ids = [uid .decode () if isinstance (uid , bytes ) else uid for uid in email_ids ]
339+ uid_list = "," .join (str_ids )
340+ _ , data = await imap .uid ("fetch" , uid_list , "BODY.PEEK[HEADER]" )
341+
342+ results : dict [str , dict [str , Any ]] = {}
343+ for i , item in enumerate (data ):
344+ if not isinstance (item , bytes ) or b"BODY[HEADER]" not in item :
345+ continue
346+ uid_match = re .search (rb"UID (\d+)" , item )
347+ if uid_match and i + 1 < len (data ) and isinstance (data [i + 1 ], bytearray ):
348+ uid = uid_match .group (1 ).decode ()
349+ raw_headers = bytes (data [i + 1 ])
350+ metadata = self ._parse_headers (uid , raw_headers )
351+ if metadata :
352+ results [uid ] = metadata
353+
354+ return results
355+
228356 async def get_email_count (
229357 self ,
230358 before : datetime | None = None ,
@@ -268,7 +396,7 @@ async def get_email_count(
268396 except Exception as e :
269397 logger .info (f"Error during logout: { e } " )
270398
271- async def get_emails_metadata_stream ( # noqa: C901
399+ async def get_emails_metadata_stream (
272400 self ,
273401 page : int = 1 ,
274402 page_size : int = 10 ,
@@ -289,7 +417,7 @@ async def get_emails_metadata_stream( # noqa: C901
289417 await imap ._client_task
290418 await imap .wait_hello_from_server ()
291419
292- # Login and select inbox
420+ # Login and select mailbox
293421 await imap .login (self .email_server .user_name , self .email_server .password )
294422 await _send_imap_id (imap )
295423 await imap .select (_quote_mailbox (mailbox ))
@@ -312,103 +440,42 @@ async def get_emails_metadata_stream( # noqa: C901
312440 # Handle empty or None responses
313441 if not messages or not messages [0 ]:
314442 logger .warning ("No messages returned from search" )
315- email_ids = []
316- else :
317- email_ids = messages [0 ].split ()
318- logger .info (f"Found { len (email_ids )} email IDs" )
319- # Fetch metadata for all emails first, then sort by date
320- # (UID order doesn't guarantee chronological order on all IMAP servers)
321- all_metadata : list [dict [str , Any ]] = []
322-
323- # Fetch each message's metadata only
324- for _ , email_id in enumerate (email_ids ):
325- try :
326- # Convert email_id from bytes to string
327- email_id_str = email_id .decode ("utf-8" )
443+ return
328444
329- # Fetch only headers to get metadata without body
330- _ , data = await imap . uid ( "fetch" , email_id_str , "BODY.PEEK[HEADER] " )
445+ email_ids = messages [ 0 ]. split ()
446+ logger . info ( f"Found { len ( email_ids ) } email IDs " )
331447
332- if not data :
333- logger .error (f"Failed to fetch headers for UID { email_id_str } " )
334- continue
448+ # Phase 1: Batch fetch INTERNALDATE for sorting (parallel chunks)
449+ fetch_dates_start = time .perf_counter ()
450+ uid_dates = await self ._batch_fetch_dates (imap , email_ids )
451+ fetch_dates_elapsed = time .perf_counter () - fetch_dates_start
335452
336- # Find the email headers in the response
337- raw_headers = None
338- if len (data ) > 1 and isinstance (data [1 ], bytearray ):
339- raw_headers = bytes (data [1 ])
340- else :
341- # Search through all items for header content
342- for item in data :
343- if isinstance (item , bytes | bytearray ) and len (item ) > 10 :
344- # Skip IMAP protocol responses
345- if isinstance (item , bytes ) and b"FETCH" in item :
346- continue
347- # This is likely the header content
348- raw_headers = bytes (item ) if isinstance (item , bytearray ) else item
349- break
350-
351- if raw_headers :
352- try :
353- # Parse headers only
354- parser = BytesParser (policy = default )
355- email_message = parser .parsebytes (raw_headers )
356-
357- # Extract metadata
358- subject = email_message .get ("Subject" , "" )
359- sender = email_message .get ("From" , "" )
360- date_str = email_message .get ("Date" , "" )
361-
362- # Extract recipients
363- to_addresses = []
364- to_header = email_message .get ("To" , "" )
365- if to_header :
366- to_addresses = [addr .strip () for addr in to_header .split ("," )]
367-
368- cc_header = email_message .get ("Cc" , "" )
369- if cc_header :
370- to_addresses .extend ([addr .strip () for addr in cc_header .split ("," )])
371-
372- # Parse date
373- try :
374- date_tuple = email .utils .parsedate_tz (date_str )
375- date = (
376- datetime .fromtimestamp (email .utils .mktime_tz (date_tuple ), tz = timezone .utc )
377- if date_tuple
378- else datetime .now (timezone .utc )
379- )
380- except Exception :
381- date = datetime .now (timezone .utc )
382-
383- # For metadata, we don't fetch attachments to save bandwidth
384- # We'll mark it as unknown for now
385- metadata = {
386- "email_id" : email_id_str ,
387- "subject" : subject ,
388- "from" : sender ,
389- "to" : to_addresses ,
390- "date" : date ,
391- "attachments" : [], # We don't fetch attachment info for metadata
392- }
393- all_metadata .append (metadata )
394- except Exception as e :
395- # Log error but continue with other emails
396- logger .error (f"Error parsing email metadata: { e !s} " )
397- else :
398- logger .error (f"Could not find header data in response for email ID: { email_id_str } " )
399- except Exception as e :
400- logger .error (f"Error fetching email metadata { email_id } : { e !s} " )
453+ # Sort by INTERNALDATE
454+ sorted_uids = sorted (uid_dates .items (), key = lambda x : x [1 ], reverse = (order == "desc" ))
401455
402- # Sort by date (desc = newest first, asc = oldest first)
403- all_metadata .sort (key = lambda x : x ["date" ], reverse = (order == "desc" ))
404-
405- # Apply pagination after sorting
456+ # Paginate
406457 start = (page - 1 ) * page_size
407- end = start + page_size
408- for metadata in all_metadata [start :end ]:
409- yield metadata
458+ page_uids = [uid for uid , _ in sorted_uids [start : start + page_size ]]
459+
460+ if not page_uids :
461+ logger .info (f"Phase 1 (dates): { len (uid_dates )} UIDs in { fetch_dates_elapsed :.2f} s, page { page } empty" )
462+ return
463+
464+ # Phase 2: Batch fetch headers for requested page only
465+ fetch_headers_start = time .perf_counter ()
466+ metadata_by_uid = await self ._batch_fetch_headers (imap , page_uids )
467+ fetch_headers_elapsed = time .perf_counter () - fetch_headers_start
468+
469+ logger .info (
470+ f"Fetched page { page } : { fetch_dates_elapsed :.2f} s dates ({ len (uid_dates )} UIDs), "
471+ f"{ fetch_headers_elapsed :.2f} s headers ({ len (page_uids )} UIDs)"
472+ )
473+
474+ # Yield in sorted order
475+ for uid in page_uids :
476+ if uid in metadata_by_uid :
477+ yield metadata_by_uid [uid ]
410478 finally :
411- # Ensure we logout properly
412479 try :
413480 await imap .logout ()
414481 except Exception as e :
0 commit comments