-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path001-event.sql
More file actions
118 lines (111 loc) · 2.55 KB
/
001-event.sql
File metadata and controls
118 lines (111 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
--001-event.sql
-----------------------------------------------------------------------
-- agent.insert_event
-- Adds one row to agent.event for the supplied event document.
--   _event : jsonb document, stored unchanged in the event column.
--            Its 'event_ts' field is read as text, cast to numeric and
--            passed to agent.to_timestamptz to fill the event_ts column.
-- Returns nothing.
-- NOTE(review): agent.to_timestamptz is defined elsewhere; presumably it
-- turns an epoch-style number into a timestamptz -- confirm units there.
create or replace function agent.insert_event(_event jsonb) returns void
as $func$
    insert into agent.event
    ( event_ts
    , event
    )
    values
    ( agent.to_timestamptz(cast(_event ->> 'event_ts' as numeric))
    , _event
    )
    ;
$func$ language sql volatile security invoker
;
-----------------------------------------------------------------------
-- agent.claim_event
-- Picks at most one claimable row from agent.event, marks it as claimed
-- and returns it.
--   _max_attempts  : only rows with attempts below this value qualify.
--   _invisible_for : how far past the current clock time vt is pushed for
--                    the claimed row.
-- Returns the updated agent.event row, or no rows when nothing qualifies.
create or replace function agent.claim_event
( _max_attempts int4 default 3
, _invisible_for interval default interval '10m'
) returns setof agent.event
as $func$
    with candidate as
    (
        -- one random row that still has attempts left and whose vt has
        -- passed; the row is locked, and rows already locked by other
        -- transactions are skipped rather than waited on
        select q.id
        from agent.event as q
        where q.attempts < _max_attempts
          and q.vt <= now()
        order by random()
        limit 1
        for update skip locked
    )
    , claimed_event as
    (
        -- count the attempt, hide the row until vt passes again, and
        -- concatenate now() onto the claimed column
        update agent.event as ev
        set attempts = ev.attempts + 1
          , vt       = clock_timestamp() + _invisible_for
          , claimed  = ev.claimed || now()
        from candidate
        where ev.id = candidate.id
        returning ev.*
    )
    select claimed_event.*
    from claimed_event
$func$ language sql volatile security invoker
;
-----------------------------------------------------------------------
-- agent.delete_event
-- Moves one event out of agent.event and into agent.event_hist.
--   _id        : id of the agent.event row to remove.
--   _processed : value written to event_hist.processed for the archived
--                row (defaults to true).
-- Returns nothing. When no row has the given id, nothing is deleted and
-- nothing is inserted.
create or replace function agent.delete_event(_id int8, _processed boolean default true) returns void
as $func$
    with removed as
    (
        -- hand back only the columns that are copied into the history row
        delete from agent.event as ev
        where ev.id = _id
        returning
          ev.id
        , ev.event_ts
        , ev.attempts
        , ev.vt
        , ev.claimed
        , ev.event
    )
    insert into agent.event_hist
    ( id
    , event_ts
    , attempts
    , vt
    , claimed
    , event
    , processed
    )
    select
      removed.id
    , removed.event_ts
    , removed.attempts
    , removed.vt
    , removed.claimed
    , removed.event
    , _processed
    from removed
    ;
$func$ language sql volatile security invoker
;
-----------------------------------------------------------------------
-- agent.delete_expired_events
-- Sweeps expired rows out of agent.event and copies them into
-- agent.event_hist.
--   _max_attempts : a row whose attempts have reached this value is
--                   expired once its vt has passed.
--   _max_vt_age   : a row whose vt lies at least this far in the past is
--                   expired regardless of its attempts.
-- Returns nothing.
-- The processed column of agent.event_hist is not written here, so the
-- column default applies.
-- NOTE(review): confirm that default marks these rows as unprocessed.
create or replace function agent.delete_expired_events
( _max_attempts int default 3
, _max_vt_age interval default interval '1h'
) returns void
as $func$
    with d as
    (
        delete from agent.event e
        where
            -- agent.claim_event bumps attempts and pushes vt into the future
            -- in the same update, so a row on its final attempt already has
            -- attempts >= _max_attempts while a worker may still be handling
            -- it. Waiting for vt to pass keeps the sweep from archiving that
            -- row before the worker can report its outcome via
            -- agent.delete_event.
            ( e.attempts >= _max_attempts
              and e.vt <= now()
            )
            or e.vt <= (now() - _max_vt_age)
        returning
          e.id
        , e.event_ts
        , e.attempts
        , e.vt
        , e.claimed
        , e.event
    )
    insert into agent.event_hist
    ( id
    , event_ts
    , attempts
    , vt
    , claimed
    , event
    )
    select
      d.id
    , d.event_ts
    , d.attempts
    , d.vt
    , d.claimed
    , d.event
    from d
    ;
$func$ language sql volatile security invoker
;