qps/rate limit & reduce expire seek range & expire-key hash #204
piaoairy219 wants to merge 61 commits into distributedio:master from
Conversation
…ash. Just after reading the command we check whether it is in the command list we support; otherwise it is skipped
…new limit 2 just balance the limit based on the number of active titan servers 3 unit test of rate limit passed
2 add setlimit.sh
2 change qps/rate metrics to 3 labels: namespace, command, localip
…and prevent the node currently writing the expire-keys region from having a higher load than other nodes
2 fix expire_test.go: the gc key can be fetched before runExpire/doExpire if expireat < now; add 2 test cases for hash/string expire: check the key is deleted and the gc key exists after runExpire
…ed&round&seek&commit metrics label
…ad for dashboard, replace 1.4 with 2, decrease bucket count by 50%
| expireKeyPrefix = []byte("$sys:0:at:")
| sysExpireLeader = []byte("$sys:0:EXL:EXLeader")
| hashExpireKeyPrefix = expireKeyPrefix[:len(expireKeyPrefix)-1]
Storing the hash value directly in the expire key is indeed a good approach, but is it necessary to define a new variable?
Could the hash value be written directly at the position of the digit 0 in expireKeyPrefix = []byte("$sys:0:at:")? That way old data would remain compatible, and the changes to the existing key concatenation would be smaller.
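A minimal sketch of this suggestion, assuming the hash slot replaces the literal digit in the prefix; `buildExpireKey` and the timestamp-free layout are illustrative names, not helpers from the PR:

```go
package main

import "fmt"

// Sketch: write the hash slot where the literal 0 used to sit in
// "$sys:0:at:", instead of defining a separate hashExpireKeyPrefix.
// Slot 0 reproduces the legacy prefix exactly, so old keys stay readable.
// buildExpireKey is an illustrative name, not the PR's actual helper.
func buildExpireKey(slot int, key string) string {
	return fmt.Sprintf("$sys:%d:at:%s", slot, key)
}

func main() {
	fmt.Println(buildExpireKey(0, "mykey")) // legacy-compatible form
	fmt.Println(buildExpireKey(7, "mykey")) // hashed form
}
```

One caveat: with 256 slots the field needs more than one digit, so a fixed-width encoding would be required to keep key ordering stable.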
| }
| type CommandLimiter struct {
| 	localIp string
| return nil, errors.New(rateLimit.InterfaceName + " adds is empty")
| }
| if rateLimit.LimiterNamespace == "" {
If there is no namespace supplied, can we just regard it as a global limitation?
@piaoairy219
| if rateLimit.LimiterNamespace == "" {
| 	return nil, errors.New("limiter-namespace is configured with empty")
| }
| if rateLimit.WeightChangeFactor <= 1 {
A configuration validator can be used here.
| if !(rateLimit.UsageToDivide > 0 && rateLimit.UsageToDivide < rateLimit.UsageToMultiply && rateLimit.UsageToMultiply < 1) {
| 	return nil, errors.New("should config 0 < usage-to-divide < usage-to-multiply < 1")
| }
| if rateLimit.InitialPercent > 1 || rateLimit.InitialPercent <= 0 {
| 	return nil, errors.New("initial-percent should in (0, 1]")
| }
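The checks quoted above could be collected into one validator, as the review suggests. A minimal sketch, with field names mirroring the PR's rate-limit config but the `Validate` method itself hypothetical:

```go
package main

import (
	"errors"
	"fmt"
)

// RateLimit mirrors the configuration fields validated in the quoted code.
type RateLimit struct {
	LimiterNamespace   string
	WeightChangeFactor float64
	UsageToDivide      float64
	UsageToMultiply    float64
	InitialPercent     float64
}

// Validate gathers the scattered checks into one place, so the constructor
// only needs a single `if err := conf.Validate(); err != nil` call.
func (r *RateLimit) Validate() error {
	if r.LimiterNamespace == "" {
		return errors.New("limiter-namespace is configured with empty")
	}
	if r.WeightChangeFactor <= 1 {
		return errors.New("weight-change-factor should be > 1")
	}
	if !(r.UsageToDivide > 0 && r.UsageToDivide < r.UsageToMultiply && r.UsageToMultiply < 1) {
		return errors.New("should config 0 < usage-to-divide < usage-to-multiply < 1")
	}
	if r.InitialPercent > 1 || r.InitialPercent <= 0 {
		return errors.New("initial-percent should be in (0, 1]")
	}
	return nil
}

func main() {
	conf := RateLimit{LimiterNamespace: "ns", WeightChangeFactor: 2,
		UsageToDivide: 0.3, UsageToMultiply: 0.6, InitialPercent: 0.5}
	fmt.Println(conf.Validate()) // <nil> for a valid config
}
```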
| strUnit = limitStr[len(limitStr)-1]
| if strUnit == 'k' || strUnit == 'K' {
| 	unit = 1024
| 	limitStr = limitStr[:len(limitStr)-1]
Wrap this into a function
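A sketch of such a helper, parsing a limit string with an optional k/K suffix into a number; `parseLimit` is an illustrative name, and the m/M case is an assumed extension beyond the quoted code:

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseLimit converts strings like "500", "2k" or "4M" into an absolute
// value, wrapping the suffix handling from the quoted snippet into one
// reusable function.
func parseLimit(s string) (int64, error) {
	unit := int64(1)
	switch {
	case strings.HasSuffix(s, "k"), strings.HasSuffix(s, "K"):
		unit, s = 1024, s[:len(s)-1]
	case strings.HasSuffix(s, "m"), strings.HasSuffix(s, "M"):
		unit, s = 1024*1024, s[:len(s)-1]
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil {
		return 0, err
	}
	return n * unit, nil
}

func main() {
	v, _ := parseLimit("2k")
	fmt.Println(v) // 2048
}
```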
| v, ok := l.limiters.Load(limiterName)
| var commandLimiter *CommandLimiter
| if !ok {
| 	commandLimiter = l.init(limiterName)
The name init seems too generic for its feature
| limiterName := k.(string)
| commandLimiter := v.(*CommandLimiter)
| if commandLimiter != nil {
| 	averageQps := commandLimiter.reportLocalStat(l.conf.GlobalBalancePeriod)
| return key
| }
| func getNamespaceAndCmd(limiterName string) []string {
Returning the values namespace, command string seems to be more friendly
| }
| }
| func (l *LimitersMgr) getLimit(limiterName string, isQps bool) (int64, int) {
Avoid using a boolean as an argument. Refactor it to getQPSLimit and getRateLimit.
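A sketch of the suggested refactor; the `LimitersMgr` here is a stub, and only the method shapes mirror the PR:

```go
package main

import "fmt"

// Stub manager holding separate QPS and data-rate limits.
type LimitersMgr struct {
	qpsLimit, rateLimit int64
	qpsBurst, rateBurst int
}

// The original boolean-parameter method stays private...
func (l *LimitersMgr) getLimit(limiterName string, isQps bool) (int64, int) {
	if isQps {
		return l.qpsLimit, l.qpsBurst
	}
	return l.rateLimit, l.rateBurst
}

// ...while call sites use wrappers that say what they mean
// instead of passing true/false.
func (l *LimitersMgr) getQPSLimit(name string) (int64, int)  { return l.getLimit(name, true) }
func (l *LimitersMgr) getRateLimit(name string) (int64, int) { return l.getLimit(name, false) }

func main() {
	l := &LimitersMgr{qpsLimit: 1000, qpsBurst: 10, rateLimit: 1 << 20, rateBurst: 64}
	q, _ := l.getQPSLimit("ns:get")
	r, _ := l.getRateLimit("ns:get")
	fmt.Println(q, r)
}
```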
| //"getbit": Desc{Proc: AutoCommit(GetBit), Cons: Constraint{3, flags("r"), 1, 1, 1}},
| //"bitcount": Desc{Proc: AutoCommit(BitCount), Cons: Constraint{-2, flags("r"), 1, 1, 1}},
| //"bitpos": Desc{Proc: AutoCommit(BitPos), Cons: Constraint{-3, flags("r"), 1, 1, 1}},
| } else {
| 	continue
| }
| }
The CanExecute logic is well designed, but could the returned data be wrong in some cases?

multi
lpush key 1
xxx zz
exec

I understand this is a security consideration, but I have two questions:
- Why is this feature needed? If a client sends a well-formed protocol message with merely an unknown command name, can it really be treated as an abnormal user?
- Disconnecting after accumulating 3 bad commands seems a bit too strict. If this feature is necessary, it would be better to make it a configuration option.
| "bitcount": BitCount,
| //"getbit": GetBit,
| //"bitpos": BitPos,
| //"bitcount": BitCount,
| return kv.txn.Destory(obj, key)
| }
| if err := expireAt(kv.txn.t, mkey, obj.ID, obj.Type, obj.ExpireAt, at); err != nil {
Is the IsExpired check already validated before this point, or would it be more elegant to implement it inside this function?

If the key has already expired, should a special error be returned so the user knows the set failed?
| if i == 0 {
| 	isQps = true
| }
| limit, burst := l.getLimit(limiterName, isQps)
Smart but weird, maybe we should define the closure first and reuse it.
| qpsLw LimiterWrapper
| rateLw LimiterWrapper
QPS has the same meaning as request rate; we should use friendlier names for the limitation types, like Command and DataFlow.
| for i := 0; i < EXPIRE_HASH_NUM; i++ {
| 	expireHash := fmt.Sprintf("%04d", i)
| 	go startExpire(sysdb, &conf.Expire, ls, expireHash)
| }
Could the expire-related logic be encapsulated into a standalone method in expire.go?
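A minimal sketch of such an encapsulation: the fan-out loop moves into one function, and a `launch` callback stands in for `go startExpire(...)` so the example is self-contained. `StartExpireWorkers` is an illustrative name; `EXPIRE_HASH_NUM` comes from the PR.

```go
package main

import "fmt"

// EXPIRE_HASH_NUM mirrors the PR's 256 expire-key hash slots.
const EXPIRE_HASH_NUM = 256

// StartExpireWorkers wraps the worker fan-out loop in one place in expire.go.
// In the real code, launch would be `go startExpire(sysdb, &conf.Expire, ls, h)`.
func StartExpireWorkers(launch func(expireHash string)) {
	for i := 0; i < EXPIRE_HASH_NUM; i++ {
		launch(fmt.Sprintf("%04d", i))
	}
}

func main() {
	n := 0
	StartExpireWorkers(func(h string) { n++ })
	fmt.Println(n) // 256
}
```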
| expireLogFlag = "[Expire]"
| metricsLabel = expire_unhash_worker
| }
Could this KeyPrefix concatenation be encapsulated into a standalone method?
| }
| func runExpire(db *DB, batchLimit int) {
| func runExpire(db *DB, batchLimit int, expireHash string, lastExpireEndTs int64) int64 {
How big is the performance gap between passing lastExpireEndTs as the iterator's start position each round versus always starting from expireKeyPrefix?
In practice we found the impact is significant, and it depends heavily on the usage scenario. In some scenarios, e.g. importing new data every day with a TTL of 1 day, most of the data expires at about the same time. Once those expired keys are deleted they become tombstones, and when RocksDB looks up data it must traverse and skip tombstones; with too many of them, say millions, traversal performance degrades dramatically.
| if err != nil {
| 	txn.Rollback()
| 	zap.L().Error("[Expire] commit failed", zap.Error(err))
| 	zap.L().Error(expireLogFlag+" commit failed", zap.Error(err))
If the commit fails, should we return 0 here?
| func (cl *CommandLimiter) checkLimit(cmdName string, cmdArgs []string) {
| 	d := cl.qpsLw.waitTime(1)
| 	time.Sleep(d)
Why not use the Wait method?
| return limit, int(burst)
| }
| func (l *LimitersMgr) CheckLimit(namespace string, cmdName string, cmdArgs []string) {
Add a comment for the public method
| //iter gets keys in [key, upperBound), so using now+1 as the 2nd parameter will fetch keys prefixed with "at:<now>:"
| //we seek up to "at:<now>" instead of "at;", which reduces the seek range and touches as few deleted expired keys as possible.
| //this should reduce the multi-day expire delays and get/mget timeouts caused by the rocksdb tombstone problem
| LimitConnection bool
| MaxConnection int64
It is too verbose to use two variables here. Maybe we should use MaxConnection with value 0 as unlimited.
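A sketch of that simplification, dropping the boolean and treating `MaxConnection == 0` as unlimited; `connectionAllowed` is an illustrative name:

```go
package main

import "fmt"

// connectionAllowed reports whether a new client may connect.
// MaxConnection == 0 means unlimited, replacing the separate
// LimitConnection bool from the PR's config.
func connectionAllowed(maxConnection, clientsNum int64) bool {
	return maxConnection == 0 || clientsNum < maxConnection
}

func main() {
	fmt.Println(connectionAllowed(0, 1000000)) // true: 0 means no limit
	fmt.Println(connectionAllowed(100, 100))   // false: already at the cap
}
```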
| ListZipThreshold int
| LimitConnection bool
| MaxConnection int64
| MaxConnectionWait int64
How is this variable used? Why sleep for a while when the limit is exceeded instead of closing the connection immediately?
| LimitConnection bool
| MaxConnection int64
| MaxConnectionWait int64
| ClientsNum int64
Use an atomic variable to avoid locks
| func Call(ctx *Context) {
| 	ctx.Name = strings.ToLower(ctx.Name)
| if _, ok := txnCommands[ctx.Name]; ok && ctx.Server.LimitersMgr != nil {
txnCommands is not used anymore, use commands instead.
1 limit namespace&command qps/rate across all titan servers
2 reduce the expire seek range to avoid the rocksdb tombstone problem
3 hash expire-keys into 256 prefixes to improve expire handling speed and prevent expire-key writes from concentrating on a single node
4 handle empty/illegal commands
5 if the connection has been closed by the client, drop the remaining commands instead of processing them
6 limit max-connection