Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/redis/cache_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ func NewRateLimiterCacheImplFromSettings(s settings.Settings, localCache *freeca
closer := &utils.MultiCloser{}
var perSecondPool Client
if s.RedisPerSecond {
perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSocketType,
perSecondPool = NewClientImpl(srv.Scope().Scope("redis_per_second_pool"), s.RedisPerSecondTls, s.RedisPerSecondAuth, s.RedisPerSecondSentinelAuth, s.RedisPerSecondSocketType,
s.RedisPerSecondType, s.RedisPerSecondUrl, s.RedisPerSecondPoolSize, s.RedisPerSecondPipelineWindow, s.RedisPerSecondPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisPerSecondTimeout)
closer.Closers = append(closer.Closers, perSecondPool)
}

otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
otherPool := NewClientImpl(srv.Scope().Scope("redis_pool"), s.RedisTls, s.RedisAuth, s.RedisSentinelAuth, s.RedisSocketType, s.RedisType, s.RedisUrl, s.RedisPoolSize,
s.RedisPipelineWindow, s.RedisPipelineLimit, s.RedisTlsConfig, s.RedisHealthCheckActiveConnection, srv, s.RedisTimeout)
closer.Closers = append(closer.Closers, otherPool)

Expand Down
34 changes: 32 additions & 2 deletions src/redis/driver_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func checkError(err error) {
}
}

func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisType, url string, poolSize int,
func NewClientImpl(scope stats.Scope, useTls bool, auth, sAuth, redisSocketType, redisType, url string, poolSize int,
pipelineWindow time.Duration, pipelineLimit int, tlsConfig *tls.Config, healthCheckActiveConnection bool, srv server.Server,
timeout time.Duration,
) Client {
Expand Down Expand Up @@ -100,6 +100,29 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT
return radix.Dial(network, addr, dialOpts...)
}

sdf := func(network, addr string) (radix.Conn, error) {
var dialOpts []radix.DialOpt

dialOpts = append(dialOpts, radix.DialTimeout(timeout))

if useTls {
dialOpts = append(dialOpts, radix.DialUseTLS(tlsConfig))
}

if sAuth != "" {
user, pass, found := strings.Cut(sAuth, ":")
if found {
logger.Warnf("enabling authentication to sentinel on %s with user %s", maskedUrl, user)
dialOpts = append(dialOpts, radix.DialAuthUser(user, pass))
} else {
logger.Warnf("enabling authentication to sentinel on %s without user", maskedUrl)
dialOpts = append(dialOpts, radix.DialAuthPass(sAuth))
}
}

return radix.Dial(network, addr, dialOpts...)
}

stats := newPoolStats(scope)

