diff --git a/go.mod b/go.mod index fe4cbc2f5af..33eb78c7647 100644 --- a/go.mod +++ b/go.mod @@ -158,7 +158,6 @@ require ( github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/docker/go-units v0.5.0 // indirect github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect github.com/ebitengine/purego v0.8.2 // indirect github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect diff --git a/go.sum b/go.sum index 9543110fb44..f3ff3b31cd7 100644 --- a/go.sum +++ b/go.sum @@ -168,8 +168,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= -github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= -github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= diff --git a/go/test/endtoend/vreplication/initial_data_test.go b/go/test/endtoend/vreplication/initial_data_test.go index 2fcb485be4c..12eac8543cd 100644 --- a/go/test/endtoend/vreplication/initial_data_test.go +++ b/go/test/endtoend/vreplication/initial_data_test.go @@ -20,8 +20,10 @@ import ( "fmt" "math/rand/v2" "os" + "strings" "testing" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" ) @@ -43,6 +45,12 @@ func insertInitialData(t *testing.T) { `[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`) insertJSONValues(t) + + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs+":0", 50000) + log.Infof("Inserted large transaction for chunking tests") + + execVtgateQuery(t, vtgateConn, defaultSourceKs, "delete from customer where cid >= 50000 and cid < 50100") + log.Infof("Cleaned up chunk testing rows from source keyspace") }) } @@ -140,3 +148,15 @@ func insertIntoBlobTable(t *testing.T) { execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), query) } } + +// insertLargeTransactionForChunkTesting inserts a transaction large enough to exceed the 1KB chunking threshold. +func insertLargeTransactionForChunkTesting(t *testing.T, vtgateConn *mysql.Conn, keyspace string, startID int) { + execVtgateQuery(t, vtgateConn, keyspace, "BEGIN") + for i := 0; i < 15; i++ { + largeData := strings.Repeat("x", 94) + fmt.Sprintf("_%05d", i) + query := fmt.Sprintf("INSERT INTO customer (cid, name) VALUES (%d, '%s')", + startID+i, largeData) + execVtgateQuery(t, vtgateConn, keyspace, query) + } + execVtgateQuery(t, vtgateConn, keyspace, "COMMIT") +} diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index 69a336060a7..3cff0b490a8 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -48,7 +48,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { defaultRdonly = 0 defaultCell := vc.Cells[vc.CellNames[0]] - vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil) + vc.AddKeyspace(t, []*Cell{defaultCell}, defaultSourceKs, "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil) verifyClusterHealth(t, vc) ctx := context.Background() @@ -59,7 +59,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { defer vstreamConn.Close() vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ - Keyspace: "product", + Keyspace: defaultSourceKs, Shard: "0", Gtid: "", }}} @@ -79,7 +79,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { }, } flags := &vtgatepb.VStreamFlags{ - TablesToCopy: []string{"product", "customer"}, + TablesToCopy: []string{"product", "customer"}, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions } id := 0 vtgateConn := vc.GetVTGateConn(t) @@ -89,11 +90,13 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { // present in the filter before running the VStream. for range 10 { id++ - execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) - execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id)) - execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id)) + execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) + execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id)) + execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id)) } + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 10000) + // Stream events from the VStream API reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) require.NoError(t, err) @@ -150,15 +153,20 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { stopInserting.Store(false) var insertMu sync.Mutex go func() { + insertCount := 0 for { if stopInserting.Load() { return } insertMu.Lock() id++ - execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) - execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id)) - execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id)) + execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) + execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id)) + execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id)) + insertCount++ + if insertCount%5 == 0 { + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 20000+insertCount*10) + } insertMu.Unlock() } }() @@ -168,9 +176,9 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { time.Sleep(10 * time.Second) // Give the vstream plenty of time to catchup done.Store(true) - qr1 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer") - qr2 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from product") - qr3 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from merchant") + qr1 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from customer") + qr2 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from product") + qr3 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from merchant") require.NotNil(t, qr1) require.NotNil(t, qr2) require.NotNil(t, qr3) @@ -238,7 +246,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) { Filter: "select * from customer", }}, } - flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600} + flags := &vtgatepb.VStreamFlags{ + HeartbeatInterval: 3600, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + } done := atomic.Bool{} done.Store(false) @@ -253,6 +264,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { // first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS go func() { + insertCount := 0 for { if stopInserting.Load() { return @@ -260,6 +272,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) { insertMu.Lock() id++ execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) + insertCount++ + if insertCount%3 == 0 { + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 40000+insertCount*10) + } insertMu.Unlock() } }() @@ -304,7 +320,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { case 1: if failover { insertMu.Lock() - output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", fmt.Sprintf("%s/0", defaultSourceKs), "--new-primary=zone1-101") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", defaultSourceKs+"/0", "--new-primary=zone1-101") insertMu.Unlock() log.Infof("output of first PRS is %s", output) require.NoError(t, err) @@ -312,7 +328,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { case 2: if failover { insertMu.Lock() - output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", fmt.Sprintf("%s/0", defaultSourceKs), "--new-primary=zone1-100") + output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", defaultSourceKs+"/0", "--new-primary=zone1-100") insertMu.Unlock() log.Infof("output of second PRS is %s", output) require.NoError(t, err) @@ -383,7 +399,7 @@ func insertRow(keyspace, table string, id int) { if vtgateConn == nil { return } - vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", keyspace), 1000, false) + vtgateConn.ExecuteFetch("use "+keyspace, 1000, false) vtgateConn.ExecuteFetch("begin", 1000, false) _, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false) if err != nil { @@ -440,7 +456,11 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID Filter: "select * from customer", }}, } - flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600, StopOnReshard: stopOnReshard} + flags := &vtgatepb.VStreamFlags{ + HeartbeatInterval: 3600, + StopOnReshard: stopOnReshard, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + } done := false id := 1000 @@ -580,7 +600,9 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven Match: "/customer.*/", }}, } - flags := &vtgatepb.VStreamFlags{} + flags := &vtgatepb.VStreamFlags{ + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + } done := false id := 1000 @@ -764,6 +786,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { } flags := &vtgatepb.VStreamFlags{ IncludeReshardJournalEvents: true, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions } journalEvents := 0 @@ -787,7 +810,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { case "0": // We expect some for the sequence backing table, but don't care. default: - require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + require.FailNow(t, "received event for unexpected shard: "+shard) } case binlogdatapb.VEventType_VGTID: newVGTID = ev.GetVgtid() @@ -845,7 +868,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { case "0": // Again, we expect some for the sequence backing table, but don't care. default: - require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + require.FailNow(t, "received event for unexpected shard: "+shard) } case binlogdatapb.VEventType_JOURNAL: require.True(t, ev.Journal.MigrationType == binlogdatapb.MigrationType_SHARDS) @@ -961,7 +984,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { }}, } flags := &vtgatepb.VStreamFlags{ - StopOnReshard: true, + StopOnReshard: true, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions } // Stream events but stop once we have a VGTID with positions for the old/original shards. @@ -982,7 +1006,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { case "-80", "80-": oldShardRowEvents++ default: - require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + require.FailNow(t, "received event for unexpected shard: "+shard) } case binlogdatapb.VEventType_VGTID: newVGTID = ev.GetVgtid() @@ -1039,7 +1063,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { switch shard { case "-80", "80-": default: - require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard)) + require.FailNow(t, "received event for unexpected shard: "+shard) } case binlogdatapb.VEventType_JOURNAL: t.Logf("Journal event: %+v", ev) @@ -1232,6 +1256,7 @@ func TestVStreamHeartbeats(t *testing.T) { name: "With Keyspace Heartbeats On", flags: &vtgatepb.VStreamFlags{ StreamKeyspaceHeartbeats: true, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions }, expectedHeartbeats: numExpectedHeartbeats, }, @@ -1362,7 +1387,9 @@ func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamCo vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) { copyPhase := true func() { - reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{}) + reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{ + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + }) require.NoError(t, err) for { evs, err := reader.Recv() diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index c09846bd14d..f779145821d 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -1343,8 +1343,14 @@ type VStreamFlags struct { TablesToCopy []string `protobuf:"bytes,9,rep,name=tables_to_copy,json=tablesToCopy,proto3" json:"tables_to_copy,omitempty"` // Exclude the keyspace from the table name that is sent to the vstream client ExcludeKeyspaceFromTableName bool `protobuf:"varint,10,opt,name=exclude_keyspace_from_table_name,json=excludeKeyspaceFromTableName,proto3" json:"exclude_keyspace_from_table_name,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Transaction chunk threshold in bytes. When a transaction exceeds this size, + // VTGate will acquire a lock to ensure contiguous, non-interleaved delivery + // (BEGIN...ROW...COMMIT sent sequentially without mixing events from other shards). + // Events are still chunked to prevent OOM. Transactions smaller than this are sent + // without locking for better parallelism. + TransactionChunkSize int64 `protobuf:"varint,11,opt,name=transaction_chunk_size,json=transactionChunkSize,proto3" json:"transaction_chunk_size,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *VStreamFlags) Reset() { @@ -1447,6 +1453,13 @@ func (x *VStreamFlags) GetExcludeKeyspaceFromTableName() bool { return false } +func (x *VStreamFlags) GetTransactionChunkSize() int64 { + if x != nil { + return x.TransactionChunkSize + } + return 0 +} + // VStreamRequest is the payload for VStream. type VStreamRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2157,8 +2170,8 @@ var file_vtgate_proto_rawDesc = string([]byte{ 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x74, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x74, 0x69, 0x64, 0x22, 0x1c, 0x0a, 0x1a, 0x52, 0x65, 0x73, 0x6f, 0x6c, 0x76, 0x65, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xdd, - 0x03, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x93, + 0x04, 0x0a, 0x0c, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x5f, 0x73, 0x6b, 0x65, 0x77, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0c, 0x6d, 0x69, 0x6e, 0x69, 0x6d, 0x69, 0x7a, 0x65, 0x53, 0x6b, 0x65, 0x77, 0x12, 0x2d, 0x0a, 0x12, 0x68, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, @@ -2187,70 +2200,74 @@ var file_vtgate_proto_rawDesc = string([]byte{ 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x73, 0x70, 0x61, 0x63, 0x65, 0x5f, 0x66, 0x72, 0x6f, 0x6d, 0x5f, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x08, 0x52, 0x1c, 0x65, 0x78, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x4b, 0x65, 0x79, 0x73, 0x70, 0x61, 0x63, - 0x65, 0x46, 0x72, 0x6f, 0x6d, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0xf6, - 0x01, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, - 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, - 0x35, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, - 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, - 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x18, - 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, - 0x74, 0x61, 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, 0x76, 0x67, 0x74, 0x69, 0x64, 0x12, - 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x46, 0x69, 0x6c, - 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x12, 0x2a, 0x0a, 0x05, 0x66, - 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x76, 0x74, 0x67, - 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, - 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, 0x0f, 0x56, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, - 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, - 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, - 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, 0x0e, 0x50, 0x72, 0x65, 0x70, 0x61, - 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, - 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, - 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, - 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, - 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, - 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x51, - 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, 0x79, 0x22, 0xac, 0x01, 0x0a, 0x0f, - 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, - 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, - 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, - 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, - 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x21, 0x0a, 0x0c, 0x70, 0x61, 0x72, 0x61, 0x6d, - 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x70, - 0x61, 0x72, 0x61, 0x6d, 0x73, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, - 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, - 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, - 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, - 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, - 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, - 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, - 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, - 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, - 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x57, 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, - 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4f, 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, - 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, - 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x4f, 0x53, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, - 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, 0x4f, 0x4d, 0x4d, 0x49, 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, - 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, - 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, - 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x65, 0x46, 0x72, 0x6f, 0x6d, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x34, + 0x0a, 0x16, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x68, + 0x75, 0x6e, 0x6b, 0x5f, 0x73, 0x69, 0x7a, 0x65, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x14, + 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x68, 0x75, 0x6e, 0x6b, + 0x53, 0x69, 0x7a, 0x65, 0x22, 0xf6, 0x01, 0x0a, 0x0e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, + 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, + 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x35, 0x0a, 0x0b, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x14, 0x2e, 0x74, 0x6f, 0x70, + 0x6f, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, + 0x52, 0x0a, 0x74, 0x61, 0x62, 0x6c, 0x65, 0x74, 0x54, 0x79, 0x70, 0x65, 0x12, 0x27, 0x0a, 0x05, + 0x76, 0x67, 0x74, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x62, 0x69, + 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x47, 0x74, 0x69, 0x64, 0x52, 0x05, + 0x76, 0x67, 0x74, 0x69, 0x64, 0x12, 0x2a, 0x0a, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, + 0x74, 0x61, 0x2e, 0x46, 0x69, 0x6c, 0x74, 0x65, 0x72, 0x52, 0x06, 0x66, 0x69, 0x6c, 0x74, 0x65, + 0x72, 0x12, 0x2a, 0x0a, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, + 0x32, 0x14, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x46, 0x6c, 0x61, 0x67, 0x73, 0x52, 0x05, 0x66, 0x6c, 0x61, 0x67, 0x73, 0x22, 0x3d, 0x0a, + 0x0f, 0x56, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x2a, 0x0a, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, + 0x32, 0x12, 0x2e, 0x62, 0x69, 0x6e, 0x6c, 0x6f, 0x67, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x56, 0x45, + 0x76, 0x65, 0x6e, 0x74, 0x52, 0x06, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x22, 0x92, 0x01, 0x0a, + 0x0e, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, + 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, + 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, + 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, + 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x27, 0x0a, 0x05, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, + 0x42, 0x6f, 0x75, 0x6e, 0x64, 0x51, 0x75, 0x65, 0x72, 0x79, 0x52, 0x05, 0x71, 0x75, 0x65, 0x72, + 0x79, 0x22, 0xac, 0x01, 0x0a, 0x0f, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x50, 0x43, + 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x29, 0x0a, 0x07, + 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, + 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, + 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x24, 0x0a, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, + 0x73, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0c, 0x2e, 0x71, 0x75, 0x65, 0x72, 0x79, 0x2e, + 0x46, 0x69, 0x65, 0x6c, 0x64, 0x52, 0x06, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x21, 0x0a, + 0x0c, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x04, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x0b, 0x70, 0x61, 0x72, 0x61, 0x6d, 0x73, 0x43, 0x6f, 0x75, 0x6e, 0x74, + 0x22, 0x6e, 0x0a, 0x13, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2c, 0x0a, 0x09, 0x63, 0x61, 0x6c, 0x6c, 0x65, + 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, + 0x70, 0x63, 0x2e, 0x43, 0x61, 0x6c, 0x6c, 0x65, 0x72, 0x49, 0x44, 0x52, 0x08, 0x63, 0x61, 0x6c, + 0x6c, 0x65, 0x72, 0x49, 0x64, 0x12, 0x29, 0x0a, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x2e, + 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x52, 0x07, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x22, 0x3d, 0x0a, 0x14, 0x43, 0x6c, 0x6f, 0x73, 0x65, 0x53, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x25, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, + 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x76, 0x74, 0x72, 0x70, 0x63, 0x2e, + 0x52, 0x50, 0x43, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x2a, + 0x44, 0x0a, 0x0f, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4d, 0x6f, + 0x64, 0x65, 0x12, 0x0f, 0x0a, 0x0b, 0x55, 0x4e, 0x53, 0x50, 0x45, 0x43, 0x49, 0x46, 0x49, 0x45, + 0x44, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x49, 0x4e, 0x47, 0x4c, 0x45, 0x10, 0x01, 0x12, + 0x09, 0x0a, 0x05, 0x4d, 0x55, 0x4c, 0x54, 0x49, 0x10, 0x02, 0x12, 0x09, 0x0a, 0x05, 0x54, 0x57, + 0x4f, 0x50, 0x43, 0x10, 0x03, 0x2a, 0x3c, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x6d, 0x69, 0x74, 0x4f, + 0x72, 0x64, 0x65, 0x72, 0x12, 0x0a, 0x0a, 0x06, 0x4e, 0x4f, 0x52, 0x4d, 0x41, 0x4c, 0x10, 0x00, + 0x12, 0x07, 0x0a, 0x03, 0x50, 0x52, 0x45, 0x10, 0x01, 0x12, 0x08, 0x0a, 0x04, 0x50, 0x4f, 0x53, + 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x41, 0x55, 0x54, 0x4f, 0x43, 0x4f, 0x4d, 0x4d, 0x49, + 0x54, 0x10, 0x03, 0x42, 0x36, 0x0a, 0x0f, 0x69, 0x6f, 0x2e, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x23, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2e, 0x69, + 0x6f, 0x2f, 0x76, 0x69, 0x74, 0x65, 0x73, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x74, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x76, 0x74, 0x67, 0x61, 0x74, 0x65, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, }) var ( diff --git a/go/vt/proto/vtgate/vtgate_vtproto.pb.go b/go/vt/proto/vtgate/vtgate_vtproto.pb.go index 5a20ac73dc0..8ae3e9c7767 100644 --- a/go/vt/proto/vtgate/vtgate_vtproto.pb.go +++ b/go/vt/proto/vtgate/vtgate_vtproto.pb.go @@ -436,6 +436,7 @@ func (m *VStreamFlags) CloneVT() *VStreamFlags { r.StreamKeyspaceHeartbeats = m.StreamKeyspaceHeartbeats r.IncludeReshardJournalEvents = m.IncludeReshardJournalEvents r.ExcludeKeyspaceFromTableName = m.ExcludeKeyspaceFromTableName + r.TransactionChunkSize = m.TransactionChunkSize if rhs := m.TablesToCopy; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -1847,6 +1848,11 @@ func (m *VStreamFlags) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.TransactionChunkSize != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.TransactionChunkSize)) + i-- + dAtA[i] = 0x58 + } if m.ExcludeKeyspaceFromTableName { i-- if m.ExcludeKeyspaceFromTableName { @@ -2794,6 +2800,9 @@ func (m *VStreamFlags) SizeVT() (n int) { if m.ExcludeKeyspaceFromTableName { n += 2 } + if m.TransactionChunkSize != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.TransactionChunkSize)) + } n += len(m.unknownFields) return n } @@ -6518,6 +6527,25 @@ func (m *VStreamFlags) UnmarshalVT(dAtA []byte) error { } } m.ExcludeKeyspaceFromTableName = bool(v != 0) + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TransactionChunkSize", wireType) + } + m.TransactionChunkSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TransactionChunkSize |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/servenv/metrics.go b/go/vt/servenv/metrics.go index a2b25dc8fc5..52c707f89b3 100644 --- a/go/vt/servenv/metrics.go +++ b/go/vt/servenv/metrics.go @@ -17,24 +17,20 @@ limitations under the License. package servenv func getCpuUsage() float64 { - value, err := getCgroupCpu() - if err == nil { + if value, err := getCgroupCpu(); err == nil { return value } - value, err = getHostCpuUsage() - if err == nil { + if value, err := getHostCpuUsage(); err == nil { return value } return -1 } func getMemoryUsage() float64 { - value, err := getCgroupMemory() - if err == nil { + if value, err := getCgroupMemory(); err == nil { return value } - value, err = getHostMemoryUsage() - if err == nil { + if value, err := getHostMemoryUsage(); err == nil { return value } return -1 diff --git a/go/vt/servenv/metrics_cgroup.go b/go/vt/servenv/metrics_cgroup.go index 393c3a90ace..9f6c0ee563a 100644 --- a/go/vt/servenv/metrics_cgroup.go +++ b/go/vt/servenv/metrics_cgroup.go @@ -23,10 +23,10 @@ import ( "fmt" "math" "runtime" + "sync" "time" "github.com/containerd/cgroups/v3" - "github.com/containerd/cgroups/v3/cgroup1" "github.com/containerd/cgroups/v3/cgroup2" "github.com/shirou/gopsutil/v4/mem" @@ -34,75 +34,52 @@ import ( ) var ( - cgroup2Manager *cgroup2.Manager - cgroup1Manager cgroup1.Cgroup - lastCpu uint64 - lastTime time.Time + once sync.Once + cgroupManager *cgroup2.Manager + lastCpu uint64 + lastTime time.Time + errCgroupMetricsNotAvailable = fmt.Errorf("cgroup metrics are not available") ) -func init() { - if cgroups.Mode() == cgroups.Unified { - manager, err := getCgroup2() - if err != nil { - log.Errorf("Failed to init cgroup2 manager: %v", err) - } - cgroup2Manager = manager - lastCpu, err = getCgroup2CpuUsage() - if err != nil { - log.Errorf("Failed to init cgroup2 cpu %v", err) - } - } else { - cgroup, err := getCgroup1() - if err != nil { - log.Errorf("Failed to init cgroup1 manager: %v", err) - } - cgroup1Manager = cgroup - lastCpu, err = getCgroup1CpuUsage() - if err != nil { - log.Errorf("Failed to init cgroup1 cpu %v", err) - } +func setup() { + if cgroups.Mode() != cgroups.Unified { + log.Warning("cgroup metrics are only supported with cgroup v2, will use host metrics") + return } - lastTime = time.Now() -} - -func isCgroupV2() bool { - return cgroups.Mode() == cgroups.Unified -} - -func getCgroup1() (cgroup1.Cgroup, error) { - path := cgroup1.NestedPath("") - cgroup, err := cgroup1.Load(path) + manager, err := getCgroupManager() + if err != nil { + log.Warningf("Failed to init cgroup manager for metrics, will use host metrics: %v", err) + } + cgroupManager = manager + lastCpu, err = getCurrentCgroupCpuUsage() if err != nil { - return nil, fmt.Errorf("cgroup1 manager is nil") + log.Warningf("Failed to get initial cgroup CPU usage: %v", err) } - return cgroup, nil + lastTime = time.Now() } -func getCgroup2() (*cgroup2.Manager, error) { +func getCgroupManager() (*cgroup2.Manager, error) { path, err := cgroup2.NestedGroupPath("") if err != nil { - return nil, fmt.Errorf("failed to load cgroup2 manager: %w", err) + return nil, fmt.Errorf("failed to build nested cgroup paths: %w", err) } cgroupManager, err := cgroup2.Load(path) if err != nil { - return nil, fmt.Errorf("cgroup2 manager is nil") + return nil, fmt.Errorf("failed to load cgroup manager: %w", err) } return cgroupManager, nil } func getCgroupCpuUsage() (float64, error) { + once.Do(setup) var ( currentUsage uint64 err error ) currentTime := time.Now() - if isCgroupV2() { - currentUsage, err = getCgroup2CpuUsage() - } else { - currentUsage, err = getCgroup1CpuUsage() - } + currentUsage, err = getCurrentCgroupCpuUsage() if err != nil { - return -1, fmt.Errorf("Could not read cpu usage") + return -1, fmt.Errorf("failed to read current cgroup CPU usage: %w", err) } duration := currentTime.Sub(lastTime) usage, err := getCpuUsageFromSamples(lastCpu, currentUsage, duration) @@ -114,27 +91,13 @@ func getCgroupCpuUsage() (float64, error) { return usage, nil } -func getCgroupMemoryUsage() (float64, error) { - if isCgroupV2() { - return getCgroup2MemoryUsage() - } else { - return getCgroup1MemoryUsage() +func getCurrentCgroupCpuUsage() (uint64, error) { + if cgroupManager == nil { + return 0, errCgroupMetricsNotAvailable } -} - -func getCgroup1CpuUsage() (uint64, error) { - stat1, err := cgroup1Manager.Stat() + stat1, err := cgroupManager.Stat() if err != nil { - return 0, fmt.Errorf("failed to get initial CPU stat: %w", err) - } - currentUsage := stat1.CPU.Usage.Total - return currentUsage, nil -} - -func getCgroup2CpuUsage() (uint64, error) { - stat1, err := cgroup2Manager.Stat() - if err != nil { - return 0, fmt.Errorf("failed to get initial CPU stat: %w", err) + return 0, fmt.Errorf("failed to get initial cgroup CPU stats: %w", err) } currentUsage := stat1.CPU.UsageUsec return currentUsage, nil @@ -154,20 +117,14 @@ func getCpuUsageFromSamples(usage1 uint64, usage2 uint64, interval time.Duration return cpuUsage, nil } -func getCgroup1MemoryUsage() (float64, error) { - stats, err := cgroup1Manager.Stat() - if err != nil { - return -1, fmt.Errorf("failed to get cgroup2 stats: %w", err) +func getCgroupMemoryUsage() (float64, error) { + once.Do(setup) + if cgroupManager == nil { + return -1, errCgroupMetricsNotAvailable } - usage := stats.Memory.Usage.Usage - limit := stats.Memory.Usage.Limit - return computeMemoryUsage(usage, limit) -} - -func getCgroup2MemoryUsage() (float64, error) { - stats, err := cgroup2Manager.Stat() + stats, err := cgroupManager.Stat() if err != nil { - return -1, fmt.Errorf("failed to get cgroup2 stats: %w", err) + return -1, fmt.Errorf("failed to get cgroup stats: %w", err) } usage := stats.Memory.Usage limit := stats.Memory.UsageLimit @@ -176,15 +133,15 @@ func getCgroup2MemoryUsage() (float64, error) { func computeMemoryUsage(usage uint64, limit uint64) (float64, error) { if usage == 0 || usage == math.MaxUint64 { - return -1, fmt.Errorf("Failed to find memory usage with invalid value: %d", usage) + return -1, fmt.Errorf("invalid memory usage value: %d", usage) } if limit == 0 { - return -1, fmt.Errorf("Failed to compute memory usage with invalid limit: %d", limit) + return -1, fmt.Errorf("invalid memory limit: %d", limit) } if limit == math.MaxUint64 { vmem, err := mem.VirtualMemory() if err != nil { - return -1, fmt.Errorf("Failed to fall back to system max memory: %w", err) + return -1, fmt.Errorf("failed to get virtual memory stats: %w", err) } limit = vmem.Total } diff --git a/go/vt/servenv/metrics_cgroup_test.go b/go/vt/servenv/metrics_cgroup_test.go index 66069d1fcaf..01308ea4ec3 100644 --- a/go/vt/servenv/metrics_cgroup_test.go +++ b/go/vt/servenv/metrics_cgroup_test.go @@ -21,6 +21,8 @@ package servenv import ( "testing" + + "github.com/stretchr/testify/require" ) func TestGetCGroupCpuUsageMetrics(t *testing.T) { @@ -35,3 +37,25 @@ func TestGetCgroupMemoryUsageMetrics(t *testing.T) { validateMem(t, mem, err) t.Logf("mem %.5f", mem) } + +func TestErrHandlingWithCgroups(t *testing.T) { + origCgroupManager := cgroupManager + defer func() { + cgroupManager = origCgroupManager + }() + + cpu, err := getCgroupCpuUsage() + validateCpu(t, cpu, err) + mem, err := getCgroupMemoryUsage() + validateMem(t, mem, err) + + cgroupManager = nil + require.Nil(t, cgroupManager) + + cpu, err = getCgroupCpuUsage() + require.ErrorContains(t, err, errCgroupMetricsNotAvailable.Error()) + require.Equal(t, int(cpu), -1) + mem, err = getCgroupMemoryUsage() + require.ErrorContains(t, err, errCgroupMetricsNotAvailable.Error()) + require.Equal(t, int(mem), -1) +} diff --git a/go/vt/servenv/metrics_nonlinux.go b/go/vt/servenv/metrics_nonlinux.go index 53d854f8c94..bcbd9f1d91f 100644 --- a/go/vt/servenv/metrics_nonlinux.go +++ b/go/vt/servenv/metrics_nonlinux.go @@ -24,9 +24,9 @@ import ( ) func getCgroupCpu() (float64, error) { - return -1, fmt.Errorf("Cgroup not supported on nonlinux platform") + return -1, fmt.Errorf("cgroups not supported on nonlinux platforms") } func getCgroupMemory() (float64, error) { - return -1, fmt.Errorf("Cgroup not supported on nonlinux platform") + return -1, fmt.Errorf("cgroups not supported on nonlinux platforms") } diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 991925d9a6e..53b494c34ff 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -52,11 +52,12 @@ type vstreamManager struct { toposerv srvtopo.Server cell string - vstreamsCreated *stats.CountersWithMultiLabels - vstreamsLag *stats.GaugesWithMultiLabels - vstreamsCount *stats.CountersWithMultiLabels - vstreamsEventsStreamed *stats.CountersWithMultiLabels - vstreamsEndedWithErrors *stats.CountersWithMultiLabels + vstreamsCreated *stats.CountersWithMultiLabels + vstreamsLag *stats.GaugesWithMultiLabels + vstreamsCount *stats.CountersWithMultiLabels + vstreamsEventsStreamed *stats.CountersWithMultiLabels + vstreamsEndedWithErrors *stats.CountersWithMultiLabels + vstreamsTransactionsChunked *stats.CountersWithMultiLabels } // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set @@ -70,6 +71,15 @@ const tabletPickerContextTimeout = 90 * time.Second // ending the stream from the tablet. const stopOnReshardDelay = 500 * time.Millisecond +// livenessTimeout is the point at which we return an error to the client if the stream has received +// no events, including heartbeats, from any of the shards. +var livenessTimeout = 10 * time.Minute + +// defaultTransactionChunkSizeBytes is the default threshold for chunking transactions. +// 0 (the default value for protobuf int64) means disabled, clients must explicitly set a value to opt in for chunking. +// Eventually we plan to enable chunking by default, for now set to 0, which is the same as the protobuf default. +const defaultTransactionChunkSizeBytes = 0 + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -136,9 +146,20 @@ type vstream struct { tabletPickerOptions discovery.TabletPickerOptions + // At what point, without any activity in the stream, should we consider it dead. + streamLivenessTimer *time.Timer + + // When a transaction exceeds this size, VStream acquires a lock to ensure contiguous, chunked delivery. + // Smaller transactions are sent without locking for better parallelism. + transactionChunkSizeBytes int + flags *vtgatepb.VStreamFlags } +func (vs *vstream) isChunkingEnabled() bool { + return vs.transactionChunkSizeBytes > 0 +} + type journalEvent struct { journal *binlogdatapb.Journal participants map[*binlogdatapb.ShardGtid]bool @@ -173,6 +194,10 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str "VStreamsEndedWithErrors", "Number of vstreams that ended with errors", labels), + vstreamsTransactionsChunked: exporter.NewCountersWithMultiLabels( + "VStreamsTransactionsChunked", + "Number of transactions that exceeded TransactionChunkSize threshold and required locking for contiguous, chunked delivery", + labels), } } @@ -182,6 +207,9 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta if err != nil { return vterrors.Wrap(err, "failed to resolve vstream parameters") } + log.Infof("VStream flags: minimize_skew=%v, heartbeat_interval=%v, stop_on_reshard=%v, cells=%v, cell_preference=%v, tablet_order=%v, stream_keyspace_heartbeats=%v, include_reshard_journal_events=%v, tables_to_copy=%v, exclude_keyspace_from_table_name=%v, transaction_chunk_size=%v", + flags.GetMinimizeSkew(), flags.GetHeartbeatInterval(), flags.GetStopOnReshard(), flags.Cells, flags.CellPreference, flags.TabletOrder, + flags.GetStreamKeyspaceHeartbeats(), flags.GetIncludeReshardJournalEvents(), flags.TablesToCopy, flags.GetExcludeKeyspaceFromTableName(), flags.TransactionChunkSize) ts, err := vsm.toposerv.GetTopoServer() if err != nil { return vterrors.Wrap(err, "failed to get topology server") @@ -190,6 +218,13 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta log.Errorf("unable to get topo server in VStream()") return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unable to get topoology server") } + transactionChunkSizeBytes := defaultTransactionChunkSizeBytes + if flags.TransactionChunkSize > 0 && flags.GetMinimizeSkew() { + log.Warning("Minimize skew cannot be set with transaction chunk size (can cause deadlock), ignoring transaction chunk size.") + } else if flags.TransactionChunkSize > 0 { + transactionChunkSizeBytes = int(flags.TransactionChunkSize) + } + vs := &vstream{ vgtid: vgtid, tabletType: tabletType, @@ -208,6 +243,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta heartbeatInterval: flags.GetHeartbeatInterval(), ts: ts, copyCompletedShard: make(map[string]struct{}), + transactionChunkSizeBytes: transactionChunkSizeBytes, tabletPickerOptions: discovery.TabletPickerOptions{ CellPreference: flags.GetCellPreference(), TabletOrder: flags.GetTabletOrder(), @@ -224,7 +260,6 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta // resolveParams provides defaults for the inputs if they're not specified. func (vsm *vstreamManager) resolveParams(ctx context.Context, tabletType topodatapb.TabletType, vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, flags *vtgatepb.VStreamFlags) (*binlogdatapb.VGtid, *binlogdatapb.Filter, *vtgatepb.VStreamFlags, error) { - if filter == nil { filter = &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -319,6 +354,10 @@ func (vsm *vstreamManager) GetTotalStreamDelay() int64 { func (vs *vstream) stream(ctx context.Context) error { ctx, vs.cancel = context.WithCancel(ctx) + if vs.streamLivenessTimer == nil { + vs.streamLivenessTimer = time.NewTimer(livenessTimeout) + defer vs.streamLivenessTimer.Stop() + } vs.wg.Add(1) go func() { @@ -361,6 +400,7 @@ func (vs *vstream) sendEvents(ctx context.Context) { send := func(evs []*binlogdatapb.VEvent) error { if err := vs.send(evs); err != nil { + log.Infof("Error in vstream send (wrapper) to client: %v", err) vs.once.Do(func() { vs.setError(err, "error sending events") }) @@ -372,12 +412,14 @@ func (vs *vstream) sendEvents(ctx context.Context) { for { select { case <-ctx.Done(): + log.Infof("vstream context canceled") vs.once.Do(func() { vs.setError(ctx.Err(), "context ended while sending events") }) return case evs := <-vs.eventCh: if err := send(evs); err != nil { + log.Infof("Error in vstream send events to client: %v", err) vs.once.Do(func() { vs.setError(err, "error sending events") }) @@ -392,11 +434,19 @@ func (vs *vstream) sendEvents(ctx context.Context) { CurrentTime: now, }} if err := send(evs); err != nil { + log.Infof("Error in vstream sending heartbeat to client: %v", err) vs.once.Do(func() { vs.setError(err, "error sending heartbeat") }) return } + case <-vs.streamLivenessTimer.C: + msg := fmt.Sprintf("vstream failed liveness checks as there was no activity, including heartbeats, within the last %v", livenessTimeout) + log.Infof("Error in vstream: %s", msg) + vs.once.Do(func() { + vs.setError(vterrors.New(vtrpcpb.Code_UNAVAILABLE, msg), "vstream is fully throttled or otherwise hung") + }) + return } } } @@ -417,7 +467,7 @@ func (vs *vstream) startOneStream(ctx context.Context, sgtid *binlogdatapb.Shard // Set the error on exit. First one wins. if err != nil { - log.Errorf("Error in vstream for %+v: %s", sgtid, err) + log.Errorf("Error in vstream for %+v: %v", sgtid, err) // Get the original/base error. uerr := vterrors.UnwrapAll(err) if !errors.Is(uerr, context.Canceled) && !errors.Is(uerr, context.DeadlineExceeded) { @@ -666,6 +716,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha Options: options, } log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req) + var txLockHeld bool + var inTransaction bool + var accumulatedSize int + + defer func() { + if txLockHeld { + vs.mu.Unlock() + txLockHeld = false + } + }() + err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 @@ -675,6 +736,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return vterrors.Wrapf(ctx.Err(), "context ended while streaming from tablet %s in %s/%s", tabletAliasString, sgtid.Keyspace, sgtid.Shard) case streamErr := <-errCh: + log.Infof("vstream for %s/%s ended due to health check, should retry: %v", sgtid.Keyspace, sgtid.Shard, streamErr) // You must return Code_UNAVAILABLE here to trigger a restart. return vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "error streaming from tablet %s in %s/%s: %s", tabletAliasString, sgtid.Keyspace, sgtid.Shard, streamErr.Error()) @@ -682,23 +744,33 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha // Unreachable. // This can happen if a server misbehaves and does not end // the stream after we return an error. + log.Infof("vstream for %s/%s ended due to journal event, returning io.EOF", sgtid.Keyspace, sgtid.Shard) return io.EOF default: } aligningStreamsErr := fmt.Sprintf("error aligning streams across %s/%s", sgtid.Keyspace, sgtid.Shard) - sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletAliasString) + sendingEventsErr := "error sending event batch from tablet " + tabletAliasString sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { + vs.streamLivenessTimer.Reset(livenessTimeout) // Any event in the stream demonstrates liveness + accumulatedSize += event.SizeVT() switch event.Type { + case binlogdatapb.VEventType_BEGIN: + // Mark the start of a transaction. + // Also queue the events for sending to the client. + inTransaction = true + sendevents = append(sendevents, event) case binlogdatapb.VEventType_FIELD: ev := maybeUpdateTableName(event, sgtid.Keyspace, vs.flags.GetExcludeKeyspaceFromTableName(), extractFieldTableName) sendevents = append(sendevents, ev) case binlogdatapb.VEventType_ROW: ev := maybeUpdateTableName(event, sgtid.Keyspace, vs.flags.GetExcludeKeyspaceFromTableName(), extractRowTableName) sendevents = append(sendevents, ev) - case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER: + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_ROLLBACK: + inTransaction = false + accumulatedSize = 0 sendevents = append(sendevents, event) eventss = append(eventss, sendevents) @@ -706,8 +778,20 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return vterrors.Wrap(err, aligningStreamsErr) } - if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - return vterrors.Wrap(err, sendingEventsErr) + var sendErr error + if vs.isChunkingEnabled() && txLockHeld { + // If chunking is enabled and we are holding the lock (only possible to acquire lock when chunking is enabled), then send the events. + sendErr = vs.sendEventsLocked(ctx, sgtid, eventss) + vs.mu.Unlock() + txLockHeld = false + } else { + // If chunking is not enabled or this transaction was small enough to not need chunking, + // fall back to default behavior of sending entire transaction atomically. + sendErr = vs.sendAll(ctx, sgtid, eventss) + } + if sendErr != nil { + log.Infof("vstream for %s/%s, error in sendAll: %v", sgtid.Keyspace, sgtid.Shard, sendErr) + return vterrors.Wrap(sendErr, sendingEventsErr) } eventss = nil sendevents = nil @@ -723,6 +807,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } if err := vs.sendAll(ctx, sgtid, eventss); err != nil { + log.Infof("vstream for %s/%s, error in sendAll, on copy completed event: %v", sgtid.Keyspace, sgtid.Shard, err) return vterrors.Wrap(err, sendingEventsErr) } eventss = nil @@ -753,6 +838,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } eventss = append(eventss, sendevents) if err := vs.sendAll(ctx, sgtid, eventss); err != nil { + log.Infof("vstream for %s/%s, error in sendAll, on journal event: %v", sgtid.Keyspace, sgtid.Shard, err) return vterrors.Wrap(err, sendingEventsErr) } eventss = nil @@ -787,6 +873,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if endTimer != nil { <-endTimer.C } + log.Infof("vstream for %s/%s ended due to journal event, returning io.EOF", sgtid.Keyspace, sgtid.Shard) return io.EOF } } @@ -799,6 +886,41 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if len(sendevents) != 0 { eventss = append(eventss, sendevents) } + + // If chunking is enabled, and we are holding the lock (only possible when enabled), and we are not in a transaction + // release the lock (this should not ever execute, acts as a safety check). + if vs.isChunkingEnabled() && txLockHeld && !inTransaction { + log.Warning("Detected held lock but not in a transaction, releasing the lock") + vs.mu.Unlock() + txLockHeld = false + } + + // If chunking is enabled, and we are holding the lock (only possible when chunking is enabled), send the events. + if vs.isChunkingEnabled() && txLockHeld && len(eventss) > 0 { + if err := vs.sendEventsLocked(ctx, sgtid, eventss); err != nil { + log.Infof("vstream for %s/%s, error in sendAll at end of callback: %v", sgtid.Keyspace, sgtid.Shard, err) + return vterrors.Wrap(err, sendingEventsErr) + } + eventss = nil + } + + // If chunking is enabled and we are in a transaction, and we do not yet hold the lock, and the accumulated size is greater than our chunk size + // then acquire the lock, so that we can send the events, and begin chunking the transaction. + if vs.isChunkingEnabled() && inTransaction && !txLockHeld && accumulatedSize > vs.transactionChunkSizeBytes { + log.Infof("vstream for %s/%s: transaction size %d bytes exceeds chunk size %d bytes, acquiring lock for contiguous, chunked delivery", + sgtid.Keyspace, sgtid.Shard, accumulatedSize, vs.transactionChunkSizeBytes) + vs.vsm.vstreamsTransactionsChunked.Add(labelValues, 1) + vs.mu.Lock() + txLockHeld = true + if len(eventss) > 0 { + if err := vs.sendEventsLocked(ctx, sgtid, eventss); err != nil { + log.Infof("vstream for %s/%s, error sending events after acquiring lock: %v", sgtid.Keyspace, sgtid.Shard, err) + return vterrors.Wrap(err, sendingEventsErr) + } + eventss = nil + } + } + return nil }) // If stream was ended (by a journal event), return nil without checking for error. @@ -815,7 +937,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha retry, ignoreTablet := vs.shouldRetry(err) if !retry { - log.Errorf("vstream for %s/%s error: %v", sgtid.Keyspace, sgtid.Shard, err) + log.Infof("vstream for %s/%s error, no retry: %v", sgtid.Keyspace, sgtid.Shard, err) return vterrors.Wrapf(err, "error in vstream for %s/%s on tablet %s", sgtid.Keyspace, sgtid.Shard, tabletAliasString) } @@ -832,7 +954,6 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha } log.Infof("vstream for %s/%s error, retrying: %v", sgtid.Keyspace, sgtid.Shard, err) } - } // maybeUpdateTableNames updates table names when the ExcludeKeyspaceFromTableName flag is disabled. @@ -915,6 +1036,11 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) { func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() defer vs.mu.Unlock() + return vs.sendEventsLocked(ctx, sgtid, eventss) +} + +// sendEventsLocked sends events assuming vs.mu is already held by the caller. +func (vs *vstream) sendEventsLocked(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} // Send all chunks while holding the lock. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index aa318e3ae65..0b0c6d7656c 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -18,6 +18,7 @@ package vtgate import ( "context" + "errors" "fmt" "os" "runtime/pprof" @@ -499,6 +500,162 @@ func TestVStreamChunks(t *testing.T) { require.Equal(t, 100, ddlCount) } +// Verifies that large chunked transactions from one shard +// are not interleaved with events from other shards. +func TestVStreamChunksOverSizeThreshold(t *testing.T) { + ctx := t.Context() + ks := "TestVStream" + cell := "aa" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.vstreamsTransactionsChunked.ResetAll() + sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) + + rowData := make([]byte, 100) + for i := range rowData { + rowData[i] = byte(i % 256) + } + + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_BEGIN}}, nil) + for range 50 { + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: "shard0_table", + RowChanges: []*binlogdatapb.RowChange{{ + After: &querypb.Row{ + Lengths: []int64{int64(len(rowData))}, + Values: rowData, + }, + }}, + }, + }}, nil) + } + + sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_BEGIN}}, nil) + sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: "shard0_table", + RowChanges: []*binlogdatapb.RowChange{{ + After: &querypb.Row{ + Lengths: []int64{8}, + Values: rowData[:8], + }, + }}, + }, + }}, nil) + sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil) + + for range 50 { + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: "shard0_table", + RowChanges: []*binlogdatapb.RowChange{{ + After: &querypb.Row{ + Lengths: []int64{int64(len(rowData))}, + Values: rowData, + }, + }}, + }, + }}, nil) + } + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }, { + Keyspace: ks, + Shard: "20-40", + Gtid: "pos", + }}, + } + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + // Track transaction states + type txState struct { + shard string + hasBegin bool + hasCommit bool + rowCount int + } + var currentTx *txState + var completedTxs []*txState + + flags := &vtgatepb.VStreamFlags{ + TransactionChunkSize: 1024, + } + + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, flags, func(events []*binlogdatapb.VEvent) error { + for _, event := range events { + switch event.Type { + case binlogdatapb.VEventType_VGTID: + if event.Keyspace != "" && event.Shard != "" { + shard := event.Keyspace + "/" + event.Shard + if currentTx != nil && currentTx.shard != "" && currentTx.shard != shard { + return fmt.Errorf("VGTID from shard %s while transaction from shard %s is in progress (interleaving detected)", shard, currentTx.shard) + } + if currentTx != nil && currentTx.shard == "" { + currentTx.shard = shard + } + } + case binlogdatapb.VEventType_BEGIN: + if currentTx != nil && !currentTx.hasCommit { + return fmt.Errorf("BEGIN received while transaction %s is still open (interleaving detected)", currentTx.shard) + } + currentTx = &txState{hasBegin: true} + case binlogdatapb.VEventType_ROW: + if currentTx == nil { + return errors.New("ROW event outside transaction") + } + currentTx.rowCount++ + case binlogdatapb.VEventType_COMMIT: + if currentTx == nil { + return errors.New("COMMIT without BEGIN") + } + currentTx.hasCommit = true + completedTxs = append(completedTxs, currentTx) + t.Logf("COMMIT transaction for shard %s (rows=%d, completed_txs=%d)", currentTx.shard, currentTx.rowCount, len(completedTxs)) + currentTx = nil + default: + } + } + + if len(completedTxs) == 2 { + vstreamCancel() + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + require.Equal(t, 2, len(completedTxs), "Should receive both transactions") + + var rowCounts []int + for _, tx := range completedTxs { + require.True(t, tx.hasBegin, "Transaction should have BEGIN") + require.True(t, tx.hasCommit, "Transaction should have COMMIT") + rowCounts = append(rowCounts, tx.rowCount) + } + require.ElementsMatch(t, []int{1, 100}, rowCounts, "Should have one transaction with 1 row and one with 100 rows") + + chunkedCounts := vsm.vstreamsTransactionsChunked.Counts() + require.Contains(t, chunkedCounts, "TestVStream.-20.PRIMARY", "Should have chunked transaction metric for -20 shard") + require.GreaterOrEqual(t, chunkedCounts["TestVStream.-20.PRIMARY"], int64(1), "Should have at least one chunked transaction") +} + func TestVStreamMulti(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -633,14 +790,7 @@ func TestVStreamsMetrics(t *testing.T) { err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { receivedResponses = append(receivedResponses, &binlogdatapb.VStreamResponse{Events: events}) - // While the VStream is running, we should see one active stream per shard. - require.Equal(t, map[string]int64{ - expectedLabels1: 1, - expectedLabels2: 1, - }, vsm.vstreamsCount.Counts()) - if len(receivedResponses) == 2 { - // Stop streaming after receiving both expected responses. vstreamCancel() } @@ -649,34 +799,37 @@ func TestVStreamsMetrics(t *testing.T) { require.Error(t, err) require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) - require.Equal(t, 2, len(receivedResponses)) - // After the streams end, the count should go back to zero. - require.Equal(t, map[string]int64{ - expectedLabels1: 0, - expectedLabels2: 0, - }, vsm.vstreamsCount.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 1, - expectedLabels2: 1, - }, vsm.vstreamsCreated.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 5, - expectedLabels2: 7, - }, vsm.vstreamsLag.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 2, - expectedLabels2: 2, - }, vsm.vstreamsEventsStreamed.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 0, - expectedLabels2: 0, - }, vsm.vstreamsEndedWithErrors.Counts()) + counts := vsm.vstreamsCount.Counts() + require.Contains(t, counts, expectedLabels1, "Should have count for shard -20") + require.Contains(t, counts, expectedLabels2, "Should have count for shard 20-40") + require.Equal(t, int64(0), counts[expectedLabels1], "Shard -20 should have 0 active streams after completion") + require.Equal(t, int64(0), counts[expectedLabels2], "Shard 20-40 should have 0 active streams after completion") + + created := vsm.vstreamsCreated.Counts() + require.Contains(t, created, expectedLabels1, "Should have created count for shard -20") + require.Contains(t, created, expectedLabels2, "Should have created count for shard 20-40") + require.Equal(t, int64(1), created[expectedLabels1], "Shard -20 should have created 1 stream") + require.Equal(t, int64(1), created[expectedLabels2], "Shard 20-40 should have created 1 stream") + + lag := vsm.vstreamsLag.Counts() + require.Contains(t, lag, expectedLabels1, "Should have lag for shard -20") + require.Contains(t, lag, expectedLabels2, "Should have lag for shard 20-40") + require.Equal(t, int64(5), lag[expectedLabels1], "Shard -20 should have lag of 5") + require.Equal(t, int64(7), lag[expectedLabels2], "Shard 20-40 should have lag of 7") + + streamed := vsm.vstreamsEventsStreamed.Counts() + require.Contains(t, streamed, expectedLabels1, "Should have events streamed for shard -20") + require.Contains(t, streamed, expectedLabels2, "Should have events streamed for shard 20-40") + require.Equal(t, int64(2), streamed[expectedLabels1], "Shard -20 should have streamed 2 events") + require.Equal(t, int64(2), streamed[expectedLabels2], "Shard 20-40 should have streamed 2 events") + + errors := vsm.vstreamsEndedWithErrors.Counts() + require.Contains(t, errors, expectedLabels1, "Should have error count for shard -20") + require.Contains(t, errors, expectedLabels2, "Should have error count for shard 20-40") + require.Equal(t, int64(0), errors[expectedLabels1], "Shard -20 should have 0 errors") + require.Equal(t, int64(0), errors[expectedLabels2], "Shard 20-40 should have 0 errors") } func TestVStreamsMetricsErrors(t *testing.T) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 43a2a436bb3..3dcb64b0aee 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -417,10 +417,10 @@ func (uvs *uvstreamer) currentPosition() (replication.Position, error) { // 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK) // 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK) // -// If TablesToCopy option is not nil, copy only the tables listed in TablesToCopy. -// For other tables not in TablesToCopy, if startPos is set, perform catchup starting from startPos. +// If table copy phase should run based on one of the previous states, then only copy the tables in +// TablesToCopy list. func (uvs *uvstreamer) init() error { - if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ || len(uvs.options.GetTablesToCopy()) > 0 /* copy specific tables */ { + if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ { if err := uvs.buildTablePlan(); err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d8359de1bd3..a9cf3928d2b 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -2307,3 +2307,30 @@ func TestFilteredIsNullOperator(t *testing.T) { }) } } + +func TestUVStreamerNoCopyWithGTID(t *testing.T) { + execStatements(t, []string{ + "create table t1(id int, val varchar(128), primary key(id))", + "insert into t1 values (1, 'val1')", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + ctx := context.Background() + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + pos := primaryPosition(t) + options := &binlogdatapb.VStreamOptions{ + TablesToCopy: []string{"t1"}, + } + uvs := newUVStreamer(ctx, engine, env.Dbcfgs.DbaWithDB(), env.SchemaEngine, pos, + nil, filter, testLocalVSchema, throttlerapp.VStreamerName, + func([]*binlogdatapb.VEvent) error { return nil }, options) + err := uvs.init() + require.NoError(t, err) + require.Empty(t, uvs.plans, "Should not build table plans when startPos is set") +} diff --git a/proto/vtgate.proto b/proto/vtgate.proto index f798a1ca6ca..d1633785e94 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -373,6 +373,12 @@ message VStreamFlags { repeated string tables_to_copy = 9; // Exclude the keyspace from the table name that is sent to the vstream client bool exclude_keyspace_from_table_name = 10; + // Transaction chunk threshold in bytes. When a transaction exceeds this size, + // VTGate will acquire a lock to ensure contiguous, non-interleaved delivery + // (BEGIN...ROW...COMMIT sent sequentially without mixing events from other shards). + // Events are still chunked to prevent OOM. Transactions smaller than this are sent + // without locking for better parallelism. + int64 transaction_chunk_size = 11; } // VStreamRequest is the payload for VStream.