Skip to content

Commit 3de231c

Browse files
committed
Cleanup code
1 parent a752e33 commit 3de231c

File tree

6 files changed

+69
-55
lines changed

6 files changed

+69
-55
lines changed

worker2/dag.go renamed to worker2/dag/dag.go

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package worker2
1+
package dag
22

33
import (
44
"fmt"
@@ -129,24 +129,24 @@ func newNodesTransitive[T any](transitiveGetter func(d *Node[T]) *nodesTransitiv
129129
}
130130
}
131131

132-
type DAGEvent interface {
132+
type Event interface {
133133
dagEvent()
134134
}
135135

136-
type DAGEventNewDep[T any] struct {
136+
type EventNewDep[T any] struct {
137137
Node *Node[T]
138138
}
139139

140-
func (d DAGEventNewDep[T]) dagEvent() {}
140+
func (d EventNewDep[T]) dagEvent() {}
141141

142-
type DAGHook func(DAGEvent)
142+
type Hook func(Event)
143143

144144
type Node[T any] struct {
145145
V T
146146
ID string
147147
frozen atomic.Bool
148148
m sync.Mutex
149-
hooks []DAGHook
149+
hooks []Hook
150150

151151
Dependencies *nodesTransitive[T]
152152
Dependees *nodesTransitive[T]
@@ -169,7 +169,7 @@ func (d *Node[T]) GetID() string {
169169
return d.ID
170170
}
171171

172-
func (d *Node[T]) AddHook(hook DAGHook) {
172+
func (d *Node[T]) AddHook(hook Hook) {
173173
d.m.Lock()
174174
defer d.m.Unlock()
175175

@@ -207,7 +207,7 @@ func (d *Node[T]) addDependency(dep *Node[T]) {
207207
}
208208

209209
for _, hook := range d.hooks {
210-
hook(DAGEventNewDep[T]{Node: dep})
210+
hook(EventNewDep[T]{Node: dep})
211211
}
212212
}
213213
}
@@ -242,19 +242,27 @@ func (d *Node[T]) IsFrozen() bool {
242242
return d.frozen.Load()
243243
}
244244

245-
// Freeze assumes the lock is already held
246-
func (d *Node[T]) Freeze() {
245+
// Freeze will lock, and run valid across all dependencies, return false to prevent locking
246+
func (d *Node[T]) Freeze(valid func(*Node[T]) bool) bool {
247+
d.m.Lock() // prevent any deps modification
248+
defer d.m.Unlock()
249+
247250
if d.frozen.Load() {
248-
return
251+
return true
249252
}
250253

251-
for _, dep := range d.Dependencies.nodes.Slice() {
254+
for _, dep := range d.Dependencies.Set().Slice() {
252255
if !dep.IsFrozen() {
253256
panic(fmt.Sprintf("attempting to freeze '%v' while all deps aren't frozen, '%v' isnt", d.ID, dep.ID))
254257
}
258+
259+
if !valid(dep) {
260+
return false
261+
}
255262
}
256263

257264
d.frozen.Store(true)
265+
return true
258266
}
259267

260268
func (d *Node[T]) DebugString() string {

worker2/dep.go

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@ package worker2
33
import (
44
"context"
55
"github.com/hephbuild/heph/utils/xtypes"
6+
"github.com/hephbuild/heph/worker2/dag"
67
"sync"
78
"time"
89
)
910

1011
type Dep interface {
1112
GetName() string
1213
Exec(ctx context.Context, ins InStore, outs OutStore) error
13-
GetNode() *Node[Dep]
14+
GetNode() *dag.Node[Dep]
1415
AddDep(...Dep)
1516
GetHooks() []Hook
1617
AddHook(h Hook)
@@ -42,7 +43,7 @@ func newBase() baseDep {
4243
type baseDep struct {
4344
execution *Execution
4445
m sync.RWMutex
45-
node *Node[Dep]
46+
node *dag.Node[Dep]
4647
named map[string]Dep
4748
hooks []Hook
4849

@@ -56,7 +57,7 @@ func (a *baseDep) init() {
5657
}
5758
}
5859

59-
func (a *baseDep) GetNode() *Node[Dep] {
60+
func (a *baseDep) GetNode() *dag.Node[Dep] {
6061
return a.node
6162
}
6263

@@ -213,16 +214,6 @@ func (a *baseDep) GetHooks() []Hook {
213214
return a.hooks[:]
214215
}
215216

216-
type ActionConfig struct {
217-
Ctx context.Context
218-
Name string
219-
Deps []Dep
220-
Hooks []Hook
221-
Scheduler Scheduler
222-
Requests map[string]float64
223-
Do func(ctx context.Context, ins InStore, outs OutStore) error
224-
}
225-
226217
type Action struct {
227218
baseDep
228219
ctx context.Context
@@ -266,11 +257,6 @@ func (a *Action) DeepDo(f func(Dep)) {
266257
deepDo(a, f)
267258
}
268259

269-
type GroupConfig struct {
270-
Name string
271-
Deps []Dep
272-
}
273-
274260
type Group struct {
275261
baseDep
276262
name string

worker2/dep_action.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package worker2
22

3-
type EventDeclared struct {
4-
Dep Dep
5-
}
3+
import (
4+
"context"
5+
"github.com/hephbuild/heph/worker2/dag"
6+
)
67

7-
func (EventDeclared) Replayable() bool {
8-
return true
8+
type ActionConfig struct {
9+
Ctx context.Context
10+
Name string
11+
Deps []Dep
12+
Hooks []Hook
13+
Scheduler Scheduler
14+
Requests map[string]float64
15+
Do func(ctx context.Context, ins InStore, outs OutStore) error
916
}
1017

1118
func NewAction(cfg ActionConfig) *Action {
1219
a := &Action{baseDep: newBase()}
13-
a.node = NewNode[Dep](cfg.Name, a)
20+
a.node = dag.NewNode[Dep](cfg.Name, a)
1421

1522
a.name = cfg.Name
1623
a.ctx = cfg.Ctx
@@ -30,9 +37,9 @@ func NewAction(cfg ActionConfig) *Action {
3037
hook(EventDeclared{Dep: a})
3138
}
3239

33-
a.node.AddHook(func(event DAGEvent) {
40+
a.node.AddHook(func(event dag.Event) {
3441
switch event := event.(type) {
35-
case DAGEventNewDep[Dep]:
42+
case dag.EventNewDep[Dep]:
3643
for _, hook := range a.hooks {
3744
hook(EventNewDep{Target: event.Node.V})
3845
}

worker2/dep_group.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package worker2
22

3+
import "github.com/hephbuild/heph/worker2/dag"
4+
35
func NewGroup(deps ...Dep) *Group {
46
return NewNamedGroup("", deps...)
57
}
@@ -8,16 +10,21 @@ func NewNamedGroup(name string, deps ...Dep) *Group {
810
return NewGroupWith(GroupConfig{Name: name, Deps: deps})
911
}
1012

13+
type GroupConfig struct {
14+
Name string
15+
Deps []Dep
16+
}
17+
1118
func NewGroupWith(cfg GroupConfig) *Group {
1219
g := &Group{baseDep: newBase()}
13-
g.node = NewNode[Dep](cfg.Name, g)
20+
g.node = dag.NewNode[Dep](cfg.Name, g)
1421

1522
g.name = cfg.Name
1623
g.AddDep(cfg.Deps...)
1724

18-
g.node.AddHook(func(event DAGEvent) {
25+
g.node.AddHook(func(event dag.Event) {
1926
switch event := event.(type) {
20-
case DAGEventNewDep[Dep]:
27+
case dag.EventNewDep[Dep]:
2128
for _, hook := range g.hooks {
2229
hook(EventNewDep{Target: g, AddedDep: event.Node.V})
2330
}

worker2/engine.go

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/hephbuild/heph/utils/sets"
88
"github.com/hephbuild/heph/utils/xcontext"
99
"github.com/hephbuild/heph/utils/xerrors"
10+
"github.com/hephbuild/heph/worker2/dag"
1011
"go.uber.org/multierr"
1112
"runtime"
1213
"sync"
@@ -146,22 +147,19 @@ func (e *Engine) waitForDeps(exec *Execution) error {
146147
}
147148
}
148149

149-
func (e *Engine) tryFreeze(depObj *Node[Dep]) (bool, error) {
150-
depObj.m.Lock() // prevent any deps modification
151-
defer depObj.m.Unlock()
152-
150+
func (e *Engine) tryFreeze(depObj *dag.Node[Dep]) (bool, error) {
153151
errs := sets.NewIdentitySet[error](0)
154152

155-
for _, dep := range depObj.Dependencies.Values() {
156-
depExec := dep.getExecution()
153+
frozen := depObj.Freeze(func(n *dag.Node[Dep]) bool {
154+
depExec := n.V.getExecution()
157155
if depExec == nil {
158-
return false, nil
156+
return false
159157
}
160158

161159
state := depExec.State
162160

163161
if !state.IsFinal() {
164-
return false, nil
162+
return false
165163
}
166164

167165
switch state {
@@ -183,15 +181,15 @@ func (e *Engine) tryFreeze(depObj *Node[Dep]) (bool, error) {
183181
}
184182
}
185183
}
186-
}
187184

188-
depObj.Freeze()
185+
return true
186+
})
189187

190-
if errs.Len() > 0 {
191-
return true, multierr.Combine(errs.Slice()...)
188+
if !frozen {
189+
return false, nil
192190
}
193191

194-
return true, nil
192+
return true, multierr.Combine(errs.Slice()...)
195193
}
196194

197195
func (e *Engine) waitForDepsAndSchedule(exec *Execution) {
@@ -368,7 +366,7 @@ func (e *Engine) registerOne(dep Dep, lock bool) *Execution {
368366
eventsCh: e.eventsCh,
369367
completedCh: make(chan struct{}),
370368
m: m,
371-
events: make([]Event, 5),
369+
events: make([]Event, 0, 5),
372370

373371
// see field comments
374372
errCh: nil,

worker2/events.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,14 @@ type EventWithExecution interface {
1111
getExecution() *Execution
1212
}
1313

14+
type EventDeclared struct {
15+
Dep Dep
16+
}
17+
18+
func (EventDeclared) Replayable() bool {
19+
return true
20+
}
21+
1422
type EventCompleted struct {
1523
At time.Time
1624
Execution *Execution

0 commit comments

Comments
 (0)