|
1 | 1 | package org.hypertrace.core.grpcutils.client; |
2 | 2 |
|
| 3 | +import static java.time.temporal.ChronoUnit.MINUTES; |
| 4 | +import static java.time.temporal.ChronoUnit.SECONDS; |
| 5 | + |
3 | 6 | import io.grpc.ManagedChannel; |
4 | 7 | import io.grpc.ManagedChannelBuilder; |
| 8 | +import java.time.Clock; |
| 9 | +import java.time.Instant; |
5 | 10 | import java.util.Map; |
6 | 11 | import java.util.concurrent.ConcurrentHashMap; |
| 12 | +import java.util.concurrent.TimeUnit; |
7 | 13 | import org.slf4j.Logger; |
8 | 14 | import org.slf4j.LoggerFactory; |
9 | 15 |
|
10 | 16 | public class GrpcChannelRegistry { |
11 | 17 | private static final Logger LOG = LoggerFactory.getLogger(GrpcChannelRegistry.class); |
12 | 18 | private final Map<String, ManagedChannel> channelMap = new ConcurrentHashMap<>(); |
| 19 | + private final Clock clock; |
13 | 20 | private volatile boolean isShutdown = false; |
14 | 21 |
|
| 22 | + @Deprecated |
| 23 | + public GrpcChannelRegistry() { |
| 24 | + this(Clock.systemUTC()); |
| 25 | + } |
| 26 | + |
| 27 | + public GrpcChannelRegistry(Clock clock) { |
| 28 | + this.clock = clock; |
| 29 | + } |
| 30 | + |
15 | 31 | /** |
16 | 32 | * Use either {@link #forSecureAddress(String, int)} or {@link #forPlaintextAddress(String, int)} |
17 | 33 | */ |
@@ -49,8 +65,83 @@ private String getChannelId(String host, int port, boolean isPlaintext) { |
49 | 65 | return securePrefix + ":" + host + ":" + port; |
50 | 66 | } |
51 | 67 |
|
| 68 | + /** |
| 69 | + * Shuts down channels using a default deadline of 1 minute. |
| 70 | + * |
| 71 | + * @see #shutdown(Instant) |
| 72 | + */ |
52 | 73 | public void shutdown() { |
53 | | - channelMap.values().forEach(ManagedChannel::shutdown); |
| 74 | + this.shutdown(this.clock.instant().plus(1, MINUTES)); |
| 75 | + } |
| 76 | + |
| 77 | + /** |
| 78 | + * Attempts to perform an orderly shutdown of all registered channels before the provided |
| 79 | + * deadline, else falling back to a forceful shutdown. The call waits for all shutdowns to |
| 80 | + * complete. More specifically, we go through three shutdown phases. |
| 81 | + * |
| 82 | + * <ol> |
| 83 | + * <li>First, we request an orderly shutdown across all registered channels. At this point, no |
| 84 | + * new calls will be accepted, but in-flight calls will be given a chance to complete before |
| 85 | + * shutting down. |
| 86 | + * <li>Next, we sequentially wait for each channel to complete. Although sequential, each |
| 87 | + * channel will wait no longer than the provided deadline. |
| 88 | + * <li>For any channels that have not shutdown successfully after the previous phase, we will |
| 89 | + * forcefully terminate it, cancelling any pending calls. Each channel is given up to 5 |
| 90 | + * seconds for forceful termination, but should complete close to instantly. |
| 91 | + * </ol> |
| 92 | + * |
| 93 | + * Upon completion, the registry is moved to a shutdown state and the channel references are |
| 94 | + * cleared. Attempting to reference any channels from the registry at this point will result in an |
| 95 | + * error. |
| 96 | + * |
| 97 | + * @param deadline Deadline for all channels to complete graceful shutdown. |
| 98 | + */ |
| 99 | + public void shutdown(Instant deadline) { |
| 100 | + channelMap.forEach(this::initiateChannelShutdown); |
| 101 | + channelMap.keySet().stream() |
| 102 | + .filter(channelId -> !this.waitForGracefulShutdown(channelId, deadline)) |
| 103 | + .forEach(this::forceShutdown); |
| 104 | + |
54 | 105 | this.isShutdown = true; |
| 106 | + this.channelMap.clear(); |
| 107 | + } |
| 108 | + |
| 109 | + private void initiateChannelShutdown(String channelId, ManagedChannel managedChannel) { |
| 110 | + LOG.info("Starting shutdown for channel [{}]", channelId); |
| 111 | + managedChannel.shutdown(); |
| 112 | + } |
| 113 | + |
| 114 | + private boolean waitForGracefulShutdown(String channelId, Instant deadline) { |
| 115 | + boolean successfullyShutdown = this.waitForTermination(channelId, deadline); |
| 116 | + if (successfullyShutdown) { |
| 117 | + LOG.info("Shutdown channel successfully [{}]", channelId); |
| 118 | + } |
| 119 | + return successfullyShutdown; |
| 120 | + } |
| 121 | + |
| 122 | + private void forceShutdown(String channelId) { |
| 123 | + LOG.error("Shutting down channel [{}] forcefully", channelId); |
| 124 | + this.channelMap.get(channelId).shutdownNow(); |
| 125 | + Instant forceShutdownDeadline = this.clock.instant().plus(5, SECONDS); |
| 126 | + if (this.waitForTermination(channelId, forceShutdownDeadline)) { |
| 127 | + LOG.error("Forced channel [{}] shutdown successful", channelId); |
| 128 | + } else { |
| 129 | + LOG.error("Unable to force channel [{}] shutdown in 5s - giving up!", channelId); |
| 130 | + } |
| 131 | + } |
| 132 | + |
| 133 | + private boolean waitForTermination(String channelId, Instant deadline) { |
| 134 | + ManagedChannel managedChannel = this.channelMap.get(channelId); |
| 135 | + long millisRemaining = Math.max(0, deadline.toEpochMilli() - this.clock.millis()); |
| 136 | + try { |
| 137 | + if (!managedChannel.awaitTermination(millisRemaining, TimeUnit.MILLISECONDS)) { |
| 138 | + LOG.error("Channel [{}] did not shut down after waiting", channelId); |
| 139 | + } |
| 140 | + } catch (InterruptedException ex) { |
| 141 | + LOG.error( |
| 142 | + "There has been an interruption while waiting for channel [{}] to shutdown", channelId); |
| 143 | + Thread.currentThread().interrupt(); |
| 144 | + } |
| 145 | + return managedChannel.isTerminated(); |
55 | 146 | } |
56 | 147 | } |
0 commit comments