diff --git a/.jules/bolt.md b/.jules/bolt.md index 30c2edd..6b6353c 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -58,3 +58,6 @@ ## 2026-02-04 - [Optimize Buffer for Large Downloads] **Learning:** When downloading large files (e.g., blocklists), the default chunk size of HTTP libraries might be small, leading to excessive loop iterations and list operations. Increasing the buffer size (e.g., to 16KB) reduces CPU overhead during I/O-bound operations. **Action:** When using `iter_bytes()` or similar streaming methods for large resources, explicitly set a larger `chunk_size` (e.g., 16384) to improve throughput and reduce CPU usage. +## 2024-03-24 - Thread Pool Churn +**Learning:** Python's `ThreadPoolExecutor` incurs measurable overhead (thread creation/shutdown) when created/destroyed repeatedly inside loops, even with small worker counts. +**Action:** Lift `ThreadPoolExecutor` creation to the highest possible scope and pass it down as a dependency (using `contextlib.nullcontext` for flexible ownership). diff --git a/main.py b/main.py index 1f4f047..e74c6c3 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,7 @@ import argparse import concurrent.futures +import contextlib import getpass import ipaddress import json @@ -1465,6 +1466,7 @@ def push_rules( hostnames: List[str], existing_rules: Set[str], client: httpx.Client, + batch_executor: Optional[concurrent.futures.Executor] = None, ) -> bool: if not hostnames: log.info("Folder %s - no rules to push", sanitize_for_log(folder_name)) @@ -1577,7 +1579,13 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: progress_label, ) else: - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + # Use provided executor or create a local one (fallback) + if batch_executor: + executor_ctx = contextlib.nullcontext(batch_executor) + else: + executor_ctx = concurrent.futures.ThreadPoolExecutor(max_workers=3) + + with executor_ctx as executor: futures = { executor.submit(process_batch, i, batch): i for i, batch in enumerate(batches, 1) @@ -1621,6 +1629,7 @@ def _process_single_folder( profile_id: str, existing_rules: Set[str], client: httpx.Client, + batch_executor: Optional[concurrent.futures.Executor] = None, ) -> bool: grp = folder_data["group"] name = grp["group"].strip() @@ -1649,6 +1658,7 @@ def _process_single_folder( hostnames, existing_rules, client, + batch_executor=batch_executor, ): folder_success = False else: @@ -1662,6 +1672,7 @@ def _process_single_folder( hostnames, existing_rules, client, + batch_executor=batch_executor, ): folder_success = False @@ -1775,9 +1786,11 @@ def _fetch_if_valid(url: str): # This prevents API rate limits and ensures stability for large folders. max_workers = 1 - # Initial client for getting existing state AND processing folders - # Optimization: Reuse the same client session to keep TCP connections alive - with _api_client() as client: + # Shared executor for rate-limited operations (DELETE, push_rules batches) + # Reusing this executor prevents thread churn and enforces global rate limits. + with concurrent.futures.ThreadPoolExecutor( + max_workers=DELETE_WORKERS + ) as shared_executor, _api_client() as client: # Verify access and list existing folders in one request existing_folders = verify_access_and_get_folders(client, profile_id) if existing_folders is None: @@ -1795,30 +1808,27 @@ def _fetch_if_valid(url: str): if folders_to_delete: # Parallel delete to speed up the "clean slate" phase - # Using DELETE_WORKERS (3) for balance between speed and rate limits - with concurrent.futures.ThreadPoolExecutor( - max_workers=DELETE_WORKERS - ) as delete_executor: - future_to_name = { - delete_executor.submit( - delete_folder, client, profile_id, name, folder_id - ): name - for name, folder_id in folders_to_delete - } + # Use shared_executor (3 workers) + future_to_name = { + shared_executor.submit( + delete_folder, client, profile_id, name, folder_id + ): name + for name, folder_id in folders_to_delete + } - for future in concurrent.futures.as_completed(future_to_name): - name = future_to_name[future] - try: - if future.result(): - del existing_folders[name] - deletion_occurred = True - except Exception as exc: - # Sanitize both name and exception to prevent log injection - log.error( - "Failed to delete folder %s: %s", - sanitize_for_log(name), - sanitize_for_log(exc), - ) + for future in concurrent.futures.as_completed(future_to_name): + name = future_to_name[future] + try: + if future.result(): + del existing_folders[name] + deletion_occurred = True + except Exception as exc: + # Sanitize both name and exception to prevent log injection + log.error( + "Failed to delete folder %s: %s", + sanitize_for_log(name), + sanitize_for_log(exc), + ) # CRITICAL FIX: Increased wait time for massive folders to clear if deletion_occurred: @@ -1843,6 +1853,7 @@ def _fetch_if_valid(url: str): profile_id, existing_rules, client, # Pass the persistent client + batch_executor=shared_executor, ): folder_data for folder_data in folder_data_list } diff --git a/tests/test_push_rules_perf.py b/tests/test_push_rules_perf.py index 1019d2e..afd0baf 100644 --- a/tests/test_push_rules_perf.py +++ b/tests/test_push_rules_perf.py @@ -6,6 +6,7 @@ # Add root to path to import main sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import main class TestPushRulesPerf(unittest.TestCase): def setUp(self): @@ -24,6 +25,7 @@ def setUp(self): self.do = 1 self.status = 1 self.existing_rules = set() + self.main = main @patch("main.concurrent.futures.ThreadPoolExecutor") def test_push_rules_single_batch_optimization(self, mock_executor): @@ -130,5 +132,37 @@ def test_push_rules_skips_validation_for_existing(self): # So match should be called EXACTLY once, with "h2". mock_match.assert_called_once_with("h2") + @patch("main.concurrent.futures.as_completed") + def test_push_rules_uses_provided_executor(self, mock_as_completed): + """ + Test that push_rules uses the provided executor. + """ + # Create > 500 rules (2 batches) + hostnames = [f"example{i}.com" for i in range(600)] + + # Mock the executor passed as argument + mock_executor = MagicMock() + mock_future = MagicMock() + mock_future.result.return_value = ["some_rule"] + mock_executor.submit.return_value = mock_future + + # Mock as_completed to return our futures + mock_as_completed.return_value = [mock_future, mock_future] + + self.main.push_rules( + self.profile_id, + self.folder_name, + self.folder_id, + self.do, + self.status, + hostnames, + self.existing_rules, + self.client, + batch_executor=mock_executor + ) + + # Verify executor.submit was called twice (once for each batch) + self.assertEqual(mock_executor.submit.call_count, 2) + if __name__ == '__main__': unittest.main()