FlowtraceX Core Platform — a Docker-first Network Detection & Response (NDR) engine for flow-level visibility, real-time detection, and correlation.

Kafka → Parquet Pipeline

A production-ready pipeline that consumes Zeek network logs from Kafka, normalizes and enriches them, and outputs to Parquet files and/or Kafka topics.

Features

  • Multi-log-type support: Processes 18+ Zeek log types (DNS, CONN, HTTP, SSL, SSH, FTP, SMTP, etc.)
  • Dual output: Writes to Parquet files and/or Kafka topics
  • Three-layer data model: Raw fields, normalized fields, and enriched fields
  • Configurable flushing: Size, time, and event count-based flush conditions
  • Asynchronous processing: Non-blocking writes with background flushing
  • Health monitoring: Periodic metrics (memory, buffer usage, throughput)

Quick Start

Build

go build -o zeek-parquet-pipeline main.go

Run

./zeek-parquet-pipeline --config config/config.json

Configuration

config.json

{
  "kafka": {
    "brokers": ["localhost:9092"],
    "topic": "zeek-raw",
    "group_id": "parquet-writer-group",
    "max_poll_records": 1000
  },
  "output": {
    "parquet": {
      "enabled": true,
      "base_path": "./output",
      "file_prefix": "events"
    },
    "kafka": {
      "enabled": true,
      "brokers": ["localhost:9092"],
      "topic": "zeek-normalized",
      "compression": "lz4"
    }
  },
  "log": {
    "path": "./log"
  },
  "write": {
    "flush_buffer_mb": 1,
    "flush_interval_seconds": 60,
    "flush_event_count": 1000
  }
}

Flush Conditions (OR logic - any one triggers flush):

  • flush_buffer_mb: Flush when buffer reaches this size (MB)
  • flush_interval_seconds: Flush every N seconds (0 to disable)
  • flush_event_count: Flush after N events (0 to disable)
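
The check behind this is simple OR logic. A minimal sketch (the Go type and field names are assumptions, not the repository's actual code; the JSON keys are the ones documented above):

import "time"

// WriteConfig mirrors the "write" block of config.json.
type WriteConfig struct {
	FlushBufferMB        int
	FlushIntervalSeconds int
	FlushEventCount      int
}

// shouldFlush reports whether any flush condition is met (OR logic).
// For the interval and event-count conditions, 0 disables the check,
// matching the config semantics above.
func shouldFlush(bufBytes int64, events int, sinceLast time.Duration, cfg WriteConfig) bool {
	if cfg.FlushBufferMB > 0 && bufBytes >= int64(cfg.FlushBufferMB)<<20 {
		return true
	}
	if cfg.FlushIntervalSeconds > 0 && sinceLast >= time.Duration(cfg.FlushIntervalSeconds)*time.Second {
		return true
	}
	if cfg.FlushEventCount > 0 && events >= cfg.FlushEventCount {
		return true
	}
	return false
}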

normalization.json

Core concept: Defines the three-layer data model per log type.

  • source: Zeek log type identifier (e.g., zeek_dns, zeek_conn)
  • promote: Field promotion mapping (raw Zeek field → canonical normalized field)
    • Example: "id.orig_h": "src_ip" promotes id.orig_h to src_ip
    • Promoted fields replace raw fields (no duplication)
  • static: Static fields added to all events of this log type
    • Example: "event_type": "dns", "event_class": "dns"
  • enrich: Per-log-type enrichment flags
    • "time": true → Enables time breakdown (year, month, day, hour, weekday)
    • "network": true → Enables network analysis (private IP detection, direction, service)
{
  "dns": {
    "source": "zeek_dns",
    "promote": {
      "ts": "event_time",
      "uid": "flow_id",
      "id.orig_h": "src_ip",
      "id.resp_h": "dst_ip",
      "proto": "protocol"
    },
    "static": {
      "event_type": "dns",
      "event_class": "dns"
    },
    "enrich": {
      "time": true,
      "network": true
    }
  }
}
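
Promotion itself is a key rename with the raw key removed, so promoted fields are not duplicated. A minimal sketch of that step (the actual implementation lives in core/normalize.go; names here are illustrative):

// applyPromotion moves raw Zeek fields to canonical names, deleting the
// raw key so promoted fields replace raw fields rather than duplicating them.
func applyPromotion(raw map[string]any, promote map[string]string) map[string]any {
	normalized := make(map[string]any, len(promote))
	for rawKey, canonical := range promote {
		if v, ok := raw[rawKey]; ok {
			normalized[canonical] = v
			delete(raw, rawKey) // promoted fields replace raw fields
		}
	}
	return normalized
}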

schema.json

Core concept: Defines raw field structure for each Zeek log type.

  • Used by generate_schema.go to auto-generate schema/events.go
  • Defines all possible raw fields with their types (string, int, float, bool, etc.)
  • Fields not listed in the promote map of normalization.json remain as raw fields in Parquet
  • Important: After modifying this file, regenerate the schema:
    go run generate_schema.go > schema/events.go

Architecture

Kafka (zeek-raw)
    ↓
Consumer
    ↓
Normalizer (field promotion)
    ↓
Enricher (time + network enrichment)
    ↓
Fan-out
    ├──→ Parquet Writers (per log_type, async flush)
    └──→ Kafka Producer (structured JSON output)
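
A rough sketch of the fan-out stage using channels (the real implementation is core/fanout.go; the Event type and blocking behavior here are assumptions):

// Event is the unit flowing through the pipeline (type name assumed).
type Event map[string]any

// fanOut forwards each event to every configured sink, then closes the
// sinks once the input is drained.
func fanOut(in <-chan Event, sinks ...chan<- Event) {
	for ev := range in {
		for _, sink := range sinks {
			sink <- ev // blocks if a sink falls behind; a real fan-out may buffer
		}
	}
	for _, sink := range sinks {
		close(sink)
	}
}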

Project Structure

.
├── main.go                 # Pipeline entry point
├── zeek-parquet-pipeline  # Compiled binary
├── config/                # Configuration files
│   ├── config.json        # Main configuration
│   ├── normalization.json # Field promotion rules
│   └── schema.json        # Raw field definitions
├── core/                  # Core pipeline components
│   ├── kafka.go          # Kafka consumer
│   ├── normalize.go      # Field normalization
│   ├── enrich.go         # Runtime enrichment (time, network)
│   ├── pipeline_flow.go  # Event routing by log_type
│   ├── parquet.go        # Parquet writer
│   ├── kafka_producer.go # Kafka output producer
│   ├── fanout.go         # Output fan-out
│   └── logger.go         # Logging system
├── schema/                # Generated Parquet schemas
│   └── events.go         # Auto-generated from schema.json
├── test/                 # Test scripts
├── log/                  # Runtime logs
└── output/               # Parquet output files

Output Formats

Parquet Output

Partitioned by log type and time:

output/
├── dns/
│   └── year=2026/month=02/day=02/hour=10/
│       └── events_dns_1.parquet
├── conn/
│   └── year=2026/month=02/day=02/hour=10/
│       └── events_conn_1.parquet
└── ...
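
A minimal sketch of how such a Hive-style partition path can be built from an event timestamp (helper name and signature are assumptions):

import (
	"fmt"
	"time"
)

// partitionPath builds the partition directory shown above, e.g.
// output/dns/year=2026/month=02/day=02/hour=10.
func partitionPath(base, logType string, t time.Time) string {
	return fmt.Sprintf("%s/%s/year=%04d/month=%02d/day=%02d/hour=%02d",
		base, logType, t.Year(), int(t.Month()), t.Day(), t.Hour())
}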

Kafka Output

Structured JSON format with three layers:

{
  "source": "zeek",
  "log_type": "dns",
  "raw": {
    "ts": 1769868799.213927,
    "uid": "CfqDt31quyW8AAJct6",
    "id.orig_h": "10.128.0.4",
    ...
  },
  "normalized": {
    "event_time": 1769868799213,
    "ingest_time": 1770089817352,
    "flow_id": "CfqDt31quyW8AAJct6",
    "src_ip": "10.128.0.4",
    "dst_ip": "216.239.34.174",
    ...
  },
  "enriched": {
    "src_ip_is_private": true,
    "dst_ip_is_private": false,
    "direction": "outbound",
    "event_year": 2026,
    ...
  }
}
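
As a rough illustration of the network enrichment flags above (a sketch, not core/enrich.go; the "outbound" label matches the example, while "inbound", "internal", and "external" are assumed labels):

import "net"

// enrichNetwork derives private-IP flags and a traffic direction from
// the normalized source and destination addresses.
func enrichNetwork(src, dst string) map[string]any {
	srcIP, dstIP := net.ParseIP(src), net.ParseIP(dst)
	srcPriv := srcIP != nil && srcIP.IsPrivate()
	dstPriv := dstIP != nil && dstIP.IsPrivate()
	direction := "internal"
	switch {
	case srcPriv && !dstPriv:
		direction = "outbound"
	case !srcPriv && dstPriv:
		direction = "inbound"
	case !srcPriv && !dstPriv:
		direction = "external"
	}
	return map[string]any{
		"src_ip_is_private": srcPriv,
		"dst_ip_is_private": dstPriv,
		"direction":         direction,
	}
}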

Data Model

Three-Layer Architecture

  1. Raw: All fields exactly as they come from Zeek
  2. Normalized: Field promotion to canonical names (e.g., id.orig_h → src_ip)
  3. Enriched: Derived fields (time components, network analysis, direction)
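
A Go struct mirroring this envelope might look like the following (JSON keys come from the Kafka Output example above; the type itself is an assumption):

// Envelope carries all three layers of a processed event.
type Envelope struct {
	Source     string         `json:"source"`
	LogType    string         `json:"log_type"`
	Raw        map[string]any `json:"raw"`
	Normalized map[string]any `json:"normalized"`
	Enriched   map[string]any `json:"enriched"`
}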

Development

Regenerate Schema

After updating schema.json or normalization.json:

go run generate_schema.go > schema/events.go
go build -o zeek-parquet-pipeline main.go

Testing

go run test/test_parquet_validation.go output

Requirements

  • Go 1.21+
  • Kafka broker (tested with Kafka 2.8+)
  • Zeek logs in Kafka topic (nested JSON format: {"dns": {...}} or {"conn": {...}})
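
A minimal sketch of splitting that nested input format into a log type and its fields (function name is hypothetical; assumes exactly one top-level key per message):

import (
	"encoding/json"
	"fmt"
)

// splitLogType extracts the log type from the nested input format,
// e.g. {"dns": {...}} -> "dns" plus the inner field map.
func splitLogType(msg []byte) (string, map[string]any, error) {
	var outer map[string]map[string]any
	if err := json.Unmarshal(msg, &outer); err != nil {
		return "", nil, err
	}
	for name, fields := range outer {
		return name, fields, nil
	}
	return "", nil, fmt.Errorf("no top-level log type key")
}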

Test Environment

Note: This pipeline has been tested on a single machine. Performance characteristics may vary in distributed deployments.

Test Machine Specifications

  • OS: Linux (Ubuntu 22.04.1 LTS)
  • Kernel: 6.8.0-1045-gcp
  • Architecture: x86_64
  • CPU: Intel Xeon @ 2.20GHz
    • 4 CPUs (2 cores, 2 threads per core)
  • Memory: 11GB RAM
  • Storage: 97GB disk space

Software Versions

  • Go: 1.22.4
  • Zeek: 8.0.5
  • Kafka: 2.8+ (broker version)

Tested Log Types

The pipeline has been validated with the following Zeek log types:

  • DNS, CONN, HTTP, SSL, SSH, FTP, SMTP, DHCP, RDP, SMB, DCE-RPC, Kerberos, NTLM, SIP, SNMP, RADIUS, Tunnel, and others (18+ total)

Performance Observations

  • Throughput: 18-31 events per second observed in the test environment; sustained high-volume throughput has not been benchmarked
  • Memory Usage: Low memory footprint (~33-37MB heap observed)
  • Parquet Files: Successfully generated with proper partitioning and schema
  • Buffer Flushing: All three flush conditions (size, time, event count) validated

Dependencies

  • github.com/segmentio/kafka-go - Kafka client
  • github.com/parquet-go/parquet-go - Parquet library

Monitoring

Health metrics are logged every 30 seconds to log/pipeline.log:

  • Memory usage (RSS and heap)
  • Buffer usage (bytes and percentage)
  • Throughput (events per second)
  • Flush counts and error counts
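
A minimal sketch of such a periodic health logger (illustrative only; the shipped monitor also reports buffer usage, throughput, and flush/error counts to log/pipeline.log):

import (
	"log"
	"runtime"
	"time"
)

// healthLoop emits runtime metrics on a fixed interval until stopped.
func healthLoop(interval time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			var m runtime.MemStats
			runtime.ReadMemStats(&m)
			log.Printf("health | heap=%dMB goroutines=%d",
				m.HeapAlloc>>20, runtime.NumGoroutine())
		case <-stop:
			return
		}
	}
}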

Logging

Structured logs with format:

TIMESTAMP | LEVEL | COMPONENT | MESSAGE | CONTEXT

Example:

2026-02-03T10:30:00Z | INFO | startup | configuration loaded | kafka_topic=zeek-raw
2026-02-03T10:30:30Z | INFO | health | memory=36MB buffer=38% eps=25
