Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions pkg/bigquery/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1244,7 +1244,8 @@ func (d *Client) GetDatabaseSummary(ctx context.Context) (*ansisql.DBDatabase, e
mu := sync.Mutex{}
var errs []error

workers := max(runtime.NumCPU(), 8)
// Use a smaller pool size to reduce API call frequency
workers := min(runtime.NumCPU(), 4)

p := pool.New().WithMaxGoroutines(workers)

Expand Down Expand Up @@ -1325,7 +1326,8 @@ func (d *Client) GetDatabaseSummaryForSchemas(ctx context.Context, schemas []str
mu := sync.Mutex{}
var errs []error

workers := max(runtime.NumCPU(), 8)
// Use a smaller pool size to reduce API call frequency
workers := min(runtime.NumCPU(), 4)
p := pool.New().WithMaxGoroutines(workers)

// Only iterate over requested schemas (datasets)
Expand Down
28 changes: 18 additions & 10 deletions pkg/claudecode/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (
"github.com/bruin-data/bruin/pkg/pipeline"
"github.com/bruin-data/bruin/pkg/scheduler"
"github.com/pkg/errors"
"github.com/sourcegraph/conc/pool"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

// Parameter constants.
Expand Down Expand Up @@ -226,23 +226,31 @@ func (o *ClaudeCodeOperator) Run(ctx context.Context, ti scheduler.TaskInstance)
return errors.Wrap(err, "failed to get stderr pipe")
}

wg := new(errgroup.Group)
wg.Go(func() error { return o.consumePipe(stdout, output) })
wg.Go(func() error { return o.consumePipe(stderr, output) })
p := pool.New().WithMaxGoroutines(2).WithErrors()
p.Go(func() error { return o.consumePipe(stdout, output) })
p.Go(func() error { return o.consumePipe(stderr, output) })

err = cmd.Start()
if err != nil {
return errors.Wrap(err, "failed to start Claude command")
}

res := cmd.Wait()
if res != nil {
return res
// Wait for pipe consumption to complete FIRST
// This is critical: we must finish reading from pipes before calling cmd.Wait()
// because cmd.Wait() will close the pipes after the command exits
pipeErr := p.Wait()

// Now wait for the command to finish
cmdErr := cmd.Wait()

// Return command error first if both exist
if cmdErr != nil {
return cmdErr
}

err = wg.Wait()
if err != nil {
return errors.Wrap(err, "failed to consume pipe")
// Return pipe error if it exists
if pipeErr != nil {
return errors.Wrap(pipeErr, "failed to consume pipe")
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/python/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/bruin-data/bruin/pkg/git"
"github.com/bruin-data/bruin/pkg/logger"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/sourcegraph/conc/pool"
)

const WINDOWS = "windows"
Expand Down Expand Up @@ -164,9 +164,9 @@ func (l *CommandRunner) RunAnyCommand(ctx context.Context, cmd *exec.Cmd) error

// Start reading from pipes in goroutines before starting the command
// This prevents deadlock if the command generates a lot of output
wg := new(errgroup.Group)
wg.Go(func() error { return consumePipe(stdout, output) })
wg.Go(func() error { return consumePipe(stderr, output) })
p := pool.New().WithMaxGoroutines(2).WithErrors()
p.Go(func() error { return consumePipe(stdout, output) })
p.Go(func() error { return consumePipe(stderr, output) })

err = cmd.Start()
if err != nil {
Expand All @@ -176,7 +176,7 @@ func (l *CommandRunner) RunAnyCommand(ctx context.Context, cmd *exec.Cmd) error
// Wait for pipe consumption to complete FIRST
// This is critical: we must finish reading from pipes before calling cmd.Wait()
// because cmd.Wait() will close the pipes after the command exits
pipeErr := wg.Wait()
pipeErr := p.Wait()

// Now wait for the command to finish
cmdErr := cmd.Wait()
Expand Down
28 changes: 18 additions & 10 deletions pkg/r/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/bruin-data/bruin/pkg/git"
"github.com/bruin-data/bruin/pkg/logger"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"github.com/sourcegraph/conc/pool"
)

type cmd interface {
Expand Down Expand Up @@ -117,23 +117,31 @@ func (l *CommandRunner) RunAnyCommand(ctx context.Context, cmd *exec.Cmd) error
return errors.Wrap(err, "failed to get stderr")
}

wg := new(errgroup.Group)
wg.Go(func() error { return consumePipe(stdout, output) })
wg.Go(func() error { return consumePipe(stderr, output) })
p := pool.New().WithMaxGoroutines(2).WithErrors()
p.Go(func() error { return consumePipe(stdout, output) })
p.Go(func() error { return consumePipe(stderr, output) })

err = cmd.Start()
if err != nil {
return errors.Wrap(err, "failed to start CommandInstance")
}

res := cmd.Wait()
if res != nil {
return res
// Wait for pipe consumption to complete FIRST
// This is critical: we must finish reading from pipes before calling cmd.Wait()
// because cmd.Wait() will close the pipes after the command exits
pipeErr := p.Wait()

// Now wait for the command to finish
cmdErr := cmd.Wait()

// Return command error first if both exist
if cmdErr != nil {
return cmdErr
}

err = wg.Wait()
if err != nil {
return errors.Wrap(err, "failed to consume pipe")
// Return pipe error if it exists
if pipeErr != nil {
return errors.Wrap(pipeErr, "failed to consume pipe")
}

return nil
Expand Down