opts := []radix.PoolOpt{radix.PoolConnFunc(df), radix.PoolWithTrace(poolTrace(&stats, healthCheckActiveConnection, srv))}
Expand Down Expand Up @@ -133,7 +156,14 @@ func NewClientImpl(scope stats.Scope, useTls bool, auth, redisSocketType, redisT
if len(urls) < 2 {
panic(RedisError("Expected master name and a list of urls for the sentinels, in the format: <redis master name>,<sentinel1>,...,<sentineln>"))
}
client, err = radix.NewSentinel(urls[0], urls[1:], radix.SentinelPoolFunc(poolFunc))

sentConnOpt := radix.SentinelConnFunc(sdf)
opt := radix.SentinelPoolFunc(poolFunc)

client, err = radix.NewSentinel(urls[0], urls[1:], opt, sentConnOpt)
if err != nil {
panic(RedisError(fmt.Sprintf("Unable to create sentinel client: %v", err)))
}
default:
panic(RedisError("Unrecognized redis type " + redisType))
}
Expand Down
30 changes: 16 additions & 14 deletions src/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,13 @@ type Settings struct {
HealthyWithAtLeastOneConfigLoaded bool `envconfig:"HEALTHY_WITH_AT_LEAST_ONE_CONFIG_LOADED" default:"false"`

// Redis settings
RedisSocketType string `envconfig:"REDIS_SOCKET_TYPE" default:"unix"`
RedisType string `envconfig:"REDIS_TYPE" default:"SINGLE"`
RedisUrl string `envconfig:"REDIS_URL" default:"/var/run/nutcracker/ratelimit.sock"`
RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"`
RedisAuth string `envconfig:"REDIS_AUTH" default:""`
RedisTls bool `envconfig:"REDIS_TLS" default:"false"`
RedisSocketType string `envconfig:"REDIS_SOCKET_TYPE" default:"unix"`
RedisType string `envconfig:"REDIS_TYPE" default:"SINGLE"`
RedisUrl string `envconfig:"REDIS_URL" default:"/var/run/nutcracker/ratelimit.sock"`
RedisPoolSize int `envconfig:"REDIS_POOL_SIZE" default:"10"`
RedisAuth string `envconfig:"REDIS_AUTH" default:""`
RedisSentinelAuth string `envconfig:"REDIS_SENTINEL_AUTH" default:""`
RedisTls bool `envconfig:"REDIS_TLS" default:"false"`
// TODO: Make this setting configurable out of the box instead of having to provide it through code.
RedisTlsConfig *tls.Config
// Allow to set the client certificate and key for TLS connections.
Expand All @@ -143,14 +144,15 @@ type Settings struct {
RedisPipelineWindow time.Duration `envconfig:"REDIS_PIPELINE_WINDOW" default:"0"`
// RedisPipelineLimit sets maximum number of commands that can be pipelined before flushing.
// If limit is zero then no limit will be used and pipelines will only be limited by the specified time window.
RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"`
RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"`
RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"`
RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"`
RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"`
RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"`
RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""`
RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"`
RedisPipelineLimit int `envconfig:"REDIS_PIPELINE_LIMIT" default:"0"`
RedisPerSecond bool `envconfig:"REDIS_PERSECOND" default:"false"`
RedisPerSecondSocketType string `envconfig:"REDIS_PERSECOND_SOCKET_TYPE" default:"unix"`
RedisPerSecondType string `envconfig:"REDIS_PERSECOND_TYPE" default:"SINGLE"`
RedisPerSecondUrl string `envconfig:"REDIS_PERSECOND_URL" default:"/var/run/nutcracker/ratelimitpersecond.sock"`
RedisPerSecondPoolSize int `envconfig:"REDIS_PERSECOND_POOL_SIZE" default:"10"`
RedisPerSecondAuth string `envconfig:"REDIS_PERSECOND_AUTH" default:""`
RedisPerSecondSentinelAuth string `envconfig:"REDIS_PERSECOND_SENTINEL_AUTH" default:""`
RedisPerSecondTls bool `envconfig:"REDIS_PERSECOND_TLS" default:"false"`
// RedisPerSecondPipelineWindow sets the duration after which internal pipelines will be flushed for per second redis.
// See comments of RedisPipelineWindow for details.
RedisPerSecondPipelineWindow time.Duration `envconfig:"REDIS_PERSECOND_PIPELINE_WINDOW" default:"0"`
Expand Down
2 changes: 1 addition & 1 deletion test/redis/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func BenchmarkParallelDoLimit(b *testing.B) {
return func(b *testing.B) {
statsStore := gostats.NewStore(gostats.NewNullSink(), false)
sm := stats.NewMockStatManager(statsStore)
client := redis.NewClientImpl(statsStore, false, "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
client := redis.NewClientImpl(statsStore, false, "", "", "tcp", "single", "127.0.0.1:6379", poolSize, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
defer client.Close()

cache := redis.NewFixedRateLimitCacheImpl(client, nil, utils.NewTimeSourceImpl(), rand.New(utils.NewLockedSource(time.Now().Unix())), 10, nil, 0.8, "", sm, true)
Expand Down
6 changes: 3 additions & 3 deletions test/redis/driver_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func testNewClientImpl(t *testing.T, pipelineWindow time.Duration, pipelineLimit
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(auth, addr string) redis.Client {
return redis.NewClientImpl(statsStore, false, auth, "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
return redis.NewClientImpl(statsStore, false, auth, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
}

t.Run("connection refused", func(t *testing.T) {
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestDoCmd(t *testing.T) {
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(addr string) redis.Client {
return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second)
return redis.NewClientImpl(statsStore, false, "", "", "tcp", "single", addr, 1, 0, 0, nil, false, nil, 10*time.Second)
}

t.Run("SETGET ok", func(t *testing.T) {
Expand Down Expand Up @@ -176,7 +176,7 @@ func testPipeDo(t *testing.T, pipelineWindow time.Duration, pipelineLimit int) f
statsStore := stats.NewStore(stats.NewNullSink(), false)

mkRedisClient := func(addr string) redis.Client {
return redis.NewClientImpl(statsStore, false, "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
return redis.NewClientImpl(statsStore, false, "", "", "tcp", "single", addr, 1, pipelineWindow, pipelineLimit, nil, false, nil, 10*time.Second)
}

t.Run("SETGET ok", func(t *testing.T) {
Expand Down