Skip to content

Commit f0096d9

Browse files
committed
polishing
Signed-off-by: zeminzhou <zhouzemin@pingcap.com>
1 parent c80a226 commit f0096d9

File tree

1 file changed

+164
-16
lines changed

1 file changed

+164
-16
lines changed

cmd/bench/main.go

Lines changed: 164 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@ const (
3434
defaultRegion = "regions/aws-us-east-1"
3535
defaultNamePrefix = "keep--1h"
3636
defaultSpendingLimit = 10
37-
defaultConcurrency = 5
38-
defaultTotal = 100
39-
defaultRPS = 2.0
40-
waitInterval = 2 * time.Second
37+
defaultConcurrency = 160
38+
defaultTotal = 500
39+
defaultRPS = 1000.0
40+
waitInterval = 5 * time.Second
4141
waitTimeout = 10 * time.Minute
4242
)
4343

@@ -186,10 +186,21 @@ func runBench(ctx context.Context, client cloud.TiDBCloudClient, cfg benchConfig
186186

187187
var success int64
188188
var failed int64
189+
var totalCreateReq int64
190+
var totalGetReq int64
191+
192+
type clusterDuration struct {
193+
duration time.Duration
194+
clusterID string
195+
}
196+
197+
var durationsMu sync.Mutex
198+
var clusterDurations []clusterDuration
189199

190200
var wg sync.WaitGroup
191201

192202
timestamp := time.Now().Unix()
203+
startAt := time.Now()
193204
for i := 0; i < cfg.concurrency; i++ {
194205
wg.Add(1)
195206
go func(worker int) {
@@ -201,6 +212,7 @@ func runBench(ctx context.Context, client cloud.TiDBCloudClient, cfg benchConfig
201212
}
202213
name := fmt.Sprintf("%s-%d-%d", cfg.namePrefix, timestamp, idx)
203214
start := time.Now()
215+
atomic.AddInt64(&totalCreateReq, 1)
204216
id, err := createOnce(ctx, client, cfg, name)
205217
if err != nil {
206218
atomic.AddInt64(&failed, 1)
@@ -209,15 +221,22 @@ func runBench(ctx context.Context, client cloud.TiDBCloudClient, cfg benchConfig
209221
}
210222

211223
if cfg.waitReady {
212-
if err := waitClusterReady(ctx, client, id); err != nil {
224+
if err := waitClusterReady(ctx, client, id, &totalGetReq); err != nil {
213225
atomic.AddInt64(&failed, 1)
214226
log.Printf("worker %d wait %s failed: %v", worker, id, err)
215227
continue
216228
}
217229
}
218230

231+
duration := time.Since(start)
219232
atomic.AddInt64(&success, 1)
220-
log.Printf("worker %d create %s (id=%s) ok in %s", worker, name, id, time.Since(start))
233+
durationsMu.Lock()
234+
clusterDurations = append(clusterDurations, clusterDuration{
235+
duration: duration,
236+
clusterID: id,
237+
})
238+
durationsMu.Unlock()
239+
log.Printf("worker %d create %s (id=%s) ok in %s", worker, name, id, duration)
221240
}
222241
}(i)
223242
}
@@ -228,7 +247,44 @@ func runBench(ctx context.Context, client cloud.TiDBCloudClient, cfg benchConfig
228247
close(jobs)
229248

230249
wg.Wait()
231-
log.Printf("bench done: success=%d failed=%d", success, failed)
250+
elapsed := time.Since(startAt)
251+
minutes := elapsed.Minutes()
252+
if minutes == 0 {
253+
minutes = 1.0
254+
}
255+
totalReq := atomic.LoadInt64(&totalCreateReq) + atomic.LoadInt64(&totalGetReq)
256+
avgReqPerMinute := float64(totalReq) / minutes
257+
avgClusterPerMinute := float64(atomic.LoadInt64(&success)) / minutes
258+
259+
// Calculate average and max cluster creation duration
260+
var avgDuration, maxDuration time.Duration
261+
var maxDurationClusterID string
262+
var durationsCopy []time.Duration
263+
durationsMu.Lock()
264+
if len(clusterDurations) > 0 {
265+
durationsCopy = make([]time.Duration, len(clusterDurations))
266+
var totalDuration time.Duration
267+
for i, cd := range clusterDurations {
268+
durationsCopy[i] = cd.duration
269+
totalDuration += cd.duration
270+
if cd.duration > maxDuration {
271+
maxDuration = cd.duration
272+
maxDurationClusterID = cd.clusterID
273+
}
274+
}
275+
avgDuration = totalDuration / time.Duration(len(clusterDurations))
276+
}
277+
durationsMu.Unlock()
278+
279+
log.Printf("bench done: success=%d failed=%d totalReq=%d avg_req_per_min=%.2f (create=%d get=%d) avg_cluster_per_min=%.2f duration=%s",
280+
success, failed, totalReq, avgReqPerMinute, totalCreateReq, totalGetReq, avgClusterPerMinute, elapsed)
281+
log.Printf("cluster creation time: avg=%s max=%s (cluster_id=%s)",
282+
avgDuration, maxDuration, maxDurationClusterID)
283+
284+
// Print histogram
285+
if len(durationsCopy) > 0 {
286+
printHistogram(durationsCopy)
287+
}
232288
}
233289

234290
func createOnce(ctx context.Context, client cloud.TiDBCloudClient, cfg benchConfig, name string) (string, error) {
@@ -266,17 +322,30 @@ func createOnce(ctx context.Context, client cloud.TiDBCloudClient, cfg benchConf
266322
}
267323
}
268324

269-
resp, err := client.CreateCluster(ctx, payload)
270-
if err != nil {
271-
return "", err
272-
}
273-
if resp.ClusterId == nil {
274-
return "", fmt.Errorf("empty cluster id")
325+
for {
326+
select {
327+
case <-ctx.Done():
328+
return "", ctx.Err()
329+
default:
330+
}
331+
resp, err := client.CreateCluster(ctx, payload)
332+
if err != nil {
333+
log.Printf("CreateCluster error: %v, retrying...", err)
334+
select {
335+
case <-ctx.Done():
336+
return "", ctx.Err()
337+
case <-time.After(1 * time.Second):
338+
}
339+
continue
340+
}
341+
if resp.ClusterId == nil {
342+
return "", fmt.Errorf("empty cluster id")
343+
}
344+
return *resp.ClusterId, nil
275345
}
276-
return *resp.ClusterId, nil
277346
}
278347

279-
func waitClusterReady(ctx context.Context, client cloud.TiDBCloudClient, clusterID string) error {
348+
func waitClusterReady(ctx context.Context, client cloud.TiDBCloudClient, clusterID string, getReqCounter *int64) error {
280349
ticker := time.NewTicker(waitInterval)
281350
defer ticker.Stop()
282351
timer := time.After(waitTimeout)
@@ -288,9 +357,11 @@ func waitClusterReady(ctx context.Context, client cloud.TiDBCloudClient, cluster
288357
case <-timer:
289358
return fmt.Errorf("timeout waiting for cluster %s ready", clusterID)
290359
case <-ticker.C:
360+
atomic.AddInt64(getReqCounter, 1)
291361
c, err := client.GetCluster(ctx, clusterID, cluster.CLUSTERSERVICEGETCLUSTERVIEWPARAMETER_BASIC)
292362
if err != nil {
293-
return err
363+
log.Printf("GetCluster error for %s: %v, retrying...", clusterID, err)
364+
continue
294365
}
295366
if c.State != nil && *c.State == cluster.COMMONV1BETA1CLUSTERSTATE_ACTIVE {
296367
return nil
@@ -299,6 +370,83 @@ func waitClusterReady(ctx context.Context, client cloud.TiDBCloudClient, cluster
299370
}
300371
}
301372

373+
// printHistogram logs a text histogram of the given cluster creation
// durations through the standard logger.
//
// Samples are grouped into equal-width buckets spanning the smallest to the
// largest duration: 20 buckets by default, or fewer 10-second buckets when the
// spread is too small for 20 buckets of at least 10 seconds each. Buckets
// without samples are not printed. It does nothing when durations is empty.
func printHistogram(durations []time.Duration) {
	if len(durations) == 0 {
		return
	}

	const (
		defaultBuckets = 20
		minBucketSize  = 10 * time.Second
		barWidth       = 50
	)

	// Find min and max.
	minDuration, maxDuration := durations[0], durations[0]
	for _, d := range durations[1:] {
		if d < minDuration {
			minDuration = d
		}
		if d > maxDuration {
			maxDuration = d
		}
	}

	// Determine bucket size based on range.
	rangeDuration := maxDuration - minDuration
	numBuckets := defaultBuckets
	bucketSize := rangeDuration / defaultBuckets
	switch {
	case rangeDuration == 0:
		// All durations are the same; a single bucket is enough.
		bucketSize = time.Second
		numBuckets = 1
	case bucketSize < minBucketSize:
		// Narrow spread: use fixed 10s buckets. This branch is only reached
		// when the range is below 20 * 10s, so numBuckets never exceeds 20.
		bucketSize = minBucketSize
		numBuckets = int(rangeDuration/bucketSize) + 1
	}

	// Count durations in each bucket and track the fullest one for scaling.
	buckets := make([]int, numBuckets)
	maxCount := 0
	for _, d := range durations {
		idx := histogramBucketIndex(d-minDuration, bucketSize, numBuckets)
		buckets[idx]++
		if buckets[idx] > maxCount {
			maxCount = buckets[idx]
		}
	}

	// Build the widest possible bar once; each row prints a prefix of it.
	fullBar := make([]rune, barWidth)
	for i := range fullBar {
		fullBar[i] = '█'
	}

	// Print histogram.
	log.Printf("\nCluster Creation Time Histogram:")
	log.Printf("Range: %s to %s", minDuration, maxDuration)
	log.Printf("Bucket size: %s", bucketSize)
	log.Printf("")

	for i, count := range buckets {
		if count == 0 {
			continue
		}
		startTime := minDuration + time.Duration(i)*bucketSize
		endTime := minDuration + time.Duration(i+1)*bucketSize
		bar := string(fullBar[:histogramBarLength(count, maxCount, barWidth)])
		log.Printf("%8s - %8s | %s %d", startTime, endTime, bar, count)
	}
	log.Printf("")
}

// histogramBucketIndex maps a sample's offset from the minimum duration to a
// bucket index in [0, numBuckets). Offsets at or beyond the end of the last
// bucket (the maximum sample, or spill-over caused by integer truncation of
// the bucket size) are folded into the last bucket so that no row is labelled
// past the reported range.
func histogramBucketIndex(offset, bucketSize time.Duration, numBuckets int) int {
	idx := int(offset / bucketSize)
	if idx >= numBuckets {
		return numBuckets - 1
	}
	return idx
}

// histogramBarLength scales count against maxCount to a bar of at most width
// cells. A non-empty bucket always gets at least one cell so it stays visible
// next to much larger buckets.
func histogramBarLength(count, maxCount, width int) int {
	if count <= 0 || maxCount <= 0 {
		return 0
	}
	n := count * width / maxCount
	if n < 1 {
		return 1
	}
	return n
}
449+
302450
// toInt32Ptr returns a pointer to the function's own copy of v, which is
// convenient where an *int32 is needed from a plain value or constant.
func toInt32Ptr(v int32) *int32 {
303451
return &v
304452
}

0 commit comments

Comments
 (0)