From 8c3354d0ea50886fe86020fc8b0b05d26a4fbdf7 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Mon, 3 Jun 2024 18:32:28 -0400 Subject: [PATCH] CXF-9009: Async operations fails (netty-conduit) (#1868) (cherry picked from commit 87f68a95dbe0738b75f70a6a2c7796b8988023de) --- .../NettyHttpClientPipelineFactory.java | 13 +- .../http/netty/client/NettyHttpConduit.java | 16 +- systests/transport-netty/pom.xml | 18 +- .../netty/jaxws/JAXWSAsyncClientTest.java | 182 ++++++++++++++++++ 4 files changed, 211 insertions(+), 18 deletions(-) create mode 100644 systests/transport-netty/src/test/java/org/apache/cxf/systest/http2/netty/jaxws/JAXWSAsyncClientTest.java diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java index be44b9c4ec2..a39ff9ad518 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpClientPipelineFactory.java @@ -69,8 +69,11 @@ import io.netty.handler.ssl.util.SimpleTrustManagerFactory; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.util.AttributeKey; public class NettyHttpClientPipelineFactory extends ChannelInitializer { + private static final String WHEN_READY_KEY = "WhenReady-Key"; + private static final AttributeKey WHEN_READY = AttributeKey.valueOf(WHEN_READY_KEY); private static final Logger LOG = LogUtils.getL7dLogger(NettyHttpClientPipelineFactory.class); @@ -79,7 +82,6 @@ public class NettyHttpClientPipelineFactory extends ChannelInitializer private final int readTimeout; private final int maxContentLength; private final boolean enableHttp2; - private ChannelPromise readyFuture; public NettyHttpClientPipelineFactory(TLSClientParameters clientParameters) { this(clientParameters, 0); @@ -115,8 +117,9 @@ protected void initChannel(Channel ch) throws Exception { } final NettyHttpClientHandler responseHandler = new NettyHttpClientHandler(); - readyFuture = ch.newPromise(); - + final ChannelPromise readyFuture = ch.newPromise(); + ch.attr(WHEN_READY).set(readyFuture); + if (enableHttp2) { final Http2Connection connection = new DefaultHttp2Connection(false); final Http2SettingsHandler settingsHandler = new Http2SettingsHandler(readyFuture); @@ -188,8 +191,8 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) { } } - public ChannelFuture whenReady() { - return readyFuture; + ChannelFuture whenReady(Channel channel) { + return channel.attr(NettyHttpClientPipelineFactory.WHEN_READY).get(); } private SslHandler configureClientSSLOnDemand(Channel channel) throws Exception { diff --git a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java index 6ba0675331a..32e1946a18d 100644 --- a/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java +++ b/rt/transports/http-netty/netty-client/src/main/java/org/apache/cxf/transport/http/netty/client/NettyHttpConduit.java @@ -458,15 +458,17 @@ public void operationComplete(ChannelFuture future) throws Exception { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { - handler.whenReady().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) throws Exception { - if (future.isSuccess()) { - ChannelFuture channelFuture = future.channel().writeAndFlush(entity); - channelFuture.addListener(writeFailureListener); + handler.whenReady(future.channel()) + .addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + ChannelFuture channelFuture = future.channel().writeAndFlush(entity); + channelFuture.addListener(writeFailureListener); + } } } - }); + ); } } }); diff --git a/systests/transport-netty/pom.xml b/systests/transport-netty/pom.xml index d6a02d6dae1..a722eb5101d 100644 --- a/systests/transport-netty/pom.xml +++ b/systests/transport-netty/pom.xml @@ -95,27 +95,32 @@ org.apache.cxf cxf-core - ${project.version} + test org.apache.cxf cxf-rt-databinding-jaxb - ${project.version} + test org.apache.cxf cxf-rt-transports-http - ${project.version} + test org.apache.cxf cxf-rt-frontend-jaxrs - ${project.version} + test org.apache.cxf cxf-rt-rs-client - ${project.version} + test + + + org.apache.cxf + cxf-rt-frontend-jaxws + test com.fasterxml.jackson.jaxrs @@ -140,7 +145,7 @@ org.apache.cxf cxf-rt-transports-http-netty-client - ${project.version} + test org.slf4j @@ -192,6 +197,7 @@ org.springframework spring-core + test junit diff --git a/systests/transport-netty/src/test/java/org/apache/cxf/systest/http2/netty/jaxws/JAXWSAsyncClientTest.java b/systests/transport-netty/src/test/java/org/apache/cxf/systest/http2/netty/jaxws/JAXWSAsyncClientTest.java new file mode 100644 index 00000000000..ab141b42017 --- /dev/null +++ b/systests/transport-netty/src/test/java/org/apache/cxf/systest/http2/netty/jaxws/JAXWSAsyncClientTest.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.http2.netty.jaxws; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javax.jws.WebService; +import javax.xml.ws.Response; +import javax.xml.ws.soap.SOAPFaultException; + +import org.apache.cxf.endpoint.Client; +import org.apache.cxf.greeter_control.AbstractGreeterImpl; +import org.apache.cxf.greeter_control.Greeter; +import org.apache.cxf.greeter_control.types.GreetMeResponse; +import org.apache.cxf.jaxws.JaxWsProxyFactoryBean; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.testutil.common.AbstractBusTestServerBase; +import org.apache.cxf.transport.http.HTTPConduit; + +import io.netty.handler.timeout.ReadTimeoutException; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class JAXWSAsyncClientTest extends AbstractBusClientServerTestBase { + static final String PORT = allocatePort(Server.class); + + public static class Server extends AbstractBusTestServerBase { + + protected void run() { + GreeterImpl implementor = new GreeterImpl(); + String address = "http://localhost:" + PORT + "/SoapContext/GreeterPort"; + javax.xml.ws.Endpoint.publish(address, implementor); + } + + public static void main(String[] args) { + try { + Server s = new Server(); + s.start(); + } catch (Exception ex) { + ex.printStackTrace(); + System.exit(-1); + } finally { + System.out.println("done!"); + } + } + + @WebService(serviceName = "BasicGreeterService", + portName = "GreeterPort", + endpointInterface = "org.apache.cxf.greeter_control.Greeter", + targetNamespace = "http://cxf.apache.org/greeter_control", + wsdlLocation = "testutils/greeter_control.wsdl") + public class GreeterImpl extends AbstractGreeterImpl { + @Override + public String greetMe(String arg) { + if ("timeout".equalsIgnoreCase(arg)) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Do nothing + } + } + + return super.greetMe(arg); + } + } + } + + + @BeforeClass + public static void startServers() throws Exception { + assertTrue("server did not launch correctly", launchServer(Server.class, true)); + } + + @AfterClass + public static void stopServers() throws Exception { + stopAllServers(); + } + + @Test + public void testAsyncClient() throws Exception { + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + Response response = proxy.greetMeAsync("cxf"); + int waitCount = 0; + while (!response.isDone() && waitCount < 15) { + Thread.sleep(1000); + waitCount++; + } + + assertTrue("Response still not received.", response.isDone()); + } + + @Test + public void testAsyncClientConcurrently() throws Exception { + final ExecutorService executor = Executors.newFixedThreadPool(3); + + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + final Callable callable = () -> proxy.greetMeAsync("cxf", resp -> { }).get(5, TimeUnit.SECONDS); + final List> futures = executor.invokeAll(List.of(callable, callable, callable)); + + for (final Future response: futures) { + int waitCount = 0; + while (!response.isDone() && waitCount < 15) { + Thread.sleep(1000); + waitCount++; + } + } + + assertTrue("Response still not received.", futures.stream().allMatch(Future::isDone)); + for (final Future response: futures) { + assertThat(response.get(), is(not(nullValue()))); + } + executor.shutdown(); + + assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testTimeout() throws Exception { + // setup the feature by using JAXWS front-end API + JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean(); + factory.setAddress("http://localhost:" + PORT + "/SoapContext/GreeterPort"); + factory.setServiceClass(Greeter.class); + Greeter proxy = factory.create(Greeter.class); + + HTTPConduit cond = (HTTPConduit)((Client)proxy).getConduit(); + cond.getClient().setReceiveTimeout(500); + + try { + proxy.greetMeAsync("timeout").get(); + fail("Should have faulted"); + } catch (SOAPFaultException ex) { + fail("should not be a SOAPFaultException"); + } catch (ExecutionException ex) { + //expected + assertTrue(ex.getCause().getClass().getName(), + ex.getCause() instanceof IOException + || ex.getCause() instanceof ReadTimeoutException); + } + } +}