@@ -7,7 +7,7 @@ A durable, resumable workflow engine for Elixir. Similar to Temporal/Inngest.
77
88## Features
99
10- - ** Declarative DSL ** - Clean macro-based workflow definitions
10+ - ** Pipeline Model ** - Data flows from step to step, simple and explicit
1111- ** Resumability** - Sleep, wait for events, wait for human input
1212- ** Branching** - Pattern-matched conditional flow control
1313- ** Parallel** - Run steps concurrently with merge strategies
@@ -51,37 +51,38 @@ children = [
5151``` elixir
5252defmodule MyApp .OrderWorkflow do
5353 use Durable
54- use Durable .Context
54+ use Durable .Helpers
5555
5656 workflow " process_order" , timeout: hours (2 ) do
57- step :validate do
58- order = input ().order
59- put_context (:order_id , order.id)
60- put_context (:items , order.items)
57+ # First step receives workflow input
58+ step :validate , fn order ->
59+ {:ok , %{
60+ order_id: order[" id" ],
61+ items: order[" items" ],
62+ customer_id: order[" customer_id" ]
63+ }}
6164 end
6265
63- step :calculate_total do
64- total =
65- get_context (:items )
66- |> Enum .map (& &1 .price)
67- |> Enum .sum ()
68-
69- put_context (:total , total)
66+ # Each step receives previous step's output
67+ step :calculate_total , fn data ->
68+ total = data.items |> Enum .map (& &1 [" price" ]) |> Enum .sum ()
69+ {:ok , assign (data, :total , total)}
7070 end
7171
72- step :charge_payment , retry: [max_attempts: 3 , backoff: :exponential ] do
73- {:ok , charge} = PaymentService .charge (get_context ( : order_id), get_context ( : total) )
74- put_context ( :charge_id , charge.id)
72+ step :charge_payment , [ retry: [max_attempts: 3 , backoff: :exponential ]], fn data ->
73+ {:ok , charge} = PaymentService .charge (data. order_id, data. total)
74+ { :ok , assign (data, :charge_id , charge.id)}
7575 end
7676
77- step :send_confirmation do
78- EmailService .send_confirmation (get_context (:order_id ))
77+ step :send_confirmation , fn data ->
78+ EmailService .send_confirmation (data.order_id)
79+ {:ok , data}
7980 end
8081 end
8182end
8283
8384# Start it
84- {:ok , id} = Durable .start (MyApp .OrderWorkflow , %{order: order })
85+ {:ok , id} = Durable .start (MyApp .OrderWorkflow , %{" id " => " order_123 " , " items " => items })
8586```
8687
8788## Examples
@@ -93,22 +94,31 @@ Wait for human approval with timeout fallback.
9394``` elixir
9495defmodule MyApp .ExpenseApproval do
9596 use Durable
96- use Durable .Context
97+ use Durable .Helpers
9798 use Durable .Wait
9899
99100 workflow " expense_approval" do
100- step :request_approval do
101+ step :request_approval , fn data ->
101102 result = wait_for_approval (" manager" ,
102- prompt: " Approve $#{ input (). amount} expense?" ,
103+ prompt: " Approve $#{ data[ " amount" ] } expense?" ,
103104 timeout: days (3 ),
104105 timeout_value: :auto_rejected
105106 )
106- put_context ( :decision , result)
107+ { :ok , assign (data, :decision , result)}
107108 end
108109
109- branch on: get_context (:decision ) do
110- :approved -> step :process , do: Expenses .reimburse (input ().employee_id, input ().amount)
111- _ -> step :notify_rejection , do: Mailer .send_rejection (input ().employee_id)
110+ branch on: fn data -> data.decision end do
111+ :approved ->
112+ step :process , fn data ->
113+ Expenses .reimburse (data[" employee_id" ], data[" amount" ])
114+ {:ok , assign (data, :status , :reimbursed )}
115+ end
116+
117+ _ ->
118+ step :notify_rejection , fn data ->
119+ Mailer .send_rejection (data[" employee_id" ])
120+ {:ok , assign (data, :status , :rejected )}
121+ end
112122 end
113123 end
114124end
@@ -124,17 +134,30 @@ Fetch data concurrently, then combine.
124134``` elixir
125135defmodule MyApp .DashboardBuilder do
126136 use Durable
127- use Durable .Context
137+ use Durable .Helpers
128138
129139 workflow " build_dashboard" do
140+ step :init , fn input ->
141+ {:ok , %{user_id: input[" user_id" ]}}
142+ end
143+
130144 parallel do
131- step :user , do: put_context (:user , Users .get (input ().user_id))
132- step :orders , do: put_context (:orders , Orders .recent (input ().user_id))
133- step :notifications , do: put_context (:notifs , Notifications .unread (input ().user_id))
145+ step :user , fn data ->
146+ {:ok , assign (data, :user , Users .get (data.user_id))}
147+ end
148+
149+ step :orders , fn data ->
150+ {:ok , assign (data, :orders , Orders .recent (data.user_id))}
151+ end
152+
153+ step :notifications , fn data ->
154+ {:ok , assign (data, :notifs , Notifications .unread (data.user_id))}
155+ end
134156 end
135157
136- step :render do
137- Dashboard .build (get_context (:user ), get_context (:orders ), get_context (:notifs ))
158+ step :render , fn data ->
159+ dashboard = Dashboard .build (data.user, data.orders, data.notifs)
160+ {:ok , assign (data, :dashboard , dashboard)}
138161 end
139162 end
140163end
@@ -147,16 +170,23 @@ Process items with controlled concurrency.
147170``` elixir
148171defmodule MyApp .BulkEmailer do
149172 use Durable
150- use Durable .Context
173+ use Durable .Helpers
151174
152175 workflow " send_campaign" do
153- step :load do
154- put_context (:recipients , Subscribers .active (input ().campaign_id))
176+ step :load , fn input ->
177+ recipients = Subscribers .active (input[" campaign_id" ])
178+ {:ok , %{campaign_id: input[" campaign_id" ], recipients: recipients}}
155179 end
156180
157- foreach :send_emails , items: :recipients , concurrency: 10 , on_error: :continue do
158- step :send do
159- Mailer .send_campaign (current_item (), input ().campaign_id)
181+ foreach :send_emails ,
182+ items: fn data -> data.recipients end ,
183+ concurrency: 10 ,
184+ on_error: :continue do
185+
186+ # Foreach steps receive (data, item, index)
187+ step :send , fn data, recipient, _idx ->
188+ Mailer .send_campaign (recipient, data.campaign_id)
189+ {:ok , increment (data, :sent_count )}
160190 end
161191 end
162192 end
@@ -170,24 +200,34 @@ Book multiple services with automatic rollback on failure.
170200``` elixir
171201defmodule MyApp .TripBooking do
172202 use Durable
173- use Durable .Context
203+ use Durable .Helpers
174204
175205 workflow " book_trip" do
176- step :book_flight , compensate: :cancel_flight do
177- put_context (:flight , Flights .book (input ().flight))
206+ step :book_flight , [compensate: :cancel_flight ], fn data ->
207+ booking = Flights .book (data[" flight" ])
208+ {:ok , assign (data, :flight , booking)}
178209 end
179210
180- step :book_hotel , compensate: :cancel_hotel do
181- put_context (:hotel , Hotels .book (input ().hotel))
211+ step :book_hotel , [compensate: :cancel_hotel ], fn data ->
212+ booking = Hotels .book (data[" hotel" ])
213+ {:ok , assign (data, :hotel , booking)}
182214 end
183215
184- step :charge do
185- total = get_context (:flight ).price + get_context (:hotel ).price
186- Payments .charge (input ().card, total)
216+ step :charge , fn data ->
217+ total = data.flight.price + data.hotel.price
218+ Payments .charge (data[" card" ], total)
219+ {:ok , assign (data, :charged , true )}
187220 end
188221
189- compensate :cancel_flight , do: Flights .cancel (get_context (:flight ).id)
190- compensate :cancel_hotel , do: Hotels .cancel (get_context (:hotel ).id)
222+ compensate :cancel_flight , fn data ->
223+ Flights .cancel (data.flight.id)
224+ {:ok , data}
225+ end
226+
227+ compensate :cancel_hotel , fn data ->
228+ Hotels .cancel (data.hotel.id)
229+ {:ok , data}
230+ end
191231 end
192232end
193233```
@@ -199,19 +239,20 @@ Run daily at 9am.
199239``` elixir
200240defmodule MyApp .DailyReport do
201241 use Durable
242+ use Durable .Helpers
202243 use Durable .Scheduler .DSL
203- use Durable .Context
204244
205245 @schedule cron: " 0 9 * * *" , timezone: " America/New_York"
206246 workflow " daily_sales_report" do
207- step :generate do
247+ step :generate , fn _input ->
208248 report = Reports .sales_summary (Date .utc_today ())
209- put_context ( :report , report)
249+ { :ok , %{ report: report}}
210250 end
211251
212- step :distribute do
213- Mailer .send_report (get_context (:report ), to: " team@company.com" )
214- Slack .post_summary (get_context (:report ), channel: " #sales" )
252+ step :distribute , fn data ->
253+ Mailer .send_report (data.report, to: " team@company.com" )
254+ Slack .post_summary (data.report, channel: " #sales" )
255+ {:ok , data}
215256 end
216257 end
217258end
@@ -227,32 +268,37 @@ Sleep, schedule for specific times, and wait for events.
227268``` elixir
228269defmodule MyApp .TrialReminder do
229270 use Durable
230- use Durable .Context
271+ use Durable .Helpers
231272 use Durable .Wait
232273
233274 workflow " trial_reminder" do
234- step :welcome do
235- Mailer .send_welcome (input ().user_id)
275+ step :welcome , fn data ->
276+ Mailer .send_welcome (data[" user_id" ])
277+ {:ok , %{user_id: data[" user_id" ], trial_started_at: data[" trial_started_at" ]}}
236278 end
237279
238- step :wait_3_days do
280+ step :wait_3_days , fn data ->
239281 sleep (days (3 ))
282+ {:ok , data}
240283 end
241284
242- step :check_in do
243- Mailer .send_tips (input ().user_id)
285+ step :check_in , fn data ->
286+ Mailer .send_tips (data.user_id)
287+ {:ok , data}
244288 end
245289
246- step :wait_until_trial_ends do
247- trial_end = DateTime .add (input () .trial_started_at, 14 , :day )
290+ step :wait_until_trial_ends , fn data ->
291+ trial_end = DateTime .add (data .trial_started_at, 14 , :day )
248292 schedule_at (trial_end)
293+ {:ok , data}
249294 end
250295
251- step :convert_or_remind do
252- if Subscriptions .active? (input () .user_id) do
253- put_context ( :converted , true )
296+ step :convert_or_remind , fn data ->
297+ if Subscriptions .active? (data .user_id) do
298+ { :ok , assign (data, :converted , true )}
254299 else
255- Mailer .send_upgrade_reminder (input ().user_id)
300+ Mailer .send_upgrade_reminder (data.user_id)
301+ {:ok , assign (data, :converted , false )}
256302 end
257303 end
258304 end
@@ -266,26 +312,35 @@ Wait for external webhook events.
266312``` elixir
267313defmodule MyApp .PaymentFlow do
268314 use Durable
269- use Durable .Context
315+ use Durable .Helpers
270316 use Durable .Wait
271317
272318 workflow " payment_flow" do
273- step :create_invoice do
274- invoice = Invoices .create (input (). order_id, input (). amount)
275- put_context ( :invoice_id , invoice.id)
319+ step :create_invoice , fn data ->
320+ invoice = Invoices .create (data[ " order_id" ], data[ " amount" ] )
321+ { :ok , %{ order_id: data[ " order_id " ], invoice_id: invoice.id}}
276322 end
277323
278- step :await_payment do
324+ step :await_payment , fn data ->
279325 {event, _payload } = wait_for_any ([" payment.success" , " payment.failed" ],
280326 timeout: days (7 ),
281327 timeout_value: {" payment.expired" , nil }
282328 )
283- put_context ( :result , event)
329+ { :ok , assign (data, :result , event)}
284330 end
285331
286- branch on: get_context (:result ) do
287- " payment.success" -> step :fulfill , do: Orders .fulfill (input ().order_id)
288- _ -> step :cancel , do: Orders .cancel (input ().order_id)
332+ branch on: fn data -> data.result end do
333+ " payment.success" ->
334+ step :fulfill , fn data ->
335+ Orders .fulfill (data.order_id)
336+ {:ok , assign (data, :status , :fulfilled )}
337+ end
338+
339+ _ ->
340+ step :cancel , fn data ->
341+ Orders .cancel (data.order_id)
342+ {:ok , assign (data, :status , :cancelled )}
343+ end
289344 end
290345 end
291346end
@@ -296,14 +351,17 @@ Durable.send_event(workflow_id, "payment.success", %{transaction_id: "txn_123"})
296351
297352## Reference
298353
299- ### Context
354+ ### Helper Functions
300355
301356``` elixir
302- input () # Initial workflow input
303- get_context (:key ) # Get value
304- get_context (:key , default) # With default
305- put_context (:key , value) # Set value
306- append_context (:list , item) # Append to list
357+ use Durable .Helpers
358+
359+ assign (data, :key , value) # Set a value
360+ assign (data, %{a: 1 , b: 2 }) # Merge multiple values
361+ update (data, :key , default, fn old -> new end )
362+ append (data, :list , item) # Append to list
363+ increment (data, :count ) # Increment by 1
364+ increment (data, :count , 5 ) # Increment by 5
307365```
308366
309367### Time Helpers
0 commit comments