Skip to content

Commit 320f960

Browse files
committed
add optional flag to enable multi shard autocommit by default
Adds a new vtgate flag --default-multi-shard-autocommit which, as the name implies, opts the query engine into using multi-shard autocommit semantics by default, even if the plan does not contain the query directive MULTI_SHARD_AUTOCOMMIT. Signed-off-by: Michael Demmer <mdemmer@slack-corp.com>
1 parent ef248b3 commit 320f960

File tree

13 files changed

+45
-13
lines changed

13 files changed

+45
-13
lines changed

go/vt/vtexplain/vtexplain_vtgate.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func (vte *VTExplain) initVtgateExecutor(ctx context.Context, vSchemaStr, ksShar
7575
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
7676
queryLogBufferSize := 10
7777
plans := theine.NewStore[vtgate.PlanCacheKey, *engine.Plan](4*1024*1024, false)
78-
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker, false, opts.PlannerVersion, 0)
78+
vte.vtgateExecutor = vtgate.NewExecutor(ctx, vte.explainTopo, vtexplainCell, resolver, opts.Normalize, false, streamSize, plans, schemaTracker /* noScatter */, false /* defaultMultiShardAutocommit */, false, opts.PlannerVersion, 0)
7979
vte.vtgateExecutor.SetQueryLogger(streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize))
8080

8181
return nil

go/vt/vtgate/engine/dml.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func allowOnlyPrimary(rss ...*srvtopo.ResolvedShard) error {
130130
}
131131

132132
func execMultiShard(ctx context.Context, primitive Primitive, vcursor VCursor, rss []*srvtopo.ResolvedShard, queries []*querypb.BoundQuery, multiShardAutoCommit bool) (*sqltypes.Result, error) {
133-
autocommit := (len(rss) == 1 || multiShardAutoCommit) && vcursor.AutocommitApproval()
133+
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || multiShardAutoCommit) && vcursor.AutocommitApproval()
134134
result, errs := vcursor.ExecuteMultiShard(ctx, primitive, rss, queries, true /* rollbackOnError */, autocommit)
135135
return result, vterrors.Aggregate(errs)
136136
}

go/vt/vtgate/engine/fake_vcursor_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,10 @@ func (t *noopVCursor) GetDBDDLPluginName() string {
373373
panic("unimplemented")
374374
}
375375

376+
func (t *noopVCursor) DefaultMultiShardAutocommit() bool {
377+
panic("unimplemented")
378+
}
379+
376380
var _ VCursor = (*loggingVCursor)(nil)
377381
var _ SessionActions = (*loggingVCursor)(nil)
378382

@@ -775,6 +779,10 @@ func (f *loggingVCursor) SetPriority(string) {
775779
panic("implement me")
776780
}
777781

