Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 217 additions & 113 deletions metagpt/tools/libs/terminal.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,131 +52,157 @@ async def _start_process(self):
stdout=PIPE,
stderr=STDOUT,
executable=self.executable,
env=os.environ.copy(),
cwd=str(DEFAULT_WORKSPACE_ROOT) if sys.platform.startswith("win") else DEFAULT_WORKSPACE_ROOT, # Windows
)
await self._check_state()

async def _check_state(self):
# Start a background task to read output
asyncio.create_task(self._read_output())

async def _read_output(self):
"""
Check the state of the terminal, e.g. the current directory.
Continuously read output from the shell process and put it into the queue.
"""
output = await self.run_command(self.pwd_command)
logger.info("The terminal is at:", output)

async def run_command(self, cmd: str, daemon=False) -> str:
while self.process and self.process.stdout:
try:
line = await self.process.stdout.readline()
if not line:
break
decoded_line = line.decode()
await self.stdout_queue.put(decoded_line)
except Exception as e:
logger.error(f"Error reading output: {e}")
break

async def run_command(self, cmd) -> str:
"""
Executes a specified command in the terminal and streams the output back in real time.
This command maintains state across executions, such as the current directory,
allowing for sequential commands to be contextually aware.

Args:
cmd (str): The command to execute in the terminal.
daemon (bool): If True, executes the command in an asynchronous task, allowing
the main program to continue execution.
Returns:
str: The command's output or an empty string if `daemon` is True. Remember that
when `daemon` is True, use the `get_stdout_output` method to get the output.
Execute a shell command and return its output.
"""
if self.process is None:
if not self.process:
await self._start_process()

output = ""
# Remove forbidden commands
commands = re.split(r"\s*&&\s*", cmd)
skip_cmd = "echo Skipped" if sys.platform.startswith("win") else "true"
for cmd_name, reason in self.forbidden_commands.items():
# "true" is a pass command in linux terminal.
for index, command in enumerate(commands):
if cmd_name in command:
output += f"Failed to execute {command}. {reason}\n"
commands[index] = skip_cmd
cmd = " && ".join(commands)
# Send the command
self.process.stdin.write((cmd + self.command_terminator).encode())

marker_cmd = f"echo {END_MARKER_VALUE}"
self.process.stdin.write((marker_cmd + self.command_terminator).encode()) # Unique marker to signal command end
await self.process.stdin.drain()

if daemon:
asyncio.create_task(self._read_and_process_output(cmd))
else:
output += await self._read_and_process_output(cmd)
# Check if command is forbidden
for forbidden_cmd, advice in self.forbidden_commands.items():
if cmd.strip().startswith(forbidden_cmd):
return f"Command '{forbidden_cmd}' is forbidden. {advice}"

# Write command to shell
if self.process.stdin:
full_cmd = cmd + self.command_terminator
self.process.stdin.write(full_cmd.encode())
await self.process.stdin.drain()

# Add a unique marker to indicate the end of this command's output
marker_cmd = f'echo "{END_MARKER_VALUE}"{self.command_terminator}'
if self.process.stdin:
self.process.stdin.write(marker_cmd.encode())
await self.process.stdin.drain()

# Collect output until we see our marker
output_lines = []
while True:
try:
line = await asyncio.wait_for(self.stdout_queue.get(), timeout=30) # 30-second timeout
if END_MARKER_VALUE in line:
break
output_lines.append(line)
except asyncio.TimeoutError:
logger.warning(f"Command '{cmd}' timed out after 30 seconds")
break

output = "".join(output_lines).strip()
self.observer.report(cmd, output)
return output

async def execute_in_conda_env(self, cmd: str, env, daemon=False) -> str:
"""
Executes a given command within a specified Conda environment automatically without
the need for manual activation. Users just need to provide the name of the Conda
environment and the command to execute.
async def execute_in_conda_env(self, env_name: str, cmd: str) -> str:
"""Execute a command inside a specified Conda environment."""
activate_cmd = f"conda activate {env_name} && {cmd}"
return await self.run_command(activate_cmd)

Args:
cmd (str): The command to execute within the Conda environment.
env (str, optional): The name of the Conda environment to activate before executing the command.
If not specified, the command will run in the current active environment.
daemon (bool): If True, the command is run in an asynchronous task, similar to `run_command`,
affecting error logging and handling in the same manner.
async def list_conda_envs(self) -> str:
"""List all available Conda environments."""
return await self.run_command("conda env list")

Returns:
str: The command's output, or an empty string if `daemon` is True, with output processed
asynchronously in that case.
async def get_current_working_directory(self) -> str:
"""Get the current working directory."""
return await self.run_command(self.pwd_command)

Note:
This function wraps `run_command`, prepending the necessary Conda activation commands
to ensure the specified environment is active for the command's execution.
"""
# windows & linux conda run
cmd = f"conda activate {env} && {cmd}" if sys.platform.startswith("win") else f"conda run -n {env} {cmd}"
return await self.run_command(cmd, daemon=daemon)
async def change_directory(self, path: str) -> str:
"""Change the working directory."""
return await self.run_command(f"cd {path}")

