Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ jobs:

# Running a test with vtctl at version n and vttablet at version n-1
- name: Run reparent tests (vtctl=N, vttablet=N-1)
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
if: false
run: |
rm -rf /tmp/vtdataroot
mkdir -p /tmp/vtdataroot
Expand Down
28 changes: 10 additions & 18 deletions go/mysql/replication/replication_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,41 +178,33 @@ func ProtoToReplicationStatus(s *replicationdatapb.Status) ReplicationStatus {
}

// FindErrantGTIDs can be used to find errant GTIDs in the receiver's relay log, by comparing it against all known replicas,
// provided as a list of ReplicationStatus's. This method only works if the flavor for all retrieved ReplicationStatus's is MySQL.
// provided as a list of Positions. This method only works if the flavor for all retrieved Positions is MySQL.
// The result is returned as a Mysql56GTIDSet, each of whose elements is a found errant GTID.
// This function is best effort in nature. If it marks something as errant, then it is for sure errant. But there may be cases of errant GTIDs, which aren't caught by this function.
func (s *ReplicationStatus) FindErrantGTIDs(otherReplicaStatuses []*ReplicationStatus) (Mysql56GTIDSet, error) {
if len(otherReplicaStatuses) == 0 {
func FindErrantGTIDs(position Position, sourceUUID SID, otherPositions []Position) (Mysql56GTIDSet, error) {
if len(otherPositions) == 0 {
// If there is nothing to compare this replica against, then we must assume that its GTID set is the correct one.
return nil, nil
}

relayLogSet, ok := s.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
gtidSet, ok := position.GTIDSet.(Mysql56GTIDSet)
if !ok {
return nil, fmt.Errorf("errant GTIDs can only be computed on the MySQL flavor")
}

otherSets := make([]Mysql56GTIDSet, 0, len(otherReplicaStatuses))
for _, status := range otherReplicaStatuses {
otherSet, ok := status.RelayLogPosition.GTIDSet.(Mysql56GTIDSet)
otherSets := make([]Mysql56GTIDSet, 0, len(otherPositions))
for _, pos := range otherPositions {
otherSet, ok := pos.GTIDSet.(Mysql56GTIDSet)
if !ok {
panic("The receiver ReplicationStatus contained a Mysql56GTIDSet in its relay log, but a replica's ReplicationStatus is of another flavor. This should never happen.")
}
otherSets = append(otherSets, otherSet)
}

if len(otherSets) == 1 {
// If there is only one replica to compare against, and one is a subset of the other, then we consider them not to be errant.
// It simply means that one replica might be behind on replication.
if relayLogSet.Contains(otherSets[0]) || otherSets[0].Contains(relayLogSet) {
return nil, nil
}
}

// Copy set for final diffSet so we don't mutate receiver.
diffSet := make(Mysql56GTIDSet, len(relayLogSet))
for sid, intervals := range relayLogSet {
if sid == s.SourceUUID {
diffSet := make(Mysql56GTIDSet, len(gtidSet))
for sid, intervals := range gtidSet {
if sid == sourceUUID {
continue
}
diffSet[sid] = intervals
Expand Down
30 changes: 16 additions & 14 deletions go/mysql/replication/replication_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,40 +86,42 @@ func TestFindErrantGTIDs(t *testing.T) {
}

testcases := []struct {
mainRepStatus *ReplicationStatus
otherRepStatuses []*ReplicationStatus
want Mysql56GTIDSet
mainRepStatus *ReplicationStatus
otherPositions []Position
want Mysql56GTIDSet
}{{
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set1}},
otherRepStatuses: []*ReplicationStatus{
{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set2}},
{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set3}},
otherPositions: []Position{
{GTIDSet: set2},
{GTIDSet: set3},
},
want: Mysql56GTIDSet{
sid1: []interval{{39, 39}, {40, 49}, {71, 75}},
sid2: []interval{{1, 2}, {6, 7}, {20, 21}, {26, 31}, {38, 50}, {60, 66}},
sid4: []interval{{1, 30}},
},
}, {
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set1}},
otherRepStatuses: []*ReplicationStatus{{SourceUUID: sid1, RelayLogPosition: Position{GTIDSet: set1}}},
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set1}},
otherPositions: []Position{{GTIDSet: set1}},
// servers with the same GTID sets should not be diagnosed with errant GTIDs
want: nil,
}, {
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set2}},
otherRepStatuses: []*ReplicationStatus{{SourceUUID: sid1, RelayLogPosition: Position{GTIDSet: set3}}},
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set2}},
otherPositions: []Position{{GTIDSet: set3}},
// set2 is a strict subset of set3
want: nil,
}, {
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set3}},
otherRepStatuses: []*ReplicationStatus{{SourceUUID: sid1, RelayLogPosition: Position{GTIDSet: set2}}},
mainRepStatus: &ReplicationStatus{SourceUUID: sourceSID, RelayLogPosition: Position{GTIDSet: set3}},
otherPositions: []Position{{GTIDSet: set2}},
// set3 is a strict superset of set2
want: nil,
want: Mysql56GTIDSet{
sid1: []interval{{38, 38}, {61, 70}},
},
}}

for _, testcase := range testcases {
t.Run("", func(t *testing.T) {
got, err := testcase.mainRepStatus.FindErrantGTIDs(testcase.otherRepStatuses)
got, err := FindErrantGTIDs(testcase.mainRepStatus.RelayLogPosition, testcase.mainRepStatus.SourceUUID, testcase.otherPositions)
require.NoError(t, err)
require.Equal(t, testcase.want, got)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,13 @@ func TestReplicationRepairAfterPrimaryTabletChange(t *testing.T) {
// sidecardb should find the desired _vt schema and not apply any new creates or upgrades when the tablet comes up again
require.Equal(t, sidecarDDLCount, int64(0))
}

func TestReparentJournalInfo(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
for _, vttablet := range clusterInstance.Keyspaces[0].Shards[0].Vttablets {
length, err := tmClient.ReadReparentJournalInfo(ctx, getTablet(vttablet.GrpcPort))
require.NoError(t, err)
require.EqualValues(t, 1, length)
}
}
1 change: 1 addition & 0 deletions go/test/endtoend/vtgate/queries/misc/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ func TestTransactionModeVar(t *testing.T) {

// TestAliasesInOuterJoinQueries tests that aliases work in queries that have outer join clauses.
func TestAliasesInOuterJoinQueries(t *testing.T) {
utils.SkipIfBinaryIsBelowVersion(t, 21, "vtgate")
mcmp, closer := start(t)
defer closer()

Expand Down
6 changes: 6 additions & 0 deletions go/vt/mysqlctl/reparent.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ This file contains the reparenting methods for mysqlctl.

import (
"context"
"fmt"
"time"

"vitess.io/vitess/go/constants/sidecar"
Expand Down Expand Up @@ -53,6 +54,11 @@ func PopulateReparentJournal(timeCreatedNS int64, actionName, primaryAlias strin
timeCreatedNS, actionName, primaryAlias, posStr).Query
}

// ReadReparentJournalInfoQuery returns the query we use to read information required from Reparent Journal.
func ReadReparentJournalInfoQuery() string {
return fmt.Sprintf("SELECT COUNT(*) FROM %s.reparent_journal", sidecar.GetIdentifier())
}

// queryReparentJournal returns the SQL query to use to query the database
// for a reparent_journal row.
func queryReparentJournal(timeCreatedNS int64) string {
Expand Down
Loading
Loading