Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ release_notes.md
go.work*
*.env
*.prof
*.wasm
bento
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
.PHONY: all serverless deps docker docker-cgo clean docs protos test test-race test-integration fmt lint install deploy-docs playground
.PHONY: all serverless deps docker docker-cgo clean docs generate protos test test-race test-integration fmt lint install deploy-docs playground
TAGS ?=

GOMAXPROCS ?= 1
Expand Down Expand Up @@ -39,6 +39,9 @@ install: $(APPS) ## Install binaries to $(INSTALL_DIR)
deps: ## Go mod tidy
@go mod tidy

generate: ## Run all go generate directives
@go generate ./...

SOURCE_FILES = $(shell find internal public cmd -type f)
TEMPLATE_FILES = $(shell find internal/impl -type f -name "template_*.yaml")

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect
github.com/andybalholm/brotli v1.2.0 // indirect
github.com/andybalholm/brotli v1.2.0
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armon/go-metrics v0.3.4 // indirect
Expand Down
126 changes: 126 additions & 0 deletions internal/impl/python/instance.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package python

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"path/filepath"
"strings"
"testing/fstest"
"time"

"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/api"
)

func (p *pythonProcessor) newInstance(ctx context.Context) (*pythonInstance, error) {
fs := fstest.MapFS{
pythonEntrypointFile: &fstest.MapFile{Data: pythonEntrypoint},
pythonExecScriptFile: &fstest.MapFile{Data: []byte(p.script)},
}

inR, inW := io.Pipe()
outR, outW := io.Pipe()
errBuf := &bytes.Buffer{}

cfg := wazero.NewModuleConfig().
WithFS(fs).
WithStdin(inR).WithStdout(outW).WithStderr(errBuf).
WithArgs("python", filepath.Join("/", pythonEntrypointFile), strings.Join(p.imports, ","))

var mod api.Module

instantiateErr := make(chan error, 1)
go func() {
var err error
// this is a blocking call, so we have to deploy in a goroutine
mod, err = p.runtime.InstantiateModule(ctx, p.compiled, cfg)
if err != nil {
instantiateErr <- fmt.Errorf("instantiation failed: %w", err)
outW.Close()
}
}()

handshakeErr := make(chan error, 1)
go func() {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do a quick-and-dirty handshake to ensure the python env is up-and-running. The python module should write a "READY" signal to the output socket which we need to check is received before proceeding.

buf := make([]byte, len(pythonReadySignal))
if _, err := io.ReadFull(outR, buf); err != nil {
handshakeErr <- fmt.Errorf("failed to read handshake: %w", err)
return
}
if string(buf) != pythonReadySignal {
handshakeErr <- fmt.Errorf("wrong signal: %s", string(buf))
return
}
handshakeErr <- nil
}()

ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()

select {
case <-ctx.Done():
return nil, ctx.Err()
case err := <-instantiateErr:
return nil, err
case err := <-handshakeErr:
if err != nil {
p.logger.Errorf("python handshake failed (stderr): %s)")
return nil, fmt.Errorf("handshake failed: %w", err)
}
}

return &pythonInstance{
Module: mod,
stdinW: inW,
stdoutR: outR,
stderrBuf: errBuf,
}, nil
}

//------------------------------------------------------------------------------

// pythonInstance bundles an instantiated Python WASM module with the host-side
// ends of the streams that newInstance wired to it.
type pythonInstance struct {
// Module is the instantiated module; Close treats a nil Module as nothing to close.
api.Module
// stdinW is the write end of the pipe connected to the interpreter's stdin;
// runRequest writes length-prefixed requests to it.
stdinW io.WriteCloser
// stdoutR is the read end of the pipe connected to the interpreter's stdout;
// runRequest reads response frames from it.
stdoutR io.Reader
// stderrBuf collects everything the interpreter writes to stderr.
stderrBuf *bytes.Buffer
}

