diff --git a/charts/veaiops/Chart.yaml b/charts/veaiops/Chart.yaml
index 84785075..32abf949 100644
--- a/charts/veaiops/Chart.yaml
+++ b/charts/veaiops/Chart.yaml
@@ -29,13 +29,13 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.1.1
+version: 0.1.2
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
-appVersion: "0.1.1"
+appVersion: "0.1.2"
dependencies:
- name: common
diff --git a/charts/veaiops/values.yaml b/charts/veaiops/values.yaml
index b320ab0b..2f6a6698 100644
--- a/charts/veaiops/values.yaml
+++ b/charts/veaiops/values.yaml
@@ -245,7 +245,7 @@ backend:
image:
registry: veaiops-registry-cn-beijing.cr.volces.com
repository: veaiops/backend
- tag: v0.1.1
+ tag: v0.1.2
pullPolicy: Always
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
@@ -312,7 +312,7 @@ chatops:
image:
registry: veaiops-registry-cn-beijing.cr.volces.com
repository: veaiops/chatops
- tag: v0.1.1
+ tag: v0.1.2
pullPolicy: Always
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
@@ -378,7 +378,7 @@ frontend:
image:
registry: veaiops-registry-cn-beijing.cr.volces.com
repository: veaiops/frontend
- tag: v0.1.1
+ tag: v0.1.2
pullPolicy: Always
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
@@ -445,7 +445,7 @@ intelligentThreshold:
image:
registry: veaiops-registry-cn-beijing.cr.volces.com
repository: veaiops/intelligent-threshold
- tag: v0.1.1
+ tag: v0.1.2
pullPolicy: Always
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
@@ -538,7 +538,7 @@ initial:
image:
registry: veaiops-registry-cn-beijing.cr.volces.com
repository: veaiops/initial
- tag: v0.1.1
+ tag: v0.1.2
pullPolicy: Always
## Optionally specify an array of imagePullSecrets.
## Secrets must be manually created in the namespace.
diff --git a/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx b/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx
index a2826267..46bb1934 100644
--- a/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx
+++ b/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx
@@ -13,17 +13,18 @@
// limitations under the License.
/**
- * 实例选择配置工厂
- * @description 为不同数据源提供预定义的配置
+ * Instance selection config factory.
+ * @description Provides predefined instance selection configs for different data sources.
*/
import { IconCloud, IconDesktop } from '@arco-design/web-react/icon';
+import type { AliyunInstance, VolcengineInstance } from '@wizard/types';
+import { checkMatch } from '@wizard/utils/filter';
import type { ZabbixHost } from 'api-generate';
-import type { AliyunInstance, VolcengineInstance } from '../../../../types';
import type { InstanceSelectionConfig } from './instance-selection-config';
/**
- * 阿里云实例选择配置
+ * Aliyun instance selection config.
*/
export const createAliyunConfig = (
selectionAction: (instances: AliyunInstance[]) => void,
@@ -36,13 +37,13 @@ export const createAliyunConfig = (
itemType: '实例',
icon: <IconCloud />,
dataTransformer: (instance) => {
- // 当只有 userId 而没有 instanceId 时,使用 userId 作为 id
- // 这样可以确保标题和显示正确
+ // When only userId exists (no instanceId), use userId as the id
+ // This ensures the title and display are still meaningful
const id =
instance.instanceId ||
instance.dimensions?.instanceId ||
instance.dimensions?.userId ||
- instance.userId ||
+ (instance as AliyunInstance & { userId?: string }).userId ||
'';
const name =
instance.instanceName ||
@@ -62,28 +63,28 @@ export const createAliyunConfig = (
},
selectionAction,
searchFilter: (instance, searchValue) => {
- const searchLower = searchValue.toLowerCase();
return (
- (instance.instanceId?.toLowerCase() || '').includes(searchLower) ||
- (instance.instanceName?.toLowerCase() || '').includes(searchLower) ||
- (instance.region?.toLowerCase() || '').includes(searchLower) ||
- // 当只有 userId 时,也支持搜索 userId
- (instance.dimensions?.userId?.toLowerCase() || '').includes(
- searchLower,
- ) ||
- (instance.userId?.toLowerCase() || '').includes(searchLower)
+ checkMatch(instance.instanceId, searchValue) ||
+ checkMatch(instance.instanceName, searchValue) ||
+ checkMatch(instance.region, searchValue) ||
+ // When only userId exists, also allow searching by userId
+ checkMatch(instance.dimensions?.userId, searchValue) ||
+ checkMatch(
+ (instance as AliyunInstance & { userId?: string }).userId,
+ searchValue,
+ )
);
},
getId: (instance) =>
instance.instanceId ||
instance.dimensions?.instanceId ||
instance.dimensions?.userId ||
- instance.userId ||
+ (instance as AliyunInstance & { userId?: string }).userId ||
'',
});
/**
- * 火山引擎实例选择配置
+ * Volcengine instance selection config.
*/
export const createVolcengineConfig = (
selectionAction: (instances: VolcengineInstance[]) => void,
@@ -104,15 +105,15 @@ export const createVolcengineConfig = (
}),
selectionAction,
searchFilter: (instance, searchValue) =>
- instance.instanceId.toLowerCase().includes(searchValue) ||
- (instance.instanceName?.toLowerCase() || '').includes(searchValue) ||
- (instance.region?.toLowerCase() || '').includes(searchValue) ||
- (instance.namespace?.toLowerCase() || '').includes(searchValue),
+ checkMatch(instance.instanceId, searchValue) ||
+ checkMatch(instance.instanceName, searchValue) ||
+ checkMatch(instance.region, searchValue) ||
+ checkMatch(instance.namespace, searchValue),
getId: (instance) => instance.instanceId,
});
/**
- * Zabbix主机选择配置
+ * Zabbix host selection config.
*/
export const createZabbixConfig = (
selectionAction: (hosts: ZabbixHost[]) => void,
@@ -120,19 +121,18 @@ export const createZabbixConfig = (
title: '选择主机',
description: '选择要监控的主机,可以选择多个主机',
emptyDescription: '暂无可用的主机',
- searchPlaceholder: '搜索主机名称...',
+ searchPlaceholder: '搜索主机名称 (支持正则)...',
itemType: '主机',
icon: <IconDesktop />,
dataTransformer: (host) => ({
- id: host.host, // 使用 host 作为唯一标识
+ id: host.host, // Use host as the unique identifier
name: host.name,
- region: undefined, // Zabbix没有region概念
- dimensions: undefined, // Zabbix没有dimensions概念
+ region: undefined, // Zabbix has no region concept
+ dimensions: undefined, // Zabbix has no dimensions concept
}),
selectionAction,
searchFilter: (host, searchValue) =>
- host.host.toLowerCase().includes(searchValue) ||
- host.name.toLowerCase().includes(searchValue),
- getId: (host) => host.host, // 使用 host 作为唯一标识
- useHostList: true, // 使用特殊的主机列表组件
+ checkMatch(host.host, searchValue) || checkMatch(host.name, searchValue),
+ getId: (host) => host.host, // Use host as the unique identifier
+ useHostList: true, // Use the specialized host list component
});
diff --git a/frontend/apps/veaiops/src/components/wizard/utils/filter.ts b/frontend/apps/veaiops/src/components/wizard/utils/filter.ts
new file mode 100644
index 00000000..626524ab
--- /dev/null
+++ b/frontend/apps/veaiops/src/components/wizard/utils/filter.ts
@@ -0,0 +1,135 @@
+// Copyright 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * Check whether the given text matches the search value.
+ * Supports both fuzzy substring matching and regular expressions.
+ * @param text Text to check
+ * @param searchValue Search value (plain string or regex pattern string)
+ * @returns Whether the text matches the search condition
+ */
+export const isMatch = (
+ text: string | undefined | null,
+ searchValue: string,
+): boolean => {
+ if (!text) {
+ return false;
+ }
+
+ const safeText = text.toLowerCase();
+ const safeSearch = searchValue.toLowerCase().trim();
+
+ if (!safeSearch) {
+ return true;
+ }
+
+ try {
+ // Try regex matching first
+ // Typical user inputs:
+ // - AA-\d+-BB style regex
+ // - AA-* style wildcard
+ // - Simple substring like AA
+
+ // Strategy:
+ // 1. Try to interpret the query as regex
+ // 2. If it fails and contains *, treat it as a wildcard
+ // 3. Otherwise fallback to plain substring match
+
+ // We directly build RegExp here to fully support regex syntax
+
+ const regex = new RegExp(safeSearch, 'i');
+ if (regex.test(text)) {
+ return true;
+ }
+ } catch (e) {
+ // If regex parsing fails (e.g. syntax error), try wildcard handling instead
+ }
+
+ // Wildcard handling (simple * to .*)
+ // Only used when query contains * and regex parsing failed
+ if (safeSearch.includes('*')) {
+ try {
+ // Escape all regex metacharacters except *
+ const pattern = safeSearch
+ .replace(/[.+?^${}()|[\]\\]/g, '\\$&')
+ .replace(/\*/g, '.*');
+ const wildcardRegex = new RegExp(`^${pattern}$`, 'i'); // Wildcard usually implies full match
+ if (wildcardRegex.test(text)) {
+ return true;
+ }
+ } catch (e) {
+ // Ignore and continue to substring fallback
+ }
+ }
+
+ // Final fallback: plain substring match
+ return safeText.includes(safeSearch);
+};
+
+/**
+ * Matching helper optimized for AA-number-BB style patterns
+ * while still supporting generic regex and wildcard queries.
+ */
+export const checkMatch = (
+ text: string | undefined | null,
+ searchValue: string,
+): boolean => {
+ if (!text) {
+ return false;
+ }
+
+ const safeText = text.trim();
+ const query = searchValue.trim();
+
+ if (!query) {
+ return true;
+ }
+
+ // 1. Try regex match first
+ // Typical usage:
+ // - AA-\d+-BB style patterns with \d for digits
+ // - ^server.* for prefix matching
+ // Using RegExp directly gives maximum flexibility for power users.
+
+ try {
+ // Use 'i' flag to make the match case-insensitive
+ const regex = new RegExp(query, 'i');
+ if (regex.test(safeText)) {
+ return true;
+ }
+ } catch (e) {
+ // If regex parsing fails (e.g. unclosed parenthesis), fall through to wildcard or substring
+ }
+
+ // 2. Try wildcard * match
+ // Only used when query contains * and regex parsing did not succeed
+ if (query.includes('*')) {
+ try {
+ // Escape all regex metacharacters except *
+ const pattern = query
+ .replace(/[.+?^${}()|[\]\\]/g, '\\$&')
+ .replace(/\*/g, '.*');
+ const wildcardRegex = new RegExp(`^${pattern}$`, 'i'); // Wildcard usually implies full match
+ if (wildcardRegex.test(safeText)) {
+ return true;
+ }
+ } catch (e) {
+ // Ignore and fallback to plain substring match
+ }
+ }
+
+ // 3. Plain substring match (case-insensitive)
+ // Fallback for queries like "server(" which are invalid regex but valid text
+ return safeText.toLowerCase().includes(query.toLowerCase());
+};
diff --git a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx
index 0f1b57d4..987ee2ea 100644
--- a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx
+++ b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx
@@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+import { Tooltip } from '@arco-design/web-react';
import { CellRender } from '@veaiops/components';
import type React from 'react';
@@ -29,9 +30,13 @@ export const LabelsColumn: React.FC<{
{Object.entries(labels).map(([key, value]) => (
-            <Tag key={key}>
-              {`${key}: ${value}`}
-            </Tag>
+            <Tooltip key={key} content={`${key}: ${value}`}>
+              <Tag>
+                <span>
+                  {`${key}: ${value}`}
+                </span>
+              </Tag>
+            </Tooltip>
))}
diff --git a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx
index 4acae8a2..1bb6ac87 100644
--- a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx
+++ b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx
@@ -12,9 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-import apiClient from '@/utils/api-client';
import { Button, Drawer, Form, Message } from '@arco-design/web-react';
-import { API_RESPONSE_CODE } from '@veaiops/constants';
import { logger } from '@veaiops/utils';
import type {
IntelligentThresholdTaskVersion,
@@ -22,6 +20,7 @@ import type {
} from 'api-generate';
import type React from 'react';
import { useCallback, useEffect, useState } from 'react';
+import { rerunTask } from '../../../lib/data-source';
import { renderRerunForm } from '../../task';
/**
@@ -66,51 +65,67 @@ export const useRerunDrawer = (
try {
setLoading(true);
+
+ // Build metric_template_value, setting normal_range_start and normal_range_end to null
+ // if user didn't fill them, so backend won't use default values (10/90)
+ // Note: Backend model default is now None, so sending null or omitting field will result in None
+ const metricTemplateValue = values.metric_template_value || {};
+ const cleanedMetricTemplateValue: Record<string, unknown> = {
+ ...metricTemplateValue,
+ };
+
+ // Set to null if user didn't fill normal_range_start or normal_range_end
+ // Backend will use None (not 10/90) when these fields are null or omitted
+ if (
+ cleanedMetricTemplateValue.normal_range_start === undefined ||
+ cleanedMetricTemplateValue.normal_range_start === null ||
+ cleanedMetricTemplateValue.normal_range_start === ''
+ ) {
+ cleanedMetricTemplateValue.normal_range_start = null;
+ }
+
+ if (
+ cleanedMetricTemplateValue.normal_range_end === undefined ||
+ cleanedMetricTemplateValue.normal_range_end === null ||
+ cleanedMetricTemplateValue.normal_range_end === ''
+ ) {
+ cleanedMetricTemplateValue.normal_range_end = null;
+ }
+
const requestBody: RerunIntelligentThresholdTaskRequest = {
task_id,
direction: values.direction,
n_count: values.n_count,
- metric_template_value: values.metric_template_value,
+ metric_template_value:
+ cleanedMetricTemplateValue as RerunIntelligentThresholdTaskRequest['metric_template_value'],
+ sensitivity:
+ (values.sensitivity as number) ??
+ (values.metric_template_value?.sensitivity as number) ??
+ 0.5,
};
- const response =
- await apiClient.intelligentThresholdTask.postApisV1IntelligentThresholdTaskRerun(
- {
- requestBody,
- },
- );
+ // ✅ Reuse rerunTask from lib/data-source to avoid duplicating the API call logic
+ const success = await rerunTask(requestBody);
- if (response.code === API_RESPONSE_CODE.SUCCESS && response.data) {
- Message.success('任务重新执行成功');
+ if (success) {
close();
- // 刷新表格数据以显示新的版本结果
+ // Refresh table data to show new version results
if (tableRef.current?.refresh) {
+ logger.info({
+ message:
+ '[useRerunDrawer] Rerun successful, triggering single table refresh',
+ source: 'useRerunDrawer',
+ });
const refreshResult = await tableRef.current.refresh();
return refreshResult ?? true;
}
return true;
- } else {
- throw new Error(response.message || '任务重新执行失败');
}
+ return false;
} catch (error: unknown) {
- // ✅ 正确:使用 logger 记录错误,并透出实际错误信息
- const errorObj =
- error instanceof Error ? error : new Error(String(error));
- logger.error({
- message: '任务重新执行失败',
- data: {
- error: errorObj.message,
- stack: errorObj.stack,
- errorObj,
- request: values,
- timestamp: Date.now(),
- },
- source: 'useRerunDrawer',
- component: 'handleSubmit',
- });
- const errorMessage = errorObj.message || '任务重新执行失败';
- Message.error(errorMessage);
+ // Error handling is already done in rerunTask (Message.error and logger.error)
+ // Just return false to indicate failure
return false;
} finally {
setLoading(false);
@@ -124,11 +139,12 @@ export const useRerunDrawer = (
const formData = {
direction: rerunData.direction || 'both',
n_count: rerunData.n_count || 3,
+ sensitivity: rerunData.sensitivity ?? 0.5,
metric_template_value: {
normal_range_start:
- rerunData.metric_template_value?.normal_range_start || 0,
+ rerunData.metric_template_value?.normal_range_start ?? undefined,
normal_range_end:
- rerunData.metric_template_value?.normal_range_end || 55,
+ rerunData.metric_template_value?.normal_range_end ?? undefined,
},
task_id: rerunData.task_id,
};
diff --git a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx
index 01e3fcc9..65277c7f 100644
--- a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx
+++ b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx
@@ -14,6 +14,8 @@
import type { CustomTableActionType } from '@veaiops/components';
import type { BaseQuery, BaseRecord } from '@veaiops/types';
+import { logger } from '@veaiops/utils';
+import { useInterval } from 'ahooks';
import { useRef } from 'react';
import {
useAlarmDrawer,
@@ -49,6 +51,18 @@ const TaskVersionTable: React.FC = ({
tableRef,
});
+ // Auto refresh table data every 3 seconds to update task status
+ useInterval(() => {
+ // Only refresh when tableRef is available
+ if (tableRef.current?.refresh) {
+ logger.debug({
+ message: '[TaskVersionTable] Auto refreshing task versions',
+ source: 'TaskVersionTable',
+ });
+ tableRef.current.refresh();
+ }
+ }, 3000);
+
return (
<>
{tableElement}
diff --git a/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py b/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py
index e2bee696..67124db6 100644
--- a/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py
+++ b/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py
@@ -113,7 +113,7 @@ def short_data():
def test_init_default_parameters():
"""Test initialization with default parameters."""
detector = RobustDailyPeriodDetector()
- assert detector.correlation_threshold == 0.75
+ assert detector.correlation_threshold == 0.3
assert detector.min_data_points_per_day == 720
assert detector.min_common_points == 720
diff --git a/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py b/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py
index 5f8b250f..36ff3b55 100644
--- a/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py
+++ b/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py
@@ -195,6 +195,7 @@ def test_recommend_threshold_no_time_split(threshold_recommender, sample_timesta
max_value=100.0,
normal_threshold=50.0,
min_ts_length=50,
+ sensitivity=0.5,
direction="up",
)
@@ -225,6 +226,7 @@ def test_recommend_threshold_with_time_split(threshold_recommender, sample_times
max_value=100.0,
normal_threshold=50.0,
min_ts_length=30, # Reduced from 50 to ensure all periods have enough data
+ sensitivity=0.5,
direction="up",
)
@@ -260,6 +262,7 @@ def test_recommend_threshold_direction_down(threshold_recommender, sample_timest
max_value=100.0,
normal_threshold=30.0,
min_ts_length=50,
+ sensitivity=0.5,
direction="down",
)
@@ -290,6 +293,7 @@ def test_recommend_threshold_insufficient_data(threshold_recommender, sample_tim
max_value=100.0,
normal_threshold=50.0,
min_ts_length=1000, # High minimum length
+ sensitivity=0.5,
direction="up",
)
@@ -320,6 +324,7 @@ def test_threshold_recommendation_with_sliding_window_basic(threshold_recommende
max_value=100.0,
normal_threshold=50.0,
ignore_count=1,
+ sensitivity=0.5,
direction="up",
)
@@ -351,6 +356,7 @@ def test_threshold_recommendation_with_sliding_window_auto_adjust(threshold_reco
max_value=100.0,
normal_threshold=50.0,
ignore_count=1,
+ sensitivity=0.5,
direction="up",
)
@@ -376,6 +382,7 @@ def test_threshold_recommendation_with_sliding_window_normal_threshold_up(thresh
max_value=100.0,
normal_threshold=60.0,
ignore_count=1,
+ sensitivity=0.5,
direction="up",
)
@@ -401,6 +408,7 @@ def test_threshold_recommendation_with_sliding_window_normal_threshold_down(thre
max_value=100.0,
normal_threshold=30.0,
ignore_count=1,
+ sensitivity=0.5,
direction="down",
)
@@ -518,96 +526,6 @@ def test_check_and_consolidate_threshold_groups_single_group(threshold_recommend
assert result is None
-def test_recommend_threshold_with_consolidation_up(threshold_recommender, sample_timestamps):
- """Test full threshold recommendation with consolidation for up direction."""
- # Create values that result in similar thresholds across time periods
- values = []
- for i in range(len(sample_timestamps)):
- # Create values that vary slightly but result in similar thresholds
- hour = (i // 60) % 24
- if hour < 6:
- values.append(45) # Night: around 45
- elif hour < 12:
- values.append(48) # Morning: around 48
- elif hour < 18:
- values.append(52) # Afternoon: around 52
- else:
- values.append(47) # Evening: around 47
-
- with patch.object(threshold_recommender.period_detector, "detect") as mock_detect:
- mock_detect.return_value = True # Daily periodicity detected
-
- # Mock sliding window to return similar thresholds
- with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window:
- # Return very similar thresholds for all time periods (within 10%)
- mock_sliding_window.side_effect = [(49.0, 5), (50.0, 5), (51.0, 5), (50.5, 5)] * 2
-
- result = threshold_recommender.recommend_threshold(
- timestamp_list=sample_timestamps[:1000], # Use subset
- value_list=values[:1000],
- default_window_size=5,
- time_split=True,
- auto_window_adjust=False,
- min_value=0.0,
- max_value=100.0,
- normal_threshold=None,
- min_ts_length=50,
- direction="up",
- )
-
- # Should consolidate to single group since thresholds are close (49-51, diff=2, 2/51=3.9% < 10%)
- assert len(result) == 1
- assert result[0]["start_hour"] == 0
- assert result[0]["end_hour"] == 24
- assert result[0]["upper_bound"] == 50.5 # Max value from the sorted thresholds
- assert result[0]["lower_bound"] is None
- assert result[0]["window_size"] == 5
-
-
-def test_recommend_threshold_with_consolidation_down(threshold_recommender, sample_timestamps):
- """Test full threshold recommendation with consolidation for down direction."""
- # Create values that result in similar thresholds across time periods
- values = []
- for i in range(len(sample_timestamps)):
- hour = (i // 60) % 24
- if hour < 6:
- values.append(25) # Night: around 25
- elif hour < 12:
- values.append(23) # Morning: around 23
- elif hour < 18:
- values.append(27) # Afternoon: around 27
- else:
- values.append(24) # Evening: around 24
-
- with patch.object(threshold_recommender.period_detector, "detect") as mock_detect:
- mock_detect.return_value = True # Daily periodicity detected
-
- with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window:
- # Return very similar thresholds for all time periods (within 10%)
- mock_sliding_window.side_effect = [(24.5, 3), (23.5, 3), (25.5, 3), (24.0, 3)] * 2
-
- result = threshold_recommender.recommend_threshold(
- timestamp_list=sample_timestamps[:1000],
- value_list=values[:1000],
- default_window_size=5,
- time_split=True,
- auto_window_adjust=False,
- min_value=0.0,
- max_value=100.0,
- normal_threshold=None,
- min_ts_length=50,
- direction="down",
- )
-
- # Should consolidate to single group since thresholds are close (23.5-25.5, diff=2, 2/25.5=7.8% < 10%)
- assert len(result) == 1
- assert result[0]["start_hour"] == 0
- assert result[0]["end_hour"] == 24
- assert result[0]["upper_bound"] is None
- assert result[0]["lower_bound"] == 23.5 # Min value for down direction
- assert result[0]["window_size"] == 3
-
-
def test_recommend_threshold_no_consolidation_different_thresholds(
threshold_recommender, sample_timestamps, sample_values_periodic
):
@@ -629,6 +547,7 @@ def test_recommend_threshold_no_consolidation_different_thresholds(
max_value=100.0,
normal_threshold=None,
min_ts_length=50,
+ sensitivity=0.5,
direction="up",
)
@@ -650,6 +569,7 @@ def test_recommend_general_threshold_basic(threshold_recommender):
ignore_count=1,
min_value=0.0,
max_value=200.0,
+ sensitivity=0.5,
direction="up",
)
@@ -673,6 +593,7 @@ def test_recommend_general_threshold_direction_down(threshold_recommender):
ignore_count=1,
min_value=0.0,
max_value=100.0,
+ sensitivity=0.5,
direction="down",
)
@@ -695,6 +616,7 @@ def test_recommend_general_threshold_zero_interval(threshold_recommender):
ignore_count=1,
min_value=0.0,
max_value=100.0,
+ sensitivity=0.5,
direction="up",
)
@@ -722,6 +644,7 @@ def test_recommend_general_threshold_no_clusters(threshold_recommender):
ignore_count=1,
min_value=0.0,
max_value=100.0,
+ sensitivity=0.5,
direction="up",
)
@@ -751,6 +674,7 @@ def test_recommend_general_threshold_with_abnormals(threshold_recommender):
ignore_count=1, # Ignore one abnormal period
min_value=0.0,
max_value=200.0,
+ sensitivity=0.5,
direction="up",
)
@@ -776,6 +700,7 @@ def test_recommend_general_threshold_empty_data(threshold_recommender):
ignore_count=1,
min_value=0.0,
max_value=100.0,
+ sensitivity=0.5,
direction="up",
)
# Should handle empty data gracefully with failure status
@@ -799,6 +724,7 @@ def test_recommend_general_threshold_single_value(threshold_recommender):
ignore_count=1,
min_value=0.0,
max_value=100.0,
+ sensitivity=0.5,
direction="up",
)
# Should handle single value gracefully with failure status
@@ -823,6 +749,7 @@ def test_recommend_general_threshold_no_valid_clusters_fallback(threshold_recomm
ignore_count=1,
min_value=0.0,
max_value=1000.0,
+ sensitivity=0.5,
direction="up",
)
@@ -843,6 +770,7 @@ def test_recommend_general_threshold_no_valid_clusters_fallback(threshold_recomm
ignore_count=1,
min_value=0.0,
max_value=1000.0,
+ sensitivity=0.5,
direction="down",
)
@@ -873,6 +801,7 @@ def test_recommend_general_threshold_constant_values_fallback(threshold_recommen
ignore_count=1,
min_value=0.0,
max_value=200.0,
+ sensitivity=0.5,
direction="up",
)
@@ -892,6 +821,7 @@ def test_recommend_general_threshold_constant_values_fallback(threshold_recommen
ignore_count=1,
min_value=0.0,
max_value=200.0,
+ sensitivity=0.5,
direction="down",
)
diff --git a/tests/algorithm/intelligent_threshold/test_threshold_recommender.py b/tests/algorithm/intelligent_threshold/test_threshold_recommender.py
index 719aa5af..99f6064c 100644
--- a/tests/algorithm/intelligent_threshold/test_threshold_recommender.py
+++ b/tests/algorithm/intelligent_threshold/test_threshold_recommender.py
@@ -1201,21 +1201,21 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not
IntelligentThresholdConfig(
start_hour=6,
end_hour=12,
- upper_bound=75.0,
+ upper_bound=72.0,
lower_bound=None,
window_size=5,
),
IntelligentThresholdConfig(
start_hour=12,
end_hour=18,
- upper_bound=80.0,
+ upper_bound=35.0,
lower_bound=None,
window_size=5,
),
IntelligentThresholdConfig(
start_hour=18,
end_hour=24,
- upper_bound=85.0,
+ upper_bound=36.0,
lower_bound=None,
window_size=5,
),
@@ -1251,14 +1251,16 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not
assert len(merged_results) == 1
result = merged_results[0]
assert result.status == "Success"
- assert len(result.thresholds) == 4 # Should match the non-consolidated direction (up)
-
- # Check that down threshold (25.0) is distributed across all up periods
- for i, threshold in enumerate(result.thresholds):
- assert threshold.upper_bound == up_results[0].thresholds[i].upper_bound # Keep original up bounds
- assert threshold.lower_bound == 25.0 # All should have the consolidated down threshold
- assert threshold.start_hour == up_results[0].thresholds[i].start_hour
- assert threshold.end_hour == up_results[0].thresholds[i].end_hour
+ assert len(result.thresholds) == 2 # Should match the non-consolidated direction (up)
+
+ assert result.thresholds[0].upper_bound == up_results[0].thresholds[1].upper_bound # Keep original up bounds
+ assert result.thresholds[0].lower_bound == 25.0 # All should have the consolidated down threshold
+ assert result.thresholds[0].start_hour == up_results[0].thresholds[0].start_hour
+ assert result.thresholds[0].end_hour == up_results[0].thresholds[1].end_hour
+ assert result.thresholds[1].upper_bound == up_results[0].thresholds[3].upper_bound # Keep original up bounds
+ assert result.thresholds[1].lower_bound == 25.0 # All should have the consolidated down threshold
+ assert result.thresholds[1].start_hour == up_results[0].thresholds[2].start_hour
+ assert result.thresholds[1].end_hour == up_results[0].thresholds[3].end_hour
def test_merge_threshold_results_both_consolidated(threshold_recommender):
diff --git a/veaiops/algorithm/intelligent_threshold/configs.py b/veaiops/algorithm/intelligent_threshold/configs.py
index 053fdee8..7497220b 100644
--- a/veaiops/algorithm/intelligent_threshold/configs.py
+++ b/veaiops/algorithm/intelligent_threshold/configs.py
@@ -18,6 +18,8 @@
the intelligent threshold algorithm components.
"""
+import os
+
import tzlocal
# =============================================================================
@@ -45,8 +47,11 @@
# Default timezone for time-based operations
DEFAULT_TIMEZONE = tzlocal.get_localzone_name()
-# Default time split ranges for daily analysis (hours)
-DEFAULT_TIME_SPLIT_RANGES = [[0, 6], [6, 12], [12, 18], [18, 24]]
+# Default number of time split (default = 4)
+DEFAULT_NUMBER_OF_TIME_SPLIT = int(os.getenv("DEFAULT_NUMBER_OF_TIME_SPLIT", 4))
+
+# Maximum number of threshold blocks after merging (default = 8)
+DEFAULT_MAXIMUM_THRESHOLD_BLOCKS = int(os.getenv("DEFAULT_MAXIMUM_THRESHOLD_BLOCKS", 8))
# =============================================================================
# Algorithm Parameters
@@ -82,7 +87,7 @@
# =============================================================================
# Default correlation threshold for period detection
-DEFAULT_CORRELATION_THRESHOLD = 0.75
+DEFAULT_CORRELATION_THRESHOLD = float(os.getenv("DEFAULT_CORRELATION_THRESHOLD", 0.3))
# Minimum data points per day for analysis
DEFAULT_MIN_DATA_POINTS_PER_DAY = 720
@@ -94,5 +99,6 @@
# Timeout Configuration
# =============================================================================
-# Data fetch timeout in seconds (3 minutes for production)
-FETCH_DATA_TIMEOUT = 180
+# Data fetch timeout in seconds (60 minutes for production)
+
+FETCH_DATA_TIMEOUT = int(os.getenv("FETCH_DATA_TIMEOUT", 3600))
diff --git a/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py b/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py
index 95b6efe5..664ee730 100644
--- a/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py
+++ b/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py
@@ -26,8 +26,7 @@
from dbscan1d.core import DBSCAN1D
from veaiops.algorithm.intelligent_threshold.configs import (
- DEFAULT_COEFFICIENT,
- DEFAULT_TIME_SPLIT_RANGES,
+ DEFAULT_NUMBER_OF_TIME_SPLIT,
DEFAULT_TIMEZONE,
MICROSECOND_THRESHOLD,
MILLISECOND_THRESHOLD,
@@ -52,19 +51,28 @@ class ThresholdRecommendAlgorithm:
Attributes:
timezone (str): Timezone for timestamp processing.
- time_split_ranges (List[List[int]]): Time ranges for splitting analysis.
+ time_split_ranges (List[List[float]]): Time ranges for splitting analysis.
period_detector (RobustDailyPeriodDetector): Daily period detection instance.
"""
- def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[int]]] = None) -> None:
+ def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[float]]] = None) -> None:
"""Initialize the ThresholdRecommendAlgorithm.
Args:
timezone (str): Timezone for timestamp processing.
- time_split_ranges (Optional[List[List[int]]]): Custom time split ranges.
+ time_split_ranges (Optional[List[List[float]]]): Custom time split ranges.
"""
self.timezone = timezone
- self.time_split_ranges = time_split_ranges or DEFAULT_TIME_SPLIT_RANGES
+
+ if time_split_ranges is None:
+ self.time_split_ranges = []
+ start_time = 0.0
+ for _ in range(DEFAULT_NUMBER_OF_TIME_SPLIT):
+ end_time = start_time + 24 / DEFAULT_NUMBER_OF_TIME_SPLIT
+ self.time_split_ranges.append([start_time, end_time])
+ start_time = end_time
+ else:
+ self.time_split_ranges = time_split_ranges
self.period_detector = RobustDailyPeriodDetector()
logger.debug(f"Initialized ThresholdRecommendAlgorithm with timezone={timezone}")
@@ -114,6 +122,7 @@ def recommend_threshold(
max_value: Optional[float],
normal_threshold: Optional[float],
min_ts_length: int,
+ sensitivity: float,
direction: Literal["up", "down"] = "up",
) -> List[Dict]:
"""Recommend thresholds for time series data.
@@ -131,6 +140,7 @@ def recommend_threshold(
max_value (Optional[float]): Maximum value constraint for the time series data.
normal_threshold (Optional[float]): Normal threshold baseline for the time series data.
min_ts_length (int): Minimum required length of the time series data.
+ sensitivity (float): Sensitivity of the threshold recommendation algorithm.
direction (Literal["up", "down"]): Direction of the threshold recommendation.
Returns:
@@ -161,6 +171,7 @@ def recommend_threshold(
max_value,
normal_threshold,
direction,
+ sensitivity,
)
else:
# Process with time splitting
@@ -174,6 +185,7 @@ def recommend_threshold(
normal_threshold,
min_ts_length,
direction,
+ sensitivity,
)
def _process_single_time_period(
@@ -186,6 +198,7 @@ def _process_single_time_period(
max_value: Optional[float],
normal_threshold: Optional[float],
direction: Literal["up", "down"],
+ sensitivity: float,
) -> List[Dict]:
"""Process time series as a single time period without splitting.
@@ -198,6 +211,7 @@ def _process_single_time_period(
max_value (Optional[float]): Maximum value constraint.
normal_threshold (Optional[float]): Normal threshold baseline.
direction (Literal["up", "down"]): Threshold direction.
+ sensitivity (float): Sensitivity of the threshold recommendation algorithm.
Returns:
List[Dict]: Single threshold group covering 24 hours.
@@ -212,6 +226,7 @@ def _process_single_time_period(
normal_threshold,
1,
direction,
+ sensitivity,
)
threshold_group = {
@@ -240,6 +255,7 @@ def _process_time_split_periods(
normal_threshold: Optional[float],
min_ts_length: int,
direction: Literal["up", "down"],
+ sensitivity: float,
) -> List[Dict]:
"""Process time series with time period splitting.
@@ -253,6 +269,7 @@ def _process_time_split_periods(
normal_threshold (Optional[float]): Normal threshold baseline.
min_ts_length (int): Minimum required data length.
direction (Literal["up", "down"]): Threshold direction.
+ sensitivity (float): Sensitivity of the threshold recommendation algorithm.
Returns:
List[Dict]: Multiple threshold groups for different time periods.
@@ -295,6 +312,7 @@ def _process_time_split_periods(
normal_threshold,
1,
direction,
+ sensitivity,
)
# Calculate threshold with ignore_count=0
@@ -308,6 +326,7 @@ def _process_time_split_periods(
normal_threshold,
0,
direction,
+ sensitivity,
)
# Calculate ratio for sorting
@@ -353,11 +372,6 @@ def _process_time_split_periods(
threshold_groups.append(threshold_group)
- # Check if consolidation is needed
- consolidated_group = self._check_and_consolidate_threshold_groups(threshold_groups, direction)
- if consolidated_group:
- return [consolidated_group]
-
return threshold_groups
def _check_and_consolidate_threshold_groups(
@@ -433,6 +447,7 @@ def threshold_recommendation_with_sliding_window(
normal_threshold: Optional[float],
ignore_count: int,
direction: Literal["up", "down"],
+ sensitivity: float,
) -> tuple[float, int]:
"""Threshold recommendation with sliding window.
@@ -446,6 +461,7 @@ def threshold_recommendation_with_sliding_window(
normal_threshold (Optional[float]): Normal threshold for the time series data.
ignore_count (int): Number of data points to ignore at the beginning of the time series.
direction (Literal["up", "down"]): Direction of the threshold recommendation.
+ sensitivity (float): Sensitivity of the threshold recommendation algorithm.
Returns:
tuple[float, int]: Tuple of threshold and window size.
@@ -466,6 +482,7 @@ def threshold_recommendation_with_sliding_window(
min_value,
max_value,
direction,
+ sensitivity,
)
# If threshold calculation failed, continue with next window size
@@ -503,6 +520,7 @@ def recommend_general_threshold(
min_value: Optional[float],
max_value: Optional[float],
direction: Literal["up", "down"],
+ sensitivity: float,
) -> Dict[str, Union[bool, float]]:
"""Recommend general threshold for time series data.
@@ -514,6 +532,7 @@ def recommend_general_threshold(
min_value (Optional[float]): Minimum value of the time series data.
max_value (Optional[float]): Maximum value of the time series data.
direction (Literal["up", "down"]): Direction of the threshold recommendation.
+ sensitivity (float): Sensitivity of the threshold recommendation algorithm.
Returns:
Dict[str, Union[bool, float]]: Dictionary with keys:
@@ -522,6 +541,7 @@ def recommend_general_threshold(
"""
timestamp_list = timestamp_list_original
value_list = value_list_original if direction == "up" else [-v for v in value_list_original]
+ coefficient = 1.05 + 0.3 * sensitivity
# Calculate time interval
intervals = []
@@ -616,26 +636,26 @@ def recommend_general_threshold(
if direction == "up":
# For up direction, use 95th percentile of original values
baseline = float(np.percentile(value_list, 95))
- return {"status": True, "threshold": baseline * DEFAULT_COEFFICIENT}
+ return {"status": True, "threshold": baseline * coefficient}
else:
# For down direction, value_list is already negated, so use 95th percentile of negated values
# which corresponds to the 5th percentile of original values
baseline = float(np.percentile(value_list, 95)) # This is the max of negated values
# Apply the same logic as the original down direction
final_threshold = 0 - baseline # Convert back to positive
- return {"status": True, "threshold": final_threshold / DEFAULT_COEFFICIENT}
+ return {"status": True, "threshold": final_threshold / coefficient}
else:
# Empty data case - should not happen as we check earlier, but safety fallback
return {"status": False, "threshold": -1.0}
if direction == "up":
- threshold = final_max_value * DEFAULT_COEFFICIENT
+ threshold = final_max_value * coefficient
if max_value is not None:
threshold = min(max_value, threshold)
return {"status": True, "threshold": threshold}
else:
final_max_value = 0 - final_max_value
- threshold = final_max_value / DEFAULT_COEFFICIENT
+ threshold = final_max_value / coefficient
if min_value is not None:
threshold = max(min_value, threshold)
- return {"status": True, "threshold": final_max_value / DEFAULT_COEFFICIENT}
+ return {"status": True, "threshold": final_max_value / coefficient}
diff --git a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py
index a367f73f..8bd0b585 100644
--- a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py
+++ b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py
@@ -31,6 +31,7 @@
from tenacity import retry, stop_after_attempt, wait_exponential
from veaiops.algorithm.intelligent_threshold.configs import (
+ DEFAULT_MAXIMUM_THRESHOLD_BLOCKS,
EXTREME_VALUE_THRESHOLD,
FETCH_DATA_TIMEOUT,
HISTORICAL_DAYS,
@@ -83,27 +84,36 @@ class ThresholdRecommender:
Attributes:
threshold_algorithm (ThresholdRecommendAlgorithm): Threshold recommendation algorithm instance.
max_concurrent_tasks (int): Maximum number of concurrent tasks (default: 5).
+ maximum_threshold_blocks (int): Maximum number of threshold blocks after merging (default: 8).
task_queue (List[TaskRequest]): Priority queue for pending tasks.
running_tasks (Dict[str, asyncio.Task]): Currently running tasks.
queue_lock (asyncio.Lock): Lock for thread-safe queue operations.
"""
def __init__(
- self, threshold_algorithm: Optional[ThresholdRecommendAlgorithm] = None, max_concurrent_tasks: int = 5
+ self,
+ threshold_algorithm: Optional[ThresholdRecommendAlgorithm] = None,
+ max_concurrent_tasks: int = 5,
+ maximum_threshold_blocks: int = DEFAULT_MAXIMUM_THRESHOLD_BLOCKS,
) -> None:
"""Initialize the ThresholdRecommender.
Args:
threshold_algorithm (Optional[ThresholdRecommendAlgorithm]): Threshold recommender algorithm instance.
max_concurrent_tasks (int): Maximum number of concurrent tasks (default: 5).
+ maximum_threshold_blocks (int): Maximum number of threshold blocks after merging (default: 8).
"""
self.threshold_algorithm = threshold_algorithm or ThresholdRecommendAlgorithm()
self.max_concurrent_tasks = max_concurrent_tasks
+ self.maximum_threshold_blocks = maximum_threshold_blocks
self.task_queue: List[TaskRequest] = []
self.running_tasks: Dict[str, asyncio.Task] = {}
self.queue_lock = asyncio.Lock()
- logger.debug(f"Initialized ThresholdRecommender with max_concurrent_tasks={max_concurrent_tasks}")
+ logger.debug(
+ f"Initialized ThresholdRecommender with max_concurrent_tasks={max_concurrent_tasks}, "
+ f"maximum_threshold_blocks={maximum_threshold_blocks}"
+ )
async def _process_queue(self) -> None:
"""Process the task queue and start new tasks if capacity allows."""
@@ -322,6 +332,296 @@ def _get_normal_threshold(
return normal_range_end if direction == "up" else normal_range_start
+ @staticmethod
+ def can_merge_threshold_configs(configs: List[IntelligentThresholdConfig]) -> bool:
+ """Check if a list of threshold configurations can be merged.
+
+ Merge conditions:
+ 1. window_size must be the same
+ 2. upper_bound max-min difference <= 10%
+ 3. lower_bound max-min difference <= 10%
+
+ Args:
+ configs: List of threshold configurations
+
+ Returns:
+ bool: Whether the configs can be merged
+ """
+ if len(configs) <= 1:
+ return True
+
+ # Check if all window_sizes are the same
+ window_sizes = [c.window_size for c in configs]
+ if len(set(window_sizes)) != 1:
+ return False
+
+ # Extract upper_bound and lower_bound
+ upper_bounds = [c.upper_bound for c in configs if c.upper_bound is not None]
+ lower_bounds = [c.lower_bound for c in configs if c.lower_bound is not None]
+
+ # Check upper_bound merge condition
+ if upper_bounds:
+ max_upper = max(upper_bounds)
+ min_upper = min(upper_bounds)
+
+ if max_upper == 0:
+ if max_upper != min_upper:
+ return False
+ else:
+ upper_diff_ratio = (max_upper - min_upper) / abs(max_upper)
+ if upper_diff_ratio > 0.1: # 10% threshold
+ return False
+
+ # Check lower_bound merge condition
+ if lower_bounds:
+ max_lower = max(lower_bounds)
+ min_lower = min(lower_bounds)
+
+ if max_lower == 0:
+ if max_lower != min_lower:
+ return False
+ else:
+ lower_diff_ratio = (max_lower - min_lower) / abs(max_lower)
+ if lower_diff_ratio > 0.1: # 10% threshold
+ return False
+
+ return True
+
+ @staticmethod
+ def merge_threshold_configs(configs: List[IntelligentThresholdConfig]) -> IntelligentThresholdConfig:
+ """Merge multiple threshold configurations into a single configuration.
+
+ Args:
+ configs: List of threshold configurations to merge
+
+ Returns:
+ IntelligentThresholdConfig: Merged threshold configuration
+ """
+ if len(configs) == 1:
+ return configs[0]
+
+ # Extract upper_bound and lower_bound
+ upper_bounds = [c.upper_bound for c in configs if c.upper_bound is not None]
+ lower_bounds = [c.lower_bound for c in configs if c.lower_bound is not None]
+
+ # For upper bound, use max (conservative strategy)
+ merged_upper = max(upper_bounds) if upper_bounds else None
+
+ # For lower bound, use min (conservative strategy)
+ merged_lower = min(lower_bounds) if lower_bounds else None
+
+ return IntelligentThresholdConfig(
+ start_hour=configs[0].start_hour,
+ end_hour=configs[-1].end_hour,
+ upper_bound=merged_upper,
+ lower_bound=merged_lower,
+ window_size=configs[0].window_size,
+ )
+
+ @staticmethod
+ def _calculate_merge_difference(config1: IntelligentThresholdConfig, config2: IntelligentThresholdConfig) -> float:
+ """Calculate the difference between two adjacent threshold configurations for merging.
+
+ The difference is calculated based on the relative change in both upper_bound and lower_bound.
+ Returns a normalized difference value that can be used to determine merge priority.
+
+ Args:
+ config1: First threshold configuration
+ config2: Second threshold configuration
+
+ Returns:
+ float: Normalized difference value (lower is more similar, higher is more different)
+ """
+ differences = []
+
+ # Calculate upper_bound difference
+ if config1.upper_bound is not None and config2.upper_bound is not None:
+ upper1 = config1.upper_bound
+ upper2 = config2.upper_bound
+ max_upper = max(abs(upper1), abs(upper2))
+ if max_upper > 0:
+ upper_diff = abs(upper1 - upper2) / max_upper
+ differences.append(upper_diff)
+
+ # Calculate lower_bound difference
+ if config1.lower_bound is not None and config2.lower_bound is not None:
+ lower1 = config1.lower_bound
+ lower2 = config2.lower_bound
+ max_lower = max(abs(lower1), abs(lower2))
+ if max_lower > 0:
+ lower_diff = abs(lower1 - lower2) / max_lower
+ differences.append(lower_diff)
+
+ # Return average difference, or 0 if no valid differences
+ return sum(differences) / len(differences) if differences else 0.0
+
+ def _hierarchical_merge_thresholds(
+ self, thresholds: List[IntelligentThresholdConfig], max_blocks: int
+ ) -> List[IntelligentThresholdConfig]:
+ """Merge threshold configurations using hierarchical clustering approach.
+
+ This method uses a bottom-up hierarchical clustering approach:
+ 1. Start with all individual threshold blocks
+ 2. Iteratively merge the two adjacent blocks with smallest difference
+ 3. Continue until the number of blocks <= max_blocks
+
+ Args:
+ thresholds: Original list of threshold configurations (must be sorted by start_hour)
+ max_blocks: Maximum number of blocks after merging
+
+ Returns:
+ List[IntelligentThresholdConfig]: List of merged threshold configurations
+ """
+ if len(thresholds) <= max_blocks:
+ return thresholds
+
+ # Create a mutable list of threshold blocks (as lists to track merged ranges)
+ blocks = [[threshold] for threshold in thresholds]
+
+ logger.debug(f"Starting hierarchical merge: {len(blocks)} blocks -> target {max_blocks} blocks")
+
+ # Iteratively merge until we reach the target number of blocks
+ while len(blocks) > max_blocks:
+ # Calculate differences between all adjacent blocks
+ min_diff = float("inf")
+ min_diff_idx = -1
+
+ for i in range(len(blocks) - 1):
+ # Get the last config of current block and first config of next block
+ current_last = blocks[i][-1]
+ next_first = blocks[i + 1][0]
+
+ # Only merge if blocks are continuous
+ if current_last.end_hour == next_first.start_hour:
+ # Calculate difference between the two blocks
+ diff = self._calculate_merge_difference(current_last, next_first)
+
+ if diff < min_diff:
+ min_diff = diff
+ min_diff_idx = i
+
+ # If no mergeable adjacent blocks found, break
+ if min_diff_idx == -1:
+ logger.warning(f"Cannot merge further: no continuous blocks found. Current blocks: {len(blocks)}")
+ break
+
+ # Merge the two blocks with minimum difference
+ merged_block = blocks[min_diff_idx] + blocks[min_diff_idx + 1]
+ blocks[min_diff_idx : min_diff_idx + 2] = [merged_block]
+
+ logger.debug(
+ f"Merged blocks at index {min_diff_idx} (diff={min_diff:.4f}), remaining blocks: {len(blocks)}"
+ )
+
+ # Convert blocks back to merged threshold configurations
+ result = []
+ for block in blocks:
+ merged_config = self.merge_threshold_configs(block)
+ result.append(merged_config)
+
+ logger.info(
+ f"Hierarchical merge completed: {len(thresholds)} -> {len(result)} blocks "
+ f"(target: {max_blocks}, maximum_threshold_blocks={self.maximum_threshold_blocks})"
+ )
+
+ return result
+
+ def merge_continuous_thresholds(
+ self, thresholds: List[IntelligentThresholdConfig]
+ ) -> List[IntelligentThresholdConfig]:
+ """Merge continuous threshold configurations with adaptive strategy.
+
+ Strategy:
+ 1. First apply greedy algorithm to merge consecutive time periods with variance within 10%
+ 2. If result exceeds maximum_threshold_blocks, apply hierarchical clustering to further merge
+
+ Args:
+ thresholds: Original list of threshold configurations
+
+ Returns:
+ List[IntelligentThresholdConfig]: List of merged threshold configurations
+ """
+ # Filter out configs that have neither upper_bound nor lower_bound
+ valid_thresholds = [t for t in thresholds if t.upper_bound is not None or t.lower_bound is not None]
+
+ if len(valid_thresholds) <= 1:
+ return thresholds
+
+ # Sort by start_hour
+ sorted_thresholds = sorted(valid_thresholds, key=lambda x: x.start_hour)
+
+ # Step 1: Apply greedy merge (10% threshold)
+ merged_result = []
+ current_group = [sorted_thresholds[0]]
+
+ for i in range(1, len(sorted_thresholds)):
+ # Check if continuous
+ if current_group[-1].end_hour == sorted_thresholds[i].start_hour:
+ # Try to add current threshold to merge group
+ test_group = current_group + [sorted_thresholds[i]]
+
+ if self.can_merge_threshold_configs(test_group):
+ # Can merge, add to current group
+ current_group.append(sorted_thresholds[i])
+ else:
+ # Cannot merge, finalize current group and start new group
+ merged_result.append(self.merge_threshold_configs(current_group))
+ current_group = [sorted_thresholds[i]]
+ else:
+ # Not continuous, finalize current group and start new group
+ merged_result.append(self.merge_threshold_configs(current_group))
+ current_group = [sorted_thresholds[i]]
+
+ # Process the last group
+ merged_result.append(self.merge_threshold_configs(current_group))
+
+ logger.debug(
+ f"Greedy merge completed: {len(valid_thresholds)} -> {len(merged_result)} blocks "
+ f"(maximum_threshold_blocks={self.maximum_threshold_blocks})"
+ )
+
+ # Step 2: If still exceeds maximum_threshold_blocks, apply hierarchical clustering
+ if len(merged_result) > self.maximum_threshold_blocks:
+ logger.info(
+ f"Greedy merge result ({len(merged_result)} blocks) exceeds maximum_threshold_blocks "
+ f"({self.maximum_threshold_blocks}), applying hierarchical clustering"
+ )
+ merged_result = self._hierarchical_merge_thresholds(merged_result, self.maximum_threshold_blocks)
+
+ return merged_result
+
+ def merge_metric_threshold_results(self, results: List[MetricThresholdResult]) -> List[MetricThresholdResult]:
+ """Merge threshold configurations in MetricThresholdResult list.
+
+ Merges the thresholds for each metric (MetricThresholdResult),
+ keeping other fields unchanged.
+
+ Args:
+ results: List containing multiple metric threshold results
+
+ Returns:
+ List[MetricThresholdResult]: List of merged results
+ """
+ merged_results = []
+
+ for result in results:
+ # Merge thresholds for each metric
+ merged_thresholds = self.merge_continuous_thresholds(result.thresholds)
+
+ # Create new MetricThresholdResult, keeping other fields unchanged
+ merged_result = MetricThresholdResult(
+ name=result.name,
+ thresholds=merged_thresholds,
+ labels=result.labels,
+ unique_key=result.unique_key,
+ status=result.status,
+ error_message=result.error_message,
+ )
+
+ merged_results.append(merged_result)
+
+ return merged_results
+
def _merge_threshold_results(
self, up_results: list[MetricThresholdResult], down_results: list[MetricThresholdResult]
) -> list[MetricThresholdResult]:
@@ -346,7 +646,6 @@ def _merge_threshold_results(
for up_result in up_results:
down_result = down_results_map.get(up_result.unique_key)
-
if down_result:
# Check if either direction failed
if up_result.status != "Success" or down_result.status != "Success":
@@ -492,8 +791,7 @@ def _merge_threshold_results(
else:
# Only down bound available and it succeeded
merged_results.append(down_result)
-
- return merged_results
+ return self.merge_metric_threshold_results(merged_results)
async def _fetch_and_validate_data(
self, datasource_id: str
diff --git a/veaiops/handler/routers/apis/v1/datasource/zabbix.py b/veaiops/handler/routers/apis/v1/datasource/zabbix.py
index 0b4eb0e9..130404c8 100644
--- a/veaiops/handler/routers/apis/v1/datasource/zabbix.py
+++ b/veaiops/handler/routers/apis/v1/datasource/zabbix.py
@@ -402,17 +402,60 @@ async def get_metrics_timeseries(
# Get history type from datasource
history_type = zabbix_datasource.history_type
- # Fetch data using client.get_metric_data
- history_data = await zabbix_datasource.client.get_metric_data(
- item_ids=item_ids,
- time_from=start_time_ts,
- time_till=end_time_ts,
- history_type=history_type,
- page_size=DEFAULT_PAGE_SIZE,
- )
+ # Fetch data using pagination to ensure all data is retrieved
+ all_history_data = []
+ last_clock = start_time_ts
+ page_size = DEFAULT_PAGE_SIZE
+
+ # Add maximum iterations limit to prevent infinite loop
+ max_iterations = 100
+ iteration_count = 0
+
+ while iteration_count < max_iterations:
+ page_data = await zabbix_datasource.client.get_metric_data(
+ item_ids=item_ids,
+ time_from=last_clock,
+ time_till=end_time_ts,
+ history_type=history_type,
+ page_size=page_size,
+ )
+
+ if not page_data:
+ break
+
+ # Validate page_data structure before processing
+ if not isinstance(page_data, list) or len(page_data) == 0:
+ raise BadRequestError(message="Received invalid page_data: expected non-empty list")
+
+ # Ensure page_data items have required fields
+ valid_items = [item for item in page_data if isinstance(item, dict) and "clock" in item]
+ if not valid_items:
+ raise BadRequestError(message="Received page_data with no valid items containing 'clock' field")
+
+ all_history_data.extend(page_data)
+
+ # Check if we have a valid last item before accessing its clock
+ if page_data and isinstance(page_data[-1], dict) and "clock" in page_data[-1]:
+ last_clock = int(page_data[-1]["clock"])
+ else:
+ raise BadRequestError(message="Last item in page_data is invalid or missing 'clock' field")
+
+ # If we got less data than the page size, we've reached the end
+ if len(page_data) < page_size:
+ break
+
+ # Increment iteration counter
+ iteration_count += 1
+
+ # Raise exception if we reached maximum iterations
+ if iteration_count >= max_iterations:
+ raise BadRequestError(
+ message=f"Maximum iterations ({max_iterations}) reached while fetching Zabbix data. "
+ f"This might indicate an issue with data retrieval or pagination logic."
+ )
# Convert history data to time series format
- timeseries_data = zabbix_datasource._convert_history_to_timeseries(history_data)
+ timeseries_data = zabbix_datasource._convert_history_to_timeseries(all_history_data)
return APIResponse(
message="success",
@@ -420,8 +463,13 @@ async def get_metrics_timeseries(
)
-@zabbix_router.get("/datasource/{datasource_id}/mediatypes", response_model=APIResponse[List[ZabbixMediatype]])
-async def get_zabbix_mediatypes(datasource_id: str) -> APIResponse[List[ZabbixMediatype]]:
+@zabbix_router.get(
+ "/datasource/{datasource_id}/mediatypes",
+ response_model=APIResponse[List[ZabbixMediatype]],
+)
+async def get_zabbix_mediatypes(
+ datasource_id: str,
+) -> APIResponse[List[ZabbixMediatype]]:
"""Get Zabbix mediatypes by datasource ID.
Args:
@@ -451,8 +499,13 @@ async def get_zabbix_mediatypes(datasource_id: str) -> APIResponse[List[ZabbixMe
)
-@zabbix_router.get("/datasource/{datasource_id}/usergroups", response_model=APIResponse[List[ZabbixUserGroup]])
-async def get_zabbix_usergroups(datasource_id: str) -> APIResponse[List[ZabbixUserGroup]]:
+@zabbix_router.get(
+ "/datasource/{datasource_id}/usergroups",
+ response_model=APIResponse[List[ZabbixUserGroup]],
+)
+async def get_zabbix_usergroups(
+ datasource_id: str,
+) -> APIResponse[List[ZabbixUserGroup]]:
"""Get Zabbix usergroups by datasource ID.
Args:
diff --git a/veaiops/metrics/zabbix.py b/veaiops/metrics/zabbix.py
index fdeb32be..9457c65e 100644
--- a/veaiops/metrics/zabbix.py
+++ b/veaiops/metrics/zabbix.py
@@ -13,6 +13,7 @@
# limitations under the License.
import asyncio
+import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List, Optional
@@ -132,7 +133,7 @@ class ZabbixItem(BaseModel):
default_action = "ve_aiops_action"
default_tag_key = "managed-by"
default_tag_value = "ve_aiops"
-default_timeout = 3
+default_timeout = int(os.getenv("ZABBIX_TIMEOUT", 10))
webhook_script = """
try {
@@ -232,6 +233,7 @@ async def get_metric_data(
"""
def _get_history(*args):
+ logger.debug(f"Fetching history for default_timeout: {default_timeout},item_ids: {item_ids}")
return self.zapi.history.get(
itemids=item_ids,
time_from=time_from,