diff --git a/charts/veaiops/Chart.yaml b/charts/veaiops/Chart.yaml index 84785075..32abf949 100644 --- a/charts/veaiops/Chart.yaml +++ b/charts/veaiops/Chart.yaml @@ -29,13 +29,13 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.1 +version: 0.1.2 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "0.1.1" +appVersion: "0.1.2" dependencies: - name: common diff --git a/charts/veaiops/values.yaml b/charts/veaiops/values.yaml index b320ab0b..2f6a6698 100644 --- a/charts/veaiops/values.yaml +++ b/charts/veaiops/values.yaml @@ -245,7 +245,7 @@ backend: image: registry: veaiops-registry-cn-beijing.cr.volces.com repository: veaiops/backend - tag: v0.1.1 + tag: v0.1.2 pullPolicy: Always ## Optionally specify an array of imagePullSecrets. ## Secrets must be manually created in the namespace. @@ -312,7 +312,7 @@ chatops: image: registry: veaiops-registry-cn-beijing.cr.volces.com repository: veaiops/chatops - tag: v0.1.1 + tag: v0.1.2 pullPolicy: Always ## Optionally specify an array of imagePullSecrets. ## Secrets must be manually created in the namespace. @@ -378,7 +378,7 @@ frontend: image: registry: veaiops-registry-cn-beijing.cr.volces.com repository: veaiops/frontend - tag: v0.1.1 + tag: v0.1.2 pullPolicy: Always ## Optionally specify an array of imagePullSecrets. ## Secrets must be manually created in the namespace. @@ -445,7 +445,7 @@ intelligentThreshold: image: registry: veaiops-registry-cn-beijing.cr.volces.com repository: veaiops/intelligent-threshold - tag: v0.1.1 + tag: v0.1.2 pullPolicy: Always ## Optionally specify an array of imagePullSecrets. ## Secrets must be manually created in the namespace. @@ -538,7 +538,7 @@ initial: image: registry: veaiops-registry-cn-beijing.cr.volces.com repository: veaiops/initial - tag: v0.1.1 + tag: v0.1.2 pullPolicy: Always ## Optionally specify an array of imagePullSecrets. ## Secrets must be manually created in the namespace. diff --git a/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx b/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx index a2826267..46bb1934 100644 --- a/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx +++ b/frontend/apps/veaiops/src/components/wizard/steps/host-selection/components/shared/instance-selection-configs.tsx @@ -13,17 +13,18 @@ // limitations under the License. /** - * 实例选择配置工厂 - * @description 为不同数据源提供预定义的配置 + * Instance selection config factory. + * @description Provides predefined instance selection configs for different data sources. 
 */

 import { IconCloud, IconDesktop } from '@arco-design/web-react/icon';
+import type { AliyunInstance, VolcengineInstance } from '@wizard/types';
+import { checkMatch } from '@wizard/utils/filter';
 import type { ZabbixHost } from 'api-generate';
-import type { AliyunInstance, VolcengineInstance } from '../../../../types';
 import type { InstanceSelectionConfig } from './instance-selection-config';

 /**
- * 阿里云实例选择配置
+ * Aliyun instance selection config.
  */
 export const createAliyunConfig = (
   selectionAction: (instances: AliyunInstance[]) => void,
@@ -36,13 +37,13 @@ export const createAliyunConfig = (
   itemType: '实例',
   icon: <IconCloud />,
   dataTransformer: (instance) => {
-    // 当只有 userId 而没有 instanceId 时,使用 userId 作为 id
-    // 这样可以确保标题和显示正确
+    // When only userId exists (no instanceId), use userId as the id
+    // This ensures the title and display are still meaningful
    const id =
      instance.instanceId ||
      instance.dimensions?.instanceId ||
      instance.dimensions?.userId ||
-      instance.userId ||
+      (instance as AliyunInstance & { userId?: string }).userId ||
      '';
    const name =
      instance.instanceName ||
@@ -62,28 +63,28 @@ export const createAliyunConfig = (
   },
   selectionAction,
   searchFilter: (instance, searchValue) => {
-    const searchLower = searchValue.toLowerCase();
    return (
-      (instance.instanceId?.toLowerCase() || '').includes(searchLower) ||
-      (instance.instanceName?.toLowerCase() || '').includes(searchLower) ||
-      (instance.region?.toLowerCase() || '').includes(searchLower) ||
-      // 当只有 userId 时,也支持搜索 userId
-      (instance.dimensions?.userId?.toLowerCase() || '').includes(
-        searchLower,
-      ) ||
-      (instance.userId?.toLowerCase() || '').includes(searchLower)
+      checkMatch(instance.instanceId, searchValue) ||
+      checkMatch(instance.instanceName, searchValue) ||
+      checkMatch(instance.region, searchValue) ||
+      // When only userId exists, also allow searching by userId
+      checkMatch(instance.dimensions?.userId, searchValue) ||
+      checkMatch(
+        (instance as AliyunInstance & { userId?: string }).userId,
+        searchValue,
+      )
    );
   },
   getId: (instance) =>
    instance.instanceId ||
    instance.dimensions?.instanceId ||
    instance.dimensions?.userId ||
-    instance.userId ||
+    (instance as AliyunInstance & { userId?: string }).userId ||
    '',
 });

 /**
- * 火山引擎实例选择配置
+ * Volcengine instance selection config.
  */
 export const createVolcengineConfig = (
   selectionAction: (instances: VolcengineInstance[]) => void,
@@ -104,15 +105,15 @@ export const createVolcengineConfig = (
   }),
   selectionAction,
   searchFilter: (instance, searchValue) =>
-    checkMatch === undefined ||
-    (instance.instanceName?.toLowerCase() || '').includes(searchValue) ||
-    (instance.region?.toLowerCase() || '').includes(searchValue) ||
-    (instance.namespace?.toLowerCase() || '').includes(searchValue),
+    checkMatch(instance.instanceId, searchValue) ||
+    checkMatch(instance.instanceName, searchValue) ||
+    checkMatch(instance.region, searchValue) ||
+    checkMatch(instance.namespace, searchValue),
   getId: (instance) => instance.instanceId,
 });

 /**
- * Zabbix主机选择配置
+ * Zabbix host selection config.
  */
 export const createZabbixConfig = (
   selectionAction: (hosts: ZabbixHost[]) => void,
@@ -120,19 +121,18 @@ export const createZabbixConfig = (
   title: '选择主机',
   description: '选择要监控的主机,可以选择多个主机',
   emptyDescription: '暂无可用的主机',
-  searchPlaceholder: '搜索主机名称...',
+  searchPlaceholder: '搜索主机名称 (支持正则)...',
   itemType: '主机',
   icon: <IconDesktop />,
   dataTransformer: (host) => ({
-    id: host.host, // 使用 host 作为唯一标识
+    id: host.host, // Use host as the unique identifier
    name: host.name,
-    region: undefined, // Zabbix没有region概念
-    dimensions: undefined, // Zabbix没有dimensions概念
+    region: undefined, // Zabbix has no region concept
+    dimensions: undefined, // Zabbix has no dimensions concept
   }),
   selectionAction,
   searchFilter: (host, searchValue) =>
-    host.host.toLowerCase().includes(searchValue) ||
-    host.name.toLowerCase().includes(searchValue),
+    checkMatch(host.host, searchValue) || checkMatch(host.name, searchValue),
-  getId: (host) => host.host, // 使用 host 作为唯一标识
-  useHostList: true, // 使用特殊的主机列表组件
+  getId: (host) => host.host, // Use host as the unique identifier
+  useHostList: true, // Use the specialized host list component
 });
diff --git a/frontend/apps/veaiops/src/components/wizard/utils/filter.ts b/frontend/apps/veaiops/src/components/wizard/utils/filter.ts
new file mode 100644
index 00000000..626524ab
--- /dev/null
+++ b/frontend/apps/veaiops/src/components/wizard/utils/filter.ts
@@ -0,0 +1,135 @@
+// Copyright 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/**
+ * Check whether the given text matches the search value.
+ * Supports both fuzzy substring matching and regular expressions.
+ * @param text Text to check
+ * @param searchValue Search value (plain string or regex pattern string)
+ * @returns Whether the text matches the search condition
+ */
+export const isMatch = (
+  text: string | undefined | null,
+  searchValue: string,
+): boolean => {
+  if (!text) {
+    return false;
+  }
+
+  const safeText = text.toLowerCase();
+  const safeSearch = searchValue.toLowerCase().trim();
+
+  if (!safeSearch) {
+    return true;
+  }
+
+  try {
+    // Try regex matching first
+    // Typical user inputs:
+    // - AA-\d+-BB style regex
+    // - AA-* style wildcard
+    // - Simple substring like AA
+
+    // Strategy:
+    // 1. Try to interpret the query as regex
+    // 2. If it fails and contains *, treat it as a wildcard
+    // 3. Otherwise fallback to plain substring match
+
+    // We directly build RegExp here to fully support regex syntax
+
+    const regex = new RegExp(safeSearch, 'i');
+    if (regex.test(text)) {
+      return true;
+    }
+  } catch (e) {
+    // If regex parsing fails (e.g.
syntax error), try wildcard handling instead + } + + // Wildcard handling (simple * to .*) + // Only used when query contains * and regex parsing failed + if (safeSearch.includes('*')) { + try { + // Escape all regex metacharacters except * + const pattern = safeSearch + .replace(/[.+?^${}()|[\]\\]/g, '\\$&') + .replace(/\*/g, '.*'); + const wildcardRegex = new RegExp(`^${pattern}$`, 'i'); // Wildcard usually implies full match + if (wildcardRegex.test(text)) { + return true; + } + } catch (e) { + // Ignore and continue to substring fallback + } + } + + // Final fallback: plain substring match + return safeText.includes(safeSearch); +}; + +/** + * Matching helper optimized for AA-number-BB style patterns + * while still supporting generic regex and wildcard queries. + */ +export const checkMatch = ( + text: string | undefined | null, + searchValue: string, +): boolean => { + if (!text) { + return false; + } + + const safeText = text.trim(); + const query = searchValue.trim(); + + if (!query) { + return true; + } + + // 1. Try regex match first + // Typical usage: + // - AA-\d+-BB style patterns with \d for digits + // - ^server.* for prefix matching + // Using RegExp directly gives maximum flexibility for power users. + + try { + // Use 'i' flag to make the match case-insensitive + const regex = new RegExp(query, 'i'); + if (regex.test(safeText)) { + return true; + } + } catch (e) { + // If regex parsing fails (e.g. unclosed parenthesis), fall through to wildcard or substring + } + + // 2. Try wildcard * match + // Only used when query contains * and regex parsing did not succeed + if (query.includes('*')) { + try { + // Escape all regex metacharacters except * + const pattern = query + .replace(/[.+?^${}()|[\]\\]/g, '\\$&') + .replace(/\*/g, '.*'); + const wildcardRegex = new RegExp(`^${pattern}$`, 'i'); // Wildcard usually implies full match + if (wildcardRegex.test(safeText)) { + return true; + } + } catch (e) { + // Ignore and fallback to plain substring match + } + } + + // 3. Plain substring match (case-insensitive) + // Fallback for queries like "server(" which are invalid regex but valid text + return safeText.toLowerCase().includes(query.toLowerCase()); +}; diff --git a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx index 0f1b57d4..987ee2ea 100644 --- a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx +++ b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/components/table-columns/labels-column.tsx @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +import { Tooltip } from '@arco-design/web-react'; import { CellRender } from '@veaiops/components'; import type React from 'react'; @@ -29,9 +30,13 @@ export const LabelsColumn: React.FC<{
{Object.entries(labels).map(([key, value]) => (
-          <Tag key={key}>
-            {`${key}: ${value}`}
-          </Tag>
+          <Tooltip key={key} content={`${key}: ${value}`}>
+            <Tag>
+              {`${key}: ${value}`}
+            </Tag>
+          </Tooltip>
))}
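A note on the matching helper introduced in `filter.ts` above: the cascade is regex first, then `*`-wildcard, then a case-insensitive substring fallback. A minimal Python sketch of the same fallback order, useful as a reference for the intended semantics (the helper name and sample hostnames are illustrative, not part of the patch):

```python
import re
from typing import Optional

def check_match(text: Optional[str], query: str) -> bool:
    """Mirrors checkMatch in filter.ts: regex -> '*' wildcard -> substring."""
    if not text:
        return False
    text, query = text.strip(), query.strip()
    if not query:
        return True
    # 1. Interpret the query as a regular expression (case-insensitive).
    try:
        if re.search(query, text, re.IGNORECASE):
            return True
    except re.error:
        pass  # Invalid regex such as "server(" -- fall through.
    # 2. Treat '*' as a wildcard: escape everything else and anchor both ends.
    if "*" in query:
        pattern = "^" + ".*".join(re.escape(part) for part in query.split("*")) + "$"
        if re.search(pattern, text, re.IGNORECASE):
            return True
    # 3. Plain case-insensitive substring match.
    return query.lower() in text.lower()

assert check_match("web-01-prod", r"web-\d+-prod")  # regex
assert check_match("web-01-prod", "web-*-prod")     # wildcard, full match
assert check_match("web-01-prod", "01-pro")         # plain text query
assert not check_match("web-01-prod", "db-*")       # wildcard must span the whole name
```

One behavioral consequence worth noting: because the regex step runs on every query, a plain string that happens to be valid regex (for example `host.1`, where `.` matches any character) can match more broadly than a literal substring search would.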
diff --git a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx
index 4acae8a2..1bb6ac87 100644
--- a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx
+++ b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/hooks/use-rerun-drawer.tsx
@@ -12,9 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

-import apiClient from '@/utils/api-client';
 import { Button, Drawer, Form, Message } from '@arco-design/web-react';
-import { API_RESPONSE_CODE } from '@veaiops/constants';
 import { logger } from '@veaiops/utils';
 import type {
   IntelligentThresholdTaskVersion,
@@ -22,6 +20,7 @@ import type {
 } from 'api-generate';
 import type React from 'react';
 import { useCallback, useEffect, useState } from 'react';
+import { rerunTask } from '../../../lib/data-source';
 import { renderRerunForm } from '../../task';

 /**
@@ -66,51 +65,67 @@ export const useRerunDrawer = (
    try {
      setLoading(true);
+
+      // Build metric_template_value, setting normal_range_start and normal_range_end to null
+      // if the user didn't fill them, so the backend won't use default values (10/90)
+      // Note: the backend model default is now None, so sending null or omitting the field results in None
+      const metricTemplateValue = values.metric_template_value || {};
+      const cleanedMetricTemplateValue: Record<string, unknown> = {
+        ...metricTemplateValue,
+      };
+
+      // Set to null if the user didn't fill normal_range_start or normal_range_end
+      // Backend will use None (not 10/90) when these fields are null or omitted
+      if (
+        cleanedMetricTemplateValue.normal_range_start === undefined ||
+        cleanedMetricTemplateValue.normal_range_start === null ||
+        cleanedMetricTemplateValue.normal_range_start === ''
+      ) {
+        cleanedMetricTemplateValue.normal_range_start = null;
+      }
+
+      if (
+        cleanedMetricTemplateValue.normal_range_end === undefined ||
+        cleanedMetricTemplateValue.normal_range_end === null ||
+        cleanedMetricTemplateValue.normal_range_end === ''
+      ) {
+        cleanedMetricTemplateValue.normal_range_end = null;
+      }
+
      const requestBody: RerunIntelligentThresholdTaskRequest = {
        task_id,
        direction: values.direction,
        n_count: values.n_count,
-        metric_template_value: values.metric_template_value,
+        metric_template_value:
+          cleanedMetricTemplateValue as RerunIntelligentThresholdTaskRequest['metric_template_value'],
+        sensitivity:
+          (values.sensitivity as number) ??
+          (values.metric_template_value?.sensitivity as number) ??
+          0.5,
      };

-      const response =
-        await apiClient.intelligentThresholdTask.postApisV1IntelligentThresholdTaskRerun(
-          {
-            requestBody,
-          },
-        );
+      // ✅ Reuse rerunTask from data-source to avoid duplicate API calls
+      const success = await rerunTask(requestBody);

-      if (response.code === API_RESPONSE_CODE.SUCCESS && response.data) {
-        Message.success('任务重新执行成功');
+      if (success) {
        close();
-        // 刷新表格数据以显示新的版本结果
+        // Refresh table data to show new version results
        if (tableRef.current?.refresh) {
+          logger.info({
+            message:
+              '[useRerunDrawer] Rerun successful, triggering single table refresh',
+            source: 'useRerunDrawer',
+          });
          const refreshResult = await tableRef.current.refresh();
          return refreshResult ??
true; } return true; - } else { - throw new Error(response.message || '任务重新执行失败'); } + return false; } catch (error: unknown) { - // ✅ 正确:使用 logger 记录错误,并透出实际错误信息 - const errorObj = - error instanceof Error ? error : new Error(String(error)); - logger.error({ - message: '任务重新执行失败', - data: { - error: errorObj.message, - stack: errorObj.stack, - errorObj, - request: values, - timestamp: Date.now(), - }, - source: 'useRerunDrawer', - component: 'handleSubmit', - }); - const errorMessage = errorObj.message || '任务重新执行失败'; - Message.error(errorMessage); + // Error handling is already done in rerunTask (Message.error and logger.error) + // Just return false to indicate failure return false; } finally { setLoading(false); @@ -124,11 +139,12 @@ export const useRerunDrawer = ( const formData = { direction: rerunData.direction || 'both', n_count: rerunData.n_count || 3, + sensitivity: rerunData.sensitivity ?? 0.5, metric_template_value: { normal_range_start: - rerunData.metric_template_value?.normal_range_start || 0, + rerunData.metric_template_value?.normal_range_start ?? undefined, normal_range_end: - rerunData.metric_template_value?.normal_range_end || 55, + rerunData.metric_template_value?.normal_range_end ?? undefined, }, task_id: rerunData.task_id, }; diff --git a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx index 01e3fcc9..65277c7f 100644 --- a/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx +++ b/frontend/apps/veaiops/src/modules/threshold/features/task-config/ui/version/table.tsx @@ -14,6 +14,8 @@ import type { CustomTableActionType } from '@veaiops/components'; import type { BaseQuery, BaseRecord } from '@veaiops/types'; +import { logger } from '@veaiops/utils'; +import { useInterval } from 'ahooks'; import { useRef } from 'react'; import { useAlarmDrawer, @@ -49,6 +51,18 @@ const TaskVersionTable: React.FC = ({ tableRef, }); + // Auto refresh table data every 3 seconds to update task status + useInterval(() => { + // Only refresh when tableRef is available + if (tableRef.current?.refresh) { + logger.debug({ + message: '[TaskVersionTable] Auto refreshing task versions', + source: 'TaskVersionTable', + }); + tableRef.current.refresh(); + } + }, 3000); + return ( <> {tableElement} diff --git a/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py b/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py index e2bee696..67124db6 100644 --- a/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py +++ b/tests/algorithm/intelligent_threshold/test_robust_daily_period_detector.py @@ -113,7 +113,7 @@ def short_data(): def test_init_default_parameters(): """Test initialization with default parameters.""" detector = RobustDailyPeriodDetector() - assert detector.correlation_threshold == 0.75 + assert detector.correlation_threshold == 0.3 assert detector.min_data_points_per_day == 720 assert detector.min_common_points == 720 diff --git a/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py b/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py index 5f8b250f..36ff3b55 100644 --- a/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py +++ b/tests/algorithm/intelligent_threshold/test_threshold_recommendation_algorithm.py @@ -195,6 +195,7 @@ def 
test_recommend_threshold_no_time_split(threshold_recommender, sample_timesta max_value=100.0, normal_threshold=50.0, min_ts_length=50, + sensitivity=0.5, direction="up", ) @@ -225,6 +226,7 @@ def test_recommend_threshold_with_time_split(threshold_recommender, sample_times max_value=100.0, normal_threshold=50.0, min_ts_length=30, # Reduced from 50 to ensure all periods have enough data + sensitivity=0.5, direction="up", ) @@ -260,6 +262,7 @@ def test_recommend_threshold_direction_down(threshold_recommender, sample_timest max_value=100.0, normal_threshold=30.0, min_ts_length=50, + sensitivity=0.5, direction="down", ) @@ -290,6 +293,7 @@ def test_recommend_threshold_insufficient_data(threshold_recommender, sample_tim max_value=100.0, normal_threshold=50.0, min_ts_length=1000, # High minimum length + sensitivity=0.5, direction="up", ) @@ -320,6 +324,7 @@ def test_threshold_recommendation_with_sliding_window_basic(threshold_recommende max_value=100.0, normal_threshold=50.0, ignore_count=1, + sensitivity=0.5, direction="up", ) @@ -351,6 +356,7 @@ def test_threshold_recommendation_with_sliding_window_auto_adjust(threshold_reco max_value=100.0, normal_threshold=50.0, ignore_count=1, + sensitivity=0.5, direction="up", ) @@ -376,6 +382,7 @@ def test_threshold_recommendation_with_sliding_window_normal_threshold_up(thresh max_value=100.0, normal_threshold=60.0, ignore_count=1, + sensitivity=0.5, direction="up", ) @@ -401,6 +408,7 @@ def test_threshold_recommendation_with_sliding_window_normal_threshold_down(thre max_value=100.0, normal_threshold=30.0, ignore_count=1, + sensitivity=0.5, direction="down", ) @@ -518,96 +526,6 @@ def test_check_and_consolidate_threshold_groups_single_group(threshold_recommend assert result is None -def test_recommend_threshold_with_consolidation_up(threshold_recommender, sample_timestamps): - """Test full threshold recommendation with consolidation for up direction.""" - # Create values that result in similar thresholds across time periods - values = [] - for i in range(len(sample_timestamps)): - # Create values that vary slightly but result in similar thresholds - hour = (i // 60) % 24 - if hour < 6: - values.append(45) # Night: around 45 - elif hour < 12: - values.append(48) # Morning: around 48 - elif hour < 18: - values.append(52) # Afternoon: around 52 - else: - values.append(47) # Evening: around 47 - - with patch.object(threshold_recommender.period_detector, "detect") as mock_detect: - mock_detect.return_value = True # Daily periodicity detected - - # Mock sliding window to return similar thresholds - with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window: - # Return very similar thresholds for all time periods (within 10%) - mock_sliding_window.side_effect = [(49.0, 5), (50.0, 5), (51.0, 5), (50.5, 5)] * 2 - - result = threshold_recommender.recommend_threshold( - timestamp_list=sample_timestamps[:1000], # Use subset - value_list=values[:1000], - default_window_size=5, - time_split=True, - auto_window_adjust=False, - min_value=0.0, - max_value=100.0, - normal_threshold=None, - min_ts_length=50, - direction="up", - ) - - # Should consolidate to single group since thresholds are close (49-51, diff=2, 2/51=3.9% < 10%) - assert len(result) == 1 - assert result[0]["start_hour"] == 0 - assert result[0]["end_hour"] == 24 - assert result[0]["upper_bound"] == 50.5 # Max value from the sorted thresholds - assert result[0]["lower_bound"] is None - assert result[0]["window_size"] == 5 - - -def 
test_recommend_threshold_with_consolidation_down(threshold_recommender, sample_timestamps): - """Test full threshold recommendation with consolidation for down direction.""" - # Create values that result in similar thresholds across time periods - values = [] - for i in range(len(sample_timestamps)): - hour = (i // 60) % 24 - if hour < 6: - values.append(25) # Night: around 25 - elif hour < 12: - values.append(23) # Morning: around 23 - elif hour < 18: - values.append(27) # Afternoon: around 27 - else: - values.append(24) # Evening: around 24 - - with patch.object(threshold_recommender.period_detector, "detect") as mock_detect: - mock_detect.return_value = True # Daily periodicity detected - - with patch.object(threshold_recommender, "threshold_recommendation_with_sliding_window") as mock_sliding_window: - # Return very similar thresholds for all time periods (within 10%) - mock_sliding_window.side_effect = [(24.5, 3), (23.5, 3), (25.5, 3), (24.0, 3)] * 2 - - result = threshold_recommender.recommend_threshold( - timestamp_list=sample_timestamps[:1000], - value_list=values[:1000], - default_window_size=5, - time_split=True, - auto_window_adjust=False, - min_value=0.0, - max_value=100.0, - normal_threshold=None, - min_ts_length=50, - direction="down", - ) - - # Should consolidate to single group since thresholds are close (23.5-25.5, diff=2, 2/25.5=7.8% < 10%) - assert len(result) == 1 - assert result[0]["start_hour"] == 0 - assert result[0]["end_hour"] == 24 - assert result[0]["upper_bound"] is None - assert result[0]["lower_bound"] == 23.5 # Min value for down direction - assert result[0]["window_size"] == 3 - - def test_recommend_threshold_no_consolidation_different_thresholds( threshold_recommender, sample_timestamps, sample_values_periodic ): @@ -629,6 +547,7 @@ def test_recommend_threshold_no_consolidation_different_thresholds( max_value=100.0, normal_threshold=None, min_ts_length=50, + sensitivity=0.5, direction="up", ) @@ -650,6 +569,7 @@ def test_recommend_general_threshold_basic(threshold_recommender): ignore_count=1, min_value=0.0, max_value=200.0, + sensitivity=0.5, direction="up", ) @@ -673,6 +593,7 @@ def test_recommend_general_threshold_direction_down(threshold_recommender): ignore_count=1, min_value=0.0, max_value=100.0, + sensitivity=0.5, direction="down", ) @@ -695,6 +616,7 @@ def test_recommend_general_threshold_zero_interval(threshold_recommender): ignore_count=1, min_value=0.0, max_value=100.0, + sensitivity=0.5, direction="up", ) @@ -722,6 +644,7 @@ def test_recommend_general_threshold_no_clusters(threshold_recommender): ignore_count=1, min_value=0.0, max_value=100.0, + sensitivity=0.5, direction="up", ) @@ -751,6 +674,7 @@ def test_recommend_general_threshold_with_abnormals(threshold_recommender): ignore_count=1, # Ignore one abnormal period min_value=0.0, max_value=200.0, + sensitivity=0.5, direction="up", ) @@ -776,6 +700,7 @@ def test_recommend_general_threshold_empty_data(threshold_recommender): ignore_count=1, min_value=0.0, max_value=100.0, + sensitivity=0.5, direction="up", ) # Should handle empty data gracefully with failure status @@ -799,6 +724,7 @@ def test_recommend_general_threshold_single_value(threshold_recommender): ignore_count=1, min_value=0.0, max_value=100.0, + sensitivity=0.5, direction="up", ) # Should handle single value gracefully with failure status @@ -823,6 +749,7 @@ def test_recommend_general_threshold_no_valid_clusters_fallback(threshold_recomm ignore_count=1, min_value=0.0, max_value=1000.0, + sensitivity=0.5, direction="up", ) @@ -843,6 
+770,7 @@ def test_recommend_general_threshold_no_valid_clusters_fallback(threshold_recomm
         ignore_count=1,
         min_value=0.0,
         max_value=1000.0,
+        sensitivity=0.5,
         direction="down",
     )

@@ -873,6 +801,7 @@ def test_recommend_general_threshold_constant_values_fallback(threshold_recommen
         ignore_count=1,
         min_value=0.0,
         max_value=200.0,
+        sensitivity=0.5,
         direction="up",
     )

@@ -892,6 +821,7 @@ def test_recommend_general_threshold_constant_values_fallback(threshold_recommen
         ignore_count=1,
         min_value=0.0,
         max_value=200.0,
+        sensitivity=0.5,
         direction="down",
     )
diff --git a/tests/algorithm/intelligent_threshold/test_threshold_recommender.py b/tests/algorithm/intelligent_threshold/test_threshold_recommender.py
index 719aa5af..99f6064c 100644
--- a/tests/algorithm/intelligent_threshold/test_threshold_recommender.py
+++ b/tests/algorithm/intelligent_threshold/test_threshold_recommender.py
@@ -1201,21 +1201,21 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not
         IntelligentThresholdConfig(
             start_hour=6,
             end_hour=12,
-            upper_bound=75.0,
+            upper_bound=72.0,
             lower_bound=None,
             window_size=5,
         ),
         IntelligentThresholdConfig(
             start_hour=12,
             end_hour=18,
-            upper_bound=80.0,
+            upper_bound=35.0,
             lower_bound=None,
             window_size=5,
         ),
         IntelligentThresholdConfig(
             start_hour=18,
             end_hour=24,
-            upper_bound=85.0,
+            upper_bound=36.0,
             lower_bound=None,
             window_size=5,
         ),
@@ -1251,14 +1251,16 @@ def test_merge_threshold_results_consolidation_mismatch_down_consolidated_up_not
     assert len(merged_results) == 1
     result = merged_results[0]
     assert result.status == "Success"
-    assert len(result.thresholds) == 4  # Should match the non-consolidated direction (up)
-
-    # Check that down threshold (25.0) is distributed across all up periods
-    for i, threshold in enumerate(result.thresholds):
-        assert threshold.upper_bound == up_results[0].thresholds[i].upper_bound  # Keep original up bounds
-        assert threshold.lower_bound == 25.0  # All should have the consolidated down threshold
-        assert threshold.start_hour == up_results[0].thresholds[i].start_hour
-        assert threshold.end_hour == up_results[0].thresholds[i].end_hour
+    assert len(result.thresholds) == 2  # The four up periods are merged into two blocks

+    assert result.thresholds[0].upper_bound == up_results[0].thresholds[1].upper_bound  # Max upper bound of merged block
+    assert result.thresholds[0].lower_bound == 25.0  # All should have the consolidated down threshold
+    assert result.thresholds[0].start_hour == up_results[0].thresholds[0].start_hour
+    assert result.thresholds[0].end_hour == up_results[0].thresholds[1].end_hour
+    assert result.thresholds[1].upper_bound == up_results[0].thresholds[3].upper_bound  # Max upper bound of merged block
+    assert result.thresholds[1].lower_bound == 25.0  # All should have the consolidated down threshold
+    assert result.thresholds[1].start_hour == up_results[0].thresholds[2].start_hour
+    assert result.thresholds[1].end_hour == up_results[0].thresholds[3].end_hour


 def test_merge_threshold_results_both_consolidated(threshold_recommender):
diff --git a/veaiops/algorithm/intelligent_threshold/configs.py b/veaiops/algorithm/intelligent_threshold/configs.py
index 053fdee8..7497220b 100644
--- a/veaiops/algorithm/intelligent_threshold/configs.py
+++ b/veaiops/algorithm/intelligent_threshold/configs.py
@@ -18,6 +18,8 @@
 the intelligent threshold algorithm components.
""" +import os + import tzlocal # ============================================================================= @@ -45,8 +47,11 @@ # Default timezone for time-based operations DEFAULT_TIMEZONE = tzlocal.get_localzone_name() -# Default time split ranges for daily analysis (hours) -DEFAULT_TIME_SPLIT_RANGES = [[0, 6], [6, 12], [12, 18], [18, 24]] +# Default number of time split (default = 4) +DEFAULT_NUMBER_OF_TIME_SPLIT = int(os.getenv("DEFAULT_NUMBER_OF_TIME_SPLIT", 4)) + +# Maximum number of threshold blocks after merging (default = 8) +DEFAULT_MAXIMUM_THRESHOLD_BLOCKS = int(os.getenv("DEFAULT_MAXIMUM_THRESHOLD_BLOCKS", 8)) # ============================================================================= # Algorithm Parameters @@ -82,7 +87,7 @@ # ============================================================================= # Default correlation threshold for period detection -DEFAULT_CORRELATION_THRESHOLD = 0.75 +DEFAULT_CORRELATION_THRESHOLD = float(os.getenv("DEFAULT_CORRELATION_THRESHOLD", 0.3)) # Minimum data points per day for analysis DEFAULT_MIN_DATA_POINTS_PER_DAY = 720 @@ -94,5 +99,6 @@ # Timeout Configuration # ============================================================================= -# Data fetch timeout in seconds (3 minutes for production) -FETCH_DATA_TIMEOUT = 180 +# Data fetch timeout in seconds (60 minutes for production) + +FETCH_DATA_TIMEOUT = int(os.getenv("FETCH_DATA_TIMEOUT", 3600)) diff --git a/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py b/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py index 95b6efe5..664ee730 100644 --- a/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py +++ b/veaiops/algorithm/intelligent_threshold/threshold_recommendation_algorithm.py @@ -26,8 +26,7 @@ from dbscan1d.core import DBSCAN1D from veaiops.algorithm.intelligent_threshold.configs import ( - DEFAULT_COEFFICIENT, - DEFAULT_TIME_SPLIT_RANGES, + DEFAULT_NUMBER_OF_TIME_SPLIT, DEFAULT_TIMEZONE, MICROSECOND_THRESHOLD, MILLISECOND_THRESHOLD, @@ -52,19 +51,28 @@ class ThresholdRecommendAlgorithm: Attributes: timezone (str): Timezone for timestamp processing. - time_split_ranges (List[List[int]]): Time ranges for splitting analysis. + time_split_ranges (List[List[float]]): Time ranges for splitting analysis. period_detector (RobustDailyPeriodDetector): Daily period detection instance. """ - def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[int]]] = None) -> None: + def __init__(self, timezone: str = DEFAULT_TIMEZONE, time_split_ranges: Optional[List[List[float]]] = None) -> None: """Initialize the ThresholdRecommendAlgorithm. Args: timezone (str): Timezone for timestamp processing. - time_split_ranges (Optional[List[List[int]]]): Custom time split ranges. + time_split_ranges (Optional[List[List[float]]]): Custom time split ranges. 
""" self.timezone = timezone - self.time_split_ranges = time_split_ranges or DEFAULT_TIME_SPLIT_RANGES + + if time_split_ranges is None: + self.time_split_ranges = [] + start_time = 0.0 + for _ in range(DEFAULT_NUMBER_OF_TIME_SPLIT): + end_time = start_time + 24 / DEFAULT_NUMBER_OF_TIME_SPLIT + self.time_split_ranges.append([start_time, end_time]) + start_time = end_time + else: + self.time_split_ranges = time_split_ranges self.period_detector = RobustDailyPeriodDetector() logger.debug(f"Initialized ThresholdRecommendAlgorithm with timezone={timezone}") @@ -114,6 +122,7 @@ def recommend_threshold( max_value: Optional[float], normal_threshold: Optional[float], min_ts_length: int, + sensitivity: float, direction: Literal["up", "down"] = "up", ) -> List[Dict]: """Recommend thresholds for time series data. @@ -131,6 +140,7 @@ def recommend_threshold( max_value (Optional[float]): Maximum value constraint for the time series data. normal_threshold (Optional[float]): Normal threshold baseline for the time series data. min_ts_length (int): Minimum required length of the time series data. + sensitivity (float): Sensitivity of the threshold recommendation algorithm. direction (Literal["up", "down"]): Direction of the threshold recommendation. Returns: @@ -161,6 +171,7 @@ def recommend_threshold( max_value, normal_threshold, direction, + sensitivity, ) else: # Process with time splitting @@ -174,6 +185,7 @@ def recommend_threshold( normal_threshold, min_ts_length, direction, + sensitivity, ) def _process_single_time_period( @@ -186,6 +198,7 @@ def _process_single_time_period( max_value: Optional[float], normal_threshold: Optional[float], direction: Literal["up", "down"], + sensitivity: float, ) -> List[Dict]: """Process time series as a single time period without splitting. @@ -198,6 +211,7 @@ def _process_single_time_period( max_value (Optional[float]): Maximum value constraint. normal_threshold (Optional[float]): Normal threshold baseline. direction (Literal["up", "down"]): Threshold direction. + sensitivity (float): Sensitivity of the threshold recommendation algorithm. Returns: List[Dict]: Single threshold group covering 24 hours. @@ -212,6 +226,7 @@ def _process_single_time_period( normal_threshold, 1, direction, + sensitivity, ) threshold_group = { @@ -240,6 +255,7 @@ def _process_time_split_periods( normal_threshold: Optional[float], min_ts_length: int, direction: Literal["up", "down"], + sensitivity: float, ) -> List[Dict]: """Process time series with time period splitting. @@ -253,6 +269,7 @@ def _process_time_split_periods( normal_threshold (Optional[float]): Normal threshold baseline. min_ts_length (int): Minimum required data length. direction (Literal["up", "down"]): Threshold direction. + sensitivity (float): Sensitivity of the threshold recommendation algorithm. Returns: List[Dict]: Multiple threshold groups for different time periods. 
@@ -295,6 +312,7 @@ def _process_time_split_periods( normal_threshold, 1, direction, + sensitivity, ) # Calculate threshold with ignore_count=0 @@ -308,6 +326,7 @@ def _process_time_split_periods( normal_threshold, 0, direction, + sensitivity, ) # Calculate ratio for sorting @@ -353,11 +372,6 @@ def _process_time_split_periods( threshold_groups.append(threshold_group) - # Check if consolidation is needed - consolidated_group = self._check_and_consolidate_threshold_groups(threshold_groups, direction) - if consolidated_group: - return [consolidated_group] - return threshold_groups def _check_and_consolidate_threshold_groups( @@ -433,6 +447,7 @@ def threshold_recommendation_with_sliding_window( normal_threshold: Optional[float], ignore_count: int, direction: Literal["up", "down"], + sensitivity: float, ) -> tuple[float, int]: """Threshold recommendation with sliding window. @@ -446,6 +461,7 @@ def threshold_recommendation_with_sliding_window( normal_threshold (Optional[float]): Normal threshold for the time series data. ignore_count (int): Number of data points to ignore at the beginning of the time series. direction (Literal["up", "down"]): Direction of the threshold recommendation. + sensitivity (float): Sensitivity of the threshold recommendation algorithm. Returns: tuple[float, int]: Tuple of threshold and window size. @@ -466,6 +482,7 @@ def threshold_recommendation_with_sliding_window( min_value, max_value, direction, + sensitivity, ) # If threshold calculation failed, continue with next window size @@ -503,6 +520,7 @@ def recommend_general_threshold( min_value: Optional[float], max_value: Optional[float], direction: Literal["up", "down"], + sensitivity: float, ) -> Dict[str, Union[bool, float]]: """Recommend general threshold for time series data. @@ -514,6 +532,7 @@ def recommend_general_threshold( min_value (Optional[float]): Minimum value of the time series data. max_value (Optional[float]): Maximum value of the time series data. direction (Literal["up", "down"]): Direction of the threshold recommendation. + sensitivity (float): Sensitivity of the threshold recommendation algorithm. 
Returns:
            Dict[str, Union[bool, float]]: Dictionary with keys:
@@ -522,6 +541,7 @@ def recommend_general_threshold(
         """
         timestamp_list = timestamp_list_original
         value_list = value_list_original if direction == "up" else [-v for v in value_list_original]
+        coefficient = 1.05 + 0.3 * sensitivity

         # Calculate time interval
         intervals = []
@@ -616,26 +636,26 @@ def recommend_general_threshold(
             if direction == "up":
                 # For up direction, use 95th percentile of original values
                 baseline = float(np.percentile(value_list, 95))
-                return {"status": True, "threshold": baseline * DEFAULT_COEFFICIENT}
+                return {"status": True, "threshold": baseline * coefficient}
             else:
                 # For down direction, value_list is already negated, so use 95th percentile of negated values
                 # which corresponds to the 5th percentile of original values
                 baseline = float(np.percentile(value_list, 95))  # This is the max of negated values
                 # Apply the same logic as the original down direction
                 final_threshold = 0 - baseline  # Convert back to positive
-                return {"status": True, "threshold": final_threshold / DEFAULT_COEFFICIENT}
+                return {"status": True, "threshold": final_threshold / coefficient}
         else:
             # Empty data case - should not happen as we check earlier, but safety fallback
             return {"status": False, "threshold": -1.0}

         if direction == "up":
-            threshold = final_max_value * DEFAULT_COEFFICIENT
+            threshold = final_max_value * coefficient
             if max_value is not None:
                 threshold = min(max_value, threshold)
             return {"status": True, "threshold": threshold}
         else:
             final_max_value = 0 - final_max_value
-            threshold = final_max_value / DEFAULT_COEFFICIENT
+            threshold = final_max_value / coefficient
             if min_value is not None:
                 threshold = max(min_value, threshold)
-            return {"status": True, "threshold": final_max_value / DEFAULT_COEFFICIENT}
+            return {"status": True, "threshold": threshold}
diff --git a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py
index a367f73f..8bd0b585 100644
--- a/veaiops/algorithm/intelligent_threshold/threshold_recommender.py
+++ b/veaiops/algorithm/intelligent_threshold/threshold_recommender.py
@@ -31,6 +31,7 @@ from tenacity import retry, stop_after_attempt, wait_exponential

 from veaiops.algorithm.intelligent_threshold.configs import (
+    DEFAULT_MAXIMUM_THRESHOLD_BLOCKS,
     EXTREME_VALUE_THRESHOLD,
     FETCH_DATA_TIMEOUT,
     HISTORICAL_DAYS,
@@ -83,27 +84,36 @@ class ThresholdRecommender:
     Attributes:
         threshold_algorithm (ThresholdRecommendAlgorithm): Threshold recommendation algorithm instance.
         max_concurrent_tasks (int): Maximum number of concurrent tasks (default: 5).
+        maximum_threshold_blocks (int): Maximum number of threshold blocks after merging (default: 8).
         task_queue (List[TaskRequest]): Priority queue for pending tasks.
         running_tasks (Dict[str, asyncio.Task]): Currently running tasks.
         queue_lock (asyncio.Lock): Lock for thread-safe queue operations.
     """

     def __init__(
-        self, threshold_algorithm: Optional[ThresholdRecommendAlgorithm] = None, max_concurrent_tasks: int = 5
+        self,
+        threshold_algorithm: Optional[ThresholdRecommendAlgorithm] = None,
+        max_concurrent_tasks: int = 5,
+        maximum_threshold_blocks: int = DEFAULT_MAXIMUM_THRESHOLD_BLOCKS,
     ) -> None:
         """Initialize the ThresholdRecommender.

         Args:
             threshold_algorithm (Optional[ThresholdRecommendAlgorithm]): Threshold recommender algorithm instance.
             max_concurrent_tasks (int): Maximum number of concurrent tasks (default: 5).
+ maximum_threshold_blocks (int): Maximum number of threshold blocks after merging (default: 8). """ self.threshold_algorithm = threshold_algorithm or ThresholdRecommendAlgorithm() self.max_concurrent_tasks = max_concurrent_tasks + self.maximum_threshold_blocks = maximum_threshold_blocks self.task_queue: List[TaskRequest] = [] self.running_tasks: Dict[str, asyncio.Task] = {} self.queue_lock = asyncio.Lock() - logger.debug(f"Initialized ThresholdRecommender with max_concurrent_tasks={max_concurrent_tasks}") + logger.debug( + f"Initialized ThresholdRecommender with max_concurrent_tasks={max_concurrent_tasks}, " + f"maximum_threshold_blocks={maximum_threshold_blocks}" + ) async def _process_queue(self) -> None: """Process the task queue and start new tasks if capacity allows.""" @@ -322,6 +332,296 @@ def _get_normal_threshold( return normal_range_end if direction == "up" else normal_range_start + @staticmethod + def can_merge_threshold_configs(configs: List[IntelligentThresholdConfig]) -> bool: + """Check if a list of threshold configurations can be merged. + + Merge conditions: + 1. window_size must be the same + 2. upper_bound max-min difference <= 10% + 3. lower_bound max-min difference <= 10% + + Args: + configs: List of threshold configurations + + Returns: + bool: Whether the configs can be merged + """ + if len(configs) <= 1: + return True + + # Check if all window_sizes are the same + window_sizes = [c.window_size for c in configs] + if len(set(window_sizes)) != 1: + return False + + # Extract upper_bound and lower_bound + upper_bounds = [c.upper_bound for c in configs if c.upper_bound is not None] + lower_bounds = [c.lower_bound for c in configs if c.lower_bound is not None] + + # Check upper_bound merge condition + if upper_bounds: + max_upper = max(upper_bounds) + min_upper = min(upper_bounds) + + if max_upper == 0: + if max_upper != min_upper: + return False + else: + upper_diff_ratio = (max_upper - min_upper) / max_upper + if upper_diff_ratio > 0.1: # 10% threshold + return False + + # Check lower_bound merge condition + if lower_bounds: + max_lower = max(lower_bounds) + min_lower = min(lower_bounds) + + if max_lower == 0: + if max_lower != min_lower: + return False + else: + lower_diff_ratio = (max_lower - min_lower) / max_lower + if lower_diff_ratio > 0.1: # 10% threshold + return False + + return True + + @staticmethod + def merge_threshold_configs(configs: List[IntelligentThresholdConfig]) -> IntelligentThresholdConfig: + """Merge multiple threshold configurations into a single configuration. 
+ + Args: + configs: List of threshold configurations to merge + + Returns: + IntelligentThresholdConfig: Merged threshold configuration + """ + if len(configs) == 1: + return configs[0] + + # Extract upper_bound and lower_bound + upper_bounds = [c.upper_bound for c in configs if c.upper_bound is not None] + lower_bounds = [c.lower_bound for c in configs if c.lower_bound is not None] + + # For upper bound, use max (conservative strategy) + merged_upper = max(upper_bounds) if upper_bounds else None + + # For lower bound, use min (conservative strategy) + merged_lower = min(lower_bounds) if lower_bounds else None + + return IntelligentThresholdConfig( + start_hour=configs[0].start_hour, + end_hour=configs[-1].end_hour, + upper_bound=merged_upper, + lower_bound=merged_lower, + window_size=configs[0].window_size, + ) + + @staticmethod + def _calculate_merge_difference(config1: IntelligentThresholdConfig, config2: IntelligentThresholdConfig) -> float: + """Calculate the difference between two adjacent threshold configurations for merging. + + The difference is calculated based on the relative change in both upper_bound and lower_bound. + Returns a normalized difference value that can be used to determine merge priority. + + Args: + config1: First threshold configuration + config2: Second threshold configuration + + Returns: + float: Normalized difference value (lower is more similar, higher is more different) + """ + differences = [] + + # Calculate upper_bound difference + if config1.upper_bound is not None and config2.upper_bound is not None: + upper1 = config1.upper_bound + upper2 = config2.upper_bound + max_upper = max(abs(upper1), abs(upper2)) + if max_upper > 0: + upper_diff = abs(upper1 - upper2) / max_upper + differences.append(upper_diff) + + # Calculate lower_bound difference + if config1.lower_bound is not None and config2.lower_bound is not None: + lower1 = config1.lower_bound + lower2 = config2.lower_bound + max_lower = max(abs(lower1), abs(lower2)) + if max_lower > 0: + lower_diff = abs(lower1 - lower2) / max_lower + differences.append(lower_diff) + + # Return average difference, or 0 if no valid differences + return sum(differences) / len(differences) if differences else 0.0 + + def _hierarchical_merge_thresholds( + self, thresholds: List[IntelligentThresholdConfig], max_blocks: int + ) -> List[IntelligentThresholdConfig]: + """Merge threshold configurations using hierarchical clustering approach. + + This method uses a bottom-up hierarchical clustering approach: + 1. Start with all individual threshold blocks + 2. Iteratively merge the two adjacent blocks with smallest difference + 3. 
Continue until the number of blocks <= max_blocks + + Args: + thresholds: Original list of threshold configurations (must be sorted by start_hour) + max_blocks: Maximum number of blocks after merging + + Returns: + List[IntelligentThresholdConfig]: List of merged threshold configurations + """ + if len(thresholds) <= max_blocks: + return thresholds + + # Create a mutable list of threshold blocks (as lists to track merged ranges) + blocks = [[threshold] for threshold in thresholds] + + logger.debug(f"Starting hierarchical merge: {len(blocks)} blocks -> target {max_blocks} blocks") + + # Iteratively merge until we reach the target number of blocks + while len(blocks) > max_blocks: + # Calculate differences between all adjacent blocks + min_diff = float("inf") + min_diff_idx = -1 + + for i in range(len(blocks) - 1): + # Get the last config of current block and first config of next block + current_last = blocks[i][-1] + next_first = blocks[i + 1][0] + + # Only merge if blocks are continuous + if current_last.end_hour == next_first.start_hour: + # Calculate difference between the two blocks + diff = self._calculate_merge_difference(current_last, next_first) + + if diff < min_diff: + min_diff = diff + min_diff_idx = i + + # If no mergeable adjacent blocks found, break + if min_diff_idx == -1: + logger.warning(f"Cannot merge further: no continuous blocks found. Current blocks: {len(blocks)}") + break + + # Merge the two blocks with minimum difference + merged_block = blocks[min_diff_idx] + blocks[min_diff_idx + 1] + blocks[min_diff_idx : min_diff_idx + 2] = [merged_block] + + logger.debug( + f"Merged blocks at index {min_diff_idx} (diff={min_diff:.4f}), remaining blocks: {len(blocks)}" + ) + + # Convert blocks back to merged threshold configurations + result = [] + for block in blocks: + merged_config = self.merge_threshold_configs(block) + result.append(merged_config) + + logger.info( + f"Hierarchical merge completed: {len(thresholds)} -> {len(result)} blocks " + f"(target: {max_blocks}, maximum_threshold_blocks={self.maximum_threshold_blocks})" + ) + + return result + + def merge_continuous_thresholds( + self, thresholds: List[IntelligentThresholdConfig] + ) -> List[IntelligentThresholdConfig]: + """Merge continuous threshold configurations with adaptive strategy. + + Strategy: + 1. First apply greedy algorithm to merge consecutive time periods with variance within 10% + 2. 
If result exceeds maximum_threshold_blocks, apply hierarchical clustering to further merge + + Args: + thresholds: Original list of threshold configurations + + Returns: + List[IntelligentThresholdConfig]: List of merged threshold configurations + """ + # Filter out configs that have neither upper_bound nor lower_bound + valid_thresholds = [t for t in thresholds if t.upper_bound is not None or t.lower_bound is not None] + + if len(valid_thresholds) <= 1: + return thresholds + + # Sort by start_hour + sorted_thresholds = sorted(valid_thresholds, key=lambda x: x.start_hour) + + # Step 1: Apply greedy merge (10% threshold) + merged_result = [] + current_group = [sorted_thresholds[0]] + + for i in range(1, len(sorted_thresholds)): + # Check if continuous + if current_group[-1].end_hour == sorted_thresholds[i].start_hour: + # Try to add current threshold to merge group + test_group = current_group + [sorted_thresholds[i]] + + if self.can_merge_threshold_configs(test_group): + # Can merge, add to current group + current_group.append(sorted_thresholds[i]) + else: + # Cannot merge, finalize current group and start new group + merged_result.append(self.merge_threshold_configs(current_group)) + current_group = [sorted_thresholds[i]] + else: + # Not continuous, finalize current group and start new group + merged_result.append(self.merge_threshold_configs(current_group)) + current_group = [sorted_thresholds[i]] + + # Process the last group + merged_result.append(self.merge_threshold_configs(current_group)) + + logger.debug( + f"Greedy merge completed: {len(valid_thresholds)} -> {len(merged_result)} blocks " + f"(maximum_threshold_blocks={self.maximum_threshold_blocks})" + ) + + # Step 2: If still exceeds maximum_threshold_blocks, apply hierarchical clustering + if len(merged_result) > self.maximum_threshold_blocks: + logger.info( + f"Greedy merge result ({len(merged_result)} blocks) exceeds maximum_threshold_blocks " + f"({self.maximum_threshold_blocks}), applying hierarchical clustering" + ) + merged_result = self._hierarchical_merge_thresholds(merged_result, self.maximum_threshold_blocks) + + return merged_result + + def merge_metric_threshold_results(self, results: List[MetricThresholdResult]) -> List[MetricThresholdResult]: + """Merge threshold configurations in MetricThresholdResult list. + + Merges the thresholds for each metric (MetricThresholdResult), + keeping other fields unchanged. 
+ + Args: + results: List containing multiple metric threshold results + + Returns: + List[MetricThresholdResult]: List of merged results + """ + merged_results = [] + + for result in results: + # Merge thresholds for each metric + merged_thresholds = self.merge_continuous_thresholds(result.thresholds) + + # Create new MetricThresholdResult, keeping other fields unchanged + merged_result = MetricThresholdResult( + name=result.name, + thresholds=merged_thresholds, + labels=result.labels, + unique_key=result.unique_key, + status=result.status, + error_message=result.error_message, + ) + + merged_results.append(merged_result) + + return merged_results + def _merge_threshold_results( self, up_results: list[MetricThresholdResult], down_results: list[MetricThresholdResult] ) -> list[MetricThresholdResult]: @@ -346,7 +646,6 @@ def _merge_threshold_results( for up_result in up_results: down_result = down_results_map.get(up_result.unique_key) - if down_result: # Check if either direction failed if up_result.status != "Success" or down_result.status != "Success": @@ -492,8 +791,7 @@ def _merge_threshold_results( else: # Only down bound available and it succeeded merged_results.append(down_result) - - return merged_results + return self.merge_metric_threshold_results(merged_results) async def _fetch_and_validate_data( self, datasource_id: str diff --git a/veaiops/handler/routers/apis/v1/datasource/zabbix.py b/veaiops/handler/routers/apis/v1/datasource/zabbix.py index 0b4eb0e9..130404c8 100644 --- a/veaiops/handler/routers/apis/v1/datasource/zabbix.py +++ b/veaiops/handler/routers/apis/v1/datasource/zabbix.py @@ -402,17 +402,60 @@ async def get_metrics_timeseries( # Get history type from datasource history_type = zabbix_datasource.history_type - # Fetch data using client.get_metric_data - history_data = await zabbix_datasource.client.get_metric_data( - item_ids=item_ids, - time_from=start_time_ts, - time_till=end_time_ts, - history_type=history_type, - page_size=DEFAULT_PAGE_SIZE, - ) + # Fetch data using pagination to ensure all data is retrieved + all_history_data = [] + last_clock = start_time_ts + page_size = DEFAULT_PAGE_SIZE + + # Add maximum iterations limit to prevent infinite loop + max_iterations = 100 + iteration_count = 0 + + while iteration_count < max_iterations: + page_data = await zabbix_datasource.client.get_metric_data( + item_ids=item_ids, + time_from=last_clock, + time_till=end_time_ts, + history_type=history_type, + page_size=page_size, + ) + + if not page_data: + break + + # Validate page_data structure before processing + if not isinstance(page_data, list) or len(page_data) == 0: + raise BadRequestError(message="Received invalid page_data: expected non-empty list") + + # Ensure page_data items have required fields + valid_items = [item for item in page_data if isinstance(item, dict) and "clock" in item] + if not valid_items: + raise BadRequestError(message="Received page_data with no valid items containing 'clock' field") + + all_history_data.extend(page_data) + + # Check if we have a valid last item before accessing its clock + if page_data and isinstance(page_data[-1], dict) and "clock" in page_data[-1]: + last_clock = int(page_data[-1]["clock"]) + else: + raise BadRequestError(message="Last item in page_data is invalid or missing 'clock' field") + + # If we got less data than the page size, we've reached the end + if len(page_data) < page_size: + break + + # Increment iteration counter + iteration_count += 1 + + # Raise exception if we reached maximum iterations + if 
iteration_count >= max_iterations:
+        raise BadRequestError(
+            message=f"Maximum iterations ({max_iterations}) reached while fetching Zabbix data. "
+            f"This might indicate an issue with data retrieval or pagination logic."
+        )

     # Convert history data to time series format
-    timeseries_data = zabbix_datasource._convert_history_to_timeseries(history_data)
+    timeseries_data = zabbix_datasource._convert_history_to_timeseries(all_history_data)

     return APIResponse(
         message="success",
     )
@@ -420,8 +463,13 @@
-@zabbix_router.get("/datasource/{datasource_id}/mediatypes", response_model=APIResponse[List[ZabbixMediatype]])
-async def get_zabbix_mediatypes(datasource_id: str) -> APIResponse[List[ZabbixMediatype]]:
+@zabbix_router.get(
+    "/datasource/{datasource_id}/mediatypes",
+    response_model=APIResponse[List[ZabbixMediatype]],
+)
+async def get_zabbix_mediatypes(
+    datasource_id: str,
+) -> APIResponse[List[ZabbixMediatype]]:
     """Get Zabbix mediatypes by datasource ID.

     Args:
@@ -451,8 +499,13 @@
     )

-@zabbix_router.get("/datasource/{datasource_id}/usergroups", response_model=APIResponse[List[ZabbixUserGroup]])
-async def get_zabbix_usergroups(datasource_id: str) -> APIResponse[List[ZabbixUserGroup]]:
+@zabbix_router.get(
+    "/datasource/{datasource_id}/usergroups",
+    response_model=APIResponse[List[ZabbixUserGroup]],
+)
+async def get_zabbix_usergroups(
+    datasource_id: str,
+) -> APIResponse[List[ZabbixUserGroup]]:
     """Get Zabbix usergroups by datasource ID.

     Args:
diff --git a/veaiops/metrics/zabbix.py b/veaiops/metrics/zabbix.py
index fdeb32be..9457c65e 100644
--- a/veaiops/metrics/zabbix.py
+++ b/veaiops/metrics/zabbix.py
@@ -13,6 +13,7 @@
 # limitations under the License.

 import asyncio
+import os
 from dataclasses import dataclass
 from datetime import datetime
 from typing import Any, Dict, List, Optional
@@ -132,7 +133,7 @@ class ZabbixItem(BaseModel):
 default_action = "ve_aiops_action"
 default_tag_key = "managed-by"
 default_tag_value = "ve_aiops"
-default_timeout = 3
+default_timeout = int(os.getenv("ZABBIX_TIMEOUT", 10))

 webhook_script = """
 try {
@@ -232,6 +233,7 @@ async def get_metric_data(
     """

     def _get_history(*args):
+        logger.debug(f"Fetching history with timeout={default_timeout}, item_ids={item_ids}")
         return self.zapi.history.get(
             itemids=item_ids,
             time_from=time_from,
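Two merge criteria are at work in threshold_recommender.py above: the greedy pass merges a group only when the spread of each bound stays within 10% (`can_merge_threshold_configs`), while the hierarchical pass ranks adjacent blocks by the normalized difference from `_calculate_merge_difference`. A small sketch of that pairwise difference metric (the standalone helper name is illustrative), using the upper bounds from the updated test case (72, 35, 36):

```python
def merge_difference(a: float, b: float) -> float:
    """Pairwise normalized difference, as in _calculate_merge_difference."""
    denom = max(abs(a), abs(b))
    return abs(a - b) / denom if denom > 0 else 0.0

print(merge_difference(72.0, 35.0))  # ~0.514 -> far above 10%, resists greedy merging
print(merge_difference(35.0, 36.0))  # ~0.028 -> merged early; merged block keeps the max upper bound (36.0)
```

The hierarchical pass only kicks in when the greedy result still exceeds `maximum_threshold_blocks`; it then repeatedly merges the adjacent continuous pair with the smallest difference, so dissimilar neighbors like the 72/35 pair are merged last, if at all.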
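Finally, the new `sensitivity` parameter threaded through the Python and TypeScript changes reduces to a single multiplier inside `recommend_general_threshold`: `coefficient = 1.05 + 0.3 * sensitivity`. Upper thresholds are scaled up by it and lower thresholds scaled down, so larger sensitivity values widen the margin around the observed extremes. A quick numeric check (the 80/20 sample extremes are made up for illustration):

```python
# coefficient = 1.05 + 0.3 * sensitivity, as defined in recommend_general_threshold.
for sensitivity in (0.0, 0.5, 1.0):
    coefficient = 1.05 + 0.3 * sensitivity
    up = 80.0 * coefficient    # "up" direction: threshold above an observed peak of 80
    down = 20.0 / coefficient  # "down" direction: threshold below an observed floor of 20
    print(f"sensitivity={sensitivity}: coefficient={coefficient:.2f}, up={up:.1f}, down={down:.1f}")
# sensitivity=0.0: coefficient=1.05, up=84.0, down=19.0
# sensitivity=0.5: coefficient=1.20, up=96.0, down=16.7
# sensitivity=1.0: coefficient=1.35, up=108.0, down=14.8
```

The default of 0.5 used by the rerun drawer and the test fixtures therefore corresponds to a 1.2x margin around the recommended baseline.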