782+
func (f *loggingVCursor) DefaultMultiShardAutocommit() bool {
783+
return false
784+
}
785+
778786
func (f *loggingVCursor) FindRoutedTable(tbl sqlparser.TableName) (*vindexes.Table, error) {
779787
f.log = append(f.log, fmt.Sprintf("FindTable(%s)", sqlparser.String(tbl)))
780788
return f.tableRoutes.tbl, nil

go/vt/vtgate/engine/insert.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ func (ins *Insert) executeInsertQueries(
165165
queries []*querypb.BoundQuery,
166166
insertID uint64,
167167
) (*sqltypes.Result, error) {
168-
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
168+
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
169169
err := allowOnlyPrimary(rss...)
170170
if err != nil {
171171
return nil, err

go/vt/vtgate/engine/insert_select.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func (ins *InsertSelect) executeInsertQueries(
201201
queries []*querypb.BoundQuery,
202202
insertID uint64,
203203
) (*sqltypes.Result, error) {
204-
autocommit := (len(rss) == 1 || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
204+
autocommit := (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || ins.MultiShardAutocommit) && vcursor.AutocommitApproval()
205205
err := allowOnlyPrimary(rss...)
206206
if err != nil {
207207
return nil, err

go/vt/vtgate/engine/primitive.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,9 @@ type (
129129

130130
// CloneForReplicaWarming clones the VCursor for re-use in warming queries to replicas
131131
CloneForReplicaWarming(ctx context.Context) VCursor
132+
133+
// DefaultMultiShardAutocommit returns true if multi shard autocommit semantics are enabled by default
134+
DefaultMultiShardAutocommit() bool
132135
}
133136

134137
// SessionActions gives primitives ability to interact with the session state

go/vt/vtgate/engine/send.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func (s *Send) checkAndReturnShards(ctx context.Context, vcursor VCursor) ([]*sr
140140

141141
func (s *Send) canAutoCommit(vcursor VCursor, rss []*srvtopo.ResolvedShard) bool {
142142
if s.IsDML {
143-
return (len(rss) == 1 || s.MultishardAutocommit) && vcursor.AutocommitApproval()
143+
return (len(rss) == 1 || vcursor.DefaultMultiShardAutocommit() || s.MultishardAutocommit) && vcursor.AutocommitApproval()
144144
}
145145
return false
146146
}

go/vt/vtgate/executor.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ type Executor struct {
117117
// allowScatter will fail planning if set to false and a plan contains any scatter queries
118118
allowScatter bool
119119

120+
// defaultMultiShardAutocommit will by default execute multi shard DML statements with autocommit
121+
defaultMultiShardAutocommit bool
122+
120123
// queryLogger is passed in for logging from this vtgate executor.
121124
queryLogger *streamlog.StreamLogger[*logstats.LogStats]
122125

@@ -149,7 +152,7 @@ func NewExecutor(
149152
streamSize int,
150153
plans *PlanCache,
151154
schemaTracker SchemaInfo,
152-
noScatter bool,
155+
noScatter, defaultMultiShardAutocommit bool,
153156
pv plancontext.PlannerVersion,
154157
warmingReadsPercent int,
155158
) *Executor {
@@ -168,6 +171,8 @@ func NewExecutor(
168171
plans: plans,
169172
warmingReadsPercent: warmingReadsPercent,
170173
warmingReadsChannel: make(chan bool, warmingReadsConcurrency),
174+
175+
defaultMultiShardAutocommit: defaultMultiShardAutocommit,
171176
}
172177

173178
vschemaacl.Init()

go/vt/vtgate/executor_framework_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func createExecutorEnvCallback(t testing.TB, eachShard func(shard, ks string, ta
184184
// one-off queries from thrashing the cache. Disable the doorkeeper in the tests to prevent flakiness.
185185
plans := theine.NewStore[PlanCacheKey, *engine.Plan](queryPlanCacheMemory, false)
186186

187-
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
187+
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, false, querypb.ExecuteOptions_Gen4, 0)
188188
executor.SetQueryLogger(queryLogger)
189189

190190
key.AnyShardPicker = DestinationAnyShardPickerFirstShard{}
@@ -231,7 +231,7 @@ func createCustomExecutor(t testing.TB, vschema string) (executor *Executor, sbc
231231

232232
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
233233
plans := DefaultPlanCache()
234-
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
234+
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, false, querypb.ExecuteOptions_Gen4, 0)
235235
executor.SetQueryLogger(queryLogger)
236236

237237
t.Cleanup(func() {
@@ -268,7 +268,7 @@ func createCustomExecutorSetValues(t testing.TB, vschema string, values []*sqlty
268268
sbclookup = hc.AddTestTablet(cell, "0", 1, KsTestUnsharded, "0", topodatapb.TabletType_PRIMARY, true, 1, nil)
269269
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
270270
plans := DefaultPlanCache()
271-
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
271+
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, false, querypb.ExecuteOptions_Gen4, 0)
272272
executor.SetQueryLogger(queryLogger)
273273

274274
t.Cleanup(func() {
@@ -293,7 +293,7 @@ func createExecutorEnvWithPrimaryReplicaConn(t testing.TB, ctx context.Context,
293293
replica = hc.AddTestTablet(cell, "0-replica", 1, KsTestUnsharded, "0", topodatapb.TabletType_REPLICA, true, 1, nil)
294294

295295
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
296-
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent)
296+
executor = NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, DefaultPlanCache(), nil, false, false, querypb.ExecuteOptions_Gen4, warmingReadsPercent)
297297
executor.SetQueryLogger(queryLogger)
298298

299299
t.Cleanup(func() {

go/vt/vtgate/executor_select_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1563,7 +1563,7 @@ func TestStreamSelectIN(t *testing.T) {
15631563
func createExecutor(ctx context.Context, serv *sandboxTopo, cell string, resolver *Resolver) *Executor {
15641564
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
15651565
plans := DefaultPlanCache()
1566-
ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
1566+
ex := NewExecutor(ctx, serv, cell, resolver, false, false, testBufferSize, plans, nil, false, false, querypb.ExecuteOptions_Gen4, 0)
15671567
ex.SetQueryLogger(queryLogger)
15681568
return ex
15691569
}
@@ -3189,7 +3189,7 @@ func TestStreamOrderByLimitWithMultipleResults(t *testing.T) {
31893189
}
31903190
queryLogger := streamlog.New[*logstats.LogStats]("VTGate", queryLogBufferSize)
31913191
plans := DefaultPlanCache()
3192-
executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, querypb.ExecuteOptions_Gen4, 0)
3192+
executor := NewExecutor(ctx, serv, cell, resolver, true, false, testBufferSize, plans, nil, false, false, querypb.ExecuteOptions_Gen4, 0)
31933193
executor.SetQueryLogger(queryLogger)
31943194
defer executor.Close()
31953195
// some sleep for all goroutines to start

0 commit comments

Comments
 (0)