func (pi *pythonInstance) runRequest(input []byte) (stdout []byte, stderr []byte, err error) {
if err := binary.Write(pi.stdinW, binary.BigEndian, uint32(len(input))); err != nil {
return nil, pi.stderrBuf.Bytes(), err
}
if _, err := pi.stdinW.Write(input); err != nil {
return nil, pi.stderrBuf.Bytes(), err
}

header := make([]byte, 5)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

First byte signals that the response was a success or exception. Last four bytes tell us how long the response is.

if _, err := io.ReadFull(pi.stdoutR, header); err != nil {
return nil, pi.stderrBuf.Bytes(), fmt.Errorf("pipe read error: %w", err)
}

status, respLen := header[0], binary.BigEndian.Uint32(header[1:])

payload := make([]byte, respLen)
if _, err := io.ReadFull(pi.stdoutR, payload); err != nil {
return nil, pi.stderrBuf.Bytes(), fmt.Errorf("pipe read error: %w", err)
}

stderr = pi.stderrBuf.Bytes()
if status == pythonFailureStatus {
return nil, stderr, fmt.Errorf("python error: %s", string(payload))
}

return payload, stderr, nil
}

// Close closes the interpreter's stdin and then, if a module is attached,
// closes the module.
//
// A module close error takes precedence; otherwise any error from closing
// stdin is returned instead of being dropped.
func (pi *pythonInstance) Close(ctx context.Context) error {
	stdinErr := pi.stdinW.Close()
	if pi.Module == nil {
		return stdinErr
	}
	if err := pi.Module.Close(ctx); err != nil {
		return err
	}
	return stdinErr
}
38 changes: 38 additions & 0 deletions internal/impl/python/package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package python

import (
"bytes"
_ "embed"
"fmt"
"io"
"sync"

"github.com/andybalholm/brotli"
)

// Calling 'go generate internal/impl/python/package.go' or 'make generate' will trigger this directive,
// downloading the python wasm runtime to the ./runtime directory.
//
// The artifacts required by this package (entrypoint.py and the compressed WASM runtime)
// must be present at build time for the //go:embed directives to succeed.

//go:generate go run runtime/install.go -out ./runtime

var (
// pythonEntrypoint holds the contents of runtime/entrypoint.py, embedded at
// build time.
//go:embed runtime/entrypoint.py
pythonEntrypoint []byte

// pythonBrotliWASM holds the brotli-compressed Python WASM runtime, embedded
// at build time; getPythonWasm decompresses it.
//go:embed runtime/python-3.12.0.wasm.br
pythonBrotliWASM []byte
)

// getPythonWasm decompresses the embedded brotli-encoded
// runtime/python-3.12.0.wasm.br into its underlying .wasm form. It is wrapped
// in sync.OnceValues, so the decompression runs once and every call returns
// that same result.
var getPythonWasm = sync.OnceValues(func() ([]byte, error) {
	wasm, err := io.ReadAll(brotli.NewReader(bytes.NewReader(pythonBrotliWASM)))
	if err != nil {
		return nil, fmt.Errorf("failed to decompress embedded python wasm: %w", err)
	}
	return wasm, nil
})
201 changes: 201 additions & 0 deletions internal/impl/python/processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
package python

import (
"context"
"errors"
"fmt"
"sync"

"github.com/tetratelabs/wazero"
"github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1"
"github.com/warpstreamlabs/bento/internal/impl/wasm/wasmpool"
"github.com/warpstreamlabs/bento/public/service"
)

var (
// errNoWasmRuntime is returned by newPythonProcessor when the embedded WASM
// runtime or entrypoint.py is empty.
errNoWasmRuntime = errors.New("no WASM runtime found")

// getCompilationCache allows us to create an in-memory compilation cache once, and have it re-used between
// processor instantiations.
// TODO(gregfurman): Look into a wazero.NewCompilationCacheWithDir for FS persistence (likely within the ./runtime dir).
// TODO(gregfurman): We never close the CompilationCache since it's shared between processors.
getCompilationCache = sync.OnceValue(wazero.NewCompilationCache)
)

