Skip to content

Commit 29eb4da

Browse files
committed
Add --resume-size for runner
1 parent c9540e3 commit 29eb4da

File tree

6 files changed

+184
-7
lines changed

6 files changed

+184
-7
lines changed

cache/cache.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,15 @@ func (c *Cache) Redirect(ctx context.Context, blobPath string, referer string) (
104104
return u, nil
105105
}
106106

107+
func (c *Cache) Writer(ctx context.Context, cachePath string, append bool) (storagedriver.FileWriter, error) {
108+
return c.storageDriver.Writer(ctx, cachePath, append)
109+
}
110+
111+
func (c *Cache) BlobWriter(ctx context.Context, blob string, append bool) (storagedriver.FileWriter, error) {
112+
cachePath := blobCachePath(blob)
113+
return c.Writer(ctx, cachePath, append)
114+
}
115+
107116
func (c *Cache) put(ctx context.Context, cachePath string, r io.Reader, checkFunc func(int64) error) (int64, error) {
108117
fw, err := c.storageDriver.Writer(ctx, cachePath, false)
109118
if err != nil {

cmd/crproxy/cluster/runner/runner.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ type flagpole struct {
3131
BigStorageURL string
3232
BigStorageSize int
3333

34+
ResumeSize int
35+
3436
StorageURL []string
3537
Quick bool
3638
Platform []string
@@ -65,6 +67,8 @@ func NewCommand() *cobra.Command {
6567
cmd.Flags().StringArrayVar(&flags.StorageURL, "storage-url", flags.StorageURL, "Storage driver url")
6668
cmd.Flags().StringVar(&flags.BigStorageURL, "big-storage-url", flags.BigStorageURL, "Big storage driver url")
6769
cmd.Flags().IntVar(&flags.BigStorageSize, "big-storage-size", flags.BigStorageSize, "Big storage size")
70+
cmd.Flags().IntVar(&flags.ResumeSize, "resume-size", flags.ResumeSize, "Resume size")
71+
6872
cmd.Flags().StringVar(&flags.ManifestStorageURL, "manifest-storage-url", flags.ManifestStorageURL, "manifest storage driver url")
6973
cmd.Flags().BoolVar(&flags.Quick, "quick", flags.Quick, "Quick sync with tags")
7074
cmd.Flags().StringSliceVar(&flags.Platform, "platform", flags.Platform, "Platform")
@@ -197,6 +201,10 @@ func runE(ctx context.Context, flags *flagpole) error {
197201
opts = append(opts, runner.WithManifestCache(manifestsdcache))
198202
}
199203

204+
if flags.ResumeSize > 0 {
205+
opts = append(opts, runner.WithResumeSize(flags.ResumeSize))
206+
}
207+
200208
runner, err := runner.NewRunner(opts...)
201209
if err != nil {
202210
return err

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ require (
1818
github.com/spf13/cobra v1.8.1
1919
github.com/wzshiming/cmux v0.4.2
2020
github.com/wzshiming/hostmatcher v0.0.3
21-
github.com/wzshiming/httpseek v0.1.0
21+
github.com/wzshiming/httpseek v0.1.1
2222
github.com/wzshiming/imc v0.0.0-20250106051804-1cb884b5184a
2323
golang.org/x/crypto v0.28.0
2424
golang.org/x/time v0.10.0

go.sum

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ github.com/wzshiming/cmux v0.4.2 h1:tI73lL5ztVfiqw7R5m5BkxT1+vQ2PBo/oV6qPbNGPiA=
167167
github.com/wzshiming/cmux v0.4.2/go.mod h1:JgE61QfZAjEyNMX0iZo9zIKY6pr9bHVY132yYPwHW5U=
168168
github.com/wzshiming/hostmatcher v0.0.3 h1:+JYAq6vUZXDEQ1Ipfdc/D7HmaIMngcc71ftonyCQVQk=
169169
github.com/wzshiming/hostmatcher v0.0.3/go.mod h1:F04RIvIWEvOIrIKOlQlMuR8vQMKAVf2YhpU6l31Wwz4=
170-
github.com/wzshiming/httpseek v0.1.0 h1:lEgL7EBELT/VV9UaTp+m3kw5Pe1KOUdY+IPnKkag6tI=
171-
github.com/wzshiming/httpseek v0.1.0/go.mod h1:YoZhlLIwNjTBDXIT8NpK5zRjOgZouRXPaBfjVXdqMMs=
170+
github.com/wzshiming/httpseek v0.1.1 h1:tbi8ZigQ4pnBQl3j/z/AeY7iqkI4vUV8GrnhW0E9Mms=
171+
github.com/wzshiming/httpseek v0.1.1/go.mod h1:YoZhlLIwNjTBDXIT8NpK5zRjOgZouRXPaBfjVXdqMMs=
172172
github.com/wzshiming/imc v0.0.0-20250106051804-1cb884b5184a h1:yUonFTTPA3PrOz4J1RTvK7gbTn+iiwX6VAJdNCCu3is=
173173
github.com/wzshiming/imc v0.0.0-20250106051804-1cb884b5184a/go.mod h1:U4qkQ1uQB16r15JFVbbTmevsVJnqBz9p/sHdIThCs/A=
174174
github.com/wzshiming/trie v0.3.1 h1:YpuoqmEQFJiW0mns/mM6Qk4kdWrXc8kc28/KR1vn0m8=
@@ -196,8 +196,6 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
196196
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
197197
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
198198
golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
199-
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac h1:7zkz7BUtwNFFqcowJ+RIgu2MaV/MapERkDIy+mwPyjs=
200-
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
201199
golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4=
202200
golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
203201
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

runner/runner.go

Lines changed: 160 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"log/slog"
10+
"math"
1011
"net/http"
1112
"net/url"
1213
"sort"
@@ -19,10 +20,14 @@ import (
1920
"github.com/daocloud/crproxy/internal/spec"
2021
"github.com/daocloud/crproxy/queue/client"
2122
"github.com/daocloud/crproxy/queue/model"
23+
"github.com/daocloud/crproxy/storage"
2224
csync "github.com/daocloud/crproxy/sync"
25+
"github.com/wzshiming/httpseek"
2326
)
2427

2528
type Runner struct {
29+
resumeSize int
30+
2631
bigCacheSize int
2732
bigCache *cache.Cache
2833
manifestCache *cache.Cache
@@ -96,6 +101,12 @@ func WithManifestCache(cache *cache.Cache) Option {
96101
}
97102
}
98103

104+
func WithResumeSize(resumeSize int) Option {
105+
return func(c *Runner) {
106+
c.resumeSize = resumeSize
107+
}
108+
}
109+
99110
func NewRunner(opts ...Option) (*Runner, error) {
100111
r := &Runner{
101112
httpClient: http.DefaultClient,
@@ -426,11 +437,137 @@ func (r *Runner) blob(ctx context.Context, host, name, blob string, size int64,
426437
r.logger.Info("skip blob by cache", "digest", blob)
427438
return nil
428439
}
429-
430440
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
431441
if err != nil {
432442
return err
433443
}
444+
445+
if r.resumeSize != 0 && size > int64(r.resumeSize) {
446+
if len(subCaches) == 1 {
447+
f, err := subCaches[0].BlobWriter(ctx, blob, true)
448+
if err == nil {
449+
450+
seeker := httpseek.NewSeeker(ctx, r.httpClient.Transport, req)
451+
452+
_, err = seeker.Seek(f.Size(), 0)
453+
if err != nil {
454+
return err
455+
}
456+
457+
progress.Store(f.Size())
458+
459+
body := &readerCounter{
460+
r: seeker,
461+
counter: progress,
462+
}
463+
_, err = io.Copy(f, body)
464+
if err != nil {
465+
return err
466+
}
467+
468+
err = f.Commit()
469+
if err != nil {
470+
return err
471+
}
472+
473+
fi, err := subCaches[0].StatBlob(ctx, blob)
474+
if err != nil {
475+
return err
476+
}
477+
478+
if fi.Size() != size {
479+
err := subCaches[0].DeleteBlob(ctx, blob)
480+
if err != nil {
481+
return fmt.Errorf("%s is %d, but expected %d: %w", blob, fi.Size(), size, err)
482+
}
483+
return fmt.Errorf("%s is %d, but expected %d", blob, fi.Size(), size)
484+
}
485+
return nil
486+
}
487+
} else {
488+
var offset int64 = math.MaxInt
489+
490+
rbws := []storage.FileWriter{}
491+
for _, cache := range subCaches {
492+
f, err := cache.BlobWriter(ctx, blob, true)
493+
if err == nil {
494+
if offset != 0 {
495+
offset = min(offset, f.Size())
496+
}
497+
rbws = append(rbws, f)
498+
499+
} else {
500+
offset = 0
501+
f, err = cache.BlobWriter(ctx, blob, false)
502+
if err != nil {
503+
return err
504+
}
505+
rbws = append(rbws, f)
506+
}
507+
}
508+
509+
var writers []io.Writer
510+
for _, w := range rbws {
511+
n := w.Size() - offset
512+
if n == 0 {
513+
writers = append(writers, w)
514+
} else if n > 0 {
515+
writers = append(writers, &skipWriter{
516+
writer: w,
517+
offset: uint64(n),
518+
})
519+
} else {
520+
panic("crproxy.runner: resume write blob error")
521+
}
522+
}
523+
524+
seeker := httpseek.NewSeeker(ctx, r.httpClient.Transport, req)
525+
526+
_, err := seeker.Seek(offset, 0)
527+
if err != nil {
528+
return err
529+
}
530+
531+
progress.Store(offset)
532+
533+
body := &readerCounter{
534+
r: seeker,
535+
counter: progress,
536+
}
537+
538+
_, err = io.Copy(io.MultiWriter(writers...), body)
539+
if err != nil {
540+
return fmt.Errorf("copy blob failed: %w", err)
541+
}
542+
543+
var errs []error
544+
for _, c := range rbws {
545+
err := c.Commit()
546+
if err != nil {
547+
errs = append(errs, err)
548+
}
549+
}
550+
551+
for _, cache := range subCaches {
552+
fi, err := cache.StatBlob(ctx, blob)
553+
if err != nil {
554+
errs = append(errs, err)
555+
continue
556+
}
557+
558+
if fi.Size() != size {
559+
if err := cache.DeleteBlob(ctx, blob); err != nil {
560+
errs = append(errs, fmt.Errorf("%s is %d, but expected %d: %w", blob, fi.Size(), size, err))
561+
} else {
562+
errs = append(errs, fmt.Errorf("%s is %d, but expected %d", blob, fi.Size(), size))
563+
}
564+
}
565+
}
566+
567+
return nil
568+
}
569+
}
570+
434571
resp, err := r.httpClient.Do(req)
435572
if err != nil {
436573
return err
@@ -869,3 +1006,25 @@ func (r *readerCounter) Read(b []byte) (int, error) {
8691006
r.counter.Add(int64(n))
8701007
return n, err
8711008
}
1009+
1010+
type skipWriter struct {
1011+
writer io.Writer
1012+
offset uint64
1013+
skipped uint64
1014+
}
1015+
1016+
func (w *skipWriter) Write(p []byte) (int, error) {
1017+
if w.skipped >= w.offset {
1018+
return w.writer.Write(p)
1019+
}
1020+
1021+
remaining := w.offset - w.skipped
1022+
if uint64(len(p)) <= remaining {
1023+
w.skipped += uint64(len(p))
1024+
return len(p), nil
1025+
}
1026+
1027+
w.skipped = w.offset
1028+
written, err := w.writer.Write(p[remaining:])
1029+
return int(remaining) + written, err
1030+
}

storage/storage.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ import (
99
"github.com/docker/distribution/registry/storage/driver/factory"
1010
)
1111

12-
type StorageDriver = driver.StorageDriver
12+
type (
13+
FileWriter = driver.FileWriter
14+
StorageDriver = driver.StorageDriver
15+
)
1316

1417
func NewStorage(uri string) (StorageDriver, error) {
1518
u, err := url.Parse(uri)

0 commit comments

Comments
 (0)