Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/dave/jennifer v1.7.0
github.com/evanphx/json-patch v5.9.0+incompatible
github.com/fsnotify/fsnotify v1.7.0
github.com/go-sql-driver/mysql v1.7.1
github.com/go-sql-driver/mysql v1.8.1
github.com/golang/glog v1.2.0
github.com/golang/protobuf v1.5.4
github.com/golang/snappy v0.0.4
Expand Down Expand Up @@ -114,7 +114,10 @@ require (
modernc.org/sqlite v1.29.2
)

require github.com/go-chi/chi/v5 v5.0.10 // indirect
require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/go-chi/chi/v5 v5.0.10 // indirect
)

require (
cloud.google.com/go v0.112.1 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ cloud.google.com/go/iam v1.1.6 h1:bEa06k05IO4f4uJonbB5iAgKTPpABy1ayxaIZV/GHVc=
cloud.google.com/go/iam v1.1.6/go.mod h1:O0zxdPeGBoFdWW3HWmBxJsk0pfvNM/p/qa82rWOGTwI=
cloud.google.com/go/storage v1.39.0 h1:brbjUa4hbDHhpQf48tjqMaXEV+f1OGoaTmQau9tmCsA=
cloud.google.com/go/storage v1.39.0/go.mod h1:OAEj/WZwUYjA3YHQ10/YcN9ttGuEpLwvaoyBXIPikEk=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
Expand Down Expand Up @@ -153,6 +155,8 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.7.1 h1:lUIinVbN1DY0xBg0eMOzmmtGoHwWBbvnWubQUrtU8EI=
github.com/go-sql-driver/mysql v1.7.1/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
Binary file removed go/cmd/vtgateproxy/vtgateproxy
Binary file not shown.
2 changes: 1 addition & 1 deletion go/vt/vtgateproxy/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ func (b *JSONGateResolverBuilder) update(r *JSONGateResolver) error {
if r.currentAddrs != nil && warmupTime.Seconds() > 0 {
combined := append(r.currentAddrs, addrs...)
log.V(100).Infof("updating targets for %s to warmup %v", r.target.URL.String(), targets)
r.clientConn.UpdateState(resolver.State{Addresses: combined})
_ = r.clientConn.UpdateState(resolver.State{Addresses: combined})
time.Sleep(*warmupTime)
}

Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgateproxy/firstready_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (f *frPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
log.V(100).Infof("first_ready: Build called with info: %v", info)

if len(info.ReadySCs) == 0 {
return base.NewErrPicker(errors.New("no available connections"))
return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
}

f.mu.Lock()
Expand Down
103 changes: 78 additions & 25 deletions go/vt/vtgateproxy/mysql_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,13 @@ import (
"time"

"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/callerid"
Expand All @@ -56,6 +59,8 @@ var (
mysqlAllowClearTextWithoutTLS = flag.Bool("mysql_allow_clear_text_without_tls", false, "If set, the server will allow the use of a clear text password over non-SSL connections.")
mysqlProxyProtocol = flag.Bool("proxy_protocol", false, "Enable HAProxy PROXY protocol on MySQL listener socket")
mysqlConnBufferPooling = flag.Bool("mysql_conn_buffer_pooling", false, "Enable mysql conn buffer pooling.")
mysqlKeepAlivePeriod = flag.Duration("mysql-server-keepalive-period", 0*time.Second, "TCP period between keep-alives")
mysqlServerFlushDelay = flag.Duration("mysql_server_flush_delay", 100*time.Millisecond, "Delay after which buffered response will be flushed to the client.")

mysqlServerRequireSecureTransport = flag.Bool("mysql_server_require_secure_transport", false, "Reject insecure connections but only if mysql_server_ssl_cert and mysql_server_ssl_key are provided")

Expand All @@ -68,7 +73,7 @@ var (

mysqlSslServerCA = flag.String("mysql_server_ssl_server_ca", "", "path to server CA in PEM format, which will be combine with server cert, return full certificate chain to clients")

mysqlSlowConnectWarnThreshold = flag.Duration("mysql_slow_connect_warn_threshold", 0, "Warn if it takes more than the given threshold for a mysql connection to establish")
mysqlSlowConnectWarnThreshold = flag.Int64("mysql_slow_connect_warn_threshold", 0, "Warn if it takes more than the given threshold for a mysql connection to establish")

mysqlConnReadTimeout = flag.Duration("mysql_server_read_timeout", 0, "connection read timeout")
mysqlConnWriteTimeout = flag.Duration("mysql_server_write_timeout", 0, "connection write timeout")
Expand All @@ -80,29 +85,43 @@ var (
busyConnections int32
)

// proxyHandler implements the Listener interface.
// proxyHandler implements the mysql.Handler interface.
// It stores the Session in the ClientData of a Connection.
type proxyHandler struct {
mysql.UnimplementedHandler
mu sync.Mutex

env *vtenv.Environment
mu sync.Mutex
proxy *VTGateProxy
}

func newProxyHandler(proxy *VTGateProxy) *proxyHandler {
func newProxyHandler(proxy *VTGateProxy) (*proxyHandler, error) {
env, err := vtenv.New(vtenv.Options{
MySQLServerVersion: servenv.MySQLServerVersion(),
TruncateUILen: servenv.TruncateUILen,
TruncateErrLen: servenv.TruncateErrLen,
})
if err != nil {
return nil, fmt.Errorf("unable to initialize env: %v", err)
}

return &proxyHandler{
env: env,
proxy: proxy,
}
}, nil
}

// NewConnection is called when a connection is created.
// It is not established yet. The handler can decide to
// set StatusFlags that will be returned by the handshake methods.
// In particular, ServerStatusAutocommit might be set.
func (ph *proxyHandler) NewConnection(c *mysql.Conn) {
}

func (ph *proxyHandler) ComResetConnection(c *mysql.Conn) {
ctx := context.Background()
ph.closeSession(ctx, c)
// ConnectionReady is called after the connection handshake, but
// before we begin to process commands.
func (ph *proxyHandler) ConnectionReady(c *mysql.Conn) {
}

// ConnectionClosed is called when a connection is closed.
func (ph *proxyHandler) ConnectionClosed(c *mysql.Conn) {
// Rollback if there is an ongoing transaction. Ignore error.
defer func() {
Expand Down Expand Up @@ -155,6 +174,10 @@ func startSpan(ctx context.Context, query, label string) (trace.Span, context.Co
return startSpanTestable(ctx, query, label, trace.NewSpan, trace.NewFromString)
}

// ComQuery is called when a connection receives a query.
// Note the contents of the query slice may change after
// the first call to callback. So the Handler should not
// hang on to the byte slice.
func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sqltypes.Result) error) error {
ctx := context.Background()
var cancel context.CancelFunc
Expand Down Expand Up @@ -198,12 +221,12 @@ func (ph *proxyHandler) ComQuery(c *mysql.Conn, query string, callback func(*sql

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, query, make(map[string]*querypb.BindVariable), callback)
return mysql.NewSQLErrorFromError(err)
return sqlerror.NewSQLErrorFromError(err)
}

result, err := ph.proxy.Execute(ctx, session, query, make(map[string]*querypb.BindVariable))

if err := mysql.NewSQLErrorFromError(err); err != nil {
if err := sqlerror.NewSQLErrorFromError(err); err != nil {
return err
}
fillInTxStatusFlags(c, session)
Expand All @@ -223,7 +246,8 @@ func fillInTxStatusFlags(c *mysql.Conn, session *vtgateconn.VTGateSession) {
}
}

// ComPrepare is the handler for command prepare.
// ComPrepare is called when a connection receives a prepared
// statement query.
func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[string]*querypb.BindVariable) ([]*querypb.Field, error) {
var ctx context.Context
var cancel context.CancelFunc
Expand Down Expand Up @@ -262,13 +286,15 @@ func (ph *proxyHandler) ComPrepare(c *mysql.Conn, query string, bindVars map[str
}(session)

_, fld, err := ph.proxy.Prepare(ctx, session, query, bindVars)
err = mysql.NewSQLErrorFromError(err)
err = sqlerror.NewSQLErrorFromError(err)
if err != nil {
return nil, err
}
return fld, nil
}

// ComStmtExecute is called when a connection receives a statement
// execute query.
func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData, callback func(*sqltypes.Result) error) error {
var ctx context.Context
var cancel context.CancelFunc
Expand Down Expand Up @@ -308,18 +334,38 @@ func (ph *proxyHandler) ComStmtExecute(c *mysql.Conn, prepare *mysql.PrepareData

if session.SessionPb().Options.Workload == querypb.ExecuteOptions_OLAP {
err := ph.proxy.StreamExecute(ctx, session, prepare.PrepareStmt, prepare.BindVars, callback)
return mysql.NewSQLErrorFromError(err)
return sqlerror.NewSQLErrorFromError(err)
}

qr, err := ph.proxy.Execute(ctx, session, prepare.PrepareStmt, prepare.BindVars)
if err != nil {
return mysql.NewSQLErrorFromError(err)
return sqlerror.NewSQLErrorFromError(err)
}
fillInTxStatusFlags(c, session)

return callback(qr)
}

// ComRegisterReplica is called when a connection receives a ComRegisterReplica request
func (ph *proxyHandler) ComRegisterReplica(c *mysql.Conn, replicaHost string, replicaPort uint16, replicaUser string, replicaPassword string) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComRegisterReplica")
}

// ComBinlogDump is called when a connection receives a ComBinlogDump request
func (ph *proxyHandler) ComBinlogDump(c *mysql.Conn, logFile string, binlogPos uint32) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDump")
}

// ComBinlogDumpGTID is part of the mysql.Handler interface.
func (ph *proxyHandler) ComBinlogDumpGTID(c *mysql.Conn, logFile string, logPos uint64, gtidSet replication.GTIDSet) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDumpGTID")
}

// WarningCount is called at the end of each query to obtain
// the value to be returned to the client in the EOF packet.
// Note that this will be called either in the context of the
// ComQuery callback if the result does not contain any fields,
// or after the last ComQuery call completes.
func (ph *proxyHandler) WarningCount(c *mysql.Conn) uint16 {
session, _ := c.ClientData.(*vtgateconn.VTGateSession)
if session == nil {
Expand All @@ -329,9 +375,13 @@ func (ph *proxyHandler) WarningCount(c *mysql.Conn) uint16 {
return uint16(len(session.SessionPb().GetWarnings()))
}

// ComBinlogDumpGTID is part of the mysql.Handler interface.
func (ph *proxyHandler) ComBinlogDumpGTID(c *mysql.Conn, gtidSet mysql.GTIDSet) error {
return vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ComBinlogDumpGTID")
func (ph *proxyHandler) ComResetConnection(c *mysql.Conn) {
ctx := context.Background()
ph.closeSession(ctx, c)
}

func (ph *proxyHandler) Env() *vtenv.Environment {
return ph.env
}

func (ph *proxyHandler) getSession(ctx context.Context, c *mysql.Conn) (*vtgateconn.VTGateSession, error) {
Expand Down Expand Up @@ -437,10 +487,13 @@ func initMySQLProtocol() {

// Create a Listener.
var err error
proxyHandle = newProxyHandler(vtGateProxy)
proxyHandle, err = newProxyHandler(vtGateProxy)
if err != nil {
log.Exitf("newProxyHandler failed: %v", err)
}
if *mysqlServerPort >= 0 {
log.Infof("Mysql Server listening on Port %d", *mysqlServerPort)
mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol, *mysqlConnBufferPooling)
mysqlListener, err = mysql.NewListener(*mysqlTCPVersion, net.JoinHostPort(*mysqlServerBindAddress, fmt.Sprintf("%v", *mysqlServerPort)), authServer, proxyHandle, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, *mysqlProxyProtocol, *mysqlConnBufferPooling, *mysqlKeepAlivePeriod, *mysqlServerFlushDelay)
if err != nil {
log.Exitf("mysql.NewListener failed: %v", err)
}
Expand All @@ -455,11 +508,11 @@ func initMySQLProtocol() {

_ = initTLSConfig(mysqlListener, *mysqlSslCert, *mysqlSslKey, *mysqlSslCa, *mysqlSslCrl, *mysqlSslServerCA, *mysqlServerRequireSecureTransport, tlsVersion)
}
mysqlListener.AllowClearTextWithoutTLS.Set(*mysqlAllowClearTextWithoutTLS)
mysqlListener.AllowClearTextWithoutTLS.Store(*mysqlAllowClearTextWithoutTLS)
// Check for the connection threshold
if *mysqlSlowConnectWarnThreshold != 0 {
log.Infof("setting mysql slow connection threshold to %v", mysqlSlowConnectWarnThreshold)
mysqlListener.SlowConnectWarnThreshold.Set(*mysqlSlowConnectWarnThreshold)
mysqlListener.SlowConnectWarnThreshold.Store(*mysqlSlowConnectWarnThreshold)
}
// Start listening for tcp
go mysqlListener.Accept()
Expand All @@ -483,7 +536,7 @@ func initMySQLProtocol() {
// newMysqlUnixSocket creates a new unix socket mysql listener. If a socket file already exists, attempts
// to clean it up.
func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mysql.Handler) (*mysql.Listener, error) {
listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false)
listener, err := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false, *mysqlKeepAlivePeriod, *mysqlServerFlushDelay)
switch err := err.(type) {
case nil:
return listener, nil
Expand All @@ -504,7 +557,7 @@ func newMysqlUnixSocket(address string, authServer mysql.AuthServer, handler mys
log.Errorf("Couldn't remove existent socket file: %s", address)
return nil, err
}
listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false)
listener, listenerErr := mysql.NewListener("unix", address, authServer, handler, *mysqlConnReadTimeout, *mysqlConnWriteTimeout, false, false, *mysqlKeepAlivePeriod, *mysqlServerFlushDelay)
return listener, listenerErr
default:
return nil, err
Expand Down
5 changes: 4 additions & 1 deletion go/vt/vtgateproxy/vtgateproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ func (proxy *VTGateProxy) Execute(ctx context.Context, session *vtgateconn.VTGat

// Intercept "use" statements since they just have to update the local session
if strings.HasPrefix(sql, "use ") {
targetString := sqlescape.UnescapeID(sql[4:])
targetString, err := sqlescape.UnescapeID(sql[4:])
if err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Somewhat minor question but actually a bit thought provoking as well...

Suppose this parsing fails for whatever reason. Is it better to fail (as you did here) or just fall through and call session.Execute below? Since the whole point of this use statement hijacking is to avoid a round trip, if for some rare reason vtgate is able to process it but the proxy isn't then could we be unnecesarily failing?

Then again maybe the case where we can't unescape an ID is rare enough that it's just plain irrelevant?

@henryr curious about your thoughts as well.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok to fail. I think it's just as likely that we create a dependency elsewhere in the proxy where we assume that we've always successfully intercepted use statements, as a situation arises where we can't unescape the argument.

return &sqltypes.Result{}, fmt.Errorf("failed to unescape use statement target string: %w", err)
}
session.SessionPb().TargetString = targetString
return &sqltypes.Result{}, nil
}
Expand Down
Loading