Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 33 additions & 12 deletions services/constrained_transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,24 @@ class TransformTimeoutError(Exception):

class TransformExecutor:
"""
Executes registered transforms with strict runtime constraints.
Legacy in-process transform executor (DEPRECATED — use TransformProcessRunner).

WARNING: This executor runs transforms in a daemon thread within the host
process. Python threads cannot be forcefully terminated, so a malicious or
runaway transform can escape the timeout constraint and continue executing
indefinitely. It also shares the host process address space, making
network/filesystem isolation best-effort only.

New code should always use ``TransformProcessRunner`` (S35) which executes
transforms in an isolated subprocess with enforced timeouts via
``subprocess.run(timeout=…)`` and network access denied via socket
monkeypatching.

``get_transform_executor()`` will refuse to fall back to this class and
instead fail closed if ``TransformProcessRunner`` is unavailable.

Enforces:
- Timeout per transform
- Timeout per transform (best-effort — thread cannot be killed)
- Output size cap
- Integrity verification before execution
- No network/filesystem access (best-effort: module is pre-vetted)
Expand Down Expand Up @@ -288,26 +302,33 @@ def get_transform_executor() -> TransformExecutor:
"""
Get or create the global transform executor.

If S35 isolation is enabled (default: True in this hardening wave),
returns a TransformProcessRunner instance.
Always returns a ``TransformProcessRunner`` instance (S35 process
isolation). If the subprocess runner cannot be imported, this function
raises ``RuntimeError`` rather than falling back to the in-process
``TransformExecutor``, which cannot enforce timeouts or isolation.
"""
global _executor
if _executor is None:
# Check for process isolation flag (defaulting to on for S35)
# We can use the same enable flag, or a specific isolation one.
# Let's assume strict isolation is part of the enabling.

# Local import to avoid circular dependency
try:
from .transform_runner import TransformProcessRunner

registry = get_transform_registry()
# We treat TransformProcessRunner as compatible with TransformExecutor interface
_executor = TransformProcessRunner(registry) # type: ignore
except ImportError as e:
logger.warning(
f"S35: Could not import transform_runner ({e}), falling back to thread executor."
# Fail closed: do NOT fall back to the thread-based executor.
# The in-process TransformExecutor uses threading.Thread which
# cannot be forcefully terminated, allowing a malicious transform
# to escape timeout constraints and continue executing in the
# host process indefinitely.
logger.error(
f"S35: Could not import transform_runner ({e}). "
"Refusing to fall back to in-process executor for security reasons. "
"Transforms will be unavailable until the subprocess runner is restored."
)
_executor = TransformExecutor(get_transform_registry())
raise RuntimeError(
"TransformProcessRunner unavailable — transforms disabled for security. "
f"Import error: {e}"
) from e

return _executor
50 changes: 50 additions & 0 deletions tests/test_s35_transform_isolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,55 @@ def transform(data):
os.remove(module_path)


class TestGetTransformExecutorFailClosed(unittest.TestCase):
"""Test that get_transform_executor refuses to fall back to in-process executor."""

def setUp(self):
# Reset the global singleton before each test
import services.constrained_transforms as ct

ct._executor = None

def tearDown(self):
import services.constrained_transforms as ct

ct._executor = None

def test_raises_when_process_runner_unavailable(self):
"""get_transform_executor must raise RuntimeError, not fall back."""
import importlib

import services.constrained_transforms as ct

# Force the module to re-evaluate the local import by removing the
# transform_runner module from sys.modules temporarily.
saved = sys.modules.pop("services.transform_runner", None)
# Also remove any cached reference
sys.modules["services.transform_runner"] = None # type: ignore[assignment]
try:
with self.assertRaises((RuntimeError, ImportError)) as ctx:
ct.get_transform_executor()
error_msg = str(ctx.exception).lower()
self.assertTrue(
"unavailable" in error_msg or "import" in error_msg,
f"Expected 'unavailable' or 'import' in error: {ctx.exception}",
)
finally:
# Restore
if saved is not None:
sys.modules["services.transform_runner"] = saved
else:
sys.modules.pop("services.transform_runner", None)

def test_returns_process_runner_when_available(self):
"""get_transform_executor returns TransformProcessRunner normally."""
from services.constrained_transforms import get_transform_executor

executor = get_transform_executor()
from services.transform_runner import TransformProcessRunner

self.assertIsInstance(executor, TransformProcessRunner)


if __name__ == "__main__":
unittest.main()