Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtgate.txt
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ Flags:
--redact-debug-ui-queries redact full queries and bind variables from debug UI
--remote-operation-timeout duration time to wait for a remote operation (default 15s)
--retry-count int retry count (default 2)
--reuse-port Enable SO_REUSEPORT when binding sockets; available on Linux 3.9+ (default false)
--schema-change-signal Enable the schema tracker; requires queryserver-config-schema-change-signal to be enabled on the underlying vttablets for this to work (default true)
--security-policy string the name of a registered security policy to use for controlling access to URLs - empty means allow all for anyone (built-in policies: deny-all, read-only)
--service-map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
Expand Down
12 changes: 7 additions & 5 deletions go/mysql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ func NewFromListener(
handler Handler,
connReadTimeout time.Duration,
connWriteTimeout time.Duration,
proxyProtocol bool,
connBufferPooling bool,
keepAlivePeriod time.Duration,
flushDelay time.Duration,
Expand All @@ -252,6 +253,11 @@ func NewFromListener(
FlushDelay: flushDelay,
MultiQuery: multiQuery,
}

if proxyProtocol {
cfg.Listener = &proxyproto.Listener{Listener: l}
}

return NewListenerWithConfig(cfg)
}

Expand All @@ -272,12 +278,8 @@ func NewListener(
if err != nil {
return nil, err
}
if proxyProtocol {
proxyListener := &proxyproto.Listener{Listener: listener}
return NewFromListener(proxyListener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling, keepAlivePeriod, flushDelay, multiQuery)
}

return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, connBufferPooling, keepAlivePeriod, flushDelay, multiQuery)
return NewFromListener(listener, authServer, handler, connReadTimeout, connWriteTimeout, proxyProtocol, connBufferPooling, keepAlivePeriod, flushDelay, multiQuery)
}

