Skip to content

Commit adb4f33

Browse files
author
sabdenovch
committed
YT-26489: Worker pool in rpc proxy is fair-share
* Changelog entry Type: feature Component: proxy Worker pool in rpc proxies is made fair-share. commit_hash:9ce485656813b6821badbf1bc855ce79809b7206
1 parent b7be3e3 commit adb4f33

File tree

8 files changed

+67
-5
lines changed

8 files changed

+67
-5
lines changed

yt/yt/client/api/dynamic_table_client.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ struct TMultiLookupOptions
5454
: public TTimeoutOptions
5555
, public TTabletReadOptionsBase
5656
, public TMultiplexingBandOptions
57-
{ };
57+
{
58+
std::optional<std::string> ExecutionPool;
59+
};
5860

5961
struct TExplainQueryOptions
6062
: public TSelectRowsOptionsBase

yt/yt/client/api/rpc_proxy/client_base.cpp

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -937,6 +937,12 @@ TFuture<TUnversionedLookupRowsResult> TClientBase::LookupRows(
937937

938938
YT_OPTIONAL_TO_PROTO(req, execution_pool, options.ExecutionPool);
939939

940+
auto* ext = req->Header().MutableExtension(NProto::TReqFairSharePoolExt::req_fair_share_pool_ext);
941+
YT_OPTIONAL_TO_PROTO(ext, execution_pool, options.ExecutionPool);
942+
if (auto* traceContext = NTracing::TryGetCurrentTraceContext()) {
943+
ext->set_execution_tag(ToString(traceContext->GetTraceId()));
944+
}
945+
940946
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspLookupRowsPtr& rsp) {
941947
auto rowset = DeserializeRowset<TUnversionedRow>(
942948
rsp->rowset_descriptor(),
@@ -983,7 +989,13 @@ TFuture<TVersionedLookupRowsResult> TClientBase::VersionedLookupRows(
983989
THROW_ERROR_EXCEPTION("Versioned lookup does not support versioned read mode %Qlv",
984990
options.VersionedReadOptions.ReadMode);
985991
}
992+
986993
YT_OPTIONAL_TO_PROTO(req, execution_pool, options.ExecutionPool);
994+
auto* ext = req->Header().MutableExtension(NProto::TReqFairSharePoolExt::req_fair_share_pool_ext);
995+
YT_OPTIONAL_TO_PROTO(ext, execution_pool, options.ExecutionPool);
996+
if (auto* traceContext = NTracing::TryGetCurrentTraceContext()) {
997+
ext->set_execution_tag(ToString(traceContext->GetTraceId()));
998+
}
987999

9881000
return req->Invoke().Apply(BIND([] (const TApiServiceProxy::TRspVersionedLookupRowsPtr& rsp) {
9891001
auto rowset = DeserializeRowset<TVersionedRow>(
@@ -1062,6 +1074,14 @@ TFuture<std::vector<TUnversionedLookupRowsResult>> TClientBase::MultiLookupRows(
10621074
req->set_multiplexing_band(static_cast<NProto::EMultiplexingBand>(options.MultiplexingBand));
10631075
ToProto(req->mutable_tablet_read_options(), options);
10641076

1077+
YT_OPTIONAL_TO_PROTO(req, execution_pool, options.ExecutionPool);
1078+
1079+
auto* ext = req->Header().MutableExtension(NProto::TReqFairSharePoolExt::req_fair_share_pool_ext);
1080+
YT_OPTIONAL_TO_PROTO(ext, execution_pool, options.ExecutionPool);
1081+
if (auto* traceContext = NTracing::TryGetCurrentTraceContext()) {
1082+
ext->set_execution_tag(ToString(traceContext->GetTraceId()));
1083+
}
1084+
10651085
return req->Invoke().Apply(BIND([subrequestCount = std::ssize(subrequests)] (const TApiServiceProxy::TRspMultiLookupPtr& rsp) {
10661086
YT_VERIFY(subrequestCount == rsp->subresponses_size());
10671087

@@ -1146,6 +1166,12 @@ TFuture<TSelectRowsResult> TClientBase::SelectRows(
11461166
req->set_allow_join_without_index(options.AllowJoinWithoutIndex);
11471167

11481168
YT_OPTIONAL_TO_PROTO(req, execution_pool, options.ExecutionPool);
1169+
auto* ext = req->Header().MutableExtension(NProto::TReqFairSharePoolExt::req_fair_share_pool_ext);
1170+
YT_OPTIONAL_TO_PROTO(ext, execution_pool, options.ExecutionPool);
1171+
if (auto* traceContext = NTracing::TryGetCurrentTraceContext()) {
1172+
ext->set_execution_tag(ToString(traceContext->GetTraceId()));
1173+
}
1174+
11491175
if (options.PlaceholderValues) {
11501176
req->set_placeholder_values(ToProto(options.PlaceholderValues));
11511177
}

yt/yt/core/concurrency/public.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ class TPropagatingStorage;
131131

132132
YT_DECLARE_RECONFIGURABLE_SINGLETON(TFiberManagerConfig, TFiberManagerDynamicConfig);
133133

134+
extern const TFairShareThreadPoolTag DefaultExecutionTag;
135+
134136
////////////////////////////////////////////////////////////////////////////////
135137

136138
} // namespace NYT::NConcurrency

yt/yt/core/concurrency/two_level_fair_share_thread_pool.cpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ namespace NYT::NConcurrency {
3030

3131
using namespace NProfiling;
3232

33+
const TFairShareThreadPoolTag DefaultExecutionTag = "default";
34+
3335
////////////////////////////////////////////////////////////////////////////////
3436

3537
namespace {
@@ -609,10 +611,15 @@ class TTwoLevelFairShareQueue
609611
, CumulativeSchedulingTimeCounter_(Profiler_
610612
.WithTags(GetThreadTags(ThreadNamePrefix_))
611613
.TimeCounter("/time/scheduling_cumulative"))
612-
, PoolWeightProvider_(options.PoolWeightProvider)
613614
, VerboseLogging_(options.VerboseLogging)
615+
, PoolWeightProvider_(options.PoolWeightProvider)
614616
{ }
615617

618+
void SetWeightProvider(IPoolWeightProviderPtr weightProvider)
619+
{
620+
PoolWeightProvider_ = std::move(weightProvider);
621+
}
622+
616623
~TTwoLevelFairShareQueue()
617624
{
618625
Shutdown();
@@ -821,9 +828,9 @@ class TTwoLevelFairShareQueue
821828
const std::string ThreadNamePrefix_;
822829
const TProfiler Profiler_;
823830
const NProfiling::TTimeCounter CumulativeSchedulingTimeCounter_;
824-
const IPoolWeightProviderPtr PoolWeightProvider_;
825831
const bool VerboseLogging_;
826832

833+
IPoolWeightProviderPtr PoolWeightProvider_;
827834
std::atomic<bool> Stopped_ = false;
828835
TMpscStack<TAction> InvokeQueue_;
829836
char Padding0_[CacheLineSize - sizeof(TMpscStack<TAction>)];
@@ -1357,6 +1364,11 @@ class TTwoLevelFairShareThreadPool
13571364
return Queue_->GetInvoker(poolName, bucketName);
13581365
}
13591366

1367+
void SetWeightProvider(IPoolWeightProviderPtr weightProvider) override
1368+
{
1369+
Queue_->SetWeightProvider(std::move(weightProvider));
1370+
}
1371+
13601372
void Shutdown() override
13611373
{
13621374
TThreadPoolBase::Shutdown();

yt/yt/core/concurrency/two_level_fair_share_thread_pool.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ struct ITwoLevelFairShareThreadPool
2929
const std::string& poolName,
3030
const TFairShareThreadPoolTag& tag) = 0;
3131

32+
virtual void SetWeightProvider(IPoolWeightProviderPtr weightProvider) = 0;
33+
3234
virtual void Shutdown() = 0;
3335

3436
//! Invoked to inform of the current wait time for invocations via this invoker.
@@ -57,5 +59,6 @@ ITwoLevelFairShareThreadPoolPtr CreateTwoLevelFairShareThreadPool(
5759
const std::string& threadNamePrefix,
5860
const TNewTwoLevelFairShareThreadPoolOptions& options = {});
5961

62+
////////////////////////////////////////////////////////////////////////////////
6063

6164
} // namespace NYT::NConcurrency

yt/yt/core/misc/async_expiring_cache.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ class TAsyncExpiringCache
7575
int GetSize() const;
7676

7777
protected:
78+
const NLogging::TLogger Logger_;
79+
7880
TAsyncExpiringCacheConfigPtr GetConfig() const;
7981

8082
virtual TFuture<TValue> DoGet(
@@ -102,7 +104,6 @@ class TAsyncExpiringCache
102104
void Ping(const TKey& key);
103105

104106
private:
105-
const NLogging::TLogger Logger_;
106107
const NConcurrency::TPeriodicExecutorPtr ExpirationExecutor_;
107108
const NConcurrency::TPeriodicExecutorPtr RefreshExecutor_;
108109
const int ShardCount_ = 1;

yt/yt/core/ytree/ypath_service.cpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -603,7 +603,6 @@ class TViaYPathService
603603
const IYPathServicePtr UnderlyingService_;
604604
const IInvokerPtr Invoker_;
605605

606-
607606
bool DoInvoke(const IYPathServiceContextPtr& context) override
608607
{
609608
Invoker_->Invoke(BIND([=, this, this_ = MakeStrong(this)] {

yt/yt_proto/yt/client/api/rpc_proxy/proto/api_service.proto

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ option go_package = "a.yandex-team.ru/yt/go/proto/client/api/rpc_proxy";
99
import "yt_proto/yt/core/misc/proto/error.proto";
1010
import "yt_proto/yt/core/misc/proto/guid.proto";
1111
import "yt_proto/yt/core/misc/proto/hyperloglog.proto";
12+
import "yt_proto/yt/core/rpc/proto/rpc.proto";
1213
import "yt_proto/yt/core/ytree/proto/attributes.proto";
1314
import "yt_proto/yt/core/ytree/proto/request_complexity_limits.proto";
1415
import "yt_proto/yt/client/chunk_client/proto/data_statistics.proto";
@@ -637,6 +638,7 @@ message TReqMultiLookup
637638
optional TTabletReadOptions tablet_read_options = 3;
638639
optional EReplicaConsistency replica_consistency = 6;
639640
optional EMultiplexingBand multiplexing_band = 4;
641+
optional string execution_pool = 7;
640642
}
641643

642644
message TRspMultiLookup
@@ -3963,3 +3965,18 @@ message TReqWriteShuffleData
39633965
message TRspWriteShuffleData
39643966
{
39653967
}
3968+
3969+
////////////////////////////////////////////////////////////////////////////////
3970+
3971+
message TReqFairSharePoolExt
3972+
{
3973+
extend NRpc.NProto.TRequestHeader
3974+
{
3975+
optional TReqFairSharePoolExt req_fair_share_pool_ext = 202;
3976+
}
3977+
3978+
optional string execution_pool = 1 [default = "default"];
3979+
optional string execution_tag = 2 [default = "default"];
3980+
}
3981+
3982+
////////////////////////////////////////////////////////////////////////////////

0 commit comments

Comments
 (0)