Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions backend/agent/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,3 @@ AGENT_NETWORKS='["perlmutter"]'
AGENT_NAME="SecretAgentMan"
NATS_STREAM_STORAGE_TYPE=file
OPERATOR_EXTRA_ENV='{"CUDA_VISIBLE_DEVICES":"0","MY_FLAG":"enabled"}'

# Vector log aggregation
VECTOR_AGGREGATOR_ADDR=host.containers.internal:6000
86 changes: 81 additions & 5 deletions backend/agent/interactem/agent/_vector_template.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,89 @@
del(.message)
{% endraw %}

sinks:
vector_aggregator:
type: vector
clean_logs:
type: remap
inputs:
- parse_operator_logs
- parse_vector_logs
- parse_agent_logs
address: {{ vector_addr }}
version: '2'
source: |
{% raw %}
# Remove fields we don't want to forward to NATS
if .log_type == "agent" {
del(.file)
del(.source_type)
}
if .log_type == "operator" {
del(.file)
del(.source_type)
del(.stream)
}
.
{% endraw %}

route_by_type:
type: route
inputs:
- clean_logs
route:
vector_logs: '.log_type == "vector"'
agent_logs: '.log_type == "agent"'
operator_logs: '.log_type == "operator"'

sinks:
console:
type: console
inputs:
- route_by_type.vector_logs
- route_by_type.agent_logs
- route_by_type.operator_logs
target: stdout
encoding:
codec: json

nats_vector:
type: nats
inputs:
- route_by_type.vector_logs
url: "{{ nats_url }}"
{% raw %}
subject: "log.vector.{{ agent_id }}"
encoding:
codec: json
{% endraw %}
auth:
strategy: credentials_file
credentials_file:
path: "{{ nats_creds_path }}"

nats_agents:
type: nats
inputs:
- route_by_type.agent_logs
url: "{{ nats_url }}"
{% raw %}
subject: "log.agent.{{ agent_id }}"
encoding:
codec: json
{% endraw %}
auth:
strategy: credentials_file
credentials_file:
path: "{{ nats_creds_path }}"

