Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 110 additions & 10 deletions dialect/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,28 @@ type UpdateBuilder struct {
order []any
limit *int
prefix Queries
onSelect *Selector // YDB-specific: UPDATE ON subquery

// YDB-specific:
onSelect *Selector // UPDATE ON subquery
isBatch bool // use BATCH UPDATE instead of UPDATE
}

// Update creates a builder for the `UPDATE` statement.
//
// Update("users").Set("name", "foo").Set("age", 10)
func Update(table string) *UpdateBuilder { return &UpdateBuilder{table: table} }

// BatchUpdate creates a builder for the `BATCH UPDATE` statement (YDB-specific).
// BATCH UPDATE processes large tables in batches, minimizing lock invalidation risk.
//
// Note: BATCH UPDATE is only supported in YDB dialect.
// Note: YDB uses path notation for tables, e.g. "/database/path/to/table"
//
// BatchUpdate("/local/my_table").Set("status", "active").Where(GT("id", 100))
func BatchUpdate(table string) *UpdateBuilder {
return &UpdateBuilder{table: table, isBatch: true}
}

// Schema sets the database name for the updated table.
func (u *UpdateBuilder) Schema(name string) *UpdateBuilder {
u.schema = name
Expand Down Expand Up @@ -717,12 +731,36 @@ func (u *UpdateBuilder) On(s *Selector) *UpdateBuilder {

// Query returns query representation of an `UPDATE` statement.
func (u *UpdateBuilder) Query() (string, []any) {
query, args, _ := u.QueryErr()
return query, args
}

func (u *UpdateBuilder) QueryErr() (string, []any, error) {
b := u.Builder.clone()
if len(u.prefix) > 0 {
b.join(u.prefix, " ")
b.Pad()
}
b.WriteString("UPDATE ")

// BATCH UPDATE (YDB-specific)
if u.isBatch {
if !b.ydb() {
b.AddError(fmt.Errorf("BATCH UPDATE: unsupported dialect: %q", b.dialect))
return "", nil, b.Err()
}
if len(u.returning) > 0 {
b.AddError(fmt.Errorf("BATCH UPDATE: RETURNING clause is not supported"))
return "", nil, b.Err()
}
if u.onSelect != nil {
b.AddError(fmt.Errorf("BATCH UPDATE: UPDATE ON pattern is not supported"))
return "", nil, b.Err()
}
b.WriteString("BATCH UPDATE ")
} else {
b.WriteString("UPDATE ")
}

b.writeSchema(u.schema)
b.Ident(u.table)

Expand All @@ -731,7 +769,7 @@ func (u *UpdateBuilder) Query() (string, []any) {
b.WriteString(" ON ")
b.Join(u.onSelect)
joinReturning(u.returning, &b)
return b.String(), b.args
return b.String(), b.args, nil
}

// Standard UPDATE SET pattern
Expand All @@ -747,7 +785,7 @@ func (u *UpdateBuilder) Query() (string, []any) {
b.WriteString(" LIMIT ")
b.WriteString(strconv.Itoa(*u.limit))
}
return b.String(), b.args
return b.String(), b.args, nil
}

// writeSetter writes the "SET" clause for the UPDATE statement.
Expand Down Expand Up @@ -782,9 +820,10 @@ type DeleteBuilder struct {
schema string
where *Predicate

// For YDB's DELETE FROM ... ON SELECT pattern
onSelect *Selector
// YDB-specific:
onSelect *Selector // DELETE FROM ... ON SELECT pattern
returning []string
isBatch bool // use BATCH DELETE instead of DELETE
}

// Delete creates a builder for the `DELETE` statement.
Expand All @@ -802,6 +841,17 @@ type DeleteBuilder struct {
// )
func Delete(table string) *DeleteBuilder { return &DeleteBuilder{table: table} }

// BatchDelete creates a builder for the `BATCH DELETE FROM` statement (YDB-specific).
// BATCH DELETE processes large tables in batches, minimizing lock invalidation risk.
//
// Note: BATCH DELETE is only supported in YDB dialect.
//
// BatchDelete("/local/my_table")
// .Where(GT("Key1", 1))
func BatchDelete(table string) *DeleteBuilder {
return &DeleteBuilder{table: table, isBatch: true}
}

// Schema sets the database name for the table whose row will be deleted.
func (d *DeleteBuilder) Schema(name string) *DeleteBuilder {
d.schema = name
Expand Down Expand Up @@ -854,8 +904,32 @@ func (d *DeleteBuilder) Returning(columns ...string) *DeleteBuilder {

// Query returns query representation of a `DELETE` statement.
func (d *DeleteBuilder) Query() (string, []any) {
query, args, _ := d.QueryErr()
return query, args
}

func (d *DeleteBuilder) QueryErr() (string, []any, error) {
b := d.Builder.clone()
b.WriteString("DELETE FROM ")

// BATCH DELETE (YDB-specific)
if d.isBatch {
if !b.ydb() {
b.AddError(fmt.Errorf("BATCH DELETE: unsupported dialect: %q", b.dialect))
return "", nil, b.Err()
}
if len(d.returning) > 0 {
b.AddError(fmt.Errorf("BATCH DELETE: RETURNING clause is not supported"))
return "", nil, b.Err()
}
if d.onSelect != nil {
b.AddError(fmt.Errorf("BATCH DELETE: DELETE ON pattern is not supported"))
return "", nil, b.Err()
}
b.WriteString("BATCH DELETE FROM ")
} else {
b.WriteString("DELETE FROM ")
}

b.writeSchema(d.schema)
b.Ident(d.table)

Expand All @@ -870,7 +944,7 @@ func (d *DeleteBuilder) Query() (string, []any) {
}

joinReturning(d.returning, &b)
return b.String(), b.args
return b.String(), b.args, nil
}

// Predicate is a where predicate.
Expand Down Expand Up @@ -3286,9 +3360,10 @@ func (b *Builder) AddError(err error) *Builder {
}

func (b *Builder) writeSchema(schema string) {
if schema != "" && b.dialect != dialect.SQLite {
b.Ident(schema).WriteByte('.')
if schema == "" || b.sqlite() || b.ydb() {
return
}
b.Ident(schema).WriteByte('.')
}

// Err returns a concatenated error of all errors encountered during
Expand Down Expand Up @@ -3711,6 +3786,19 @@ func (d *DialectBuilder) Update(table string) *UpdateBuilder {
return b
}

// BatchUpdate creates an UpdateBuilder for the BATCH UPDATE statement with the configured dialect.
// BATCH UPDATE is only supported in YDB dialect.
//
// Dialect(dialect.YDB).
// BatchUpdate("users").
// Set("status", "active").
// Where(GT("created_at", time.Now()))
func (d *DialectBuilder) BatchUpdate(table string) *UpdateBuilder {
b := BatchUpdate(table)
b.SetDialect(d.dialect)
return b
}

// Delete creates a DeleteBuilder for the configured dialect.
//
// Dialect(dialect.Postgres).
Expand All @@ -3721,6 +3809,18 @@ func (d *DialectBuilder) Delete(table string) *DeleteBuilder {
return b
}

// BatchDelete creates a DeleteBuilder for the BATCH DELETE statement with the configured dialect.
// BATCH DELETE is only supported in YDB dialect.
//
// Dialect(dialect.YDB).
// BatchDelete("users").
// Where(GT("Key1", 1))
func (d *DialectBuilder) BatchDelete(table string) *DeleteBuilder {
b := BatchDelete(table)
b.SetDialect(d.dialect)
return b
}

// Select creates a Selector for the configured dialect.
//
// Dialect(dialect.Postgres).
Expand Down
115 changes: 115 additions & 0 deletions dialect/sql/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2336,6 +2336,121 @@ func TestSelector_VIEW_SecondaryIndex_YDB(t *testing.T) {
})
}

func TestBatchUpdate_YDB(t *testing.T) {
t.Run("Basic BATCH UPDATE with SET and WHERE", func(t *testing.T) {
d := Dialect(dialect.YDB)
query, args := d.BatchUpdate("my_table").
Set("Value1", "foo").
Set("Value2", 0).
Where(GT("Key1", 1)).
Query()

require.Equal(t, "BATCH UPDATE `my_table` SET `Value1` = $p0, `Value2` = $p1 WHERE `Key1` > $p2", query)
require.Equal(t, []any{
driver.NamedValue{Name: "p0", Value: "foo"},
driver.NamedValue{Name: "p1", Value: 0},
driver.NamedValue{Name: "p2", Value: 1},
}, args)
})

t.Run("BATCH UPDATE on non-YDB dialect should error", func(t *testing.T) {
builder := Dialect(dialect.MySQL).
BatchUpdate("users").
Set("status", "active").
Where(GT("created_at", "2024-01-01"))

query, args, err := builder.QueryErr()

require.Empty(t, query)
require.Empty(t, args)
require.Error(t, err)
})

t.Run("BATCH UPDATE with RETURNING should error", func(t *testing.T) {
builder := Dialect(dialect.YDB).
BatchUpdate("users").
Set("status", "active").
Returning("id", "status")

query, args, err := builder.QueryErr()

require.Empty(t, query)
require.Empty(t, args)
require.Error(t, err)
})

t.Run("BATCH UPDATE with UPDATE ON pattern should error", func(t *testing.T) {
d := Dialect(dialect.YDB)
subquery := d.Select("id").From(Table("orders")).Where(EQ("status", "pending"))

builder := d.BatchUpdate("users").
Set("status", "active").
On(subquery)

query, args, err := builder.QueryErr()

require.Empty(t, query)
require.Empty(t, args)
require.Error(t, err)
require.Contains(t, err.Error(), "BATCH UPDATE: UPDATE ON pattern is not supported")
})
}

func TestBatchDelete_YDB(t *testing.T) {
t.Run("Basic BATCH DELETE", func(t *testing.T) {
d := Dialect(dialect.YDB)
query, args := d.BatchDelete("my_table").
Where(And(GT("Key1", 1), GTE("Key2", "One"))).
Query()

require.Equal(t, "BATCH DELETE FROM `my_table` WHERE `Key1` > $p0 AND `Key2` >= $p1", query)
require.Equal(t, []any{
driver.NamedValue{Name: "p0", Value: 1},
driver.NamedValue{Name: "p1", Value: "One"},
}, args)
})

t.Run("BATCH DELETE on non-YDB dialect should error", func(t *testing.T) {
builder := Dialect(dialect.MySQL).
BatchDelete("users").
Where(GT("id", 100))

query, args, err := builder.QueryErr()

require.Empty(t, query)
require.Empty(t, args)
require.Error(t, err)
})

t.Run("BATCH DELETE with RETURNING should error", func(t *testing.T) {
builder := Dialect(dialect.YDB).
BatchDelete("users").
Where(GT("id", 100)).
Returning("id")

query, args, err := builder.QueryErr()

require.Empty(t, query)
require.Empty(t, args)
require.Error(t, err)
})

t.Run("BATCH DELETE with DELETE ON pattern should error", func(t *testing.T) {
d := Dialect(dialect.YDB)
subquery := d.Select("id").From(Table("users")).Where(EQ("status", "deleted"))

builder := d.BatchDelete("users").
On(subquery)

query, args, err := builder.QueryErr()

require.Empty(t, query)
require.Empty(t, args)
require.Error(t, err)
require.Contains(t, err.Error(), "BATCH DELETE: DELETE ON pattern is not supported")
})
}

func TestCreateView_YDB(t *testing.T) {
t.Run("Basic view with security_invoker", func(t *testing.T) {
d := Dialect(dialect.YDB)
Expand Down
Loading