Skip to content

Commit df3f6ca

Browse files
committed
[AMQ-9858] Support purging the first number of messages from a queue
1 parent d21d12e commit df3f6ca

4 files changed

Lines changed: 74 additions & 6 deletions

File tree

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueView.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ public synchronized void purge() throws Exception {
6464
LOG.info("{} purge of {} messages", destination.getActiveMQDestination().getQualifiedName(), originalMessageCount);
6565
}
6666

67+
public synchronized void purge(long numberOfMessages) throws Exception {
68+
final long originalMessageCount = destination.getDestinationStatistics().getMessages().getCount();
69+
70+
((Queue)destination).purge(numberOfMessages);
71+
72+
LOG.info("{} purge {} of {} messages", destination.getActiveMQDestination().getQualifiedName(), numberOfMessages, originalMessageCount);
73+
}
74+
6775
public synchronized boolean removeMessage(String messageId) throws Exception {
6876
return ((Queue)destination).removeMessage(messageId);
6977
}

activemq-broker/src/main/java/org/apache/activemq/broker/jmx/QueueViewMBean.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,14 @@ public interface QueueViewMBean extends DestinationViewMBean {
7070
@MBeanInfo("Removes all of the messages in the queue.")
7171
void purge() throws Exception;
7272

73+
/**
74+
* Removes the first number of messages in the queue.
75+
*
76+
* @throws Exception
77+
*/
78+
@MBeanInfo("Removes the first number of messages in the queue.")
79+
void purge(long numberOfMessages) throws Exception;
80+
7381
/**
7482
* Copies a given message to another destination.
7583
*

activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1320,34 +1320,53 @@ public QueueMessageReference getMessage(String id) {
13201320
}
13211321

13221322
public void purge() throws Exception {
1323+
purge(this.destinationStatistics.getMessages().getCount());
1324+
}
1325+
1326+
public void purge(long numberOfMessages) throws Exception {
1327+
1328+
if (numberOfMessages <= 0) {
1329+
return;
1330+
}
1331+
13231332
ConnectionContext c = createConnectionContext();
13241333
List<MessageReference> list = null;
13251334
sendLock.lock();
1335+
1336+
long purgeCount = 0L;
13261337
try {
13271338
long originalMessageCount = this.destinationStatistics.getMessages().getCount();
13281339
do {
13291340
doPageIn(true, false, getMaxPageSize()); // signal no expiry processing needed.
13301341
pagedInMessagesLock.readLock().lock();
13311342
try {
13321343
list = new ArrayList<MessageReference>(pagedInMessages.values());
1333-
}finally {
1344+
} finally {
13341345
pagedInMessagesLock.readLock().unlock();
13351346
}
13361347

1337-
for (MessageReference ref : list) {
1348+
int deleteCount = list.size();
1349+
if ((numberOfMessages - purgeCount) < list.size()) {
1350+
deleteCount = (int)(numberOfMessages - purgeCount);
1351+
}
1352+
1353+
for (int n=0; n < deleteCount; n++) {
13381354
try {
1339-
QueueMessageReference r = (QueueMessageReference) ref;
1355+
QueueMessageReference r = (QueueMessageReference) list.get(n);
13401356
removeMessage(c, r);
13411357
messages.rollback(r.getMessageId());
1358+
purgeCount++;
13421359
} catch (IOException e) {
13431360
}
13441361
}
13451362
// don't spin/hang if stats are out and there is nothing left in the
13461363
// store
1347-
} while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
1364+
} while (!list.isEmpty() &&
1365+
this.destinationStatistics.getMessages().getCount() > 0 &&
1366+
purgeCount < numberOfMessages);
13481367

1349-
if (this.destinationStatistics.getMessages().getCount() > 0) {
1350-
LOG.warn("{} after purge of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), originalMessageCount, this.destinationStatistics.getMessages().getCount());
1368+
if (numberOfMessages == originalMessageCount && this.destinationStatistics.getMessages().getCount() > 0) {
1369+
LOG.warn("{} after purge {} of {} messages, message count stats report: {}", getActiveMQDestination().getQualifiedName(), numberOfMessages, originalMessageCount, this.destinationStatistics.getMessages().getCount());
13511370
}
13521371
} finally {
13531372
sendLock.unlock();

activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/PurgeTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,39 @@ public void testPurge() throws Exception {
112112
producer.close();
113113
}
114114

115+
public void testPurgeCount() throws Exception {
116+
// Send some messages
117+
int messagesSent = 1_000;
118+
int messagesPurge = 200;
119+
120+
connection = connectionFactory.createConnection();
121+
connection.setClientID(clientID);
122+
connection.start();
123+
Session session = connection.createSession(transacted, authMode);
124+
destination = createDestination();
125+
MessageProducer producer = session.createProducer(destination);
126+
for (int i = 0; i < messagesSent; i++) {
127+
Message message = session.createTextMessage("Message: " + i);
128+
producer.send(message);
129+
}
130+
131+
// Now get the QueueViewMBean and purge
132+
String objectNameStr = broker.getBrokerObjectName().toString();
133+
objectNameStr += ",destinationType=Queue,destinationName="+getDestinationString();
134+
ObjectName queueViewMBeanName = assertRegisteredObjectName(objectNameStr);
135+
QueueViewMBean proxy = (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true);
136+
137+
long count = proxy.getQueueSize();
138+
assertEquals("Queue size", count, messagesSent);
139+
140+
for (int i = 1; i <= 5; i++) {
141+
proxy.purge(messagesPurge);
142+
count = proxy.getQueueSize();
143+
assertEquals("Queue size", count, messagesSent - (messagesPurge * i));
144+
}
145+
producer.close();
146+
}
147+
115148
public void initCombosForTestDelete() {
116149
addCombinationValues("persistenceAdapter", new Object[] {new MemoryPersistenceAdapter(), new KahaDBPersistenceAdapter()});
117150
}

0 commit comments

Comments
 (0)