nats_operators:
type: nats
inputs:
- route_by_type.operator_logs
url: "{{ nats_url }}"
{% raw %}
subject: "log.depl.{{ deployment_id }}.op.{{ operator_id }}"
encoding:
codec: json
{% endraw %}
auth:
strategy: credentials_file
credentials_file:
path: "{{ nats_creds_path }}"
"""
6 changes: 5 additions & 1 deletion backend/agent/interactem/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,11 @@ async def run(self):
await self.agent_kv.start()
# Start the container monitor task at the agent level
self._monitor_task = asyncio.create_task(self.monitor_containers())
self._vector_container = await self._start_vector_container()
self._vector_container = None
if cfg.vector_enabled:
self._vector_container = await self._start_vector_container()
else:
logger.info("Vector logging disabled; skipping vector container startup.")

async def _start_vector_container(self) -> Container:
logger.info("Starting Vector container for log aggregation...")
Expand Down
40 changes: 34 additions & 6 deletions backend/agent/interactem/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
PodmanMount,
PodmanMountType,
)
from interactem.core.nats.config import NatsMode, get_nats_config

from ._vector_template import VECTOR_CONFIG_TEMPLATE

logger = get_logger()
VECTOR_NATS_CREDS_TARGET = "/nats.creds"


class Settings(BaseSettings):
Expand Down Expand Up @@ -43,7 +45,7 @@ class Settings(BaseSettings):
ALWAYS_PULL_IMAGES: bool = False

# Vector configuration
VECTOR_AGGREGATOR_ADDR: str | None = None
VECTOR_ENABLED: bool = True
LOG_DIR: Path = Path("~/.interactem/logs").expanduser().resolve()
VECTOR_CONFIG_PATH: Path | None = None
OPERATOR_EXTRA_ENV: dict[str, str] = Field(default_factory=dict, exclude=True)
Expand Down Expand Up @@ -125,15 +127,40 @@ def vector_mounts(self) -> list[PodmanMount]:
target="/etc/vector/vector.yaml",
)
log_mount = self.log_mount
mounts = [config_mount]
if log_mount:
return [config_mount, log_mount]
return [config_mount]
mounts.append(log_mount)
creds_mount = self.vector_creds_mount
if creds_mount:
mounts.append(creds_mount)
return mounts

@property
def vector_creds_mount(self) -> PodmanMount | None:
    """Bind mount for the NATS credentials file, or ``None`` if there is none.

    The host path comes from ``nats_creds_file``; when that is falsy no
    mount is produced. Otherwise the file is bind-mounted at
    ``VECTOR_NATS_CREDS_TARGET``.
    """
    host_creds = self.nats_creds_file
    if host_creds:
        return PodmanMount(
            type=PodmanMountType.bind,
            source=str(host_creds),
            target=VECTOR_NATS_CREDS_TARGET,
        )
    return None

@property
def nats_creds_file(self) -> Path | None:
    """Path of the NATS credentials file when credentials mode is in use.

    Returns ``None`` if ``get_nats_config()`` raises ``ValueError`` or if
    the configured security mode is anything other than ``NatsMode.CREDS``.
    """
    try:
        nats_settings = get_nats_config()
    except ValueError:
        return None
    else:
        # Only read NATS_CREDS_FILE once the mode is known to be CREDS.
        creds_mode_off = nats_settings.NATS_SECURITY_MODE != NatsMode.CREDS
        return None if creds_mode_off else nats_settings.NATS_CREDS_FILE

def generate_vector_config(self) -> Path | None:
"""Generates a vector config file and returns path to it"""

if not self.VECTOR_AGGREGATOR_ADDR:
logger.warning("VECTOR_AGGREGATOR_ADDR not set, skipping log aggregation.")
if not self.VECTOR_ENABLED:
logger.warning("VECTOR_ENABLED is false, skipping log aggregation.")
return None

if not self.LOG_DIR.exists():
Expand All @@ -144,7 +171,8 @@ def generate_vector_config(self) -> Path | None:
vector_yaml = templ.render(
logs_dir=LOGS_DIR_IN_CONTAINER,
agent_id=self.ID,
vector_addr=self.VECTOR_AGGREGATOR_ADDR,
nats_url=str(self.NATS_SERVER_URL_IN_CONTAINER),
nats_creds_path=VECTOR_NATS_CREDS_TARGET,
)
output_path = self.LOG_DIR / "vector.yaml"
with open(output_path, "w") as f:
Expand Down
116 changes: 0 additions & 116 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -282,126 +282,10 @@ services:
- ./backend/metrics/monitoring-conf/grafana/provisioning:/etc/grafana/provisioning:ro
- ./backend/metrics/monitoring-conf/grafana/dashboards:/var/lib/grafana/dashboards:ro

vector:
image: timberio/vector:0.50.0-alpine
container_name: vector-aggregator
ports:
- "6000:6000"
- "8686:8686"
depends_on:
nats-healthcheck:
condition: service_healthy
volumes:
- vector_data:/vector-data-dir
- "./conf/nats-conf/out_jwt/backend.creds:/backend.creds"
entrypoint: ["/bin/sh", "-c"]
command:
- |
cat > /tmp/vector.yaml <<EOF
data_dir: /vector-data-dir

api:
enabled: true
address: 0.0.0.0:8686
playground: false

sources:
vector:
type: vector
address: 0.0.0.0:6000
version: "2"

transforms:
# Clean up all logs by removing unnecessary fields based on log type
clean_logs:
type: remap
inputs:
- vector
source: |
# Remove fields from agent logs
if .log_type == "agent" {
del(.file)
del(.source_type)
}
# Remove fields from operator logs
if .log_type == "operator" {
del(.file)
del(.source_type)
del(.stream)
}
.

# Route logs based on their type
route_by_type:
type: route
inputs:
- clean_logs
route:
vector_logs: '.log_type == "vector"'
agent_logs: '.log_type == "agent"'
operator_logs: '.log_type == "operator"'

sinks:
console:
type: console
inputs:
- route_by_type.vector_logs
- route_by_type.agent_logs
- route_by_type.operator_logs
target: stdout
encoding:
codec: json

# NATS sink for vector logs
nats_vector:
type: nats
inputs:
- route_by_type.vector_logs
url: nats://nats1:4222
subject: "log.vector.{{ agent_id }}"
encoding:
codec: json
auth:
strategy: credentials_file
credentials_file:
path: /backend.creds

# NATS sink for agent logs
nats_agents:
type: nats
inputs:
- route_by_type.agent_logs
url: nats://nats1:4222
subject: "log.agent.{{ agent_id }}"
encoding:
codec: json
auth:
strategy: credentials_file
credentials_file:
path: /backend.creds

# NATS sink for operator logs
nats_operators:
type: nats
inputs:
- route_by_type.operator_logs
url: nats://nats1:4222
subject: "log.depl.{{ deployment_id }}.op.{{ operator_id }}"
encoding:
codec: json
auth:
strategy: credentials_file
credentials_file:
path: /backend.creds
EOF
vector --config /tmp/vector.yaml


volumes:
app-db-data:
prometheus_data:
grafana_data:
vector_data:
nats_data1:
nats_data2:
nats_data3:
Loading