Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dialect/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ func DebugWithContext(d Driver, logger func(context.Context, ...any)) Driver {
return drv
}

// Log returns the log function used by this DebugDriver.
func (d *DebugDriver) Log() func(context.Context, ...any) {
return d.log
}

// Exec logs its params and calls the underlying driver Exec method.
func (d *DebugDriver) Exec(ctx context.Context, query string, args, v any) error {
d.log(ctx, fmt.Sprintf("driver.Exec: query=%v args=%v", query, args))
Expand Down
64 changes: 56 additions & 8 deletions dialect/sql/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,75 @@ import (
"strings"

"entgo.io/ent/dialect"
"github.com/ydb-platform/ydb-go-sdk/v3"
)

// Driver is a dialect.Driver implementation for SQL based databases.
type Driver struct {
Conn
dialect string
dialect string
retryExecutor RetryExecutor
}

// NewDriver creates a new Driver with the given Conn and dialect.
func NewDriver(dialect string, c Conn) *Driver {
return &Driver{dialect: dialect, Conn: c}
func NewDriver(
dialect string,
c Conn,
retryExecutor RetryExecutor,
) *Driver {
return &Driver{
Conn: c,
dialect: dialect,
retryExecutor: retryExecutor,
}
}

// Open wraps the database/sql.Open method and returns a dialect.Driver that implements the an ent/dialect.Driver interface.
func Open(dialect, source string) (*Driver, error) {
db, err := sql.Open(dialect, source)
func Open(sqlDialect, dsn string) (*Driver, error) {
var (
db *sql.DB
err error
)

if sqlDialect == dialect.YDB {
nativeDriver, err := ydb.Open(context.Background(), dsn)
if err != nil {
return nil, err
}

conn, err := ydb.Connector(
nativeDriver,
ydb.WithAutoDeclare(),
ydb.WithTablePathPrefix(nativeDriver.Name()),
ydb.WithQueryService(true),
)
if err != nil {
return nil, err
}

db = sql.OpenDB(conn)
} else {
db, err = sql.Open(sqlDialect, dsn)
}

if err != nil {
return nil, err
}
return NewDriver(dialect, Conn{db, dialect}), nil

return NewDriver(
sqlDialect,
Conn{db, sqlDialect},
NewRetryExecutor(sqlDialect, db),
), nil
}

// OpenDB wraps the given database/sql.DB method with a Driver.
func OpenDB(dialect string, db *sql.DB) *Driver {
return NewDriver(dialect, Conn{db, dialect})
func OpenDB(sqlDialect string, db *sql.DB) *Driver {
return NewDriver(
sqlDialect,
Conn{db, sqlDialect},
NewRetryExecutor(sqlDialect, db),
)
}

// DB returns the underlying *sql.DB instance.
Expand Down Expand Up @@ -74,6 +118,10 @@ func (d *Driver) BeginTx(ctx context.Context, opts *TxOptions) (dialect.Tx, erro
}, nil
}

func (d *Driver) RetryExecutor() RetryExecutor {
return d.retryExecutor
}

// Close closes the underlying connection.
func (d *Driver) Close() error { return d.DB().Close() }

Expand Down
50 changes: 44 additions & 6 deletions dialect/sql/sqlgraph/retry.go → dialect/sql/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// This source code is licensed under the Apache 2.0 license found
// in the LICENSE file in the root directory of this source tree.

package sqlgraph
package sql

import (
"context"
"database/sql"

"entgo.io/ent/dialect"
)
Expand All @@ -31,6 +32,18 @@ type RetryExecutor interface {
) error
}

// NewRetryExecutor creates a new RetryExecutor with the given database connection
func NewRetryExecutor(
sqlDialect string,
db *sql.DB,
) RetryExecutor {
if sqlDialect == dialect.YDB && db != nil {
return &YDBRetryExecutor{db: db}
} else {
return nil
}
}

// RetryExecutorGetter is an optional interface that drivers can implement to provide
// a RetryExecutor for automatic retry handling.
// If a driver implements this interface,
Expand All @@ -41,12 +54,37 @@ type RetryExecutorGetter interface {
RetryExecutor() RetryExecutor
}

// getRetryExecutor returns the RetryExecutor for the given driver if available.
func getRetryExecutor(drv dialect.Driver) RetryExecutor {
if reg, ok := drv.(RetryExecutorGetter); ok {
return reg.RetryExecutor()
// GetRetryExecutor returns the RetryExecutor for the given driver if available.
// If the driver is wrapped with a DebugDriver, the returned executor will preserve
// debug logging by wrapping the driver passed to callback functions.
func GetRetryExecutor(drv dialect.Driver) RetryExecutor {
drv, logFn := unwrapDebugDriver(drv)

getter, ok := drv.(RetryExecutorGetter)
if !ok {
return nil
}

executor := getter.RetryExecutor()
if executor == nil {
return nil
}

if logFn != nil {
return &debugRetryExecutor{
RetryExecutor: executor,
log: logFn,
}
}
return executor
}

// unwrapDebugDriver extracts the underlying driver and log function from a DebugDriver.
func unwrapDebugDriver(drv dialect.Driver) (dialect.Driver, func(context.Context, ...any)) {
if debugDriver, ok := drv.(*dialect.DebugDriver); ok {
return debugDriver.Driver, debugDriver.Log()
}
return nil
return drv, nil
}

// RetryConfig holds retry configuration for sqlgraph operations.
Expand Down
47 changes: 47 additions & 0 deletions dialect/sql/retry_debug.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2019-present Facebook Inc. All rights reserved.
// This source code is licensed under the Apache 2.0 license found
// in the LICENSE file in the root directory of this source tree.

package sql

import (
"context"

"entgo.io/ent/dialect"
)

// debugRetryExecutor wraps a RetryExecutor to preserve debug logging.
type debugRetryExecutor struct {
RetryExecutor
log func(context.Context, ...any)
}

// Do executes the operation with debug logging preserved.
func (d *debugRetryExecutor) Do(
ctx context.Context,
fn func(ctx context.Context, drv dialect.Driver) error,
opts ...any,
) error {
return d.RetryExecutor.Do(
ctx,
func(ctx context.Context, drv dialect.Driver) error {
return fn(ctx, dialect.DebugWithContext(drv, d.log))
},
opts...,
)
}

// DoTx executes the operation within a transaction with debug logging preserved.
func (d *debugRetryExecutor) DoTx(
ctx context.Context,
fn func(ctx context.Context, drv dialect.Driver) error,
opts ...any,
) error {
return d.RetryExecutor.DoTx(
ctx,
func(ctx context.Context, drv dialect.Driver) error {
return fn(ctx, dialect.DebugWithContext(drv, d.log))
},
opts...,
)
}
50 changes: 22 additions & 28 deletions dialect/ydb/retry.go → dialect/sql/retry_ydb.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,25 @@
// This source code is licensed under the Apache 2.0 license found
// in the LICENSE file in the root directory of this source tree.

package ydb
package sql

import (
"context"
"database/sql"

"entgo.io/ent/dialect"
entSql "entgo.io/ent/dialect/sql"
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
)

// RetryExecutor implements sqlgraph.RetryExecutor for YDB
type RetryExecutor struct {
// YDBRetryExecutor implements sqlgraph.YDBRetryExecutor for YDB
type YDBRetryExecutor struct {
db *sql.DB
}

// NewRetryExecutor creates a new RetryExecutor with the given database connection
func NewRetryExecutor(db *sql.DB) *RetryExecutor {
return &RetryExecutor{db: db}
}

// Do executes a read-only operation with retry support.
// It uses ydb-go-sdk's retry.Do which handles YDB-specific retryable errors.
// Options should be created using retry.WithIdempotent(), retry.WithLabel(), etc.
func (r *RetryExecutor) Do(
func (r *YDBRetryExecutor) Do(
ctx context.Context,
fn func(ctx context.Context, drv dialect.Driver) error,
opts ...any,
Expand All @@ -35,7 +29,7 @@ func (r *RetryExecutor) Do(
ctx,
r.db,
func(ctx context.Context, conn *sql.Conn) error {
return fn(ctx, NewRetryDriver(conn))
return fn(ctx, newConnRetryDriver(conn))
},
retry.WithDoRetryOptions(toRetryOptions(opts)...),
)
Expand All @@ -44,7 +38,7 @@ func (r *RetryExecutor) Do(
// DoTx executes the operation within a transaction with retry support.
// It uses ydb-go-sdk's retry.DoTx which handles YDB-specific retryable errors.
// Options should be created using retry.WithIdempotent(), retry.WithLabel(), etc.
func (r *RetryExecutor) DoTx(
func (r *YDBRetryExecutor) DoTx(
ctx context.Context,
fn func(ctx context.Context, drv dialect.Driver) error,
opts ...any,
Expand All @@ -53,7 +47,7 @@ func (r *RetryExecutor) DoTx(
ctx,
r.db,
func(ctx context.Context, tx *sql.Tx) error {
return fn(ctx, NewTxRetryDriver(tx))
return fn(ctx, newTxRetryDriver(tx))
},
retry.WithDoTxRetryOptions(toRetryOptions(opts)...),
)
Expand All @@ -70,41 +64,41 @@ func toRetryOptions(opts []any) []retry.Option {
return retryOpts
}

// RetryDriver is designed for use only in sqlgraph,
// ydbRetryDriver is designed for use only in sqlgraph,
// specifically - in retry.DoTx callbacks
type RetryDriver struct {
entSql.Conn
type ydbRetryDriver struct {
Conn
}

var _ dialect.Driver = (*RetryDriver)(nil)
var _ dialect.Driver = (*ydbRetryDriver)(nil)

// NewTxRetryDriver creates a new RetryDriver from a transaction.
func NewTxRetryDriver(tx *sql.Tx) *RetryDriver {
return &RetryDriver{
Conn: entSql.Conn{ExecQuerier: tx},
// newConnRetryDriver creates a new RetryDriver from a database connection.
func newConnRetryDriver(conn *sql.Conn) *ydbRetryDriver {
return &ydbRetryDriver{
Conn: Conn{ExecQuerier: conn},
}
}

// NewRetryDriver creates a new RetryDriver from a database connection.
func NewRetryDriver(conn *sql.Conn) *RetryDriver {
return &RetryDriver{
Conn: entSql.Conn{ExecQuerier: conn},
// newTxRetryDriver creates a new RetryDriver from a transaction.
func newTxRetryDriver(tx *sql.Tx) *ydbRetryDriver {
return &ydbRetryDriver{
Conn: Conn{ExecQuerier: tx},
}
}

// sqlgraph creates nested transactions in several methods.
// But YDB doesnt support nested transactions.
// Therefore, this methods returns no-op tx
func (d *RetryDriver) Tx(ctx context.Context) (dialect.Tx, error) {
func (d *ydbRetryDriver) Tx(ctx context.Context) (dialect.Tx, error) {
return dialect.NopTx(d), nil
}

// Close is a no-op for RetryDriver since retry.DoTx manages the transaction lifecycle.
func (d *RetryDriver) Close() error {
func (d *ydbRetryDriver) Close() error {
return nil
}

// Dialect returns the YDB dialect name.
func (d *RetryDriver) Dialect() string {
func (d *ydbRetryDriver) Dialect() string {
return dialect.YDB
}
Loading
Loading