const (
// pythonReadySignal is the exact byte sequence newInstance expects to read
// from the interpreter's stdout before treating the instance as usable.
pythonReadySignal = "READY"

// Status values carried in the first byte of a response header. runRequest
// turns pythonFailureStatus into an error built from the payload.
pythonSuccessStatus = 0
pythonFailureStatus = 1

// File names under which the embedded entrypoint and the user script are
// placed in the interpreter's in-memory filesystem.
pythonEntrypointFile = "entrypoint.py"
pythonExecScriptFile = "exec_script.py"
)

// init registers the "python" processor with its config spec and constructor.
// It panics if registration fails.
func init() {
	if err := service.RegisterProcessor("python", pythonProcessorSpec(), newPythonProcessor); err != nil {
		panic(err)
	}
}

// pythonProcessorSpec returns the configuration spec for the python processor:
// its category, summary, description, the `script` and `imports` fields, and
// three usage examples.
//
// The "External Imports" example output shows rounded_value as 4, because
// math.ceil rounds 3.14158 up.
func pythonProcessorSpec() *service.ConfigSpec {
	return service.NewConfigSpec().
		Categories("Mapping").
		Summary(`
Executes a Python script against each message in a stream.
`).Description(`
This processor uses a WebAssembly (WASM) hosted Python 3.12 environment [provided by VMWare](https://github.com/vmware-labs/webassembly-language-runtimes).

Each message is passed to the script as a global variable `+"`this`"+`. The script should assign the transformed result to the global variable `+"`root`"+`.

### Libraries
A curated set of standard libraries is available. Additional modules can be specified via the `+"`imports`"+` field, provided they are available within the internal WASM runtime environment.
`).Fields(
		service.NewStringField("script").
			Description("The Python script to execute for each message."),
		service.NewStringListField("imports").
			Description("An optional list of Python modules to pre-import for the script.").
			Default([]string{}).
			Optional(),
	).Example(
		"Structured Mapping", `
If we have a stream of JSON documents containing user data, we can use Python to calculate a new field.`, `
pipeline:
processors:
- python:
script: |
root["full_name"] = f"{this['first_name']} {this['last_name']}"
root["age_next_year"] = this["age"] + 1

# In: {"first_name": "Richie", "last_name": "Ryan", "age": 35}
# Out: {"full_name": "Richie Ryan", "age_next_year": 36}
`,
	).Example(
		"Data Filtering", `
By assigning `+"`None`"+` to `+"`root`"+`, you can effectively filter out messages based on complex logic.`, `
pipeline:
processors:
- python:
script: |
if this.pop("status", None) == "active":
root = this

# In: {"status": "active", "region": "us-east-1"}
# Out: {"region": "us-east-1"}

# In: {"status": "inactive", "region": "af-south-1"}
# Out: null # empty
`,
	).Example(
		"External Imports", `
Using standard libraries to perform calculations.`, `
pipeline:
processors:
- python:
imports: [ math ]
script: |
root = this
root["rounded_value"] = math.ceil(this["value"])

# In: {"value": 3.14158}
# Out: {"value": 3.14158, "rounded_value": 4}
`,
	)
}

// pythonProcessor runs a user-supplied Python script against each message
// using interpreter instances taken from a pool.
type pythonProcessor struct {
// script is the Python source from the `script` config field; newInstance
// exposes it to the interpreter as exec_script.py.
script string
// imports is the `imports` config field; newInstance passes it comma-joined
// as an argument to the entrypoint.
imports []string

// runtime is the wazero runtime used to instantiate modules; Close closes it.
runtime wazero.Runtime
// compiled is the Python WASM binary compiled in newPythonProcessor and
// instantiated by newInstance.
compiled wazero.CompiledModule

// pool supplies the interpreter instances used by Process; it is built with
// newInstance as its factory.
pool *wasmpool.WasmModulePool[*pythonInstance]

// logger receives handshake failures and per-message interpreter stderr.
logger *service.Logger
}

