4040import io .grpc .services .MetricReport ;
4141import io .grpc .util .ForwardingSubchannel ;
4242import io .grpc .util .MultiChildLoadBalancer ;
43+ import io .grpc .xds .internal .MetricReportUtils ;
4344import io .grpc .xds .orca .OrcaOobUtil ;
4445import io .grpc .xds .orca .OrcaOobUtil .OrcaOobReportListener ;
4546import io .grpc .xds .orca .OrcaPerRequestUtil ;
4950import java .util .HashSet ;
5051import java .util .List ;
5152import java .util .Objects ;
53+ import java .util .OptionalDouble ;
5254import java .util .Random ;
5355import java .util .Set ;
5456import java .util .concurrent .ScheduledExecutorService ;
@@ -189,7 +191,7 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
189191 this .backendService = "" ;
190192 }
191193 config =
192- (WeightedRoundRobinLoadBalancerConfig ) resolvedAddresses .getLoadBalancingPolicyConfig ();
194+ (WeightedRoundRobinLoadBalancerConfig ) resolvedAddresses .getLoadBalancingPolicyConfig ();
193195
194196 if (weightUpdateTimer != null && weightUpdateTimer .isPending ()) {
195197 weightUpdateTimer .cancel ();
@@ -236,7 +238,8 @@ protected void updateOverallBalancingState() {
236238
237239 private SubchannelPicker createReadyPicker (Collection <ChildLbState > activeList ) {
238240 WeightedRoundRobinPicker picker = new WeightedRoundRobinPicker (ImmutableList .copyOf (activeList ),
239- config .enableOobLoadReport , config .errorUtilizationPenalty , sequence );
241+ config .enableOobLoadReport , config .errorUtilizationPenalty , sequence ,
242+ config .metricNamesForComputingUtilization );
240243 updateWeight (picker );
241244 return picker ;
242245 }
@@ -325,12 +328,16 @@ public void addSubchannel(WrrSubchannel wrrSubchannel) {
325328 subchannels .add (wrrSubchannel );
326329 }
327330
328- public OrcaReportListener getOrCreateOrcaListener (float errorUtilizationPenalty ) {
331+ public OrcaReportListener getOrCreateOrcaListener (float errorUtilizationPenalty ,
332+ ImmutableList <String > metricNamesForComputingUtilization ) {
329333 if (orcaReportListener != null
330- && orcaReportListener .errorUtilizationPenalty == errorUtilizationPenalty ) {
334+ && orcaReportListener .errorUtilizationPenalty == errorUtilizationPenalty
335+ && orcaReportListener .metricNamesForComputingUtilization
336+ .equals (metricNamesForComputingUtilization )) {
331337 return orcaReportListener ;
332338 }
333- orcaReportListener = new OrcaReportListener (errorUtilizationPenalty );
339+ orcaReportListener =
340+ new OrcaReportListener (errorUtilizationPenalty , metricNamesForComputingUtilization );
334341 return orcaReportListener ;
335342 }
336343
@@ -355,18 +362,19 @@ public void updateBalancingState(ConnectivityState newState, SubchannelPicker ne
355362
356363 final class OrcaReportListener implements OrcaPerRequestReportListener , OrcaOobReportListener {
357364 private final float errorUtilizationPenalty ;
365+ private final ImmutableList <String > metricNamesForComputingUtilization ;
358366
359- OrcaReportListener (float errorUtilizationPenalty ) {
367+ OrcaReportListener (float errorUtilizationPenalty ,
368+ ImmutableList <String > metricNamesForComputingUtilization ) {
360369 this .errorUtilizationPenalty = errorUtilizationPenalty ;
370+ this .metricNamesForComputingUtilization = metricNamesForComputingUtilization ;
361371 }
362372
363373 @ Override
364374 public void onLoadReport (MetricReport report ) {
375+ double utilization = getUtilization (report , metricNamesForComputingUtilization );
376+
365377 double newWeight = 0 ;
366- // Prefer application utilization and fallback to CPU utilization if unset.
367- double utilization =
368- report .getApplicationUtilization () > 0 ? report .getApplicationUtilization ()
369- : report .getCpuUtilization ();
370378 if (utilization > 0 && report .getQps () > 0 ) {
371379 double penalty = 0 ;
372380 if (report .getEps () > 0 && errorUtilizationPenalty > 0 ) {
@@ -383,6 +391,40 @@ public void onLoadReport(MetricReport report) {
383391 lastUpdated = ticker .nanoTime ();
384392 weight = newWeight ;
385393 }
394+
395+ /**
396+ * Returns the utilization value computed from the specified metric names. If the custom
397+ * metrics are present and valid, the maximum of the custom metrics is returned. Otherwise,
398+ * if application utilization is > 0, it is returned. If neither are present, the CPU
399+ * utilization is returned.
400+ */
401+ private double getUtilization (MetricReport report , ImmutableList <String > metricNames ) {
402+ OptionalDouble customUtil = getCustomMetricUtilization (report , metricNames );
403+ if (customUtil .isPresent ()) {
404+ return customUtil .getAsDouble ();
405+ }
406+ double appUtil = report .getApplicationUtilization ();
407+ if (appUtil > 0 ) {
408+ return appUtil ;
409+ }
410+ return report .getCpuUtilization ();
411+ }
412+
413+ /**
414+ * Returns the maximum utilization value among the specified metric names.
415+ * Returns OptionalDouble.empty() if NONE of the specified metrics are present in the report,
416+ * or if all present metrics are NaN.
417+ * Returns OptionalDouble.of(maxUtil) if at least one non-NaN metric is present.
418+ */
419+ private OptionalDouble getCustomMetricUtilization (MetricReport report ,
420+ ImmutableList <String > metricNames ) {
421+ return metricNames .stream ()
422+ .map (name -> MetricReportUtils .getMetric (report , name ))
423+ .filter (OptionalDouble ::isPresent )
424+ .mapToDouble (OptionalDouble ::getAsDouble )
425+ .filter (d -> !Double .isNaN (d ) && d > 0 )
426+ .max ();
427+ }
386428 }
387429 }
388430
@@ -403,10 +445,10 @@ private void createAndApplyOrcaListeners() {
403445 for (WrrSubchannel weightedSubchannel : wChild .subchannels ) {
404446 if (config .enableOobLoadReport ) {
405447 OrcaOobUtil .setListener (weightedSubchannel ,
406- wChild .getOrCreateOrcaListener (config .errorUtilizationPenalty ),
448+ wChild .getOrCreateOrcaListener (config .errorUtilizationPenalty ,
449+ config .metricNamesForComputingUtilization ),
407450 OrcaOobUtil .OrcaReportingConfig .newBuilder ()
408- .setReportInterval (config .oobReportingPeriodNanos , TimeUnit .NANOSECONDS )
409- .build ());
451+ .setReportInterval (config .oobReportingPeriodNanos , TimeUnit .NANOSECONDS ).build ());
410452 } else {
411453 OrcaOobUtil .setListener (weightedSubchannel , null , null );
412454 }
@@ -473,7 +515,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
473515 private volatile StaticStrideScheduler scheduler ;
474516
475517 WeightedRoundRobinPicker (List <ChildLbState > children , boolean enableOobLoadReport ,
476- float errorUtilizationPenalty , AtomicInteger sequence ) {
518+ float errorUtilizationPenalty , AtomicInteger sequence ,
519+ ImmutableList <String > metricNamesForComputingUtilization ) {
477520 checkNotNull (children , "children" );
478521 Preconditions .checkArgument (!children .isEmpty (), "empty child list" );
479522 this .children = children ;
@@ -482,7 +525,8 @@ static final class WeightedRoundRobinPicker extends SubchannelPicker {
482525 for (ChildLbState child : children ) {
483526 WeightedChildLbState wChild = (WeightedChildLbState ) child ;
484527 pickers .add (wChild .getCurrentPicker ());
485- reportListeners .add (wChild .getOrCreateOrcaListener (errorUtilizationPenalty ));
528+ reportListeners .add (wChild .getOrCreateOrcaListener (errorUtilizationPenalty ,
529+ metricNamesForComputingUtilization ));
486530 }
487531 this .pickers = pickers ;
488532 this .reportListeners = reportListeners ;
@@ -723,23 +767,23 @@ static final class WeightedRoundRobinLoadBalancerConfig {
723767 final long oobReportingPeriodNanos ;
724768 final long weightUpdatePeriodNanos ;
725769 final float errorUtilizationPenalty ;
770+ final ImmutableList <String > metricNamesForComputingUtilization ;
726771
727772 public static Builder newBuilder () {
728773 return new Builder ();
729774 }
730775
731776 private WeightedRoundRobinLoadBalancerConfig (long blackoutPeriodNanos ,
732- long weightExpirationPeriodNanos ,
733- boolean enableOobLoadReport ,
734- long oobReportingPeriodNanos ,
735- long weightUpdatePeriodNanos ,
736- float errorUtilizationPenalty ) {
777+ long weightExpirationPeriodNanos , boolean enableOobLoadReport , long oobReportingPeriodNanos ,
778+ long weightUpdatePeriodNanos , float errorUtilizationPenalty ,
779+ ImmutableList <String > metricNamesForComputingUtilization ) {
737780 this .blackoutPeriodNanos = blackoutPeriodNanos ;
738781 this .weightExpirationPeriodNanos = weightExpirationPeriodNanos ;
739782 this .enableOobLoadReport = enableOobLoadReport ;
740783 this .oobReportingPeriodNanos = oobReportingPeriodNanos ;
741784 this .weightUpdatePeriodNanos = weightUpdatePeriodNanos ;
742785 this .errorUtilizationPenalty = errorUtilizationPenalty ;
786+ this .metricNamesForComputingUtilization = metricNamesForComputingUtilization ;
743787 }
744788
745789 @ Override
@@ -754,27 +798,26 @@ public boolean equals(Object o) {
754798 && this .oobReportingPeriodNanos == that .oobReportingPeriodNanos
755799 && this .weightUpdatePeriodNanos == that .weightUpdatePeriodNanos
756800 // Float.compare considers NaNs equal
757- && Float .compare (this .errorUtilizationPenalty , that .errorUtilizationPenalty ) == 0 ;
801+ && Float .compare (this .errorUtilizationPenalty , that .errorUtilizationPenalty ) == 0
802+ && Objects .equals (this .metricNamesForComputingUtilization ,
803+ that .metricNamesForComputingUtilization );
758804 }
759805
760806 @ Override
761807 public int hashCode () {
762- return Objects .hash (
763- blackoutPeriodNanos ,
764- weightExpirationPeriodNanos ,
765- enableOobLoadReport ,
766- oobReportingPeriodNanos ,
767- weightUpdatePeriodNanos ,
768- errorUtilizationPenalty );
808+ return Objects .hash (blackoutPeriodNanos , weightExpirationPeriodNanos , enableOobLoadReport ,
809+ oobReportingPeriodNanos , weightUpdatePeriodNanos , errorUtilizationPenalty ,
810+ metricNamesForComputingUtilization );
769811 }
770812
771813 static final class Builder {
772814 long blackoutPeriodNanos = 10_000_000_000L ; // 10s
773- long weightExpirationPeriodNanos = 180_000_000_000L ; //3min
815+ long weightExpirationPeriodNanos = 180_000_000_000L ; // 3min
774816 boolean enableOobLoadReport = false ;
775817 long oobReportingPeriodNanos = 10_000_000_000L ; // 10s
776818 long weightUpdatePeriodNanos = 1_000_000_000L ; // 1s
777819 float errorUtilizationPenalty = 1.0F ;
820+ ImmutableList <String > metricNamesForComputingUtilization = ImmutableList .of ();
778821
779822 private Builder () {
780823
@@ -812,10 +855,17 @@ Builder setErrorUtilizationPenalty(float errorUtilizationPenalty) {
812855 return this ;
813856 }
814857
858+ Builder setMetricNamesForComputingUtilization (
859+ List <String > metricNamesForComputingUtilization ) {
860+ this .metricNamesForComputingUtilization =
861+ ImmutableList .copyOf (metricNamesForComputingUtilization );
862+ return this ;
863+ }
864+
815865 WeightedRoundRobinLoadBalancerConfig build () {
816866 return new WeightedRoundRobinLoadBalancerConfig (blackoutPeriodNanos ,
817- weightExpirationPeriodNanos , enableOobLoadReport , oobReportingPeriodNanos ,
818- weightUpdatePeriodNanos , errorUtilizationPenalty );
867+ weightExpirationPeriodNanos , enableOobLoadReport , oobReportingPeriodNanos ,
868+ weightUpdatePeriodNanos , errorUtilizationPenalty , metricNamesForComputingUtilization );
819869 }
820870 }
821871 }
0 commit comments