Skip to content

Commit 2eb8a5d

Browse files
authored
Merge pull request #3 from wavezync/feat/parallel-execution
feat!: parallel execution improvements and foreach removal
2 parents 8dc4c6e + af7c386 commit 2eb8a5d

File tree

15 files changed

+1464
-2665
lines changed

15 files changed

+1464
-2665
lines changed

README.md

Lines changed: 168 additions & 103 deletions
Large diffs are not rendered by default.

guides/ai_workflows.md

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -60,28 +60,28 @@ defmodule MyApp.DocumentProcessor do
6060
use Durable.Helpers
6161

6262
workflow "process_document" do
63-
step :fetch, fn data ->
64-
doc = DocumentStore.get(data["doc_id"])
63+
step :fetch, fn ctx ->
64+
doc = DocumentStore.get(ctx["doc_id"])
6565
{:ok, %{doc: doc}}
6666
end
6767

6868
# AI classification with automatic retry
69-
step :classify, [retry: [max_attempts: 3, backoff: :exponential]], fn data ->
70-
content = data.doc.content
69+
step :classify, [retry: [max_attempts: 3, backoff: :exponential]], fn ctx ->
70+
content = ctx.doc.content
7171

7272
doc_type = ReqLLM.generate_text!(
7373
"anthropic:claude-sonnet-4-20250514",
7474
"Classify this document as :invoice, :contract, or :other. Reply with only the atom.\n\n#{content}"
7575
) |> String.trim() |> String.to_atom()
7676

77-
{:ok, assign(data, :doc_type, doc_type)}
77+
{:ok, assign(ctx, :doc_type, doc_type)}
7878
end
7979

8080
# Conditional branching - only ONE path executes
81-
branch on: fn data -> data.doc_type end do
81+
branch on: fn ctx -> ctx.doc_type end do
8282
:invoice ->
83-
step :extract_invoice, [retry: [max_attempts: 3]], fn data ->
84-
content = data.doc.content
83+
step :extract_invoice, [retry: [max_attempts: 3]], fn ctx ->
84+
content = ctx.doc.content
8585

8686
{:ok, extracted} = ReqLLM.generate_object(
8787
"anthropic:claude-sonnet-4-20250514",
@@ -94,18 +94,18 @@ defmodule MyApp.DocumentProcessor do
9494
}
9595
)
9696

97-
{:ok, assign(data, :extracted, extracted)}
97+
{:ok, assign(ctx, :extracted, extracted)}
9898
end
9999

100-
step :validate_invoice, fn data ->
101-
extracted = data.extracted
100+
step :validate_invoice, fn ctx ->
101+
extracted = ctx.extracted
102102
calculated = Enum.sum(Enum.map(extracted.line_items, & &1.amount))
103-
{:ok, assign(data, :valid, abs(calculated - extracted.total) < 0.01)}
103+
{:ok, assign(ctx, :valid, abs(calculated - extracted.total) < 0.01)}
104104
end
105105

106106
:contract ->
107-
step :extract_contract, [retry: [max_attempts: 3]], fn data ->
108-
content = data.doc.content
107+
step :extract_contract, [retry: [max_attempts: 3]], fn ctx ->
108+
content = ctx.doc.content
109109

110110
{:ok, extracted} = ReqLLM.generate_object(
111111
"anthropic:claude-sonnet-4-20250514",
@@ -117,26 +117,26 @@ defmodule MyApp.DocumentProcessor do
117117
}
118118
)
119119

120-
{:ok, assign(data, :extracted, extracted)}
120+
{:ok, assign(ctx, :extracted, extracted)}
121121
end
122122

123123
_ ->
124-
step :flag_for_review, fn data ->
125-
{:ok, assign(data, :needs_review, true)}
124+
step :flag_for_review, fn ctx ->
125+
{:ok, assign(ctx, :needs_review, true)}
126126
end
127127
end
128128

129129
# Runs after any branch completes
130-
step :store, fn data ->
131-
doc = data.doc
130+
step :store, fn ctx ->
131+
doc = ctx.doc
132132

133133
DocumentStore.update(doc.id, %{
134-
doc_type: data.doc_type,
135-
extracted_data: Map.get(data, :extracted, %{}),
136-
needs_review: Map.get(data, :needs_review, false)
134+
doc_type: ctx.doc_type,
135+
extracted_data: Map.get(ctx, :extracted, %{}),
136+
needs_review: Map.get(ctx, :needs_review, false)
137137
})
138138

139-
{:ok, data}
139+
{:ok, ctx}
140140
end
141141
end
142142
end
@@ -150,18 +150,18 @@ end
150150
### Retries for API Calls
151151

152152
```elixir
153-
step :ai_call, [retry: [max_attempts: 3, backoff: :exponential]], fn data ->
154-
result = ReqLLM.generate_text!("anthropic:claude-sonnet-4-20250514", data.prompt)
155-
{:ok, assign(data, :result, result)}
153+
step :ai_call, [retry: [max_attempts: 3, backoff: :exponential]], fn ctx ->
154+
result = ReqLLM.generate_text!("anthropic:claude-sonnet-4-20250514", ctx.prompt)
155+
{:ok, assign(ctx, :result, result)}
156156
end
157157
```
158158

159159
### Validate AI Outputs
160160

161161
```elixir
162-
step :extract, fn data ->
163-
case ReqLLM.generate_object(model, data.prompt, schema: schema) do
164-
{:ok, extracted} -> {:ok, assign(data, :data, extracted)}
162+
step :extract, fn ctx ->
163+
case ReqLLM.generate_object(model, ctx.prompt, schema: schema) do
164+
{:ok, extracted} -> {:ok, assign(ctx, :data, extracted)}
165165
{:error, _} -> raise "Invalid response" # Triggers retry
166166
end
167167
end
@@ -172,26 +172,26 @@ end
172172
```elixir
173173
use Durable.Wait
174174

175-
step :review, fn data ->
176-
if data.confidence < 0.8 do
175+
step :review, fn ctx ->
176+
if ctx.confidence < 0.8 do
177177
result = wait_for_input("human_review", timeout: hours(24))
178-
{:ok, assign(data, :human_verified, result)}
178+
{:ok, assign(ctx, :human_verified, result)}
179179
else
180-
{:ok, data}
180+
{:ok, ctx}
181181
end
182182
end
183183
```
184184

185185
### Branch on AI Classification
186186

187187
```elixir
188-
branch on: fn data -> data.category end do
188+
branch on: fn ctx -> ctx.category end do
189189
:billing ->
190-
step :handle_billing, fn data -> {:ok, data} end
190+
step :handle_billing, fn ctx -> {:ok, ctx} end
191191
:technical ->
192-
step :handle_technical, fn data -> {:ok, data} end
192+
step :handle_technical, fn ctx -> {:ok, ctx} end
193193
_ ->
194-
step :handle_default, fn data -> {:ok, data} end
194+
step :handle_default, fn ctx -> {:ok, ctx} end
195195
end
196196
```
197197

0 commit comments

Comments
 (0)