Skip to content

Commit 672fb3b

Browse files
committed
vtgate: restrict 2PC commit path to only modified shards
Signed-off-by: Samriddha9619 <sumitkumartripathi0@gmail.com>
1 parent 762c494 commit 672fb3b

File tree

2 files changed

+108
-7
lines changed

2 files changed

+108
-7
lines changed

go/vt/vtgate/tx_conn.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -262,22 +262,41 @@ func (txc *TxConn) commitNormal(ctx context.Context, session *econtext.SafeSessi
262262

263263
// commit2PC will not used the pinned tablets - to make sure we use the current source, we need to use the gateway's queryservice
264264
func (txc *TxConn) commit2PC(ctx context.Context, session *econtext.SafeSession) (txnType txType, err error) {
265+
var modifiedShards []*vtgatepb.Session_ShardSession
266+
var readOnlyShards []*vtgatepb.Session_ShardSession
267+
268+
for _, s := range session.ShardSessions {
269+
if s.RowsAffected {
270+
modifiedShards = append(modifiedShards, s)
271+
} else {
272+
readOnlyShards = append(readOnlyShards, s)
273+
}
274+
}
275+
276+
if err = txc.runSessions(ctx, readOnlyShards, session.GetLogger(), txc.commitShard); err != nil {
277+
return TXReadOnly, err
278+
}
279+
280+
if len(modifiedShards) == 0 {
281+
return TXReadOnly, nil
282+
}
283+
265284
// If the number of participants is one or less, then it's a normal commit.
266-
if len(session.ShardSessions) <= 1 {
267-
return txc.commitNormal(ctx, session)
285+
if len(modifiedShards) == 1 {
286+
if err = txc.commitShard(ctx, modifiedShards[0], session.GetLogger()); err != nil {
287+
return TXReadWrite, err
288+
}
289+
return TXReadWrite, nil
268290
}
269291

270-
mmShard := session.ShardSessions[0]
271-
rmShards := session.ShardSessions[1:]
292+
mmShard := modifiedShards[0]
293+
rmShards := modifiedShards[1:]
272294
dtid := dtids.New(mmShard)
273295
if mmShard.RowsAffected {
274296
txnType = TXReadWrite
275297
}
276298
participants := make([]*querypb.Target, len(rmShards))
277299
for i, s := range rmShards {
278-
if s.RowsAffected {
279-
txnType = TXReadWrite
280-
}
281300
participants[i] = s.Target
282301
}
283302

go/vt/vtgate/tx_conn_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,8 @@ func TestTxConnCommit2PC(t *testing.T) {
964964
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
965965
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
966966
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}, false)
967+
session.ShardSessions[0].RowsAffected = true
968+
session.ShardSessions[1].RowsAffected = true
967969
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
968970
require.NoError(t,
969971
sc.txConn.Commit(ctx, session))
@@ -994,6 +996,8 @@ func TestTxConnCommit2PCCreateTransactionFail(t *testing.T) {
994996
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
995997
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
996998
sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}, false)
999+
session.ShardSessions[0].RowsAffected = true
1000+
session.ShardSessions[1].RowsAffected = true
9971001

9981002
sbc0.MustFailCreateTransaction = 1
9991003
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
@@ -1016,6 +1020,8 @@ func TestTxConnCommit2PCPrepareFail(t *testing.T) {
10161020
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
10171021
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
10181022
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}, false)
1023+
session.ShardSessions[0].RowsAffected = true
1024+
session.ShardSessions[1].RowsAffected = true
10191025

10201026
sbc1.MustFailPrepare = 1
10211027
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
@@ -1042,6 +1048,8 @@ func TestTxConnCommit2PCStartCommitFail(t *testing.T) {
10421048
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
10431049
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
10441050
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}, false)
1051+
session.ShardSessions[0].RowsAffected = true
1052+
session.ShardSessions[1].RowsAffected = true
10451053

10461054
sbc0.MustFailStartCommit = 1
10471055
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
@@ -1061,6 +1069,8 @@ func TestTxConnCommit2PCStartCommitFail(t *testing.T) {
10611069
session = econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
10621070
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
10631071
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}, false)
1072+
session.ShardSessions[0].RowsAffected = true
1073+
session.ShardSessions[1].RowsAffected = true
10641074

10651075
// Here the StartCommit failure is in uncertain state so rollback is not called and neither conclude.
10661076
sbc0.MustFailStartCommitUncertain = 1
@@ -1084,6 +1094,8 @@ func TestTxConnCommit2PCCommitPreparedFail(t *testing.T) {
10841094
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
10851095
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
10861096
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}, false)
1097+
session.ShardSessions[0].RowsAffected = true
1098+
session.ShardSessions[1].RowsAffected = true
10871099