// ListenerConfig should be used with NewListenerWithConfig to specify listener parameters.
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func TestConnectionFromListener(t *testing.T) {
listener, err := net.Listen("tcp", "127.0.0.1:")
require.NoError(t, err, "net.Listener failed")

l, err := NewFromListener(listener, authServer, th, 0, 0, false, 0, 0, false)
l, err := NewFromListener(listener, authServer, th, 0, 0, false, false, 0, 0, false)
require.NoError(t, err, "NewListener failed")
host, port := getHostPort(t, l.Addr())
fmt.Printf("host: %s, port: %d\n", host, port)
Expand Down
59 changes: 59 additions & 0 deletions go/netutil/reuseport_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//go:build !windows

/*
Copyright 2026 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package netutil

import (
"context"
"fmt"
"net"
"syscall"

"golang.org/x/sys/unix"
)

// ListenReusePort binds a host:port and sets SO_REUSEPORT on the listener.
// SO_REUSEPORT allows multiple processes to bind to the same port, enabling
// kernel-level load balancing of incoming connections.
//
// Requires Linux 3.9+ or equivalent kernel support.
func ListenReusePort(network, address string) (net.Listener, error) {
switch network {
case "tcp", "tcp4", "tcp6":
default:
return nil, fmt.Errorf("SO_REUSEPORT: protocol not supported: %s", network)
}

lc := net.ListenConfig{
Control: func(network, address string, c syscall.RawConn) error {
var opErr error
err := c.Control(func(fd uintptr) {
opErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
})
if err != nil {
return err
}
if opErr != nil {
return fmt.Errorf("failed to set SO_REUSEPORT on %s %s: %w", network, address, opErr)
}
return nil
},
}

return lc.Listen(context.Background(), network, address)
}
57 changes: 57 additions & 0 deletions go/netutil/reuseport_unix_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//go:build !windows

/*
Copyright 2026 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package netutil

import (
"net"
"testing"

"golang.org/x/sys/unix"
)

func TestListenReusePort(t *testing.T) {
l1, err := ListenReusePort("tcp", "127.0.0.1:0")
if err != nil {
t.Fatal(err)
}
defer l1.Close()

// Bind to the same address. This should be possible with SO_REUSEPORT.
addr := l1.Addr().String()
l2, err := ListenReusePort("tcp", addr)
if err != nil {
t.Fatal(err)
}
defer l2.Close()

tcpListener := l1.(*net.TCPListener)
file, err := tcpListener.File()
if err != nil {
t.Fatal(err)
}
defer file.Close()

val, err := unix.GetsockoptInt(int(file.Fd()), unix.SOL_SOCKET, unix.SO_REUSEPORT)
if err != nil {
t.Fatal(err)
}
if val != 1 {
t.Fatalf("SO_REUSEPORT not set: got %d, want 1", val)
}
}
34 changes: 34 additions & 0 deletions go/netutil/reuseport_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//go:build windows

/*
Copyright 2026 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package netutil

import (
"fmt"
"net"
"runtime"
)

// ListenReusePort binds a host:port and sets SO_REUSEPORT on the listener.
// SO_REUSEPORT allows multiple processes to bind to the same port, enabling
// kernel-level load balancing of incoming connections.
//
// Requires Linux 3.9+ or equivalent kernel support.
func ListenReusePort(network, address string) (net.Listener, error) {
return nil, fmt.Errorf("SO_REUSEPORT: not supported on OS: %s", runtime.GOOS)
}
147 changes: 147 additions & 0 deletions go/test/endtoend/vtgate/reuseport/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
//go:build !windows

/*
Copyright 2024 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reuseport

import (
"context"
_ "embed"
"flag"
"os"
"testing"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
)

var (
keyspaceName = "ks"
cell = "zone-1"

//go:embed schema.sql
schemaSQL string
)

func TestMain(m *testing.M) {
flag.Parse()
os.Exit(m.Run())
}

func setupCluster(t *testing.T) (*cluster.LocalProcessCluster, mysql.ConnParams) {
clusterInstance := cluster.NewCluster(cell, "localhost")

// Start topo server
err := clusterInstance.StartTopo()
require.NoError(t, err)

// Start keyspace
keyspace := &cluster.Keyspace{
Name: keyspaceName,
SchemaSQL: schemaSQL,
}
err = clusterInstance.StartUnshardedKeyspace(*keyspace, 0, false, clusterInstance.Cell)
require.NoError(t, err)

// Start vtgate
clusterInstance.VtGateExtraArgs = append(clusterInstance.VtGateExtraArgs, "--reuse-port")
err = clusterInstance.StartVtgate()
require.NoError(t, err)

vtParams := clusterInstance.GetVTParams(keyspaceName)
return clusterInstance, vtParams
}

func start(t *testing.T, vtParams mysql.ConnParams) (*mysql.Conn, func()) {
vtConn, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err)

deleteAll := func() {
_, _ = utils.ExecAllowError(t, vtConn, "set workload = oltp")

tables := []string{"t1"}
for _, table := range tables {
_, _ = utils.ExecAllowError(t, vtConn, "delete from "+table)
}
}

deleteAll()

return vtConn, func() {
deleteAll()
vtConn.Close()
}
}

func TestReusePort(t *testing.T) {
clusterInstance, vtParams := setupCluster(t)
defer clusterInstance.Teardown()

// Create a connection to the first vtgate
vtConn, closer := start(t, vtParams)
defer closer()

// Create a second vtgate with the same configuration
duplicateVtGate := cluster.VtgateProcessInstance(
clusterInstance.GetAndReservePort(),
clusterInstance.VtgateGrpcPort,
clusterInstance.VtgateMySQLPort,
clusterInstance.Cell,
clusterInstance.Cell,
clusterInstance.Hostname,
"PRIMARY",
clusterInstance.TopoProcess.Port,
clusterInstance.TmpDirectory,
clusterInstance.VtGateExtraArgs,
clusterInstance.VtGatePlannerVersion)
// Unix domain sockets do not support multiplexing
duplicateVtGate.MySQLServerSocketPath = ""
err := duplicateVtGate.Setup()
require.NoError(t, err)
defer func() {
if err := duplicateVtGate.TearDown(); err != nil {
log.Errorf("Error in vtgate teardown: %v", err)
}
}()

// Should be able to connect to the first vtgate
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err)

// Tear down the first vtgate
err = clusterInstance.VtgateProcess.TearDown()
require.NoError(t, err)
require.True(t, clusterInstance.VtgateProcess.IsShutdown())

// Should fail since the vtgate has stopped
_, err = vtConn.ExecuteFetch("select id1 from t1", 1, false)
require.Error(t, err, "first vtgate should be stopped and should not serve requests")

// Create a second connection with the same parameters, which will
// now go to the duplicate vtgate
vtConn2, err := mysql.Connect(context.Background(), &vtParams)
require.NoError(t, err, "second vtgate should handle the connection")
defer vtConn2.Close()

// Should be able to fetch from the same host:port on the duplicate vtgate
_, err = vtConn2.ExecuteFetch("select id1 from t1", 1, false)
require.NoError(t, err, "second vtgate should serve requests")
}
5 changes: 5 additions & 0 deletions go/test/endtoend/vtgate/reuseport/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
create table t1(
id1 bigint,
id2 bigint,
primary key(id1)
) Engine=InnoDB;
3 changes: 2 additions & 1 deletion go/vt/servenv/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,8 @@ func serveGRPC() {

// listen on the port
log.Infof("Listening for gRPC calls on port %v", gRPCPort)
listener, err := net.Listen("tcp", net.JoinHostPort(gRPCBindAddress, strconv.Itoa(gRPCPort)))

listener, err := Listen("tcp", net.JoinHostPort(gRPCBindAddress, strconv.Itoa(gRPCPort)))
if err != nil {
log.Exitf("Cannot listen on port %v for gRPC: %v", gRPCPort, err)
}
Expand Down
Loading
Loading