Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/ray/ray_syncer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ ray_cc_library(
"//src/ray/common:constants",
"//src/ray/common:id",
"//src/ray/protobuf:ray_syncer_cc_grpc",
"//src/ray/rpc/authentication:authentication_mode",
"//src/ray/rpc/authentication:authentication_token",
"//src/ray/rpc/authentication:authentication_token_loader",
"//src/ray/rpc/authentication:authentication_token_validator",
"@com_github_grpc_grpc//:grpc++",
"@com_google_absl//absl/container:flat_hash_map",
],
Expand Down
1 change: 1 addition & 0 deletions src/ray/ray_syncer/ray_syncer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ ServerBidiReactor *RaySyncerService::StartSync(grpc::CallbackServerContext *cont
syncer_.node_state_->RemoveNode(node_id);
},
/*auth_token=*/auth_token_,
/*auth_token_validator=*/auth_token_validator_,
/*max_batch_size=*/syncer_.max_batch_size_,
/*max_batch_delay_ms=*/syncer_.max_batch_delay_ms_);
RAY_LOG(DEBUG).WithField(NodeID::FromBinary(reactor->GetRemoteNodeID()))
Expand Down
12 changes: 10 additions & 2 deletions src/ray/ray_syncer/ray_syncer.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "ray/common/id.h"
#include "ray/ray_syncer/common.h"
#include "ray/rpc/authentication/authentication_token.h"
#include "ray/rpc/authentication/authentication_token_validator.h"
#include "src/ray/protobuf/ray_syncer.grpc.pb.h"

namespace ray::syncer {
Expand Down Expand Up @@ -213,8 +214,12 @@ class RaySyncerService : public ray::rpc::syncer::RaySyncer::CallbackService {
public:
explicit RaySyncerService(
RaySyncer &syncer,
std::shared_ptr<const ray::rpc::AuthenticationToken> auth_token = nullptr)
: syncer_(syncer), auth_token_(std::move(auth_token)) {}
std::shared_ptr<const ray::rpc::AuthenticationToken> auth_token = nullptr,
ray::rpc::AuthenticationTokenValidator &auth_token_validator =
ray::rpc::AuthenticationTokenValidator::instance())
: syncer_(syncer),
auth_token_(std::move(auth_token)),
auth_token_validator_(auth_token_validator) {}

grpc::ServerBidiReactor<RaySyncMessageBatch, RaySyncMessageBatch> *StartSync(
grpc::CallbackServerContext *context) override;
Expand All @@ -225,6 +230,9 @@ class RaySyncerService : public ray::rpc::syncer::RaySyncer::CallbackService {
// Authentication token for validation, will be nullptr if token authentication is
// disabled
std::shared_ptr<const ray::rpc::AuthenticationToken> auth_token_;

// Validator for authentication token
ray::rpc::AuthenticationTokenValidator &auth_token_validator_;
Copy link
Contributor

@sampan-s-nayak sampan-s-nayak Feb 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any specific reason to pass and store this instance? why not just call ray::rpc::AuthenticationTokenValidator::instance() wherever required?

};

} // namespace ray::syncer
9 changes: 6 additions & 3 deletions src/ray/ray_syncer/ray_syncer_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <utility>

#include "ray/common/constants.h"
#include "ray/rpc/authentication/authentication_mode.h"

