Skip to content

Commit

Permalink
CXF-8951: Concurrent WebClient usage causes massive thread overhead
Browse files Browse the repository at this point in the history
  • Loading branch information
reta committed Apr 2, 2024
1 parent 2244630 commit 0ffe32f
Showing 1 changed file with 139 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@
import java.security.PrivilegedExceptionAction;
import java.security.cert.Certificate;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -66,6 +68,8 @@
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import javax.net.ssl.SSLContext;
Expand All @@ -82,6 +86,7 @@
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageUtils;
import org.apache.cxf.service.model.EndpointInfo;
import org.apache.cxf.transport.http.policy.impl.ClientPolicyCalculator;
import org.apache.cxf.transport.https.HttpsURLConnectionInfo;
import org.apache.cxf.transport.https.SSLUtils;
import org.apache.cxf.transports.http.configuration.HTTPClientPolicy;
Expand All @@ -90,9 +95,100 @@

public class HttpClientHTTPConduit extends URLConnectionHTTPConduit {
private static final Set<String> RESTRICTED_HEADERS = getRestrictedHeaders();
volatile HttpClient client;
private static final HttpClientCache CLIENTS_CACHE = new HttpClientCache();
volatile RefCount<HttpClient> clientRef;
volatile int lastTlsHash = -1;
volatile URI sslURL;

private static final class RefCount<T extends HttpClient> {
private final AtomicLong count = new AtomicLong();
private final TLSClientParameters clientParameters;
private final HTTPClientPolicy policy;
private final T client;

RefCount(T client, HTTPClientPolicy policy, TLSClientParameters clientParameters) {
this.client = client;
this.policy = policy;
this.clientParameters = clientParameters;
}

RefCount<T> acquire() {
count.incrementAndGet();
return this;
}

void release() {
if (count.decrementAndGet() == 0) {
try {
if (client instanceof AutoCloseable) {
try {
((AutoCloseable)client).close();
} catch (Exception e) {
//ignore
}
} else if (client != null) {
tryToShutdownSelector(client);
}
} finally {
CLIENTS_CACHE.remove(policy, clientParameters);
}
}
}

HttpClient client() {
return client;
}

HTTPClientPolicy policy() {
return policy;
}

public TLSClientParameters clientParameters() {
return clientParameters;
}
}

private static final class HttpClientCache {
private final List<RefCount<HttpClient>> clients = new ArrayList<>();
private final ClientPolicyCalculator cpc = new ClientPolicyCalculator();
private final ReentrantLock lock = new ReentrantLock();

RefCount<HttpClient> computeIfAbsent(final HTTPClientPolicy policy, final TLSClientParameters clientParameters,
final Supplier<HttpClient> supplier) {
lock.lock();
try {
for (final RefCount<HttpClient> p: clients) {
if (cpc.equals(p.policy(), policy) && p.clientParameters().equals(clientParameters)) {
return p.acquire();
}
}

final HttpClient client = supplier.get();
final RefCount<HttpClient> clientRef = new RefCount<HttpClient>(client, policy, clientParameters);
clients.add(clientRef);

return clientRef.acquire();
} finally {
lock.unlock();
}
}

void remove(final HTTPClientPolicy policy, final TLSClientParameters clientParameters) {
lock.lock();
try {
final Iterator<RefCount<HttpClient>> iterator = clients.iterator();
while (iterator.hasNext()) {
final RefCount<HttpClient> p = iterator.next();
if (cpc.equals(p.policy(), policy) && p.clientParameters().equals(clientParameters)) {
iterator.remove();
break;
}
}
} finally {
lock.unlock();
}
}
}

public HttpClientHTTPConduit(Bus b, EndpointInfo ei, EndpointReferenceType t) throws IOException {
super(b, ei, t);
Expand Down Expand Up @@ -120,42 +216,39 @@ public void close(Message msg) throws IOException {
* Close the conduit
*/
public void close() {
if (client instanceof AutoCloseable) {
try {
((AutoCloseable)client).close();
} catch (Exception e) {
//ignore
}
} else if (client != null) {
String name = client.toString();
client = null;
tryToShutdownSelector(name);
if (clientRef != null) {
clientRef.release();
clientRef = null;
}
defaultAddress = null;
super.close();
}
private synchronized void tryToShutdownSelector(String n) {
// it can take three seconds (or more) for the JVM to determine the client
// is unreferenced and then shutdown the selector thread, we'll try and speed that
// up. This is somewhat of a complete hack.
int idx = n.lastIndexOf('(');
if (idx > 0) {
n = n.substring(idx + 1);
n = n.substring(0, n.length() - 1);
n = "HttpClient-" + n + "-SelectorManager";
}
try {
ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
Thread[] threads = new Thread[rootGroup.activeCount()];
int cnt = rootGroup.enumerate(threads);
for (int x = 0; x < cnt; x++) {
if (threads[x].getName().contains(n)) {
threads[x].interrupt();
}
}
} catch (Throwable t) {
//ignore, nothing we can do except wait for the garbage collection
//and then the three seconds for the timeout
private static void tryToShutdownSelector(HttpClient client) {
synchronized (client) {
String n = client.toString();

// it can take three seconds (or more) for the JVM to determine the client
// is unreferenced and then shutdown the selector thread, we'll try and speed that
// up. This is somewhat of a complete hack.
int idx = n.lastIndexOf('(');
if (idx > 0) {
n = n.substring(idx + 1);
n = n.substring(0, n.length() - 1);
n = "HttpClient-" + n + "-SelectorManager";
}
try {
ThreadGroup rootGroup = Thread.currentThread().getThreadGroup();
Thread[] threads = new Thread[rootGroup.activeCount()];
int cnt = rootGroup.enumerate(threads);
for (int x = 0; x < cnt; x++) {
if (threads[x].getName().contains(n)) {
threads[x].interrupt();
}
}
} catch (Throwable t) {
//ignore, nothing we can do except wait for the garbage collection
//and then the three seconds for the timeout
}
}
}

Expand Down Expand Up @@ -204,7 +297,10 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic

if (sslURL != null && isSslTargetDifferent(sslURL, uri)) {
sslURL = null;
client = null;
if (clientRef != null) {
clientRef.release();
clientRef = null;
}
}
// If the HTTP_REQUEST_METHOD is not set, the default is "POST".
String httpRequestMethod =
Expand All @@ -214,11 +310,11 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic
message.put(Message.HTTP_REQUEST_METHOD, "POST");
}

HttpClient cl = client;
RefCount<HttpClient> cl = clientRef;
if (cl == null) {
int ctimeout = determineConnectionTimeout(message, csPolicy);
int ctimeout = determineConnectionTimeout(message, csPolicy);
ProxySelector ps = new ProxyFactoryProxySelector(proxyFactory, csPolicy);

HttpClient.Builder cb = HttpClient.newBuilder()
.proxy(ps)
.followRedirects(Redirect.NEVER);
Expand Down Expand Up @@ -267,10 +363,10 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic
cb.version(Version.HTTP_1_1);
}

cl = cb.build();
cl = CLIENTS_CACHE.computeIfAbsent(csPolicy, clientParameters, () -> cb.build());
if (!"https".equals(uri.getScheme())
&& !KNOWN_HTTP_VERBS_WITH_NO_CONTENT.contains(httpRequestMethod)
&& cl.version() == Version.HTTP_2
&& cl.client().version() == Version.HTTP_2
&& ("2".equals(verc) || ("auto".equals(verc) && "2".equals(HTTP_VERSION)))) {
try {
// We specifically want HTTP2, but we're using a request
Expand All @@ -281,14 +377,14 @@ protected void setupConnection(Message message, Address address, HTTPClientPolic
HttpRequest.Builder rb = HttpRequest.newBuilder()
.uri(uri)
.method("OPTIONS", BodyPublishers.noBody());
cl.send(rb.build(), BodyHandlers.ofByteArray());
cl.client().send(rb.build(), BodyHandlers.ofByteArray());
} catch (IOException | InterruptedException e) {
//
}
}
client = cl;
}
message.put(HttpClient.class, cl);
clientRef = cl;
}
message.put(HttpClient.class, cl.client());

message.put(KEY_HTTP_CONNECTION_ADDRESS, address);
}
Expand Down

0 comments on commit 0ffe32f

Please sign in to comment.