Skip to content

Commit 4ba57c4

Browse files
beastoin and claude authored
6p2ny redis (#4224)
* Refactor cache to use lazy init and fix module hierarchy
  - Move cache_manager.py and redis_pubsub.py from utils/ to database/ to follow the module hierarchy (database → utils → routers → main)
  - Replace startup/shutdown event handlers with lazy initialization and atexit cleanup for simpler usage (just import and use)
  - Remove init_cache/shutdown_cache from main.py and pusher/main.py

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* Add singleflight pattern to prevent thundering herd
  - Add get_or_fetch() method with a lock-per-key singleflight pattern
  - Only ONE concurrent request fetches from Redis; the others wait
  - Update get_popular_apps, get_available_apps, get_approved_available_apps
  - Add tests for concurrent access and cache-hit scenarios

  With 1000 concurrent requests on a cold cache, only 1 hits Redis (was 1000).

  Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 614c27c commit 4ba57c4

File tree

7 files changed

+223
-195
lines changed

7 files changed

+223
-195
lines changed

backend/database/cache.py

Lines changed: 43 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,79 +1,65 @@
11
"""
22
Global cache instances for in-memory caching.
33
4-
This module provides singleton access to cache managers.
5-
Instances are initialized during application startup in main.py.
4+
Uses lazy initialization - caches are created on first access.
5+
Cleanup is handled automatically via atexit.
66
"""
77

8+
import atexit
import threading
from typing import Optional

from database.cache_manager import InMemoryCacheManager
from database.redis_pubsub import RedisPubSubManager
from database.redis_db import r
1213

13-
# Global cache instances (initialized in main.py via init_cache())
14-
memory_cache: Optional[InMemoryCacheManager] = None
15-
pubsub_manager: Optional[RedisPubSubManager] = None
14+
# Module-level singletons; populated lazily by _ensure_initialized() on
# first access rather than at import time.
_memory_cache: Optional[InMemoryCacheManager] = None
_pubsub_manager: Optional[RedisPubSubManager] = None
_initialized: bool = False
1618

1719

18-
def get_memory_cache() -> InMemoryCacheManager:
19-
"""
20-
Get the global memory cache instance.
21-
22-
Returns:
23-
The initialized memory cache instance
24-
25-
Raises:
26-
RuntimeError: If cache not initialized
27-
"""
28-
if memory_cache is None:
29-
raise RuntimeError("Memory cache not initialized. Call init_cache() first.")
30-
return memory_cache
31-
32-
33-
def get_pubsub_manager() -> RedisPubSubManager:
34-
"""
35-
Get the global pub/sub manager instance.
36-
37-
Returns:
38-
The initialized pub/sub manager instance
39-
40-
Raises:
41-
RuntimeError: If pub/sub manager not initialized
42-
"""
43-
if pubsub_manager is None:
44-
raise RuntimeError("Pub/sub manager not initialized. Call init_cache() first.")
45-
return pubsub_manager
46-
47-
48-
def init_cache(max_memory_mb: int = 100):
49-
"""
50-
Initialize global cache instances.
20+
# Guards lazy initialization: without it, two threads racing past the
# `_initialized` check would each build a cache + pub/sub manager and
# start duplicate Redis subscriptions.
_init_lock = threading.Lock()


def _ensure_initialized():
    """Create the global cache and pub/sub manager on first access.

    Thread-safe double-checked initialization: the fast path is a plain
    flag read; the slow path serializes on ``_init_lock`` and re-checks
    the flag so only one thread ever performs the setup.
    """
    global _memory_cache, _pubsub_manager, _initialized

    if _initialized:
        return

    with _init_lock:
        # Re-check: another thread may have completed init while we waited.
        if _initialized:
            return

        _memory_cache = InMemoryCacheManager(max_memory_mb=100)
        _pubsub_manager = RedisPubSubManager(r)

        # Register callbacks: when an invalidation message is received,
        # drop the matching entries from the in-process memory cache.
        _pubsub_manager.register_callback(
            'get_public_approved_apps_data*',
            lambda keys: [_memory_cache.delete(k) for k in keys]
        )
        _pubsub_manager.register_callback(
            'get_popular_apps_data',
            lambda keys: [_memory_cache.delete(k) for k in keys]
        )

        # Start the pub/sub subscription; flag is set last so readers
        # never observe a half-initialized state.
        _pubsub_manager.start()
        _initialized = True
43+
44+
45+
def get_memory_cache() -> InMemoryCacheManager:
    """Return the shared in-memory cache, creating it on first use."""
    _ensure_initialized()
    return _memory_cache
49+
50+
51+
def get_pubsub_manager() -> RedisPubSubManager:
    """Return the shared Redis pub/sub manager, creating it on first use."""
    _ensure_initialized()
    return _pubsub_manager
55+
56+
57+
def _shutdown():
    """atexit hook: stop the pub/sub listener, if one was ever started."""
    manager = _pubsub_manager
    if manager:
        manager.stop()


# Ensure the background subscription is torn down at interpreter exit.
atexit.register(_shutdown)
Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
import time
1515
from collections import OrderedDict
1616
from dataclasses import dataclass
17-
from typing import Any, Optional
17+
from typing import Any, Callable, Dict, Optional
1818

1919

2020
@dataclass
@@ -59,6 +59,10 @@ def __init__(self, max_memory_mb: int = 100):
5959
self.misses = 0
6060
self.evictions = 0
6161

62+
# Singleflight: per-key locks to prevent thundering herd
63+
self._fetch_locks: Dict[str, threading.Lock] = {}
64+
self._fetch_lock_manager = threading.Lock()
65+
6266
def get(self, key: str) -> Optional[Any]:
6367
"""
6468
Get cache entry if exists and not expired.
@@ -87,6 +91,43 @@ def get(self, key: str) -> Optional[Any]:
8791
self.hits += 1
8892
return entry.data
8993

94+
def get_or_fetch(self, key: str, fetch_fn: Callable[[], Any], ttl: int = 30) -> Any:
95+
"""
96+
Get from cache or fetch with singleflight pattern.
97+
98+
Only ONE concurrent request will call fetch_fn, others wait.
99+
This prevents the thundering herd problem.
100+
101+
Args:
102+
key: Cache key
103+
fetch_fn: Function to call if cache miss (should return data)
104+
ttl: Time to live in seconds (default: 30)
105+
106+
Returns:
107+
Cached or fetched data
108+
"""
109+
# Fast path: cache hit
110+
if (value := self.get(key)) is not None:
111+
return value
112+
113+
# Get or create lock for this key
114+
with self._fetch_lock_manager:
115+
if key not in self._fetch_locks:
116+
self._fetch_locks[key] = threading.Lock()
117+
fetch_lock = self._fetch_locks[key]
118+
119+
# Only one request fetches, others wait
120+
with fetch_lock:
121+
# Double-check after acquiring lock (another thread may have fetched)
122+
if (value := self.get(key)) is not None:
123+
return value
124+
125+
# Fetch and cache
126+
value = fetch_fn()
127+
if value is not None:
128+
self.set(key, value, ttl=ttl)
129+
return value
130+
90131
def set(self, key: str, data: Any, ttl: int = 30):
91132
"""
92133
Set cache entry with automatic eviction if needed.

backend/main.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343

4444
from utils.other.timeout import TimeoutMiddleware
4545
from utils.observability import log_langsmith_status
46-
from database.cache import init_cache, shutdown_cache
4746

4847
# Log LangSmith tracing status at startup
4948
log_langsmith_status()
@@ -108,24 +107,6 @@
108107
app.add_middleware(TimeoutMiddleware, methods_timeout=methods_timeout)
109108

110109

111-
# Startup and shutdown event handlers for cache management
112-
@app.on_event("startup")
113-
async def startup_event():
114-
"""Initialize cache managers on startup."""
115-
try:
116-
init_cache(max_memory_mb=100)
117-
print("Cache managers initialized successfully")
118-
except Exception as e:
119-
print(f"Failed to initialize cache managers: {e}")
120-
# Continue startup even if cache managers fail
121-
122-
123-
@app.on_event("shutdown")
124-
async def shutdown_event():
125-
"""Stop cache managers on shutdown."""
126-
shutdown_cache()
127-
print("Cache managers stopped")
128-
129110
modal_app = App(
130111
name='backend',
131112
secrets=[Secret.from_name("gcp-credentials"), Secret.from_name('envs')],

backend/pusher/main.py

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from fastapi import FastAPI
66

77
from routers import pusher
8-
from database.cache import init_cache, shutdown_cache
98

109
if os.environ.get('SERVICE_ACCOUNT_JSON'):
1110
service_account_info = json.loads(os.environ["SERVICE_ACCOUNT_JSON"])
@@ -26,21 +25,3 @@
2625
@app.get('/health')
def health_check():
    """Report that the pusher service is up."""
    return {"status": "healthy"}
29-
30-
31-
@app.on_event("startup")
32-
async def startup_event():
33-
"""Initialize cache managers on startup."""
34-
try:
35-
init_cache(max_memory_mb=100)
36-
print("Cache managers initialized successfully")
37-
except Exception as e:
38-
print(f"Failed to initialize cache managers: {e}")
39-
# Continue startup even if cache managers fail
40-
41-
42-
@app.on_event("shutdown")
43-
async def shutdown_event():
44-
"""Stop cache managers on shutdown."""
45-
shutdown_cache()
46-
print("Cache managers stopped")

backend/tests/test_cache_manager.py

Lines changed: 61 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
# Add parent directory to path
1111
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
1212

13-
from utils.cache_manager import InMemoryCacheManager
13+
from database.cache_manager import InMemoryCacheManager
1414

1515

1616
class TestInMemoryCacheManager(unittest.TestCase):
@@ -148,6 +148,66 @@ def test_lru_ordering(self):
148148
stats = self.cache.get_stats()
149149
self.assertGreater(stats['entries'], 0)
150150

151+
def test_singleflight_prevents_thundering_herd(self):
    """Concurrent misses for one key must collapse into a single fetch."""
    import threading
    import time

    call_count = 0
    count_lock = threading.Lock()

    def slow_fetch():
        """Simulate a slow backend fetch (~100ms)."""
        nonlocal call_count
        with count_lock:
            call_count += 1
        time.sleep(0.1)  # Simulate slow operation
        return {'data': 'fetched'}

    outcomes = []
    failures = []

    def worker():
        try:
            outcomes.append(self.cache.get_or_fetch('test_key', slow_fetch, ttl=30))
        except Exception as exc:
            failures.append(exc)

    # Hammer the same cold key from 10 threads at once.
    workers = [threading.Thread(target=worker) for _ in range(10)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # Every thread succeeds and observes the same value.
    self.assertEqual(len(outcomes), 10)
    self.assertEqual(len(failures), 0)
    for outcome in outcomes:
        self.assertEqual(outcome, {'data': 'fetched'})

    # Singleflight guarantee: exactly one thread performed the fetch.
    self.assertEqual(call_count, 1)
192+
193+
def test_get_or_fetch_cache_hit(self):
    """A warm cache entry is returned without invoking fetch_fn."""
    calls = []

    def fetch_fn():
        calls.append(1)
        return {'data': 'new'}

    # Warm the cache, then read it back through get_or_fetch.
    self.cache.set('existing_key', {'data': 'cached'}, ttl=30)
    result = self.cache.get_or_fetch('existing_key', fetch_fn, ttl=30)

    self.assertEqual(result, {'data': 'cached'})
    self.assertFalse(calls)
210+
151211

152212
if __name__ == '__main__':
153213
unittest.main()

0 commit comments

Comments
 (0)