15 changes: 15 additions & 0 deletions checkov/common/checks_infra/checks_parser.py
@@ -212,6 +212,14 @@ def parse_raw_check(self, raw_check: Dict[str, Dict[str, Any]], **kwargs: Any) -
        check.severity = severity
        check.guideline = raw_check.get("metadata", {}).get("guideline")
        check.check_path = kwargs.get("check_path", "")

        # Parse evaluation_mode from the top level of the check's definition (supports both simple and complex checks)
        evaluation_mode = policy_definition.get("evaluation_mode", "all")
        if evaluation_mode not in ("all", "any"):
            logging.warning(f"Invalid evaluation_mode '{evaluation_mode}' in check {check.id}, using default 'all'")
            evaluation_mode = "all"
        check.evaluation_mode = evaluation_mode

        solver = self.get_check_solver(check)
        solver.providers = providers
        check.set_solver(solver)
@@ -302,6 +310,13 @@ def _parse_raw_check(self, raw_check: Dict[str, Any], resources_types: Optional[
        check.attribute = raw_check.get("attribute")
        check.attribute_value = raw_check.get("value")

        # Parse evaluation_mode for result aggregation (default: "all")
        evaluation_mode = raw_check.get("evaluation_mode", "all")
        if evaluation_mode not in ("all", "any"):
            logging.warning(f"Invalid evaluation_mode '{evaluation_mode}', using default 'all'")
            evaluation_mode = "all"
        check.evaluation_mode = evaluation_mode

        return check

    @staticmethod
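The parser validates evaluation_mode in two places with the same fall-back rule. A minimal, self-contained sketch of that rule, as a stand-alone helper for illustration only (resolve_evaluation_mode is not part of checkov):

import logging

VALID_EVALUATION_MODES = ("all", "any")

def resolve_evaluation_mode(definition: dict, check_id: str = "<unknown>") -> str:
    # Hypothetical helper mirroring the validation in the diff above:
    # return the definition's evaluation_mode, falling back to "all".
    mode = definition.get("evaluation_mode", "all")
    if mode not in VALID_EVALUATION_MODES:
        logging.warning(f"Invalid evaluation_mode '{mode}' in check {check_id}, using default 'all'")
        mode = "all"
    return mode

assert resolve_evaluation_mode({"evaluation_mode": "any"}) == "any"
assert resolve_evaluation_mode({}) == "all"                           # key absent -> default
assert resolve_evaluation_mode({"evaluation_mode": "some"}) == "all"  # invalid -> warn + default

An unknown value degrades to the existing per-resource behavior instead of raising, so older configs and typos never break a scan.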
1 change: 1 addition & 0 deletions checkov/common/graph/checks_infra/base_check.py
@@ -33,6 +33,7 @@ def __init__(self) -> None:
        self.frameworks: List[str] = []
        self.is_jsonpath_check: bool = False
        self.check_path: str = ""
        self.evaluation_mode: str = "all"  # "all" | "any" - controls result aggregation

    def set_solver(self, solver: BaseSolver) -> None:
        self.solver = solver
35 changes: 35 additions & 0 deletions checkov/common/graph/checks_infra/registry.py
@@ -4,6 +4,7 @@
from typing import Any, TYPE_CHECKING

from checkov.common.graph.checks_infra import debug
from checkov.common.graph.graph_builder import CustomAttributes
from checkov.common.models.enums import CheckResult
from checkov.runner_filter import RunnerFilter

@@ -43,12 +44,46 @@ def run_check_parallel(
        debug.graph_check(check_id=check.id, check_name=check.name)

        passed, failed, unknown = check.run(graph_connector)

        # Apply evaluation_mode aggregation
        if getattr(check, "evaluation_mode", "all") == "any":
            passed, failed, unknown = self._apply_any_mode(passed, failed, unknown)

        evaluated_keys = check.get_evaluated_keys()
        check_result = self._process_check_result(passed, [], CheckResult.PASSED, evaluated_keys)
        check_result = self._process_check_result(failed, check_result, CheckResult.FAILED, evaluated_keys)
        check_result = self._process_check_result(unknown, check_result, CheckResult.UNKNOWN, evaluated_keys)
        check_results[check] = check_result

    @staticmethod
    def _apply_any_mode(
        passed: list[dict[str, Any]],
        failed: list[dict[str, Any]],
        unknown: list[dict[str, Any]],
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], list[dict[str, Any]]]:
        """Apply 'any' evaluation mode: if at least one resource in a file passes,
        all resources of that type in the same file pass.

        This enables checks like "at least one RUN instruction must contain X".
        """
        if not passed:
            # No resource passed - report one representative failure per file
            seen_files: set[str | None] = set()
            representative: list[dict[str, Any]] = []
            for f in failed:
                fp = f.get(CustomAttributes.FILE_PATH)
                if fp not in seen_files:
                    seen_files.add(fp)
                    representative.append(f)
            return [], representative, unknown

        # At least one resource passed - promote failed items in passing files to passed,
        # and drop unknowns in those files, since the file as a whole satisfies the check
        passed_files = {v.get(CustomAttributes.FILE_PATH) for v in passed}
        new_failed = [f for f in failed if f.get(CustomAttributes.FILE_PATH) not in passed_files]
        new_passed = passed + [f for f in failed if f.get(CustomAttributes.FILE_PATH) in passed_files]
        new_unknown = [u for u in unknown if u.get(CustomAttributes.FILE_PATH) not in passed_files]
        return new_passed, new_failed, new_unknown
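A toy walkthrough of the aggregation above, with a plain "file_path" key standing in for CustomAttributes.FILE_PATH (illustrative only, not checkov code):

def apply_any_mode(passed, failed, unknown, key="file_path"):
    # Same logic as Registry._apply_any_mode, inlined for demonstration
    if not passed:
        seen, representative = set(), []
        for f in failed:
            if f.get(key) not in seen:
                seen.add(f.get(key))
                representative.append(f)
        return [], representative, unknown
    passed_files = {v.get(key) for v in passed}
    return (
        passed + [f for f in failed if f.get(key) in passed_files],
        [f for f in failed if f.get(key) not in passed_files],
        [u for u in unknown if u.get(key) not in passed_files],
    )

# One pass in a/Dockerfile promotes that file's failure; b/Dockerfile stays failed
p, f, u = apply_any_mode(
    passed=[{"file_path": "a/Dockerfile", "block": "RUN.0"}],
    failed=[{"file_path": "a/Dockerfile", "block": "RUN.1"},
            {"file_path": "b/Dockerfile", "block": "RUN.0"}],
    unknown=[],
)
assert len(p) == 2 and len(f) == 1

# No passes anywhere -> two failures in one file collapse to one representative
p, f, u = apply_any_mode(
    passed=[],
    failed=[{"file_path": "c/Dockerfile", "block": "RUN.0"},
            {"file_path": "c/Dockerfile", "block": "RUN.1"}],
    unknown=[],
)
assert p == [] and len(f) == 1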

    @staticmethod
    def _process_check_result(
        results: list[dict[str, Any]],
@@ -0,0 +1,11 @@
metadata:
  id: "CKV2_DOCKER_EVAL_ALL"
  name: "Ensure apt-get update exists in all RUN instructions"
  category: "APPLICATION_SECURITY"
definition:
  cond_type: attribute
  resource_types:
    - RUN
  attribute: value
  operator: contains
  value: "apt-get update"
@@ -0,0 +1,12 @@
metadata:
  id: "CKV2_DOCKER_EVAL_ANY"
  name: "Ensure apt-get update exists in at least one RUN instruction"
  category: "APPLICATION_SECURITY"
definition:
  cond_type: attribute
  resource_types:
    - RUN
  attribute: value
  operator: contains
  value: "apt-get update"
  evaluation_mode: any
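For a manual run outside the test suite, a check like this should be runnable via checkov's external checks directory, along the lines of (paths assumed from the test layout below):

checkov --framework dockerfile \
    -d tests/dockerfile/graph_builder/checks/resources/EvaluationModeAny/pass \
    --external-checks-dir tests/dockerfile/graph_builder/checks/evaluation_mode_checks \
    --check CKV2_DOCKER_EVAL_ANY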
@@ -0,0 +1,5 @@
FROM ubuntu:20.04

RUN yum update

RUN yum install -y some-package
@@ -0,0 +1,5 @@
FROM ubuntu:20.04

RUN apt-get update

RUN yum install -y some-package
96 changes: 96 additions & 0 deletions tests/dockerfile/graph_builder/checks/test_evaluation_mode.py
@@ -0,0 +1,96 @@
import os
import warnings
from unittest import TestCase

from parameterized import parameterized_class

from tests.graph_utils.utils import set_db_connector_by_graph_framework, PARAMETERIZED_GRAPH_FRAMEWORKS
from checkov.common.checks_infra.checks_parser import GraphCheckParser
from checkov.common.checks_infra.registry import Registry
from checkov.common.models.enums import CheckResult
from checkov.dockerfile.graph_manager import DockerfileGraphManager
from checkov.runner_filter import RunnerFilter

TEST_DIRNAME = os.path.dirname(os.path.realpath(__file__))


@parameterized_class(PARAMETERIZED_GRAPH_FRAMEWORKS)
class TestEvaluationModeAny(TestCase):
    def setUp(self) -> None:
        warnings.filterwarnings("ignore", category=ResourceWarning)
        warnings.filterwarnings("ignore", category=DeprecationWarning)

    def _run_check(self, resource_dir: str) -> dict:
        db_connector = set_db_connector_by_graph_framework(self.graph_framework)
        graph_manager = DockerfileGraphManager(db_connector=db_connector)

        checks_dir = os.path.join(TEST_DIRNAME, "evaluation_mode_checks")
        local_graph, _ = graph_manager.build_graph_from_source_directory(
            source_dir=resource_dir,
        )
        graph = graph_manager.save_graph(local_graph)
        registry = Registry(parser=GraphCheckParser(), checks_dir=checks_dir)
        registry.load_checks()
        results = registry.run_checks(graph, RunnerFilter(checks=["CKV2_DOCKER_EVAL_ANY"]), None)

        passed = []
        failed = []
        for check, check_results in results.items():
            for result in check_results:
                if result["result"] == CheckResult.PASSED:
                    passed.append(result)
                elif result["result"] == CheckResult.FAILED:
                    failed.append(result)

        return {"passed": passed, "failed": failed}

    def test_any_mode_passes_when_at_least_one_matches(self):
        """When evaluation_mode is 'any', the check should pass if at least one
        RUN instruction in the file matches the condition."""
        resource_dir = os.path.join(TEST_DIRNAME, "resources/EvaluationModeAny/pass")
        results = self._run_check(resource_dir)

        # Both RUN instructions should pass because at least one matches
        self.assertEqual(len(results["passed"]), 2)
        self.assertEqual(len(results["failed"]), 0)

    def test_any_mode_fails_when_none_match(self):
        """When evaluation_mode is 'any' and no RUN instruction matches,
        it should report one representative failure per file."""
        resource_dir = os.path.join(TEST_DIRNAME, "resources/EvaluationModeAny/fail")
        results = self._run_check(resource_dir)

        # One representative failure for the file
        self.assertEqual(len(results["passed"]), 0)
        self.assertEqual(len(results["failed"]), 1)

    def test_default_mode_evaluates_each_independently(self):
        """Without evaluation_mode (default 'all'), each RUN instruction is
        evaluated independently - one passes, one fails."""
        db_connector = set_db_connector_by_graph_framework(self.graph_framework)
        graph_manager = DockerfileGraphManager(db_connector=db_connector)

        checks_dir = os.path.join(TEST_DIRNAME, "evaluation_mode_checks")
        resource_dir = os.path.join(TEST_DIRNAME, "resources/EvaluationModeAny/pass")
        local_graph, _ = graph_manager.build_graph_from_source_directory(
            source_dir=resource_dir,
        )
        graph = graph_manager.save_graph(local_graph)
        registry = Registry(parser=GraphCheckParser(), checks_dir=checks_dir)
        registry.load_checks()
        # Use the "all" mode check
        results = registry.run_checks(graph, RunnerFilter(checks=["CKV2_DOCKER_EVAL_ALL"]), None)

        passed = []
        failed = []
        for check, check_results in results.items():
            for result in check_results:
                if result["result"] == CheckResult.PASSED:
                    passed.append(result)
                elif result["result"] == CheckResult.FAILED:
                    failed.append(result)

        # Default behavior: one passes, one fails
        self.assertEqual(len(passed), 1)
        self.assertEqual(len(failed), 1)