Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect
github.com/ebitengine/purego v0.8.2 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down
20 changes: 20 additions & 0 deletions go/test/endtoend/vreplication/initial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"fmt"
"math/rand/v2"
"os"
"strings"
"testing"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)

Expand All @@ -43,6 +45,12 @@ func insertInitialData(t *testing.T) {
`[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`)

insertJSONValues(t)

insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs+":0", 50000)
log.Infof("Inserted large transaction for chunking tests")

execVtgateQuery(t, vtgateConn, defaultSourceKs, "delete from customer where cid >= 50000 and cid < 50100")
log.Infof("Cleaned up chunk testing rows from source keyspace")
})
}

Expand Down Expand Up @@ -140,3 +148,15 @@ func insertIntoBlobTable(t *testing.T) {
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), query)
}
}

// insertLargeTransactionForChunkTesting inserts a transaction large enough to exceed the 1KB chunking threshold.
func insertLargeTransactionForChunkTesting(t *testing.T, vtgateConn *mysql.Conn, keyspace string, startID int) {
execVtgateQuery(t, vtgateConn, keyspace, "BEGIN")
for i := 0; i < 15; i++ {
largeData := strings.Repeat("x", 94) + fmt.Sprintf("_%05d", i)
query := fmt.Sprintf("INSERT INTO customer (cid, name) VALUES (%d, '%s')",
startID+i, largeData)
execVtgateQuery(t, vtgateConn, keyspace, query)
}
execVtgateQuery(t, vtgateConn, keyspace, "COMMIT")
}
75 changes: 51 additions & 24 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
defaultRdonly = 0

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
vc.AddKeyspace(t, []*Cell{defaultCell}, defaultSourceKs, "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
verifyClusterHealth(t, vc)

ctx := context.Background()
Expand All @@ -59,7 +59,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
defer vstreamConn.Close()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "product",
Keyspace: defaultSourceKs,
Shard: "0",
Gtid: "",
}}}
Expand All @@ -79,7 +79,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
},
}
flags := &vtgatepb.VStreamFlags{
TablesToCopy: []string{"product", "customer"},
TablesToCopy: []string{"product", "customer"},
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
id := 0
vtgateConn := vc.GetVTGateConn(t)
Expand All @@ -89,11 +90,13 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
// present in the filter before running the VStream.
for range 10 {
id++
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
}

insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 10000)

// Stream events from the VStream API
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
Expand Down Expand Up @@ -150,15 +153,20 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
stopInserting.Store(false)
var insertMu sync.Mutex
go func() {
insertCount := 0
for {
if stopInserting.Load() {
return
}
insertMu.Lock()
id++
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
insertCount++
if insertCount%5 == 0 {
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 20000+insertCount*10)
}
insertMu.Unlock()
}
}()
Expand All @@ -168,9 +176,9 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
time.Sleep(10 * time.Second) // Give the vstream plenty of time to catchup
done.Store(true)

qr1 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer")
qr2 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from product")
qr3 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from merchant")
qr1 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from customer")
qr2 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from product")
qr3 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from merchant")
require.NotNil(t, qr1)
require.NotNil(t, qr2)
require.NotNil(t, qr3)
Expand Down Expand Up @@ -238,7 +246,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
Filter: "select * from customer",
}},
}
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600}
flags := &vtgatepb.VStreamFlags{
HeartbeatInterval: 3600,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := atomic.Bool{}
done.Store(false)

Expand All @@ -253,13 +264,18 @@ func testVStreamWithFailover(t *testing.T, failover bool) {

// first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS
go func() {
insertCount := 0
for {
if stopInserting.Load() {
return
}
insertMu.Lock()
id++
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
insertCount++
if insertCount%3 == 0 {
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 40000+insertCount*10)
}
insertMu.Unlock()
}
}()
Expand Down Expand Up @@ -304,15 +320,15 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
case 1:
if failover {
insertMu.Lock()
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", fmt.Sprintf("%s/0", defaultSourceKs), "--new-primary=zone1-101")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", defaultSourceKs+"/0", "--new-primary=zone1-101")
insertMu.Unlock()
log.Infof("output of first PRS is %s", output)
require.NoError(t, err)
}
case 2:
if failover {
insertMu.Lock()
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", fmt.Sprintf("%s/0", defaultSourceKs), "--new-primary=zone1-100")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", defaultSourceKs+"/0", "--new-primary=zone1-100")
insertMu.Unlock()
log.Infof("output of second PRS is %s", output)
require.NoError(t, err)
Expand Down Expand Up @@ -383,7 +399,7 @@ func insertRow(keyspace, table string, id int) {
if vtgateConn == nil {
return
}
vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", keyspace), 1000, false)
vtgateConn.ExecuteFetch("use "+keyspace, 1000, false)
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
Expand Down Expand Up @@ -440,7 +456,11 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
Filter: "select * from customer",
}},
}
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600, StopOnReshard: stopOnReshard}
flags := &vtgatepb.VStreamFlags{
HeartbeatInterval: 3600,
StopOnReshard: stopOnReshard,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := false

id := 1000
Expand Down Expand Up @@ -580,7 +600,9 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
Match: "/customer.*/",
}},
}
flags := &vtgatepb.VStreamFlags{}
flags := &vtgatepb.VStreamFlags{
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := false

id := 1000
Expand Down Expand Up @@ -764,6 +786,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
}
flags := &vtgatepb.VStreamFlags{
IncludeReshardJournalEvents: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
journalEvents := 0

Expand All @@ -787,7 +810,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
case "0":
// We expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
Expand Down Expand Up @@ -845,7 +868,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
case "0":
// Again, we expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_JOURNAL:
require.True(t, ev.Journal.MigrationType == binlogdatapb.MigrationType_SHARDS)
Expand Down Expand Up @@ -961,7 +984,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
}},
}
flags := &vtgatepb.VStreamFlags{
StopOnReshard: true,
StopOnReshard: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}

// Stream events but stop once we have a VGTID with positions for the old/original shards.
Expand All @@ -982,7 +1006,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
case "-80", "80-":
oldShardRowEvents++
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
Expand Down Expand Up @@ -1039,7 +1063,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
switch shard {
case "-80", "80-":
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_JOURNAL:
t.Logf("Journal event: %+v", ev)
Expand Down Expand Up @@ -1232,6 +1256,7 @@ func TestVStreamHeartbeats(t *testing.T) {
name: "With Keyspace Heartbeats On",
flags: &vtgatepb.VStreamFlags{
StreamKeyspaceHeartbeats: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
},
expectedHeartbeats: numExpectedHeartbeats,
},
Expand Down Expand Up @@ -1362,7 +1387,9 @@ func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamCo
vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) {
copyPhase := true
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
})
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down
Loading
Loading