Skip to content

Commit f9f08f1

Browse files
committed
fix
1 parent fe497fe commit f9f08f1

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

cozeloop/internal/trace/queue_manager.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ def __init__(self, options: BatchQueueManagerOptions):
5555
self.batch = []
5656
self.batch_byte_size = 0
5757
self.batch_lock = threading.Lock()
58+
self.size_lock = threading.Lock()
5859
self.export_func = options.export_func
5960
self.stop_event = threading.Event()
6061

@@ -88,8 +89,9 @@ def worker(self):
8889
def is_should_export(self) -> bool:
8990
if len(self.batch) >= self.options.max_export_batch_length:
9091
return True
91-
if self.batch_byte_size >= self.options.max_export_batch_byte_size:
92-
return True
92+
with self.size_lock:
93+
if self.batch_byte_size >= self.options.max_export_batch_byte_size:
94+
return True
9395
return False
9496

9597
def _drain_queue(self):
@@ -125,7 +127,8 @@ def _do_export_batch(self):
125127
if self.export_func:
126128
self.export_func({}, self.batch)
127129
self.batch = []
128-
self.batch_byte_size = 0
130+
with self.size_lock:
131+
self.batch_byte_size = 0
129132

130133
def enqueue(self, item: Any, byte_size: int):
131134
if self.stop_event.is_set():
@@ -144,7 +147,7 @@ def enqueue(self, item: Any, byte_size: int):
144147
detail_msg = f"{self.options.queue_name} queue is full, dropped span"
145148
self.dropped += 1
146149
else:
147-
with self.batch_lock:
150+
with self.size_lock:
148151
self.batch_byte_size += byte_size
149152

150153
event_typ = "queue_manager.file_entry.rate"

cozeloop/internal/trace/span_processor.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ def __init__(
7171
if queue_conf:
7272
if queue_conf.span_queue_length > 0:
7373
span_queue_length = queue_conf.span_queue_length
74-
if queue_conf.span_max_export_batch_length > 0:
74+
if queue_conf.span_max_export_batch_length > 0: # todo: need max limit
7575
span_export_batch_size = queue_conf.span_max_export_batch_length
7676

7777
self.file_retry_qm = BatchQueueManager(
@@ -117,7 +117,8 @@ def __init__(
117117
max_queue_length=span_queue_length,
118118
max_export_batch_length=span_export_batch_size,
119119
max_export_batch_byte_size=DEFAULT_MAX_EXPORT_BATCH_BYTE_SIZE,
120-
export_func=self._new_export_spans_func(self.exporter, self.span_retry_qm, self.file_qm, finish_event_processor),
120+
export_func=self._new_export_spans_func(self.exporter, self.span_retry_qm, self.file_qm,
121+
finish_event_processor),
121122
finish_event_processor=finish_event_processor,
122123
)
123124
)

0 commit comments

Comments
 (0)