Skip to content

Commit e2677b1

Browse files
authored
Fix #358 - Fixes deleteRecursive fails with ConnectionPoolTimeoutException (#359)
Fixes #358 - Delete recursively doesn't delete really big directory structures * Updated changelog * Changed to a ForkJoinPool factory to provide configured instances. * Improved logic of recursive delete * Removed sleep and added retry for connection pool response timeout * Migrated to use lang FieldUtils instead of deprecated Mockito Whitebox for reading private fields. * Added unrelated but useful comment.
1 parent 58299c8 commit e2677b1

8 files changed

Lines changed: 558 additions & 65 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ This project aims to adhere to [Semantic Versioning](http://semver.org/).
2525
- [Content type is set for file object in directory listing when it isn't available](https://github.com/joyent/java-manta/issues/341)
2626
- [Fixes validation guard clauses that are not validating anything](https://github.com/joyent/java-manta/issues/346)
2727
- [MDC logging of the load balancer address now logs the proper address](https://github.com/joyent/java-manta/issues/266)
28+
- [Fixes deleteRecursive fails with ConnectionPoolTimeoutException](https://github.com/joyent/java-manta/issues/358)
2829
### Changed
2930
- Validation of paths passed to `MantaClient` is now more consistently strict.
3031
More useful errors should be thrown sooner for invalid paths, without any
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
/*
2+
* Copyright (c) 2017, Joyent, Inc. All rights reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
package com.joyent.manta.client;
9+
10+
import com.joyent.manta.config.ConfigContext;
11+
import org.apache.commons.lang3.Validate;
12+
13+
import java.util.Optional;
14+
import java.util.concurrent.ForkJoinPool;
15+
16+
/**
17+
* Factory class that returns a {@link ForkJoinPool} instance configured with
18+
* the with a maximum parallelism value based on Manta client settings.
19+
*
20+
* @author <a href="https://github.com/dekobon">Elijah Zupancic</a>
21+
* @since 3.1.7
22+
*/
23+
final class FindForkJoinPoolFactory {
24+
/**
25+
* System property indicating the amount of parallelism to set for the
26+
* default {@link ForkJoinPool}.
27+
*/
28+
private static final String SYSTEM_FORK_JOIN_POOL_PARALLELISM_KEY =
29+
"java.util.concurrent.ForkJoinPool.common.parallelism";
30+
31+
/**
32+
* Percent of maximum connections that are reserved so that they are not
33+
* used by the threads in the {@link ForkJoinPool}. We want a buffer
34+
* so that the {@link ForkJoinPool} doesn't use up all available
35+
* connections for the {@link MantaClient} instance.
36+
*/
37+
private static final double PERCENT_OF_RESERVED_CONNECTIONS = 0.5;
38+
39+
/**
40+
* Minimum number of connections to allow if the reserved
41+
* connection count is available as an fractional integer
42+
* of the maximum connection count.
43+
*/
44+
private static final int MINIMUM_CONNECTIONS_VIABLE = 1;
45+
46+
/**
47+
* Factory class with no need for instances.
48+
*/
49+
private FindForkJoinPoolFactory() {
50+
}
51+
52+
/**
53+
* Returns a new instance of {@link ForkJoinPool} configured with the
54+
* correct parallelism value.
55+
*
56+
* @param config Manta configuration context object
57+
* @return configured instance
58+
*/
59+
static ForkJoinPool getInstance(final ConfigContext config) {
60+
Validate.notNull(config, "Configuration context is null");
61+
final int maximumConnections = Validate.notNull(
62+
config.getMaximumConnections(),
63+
"Maximum connections setting is null");
64+
65+
Validate.isTrue(maximumConnections > 0,
66+
"Maximum connections is not greater than zero");
67+
68+
final int parallelism = calculateParallelism(maximumConnections);
69+
70+
return new ForkJoinPool(parallelism);
71+
}
72+
73+
/**
74+
* Calculates the parallelism value for the {@link ForkJoinPool} by
75+
* comparing the maximum connections available in the HTTP connection
76+
* pool with the system parallelism setting.
77+
* @param maximumConnections maximum number of connections in the HTTP
78+
* connection pool
79+
* @return parallelism value
80+
*/
81+
private static int calculateParallelism(final int maximumConnections) {
82+
final int reserved = calculateNumberOfReservedConnections(maximumConnections);
83+
84+
final int maximumUsableConnections = Math.max(maximumConnections - reserved,
85+
MINIMUM_CONNECTIONS_VIABLE);
86+
87+
/* If the maximum number of usable connections equals our minimum then
88+
* we are forced into a parallelism value of the minimum connections
89+
* value (ie 1). */
90+
if (maximumUsableConnections == MINIMUM_CONNECTIONS_VIABLE) {
91+
return MINIMUM_CONNECTIONS_VIABLE;
92+
}
93+
94+
final int systemParallelism = calculateSystemParallelism();
95+
96+
/* We choose from the lesser of the system parallelism value or the
97+
* number of connections available because: 1. We want to prevent
98+
* exhaustion of the HTTP connection pool when the ForkJoinPool
99+
* is running at its maximum thread count. 2. We don't want the
100+
* parallelism value to be higher than what the running system
101+
* supports. */
102+
return Math.min(maximumUsableConnections, systemParallelism);
103+
}
104+
105+
/**
106+
* Calculates the number of connections to leave reserved so that the
107+
* {@link ForkJoinPool} can't allocate them.
108+
*
109+
* @param maximumConnections maximum number of connections in the HTTP
110+
* connection pool
111+
* @return fractional value rounded to the nearest integer of the number of
112+
* connections to leave reserved
113+
*/
114+
private static int calculateNumberOfReservedConnections(final int maximumConnections) {
115+
final double reserved = maximumConnections * PERCENT_OF_RESERVED_CONNECTIONS;
116+
return Math.toIntExact(Math.round(reserved));
117+
}
118+
119+
/**
120+
* Calculates the system parallelism setting by choosing the default value
121+
* generated based on the number of processors or by choosing the user
122+
* supplied system property.
123+
*
124+
* @return integer representing the parallelism value for a {@link ForkJoinPool}
125+
*/
126+
private static int calculateSystemParallelism() {
127+
final Optional<Integer> systemParallelism = readSystemForkJoinPoolParallelismSetting();
128+
return systemParallelism.orElse(Runtime.getRuntime().availableProcessors());
129+
}
130+
131+
/**
132+
* Reads and parses the system {@link ForkJoinPool} system property if
133+
* present.
134+
*
135+
* @return optional integer value for the system parallelism setting
136+
*/
137+
private static Optional<Integer> readSystemForkJoinPoolParallelismSetting() {
138+
final String parallelismSysProp = System.getProperty(SYSTEM_FORK_JOIN_POOL_PARALLELISM_KEY);
139+
140+
if (parallelismSysProp == null) {
141+
return Optional.empty();
142+
}
143+
144+
try {
145+
return Optional.of(Integer.valueOf(parallelismSysProp));
146+
} catch (NumberFormatException e) {
147+
return Optional.empty();
148+
}
149+
}
150+
}

0 commit comments

Comments
 (0)