From 5913f5e230ecf895312be3ccea13ed35dd68a175 Mon Sep 17 00:00:00 2001 From: Dmitri Goroh Date: Tue, 14 Jan 2025 19:12:44 -0800 Subject: [PATCH] Fix: ObjectStore client connections leaking during UTs (#73) Signed-off-by: Dmitri Goroh --- .../fixtures/oci/OciHttpHandler.java | 10 +-- .../opensearch/fixtures/oci/FixtureTests.java | 81 ++++++++++++++++--- .../oci/OciObjectStorageService.java | 29 ------- 3 files changed, 74 insertions(+), 46 deletions(-) diff --git a/oci-objectstorage-fixture/src/main/java/org/opensearch/fixtures/oci/OciHttpHandler.java b/oci-objectstorage-fixture/src/main/java/org/opensearch/fixtures/oci/OciHttpHandler.java index db35b78..185989f 100644 --- a/oci-objectstorage-fixture/src/main/java/org/opensearch/fixtures/oci/OciHttpHandler.java +++ b/oci-objectstorage-fixture/src/main/java/org/opensearch/fixtures/oci/OciHttpHandler.java @@ -238,7 +238,7 @@ private void putObject( bucketName, objectName); exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), 0); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); exchange.close(); } @@ -289,8 +289,6 @@ private void headObject( final LocalBucket bucket = buckets.get(bucketName); final OSObject object = bucket.getObject(objectName); if (object != null) { - exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), 0); final HeadObjectResponse headObjectResponse = HeadObjectResponse.builder() .contentLength((long) object.getBytes().length) @@ -299,7 +297,7 @@ private void headObject( final byte[] response = str.getBytes(StandardCharsets.UTF_8); exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), 0); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); exchange.close(); } else { @@ -334,7 +332,7 @@ private void listObject( final byte[] response = str.getBytes(StandardCharsets.UTF_8); exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), 0); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), response.length); exchange.getResponseBody().write(response); exchange.close(); } @@ -354,7 +352,7 @@ private void deleteObject( bucket.deleteObject(objectName); exchange.getResponseHeaders().add("Content-Type", "application/json"); - exchange.sendResponseHeaders(RestStatus.OK.getStatus(), 0); + exchange.sendResponseHeaders(RestStatus.OK.getStatus(), -1); exchange.close(); } diff --git a/oci-objectstorage-fixture/src/test/java/org/opensearch/fixtures/oci/FixtureTests.java b/oci-objectstorage-fixture/src/test/java/org/opensearch/fixtures/oci/FixtureTests.java index 0362d28..66c8943 100644 --- a/oci-objectstorage-fixture/src/test/java/org/opensearch/fixtures/oci/FixtureTests.java +++ b/oci-objectstorage-fixture/src/test/java/org/opensearch/fixtures/oci/FixtureTests.java @@ -7,6 +7,7 @@ import com.oracle.bmc.Region; import com.oracle.bmc.auth.BasicAuthenticationDetailsProvider; import com.oracle.bmc.auth.SimpleAuthenticationDetailsProvider; +import com.oracle.bmc.http.client.jersey.ApacheClientProperties; import com.oracle.bmc.model.Range; import com.oracle.bmc.objectstorage.ObjectStorage; import com.oracle.bmc.objectstorage.ObjectStorageClient; @@ -15,6 +16,7 @@ import com.oracle.bmc.objectstorage.responses.*; import lombok.extern.log4j.Log4j2; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -68,6 +70,33 @@ public void setup() throws Exception { target = c.target(NonJerseyServer.DEFAULT_BASE_URI); objectStorage = ObjectStorageClient.builder() + // This will run after, and in addition to, the default + // client configurator; + // this allows you to get the default behavior from the + // default client + // configurator + // (in the case of the ObjectStorageClient, the + // non-buffering behavior), but + // you + // can also add other things on top of it, like adding new + // headers + + .additionalClientConfigurator( + builder -> { + // Define a connection manager and its + // properties + final PoolingHttpClientConnectionManager + poolConnectionManager = + new PoolingHttpClientConnectionManager(); + // Setting intentionally pool to 1 to find connections leaks + poolConnectionManager.setMaxTotal(1); + poolConnectionManager.setDefaultMaxPerRoute(1); + + builder.property( + ApacheClientProperties + .CONNECTION_MANAGER, + poolConnectionManager); + }) .endpoint("http://localhost:8080") .build(authenticationDetailsProvider); } @@ -87,21 +116,51 @@ public void testResource() { } @Test - public void testItAll() throws IOException { + public void testEachApiOnce() throws IOException { // 1. Create bucket final CreateBucketResponse createBucketResponse = - objectStorage.createBucket( - CreateBucketRequest.builder() - .namespaceName(NAMESPACE) - .createBucketDetails( - CreateBucketDetails.builder() - .compartmentId(COMPARTMENT_ID) - .metadata(new HashMap<>()) - .name(BUCKET_NAME) - .build()) - .build()); + objectStorage.createBucket( + CreateBucketRequest.builder() + .namespaceName(NAMESPACE) + .createBucketDetails( + CreateBucketDetails.builder() + .compartmentId(COMPARTMENT_ID) + .metadata(new HashMap<>()) + .name(BUCKET_NAME) + .build()) + .build()); assertEquals(BUCKET_NAME, createBucketResponse.getBucket().getName()); + runObjectApis(); + } + + @Test(timeout = 30_000) + public void testConnectionLeak() throws IOException { + // 1. Create bucket + final CreateBucketResponse createBucketResponse = + objectStorage.createBucket( + CreateBucketRequest.builder() + .namespaceName(NAMESPACE) + .createBucketDetails( + CreateBucketDetails.builder() + .compartmentId(COMPARTMENT_ID) + .metadata(new HashMap<>()) + .name(BUCKET_NAME) + .build()) + .build()); + assertEquals(BUCKET_NAME, createBucketResponse.getBucket().getName()); + + // ObjectStorageClient in this test should be configured with connection pool of size 1. + // So on second method invocation pool will run out of connection and execution wil get stuck. + // Timeout set for this method will spot that method did not finished by itself. + // Timeout value is set generous to run well even on the slowest machine. + for (int i=0;i<2;i++) { + runObjectApis(); + } + } + + private void runObjectApis() throws IOException { + // 2. Create object final PutObjectResponse putObjectResponse = objectStorage.putObject( diff --git a/oci-repository-plugin/src/main/java/org/opensearch/repositories/oci/OciObjectStorageService.java b/oci-repository-plugin/src/main/java/org/opensearch/repositories/oci/OciObjectStorageService.java index 4b66809..e4120a7 100644 --- a/oci-repository-plugin/src/main/java/org/opensearch/repositories/oci/OciObjectStorageService.java +++ b/oci-repository-plugin/src/main/java/org/opensearch/repositories/oci/OciObjectStorageService.java @@ -12,7 +12,6 @@ package org.opensearch.repositories.oci; import com.oracle.bmc.ClientConfiguration; -import com.oracle.bmc.http.client.jersey.ApacheClientProperties; import com.oracle.bmc.objectstorage.ObjectStorageAsync; import com.oracle.bmc.objectstorage.ObjectStorageAsyncClient; import java.io.Closeable; @@ -20,7 +19,6 @@ import java.util.HashMap; import java.util.Map; import lombok.extern.log4j.Log4j2; -import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import org.apache.logging.log4j.message.ParameterizedMessage; /** Service class to hold client instances */ @@ -74,33 +72,6 @@ static ObjectStorageAsync createClientAsync( SocketAccess.doPrivilegedIOException( () -> ObjectStorageAsyncClient.builder() - // This will run after, and in addition to, the default - // client configurator; - // this allows you to get the default behavior from the - // default client - // configurator - // (in the case of the ObjectStorageClient, the - // non-buffering behavior), but - // you - // can also add other things on top of it, like adding new - // headers - - .additionalClientConfigurator( - builder -> { - // Define a connection manager and its - // properties - final PoolingHttpClientConnectionManager - poolConnectionManager = - new PoolingHttpClientConnectionManager(); - poolConnectionManager.setMaxTotal(100); - poolConnectionManager.setDefaultMaxPerRoute( - 100); - - builder.property( - ApacheClientProperties - .CONNECTION_MANAGER, - poolConnectionManager); - }) .configuration(ClientConfiguration.builder().build()) .build(clientSettings.getAuthenticationDetailsProvider()));