Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@
## 2026-02-04 - [Optimize Buffer for Large Downloads]
**Learning:** When downloading large files (e.g., blocklists), the default chunk size of HTTP libraries might be small, leading to excessive loop iterations and list operations. Increasing the buffer size (e.g., to 16KB) reduces CPU overhead during I/O-bound operations.
**Action:** When using `iter_bytes()` or similar streaming methods for large resources, explicitly set a larger `chunk_size` (e.g., 16384) to improve throughput and reduce CPU usage.
## 2024-03-24 - [Thread Pool Churn]
**Learning:** Python's `ThreadPoolExecutor` incurs measurable overhead (thread creation/shutdown) when created/destroyed repeatedly inside loops, even with small worker counts.
**Action:** Lift `ThreadPoolExecutor` creation to the highest possible scope and pass it down as a dependency (using `contextlib.nullcontext` for flexible ownership).
65 changes: 38 additions & 27 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import argparse
import concurrent.futures
import contextlib
import getpass
import ipaddress
import json
Expand Down Expand Up @@ -1465,6 +1466,7 @@ def push_rules(
hostnames: List[str],
existing_rules: Set[str],
client: httpx.Client,
batch_executor: Optional[concurrent.futures.Executor] = None,
) -> bool:
if not hostnames:
log.info("Folder %s - no rules to push", sanitize_for_log(folder_name))
Expand Down Expand Up @@ -1577,7 +1579,13 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]:
progress_label,
)
else:
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
# Use provided executor or create a local one (fallback)
if batch_executor:
executor_ctx = contextlib.nullcontext(batch_executor)
else:
executor_ctx = concurrent.futures.ThreadPoolExecutor(max_workers=3)

with executor_ctx as executor:
futures = {
executor.submit(process_batch, i, batch): i
for i, batch in enumerate(batches, 1)
Comment on lines +1588 to 1591
Copy link

Copilot AI Feb 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The existing_rules set is being updated concurrently by multiple threads without synchronization. When batch_executor is provided (as done in this PR), multiple batch workers run in parallel and all call existing_rules.update(result) at lines 1592 within the loop. While CPython's GIL provides some protection, set.update() is not guaranteed to be atomic and can lead to race conditions or data corruption when called from multiple threads simultaneously. Consider using a lock (threading.Lock) around the existing_rules.update() calls or use a thread-safe data structure.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -1621,6 +1629,7 @@ def _process_single_folder(
profile_id: str,
existing_rules: Set[str],
client: httpx.Client,
batch_executor: Optional[concurrent.futures.Executor] = None,
) -> bool:
grp = folder_data["group"]
name = grp["group"].strip()
Expand Down Expand Up @@ -1649,6 +1658,7 @@ def _process_single_folder(
hostnames,
existing_rules,
client,
batch_executor=batch_executor,
):
folder_success = False
else:
Expand All @@ -1662,6 +1672,7 @@ def _process_single_folder(
hostnames,
existing_rules,
client,
batch_executor=batch_executor,
):
folder_success = False

Expand Down Expand Up @@ -1775,9 +1786,11 @@ def _fetch_if_valid(url: str):
# This prevents API rate limits and ensures stability for large folders.
max_workers = 1

# Initial client for getting existing state AND processing folders
# Optimization: Reuse the same client session to keep TCP connections alive
with _api_client() as client:
# Shared executor for rate-limited operations (DELETE, push_rules batches)
# Reusing this executor prevents thread churn and enforces global rate limits.
with concurrent.futures.ThreadPoolExecutor(
max_workers=DELETE_WORKERS
) as shared_executor, _api_client() as client:
# Verify access and list existing folders in one request
existing_folders = verify_access_and_get_folders(client, profile_id)
if existing_folders is None:
Expand All @@ -1795,30 +1808,27 @@ def _fetch_if_valid(url: str):

if folders_to_delete:
# Parallel delete to speed up the "clean slate" phase
# Using DELETE_WORKERS (3) for balance between speed and rate limits
with concurrent.futures.ThreadPoolExecutor(
max_workers=DELETE_WORKERS
) as delete_executor:
future_to_name = {
delete_executor.submit(
delete_folder, client, profile_id, name, folder_id
): name
for name, folder_id in folders_to_delete
}
# Use shared_executor (3 workers)
future_to_name = {
shared_executor.submit(
delete_folder, client, profile_id, name, folder_id
): name
for name, folder_id in folders_to_delete
}

for future in concurrent.futures.as_completed(future_to_name):
name = future_to_name[future]
try:
if future.result():
del existing_folders[name]
deletion_occurred = True
except Exception as exc:
# Sanitize both name and exception to prevent log injection
log.error(
"Failed to delete folder %s: %s",
sanitize_for_log(name),
sanitize_for_log(exc),
)
for future in concurrent.futures.as_completed(future_to_name):
name = future_to_name[future]
try:
if future.result():
del existing_folders[name]
deletion_occurred = True
except Exception as exc:
# Sanitize both name and exception to prevent log injection
log.error(
"Failed to delete folder %s: %s",
sanitize_for_log(name),
sanitize_for_log(exc),
)

# CRITICAL FIX: Increased wait time for massive folders to clear
if deletion_occurred:
Expand All @@ -1843,6 +1853,7 @@ def _fetch_if_valid(url: str):
profile_id,
existing_rules,
client, # Pass the persistent client
batch_executor=shared_executor,
): folder_data
for folder_data in folder_data_list
}
Expand Down
34 changes: 34 additions & 0 deletions tests/test_push_rules_perf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

# Add root to path to import main
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import main

class TestPushRulesPerf(unittest.TestCase):
def setUp(self):
Expand All @@ -24,6 +25,7 @@ def setUp(self):
self.do = 1
self.status = 1
self.existing_rules = set()
self.main = main

@patch("main.concurrent.futures.ThreadPoolExecutor")
def test_push_rules_single_batch_optimization(self, mock_executor):
Expand Down Expand Up @@ -130,5 +132,37 @@ def test_push_rules_skips_validation_for_existing(self):
# So match should be called EXACTLY once, with "h2".
mock_match.assert_called_once_with("h2")

@patch("main.concurrent.futures.as_completed")
def test_push_rules_uses_provided_executor(self, mock_as_completed):
    """push_rules must submit batch work to the injected executor
    rather than spinning up its own ThreadPoolExecutor."""
    # 600 hostnames exceeds the 500-per-batch limit, yielding two batches.
    hostnames = [f"example{i}.com" for i in range(600)]

    # Stub executor/future pair standing in for the shared pool.
    stub_future = MagicMock()
    stub_future.result.return_value = ["some_rule"]
    stub_executor = MagicMock()
    stub_executor.submit.return_value = stub_future

    # Hand both futures straight back so the result loop completes.
    mock_as_completed.return_value = [stub_future, stub_future]

    self.main.push_rules(
        self.profile_id,
        self.folder_name,
        self.folder_id,
        self.do,
        self.status,
        hostnames,
        self.existing_rules,
        self.client,
        batch_executor=stub_executor,
    )

    # One submit per batch proves the injected executor was used.
    self.assertEqual(stub_executor.submit.call_count, 2)

# Allow running this test module directly (e.g. `python tests/test_push_rules_perf.py`).
if __name__ == '__main__':
    unittest.main()
Loading