10881100
sbc1.MustFailCommitPrepared = 1
10891101
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
@@ -1104,6 +1116,8 @@ func TestTxConnCommit2PCConcludeTransactionFail(t *testing.T) {
11041116
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
11051117
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
11061118
sc.ExecuteMultiShard(ctx, nil, rss01, twoQueries, session, false, false, nullResultsObserver{}, false)
1119+
session.ShardSessions[0].RowsAffected = true
1120+
session.ShardSessions[1].RowsAffected = true
11071121

11081122
sbc0.MustFailConcludeTransaction = 1
11091123
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
@@ -1116,6 +1130,74 @@ func TestTxConnCommit2PCConcludeTransactionFail(t *testing.T) {
11161130
assert.EqualValues(t, 1, sbc0.ConcludeTransactionCount.Load(), "sbc0.ConcludeTransactionCount")
11171131
}
11181132

1133+
func TestTxConnCommit2PCReadOnlyShardsExcluded(t *testing.T) {
1134+
ctx := utils.LeakCheckContext(t)
1135+
1136+
sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCReadOnlyShardsExcluded")
1137+
1138+
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
1139+
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
1140+
sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}, false)
1141+
1142+
session.ShardSessions[0].RowsAffected = true
1143+
session.ShardSessions[1].RowsAffected = false
1144+
1145+
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
1146+
require.NoError(t, sc.txConn.Commit(ctx, session))
1147+
1148+
assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount")
1149+
assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount")
1150+
assert.EqualValues(t, 0, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
1151+
assert.EqualValues(t, 0, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
1152+
}
1153+
1154+
func TestTxConnCommit2PCAllShardsReadOnly(t *testing.T) {
1155+
ctx := utils.LeakCheckContext(t)
1156+
1157+
sc, sbc0, sbc1, rss0, rss1, _ := newTestTxConnEnv(t, ctx, "TestTxConnCommit2PCAllShardsReadOnly")
1158+
1159+
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
1160+
sc.ExecuteMultiShard(ctx, nil, rss0, queries, session, false, false, nullResultsObserver{}, false)
1161+
sc.ExecuteMultiShard(ctx, nil, rss1, queries, session, false, false, nullResultsObserver{}, false)
1162+
1163+
session.ShardSessions[0].RowsAffected = false
1164+
session.ShardSessions[1].RowsAffected = false
1165+
1166+
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
1167+
require.NoError(t, sc.txConn.Commit(ctx, session))
1168+
1169+
assert.EqualValues(t, 1, sbc0.CommitCount.Load(), "sbc0.CommitCount")
1170+
assert.EqualValues(t, 1, sbc1.CommitCount.Load(), "sbc1.CommitCount")
1171+
assert.EqualValues(t, 0, sbc0.CreateTransactionCount.Load(), "sbc0.CreateTransactionCount")
1172+
assert.EqualValues(t, 0, sbc1.PrepareCount.Load(), "sbc1.PrepareCount")
1173+
}
1174+
1175+
func TestTxConnCommit2PCWithReadOnlyShardAndTwoModifiedShards(t *testing.T) {
1176+
ctx := utils.LeakCheckContext(t)
1177+
1178+
sc, sbcs, rssm, _ := newTestTxConnEnvNShards(t, ctx, "TestTxConnCommit2PCWithReadOnlyShardAndTwoModifiedShards", 3)
1179+
1180+
session := econtext.NewSafeSession(&vtgatepb.Session{InTransaction: true})
1181+
sc.ExecuteMultiShard(ctx, nil, rssm[0], queries, session, false, false, nullResultsObserver{}, false)
1182+
sc.ExecuteMultiShard(ctx, nil, rssm[1], queries, session, false, false, nullResultsObserver{}, false)
1183+
sc.ExecuteMultiShard(ctx, nil, rssm[2], queries, session, false, false, nullResultsObserver{}, false)
1184+
1185+
session.ShardSessions[0].RowsAffected = true
1186+
session.ShardSessions[1].RowsAffected = false
1187+
session.ShardSessions[2].RowsAffected = true
1188+
1189+
session.TransactionMode = vtgatepb.TransactionMode_TWOPC
1190+
require.NoError(t, sc.txConn.Commit(ctx, session))
1191+
1192+
assert.EqualValues(t, 1, sbcs[1].CommitCount.Load(), "sbcs[1].CommitCount")
1193+
1194+
assert.EqualValues(t, 1, sbcs[0].CreateTransactionCount.Load(), "sbcs[0].CreateTransactionCount")
1195+
assert.EqualValues(t, 1, sbcs[2].PrepareCount.Load(), "sbcs[2].PrepareCount")
1196+
assert.EqualValues(t, 1, sbcs[0].StartCommitCount.Load(), "sbcs[0].StartCommitCount")
1197+
assert.EqualValues(t, 1, sbcs[2].CommitPreparedCount.Load(), "sbcs[2].CommitPreparedCount")
1198+
assert.EqualValues(t, 1, sbcs[0].ConcludeTransactionCount.Load(), "sbcs[0].ConcludeTransactionCount")
1199+
}
1200+
11191201
func TestTxConnRollback(t *testing.T) {
11201202
ctx := utils.LeakCheckContext(t)
11211203

0 commit comments

Comments
 (0)