Skip to content

Commit 1049e3f

Browse files
xinlian12Annie Liang
andauthored
[SparkConnector]enableThroughputBucketSupport (#47856)
* enable throughput bucket in spark --------- Co-authored-by: Annie Liang <anniemac@Annies-MacBook-Pro.local>
1 parent c3591a7 commit 1049e3f

File tree

11 files changed

+267
-92
lines changed

11 files changed

+267
-92
lines changed

sdk/cosmos/azure-cosmos-spark_3-3_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.43.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos-spark_3-4_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.43.0-beta.1 (Unreleased)
44

55
#### Features Added
6+
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)
67

78
#### Breaking Changes
89

sdk/cosmos/azure-cosmos-spark_3-5_2-12/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#### Features Added
66
* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)
7+
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)
78

89
#### Breaking Changes
910

sdk/cosmos/azure-cosmos-spark_3-5_2-13/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#### Features Added
66
* Added transactional batch support. See [PR 47478](https://github.com/Azure/azure-sdk-for-java/pull/47478) and [PR 47697](https://github.com/Azure/azure-sdk-for-java/pull/47697) and [47803](https://github.com/Azure/azure-sdk-for-java/pull/47803)
7+
* Added support for throughput bucket. - See [47856](https://github.com/Azure/azure-sdk-for-java/pull/47856)
78

89
#### Breaking Changes
910

sdk/cosmos/azure-cosmos-spark_3/docs/configuration-reference.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ Used to influence the json serialization/deserialization behavior
146146
| `spark.cosmos.throughputControl.targetThroughput` | None | Throughput control group target throughput |
147147
| `spark.cosmos.throughputControl.targetThroughputThreshold` | None | Throughput control group target throughput threshold |
148148
| `spark.cosmos.throughputControl.priorityLevel` | None | Throughput control group priority level. The priority level is used to determine which requests will be throttled first when the total throughput of all control groups exceeds the max throughput. Priority based execution is currently in preview. To enable the feature, please follow the instructions [here](https://devblogs.microsoft.com/cosmosdb/introducing-priority-based-execution-in-azure-cosmos-db-preview/#next-steps) |
149+
| `spark.cosmos.throughputControl.throughputBucket` | None | Throughput bucket value. Please refer [here](https://learn.microsoft.com/azure/cosmos-db/throughput-buckets?tabs=dotnet) for full context |
149150
| `spark.cosmos.throughputControl.globalControl.database` | None | Database which will be used for throughput global control |
150151
| `spark.cosmos.throughputControl.globalControl.container` | None | Container which will be used for throughput global control |
151152
| `spark.cosmos.throughputControl.globalControl.renewIntervalInMS` | `5s` | How often the client is going to update the throughput usage of itself |

sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala

Lines changed: 123 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ private[spark] object CosmosConfigNames {
149149
val ThroughputControlTargetThroughput = "spark.cosmos.throughputControl.targetThroughput"
150150
val ThroughputControlTargetThroughputThreshold = "spark.cosmos.throughputControl.targetThroughputThreshold"
151151
val ThroughputControlPriorityLevel = "spark.cosmos.throughputControl.priorityLevel"
152+
val ThroughputControlThroughputBucket = "spark.cosmos.throughputControl.throughputBucket"
152153
val ThroughputControlGlobalControlDatabase = "spark.cosmos.throughputControl.globalControl.database"
153154
val ThroughputControlGlobalControlContainer = "spark.cosmos.throughputControl.globalControl.container"
154155
val ThroughputControlGlobalControlRenewalIntervalInMS =
@@ -278,6 +279,7 @@ private[spark] object CosmosConfigNames {
278279
ThroughputControlTargetThroughput,
279280
ThroughputControlTargetThroughputThreshold,
280281
ThroughputControlPriorityLevel,
282+
ThroughputControlThroughputBucket,
281283
ThroughputControlGlobalControlDatabase,
282284
ThroughputControlGlobalControlContainer,
283285
ThroughputControlGlobalControlRenewalIntervalInMS,
@@ -2380,16 +2382,26 @@ private object CosmosChangeFeedConfig {
23802382
}
23812383
}
23822384

2383-
private case class CosmosThroughputControlConfig(cosmosAccountConfig: CosmosAccountConfig,
2384-
groupName: String,
2385-
targetThroughput: Option[Int],
2386-
targetThroughputThreshold: Option[Double],
2387-
priorityLevel: Option[PriorityLevel],
2388-
globalControlDatabase: Option[String],
2389-
globalControlContainer: Option[String],
2390-
globalControlRenewInterval: Option[Duration],
2391-
globalControlExpireInterval: Option[Duration],
2392-
globalControlUseDedicatedContainer: Boolean)
2385+
private trait CosmosThroughputControlConfig {
2386+
def groupName: String
2387+
}
2388+
2389+
private case class CosmosSDKThroughputControlConfig(
2390+
override val groupName: String,
2391+
cosmosAccountConfig: CosmosAccountConfig,
2392+
targetThroughput: Option[Int],
2393+
targetThroughputThreshold: Option[Double],
2394+
priorityLevel: Option[PriorityLevel],
2395+
globalControlDatabase: Option[String],
2396+
globalControlContainer: Option[String],
2397+
globalControlRenewInterval: Option[Duration],
2398+
globalControlExpireInterval: Option[Duration],
2399+
globalControlUseDedicatedContainer: Boolean) extends CosmosThroughputControlConfig
2400+
2401+
private case class CosmosServerThroughputControlConfig(
2402+
override val groupName: String,
2403+
throughputBucket: Int,
2404+
priorityLevel: Option[PriorityLevel]) extends CosmosThroughputControlConfig
23932405

23942406
private object CosmosThroughputControlConfig {
23952407
private val throughputControlEnabledSupplier = CosmosConfigEntry[Boolean](
@@ -2441,6 +2453,12 @@ private object CosmosThroughputControlConfig {
24412453
parseFromStringFunction = priorityLevel => CosmosConfigEntry.parseEnumeration(priorityLevel, PriorityLevels),
24422454
helpMessage = "Throughput control group priority level. The value can be High or Low. ")
24432455

2456+
private val throughputBucketSupplier = CosmosConfigEntry[Int](
2457+
key = CosmosConfigNames.ThroughputControlThroughputBucket,
2458+
mandatory = false,
2459+
parseFromStringFunction = throughputBucket => throughputBucket.toInt,
2460+
helpMessage = "Throughput bucket value. Please refer here for full context: https://learn.microsoft.com/azure/cosmos-db/throughput-buckets?tabs=dotnet")
2461+
24442462
private val globalControlDatabaseSupplier = CosmosConfigEntry[String](
24452463
key = CosmosConfigNames.ThroughputControlGlobalControlDatabase,
24462464
mandatory = false,
@@ -2481,68 +2499,113 @@ private object CosmosThroughputControlConfig {
24812499
val throughputControlEnabled = CosmosConfigEntry.parse(cfg, throughputControlEnabledSupplier).get
24822500

24832501
if (throughputControlEnabled) {
2484-
// we will allow the customer to provide a different database account for throughput control
2485-
val throughputControlCosmosAccountConfig =
2486-
CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match {
2487-
case Some(_) => parseThroughputControlAccountConfig(cfg)
2488-
case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg)
2489-
}
2490-
24912502
val groupName = CosmosConfigEntry.parse(cfg, groupNameSupplier)
2492-
val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
2493-
val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
2494-
val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
2495-
val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
2496-
val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)
2497-
val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier)
2498-
val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier)
2499-
val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier)
2500-
2501-
if (groupName.isEmpty) {
2502-
throw new IllegalArgumentException(
2503-
s"Configuration option '${CosmosConfigNames.ThroughputControlName}' must not be empty.")
2504-
}
2503+
val throughputBucket = CosmosConfigEntry.parse(cfg, throughputBucketSupplier)
2504+
25052505
assert(groupName.isDefined, s"Parameter '${CosmosConfigNames.ThroughputControlName}' is missing.")
25062506

2507-
if (globalControlUseDedicatedContainer.isEmpty) {
2508-
throw new IllegalArgumentException(
2509-
s"Configuration option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' must not be empty.")
2510-
}
2511-
assert(
2512-
globalControlUseDedicatedContainer.isDefined,
2513-
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.")
2507+
if (throughputBucket.isDefined && throughputBucket.get > 0) {
2508+
Some(parseServerThroughputControlConfig(groupName.get, throughputBucket.get, cfg))
2509+
} else {
25142510

2515-
if (globalControlUseDedicatedContainer.get) {
2516-
if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) {
2511+
// If a non-positive throughputBucket was provided treat it as invalid
2512+
if (throughputBucket.isDefined && throughputBucket.get <= 0) {
25172513
throw new IllegalArgumentException(
2518-
s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " +
2519-
s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " +
2520-
s" option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.")
2514+
s"Mixed throughput control configuration detected: " +
2515+
s"Invalid '${CosmosConfigNames.ThroughputControlThroughputBucket}' value '${throughputBucket.get}'. It must be greater than 0 or omitted.")
25212516
}
2522-
assert(
2523-
globalControlDatabase.isDefined,
2524-
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.")
2525-
assert(
2526-
globalControlContainer.isDefined,
2527-
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.")
2528-
}
25292517

2530-
Some(CosmosThroughputControlConfig(
2531-
throughputControlCosmosAccountConfig,
2532-
groupName.get,
2533-
targetThroughput,
2534-
targetThroughputThreshold,
2535-
priorityLevel,
2536-
globalControlDatabase,
2537-
globalControlContainer,
2538-
globalControlItemRenewInterval,
2539-
globalControlItemExpireInterval,
2540-
globalControlUseDedicatedContainer.get))
2518+
// if throughput bucket is defined, then use server side throughput bucket control
2519+
// else validate SDK global throughput control config
2520+
Some(parseSDKThroughputControlConfig(groupName.get, cfg))
2521+
}
25412522
} else {
25422523
None
25432524
}
25442525
}
25452526

2527+
private[spark] def parseServerThroughputControlConfig(
2528+
groupName: String,
2529+
throughputBucket: Int,
2530+
cfg: Map[String, String]): CosmosServerThroughputControlConfig = {
2531+
2532+
// Detect presence of SDK/global throughput control options
2533+
val targetThroughputOpt = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
2534+
val targetThroughputThresholdOpt = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
2535+
val throughputControlAccountEndpointOpt = CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier)
2536+
val throughputControlAccountKeyOpt = CosmosConfigEntry.parse(cfg, throughputControlAccountKeySupplier)
2537+
val globalControlDatabaseOpt = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
2538+
val globalControlContainerOpt = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)
2539+
2540+
val sdkThroughputControlConfigsPresent = targetThroughputOpt.isDefined ||
2541+
targetThroughputThresholdOpt.isDefined ||
2542+
throughputControlAccountEndpointOpt.isDefined ||
2543+
throughputControlAccountKeyOpt.isDefined ||
2544+
globalControlDatabaseOpt.isDefined ||
2545+
globalControlContainerOpt.isDefined
2546+
2547+
if (sdkThroughputControlConfigsPresent) {
2548+
throw new IllegalArgumentException(
2549+
"Mixed throughput control configuration detected: 'throughputBucket' cannot be used together with " +
2550+
"['targetThroughput', 'targetThroughputThreshold', 'throughputControl.accountEndpoint', 'throughputControl.accountKey', " +
2551+
"'throughputControl.globalControl.database', 'throughputControl.globalControl.container']")
2552+
}
2553+
2554+
val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
2555+
CosmosServerThroughputControlConfig(groupName, throughputBucket, priorityLevel)
2556+
}
2557+
2558+
private[spark] def parseSDKThroughputControlConfig(
2559+
groupName: String,
2560+
cfg: Map[String, String]): CosmosSDKThroughputControlConfig = {
2561+
// we will allow the customer to provide a different database account for throughput control
2562+
val throughputControlCosmosAccountConfig =
2563+
CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier) match {
2564+
case Some(_) => parseThroughputControlAccountConfig(cfg)
2565+
case None => CosmosAccountConfig.parseCosmosAccountConfig(cfg)
2566+
}
2567+
2568+
val targetThroughput = CosmosConfigEntry.parse(cfg, targetThroughputSupplier)
2569+
val targetThroughputThreshold = CosmosConfigEntry.parse(cfg, targetThroughputThresholdSupplier)
2570+
val priorityLevel = CosmosConfigEntry.parse(cfg, priorityLevelSupplier)
2571+
val globalControlDatabase = CosmosConfigEntry.parse(cfg, globalControlDatabaseSupplier)
2572+
val globalControlContainer = CosmosConfigEntry.parse(cfg, globalControlContainerSupplier)
2573+
val globalControlItemRenewInterval = CosmosConfigEntry.parse(cfg, globalControlItemRenewIntervalSupplier)
2574+
val globalControlItemExpireInterval = CosmosConfigEntry.parse(cfg, globalControlItemExpireIntervalSupplier)
2575+
val globalControlUseDedicatedContainer = CosmosConfigEntry.parse(cfg, globalControlUseDedicatedContainerSupplier)
2576+
2577+
assert(
2578+
globalControlUseDedicatedContainer.isDefined,
2579+
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is missing.")
2580+
2581+
if (globalControlUseDedicatedContainer.get) {
2582+
if (globalControlDatabase.isEmpty || globalControlContainer.isEmpty) {
2583+
throw new IllegalArgumentException(
2584+
s"Configuration options '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' and " +
2585+
s"'${CosmosConfigNames.ThroughputControlGlobalControlContainer}' must not be empty if " +
2586+
s" option '${CosmosConfigNames.ThroughputControlGlobalControlUseDedicatedContainer}' is true.")
2587+
}
2588+
assert(
2589+
globalControlDatabase.isDefined,
2590+
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlDatabase}' is missing.")
2591+
assert(
2592+
globalControlContainer.isDefined,
2593+
s"Parameter '${CosmosConfigNames.ThroughputControlGlobalControlContainer}' is missing.")
2594+
}
2595+
2596+
CosmosSDKThroughputControlConfig(
2597+
groupName,
2598+
throughputControlCosmosAccountConfig,
2599+
targetThroughput,
2600+
targetThroughputThreshold,
2601+
priorityLevel,
2602+
globalControlDatabase,
2603+
globalControlContainer,
2604+
globalControlItemRenewInterval,
2605+
globalControlItemExpireInterval,
2606+
globalControlUseDedicatedContainer.get)
2607+
}
2608+
25462609
private[spark] def parseThroughputControlAccountConfig(cfg: Map[String, String]): CosmosAccountConfig = {
25472610
val throughputControlAccountEndpoint = CosmosConfigEntry.parse(cfg, throughputControlAccountEndpointUriSupplier)
25482611
val throughputControlAccountKey = CosmosConfigEntry.parse(cfg, throughputControlAccountKeySupplier)

0 commit comments

Comments
 (0)