Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
dbfee5c
VTGate VStream: Ensure reasonable delivery time for reshard journal e…
makinje16 Aug 29, 2024
e4f341c
Backport sqlparser patch for v15->v19 upgrade: 14763 Fix accepting bi…
tanjinx Jan 23, 2025
2f836a5
Upgrade vitess addons to 0.19.8 (#591)
ejortegau Jan 24, 2025
3e1481c
Use prefix in all vtorc check and recover logs (#17526) (#592)
ejortegau Jan 24, 2025
5ff39d2
`slack-19.0`: various backports for `vtorc`, part 2 (#596)
timvaillancourt Feb 5, 2025
0edc5e6
`slack-19.0`: `vtorc`: improve handling of partial cell topo results …
timvaillancourt Feb 10, 2025
00069e8
`slack-19.0`: skip tests that will fail on v15 downgrade testing (#605)
timvaillancourt Feb 14, 2025
a6971a5
`slack-19.0`: Add stats for shards watched by VTOrc (#606)
timvaillancourt Feb 17, 2025
5eb0b1c
Add `GetServerStatus` RPC to use in PRS (#16022) (#607)
timvaillancourt Feb 18, 2025
ea4740d
backport/patch connection pool bug/perf fixes (#604)
tanjinx Feb 19, 2025
c5f46ac
pool: reopen connection closed by idle timeout (#17818) (#609)
tanjinx Feb 19, 2025
10e6e8b
VReplication: Support excluding lagging tablets and use this in vstre…
twthorn Feb 24, 2025
e084d42
`slack-19.0`: backport v22 VTOrc optimizations, part 2 (#613)
timvaillancourt Feb 26, 2025
3a68b52
Add stats for shards watched by VTOrc, purge stale shards (#17815) (#…
timvaillancourt Mar 5, 2025
e1b2c48
--consolidator-query-waiter-cap to set the max number of waiter for c…
timvaillancourt Mar 6, 2025
3146f83
`slack-19.0` backport v22 `vtorc` optimizations + stats, part 3 (#618)
timvaillancourt Mar 10, 2025
005917f
Bp pr 17558 pr 17858.slack19.0 (#615)
twthorn Mar 12, 2025
ac11a19
`slack-19.0`: re-backport tweaks from vitessio/vitess#17911 (#621)
timvaillancourt Mar 12, 2025
e1e335e
fix releasing the global read lock when mysqlshell backup fails (#170…
rvrangel Mar 13, 2025
a4b2b49
VStream API: allow keyspace-level heartbeats to be streamed (#16593) …
makinje16 Mar 13, 2025
6715b78
Increase health check channel buffer (#17821) (#625)
makinje16 Mar 17, 2025
413f6cc
VStream: Allow for automatic resume after Reshard across VStreams (#1…
tanjinx Mar 18, 2025
0e59f8d
Merge branch 'slack-19.0' into slack-19.0-vtgate-journal-events
makinje16 Mar 20, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions examples/local/vstream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"log"
"time"

vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

/*
Expand Down Expand Up @@ -73,15 +73,18 @@ func main() {
}
defer conn.Close()
flags := &vtgatepb.VStreamFlags{
//MinimizeSkew: false,
//HeartbeatInterval: 60, //seconds
// MinimizeSkew: false,
// HeartbeatInterval: 60, //seconds
// StopOnReshard: true,
}
reader, err := conn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
if err != nil {
log.Fatal(err)
}
for {
e, err := reader.Recv()
switch err {
case nil:
_ = e
fmt.Printf("%v\n", e)
case io.EOF:
fmt.Printf("stream ended\n")
Expand Down
220 changes: 210 additions & 10 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ import (
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
_ "vitess.io/vitess/go/vt/vtctl/grpcvtctlclient"
_ "vitess.io/vitess/go/vt/vtgate/grpcvtgateconn"

"vitess.io/vitess/go/vt/vtgate/vtgateconn"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
)

// Validates that we have a working VStream API
Expand Down Expand Up @@ -606,8 +606,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
// Stream events but stop once we have a VGTID with positions for the old/original shards.
var newVGTID *binlogdatapb.VGtid
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)

require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down Expand Up @@ -661,8 +661,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {

// Now start a new VStream from our previous VGTID which only has the old/original shards.
func() {
var reader vtgateconn.VStreamReader
reader, err = vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)

require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down Expand Up @@ -697,8 +697,8 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
}()

// We should have a mix of events across the old and new shards.
require.NotZero(t, oldShardRowEvents)
require.NotZero(t, newShardRowEvents)
require.Greater(t, oldShardRowEvents, 0)
require.Greater(t, newShardRowEvents, 0)

// The number of row events streamed by the VStream API should match the number of rows inserted.
customerResult := execVtgateQuery(t, vtgateConn, ks, "select count(*) from customer")
Expand All @@ -707,6 +707,206 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
require.Equal(t, customerCount, int64(oldShardRowEvents+newShardRowEvents))
}

// TestMultiVStreamsKeyspaceStopOnReshard confirms that journal events are received
// when resuming a VStream after a reshard.
//
// The test proceeds in three phases:
//  1. With a Reshard workflow running (but traffic not yet switched), stream the
//     keyspace with StopOnReshard set until a VGTID carrying a non-empty GTID for
//     both of the old shards has been received.
//  2. Switch traffic to the new shards.
//  3. Resume streaming from the VGTID saved in phase 1, more than once, and
//     require each resumed stream to deliver one JOURNAL event per old shard
//     (each wrapped as BEGIN, JOURNAL, VGTID, COMMIT) and then end with io.EOF.
func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
	ctx := context.Background()
	ks := "testks"
	wf := "multiVStreamsKeyspaceReshard"
	baseTabletID := 100
	tabletType := topodatapb.TabletType_PRIMARY.String()
	oldShards := "-80,80-"
	newShards := "-40,40-80,80-c0,c0-"
	oldShardRowEvents, journalEvents := 0, 0
	vc = NewVitessCluster(t, nil)
	defer vc.TearDown()
	defaultCell := vc.Cells[vc.CellNames[0]]
	ogdr := defaultReplicas
	defaultReplicas = 0 // Because of CI resource constraints we can only run this test with primary tablets
	defer func(dr int) { defaultReplicas = dr }(ogdr)

	// For our sequences etc.
	_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, "global", "0", vschemaUnsharded, schemaUnsharded, defaultReplicas, defaultRdonly, baseTabletID, nil)
	require.NoError(t, err)

	// Setup the keyspace with our old/original shards.
	keyspace, err := vc.AddKeyspace(t, []*Cell{defaultCell}, ks, oldShards, vschemaSharded, schemaSharded, defaultReplicas, defaultRdonly, baseTabletID+1000, nil)
	require.NoError(t, err)

	// Add the new shards.
	err = vc.AddShards(t, []*Cell{defaultCell}, keyspace, newShards, defaultReplicas, defaultRdonly, baseTabletID+2000, targetKsOpts)
	require.NoError(t, err)

	vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
	defer vtgateConn.Close()
	verifyClusterHealth(t, vc)

	vstreamConn, err := vtgateconn.Dial(ctx, fmt.Sprintf("%s:%d", vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateGrpcPort))
	require.NoError(t, err)
	defer vstreamConn.Close()

	// Ensure that we're starting with a clean slate.
	_, err = vtgateConn.ExecuteFetch(fmt.Sprintf("delete from %s.customer", ks), 1000, false)
	require.NoError(t, err)

	// Coordinate go-routines.
	streamCtx, streamCancel := context.WithTimeout(ctx, 1*time.Minute)
	defer streamCancel()
	done := make(chan struct{})

	// First goroutine that keeps inserting rows into the table being streamed until the
	// stream context is cancelled.
	go func() {
		id := 1
		for {
			select {
			case <-streamCtx.Done():
				// Give the VStream a little catch-up time before telling it to stop
				// via the done channel.
				time.Sleep(10 * time.Second)
				close(done)
				return
			default:
				insertRow(ks, "customer", id)
				time.Sleep(250 * time.Millisecond)
				id++
			}
		}
	}()

	// Create the Reshard workflow and wait for it to finish the copy phase.
	reshardAction(t, "Create", wf, ks, oldShards, newShards, defaultCellName, tabletType)
	waitForWorkflowState(t, vc, fmt.Sprintf("%s.%s", ks, wf), binlogdatapb.VReplicationWorkflowState_Running.String())

	vgtid := &binlogdatapb.VGtid{
		ShardGtids: []*binlogdatapb.ShardGtid{{
			// Only stream the keyspace that we're resharding. Otherwise the client stream
			// will continue to run with only the tablet stream from the global keyspace.
			Keyspace: ks,
		}}}

	filter := &binlogdatapb.Filter{
		Rules: []*binlogdatapb.Rule{{
			// Stream all tables.
			Match: "/.*",
		}},
	}
	flags := &vtgatepb.VStreamFlags{
		StopOnReshard: true,
	}

	// Stream events but stop once we have a VGTID with positions for the old/original shards.
	var newVGTID *binlogdatapb.VGtid
	func() {
		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
		require.NoError(t, err)
		for {
			evs, err := reader.Recv()

			switch err {
			case nil:
				for _, ev := range evs {
					switch ev.Type {
					case binlogdatapb.VEventType_ROW:
						shard := ev.GetRowEvent().GetShard()
						switch shard {
						case "-80", "80-":
							oldShardRowEvents++
						default:
							require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
						}
					case binlogdatapb.VEventType_VGTID:
						newVGTID = ev.GetVgtid()
						// We want a VGTID with a ShardGtid for both of the old shards.
						if len(newVGTID.GetShardGtids()) == 2 {
							canStop := true
							for _, sg := range newVGTID.GetShardGtids() {
								if sg.GetGtid() == "" {
									canStop = false
								}
							}
							if canStop {
								return
							}
						}
					}
				}
			default:
				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
			}
			// Give up once the overall stream deadline has passed; the assertion on
			// newVGTID below then reports the failure.
			select {
			case <-streamCtx.Done():
				return
			default:
			}
		}
	}()

	// Confirm that we have shard GTIDs for the old/original shards.
	require.Len(t, newVGTID.GetShardGtids(), 2)
	t.Logf("Position at end of first stream: %+v", newVGTID.GetShardGtids())

	// Switch the traffic to the new shards.
	reshardAction(t, "SwitchTraffic", wf, ks, oldShards, newShards, defaultCellName, tabletType)

	// Now start a new VStream from our previous VGTID which only has the old/original shards.
	expectedJournalEvents := 2 // One for each old shard: -80,80-
	var streamStopped bool     // We expect the stream to end with io.EOF from the reshard
	runResumeStream := func() {
		// Reset the per-run results so that every resumed stream is judged on its own.
		journalEvents = 0
		streamStopped = false
		t.Logf("Streaming from position: %+v", newVGTID.GetShardGtids())
		reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, newVGTID, filter, flags)
		require.NoError(t, err)
		for {
			evs, err := reader.Recv()

			switch err {
			case nil:
				for i, ev := range evs {
					switch ev.Type {
					case binlogdatapb.VEventType_ROW:
						// Use the nil-safe generated getters, as the first stream does.
						shard := ev.GetRowEvent().GetShard()
						switch shard {
						case "-80", "80-":
						default:
							require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
						}
					case binlogdatapb.VEventType_JOURNAL:
						t.Logf("Journal event: %+v", ev)
						journalEvents++
						// Check that the neighbouring events exist before indexing them so
						// that a malformed batch fails the test with a clear message
						// instead of panicking with an index out of range.
						require.Greater(t, i, 0, "JOURNAL event not preceded by BEGIN event")
						require.Less(t, i+2, len(evs), "JOURNAL event not followed by VGTID and COMMIT events")
						require.Equal(t, binlogdatapb.VEventType_BEGIN, evs[i-1].Type, "JOURNAL event not preceded by BEGIN event")
						require.Equal(t, binlogdatapb.VEventType_VGTID, evs[i+1].Type, "JOURNAL event not followed by VGTID event")
						require.Equal(t, binlogdatapb.VEventType_COMMIT, evs[i+2].Type, "JOURNAL event not followed by COMMIT event")
					}
				}
			case io.EOF:
				streamStopped = true
				return
			default:
				require.FailNow(t, fmt.Sprintf("VStream returned unexpected error: %v", err))
			}
			// Stop waiting once the inserting goroutine has signalled that it is done.
			select {
			case <-done:
				return
			default:
			}
		}
	}

	// Multiple VStream clients should be able to resume from where they left off and
	// get the reshard journal event. The number of resuming clients is independent of
	// the number of journal events each of them is expected to see.
	const resumeStreams = 2
	for i := 1; i <= resumeStreams; i++ {
		runResumeStream()
		// We should have seen the journal event for each shard in the stream due to
		// using StopOnReshard.
		require.Equal(t, expectedJournalEvents, journalEvents,
			"did not get expected journal events on resume vstream #%d", i)
		// Confirm that the stream stopped on the reshard.
		require.True(t, streamStopped, "the vstream did not stop with io.EOF as expected")
	}
}

// TestVStreamFailover runs the shared testVStreamWithFailover scenario with its
// second argument set to true.
// NOTE(review): presumably the flag makes the scenario perform a failover while
// streaming — confirm against testVStreamWithFailover.
func TestVStreamFailover(t *testing.T) {
testVStreamWithFailover(t, true)
}
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,6 @@ func (ts *trafficSwitcher) createJournals(ctx context.Context, sourceWorkflows [
})

}
log.Infof("Creating journal %v", journal)
ts.Logger().Infof("Creating journal: %v", journal)
statement := fmt.Sprintf("insert into _vt.resharding_journal "+
"(id, db_name, val) "+
Expand Down
35 changes: 31 additions & 4 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ const maxSkewTimeoutSeconds = 10 * 60
// for a vstream
const tabletPickerContextTimeout = 90 * time.Second

// stopOnReshardDelay is how long we wait, at a minimum, after sending a reshard journal event before
// ending the stream from the tablet.
const stopOnReshardDelay = 500 * time.Millisecond

// vstream contains the metadata for one VStream request.
type vstream struct {
// mu protects parts of vgtid, the semantics of a send, and journaler.
Expand Down Expand Up @@ -664,7 +668,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
sendingEventsErr := fmt.Sprintf("error sending event batch from tablet %s", tabletAliasString)

sendevents := make([]*binlogdatapb.VEvent, 0, len(events))
for _, event := range events {
for i, event := range events {
switch event.Type {
case binlogdatapb.VEventType_FIELD:
// Update table names and send.
Expand Down Expand Up @@ -714,12 +718,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
if err := vs.alignStreams(ctx, event, sgtid.Keyspace, sgtid.Shard); err != nil {
return vterrors.Wrap(err, aligningStreamsErr)
}

case binlogdatapb.VEventType_JOURNAL:
journal := event.Journal
// Journal events are not sent to clients by default, but only when StopOnReshard is set
// Journal events are not sent to clients by default, but only when
// StopOnReshard is set.
if vs.stopOnReshard && journal.MigrationType == binlogdatapb.MigrationType_SHARDS {
sendevents = append(sendevents, event)
// Read any subsequent events until we get the VGTID->COMMIT events that
// always follow the JOURNAL event which is generated as a result of
// an autocommit insert into the _vt.resharding_journal table on the
// tablet.
for j := i + 1; j < len(events); j++ {
sendevents = append(sendevents, events[j])
if events[j].Type == binlogdatapb.VEventType_COMMIT {
break
}
}
eventss = append(eventss, sendevents)
if err := vs.sendAll(ctx, sgtid, eventss); err != nil {
return vterrors.Wrap(err, sendingEventsErr)
Expand All @@ -733,13 +747,23 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
sgtid, tabletAliasString)
}
if je != nil {
// Wait till all other participants converge and return EOF.
// We're going to be ending the tablet stream, so we ensure a reasonable
// minimum amount of time is allotted for clients to Recv the journal event
// before the stream's context is cancelled (which would cause the grpc
// SendMsg or RecvMsg to fail). If the client doesn't Recv the journal
// event before the stream ends then they'll have to resume from the last
// ShardGtid they received before the journal event.
endTimer := time.NewTimer(stopOnReshardDelay)
defer endTimer.Stop()
// Wait until all other participants converge and then return EOF after
// the minimum delay has passed.
journalDone = je.done
select {
case <-ctx.Done():
return vterrors.Wrapf(ctx.Err(), "context ended while waiting for journal event for shard GTID %+v on tablet %s",
sgtid, tabletAliasString)
case <-journalDone:
<-endTimer.C
return io.EOF
}
}
Expand Down Expand Up @@ -1019,6 +1043,9 @@ func (vs *vstream) keyspaceHasBeenResharded(ctx context.Context, keyspace string
return false, err
}

vs.mu.Lock()
defer vs.mu.Unlock()

// First check the typical case, where the VGTID shards match the serving shards.
// In that case it's NOT possible that an applicable reshard has happened because
// the VGTID contains shards that are all serving.
Expand Down
Loading