Skip to content

Commit 8ae2f14

Browse files
committed
create pool methods to reserve and release connections for transaction use
1 parent ca17df1 commit 8ae2f14

File tree

4 files changed

+233
-3
lines changed

4 files changed

+233
-3
lines changed

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,11 +470,47 @@ Additional pool-specific methods:
470470

471471
* `pool:pool_size()` — Returns the current number of connections in the pool
472472
* `pool:active_connections()` — Returns the number of connections currently executing queries
473+
* `pool:reserve()` — Returns a raw `Postgres` connection for exclusive use (see below)
474+
* `pool:release(pg)` — Returns a reserved connection back to the pool (see below)
473475

474476
> **Note:** `wait_for_notification` is not supported with `PostgresPool` because
475477
> notifications are tied to the specific socket connection that issued the
476478
> `LISTEN` command.
477479
480+
### Transaction Support with reserve/release
481+
482+
When you need to run a transaction across multiple queries, use `reserve()` to
483+
get exclusive access to a connection. This prevents other coroutines from using
484+
the same connection between your transaction queries.
485+
486+
```lua
487+
local pg = pool:reserve() -- Returns raw Postgres connection, marked as reserved
488+
pg:query("BEGIN")
489+
pg:query("INSERT INTO users (name) VALUES ('test')")
490+
pg:query("COMMIT")
491+
pool:release(pg) -- Returns connection to pool
492+
```
493+
494+
#### `pool:reserve()`
495+
496+
Returns a raw `Postgres` connection object from the pool, marked as reserved.
497+
While reserved, the connection will not be used by `pool:query()` or other pool
498+
methods.
499+
500+
* Returns: `Postgres` connection object on success
501+
* Returns: `nil, "not connected"` if pool has no connections
502+
* Returns: `nil, "pool exhausted, max_pool_size reached"` if at capacity
503+
504+
#### `pool:release(pg)`
505+
506+
Returns a reserved connection back to the pool for reuse.
507+
508+
* Validates the connection belongs to this pool
509+
* If the connection is still in a transaction (`transaction_status` is `"T"` or `"E"`), automatically issues `ROLLBACK` to reset it to idle state
510+
* Returns: `true` on success
511+
* Returns: `nil, "connection not from this pool"` if connection belongs to a different pool
512+
* Returns: `nil, "connection not reserved"` if the connection was not reserved
513+
478514
## Extended and simple query protocols
479515

480516
pgmoon will issue your query to the database server using either the simple or

pgmoon/pool.lua

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ do
1414
local _list_0 = self.pool
1515
for _index_0 = 1, #_list_0 do
1616
local pg = _list_0[_index_0]
17-
if not (pg.busy) then
17+
if not (pg.busy or pg.reserved) then
1818
return pg
1919
end
2020
end
@@ -125,6 +125,55 @@ do
125125
encode_bytea = Postgres.encode_bytea,
126126
decode_bytea = Postgres.decode_bytea,
127127
setup_hstore = Postgres.setup_hstore,
128+
reserve = function(self)
129+
if #self.pool == 0 then
130+
return nil, "not connected"
131+
end
132+
local _list_0 = self.pool
133+
for _index_0 = 1, #_list_0 do
134+
local pg = _list_0[_index_0]
135+
if not (pg.busy or pg.reserved) then
136+
pg.reserved = true
137+
return pg
138+
end
139+
end
140+
if self.config.max_pool_size and #self.pool >= self.config.max_pool_size then
141+
return nil, "pool exhausted, max_pool_size reached"
142+
end
143+
local pg = self:_create_instance()
144+
local ok, err = pg:connect()
145+
if not (ok) then
146+
return nil, err
147+
end
148+
table.insert(self.pool, pg)
149+
pg.reserved = true
150+
return pg
151+
end,
152+
release = function(self, pg)
153+
if not (pg.parent_pool == self) then
154+
return nil, "connection not from this pool"
155+
end
156+
if not (pg.reserved) then
157+
return nil, "connection not reserved"
158+
end
159+
if pg.transaction_status == "T" or pg.transaction_status == "E" then
160+
local ok, err = pcall(function()
161+
return pg:query("ROLLBACK")
162+
end)
163+
if not (ok and pg.transaction_status == "I") then
164+
pg:disconnect()
165+
for i, conn in ipairs(self.pool) do
166+
if conn == pg then
167+
table.remove(self.pool, i)
168+
break
169+
end
170+
end
171+
return nil, "rollback failed, connection removed from pool"
172+
end
173+
end
174+
pg.reserved = false
175+
return true
176+
end,
128177
pool_size = function(self)
129178
return #self.pool
130179
end,

pgmoon/pool.moon

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ class PostgresPool
1414
-- Must call connect() first
1515
return nil, "not connected" if #@pool == 0
1616

17-
-- Find first non-busy instance
17+
-- Find first non-busy and non-reserved instance
1818
for pg in *@pool
19-
unless pg.busy
19+
unless pg.busy or pg.reserved
2020
return pg
2121

2222
-- All busy, check max_pool_size
@@ -108,6 +108,48 @@ class PostgresPool
108108
decode_bytea: Postgres.decode_bytea
109109
setup_hstore: Postgres.setup_hstore
110110