// newPythonProcessor builds a python processor from its parsed configuration.
//
// It decompresses the embedded Python runtime, reads the `script` and
// `imports` fields, creates a wazero runtime backed by the shared compilation
// cache, instantiates WASI in it, compiles the Python module, and creates the
// instance pool with newInstance as its factory.
//
// It returns errNoWasmRuntime when the embedded runtime or entrypoint is
// empty, and otherwise the error of whichever step failed. If any step after
// the runtime has been created fails, the runtime is closed before returning
// so it is not leaked.
func newPythonProcessor(conf *service.ParsedConfig, mgr *service.Resources) (service.Processor, error) {
	pythonWASM, err := getPythonWasm()
	if err != nil {
		return nil, err
	}

	if len(pythonWASM) == 0 || len(pythonEntrypoint) == 0 {
		mgr.Logger().Error("cannot load in python processor without WASM runtime and entrypoint.py being set.")
		return nil, errNoWasmRuntime
	}

	// Read the configuration before allocating the runtime so that a config
	// error leaves nothing to clean up.
	script, err := conf.FieldString("script")
	if err != nil {
		return nil, err
	}
	imports, err := conf.FieldStringList("imports")
	if err != nil {
		return nil, err
	}

	ctx := context.Background()
	cache := getCompilationCache()

	r := wazero.NewRuntimeWithConfig(ctx, wazero.NewRuntimeConfig().WithCompilationCache(cache))

	// From here on the runtime must be released on every failure path.
	constructed := false
	defer func() {
		if !constructed {
			// The construction error is what the caller needs; a secondary
			// close error would only obscure it.
			_ = r.Close(ctx)
		}
	}()

	if _, err := wasi_snapshot_preview1.Instantiate(ctx, r); err != nil {
		return nil, fmt.Errorf("instantiating WASI: %w", err)
	}

	compiled, err := r.CompileModule(ctx, pythonWASM)
	if err != nil {
		return nil, fmt.Errorf("compiling python runtime: %w", err)
	}

	proc := &pythonProcessor{
		runtime:  r,
		compiled: compiled,
		imports:  imports,
		script:   script,
		logger:   mgr.Logger(),
	}

	proc.pool, err = wasmpool.NewWasmModulePool[*pythonInstance](ctx, proc.newInstance)
	if err != nil {
		return nil, fmt.Errorf("creating python instance pool: %w", err)
	}

	constructed = true
	return proc, nil
}

// Process runs the configured script against a single message.
//
// The message bytes are sent to a pooled interpreter instance, which is put
// back in the pool when the call returns. Any captured stderr is logged at
// debug level. The result is a one-message batch holding a copy of the input:
// its contents are replaced with the script output on success, or it is
// flagged with the error on failure. An error is returned only when the
// message bytes cannot be read or no instance can be obtained.
func (p *pythonProcessor) Process(ctx context.Context, msg *service.Message) (service.MessageBatch, error) {
	payload, err := msg.AsBytes()
	if err != nil {
		return nil, err
	}

	inst, err := p.pool.Get(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to get python instance: %w", err)
	}
	defer p.pool.Put(inst)

	result, diagnostics, runErr := inst.runRequest(payload)
	if len(diagnostics) != 0 {
		p.logger.Debugf("Python stderr: %s", string(diagnostics))
	}

	out := msg.Copy()
	if runErr == nil {
		out.SetBytes(result)
	} else {
		out.SetError(runErr)
	}

	return service.MessageBatch{out}, nil
}

// Close shuts down the processor by closing its wazero runtime and returns
// the runtime's close error, if any.
func (p *pythonProcessor) Close(ctx context.Context) error {
	closeErr := p.runtime.Close(ctx)
	return closeErr
}
Loading