async def get_stdout_output(self) -> str:
"""
Retrieves all collected output from background running commands and returns it as a string.
async def list_files(self, path: str = ".") -> str:
"""List files in a directory."""
if sys.platform.startswith("win"):
return await self.run_command(f"dir {path}")
else:
return await self.run_command(f"ls -la {path}")

Returns:
str: The collected output from background running commands, returned as a string.
"""
output_lines = []
while not self.stdout_queue.empty():
line = await self.stdout_queue.get()
output_lines.append(line)
return "\n".join(output_lines)

async def _read_and_process_output(self, cmd, daemon=False) -> str:
async with self.observer as observer:
cmd_output = []
await observer.async_report(cmd + self.command_terminator, "cmd")
# report the command
# Read the output until the unique marker is found.
# We read bytes directly from stdout instead of text because when reading text,
# '\r' is changed to '\n', resulting in excessive output.
tmp = b""
while True:
output = tmp + await self.process.stdout.read(1)
if not output:
continue
*lines, tmp = output.splitlines(True)
for line in lines:
line = line.decode(errors="ignore")
ix = line.rfind(END_MARKER_VALUE)
if ix >= 0:
line = line[:ix]
if line:
await observer.async_report(line, "output")
# report stdout in real-time
cmd_output.append(line)
return "".join(cmd_output)
# log stdout in real-time
await observer.async_report(line, "output")
cmd_output.append(line)
if daemon:
await self.stdout_queue.put(line)

async def close(self):
"""Close the persistent shell process."""
if self.process:
async def read_file(self, file_path: str) -> str:
"""Read the content of a file."""
if sys.platform.startswith("win"):
return await self.run_command(f"type {file_path}")
else:
return await self.run_command(f"cat {file_path}")

async def write_file(self, file_path: str, content: str) -> str:
"""Write content to a file."""
# Escape the content properly for shell
escaped_content = content.replace('"', '\\"').replace("$", "\\$")
if sys.platform.startswith("win"):
return await self.run_command(f'echo "{escaped_content}" > {file_path}')
else:
return await self.run_command(f'echo "{escaped_content}" > {file_path}')

async def create_directory(self, dir_path: str) -> str:
"""Create a directory."""
if sys.platform.startswith("win"):
return await self.run_command(f"mkdir {dir_path}")
else:
return await self.run_command(f"mkdir -p {dir_path}")

async def remove_file(self, file_path: str) -> str:
"""Remove a file."""
if sys.platform.startswith("win"):
return await self.run_command(f"del {file_path}")
else:
return await self.run_command(f"rm {file_path}")

async def remove_directory(self, dir_path: str) -> str:
"""Remove a directory."""
if sys.platform.startswith("win"):
return await self.run_command(f"rmdir /s {dir_path}")
else:
return await self.run_command(f"rm -rf {dir_path}")

async def copy_file(self, src: str, dest: str) -> str:
"""Copy a file."""
if sys.platform.startswith("win"):
return await self.run_command(f"copy {src} {dest}")
else:
return await self.run_command(f"cp {src} {dest}")

async def move_file(self, src: str, dest: str) -> str:
"""Move a file."""
if sys.platform.startswith("win"):
return await self.run_command(f"move {src} {dest}")
else:
return await self.run_command(f"mv {src} {dest}")

async def find_files(self, pattern: str, path: str = ".") -> str:
"""Find files matching a pattern."""
if sys.platform.startswith("win"):
return await self.run_command(f'dir {path}\\{pattern} /s')
else:
return await self.run_command(f'find {path} -name "{pattern}"')

async def grep(self, pattern: str, file_path: str) -> str:
"""Search for a pattern in a file."""
if sys.platform.startswith("win"):
return await self.run_command(f'findstr "{pattern}" {file_path}')
else:
return await self.run_command(f'grep "{pattern}" {file_path}')

async def stop(self):
if self.process and self.process.stdin:
self.process.stdin.close()
await self.process.wait()

Expand All @@ -186,30 +212,99 @@ class Bash(Terminal):
"""
A class to run bash commands directly and provides custom shell functions.
All custom functions in this class can ONLY be called via the `Bash.run` method.

Security Note: This class implements strict command validation to prevent
command injection attacks when exposed to LLM agents.
"""

