From d0a924f729f77d0d97ef5779dd13f6319e9260f9 Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Mon, 16 Feb 2026 15:06:34 +0000 Subject: [PATCH] =?UTF-8?q?=E2=9A=A1=20Bolt:=20Reuse=20ThreadPoolExecutor?= =?UTF-8?q?=20to=20reduce=20thread=20churn?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Lifted `ThreadPoolExecutor` creation from `push_rules` to `sync_profile`. - Passed `batch_executor` down to `push_rules`. - Used `contextlib.nullcontext` to allow flexible executor ownership. - Reused the shared executor for folder deletion phase. - Updated tests to verify executor reuse. Impact: Reduces thread creation/destruction overhead per folder, improving sync performance especially for large numbers of folders. Co-authored-by: abhimehro <84992105+abhimehro@users.noreply.github.com> --- .jules/bolt.md | 3 ++ main.py | 65 ++++++++++++++++++++--------------- tests/test_push_rules_perf.py | 47 +++++++++++++++++++------ 3 files changed, 78 insertions(+), 37 deletions(-) diff --git a/.jules/bolt.md b/.jules/bolt.md index 30c2edd..6b6353c 100644 --- a/.jules/bolt.md +++ b/.jules/bolt.md @@ -58,3 +58,6 @@ ## 2026-02-04 - [Optimize Buffer for Large Downloads] **Learning:** When downloading large files (e.g., blocklists), the default chunk size of HTTP libraries might be small, leading to excessive loop iterations and list operations. Increasing the buffer size (e.g., to 16KB) reduces CPU overhead during I/O-bound operations. **Action:** When using `iter_bytes()` or similar streaming methods for large resources, explicitly set a larger `chunk_size` (e.g., 16384) to improve throughput and reduce CPU usage. +## 2024-03-24 - Thread Pool Churn +**Learning:** Python's `ThreadPoolExecutor` incurs measurable overhead (thread creation/shutdown) when created/destroyed repeatedly inside loops, even with small worker counts. +**Action:** Lift `ThreadPoolExecutor` creation to the highest possible scope and pass it down as a dependency (using `contextlib.nullcontext` for flexible ownership). diff --git a/main.py b/main.py index c5d63be..492de4c 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,7 @@ import argparse import concurrent.futures +import contextlib import getpass import ipaddress import json @@ -1459,6 +1460,7 @@ def push_rules( hostnames: List[str], existing_rules: Set[str], client: httpx.Client, + batch_executor: Optional[concurrent.futures.Executor] = None, ) -> bool: if not hostnames: log.info("Folder %s - no rules to push", sanitize_for_log(folder_name)) @@ -1571,7 +1573,13 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: progress_label, ) else: - with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor: + # Use provided executor or create a local one (fallback) + if batch_executor: + executor_ctx = contextlib.nullcontext(batch_executor) + else: + executor_ctx = concurrent.futures.ThreadPoolExecutor(max_workers=3) + + with executor_ctx as executor: futures = { executor.submit(process_batch, i, batch): i for i, batch in enumerate(batches, 1) @@ -1615,6 +1623,7 @@ def _process_single_folder( profile_id: str, existing_rules: Set[str], client: httpx.Client, + batch_executor: Optional[concurrent.futures.Executor] = None, ) -> bool: grp = folder_data["group"] name = grp["group"].strip() @@ -1643,6 +1652,7 @@ def _process_single_folder( hostnames, existing_rules, client, + batch_executor=batch_executor, ): folder_success = False else: @@ -1656,6 +1666,7 @@ def _process_single_folder( hostnames, existing_rules, client, + batch_executor=batch_executor, ): folder_success = False @@ -1769,9 +1780,11 @@ def _fetch_if_valid(url: str): # This prevents API rate limits and ensures stability for large folders. max_workers = 1 - # Initial client for getting existing state AND processing folders - # Optimization: Reuse the same client session to keep TCP connections alive - with _api_client() as client: + # Shared executor for rate-limited operations (DELETE, push_rules batches) + # Reusing this executor prevents thread churn and enforces global rate limits. + with concurrent.futures.ThreadPoolExecutor( + max_workers=DELETE_WORKERS + ) as shared_executor, _api_client() as client: # Verify access and list existing folders in one request existing_folders = verify_access_and_get_folders(client, profile_id) if existing_folders is None: @@ -1789,30 +1802,27 @@ def _fetch_if_valid(url: str): if folders_to_delete: # Parallel delete to speed up the "clean slate" phase - # Using DELETE_WORKERS (3) for balance between speed and rate limits - with concurrent.futures.ThreadPoolExecutor( - max_workers=DELETE_WORKERS - ) as delete_executor: - future_to_name = { - delete_executor.submit( - delete_folder, client, profile_id, name, folder_id - ): name - for name, folder_id in folders_to_delete - } + # Use shared_executor (3 workers) + future_to_name = { + shared_executor.submit( + delete_folder, client, profile_id, name, folder_id + ): name + for name, folder_id in folders_to_delete + } - for future in concurrent.futures.as_completed(future_to_name): - name = future_to_name[future] - try: - if future.result(): - del existing_folders[name] - deletion_occurred = True - except Exception as exc: - # Sanitize both name and exception to prevent log injection - log.error( - "Failed to delete folder %s: %s", - sanitize_for_log(name), - sanitize_for_log(exc), - ) + for future in concurrent.futures.as_completed(future_to_name): + name = future_to_name[future] + try: + if future.result(): + del existing_folders[name] + deletion_occurred = True + except Exception as exc: + # Sanitize both name and exception to prevent log injection + log.error( + "Failed to delete folder %s: %s", + sanitize_for_log(name), + sanitize_for_log(exc), + ) # CRITICAL FIX: Increased wait time for massive folders to clear if deletion_occurred: @@ -1837,6 +1847,7 @@ def _fetch_if_valid(url: str): profile_id, existing_rules, client, # Pass the persistent client + batch_executor=shared_executor, ): folder_data for folder_data in folder_data_list } diff --git a/tests/test_push_rules_perf.py b/tests/test_push_rules_perf.py index 120bc29..650ce25 100644 --- a/tests/test_push_rules_perf.py +++ b/tests/test_push_rules_perf.py @@ -6,6 +6,7 @@ # Add root to path to import main sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +import main class TestPushRulesPerf(unittest.TestCase): def setUp(self): @@ -13,6 +14,8 @@ def setUp(self): global main if 'main' in sys.modules: main = sys.modules['main'] + else: + import main self.client = MagicMock() self.profile_id = "test_profile" @@ -21,6 +24,7 @@ def setUp(self): self.do = 1 self.status = 1 self.existing_rules = set() + self.main = main @patch("main.concurrent.futures.as_completed") @patch("main.concurrent.futures.ThreadPoolExecutor") @@ -44,15 +48,6 @@ def test_push_rules_single_batch_optimization(self, mock_executor, mock_as_compl # Mock as_completed to yield the future immediately mock_as_completed.return_value = [mock_future] - # Since we are bypassing TPE, we might need to mock API call? - # The code will call process_batch(1, batch). - # process_batch calls _api_post_form. - # client is mocked, so _api_post_form works (retries mocked). - # But we need to ensure process_batch works correctly in isolation. - - # For this test, we mock _api_post_form? - # No, _api_post_form calls client.post. - self.main.push_rules( self.profile_id, self.folder_name, @@ -114,7 +109,7 @@ def test_push_rules_skips_validation_for_existing(self, mock_rule_pattern): # h1 is already known, h2 is new existing_rules = {"h1"} - main.push_rules( + self.main.push_rules( self.profile_id, self.folder_name, self.folder_id, @@ -130,5 +125,37 @@ def test_push_rules_skips_validation_for_existing(self, mock_rule_pattern): # So match should be called EXACTLY once, with "h2". mock_match.assert_called_once_with("h2") + @patch("main.concurrent.futures.as_completed") + def test_push_rules_uses_provided_executor(self, mock_as_completed): + """ + Test that push_rules uses the provided executor. + """ + # Create > 500 rules (2 batches) + hostnames = [f"example{i}.com" for i in range(600)] + + # Mock the executor passed as argument + mock_executor = MagicMock() + mock_future = MagicMock() + mock_future.result.return_value = ["some_rule"] + mock_executor.submit.return_value = mock_future + + # Mock as_completed to return our futures + mock_as_completed.return_value = [mock_future, mock_future] + + self.main.push_rules( + self.profile_id, + self.folder_name, + self.folder_id, + self.do, + self.status, + hostnames, + self.existing_rules, + self.client, + batch_executor=mock_executor + ) + + # Verify executor.submit was called twice (once for each batch) + self.assertEqual(mock_executor.submit.call_count, 2) + if __name__ == '__main__': unittest.main()