Skip to content

Commit 5b47103

Browse files
refactored retries and open template
1 parent 7bd3cb7 commit 5b47103

File tree

152 files changed

+568
-1170
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

152 files changed

+568
-1170
lines changed

dialect/dialect.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ func DebugWithContext(d Driver, logger func(context.Context, ...any)) Driver {
8888
return drv
8989
}
9090

91+
// Log returns the log function used by this DebugDriver.
92+
func (d *DebugDriver) Log() func(context.Context, ...any) {
93+
return d.log
94+
}
95+
9196
// Exec logs its params and calls the underlying driver Exec method.
9297
func (d *DebugDriver) Exec(ctx context.Context, query string, args, v any) error {
9398
d.log(ctx, fmt.Sprintf("driver.Exec: query=%v args=%v", query, args))

dialect/sql/driver.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,31 +14,75 @@ import (
1414
"strings"
1515

1616
"entgo.io/ent/dialect"
17+
"github.com/ydb-platform/ydb-go-sdk/v3"
1718
)
1819

1920
// Driver is a dialect.Driver implementation for SQL based databases.
2021
type Driver struct {
2122
Conn
22-
dialect string
23+
dialect string
24+
retryExecutor RetryExecutor
2325
}
2426

2527
// NewDriver creates a new Driver with the given Conn and dialect.
26-
func NewDriver(dialect string, c Conn) *Driver {
27-
return &Driver{dialect: dialect, Conn: c}
28+
func NewDriver(
29+
dialect string,
30+
c Conn,
31+
retryExecutor RetryExecutor,
32+
) *Driver {
33+
return &Driver{
34+
Conn: c,
35+
dialect: dialect,
36+
retryExecutor: retryExecutor,
37+
}
2838
}
2939

3040
// Open wraps the database/sql.Open method and returns a dialect.Driver that implements the an ent/dialect.Driver interface.
31-
func Open(dialect, source string) (*Driver, error) {
32-
db, err := sql.Open(dialect, source)
41+
func Open(sqlDialect, dsn string) (*Driver, error) {
42+
var (
43+
db *sql.DB
44+
err error
45+
)
46+
47+
if sqlDialect == dialect.YDB {
48+
nativeDriver, err := ydb.Open(context.Background(), dsn)
49+
if err != nil {
50+
return nil, err
51+
}
52+
53+
conn, err := ydb.Connector(
54+
nativeDriver,
55+
ydb.WithAutoDeclare(),
56+
ydb.WithTablePathPrefix(nativeDriver.Name()),
57+
ydb.WithQueryService(true),
58+
)
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
db = sql.OpenDB(conn)
64+
} else {
65+
db, err = sql.Open(sqlDialect, dsn)
66+
}
67+
3368
if err != nil {
3469
return nil, err
3570
}
36-
return NewDriver(dialect, Conn{db, dialect}), nil
71+
72+
return NewDriver(
73+
sqlDialect,
74+
Conn{db, sqlDialect},
75+
NewRetryExecutor(sqlDialect, db),
76+
), nil
3777
}
3878

3979
// OpenDB wraps the given database/sql.DB method with a Driver.
40-
func OpenDB(dialect string, db *sql.DB) *Driver {
41-
return NewDriver(dialect, Conn{db, dialect})
80+
func OpenDB(sqlDialect string, db *sql.DB) *Driver {
81+
return NewDriver(
82+
sqlDialect,
83+
Conn{db, sqlDialect},
84+
NewRetryExecutor(sqlDialect, db),
85+
)
4286
}
4387

4488
// DB returns the underlying *sql.DB instance.
@@ -74,6 +118,10 @@ func (d *Driver) BeginTx(ctx context.Context, opts *TxOptions) (dialect.Tx, erro
74118
}, nil
75119
}
76120

121+
func (d *Driver) RetryExecutor() RetryExecutor {
122+
return d.retryExecutor
123+
}
124+
77125
// Close closes the underlying connection.
78126
func (d *Driver) Close() error { return d.DB().Close() }
79127

dialect/sql/retry.go

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
// Copyright 2019-present Facebook Inc. All rights reserved.
2+
// This source code is licensed under the Apache 2.0 license found
3+
// in the LICENSE file in the root directory of this source tree.
4+
5+
package sql
6+
7+
import (
8+
"context"
9+
"database/sql"
10+
"errors"
11+
12+
"entgo.io/ent/dialect"
13+
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
14+
)
15+
16+
// RetryExecutor is an interface for database operations with automatic retries.
17+
type RetryExecutor interface {
18+
// Do executes the given function within a retry loop without a transaction.
19+
// The function receives a dialect.Driver that wraps the connection.
20+
// opts are driver-specific retry options (e.g., ydb retry.Option).
21+
Do(
22+
ctx context.Context,
23+
fn func(ctx context.Context, drv dialect.Driver) error,
24+
opts ...any,
25+
) error
26+
27+
// DoTx executes the given function within a retry loop with a transaction.
28+
// The function receives a dialect.Driver that wraps the database/sql.Tx transaction.
29+
// opts are driver-specific retry options (e.g., ydb retry.Option).
30+
DoTx(
31+
ctx context.Context,
32+
fn func(ctx context.Context, drv dialect.Driver) error,
33+
opts ...any,
34+
) error
35+
}
36+
37+
// NewRetryExecutor creates a new RetryExecutor with the given database connection
38+
func NewRetryExecutor(
39+
sqlDialect string,
40+
db *sql.DB,
41+
) RetryExecutor {
42+
if sqlDialect == dialect.YDB && db != nil {
43+
return &YDBRetryExecutor{db: db}
44+
} else {
45+
return nil
46+
}
47+
}
48+
49+
// RetryExecutorGetter is an optional interface that drivers can implement to provide
50+
// a RetryExecutor for automatic retry handling.
51+
// If a driver implements this interface,
52+
// sqlgraph operations will use the RetryExecutor for database operations.
53+
type RetryExecutorGetter interface {
54+
// RetryExecutor returns the RetryExecutor for this driver.
55+
// If nil is returned, no retry handling will be applied.
56+
RetryExecutor() RetryExecutor
57+
}
58+
59+
// GetRetryExecutor returns the RetryExecutor for the given driver if available.
60+
// If the driver is wrapped with a DebugDriver, the returned executor will preserve
61+
// debug logging by wrapping the driver passed to callback functions.
62+
func GetRetryExecutor(drv dialect.Driver) RetryExecutor {
63+
var logFn func(context.Context, ...any)
64+
if dd, ok := drv.(*dialect.DebugDriver); ok {
65+
logFn = dd.Log()
66+
drv = dd.Driver
67+
}
68+
if getter, ok := drv.(RetryExecutorGetter); ok {
69+
executor := getter.RetryExecutor()
70+
if executor == nil {
71+
return nil
72+
}
73+
if logFn != nil {
74+
return &debugRetryExecutor{
75+
RetryExecutor: executor,
76+
log: logFn,
77+
}
78+
}
79+
return executor
80+
}
81+
return nil
82+
}
83+
84+
// debugRetryExecutor wraps a RetryExecutor to preserve debug logging.
85+
type debugRetryExecutor struct {
86+
RetryExecutor
87+
log func(context.Context, ...any)
88+
}
89+
90+
// Do executes the operation with debug logging preserved.
91+
func (d *debugRetryExecutor) Do(
92+
ctx context.Context,
93+
fn func(ctx context.Context, drv dialect.Driver) error,
94+
opts ...any,
95+
) error {
96+
return d.RetryExecutor.Do(
97+
ctx,
98+
func(ctx context.Context, drv dialect.Driver) error {
99+
return fn(ctx, dialect.DebugWithContext(drv, d.log))
100+
},
101+
opts...,
102+
)
103+
}
104+
105+
// DoTx executes the operation within a transaction with debug logging preserved.
106+
func (d *debugRetryExecutor) DoTx(
107+
ctx context.Context,
108+
fn func(ctx context.Context, drv dialect.Driver) error,
109+
opts ...any,
110+
) error {
111+
return d.RetryExecutor.DoTx(
112+
ctx,
113+
func(ctx context.Context, drv dialect.Driver) error {
114+
return fn(ctx, dialect.DebugWithContext(drv, d.log))
115+
},
116+
opts...,
117+
)
118+
}
119+
120+
// RetryConfig holds retry configuration for sqlgraph operations.
121+
// This is used to pass retry options to the RetryExecutor.
122+
type RetryConfig struct {
123+
// Options are driver-specific retry options.
124+
Options []any
125+
}
126+
127+
// YDBRetryExecutor implements sqlgraph.YDBRetryExecutor for YDB
128+
type YDBRetryExecutor struct {
129+
db *sql.DB
130+
}
131+
132+
// Do executes a read-only operation with retry support.
133+
// It uses ydb-go-sdk's retry.Do which handles YDB-specific retryable errors.
134+
// Options should be created using retry.WithIdempotent(), retry.WithLabel(), etc.
135+
func (r *YDBRetryExecutor) Do(
136+
ctx context.Context,
137+
fn func(ctx context.Context, drv dialect.Driver) error,
138+
opts ...any,
139+
) error {
140+
err := retry.Do(
141+
ctx,
142+
r.db,
143+
func(ctx context.Context, conn *sql.Conn) error {
144+
return fn(ctx, newConnRetryDriver(conn))
145+
},
146+
retry.WithDoRetryOptions(toRetryOptions(opts)...),
147+
)
148+
return unwrapYDBError(err)
149+
}
150+
151+
// DoTx executes the operation within a transaction with retry support.
152+
// It uses ydb-go-sdk's retry.DoTx which handles YDB-specific retryable errors.
153+
// Options should be created using retry.WithIdempotent(), retry.WithLabel(), etc.
154+
func (r *YDBRetryExecutor) DoTx(
155+
ctx context.Context,
156+
fn func(ctx context.Context, drv dialect.Driver) error,
157+
opts ...any,
158+
) error {
159+
err := retry.DoTx(
160+
ctx,
161+
r.db,
162+
func(ctx context.Context, tx *sql.Tx) error {
163+
return fn(ctx, newTxRetryDriver(tx))
164+
},
165+
retry.WithDoTxRetryOptions(toRetryOptions(opts)...),
166+
)
167+
return unwrapYDBError(err)
168+
}
169+
170+
// unwrapYDBError extracts the original error from YDB's error wrapping.
171+
// YDB SDK wraps errors with stack traces and retry context, which changes
172+
// the error message.
173+
func unwrapYDBError(err error) error {
174+
if err == nil {
175+
return nil
176+
}
177+
original := err
178+
for {
179+
unwrapped := errors.Unwrap(original)
180+
if unwrapped == nil {
181+
break
182+
}
183+
original = unwrapped
184+
}
185+
return original
186+
}
187+
188+
// toRetryOptions converts a slice of any options to retry.Option slice
189+
func toRetryOptions(opts []any) []retry.Option {
190+
retryOpts := make([]retry.Option, 0, len(opts))
191+
for _, opt := range opts {
192+
if ro, ok := opt.(retry.Option); ok {
193+
retryOpts = append(retryOpts, ro)
194+
}
195+
}
196+
return retryOpts
197+
}
198+
199+
// retryDriver is designed for use only in sqlgraph,
200+
// specifically - in retry.DoTx callbacks
201+
type retryDriver struct {
202+
Conn
203+
}
204+
205+
var _ dialect.Driver = (*retryDriver)(nil)
206+
207+
// newConnRetryDriver creates a new RetryDriver from a database connection.
208+
func newConnRetryDriver(conn *sql.Conn) *retryDriver {
209+
return &retryDriver{
210+
Conn: Conn{ExecQuerier: conn},
211+
}
212+
}
213+
214+
// newTxRetryDriver creates a new RetryDriver from a transaction.
215+
func newTxRetryDriver(tx *sql.Tx) *retryDriver {
216+
return &retryDriver{
217+
Conn: Conn{ExecQuerier: tx},
218+
}
219+
}
220+
221+
// sqlgraph creates nested transactions in several methods.
222+
// But YDB doesnt support nested transactions.
223+
// Therefore, this methods returns no-op tx
224+
func (d *retryDriver) Tx(ctx context.Context) (dialect.Tx, error) {
225+
return dialect.NopTx(d), nil
226+
}
227+
228+
// Close is a no-op for RetryDriver since retry.DoTx manages the transaction lifecycle.
229+
func (d *retryDriver) Close() error {
230+
return nil
231+
}
232+
233+
// Dialect returns the YDB dialect name.
234+
func (d *retryDriver) Dialect() string {
235+
return dialect.YDB
236+
}

0 commit comments

Comments
 (0)