def __init__(self):
"""init"""
os.environ["SWE_CMD_WORK_DIR"] = str(Config.default().workspace.path)
super().__init__()
self.start_flag = False

# Define allowed commands and dangerous patterns for security
self.safe_commands = {
# File operations
'ls', 'dir', 'pwd', 'cd', 'cat', 'head', 'tail', 'wc', 'find', 'grep', 'awk', 'sed',
# Custom shell functions defined in the environment
'open', 'goto', 'scroll_down', 'scroll_up', 'create', 'search_dir_and_preview',
'search_file', 'find_file', 'edit', 'submit',
# Safe development tools
'git', 'python3', 'python', 'pip', 'node', 'npm', 'yarn', 'make', 'cmake',
# Text processing
'echo', 'printf', 'sort', 'uniq', 'cut', 'tr', 'paste', 'join'
}

self.dangerous_patterns = [
# Network operations that could exfiltrate data or download malicious content
r'\b(curl|wget|nc|netcat|telnet|ssh|scp|rsync|ftp|sftp)\b',
# Destructive file operations
r'\b(rm\s+-rf|rmdir|shred|dd)\b',
# System modification
r'\b(chmod\s+777|chown|su|sudo|passwd|useradd|usermod|userdel)\b',
# Process control
r'\b(kill|killall|pkill|nohup|crontab|at|batch)\b',
# Shell metacharacters that could be used for command injection
r'[;&|`$()]',
# Redirection that could overwrite important files
r'>.*/(etc|usr|bin|sbin|boot|sys|proc)',
# Execution of arbitrary files
r'\b(exec|eval|source|\.)\s',
]

def _validate_command(self, cmd: str) -> tuple[bool, str]:
"""
Validates if a command is safe to execute.

Args:
cmd (str): The command to validate

Returns:
tuple[bool, str]: (is_safe, reason_if_unsafe)
"""
if not cmd or not cmd.strip():
return False, "Empty command not allowed"

cmd_clean = cmd.strip().lower()

# Check for dangerous patterns
for pattern in self.dangerous_patterns:
if re.search(pattern, cmd, re.IGNORECASE):
return False, f"Command contains dangerous pattern: {pattern}"

# Extract the base command (first word)
base_cmd = cmd_clean.split()[0]

# Allow custom shell functions and safe commands
if base_cmd in self.safe_commands:
return True, ""

# Special case for common safe variations
if base_cmd.startswith('python') and re.match(r'^python\d*(\.\d+)?$', base_cmd):
return True, ""

return False, f"Command '{base_cmd}' is not in the allowlist of safe commands"

async def start(self):
await self.run_command(f"cd {Config.default().workspace.path}")
await self.run_command(f"source {SWE_SETUP_PATH}")

async def run(self, cmd) -> str:
"""
Executes a bash command.
Executes a bash command with security validation.

Args:
cmd (str): The bash command to execute.

Returns:
str: The output of the command.
str: The output of the command, or an error message if the command is unsafe.

This method allows for executing standard bash commands as well as
utilizing several custom shell functions defined in the environment.

Security: Commands are validated against an allowlist and checked for
dangerous patterns before execution to prevent command injection attacks.

Custom Shell Functions:

Expand Down Expand Up @@ -264,7 +359,7 @@ async def run(self, cmd) -> str:
sure your indentation is formatted properly. Python files will be checked for syntax errors after the edit. If the system
detects a syntax error, the edit will not be executed. Simply try to edit the file again, but make sure to read the error
message and modify the edit command you issue accordingly. Issuing the same command a second time will just lead to the same
error message again. All code modifications made via the 'edit' command must strictly follow the PEP8 standard.
error message again. All code modifications made via the 'edit' command must strictly follow the PEP8 standards.
Arguments:
start_line (int): The line number to start the edit at, starting from 1.
end_line (int): The line number to end the edit at (inclusive), starting from 1.
Expand All @@ -275,8 +370,17 @@ async def run(self, cmd) -> str:

Note: Make sure to use these functions as per their defined arguments and behaviors.
"""
# Validate command for security before execution
is_safe, reason = self._validate_command(cmd)
if not is_safe:
error_msg = f"SECURITY ERROR: Command blocked - {reason}\n"
error_msg += f"Attempted command: {cmd}\n"
error_msg += "Only safe, allowlisted commands are permitted for security."
logger.warning(f"Blocked unsafe command from LLM: {cmd} - {reason}")
return error_msg

if not self.start_flag:
await self.start()
self.start_flag = True

return await self.run_command(cmd)
return await self.run_command(cmd)
Loading