111+
-- Reserve/release for transaction support
112+
reserve: =>
113+
return nil, "not connected" if #@pool == 0
114+
115+
-- Find first available (non-busy and non-reserved) instance
116+
for pg in *@pool
117+
unless pg.busy or pg.reserved
118+
pg.reserved = true
119+
return pg
120+
121+
-- All busy/reserved, check max_pool_size
122+
if @config.max_pool_size and #@pool >= @config.max_pool_size
123+
return nil, "pool exhausted, max_pool_size reached"
124+
125+
-- Create and connect new instance
126+
pg = @_create_instance!
127+
ok, err = pg\connect!
128+
return nil, err unless ok
129+
130+
table.insert @pool, pg
131+
pg.reserved = true
132+
pg
133+
134+
release: (pg) =>
135+
return nil, "connection not from this pool" unless pg.parent_pool == @
136+
return nil, "connection not reserved" unless pg.reserved
137+
138+
-- Ensure connection is idle before returning to pool
139+
if pg.transaction_status == "T" or pg.transaction_status == "E"
140+
ok, err = pcall -> pg\query "ROLLBACK"
141+
-- If rollback failed, connection is likely broken - remove from pool
142+
unless ok and pg.transaction_status == "I"
143+
pg\disconnect!
144+
for i, conn in ipairs @pool
145+
if conn == pg
146+
table.remove @pool, i
147+
break
148+
return nil, "rollback failed, connection removed from pool"
149+
150+
pg.reserved = false
151+
true
152+
111153
-- Pool info helpers
112154
pool_size: => #@pool
113155
active_connections: =>

spec/pgmoon_pool_spec.moon

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,106 @@ describe "pgmoon.pool", ->
194194
it "has NULL constant", ->
195195
pool = PostgresPool {}
196196
assert.truthy pool.NULL
197+
198+
describe "reserve and release", ->
199+
before_each ->
200+
pool = PostgresPool { database: DB, user: USER, password: PASSWORD, host: HOST, port: PORT }
201+
assert pool\connect!
202+
203+
after_each ->
204+
pool\disconnect!
205+
206+
it "reserves and releases a connection for transaction", ->
207+
pg = assert pool\reserve!
208+
assert.truthy pg.reserved
209+
assert.same 1, pool\pool_size!
210+
211+
assert pg\query "BEGIN"
212+
assert.same "T", pg.transaction_status
213+
214+
assert pg\query "SELECT 1"
215+
assert pg\query "COMMIT"
216+
assert.same "I", pg.transaction_status
217+
218+
assert pool\release pg
219+
assert.falsy pg.reserved
220+
221+
it "reserved connection is not reused by pool:query", ->
222+
pg = assert pool\reserve!
223+
224+
-- pool:query should create a new connection since first is reserved
225+
assert pool\query "SELECT 1"
226+
assert.same 2, pool\pool_size!
227+
228+
assert pool\release pg
229+
230+
it "auto-rollback on release when in transaction (T state)", ->
231+
pg = assert pool\reserve!
232+
233+
assert pg\query "BEGIN"
234+
assert.same "T", pg.transaction_status
235+
236+
-- Release without committing
237+
assert pool\release pg
238+
assert.falsy pg.reserved
239+
assert.same "I", pg.transaction_status
240+
241+
it "auto-rollback on release when in error state (E state)", ->
242+
pg = assert pool\reserve!
243+
244+
assert pg\query "BEGIN"
245+
-- Cause an error
246+
res, err = pg\query "SELECT * FROM nonexistent_table_xyz"
247+
assert.is_nil res
248+
assert.same "E", pg.transaction_status
249+
250+
-- Release should auto-rollback and recover
251+
assert pool\release pg
252+
assert.falsy pg.reserved
253+
assert.same "I", pg.transaction_status
254+
255+
it "respects max_pool_size when reserving", ->
256+
pool\disconnect!
257+
pool = PostgresPool {
258+
database: DB, user: USER, password: PASSWORD, host: HOST, port: PORT
259+
max_pool_size: 2
260+
}
261+
assert pool\connect!
262+
263+
pg1 = assert pool\reserve!
264+
pg2 = assert pool\reserve!
265+
assert.same 2, pool\pool_size!
266+
267+
res, err = pool\reserve!
268+
assert.is_nil res
269+
assert.same "pool exhausted, max_pool_size reached", err
270+
271+
pool\release pg1
272+
pool\release pg2
273+
274+
it "errors when releasing connection not from this pool", ->
275+
other_pool = PostgresPool { database: DB, user: USER, password: PASSWORD, host: HOST, port: PORT }
276+
assert other_pool\connect!
277+
278+
pg = assert other_pool\reserve!
279+
280+
res, err = pool\release pg
281+
assert.is_nil res
282+
assert.same "connection not from this pool", err
283+
284+
other_pool\release pg
285+
other_pool\disconnect!
286+
287+
it "errors when releasing connection that is not reserved", ->
288+
pg = pool.pool[1]
289+
assert.falsy pg.reserved
290+
291+
res, err = pool\release pg
292+
assert.is_nil res
293+
assert.same "connection not reserved", err
294+
295+
it "reserve returns error when not connected", ->
296+
pool\disconnect!
297+
res, err = pool\reserve!
298+
assert.is_nil res
299+
assert.same "not connected", err

0 commit comments

Comments
 (0)