1+ import asyncio
12import email .utils
23import mimetypes
34import re
@@ -268,52 +269,74 @@ def _parse_headers(self, email_id: str, raw_headers: bytes) -> dict[str, Any] |
268269 logger .error (f"Error parsing email headers: { e !s} " )
269270 return None
270271
async def _fetch_dates_chunk(
    self,
    imap: aioimaplib.IMAP4_SSL | aioimaplib.IMAP4,
    chunk: list[bytes],
    chunk_num: int,
    total_chunks: int,
) -> dict[str, datetime]:
    """Fetch INTERNALDATE for a single chunk of UIDs.

    Args:
        imap: Client used to issue the ``UID FETCH`` command.
        chunk: UIDs (as bytes) to query with one command.
        chunk_num: 1-based index of this chunk; used only in the log line.
        total_chunks: Total number of chunks; the timing log line is only
            emitted when there is more than one.

    Returns:
        Mapping of UID (decoded to ``str``) to the parsed INTERNALDATE.
        Response lines that are not ``bytes``, lack a UID or INTERNALDATE,
        or carry a date that cannot be parsed are left out of the mapping.
    """
    if not chunk:
        # Nothing to ask for; avoid sending a UID FETCH with an empty set.
        return {}

    uid_list = ",".join(uid.decode() for uid in chunk)
    chunk_start = time.perf_counter()
    _, data = await imap.uid("fetch", uid_list, "(INTERNALDATE)")
    chunk_elapsed = time.perf_counter() - chunk_start

    # Compile once per call instead of looking the patterns up per line.
    uid_pattern = re.compile(rb"UID (\d+)")
    date_pattern = re.compile(rb'INTERNALDATE "([^"]+)"')

    chunk_dates: dict[str, datetime] = {}
    for item in data:
        if not isinstance(item, bytes) or b"INTERNALDATE" not in item:
            continue
        uid_match = uid_pattern.search(item)
        date_match = date_pattern.search(item)
        if not (uid_match and date_match):
            continue

        uid = uid_match.group(1).decode()
        # errors="replace" keeps a non-UTF-8 date from raising here; it then
        # fails in strptime below and is skipped like any other bad date.
        date_str = date_match.group(1).decode(errors="replace").strip()
        try:
            chunk_dates[uid] = datetime.strptime(date_str, "%d-%b-%Y %H:%M:%S %z")
        except ValueError:
            # One malformed date must not abort the whole batch (the caller
            # gathers every chunk and would otherwise lose all results).
            logger.warning(f"Skipping UID {uid}: unparseable INTERNALDATE {date_str!r}")

    if total_chunks > 1:
        logger.info(f"Fetched dates chunk {chunk_num}/{total_chunks}: {len(chunk)} UIDs in {chunk_elapsed:.2f}s")

    return chunk_dates
300+
async def _batch_fetch_dates(
    self,
    imap: aioimaplib.IMAP4_SSL | aioimaplib.IMAP4,
    email_ids: list[bytes],
    chunk_size: int = 5000,
) -> dict[str, datetime]:
    """Batch fetch INTERNALDATE for all UIDs in parallel chunks.

    The UID list is split into slices of at most ``chunk_size`` entries, one
    ``_fetch_dates_chunk`` coroutine is started per slice, and the per-chunk
    mappings are merged in chunk order.

    NOTE(review): every chunk is issued on the same ``imap`` connection, so
    how much the requests actually overlap depends on the client library and
    the server - confirm before relying on a speed-up.

    Args:
        imap: Client passed through to ``_fetch_dates_chunk``.
        email_ids: UIDs (as bytes) to look up.
        chunk_size: Maximum number of UIDs per FETCH command; must be >= 1.

    Returns:
        Mapping of UID string to INTERNALDATE; empty when ``email_ids`` is
        empty.

    Raises:
        ValueError: If ``chunk_size`` is less than 1.
    """
    if not email_ids:
        return {}
    if chunk_size < 1:
        # A negative step would make range() empty and silently report
        # "no messages"; zero would fail with an unrelated range() error.
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")

    # Split into chunks
    chunks = [email_ids[start : start + chunk_size] for start in range(0, len(email_ids), chunk_size)]
    total_chunks = len(chunks)

    # Fetch all chunks concurrently; gather() returns results in input order.
    results = await asyncio.gather(
        *(
            self._fetch_dates_chunk(imap, chunk, chunk_num, total_chunks)
            for chunk_num, chunk in enumerate(chunks, 1)
        )
    )

    # Merge results (later chunks win if a UID somehow appears twice)
    uid_dates: dict[str, datetime] = {}
    for chunk_dates in results:
        uid_dates.update(chunk_dates)

    return uid_dates
