6161
6262public class PopMetricsManager {
6363 private static final Logger log = LoggerFactory .getLogger (PopMetricsManager .class );
64- public static Supplier <AttributesBuilder > attributesBuilderSupplier ;
64+
65+ private Supplier <AttributesBuilder > attributesBuilderSupplier ;
6566
66- private static LongHistogram popBufferScanTimeConsume = new NopLongHistogram ();
67- private static LongCounter popRevivePutTotal = new NopLongCounter ();
68- private static LongCounter popReviveGetTotal = new NopLongCounter ();
69- private static LongCounter popReviveRetryMessageTotal = new NopLongCounter ();
67+ private LongHistogram popBufferScanTimeConsume = new NopLongHistogram ();
68+ private LongCounter popRevivePutTotal = new NopLongCounter ();
69+ private LongCounter popReviveGetTotal = new NopLongCounter ();
70+ private LongCounter popReviveRetryMessageTotal = new NopLongCounter ();
7071
71- public static List <Pair <InstrumentSelector , ViewBuilder >> getMetricsView () {
72+ public PopMetricsManager () {
73+ }
74+
75+ public List <Pair <InstrumentSelector , ViewBuilder >> getMetricsView () {
7276 List <Double > rpcCostTimeBuckets = Arrays .asList (
7377 (double ) Duration .ofMillis (1 ).toMillis (),
7478 (double ) Duration .ofMillis (10 ).toMillis (),
@@ -87,22 +91,22 @@ public static List<Pair<InstrumentSelector, ViewBuilder>> getMetricsView() {
8791 return Lists .newArrayList (new Pair <>(popBufferScanTimeConsumeSelector , popBufferScanTimeConsumeViewBuilder ));
8892 }
8993
90- public static void initMetrics (Meter meter , BrokerController brokerController ,
94+ public void initMetrics (Meter meter , BrokerController brokerController ,
9195 Supplier <AttributesBuilder > attributesBuilderSupplier ) {
92- PopMetricsManager .attributesBuilderSupplier = attributesBuilderSupplier ;
96+ this .attributesBuilderSupplier = attributesBuilderSupplier ;
9397
94- popBufferScanTimeConsume = meter .histogramBuilder (HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME )
98+ this . popBufferScanTimeConsume = meter .histogramBuilder (HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME )
9599 .setDescription ("Time consuming of pop buffer scan" )
96100 .setUnit ("milliseconds" )
97101 .ofLongs ()
98102 .build ();
99- popRevivePutTotal = meter .counterBuilder (COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL )
103+ this . popRevivePutTotal = meter .counterBuilder (COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL )
100104 .setDescription ("Total number of put message to revive topic" )
101105 .build ();
102- popReviveGetTotal = meter .counterBuilder (COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL )
106+ this . popReviveGetTotal = meter .counterBuilder (COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL )
103107 .setDescription ("Total number of get message from revive topic" )
104108 .build ();
105- popReviveRetryMessageTotal = meter .counterBuilder (COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL )
109+ this . popReviveRetryMessageTotal = meter .counterBuilder (COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL )
106110 .setDescription ("Total number of put message to pop retry topic" )
107111 .build ();
108112
@@ -126,24 +130,24 @@ public static void initMetrics(Meter meter, BrokerController brokerController,
126130 .buildWithCallback (measurement -> calculatePopReviveLatency (brokerController , measurement ));
127131 }
128132
129- private static void calculatePopBufferOffsetSize (BrokerController brokerController ,
133+ private void calculatePopBufferOffsetSize (BrokerController brokerController ,
130134 ObservableLongMeasurement measurement ) {
131135 PopBufferMergeService popBufferMergeService = brokerController .getPopMessageProcessor ().getPopBufferMergeService ();
132- measurement .record (popBufferMergeService .getOffsetTotalSize (), newAttributesBuilder ().build ());
136+ measurement .record (popBufferMergeService .getOffsetTotalSize (), this . newAttributesBuilder ().build ());
133137 }
134138
135- private static void calculatePopBufferCkSize (BrokerController brokerController ,
139+ private void calculatePopBufferCkSize (BrokerController brokerController ,
136140 ObservableLongMeasurement measurement ) {
137141 PopBufferMergeService popBufferMergeService = brokerController .getPopMessageProcessor ().getPopBufferMergeService ();
138- measurement .record (popBufferMergeService .getBufferedCKSize (), newAttributesBuilder ().build ());
142+ measurement .record (popBufferMergeService .getBufferedCKSize (), this . newAttributesBuilder ().build ());
139143 }
140144
141- private static void calculatePopReviveLatency (BrokerController brokerController ,
145+ private void calculatePopReviveLatency (BrokerController brokerController ,
142146 ObservableLongMeasurement measurement ) {
143147 PopReviveService [] popReviveServices = brokerController .getAckMessageProcessor ().getPopReviveServices ();
144148 for (PopReviveService popReviveService : popReviveServices ) {
145149 try {
146- measurement .record (popReviveService .getReviveBehindMillis (), newAttributesBuilder ()
150+ measurement .record (popReviveService .getReviveBehindMillis (), this . newAttributesBuilder ()
147151 .put (LABEL_QUEUE_ID , popReviveService .getQueueId ())
148152 .build ());
149153 } catch (ConsumeQueueException e ) {
@@ -152,12 +156,12 @@ private static void calculatePopReviveLatency(BrokerController brokerController,
152156 }
153157 }
154158
155- private static void calculatePopReviveLag (BrokerController brokerController ,
159+ private void calculatePopReviveLag (BrokerController brokerController ,
156160 ObservableLongMeasurement measurement ) {
157161 PopReviveService [] popReviveServices = brokerController .getAckMessageProcessor ().getPopReviveServices ();
158162 for (PopReviveService popReviveService : popReviveServices ) {
159163 try {
160- measurement .record (popReviveService .getReviveBehindMessages (), newAttributesBuilder ()
164+ measurement .record (popReviveService .getReviveBehindMessages (), this . newAttributesBuilder ()
161165 .put (LABEL_QUEUE_ID , popReviveService .getQueueId ())
162166 .build ());
163167 } catch (ConsumeQueueException e ) {
@@ -166,60 +170,87 @@ private static void calculatePopReviveLag(BrokerController brokerController,
166170 }
167171 }
168172
169- public static void incPopReviveAckPutCount (AckMsg ackMsg , PutMessageStatus status ) {
173+ public void incPopReviveAckPutCount (AckMsg ackMsg , PutMessageStatus status ) {
170174 incPopRevivePutCount (ackMsg .getConsumerGroup (), ackMsg .getTopic (), PopReviveMessageType .ACK , status , 1 );
171175 }
172176
173- public static void incPopReviveCkPutCount (PopCheckPoint checkPoint , PutMessageStatus status ) {
177+ public void incPopReviveCkPutCount (PopCheckPoint checkPoint , PutMessageStatus status ) {
174178 incPopRevivePutCount (checkPoint .getCId (), checkPoint .getTopic (), PopReviveMessageType .CK , status , 1 );
175179 }
176180
177- public static void incPopRevivePutCount (String group , String topic , PopReviveMessageType messageType ,
181+ public void incPopRevivePutCount (String group , String topic , PopReviveMessageType messageType ,
178182 PutMessageStatus status , int num ) {
179- Attributes attributes = newAttributesBuilder ()
183+ Attributes attributes = this . newAttributesBuilder ()
180184 .put (LABEL_CONSUMER_GROUP , group )
181185 .put (LABEL_TOPIC , topic )
182186 .put (LABEL_REVIVE_MESSAGE_TYPE , messageType .name ())
183187 .put (LABEL_PUT_STATUS , status .name ())
184188 .build ();
185- popRevivePutTotal .add (num , attributes );
189+ this . popRevivePutTotal .add (num , attributes );
186190 }
187191
188- public static void incPopReviveAckGetCount (AckMsg ackMsg , int queueId ) {
192+ public void incPopReviveAckGetCount (AckMsg ackMsg , int queueId ) {
189193 incPopReviveGetCount (ackMsg .getConsumerGroup (), ackMsg .getTopic (), PopReviveMessageType .ACK , queueId , 1 );
190194 }
191195
192- public static void incPopReviveCkGetCount (PopCheckPoint checkPoint , int queueId ) {
196+ public void incPopReviveCkGetCount (PopCheckPoint checkPoint , int queueId ) {
193197 incPopReviveGetCount (checkPoint .getCId (), checkPoint .getTopic (), PopReviveMessageType .CK , queueId , 1 );
194198 }
195199
196- public static void incPopReviveGetCount (String group , String topic , PopReviveMessageType messageType , int queueId ,
200+ public void incPopReviveGetCount (String group , String topic , PopReviveMessageType messageType , int queueId ,
197201 int num ) {
198- AttributesBuilder builder = newAttributesBuilder ();
202+ AttributesBuilder builder = this . newAttributesBuilder ();
199203 Attributes attributes = builder
200204 .put (LABEL_CONSUMER_GROUP , group )
201205 .put (LABEL_TOPIC , topic )
202206 .put (LABEL_QUEUE_ID , queueId )
203207 .put (LABEL_REVIVE_MESSAGE_TYPE , messageType .name ())
204208 .build ();
205- popReviveGetTotal .add (num , attributes );
209+ this . popReviveGetTotal .add (num , attributes );
206210 }
207211
208- public static void incPopReviveRetryMessageCount (PopCheckPoint checkPoint , PutMessageStatus status ) {
209- AttributesBuilder builder = newAttributesBuilder ();
212+ public void incPopReviveRetryMessageCount (PopCheckPoint checkPoint , PutMessageStatus status ) {
213+ AttributesBuilder builder = this . newAttributesBuilder ();
210214 Attributes attributes = builder
211215 .put (LABEL_CONSUMER_GROUP , checkPoint .getCId ())
212216 .put (LABEL_TOPIC , checkPoint .getTopic ())
213217 .put (LABEL_PUT_STATUS , status .name ())
214218 .build ();
215- popReviveRetryMessageTotal .add (1 , attributes );
219+ this .popReviveRetryMessageTotal .add (1 , attributes );
220+ }
221+
222+ public void recordPopBufferScanTimeConsume (long time ) {
223+ this .popBufferScanTimeConsume .record (time , this .newAttributesBuilder ().build ());
224+ }
225+
226+ public AttributesBuilder newAttributesBuilder () {
227+ return this .attributesBuilderSupplier != null ? this .attributesBuilderSupplier .get () : Attributes .builder ();
228+ }
229+
230+ // Getter methods for external access
231+ public LongHistogram getPopBufferScanTimeConsume () {
232+ return popBufferScanTimeConsume ;
216233 }
217234
218- public static void recordPopBufferScanTimeConsume ( long time ) {
219- popBufferScanTimeConsume . record ( time , newAttributesBuilder (). build ()) ;
235+ public LongCounter getPopRevivePutTotal ( ) {
236+ return popRevivePutTotal ;
220237 }
221238
222- public static AttributesBuilder newAttributesBuilder () {
223- return attributesBuilderSupplier != null ? attributesBuilderSupplier . get () : Attributes . builder () ;
239+ public LongCounter getPopReviveGetTotal () {
240+ return popReviveGetTotal ;
224241 }
242+
243+ public LongCounter getPopReviveRetryMessageTotal () {
244+ return popReviveRetryMessageTotal ;
245+ }
246+
247+ public Supplier <AttributesBuilder > getAttributesBuilderSupplier () {
248+ return attributesBuilderSupplier ;
249+ }
250+
251+ // Setter methods for testing
252+ public void setAttributesBuilderSupplier (Supplier <AttributesBuilder > attributesBuilderSupplier ) {
253+ this .attributesBuilderSupplier = attributesBuilderSupplier ;
254+ }
255+
225256}
0 commit comments