Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions utils/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,13 @@ var max_concurrency = Default{
mandatory: false,
validationType: CONFIG_RANGE}

var recover_max_concurrency = Default{
key: "recover_max_concurrency",
section: SECTION_BACKINT,
defaultValue: "0",
mandatory: false,
validationType: CONFIG_INT}

var multipart_chunksize = Default{
key: "multipart_chunksize",
section: SECTION_BACKINT,
Expand Down Expand Up @@ -164,6 +171,7 @@ var configDefaults = []Default{
endpoint_url,
ibm_auth_endpoint,
max_concurrency,
recover_max_concurrency,
multipart_chunksize,
remove_key_prefix,
additional_key_prefix,
Expand Down
12 changes: 12 additions & 0 deletions utils/config/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,18 @@ func (b BackintConfigT) ObjectLockRetentionPeriod() string {
return b.Get("object_lock_retention_period")
}

/*
Getting the max concurrency for recovery
If set to 0, we take the value from max_concurrency
*/
func (b BackintConfigT) RecoverMaxConcurrency() int {
rmc := global.ToInteger(b.Get("recover_max_concurrency"))
if rmc == 0 {
return global.ToInteger(b.Get("max_concurrency"))
}
return rmc
}

/*
Getting the region
*/
Expand Down
6 changes: 3 additions & 3 deletions utils/cos/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func Download(s3Client *s3.S3, element CosObject) Result {
downloadPartsResults := make(chan DownloadPartResult, numParts)

// Make sure that not more than the maximum number run concurrently
sem := make(chan struct{}, config.BackintConfig.MaxConcurrency())
sem := make(chan struct{}, config.BackintConfig.RecoverMaxConcurrency())

// Map containing the parts which are already downloaded
// but could not yet be written to pipe.
Expand All @@ -71,10 +71,10 @@ func Download(s3Client *s3.S3, element CosObject) Result {
partsNotYetWritten := make(ByteMap)

// Downloading parts asynchronously
// (limited by value of maxConcurrency)
// (limited by value of RecoverMaxConcurrency)
for _, downloadPart := range downloadParts {
wgGetObject.Add(1)
sem <- struct{}{} // block if maxConcurrency reached
sem <- struct{}{} // block if RecoverMaxConcurrency reached

downloadSingle := DownloadSingePart{
fifo: fifo,
Expand Down
4 changes: 3 additions & 1 deletion utils/cos/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,9 @@ func writeDataToPipe(fifo *os.File, data []byte, nextIndex *int64, pipeBufferSiz
fifo.Name(),
errors.New("Timeout")),
)
return false
global.Logger.Fatal("Stopping execution immediately.")
os.Exit(1)
// return false
case err := <-written:
if err != nil {
global.Logger.Error(fmt.Sprintf(
Expand Down