diff --git a/utils/config/defaults.go b/utils/config/defaults.go index f0677bd..7280014 100644 --- a/utils/config/defaults.go +++ b/utils/config/defaults.go @@ -79,6 +79,13 @@ var max_concurrency = Default{ mandatory: false, validationType: CONFIG_RANGE} +var recover_max_concurrency = Default{ + key: "recover_max_concurrency", + section: SECTION_BACKINT, + defaultValue: "0", + mandatory: false, + validationType: CONFIG_INT} + var multipart_chunksize = Default{ key: "multipart_chunksize", section: SECTION_BACKINT, @@ -164,6 +171,7 @@ var configDefaults = []Default{ endpoint_url, ibm_auth_endpoint, max_concurrency, + recover_max_concurrency, multipart_chunksize, remove_key_prefix, additional_key_prefix, diff --git a/utils/config/getter.go b/utils/config/getter.go index fca34c5..ed96382 100644 --- a/utils/config/getter.go +++ b/utils/config/getter.go @@ -147,6 +147,18 @@ func (b BackintConfigT) ObjectLockRetentionPeriod() string { return b.Get("object_lock_retention_period") } +/* +Getting the max concurrency for recovery +If set to 0, we take the value from max_concurrency +*/ +func (b BackintConfigT) RecoverMaxConcurrency() int { + rmc := global.ToInteger(b.Get("recover_max_concurrency")) + if rmc == 0 { + return global.ToInteger(b.Get("max_concurrency")) + } + return rmc +} + /* Getting the region */ diff --git a/utils/cos/download.go b/utils/cos/download.go index f74c80b..c8c0847 100644 --- a/utils/cos/download.go +++ b/utils/cos/download.go @@ -62,7 +62,7 @@ func Download(s3Client *s3.S3, element CosObject) Result { downloadPartsResults := make(chan DownloadPartResult, numParts) // Make sure that not more than the maximum number run concurrently - sem := make(chan struct{}, config.BackintConfig.MaxConcurrency()) + sem := make(chan struct{}, config.BackintConfig.RecoverMaxConcurrency()) // Map containing the parts which are already downloaded // but could not yet be written to pipe. @@ -71,10 +71,10 @@ func Download(s3Client *s3.S3, element CosObject) Result { partsNotYetWritten := make(ByteMap) // Downloading parts asynchronously - // (limited by value of maxConcurrency) + // (limited by value of RecoverMaxConcurrency) for _, downloadPart := range downloadParts { wgGetObject.Add(1) - sem <- struct{}{} // block if maxConcurrency reached + sem <- struct{}{} // block if RecoverMaxConcurrency reached downloadSingle := DownloadSingePart{ fifo: fifo, diff --git a/utils/cos/tools.go b/utils/cos/tools.go index 9793829..7b50b54 100644 --- a/utils/cos/tools.go +++ b/utils/cos/tools.go @@ -252,7 +252,9 @@ func writeDataToPipe(fifo *os.File, data []byte, nextIndex *int64, pipeBufferSiz fifo.Name(), errors.New("Timeout")), ) - return false + global.Logger.Fatal("Stopping execution immediately.") + os.Exit(1) + // return false case err := <-written: if err != nil { global.Logger.Error(fmt.Sprintf(