Skip to content

Commit 6fdf2cf

Browse files
authored
[core] Split gcs_autoscaler_state_manager out from gcs_server_lib (#56053)
🫡 --------- Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
1 parent 3c9b3ed commit 6fdf2cf

11 files changed

Lines changed: 184 additions & 145 deletions

src/ray/gcs/gcs_server/BUILD.bazel

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ ray_cc_library(
328328
visibility = ["//visibility:private"],
329329
deps = [
330330
"//src/ray/common:status",
331+
"//src/ray/protobuf:autoscaler_cc_grpc",
331332
"//src/ray/protobuf:gcs_service_cc_grpc",
332333
],
333334
)
@@ -345,6 +346,7 @@ ray_cc_library(
345346
":grpc_service_interfaces",
346347
"//src/ray/common:asio",
347348
"//src/ray/common:id",
349+
"//src/ray/protobuf:autoscaler_cc_grpc",
348350
"//src/ray/protobuf:gcs_service_cc_grpc",
349351
"//src/ray/rpc:grpc_server",
350352
"//src/ray/rpc:server_call",
@@ -444,19 +446,51 @@ ray_cc_library(
444446
)
445447

446448
ray_cc_library(
447-
name = "gcs_server_lib",
449+
name = "gcs_autoscaler_state_manager",
448450
srcs = [
449451
"gcs_autoscaler_state_manager.cc",
450-
"gcs_server.cc",
451452
],
452453
hdrs = [
453454
"gcs_autoscaler_state_manager.h",
455+
],
456+
implementation_deps = [
457+
"//src/ray/common:ray_config",
458+
"//src/ray/gcs:gcs_pb_util",
459+
"//src/ray/util:logging",
460+
"//src/ray/util:string_utils",
461+
"//src/ray/util:time",
462+
],
463+
deps = [
464+
":gcs_actor_manager",
465+
":gcs_init_data",
466+
":gcs_kv_manager",
467+
":gcs_node_manager",
468+
":gcs_placement_group_manager",
469+
":gcs_state_util",
470+
":grpc_service_interfaces",
471+
"//src/ray/common:asio",
472+
"//src/ray/common:id",
473+
"//src/ray/gcs/pubsub:gcs_pub_sub_lib",
474+
"//src/ray/protobuf:gcs_cc_proto",
475+
"//src/ray/util:thread_checker",
476+
"@com_google_absl//absl/container:flat_hash_map",
477+
"@com_google_googletest//:gtest",
478+
],
479+
)
480+
481+
ray_cc_library(
482+
name = "gcs_server_lib",
483+
srcs = [
484+
"gcs_server.cc",
485+
],
486+
hdrs = [
454487
"gcs_server.h",
455488
],
456489
deps = [
457490
":gcs_actor",
458491
":gcs_actor_manager",
459492
":gcs_actor_scheduler",
493+
":gcs_autoscaler_state_manager",
460494
":gcs_function_manager",
461495
":gcs_health_check_manager",
462496
":gcs_init_data",

src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.cc

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,6 @@
1919
#include <utility>
2020
#include <vector>
2121

22-
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
23-
#include "ray/gcs/gcs_server/gcs_node_manager.h"
24-
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
25-
#include "ray/gcs/gcs_server/state_util.h"
2622
#include "ray/gcs/pb_util.h"
2723
#include "ray/util/string_utils.h"
2824
#include "ray/util/time.h"

src/ray/gcs/gcs_server/gcs_autoscaler_state_manager.h

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,31 @@
1414

1515
#pragma once
1616

17+
#include <gtest/gtest_prod.h>
18+
1719
#include <memory>
1820
#include <string>
1921
#include <utility>
2022
#include <vector>
2123

24+
#include "absl/container/flat_hash_map.h"
25+
#include "ray/common/asio/instrumented_io_context.h"
26+
#include "ray/gcs/gcs_server/gcs_actor_manager.h"
2227
#include "ray/gcs/gcs_server/gcs_init_data.h"
2328
#include "ray/gcs/gcs_server/gcs_kv_manager.h"
29+
#include "ray/gcs/gcs_server/gcs_node_manager.h"
30+
#include "ray/gcs/gcs_server/gcs_placement_group_manager.h"
31+
#include "ray/gcs/gcs_server/grpc_service_interfaces.h"
2432
#include "ray/gcs/gcs_server/state_util.h"
2533
#include "ray/gcs/pubsub/gcs_pub_sub.h"
26-
#include "ray/rpc/gcs/gcs_rpc_server.h"
2734
#include "ray/rpc/node_manager/raylet_client_pool.h"
2835
#include "ray/util/thread_checker.h"
2936
#include "src/ray/protobuf/gcs.pb.h"
3037

3138
namespace ray {
3239
namespace gcs {
3340

34-
class GcsActorManager;
35-
class GcsNodeManager;
36-
class GcsPlacementGroupManager;
37-
class GcsResourceManager;
38-
39-
class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateHandler {
41+
class GcsAutoscalerStateManager : public rpc::autoscaler::AutoscalerStateServiceHandler {
4042
public:
4143
GcsAutoscalerStateManager(std::string session_name,
4244
GcsNodeManager &gcs_node_manager,

src/ray/gcs/gcs_server/gcs_server.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,9 @@ void GcsServer::InitGcsAutoscalerStateManager(const GcsInitData &gcs_init_data)
731731
gcs_autoscaler_state_manager_->Initialize(gcs_init_data);
732732
rpc_server_.RegisterService(
733733
std::make_unique<rpc::autoscaler::AutoscalerStateGrpcService>(
734-
io_context_provider_.GetDefaultIOContext(), *gcs_autoscaler_state_manager_));
734+
io_context_provider_.GetDefaultIOContext(),
735+
*gcs_autoscaler_state_manager_,
736+
RayConfig::instance().gcs_max_active_rpcs_per_handler()));
735737
}
736738

737739
void GcsServer::InitGcsTaskManager() {
@@ -742,7 +744,7 @@ void GcsServer::InitGcsTaskManager() {
742744
io_context,
743745
*gcs_task_manager_,
744746
RayConfig::instance().gcs_max_active_rpcs_per_handler()));
745-
rpc_server_.RegisterService(std::make_unique<rpc::RayEventExportGrpcService>(
747+
rpc_server_.RegisterService(std::make_unique<rpc::events::RayEventExportGrpcService>(
746748
io_context,
747749
*gcs_task_manager_,
748750
RayConfig::instance().gcs_max_active_rpcs_per_handler()));

src/ray/gcs/gcs_server/gcs_task_manager.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ class FinishedTaskActorTaskGcPolicy : public TaskEventsGcPolicyInterface {
9595
/// This class has its own io_context and io_thread, that's separate from other GCS
9696
/// services. All handling of all rpc should be posted to the single thread it owns.
9797
class GcsTaskManager : public rpc::TaskInfoGcsServiceHandler,
98-
public rpc::RayEventExportGcsServiceHandler {
98+
public rpc::events::RayEventExportGcsServiceHandler {
9999
public:
100100
/// Create a GcsTaskManager.
101101
explicit GcsTaskManager(instrumented_io_context &io_service);

src/ray/gcs/gcs_server/grpc_service_interfaces.h

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#pragma once
2323

2424
#include "ray/common/status.h"
25+
#include "src/ray/protobuf/autoscaler.grpc.pb.h"
2526
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
2627

2728
namespace ray {
@@ -261,14 +262,6 @@ class TaskInfoGcsServiceHandler {
261262
SendReplyCallback send_reply_callback) = 0;
262263
};
263264

264-
class RayEventExportGcsServiceHandler {
265-
public:
266-
virtual ~RayEventExportGcsServiceHandler() = default;
267-
virtual void HandleAddEvents(events::AddEventsRequest request,
268-
events::AddEventsReply *reply,
269-
SendReplyCallback send_reply_callback) = 0;
270-
};
271-
272265
class PlacementGroupInfoGcsServiceHandler {
273266
public:
274267
virtual ~PlacementGroupInfoGcsServiceHandler() = default;
@@ -299,5 +292,51 @@ class PlacementGroupInfoGcsServiceHandler {
299292
SendReplyCallback send_reply_callback) = 0;
300293
};
301294

295+
namespace autoscaler {
296+
297+
class AutoscalerStateServiceHandler {
298+
public:
299+
virtual ~AutoscalerStateServiceHandler() = default;
300+
301+
virtual void HandleGetClusterResourceState(GetClusterResourceStateRequest request,
302+
GetClusterResourceStateReply *reply,
303+
SendReplyCallback send_reply_callback) = 0;
304+
305+
virtual void HandleReportAutoscalingState(ReportAutoscalingStateRequest request,
306+
ReportAutoscalingStateReply *reply,
307+
SendReplyCallback send_reply_callback) = 0;
308+
309+
virtual void HandleRequestClusterResourceConstraint(
310+
RequestClusterResourceConstraintRequest request,
311+
RequestClusterResourceConstraintReply *reply,
312+
SendReplyCallback send_reply_callback) = 0;
313+
314+
virtual void HandleGetClusterStatus(GetClusterStatusRequest request,
315+
GetClusterStatusReply *reply,
316+
SendReplyCallback send_reply_callback) = 0;
317+
318+
virtual void HandleDrainNode(DrainNodeRequest request,
319+
DrainNodeReply *reply,
320+
SendReplyCallback send_reply_callback) = 0;
321+
322+
virtual void HandleReportClusterConfig(ReportClusterConfigRequest request,
323+
ReportClusterConfigReply *reply,
324+
SendReplyCallback send_reply_callback) = 0;
325+
};
326+
327+
} // namespace autoscaler
328+
329+
namespace events {
330+
331+
class RayEventExportGcsServiceHandler {
332+
public:
333+
virtual ~RayEventExportGcsServiceHandler() = default;
334+
virtual void HandleAddEvents(events::AddEventsRequest request,
335+
events::AddEventsReply *reply,
336+
SendReplyCallback send_reply_callback) = 0;
337+
};
338+
339+
} // namespace events
340+
302341
} // namespace rpc
303342
} // namespace ray

src/ray/gcs/gcs_server/grpc_services.cc

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -142,16 +142,6 @@ void TaskInfoGrpcService::InitServerCallFactories(
142142
RPC_SERVICE_HANDLER(TaskInfoGcsService, GetTaskEvents, max_active_rpcs_per_handler_)
143143
}
144144

145-
using events::AddEventsReply;
146-
using events::AddEventsRequest;
147-
148-
void RayEventExportGrpcService::InitServerCallFactories(
149-
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
150-
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
151-
const ClusterID &cluster_id) {
152-
RPC_SERVICE_HANDLER(RayEventExportGcsService, AddEvents, max_active_rpcs_per_handler_)
153-
}
154-
155145
void PlacementGroupInfoGrpcService::InitServerCallFactories(
156146
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
157147
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
@@ -171,5 +161,38 @@ void PlacementGroupInfoGrpcService::InitServerCallFactories(
171161
max_active_rpcs_per_handler_)
172162
}
173163

164+
namespace autoscaler {
165+
166+
void AutoscalerStateGrpcService::InitServerCallFactories(
167+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
168+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
169+
const ClusterID &cluster_id) {
170+
RPC_SERVICE_HANDLER(
171+
AutoscalerStateService, GetClusterResourceState, max_active_rpcs_per_handler_)
172+
RPC_SERVICE_HANDLER(
173+
AutoscalerStateService, ReportAutoscalingState, max_active_rpcs_per_handler_)
174+
RPC_SERVICE_HANDLER(
175+
AutoscalerStateService, ReportClusterConfig, max_active_rpcs_per_handler_)
176+
RPC_SERVICE_HANDLER(AutoscalerStateService,
177+
RequestClusterResourceConstraint,
178+
max_active_rpcs_per_handler_)
179+
RPC_SERVICE_HANDLER(
180+
AutoscalerStateService, GetClusterStatus, max_active_rpcs_per_handler_)
181+
RPC_SERVICE_HANDLER(AutoscalerStateService, DrainNode, max_active_rpcs_per_handler_)
182+
}
183+
184+
} // namespace autoscaler
185+
186+
namespace events {
187+
188+
void RayEventExportGrpcService::InitServerCallFactories(
189+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
190+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
191+
const ClusterID &cluster_id) {
192+
RPC_SERVICE_HANDLER(RayEventExportGcsService, AddEvents, max_active_rpcs_per_handler_)
193+
}
194+
195+
} // namespace events
196+
174197
} // namespace rpc
175198
} // namespace ray

src/ray/gcs/gcs_server/grpc_services.h

Lines changed: 45 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "ray/gcs/gcs_server/grpc_service_interfaces.h"
3131
#include "ray/rpc/grpc_server.h"
3232
#include "ray/rpc/server_call.h"
33+
#include "src/ray/protobuf/autoscaler.grpc.pb.h"
3334
#include "src/ray/protobuf/gcs_service.grpc.pb.h"
3435

3536
namespace ray {
@@ -242,11 +243,36 @@ class TaskInfoGrpcService : public GrpcService {
242243
int64_t max_active_rpcs_per_handler_;
243244
};
244245

245-
class RayEventExportGrpcService : public GrpcService {
246+
class PlacementGroupInfoGrpcService : public GrpcService {
246247
public:
247-
explicit RayEventExportGrpcService(instrumented_io_context &io_service,
248-
RayEventExportGcsServiceHandler &handler,
249-
int64_t max_active_rpcs_per_handler)
248+
explicit PlacementGroupInfoGrpcService(instrumented_io_context &io_service,
249+
PlacementGroupInfoGcsServiceHandler &handler,
250+
int64_t max_active_rpcs_per_handler)
251+
: GrpcService(io_service),
252+
service_handler_(handler),
253+
max_active_rpcs_per_handler_(max_active_rpcs_per_handler) {}
254+
255+
protected:
256+
grpc::Service &GetGrpcService() override { return service_; }
257+
258+
void InitServerCallFactories(
259+
const std::unique_ptr<grpc::ServerCompletionQueue> &cq,
260+
std::vector<std::unique_ptr<ServerCallFactory>> *server_call_factories,
261+
const ClusterID &cluster_id) override;
262+
263+
private:
264+
PlacementGroupInfoGcsService::AsyncService service_;
265+
PlacementGroupInfoGcsServiceHandler &service_handler_;
266+
int64_t max_active_rpcs_per_handler_;
267+
};
268+
269+
namespace autoscaler {
270+
271+
class AutoscalerStateGrpcService : public GrpcService {
272+
public:
273+
explicit AutoscalerStateGrpcService(instrumented_io_context &io_service,
274+
AutoscalerStateServiceHandler &handler,
275+
int64_t max_active_rpcs_per_handler)
250276
: GrpcService(io_service),
251277
service_handler_(handler),
252278
max_active_rpcs_per_handler_(max_active_rpcs_per_handler){};
@@ -260,19 +286,23 @@ class RayEventExportGrpcService : public GrpcService {
260286
const ClusterID &cluster_id) override;
261287

262288
private:
263-
RayEventExportGcsService::AsyncService service_;
264-
RayEventExportGcsServiceHandler &service_handler_;
289+
AutoscalerStateService::AsyncService service_;
290+
AutoscalerStateServiceHandler &service_handler_;
265291
int64_t max_active_rpcs_per_handler_;
266292
};
267293

268-
class PlacementGroupInfoGrpcService : public GrpcService {
294+
} // namespace autoscaler
295+
296+
namespace events {
297+
298+
class RayEventExportGrpcService : public GrpcService {
269299
public:
270-
explicit PlacementGroupInfoGrpcService(instrumented_io_context &io_service,
271-
PlacementGroupInfoGcsServiceHandler &handler,
272-
int64_t max_active_rpcs_per_handler)
300+
explicit RayEventExportGrpcService(instrumented_io_context &io_service,
301+
RayEventExportGcsServiceHandler &handler,
302+
int64_t max_active_rpcs_per_handler)
273303
: GrpcService(io_service),
274304
service_handler_(handler),
275-
max_active_rpcs_per_handler_(max_active_rpcs_per_handler) {}
305+
max_active_rpcs_per_handler_(max_active_rpcs_per_handler){};
276306

277307
protected:
278308
grpc::Service &GetGrpcService() override { return service_; }
@@ -283,10 +313,12 @@ class PlacementGroupInfoGrpcService : public GrpcService {
283313
const ClusterID &cluster_id) override;
284314

285315
private:
286-
PlacementGroupInfoGcsService::AsyncService service_;
287-
PlacementGroupInfoGcsServiceHandler &service_handler_;
316+
RayEventExportGcsService::AsyncService service_;
317+
RayEventExportGcsServiceHandler &service_handler_;
288318
int64_t max_active_rpcs_per_handler_;
289319
};
290320

321+
} // namespace events
322+
291323
} // namespace rpc
292324
} // namespace ray

src/ray/gcs/gcs_server/tests/BUILD.bazel

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,13 @@ ray_cc_test(
336336
"//:ray_fakes",
337337
"//:ray_mock",
338338
"//src/fakes/ray/rpc/raylet:fake_raylet_client",
339-
"//src/ray/gcs/gcs_server:gcs_server_lib",
339+
"//src/ray/common:asio",
340+
"//src/ray/gcs/gcs_server:gcs_autoscaler_state_manager",
341+
"//src/ray/gcs/gcs_server:gcs_init_data",
342+
"//src/ray/gcs/gcs_server:gcs_resource_manager",
343+
"//src/ray/gcs/gcs_server:gcs_store_client_kv",
340344
"//src/ray/gcs/tests:gcs_test_util_lib",
345+
"//src/ray/raylet/scheduling:cluster_resource_manager",
341346
"@com_google_googletest//:gtest_main",
342347
],
343348
)

0 commit comments

Comments
 (0)