Skip to content

Commit f39a860

Browse files
authored
[core] Kill GcsUnregisterSubscriber + publisher retry tests + doc (#56038)
Signed-off-by: dayshah <dhyey2019@gmail.com>
1 parent 6fdf2cf commit f39a860

File tree

21 files changed

+217
-381
lines changed

21 files changed

+217
-381
lines changed

src/fakes/ray/pubsub/publisher.h

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,24 +21,20 @@ namespace pubsub {
2121

2222
class FakePublisher : public Publisher {
2323
public:
24-
bool RegisterSubscription(const rpc::ChannelType channel_type,
24+
void RegisterSubscription(const rpc::ChannelType channel_type,
2525
const UniqueID &subscriber_id,
26-
const std::optional<std::string> &key_id) override {
27-
return true;
28-
}
26+
const std::optional<std::string> &key_id) override {}
2927

3028
void Publish(rpc::PubMessage pub_message) override {}
3129

3230
void PublishFailure(const rpc::ChannelType channel_type,
3331
const std::string &key_id) override {}
3432

35-
bool UnregisterSubscription(const rpc::ChannelType channel_type,
33+
void UnregisterSubscription(const rpc::ChannelType channel_type,
3634
const UniqueID &subscriber_id,
37-
const std::optional<std::string> &key_id) override {
38-
return true;
39-
}
35+
const std::optional<std::string> &key_id) override {}
4036

41-
void UnregisterSubscriber(const UniqueID &subscriber_id) override { return; }
37+
void UnregisterSubscriber(const UniqueID &subscriber_id) override {}
4238

4339
std::string DebugString() const override { return "FakePublisher"; }
4440
};

src/mock/ray/pubsub/publisher.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class MockPublisher : public PublisherInterface {
2929
google::protobuf::RepeatedPtrField<rpc::PubMessage> *pub_messages,
3030
rpc::SendReplyCallback send_reply_callback),
3131
(override));
32-
MOCK_METHOD(bool,
32+
MOCK_METHOD(void,
3333
RegisterSubscription,
3434
(const rpc::ChannelType channel_type,
3535
const UniqueID &subscriber_id,
@@ -40,7 +40,7 @@ class MockPublisher : public PublisherInterface {
4040
PublishFailure,
4141
(const rpc::ChannelType channel_type, const std::string &key_id),
4242
(override));
43-
MOCK_METHOD(bool,
43+
MOCK_METHOD(void,
4444
UnregisterSubscription,
4545
(const rpc::ChannelType channel_type,
4646
const UniqueID &subscriber_id,

src/ray/core_worker/tests/reference_count_test.cc

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ using SubscriptionFailureCallbackMap =
107107
// static maps are used to simulate distirubted environment.
108108
static SubscriptionCallbackMap subscription_callback_map;
109109
static SubscriptionFailureCallbackMap subscription_failure_callback_map;
110-
static pubsub::pub_internal::SubscriptionIndex directory(
110+
static pubsub::SubscriptionIndex directory(
111111
rpc::ChannelType::WORKER_OBJECT_LOCATIONS_CHANNEL);
112112

113113
static std::string GenerateID(UniqueID publisher_id, UniqueID subscriber_id) {
@@ -127,7 +127,7 @@ using PublisherFactoryFn =
127127

128128
class MockDistributedSubscriber : public pubsub::SubscriberInterface {
129129
public:
130-
MockDistributedSubscriber(pubsub::pub_internal::SubscriptionIndex *dict,
130+
MockDistributedSubscriber(pubsub::SubscriptionIndex *dict,
131131
SubscriptionCallbackMap *sub_callback_map,
132132
SubscriptionFailureCallbackMap *sub_failure_callback_map,
133133
UniqueID subscriber_id,
@@ -136,7 +136,7 @@ class MockDistributedSubscriber : public pubsub::SubscriberInterface {
136136
subscription_callback_map_(sub_callback_map),
137137
subscription_failure_callback_map_(sub_failure_callback_map),
138138
subscriber_id_(subscriber_id),
139-
subscriber_(std::make_unique<pubsub::pub_internal::SubscriberState>(
139+
subscriber_(std::make_unique<pubsub::SubscriberState>(
140140
subscriber_id,
141141
/*get_time_ms=*/[]() { return 1.0; },
142142
/*subscriber_timeout_ms=*/1000,
@@ -207,17 +207,17 @@ class MockDistributedSubscriber : public pubsub::SubscriberInterface {
207207
return "";
208208
}
209209

210-
pubsub::pub_internal::SubscriptionIndex *directory_;
210+
pubsub::SubscriptionIndex *directory_;
211211
SubscriptionCallbackMap *subscription_callback_map_;
212212
SubscriptionFailureCallbackMap *subscription_failure_callback_map_;
213213
UniqueID subscriber_id_;
214-
std::unique_ptr<pubsub::pub_internal::SubscriberState> subscriber_;
214+
std::unique_ptr<pubsub::SubscriberState> subscriber_;
215215
PublisherFactoryFn client_factory_;
216216
};
217217

218218
class MockDistributedPublisher : public pubsub::PublisherInterface {
219219
public:
220-
MockDistributedPublisher(pubsub::pub_internal::SubscriptionIndex *dict,
220+
MockDistributedPublisher(pubsub::SubscriptionIndex *dict,
221221
SubscriptionCallbackMap *sub_callback_map,
222222
SubscriptionFailureCallbackMap *sub_failure_callback_map,
223223
WorkerID publisher_id)
@@ -227,11 +227,10 @@ class MockDistributedPublisher : public pubsub::PublisherInterface {
227227
publisher_id_(publisher_id) {}
228228
~MockDistributedPublisher() = default;
229229

230-
bool RegisterSubscription(const rpc::ChannelType channel_type,
230+
void RegisterSubscription(const rpc::ChannelType channel_type,
231231
const UniqueID &subscriber_id,
232232
const std::optional<std::string> &key_id_binary) override {
233233
RAY_CHECK(false) << "No need to implement it for testing.";
234-
return false;
235234
}
236235

237236
void PublishFailure(const rpc::ChannelType channel_type,
@@ -256,13 +255,11 @@ class MockDistributedPublisher : public pubsub::PublisherInterface {
256255
}
257256
}
258257

259-
bool UnregisterSubscription(const rpc::ChannelType channel_type,
258+
void UnregisterSubscription(const rpc::ChannelType channel_type,
260259
const UniqueID &subscriber_id,
261-
const std::optional<std::string> &key_id_binary) override {
262-
return true;
263-
}
260+
const std::optional<std::string> &key_id_binary) override {}
264261

265-
void UnregisterSubscriber(const UniqueID &subscriber_id) override { return; }
262+
void UnregisterSubscriber(const UniqueID &subscriber_id) override {}
266263

267264
void ConnectToSubscriber(
268265
const rpc::PubsubLongPollingRequest &request,
@@ -272,7 +269,7 @@ class MockDistributedPublisher : public pubsub::PublisherInterface {
272269

273270
std::string DebugString() const override { return ""; }
274271

275-
pubsub::pub_internal::SubscriptionIndex *directory_;
272+
pubsub::SubscriptionIndex *directory_;
276273
SubscriptionCallbackMap *subscription_callback_map_;
277274
SubscriptionFailureCallbackMap *subscription_failure_callback_map_;
278275
WorkerID publisher_id_;

src/ray/gcs/gcs_server/grpc_service_interfaces.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,10 +143,6 @@ class InternalPubSubGcsServiceHandler {
143143
virtual void HandleGcsSubscriberCommandBatch(GcsSubscriberCommandBatchRequest request,
144144
GcsSubscriberCommandBatchReply *reply,
145145
SendReplyCallback send_reply_callback) = 0;
146-
147-
virtual void HandleGcsUnregisterSubscriber(GcsUnregisterSubscriberRequest request,
148-
GcsUnregisterSubscriberReply *reply,
149-
SendReplyCallback send_reply_callback) = 0;
150146
};
151147

152148
class JobInfoGcsServiceHandler {

src/ray/gcs/gcs_server/grpc_services.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,6 @@ void InternalPubSubGrpcService::InitServerCallFactories(
7979
InternalPubSubGcsService, GcsSubscriberPoll, max_active_rpcs_per_handler_);
8080
RPC_SERVICE_HANDLER(
8181
InternalPubSubGcsService, GcsSubscriberCommandBatch, max_active_rpcs_per_handler_);
82-
RPC_SERVICE_HANDLER(
83-
InternalPubSubGcsService, GcsUnregisterSubscriber, max_active_rpcs_per_handler_);
8482
}
8583

8684
void JobInfoGrpcService::InitServerCallFactories(

src/ray/gcs/gcs_server/pubsub_handler.cc

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,6 @@ void InternalPubSubHandler::HandleGcsSubscriberCommandBatch(
9393
send_reply_callback(Status::OK(), nullptr, nullptr);
9494
}
9595

96-
void InternalPubSubHandler::HandleGcsUnregisterSubscriber(
97-
rpc::GcsUnregisterSubscriberRequest request,
98-
rpc::GcsUnregisterSubscriberReply *reply,
99-
rpc::SendReplyCallback send_reply_callback) {
100-
const auto subscriber_id = UniqueID::FromBinary(request.subscriber_id());
101-
gcs_publisher_.GetPublisher().UnregisterSubscriber(subscriber_id);
102-
send_reply_callback(Status::OK(), nullptr, nullptr);
103-
}
104-
10596
void InternalPubSubHandler::AsyncRemoveSubscriberFrom(const std::string &sender_id) {
10697
io_service_.post(
10798
[this, sender_id]() {

src/ray/gcs/gcs_server/pubsub_handler.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,6 @@ class InternalPubSubHandler : public rpc::InternalPubSubGcsServiceHandler {
4444
rpc::GcsSubscriberCommandBatchReply *reply,
4545
rpc::SendReplyCallback send_reply_callback) final;
4646

47-
void HandleGcsUnregisterSubscriber(rpc::GcsUnregisterSubscriberRequest request,
48-
rpc::GcsUnregisterSubscriberReply *reply,
49-
rpc::SendReplyCallback send_reply_callback) final;
50-
5147
/// This function is only for external callers. Internally, can just erase from
5248
/// sender_to_subscribers_ and everything should be on the Publisher's io_service_.
5349
void AsyncRemoveSubscriberFrom(const std::string &sender_id);

src/ray/gcs/pubsub/gcs_pub_sub.cc

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -335,10 +335,14 @@ Status PythonGcsSubscriber::Close() {
335335

336336
grpc::ClientContext context;
337337

338-
rpc::GcsUnregisterSubscriberRequest request;
338+
rpc::GcsSubscriberCommandBatchRequest request;
339339
request.set_subscriber_id(subscriber_id_);
340-
rpc::GcsUnregisterSubscriberReply reply;
341-
grpc::Status status = pubsub_stub_->GcsUnregisterSubscriber(&context, request, &reply);
340+
auto *command = request.add_commands();
341+
command->set_channel_type(channel_type_);
342+
command->mutable_unsubscribe_message();
343+
rpc::GcsSubscriberCommandBatchReply reply;
344+
grpc::Status status =
345+
pubsub_stub_->GcsSubscriberCommandBatch(&context, request, &reply);
342346

343347
if (!status.ok()) {
344348
RAY_LOG(WARNING) << "Error while unregistering the subscriber: "

src/ray/object_manager/ownership_object_directory.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,6 @@ ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations(
346346
auto failure_callback = [this, owner_address](const std::string &object_id_binary,
347347
const Status &status) {
348348
const auto obj_id = ObjectID::FromBinary(object_id_binary);
349-
rpc::WorkerObjectLocationsPubMessage location_info;
350349
if (!status.ok()) {
351350
RAY_LOG(INFO).WithField(obj_id)
352351
<< "Failed to get the location: " << status.ToString();
@@ -362,9 +361,10 @@ ray::Status OwnershipBasedObjectDirectory::SubscribeObjectLocations(
362361
// Location lookup can fail if the owner is reachable but no longer has a
363362
// record of this ObjectRef, most likely due to an issue with the
364363
// distributed reference counting protocol.
365-
ObjectLocationSubscriptionCallback(location_info,
366-
obj_id,
367-
/*location_lookup_failed*/ true);
364+
ObjectLocationSubscriptionCallback(
365+
/*location_info=*/rpc::WorkerObjectLocationsPubMessage{},
366+
obj_id,
367+
/*location_lookup_failed*/ true);
368368
};
369369

370370
auto sub_message = std::make_unique<rpc::SubMessage>();

src/ray/protobuf/gcs_service.proto

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -680,12 +680,6 @@ message GcsSubscriberCommandBatchReply {
680680
GcsStatus status = 100;
681681
}
682682

683-
message GcsUnregisterSubscriberRequest {
684-
bytes subscriber_id = 1;
685-
}
686-
687-
message GcsUnregisterSubscriberReply {}
688-
689683
/// This supports subscribing updates from GCS with long poll, and registering /
690684
/// de-registering subscribers.
691685
service InternalPubSubGcsService {
@@ -699,10 +693,6 @@ service InternalPubSubGcsService {
699693
/// A batch of subscribe / unsubscribe requests sent by the subscriber.
700694
rpc GcsSubscriberCommandBatch(GcsSubscriberCommandBatchRequest)
701695
returns (GcsSubscriberCommandBatchReply);
702-
/// Unregister a subscriber from GCS, removing all subscriptions as well as the
703-
/// subscriber itself.
704-
rpc GcsUnregisterSubscriber(GcsUnregisterSubscriberRequest)
705-
returns (GcsUnregisterSubscriberReply);
706696
}
707697

708698
message GetAllResourceUsageRequest {}

0 commit comments

Comments
 (0)