306327
307328 async def _batch_fetch_headers (
308329 self ,
309330 imap : aioimaplib .IMAP4_SSL | aioimaplib .IMAP4 ,
310- email_ids : list [str ],
331+ email_ids : list [bytes ] | list [ str ],
311332 ) -> dict [str , dict [str , Any ]]:
312333 """Batch fetch headers for a list of UIDs."""
313334 if not email_ids :
314335 return {}
315336
316- uid_list = "," .join (email_ids )
337+ # Normalize to list of strings
338+ str_ids = [uid .decode () if isinstance (uid , bytes ) else uid for uid in email_ids ]
339+ uid_list = "," .join (str_ids )
317340 _ , data = await imap .uid ("fetch" , uid_list , "BODY.PEEK[HEADER]" )
318341
319342 results : dict [str , dict [str , Any ]] = {}
@@ -390,8 +413,11 @@ async def get_emails_metadata_stream(
390413 ) -> AsyncGenerator [dict [str , Any ], None ]:
391414 imap = self .imap_class (self .email_server .host , self .email_server .port )
392415 try :
416+ # Wait for the connection to be established
393417 await imap ._client_task
394418 await imap .wait_hello_from_server ()
419+
420+ # Login and select mailbox
395421 await imap .login (self .email_server .user_name , self .email_server .password )
396422 await _send_imap_id (imap )
397423 await imap .select (_quote_mailbox (mailbox ))
@@ -408,19 +434,21 @@ async def get_emails_metadata_stream(
408434 )
409435 logger .info (f"Get metadata: Search criteria: { search_criteria } " )
410436
437+ # Search for messages - use UID SEARCH for better compatibility
411438 _ , messages = await imap .uid_search (* search_criteria )
412439
440+ # Handle empty or None responses
413441 if not messages or not messages [0 ]:
414442 logger .warning ("No messages returned from search" )
415443 return
416444
417445 email_ids = messages [0 ].split ()
418446 logger .info (f"Found { len (email_ids )} email IDs" )
419447
420- # Phase 1: Batch fetch INTERNALDATE for sorting
421- t0 = time .perf_counter ()
448+ # Phase 1: Batch fetch INTERNALDATE for sorting (parallel chunks)
449+ fetch_dates_start = time .perf_counter ()
422450 uid_dates = await self ._batch_fetch_dates (imap , email_ids )
423- t1 = time .perf_counter ()
451+ fetch_dates_elapsed = time .perf_counter () - fetch_dates_start
424452
425453 # Sort by INTERNALDATE
426454 sorted_uids = sorted (uid_dates .items (), key = lambda x : x [1 ], reverse = (order == "desc" ))
@@ -430,16 +458,17 @@ async def get_emails_metadata_stream(
430458 page_uids = [uid for uid , _ in sorted_uids [start : start + page_size ]]
431459
432460 if not page_uids :
433- logger .info (f"Phase 1 (dates): { len (uid_dates )} UIDs in { t1 - t0 :.2f} s, page { page } empty" )
461+ logger .info (f"Phase 1 (dates): { len (uid_dates )} UIDs in { fetch_dates_elapsed :.2f} s, page { page } empty" )
434462 return
435463
436464 # Phase 2: Batch fetch headers for requested page only
465+ fetch_headers_start = time .perf_counter ()
437466 metadata_by_uid = await self ._batch_fetch_headers (imap , page_uids )
438- t2 = time .perf_counter ()
467+ fetch_headers_elapsed = time .perf_counter () - fetch_headers_start
439468
440469 logger .info (
441- f"Fetched page { page } : { t1 - t0 :.2f} s dates ({ len (uid_dates )} UIDs), "
442- f"{ t2 - t1 :.2f} s headers ({ len (page_uids )} UIDs)"
470+ f"Fetched page { page } : { fetch_dates_elapsed :.2f} s dates ({ len (uid_dates )} UIDs), "
471+ f"{ fetch_headers_elapsed :.2f} s headers ({ len (page_uids )} UIDs)"
443472 )
444473
445474 # Yield in sorted order
0 commit comments