namespace ray::syncer {

Expand All @@ -39,6 +40,7 @@ RayServerBidiReactor::RayServerBidiReactor(
std::function<void(std::shared_ptr<const RaySyncMessage>)> message_processor,
std::function<void(RaySyncerBidiReactor *, bool)> cleanup_cb,
std::shared_ptr<const ray::rpc::AuthenticationToken> auth_token,
ray::rpc::AuthenticationTokenValidator &auth_token_validator,
size_t max_batch_size,
uint64_t max_batch_delay_ms)
: RaySyncerBidiReactorBase<ServerBidiReactor>(
Expand All @@ -49,8 +51,9 @@ RayServerBidiReactor::RayServerBidiReactor(
max_batch_delay_ms),
cleanup_cb_(std::move(cleanup_cb)),
server_context_(server_context),
auth_token_(std::move(auth_token)) {
if (auth_token_ && !auth_token_->empty()) {
auth_token_(std::move(auth_token)),
auth_token_validator_(auth_token_validator) {
if ((auth_token_ && !auth_token_->empty()) || ray::rpc::IsK8sTokenAuthEnabled()) {
// Validate authentication token
const auto &metadata = server_context->client_metadata();
auto it = metadata.find(kAuthTokenKey);
Expand All @@ -64,7 +67,7 @@ RayServerBidiReactor::RayServerBidiReactor(

const std::string_view header(it->second.data(), it->second.length());

if (!auth_token_->CompareWithMetadata(header)) {
if (!auth_token_validator_.ValidateToken(auth_token_, header)) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description lacks explanation of problem being fixed

Low Severity

The PR description states "This is a bug fix to Ray sync server to use authentication token validator" but doesn't explain what problem was present. It explains how the fix works (delegating to Kubernetes) but not what issue the code had before. For example, was token validation failing entirely? Was K8s authentication not being checked? The description needs a sentence explaining the actual problem.

⚠️ This PR needs a clearer title and/or description.

To help reviewers, please ensure your PR includes:

  • Title: A concise summary of the change
  • Description:
    • What problem does this solve?
    • How does this PR solve it?
    • Any relevant context for reviewers such as:
      • Why is the problem important to solve?
      • Why was this approach chosen over others?

See this list of PRs as examples for PRs that have gone above and beyond:

Fix in Cursor Fix in Web

RAY_LOG(WARNING) << "Invalid bearer token in syncer connection from node "
<< NodeID::FromBinary(GetRemoteNodeID());
Finish(grpc::Status(grpc::StatusCode::UNAUTHENTICATED, "Invalid bearer token"));
Expand Down
5 changes: 5 additions & 0 deletions src/ray/ray_syncer/ray_syncer_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "ray/ray_syncer/ray_syncer_bidi_reactor.h"
#include "ray/ray_syncer/ray_syncer_bidi_reactor_base.h"
#include "ray/rpc/authentication/authentication_token.h"
#include "ray/rpc/authentication/authentication_token_validator.h"

namespace ray::syncer {

Expand All @@ -41,6 +42,7 @@ class RayServerBidiReactor : public RaySyncerBidiReactorBase<ServerBidiReactor>
std::function<void(std::shared_ptr<const RaySyncMessage>)> message_processor,
std::function<void(RaySyncerBidiReactor *, bool)> cleanup_cb,
std::shared_ptr<const ray::rpc::AuthenticationToken> auth_token,
ray::rpc::AuthenticationTokenValidator &auth_token_validator,
size_t max_batch_size,
uint64_t max_batch_delay_ms);

Expand Down Expand Up @@ -68,6 +70,9 @@ class RayServerBidiReactor : public RaySyncerBidiReactorBase<ServerBidiReactor>
/// disabled
std::shared_ptr<const ray::rpc::AuthenticationToken> auth_token_;

/// Validator for authentication token
ray::rpc::AuthenticationTokenValidator &auth_token_validator_;

/// Track if Finish() has been called to avoid using a reactor that is terminating
std::atomic<bool> finished_{false};

Expand Down
1 change: 1 addition & 0 deletions src/ray/ray_syncer/tests/ray_syncer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -925,6 +925,7 @@ struct MockRaySyncerService : public ray::rpc::syncer::RaySyncer::CallbackServic
message_processor,
cleanup_cb,
nullptr,
ray::rpc::AuthenticationTokenValidator::instance(),
/*max_batch_size=*/1,
/*max_batch_delay_ms=*/0);
return reactor;
Expand Down