Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support configurable keepalive for managed channel. #196

Merged
merged 7 commits into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -57,6 +57,10 @@ static OxiaClientBuilder create(String serviceAddress) {

OxiaClientBuilder maxConnectionPerNode(int connections);

OxiaClientBuilder connectionKeepAliveTimeout(Duration connectionKeepAliveTimeout);

OxiaClientBuilder connectionKeepAliveTime(Duration connectionKeepAlive);

/**
* Configure the authentication plugin and its parameters.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -78,23 +78,15 @@ class AsyncOxiaClientImpl implements AsyncOxiaClient {
final var oxiaBackoffProvider =
OxiaBackoffProvider.create(
config.connectionBackoffMinDelay(), config.connectionBackoffMaxDelay());
var stubManager =
new OxiaStubManager(
config.authentication(),
config.enableTls(),
oxiaBackoffProvider,
config.maxConnectionPerNode());
var stubManager = new OxiaStubManager(config, oxiaBackoffProvider);

var instrumentProvider = new InstrumentProvider(config.openTelemetry(), config.namespace());
var serviceAddrStub = stubManager.getStub(config.serviceAddress());
var shardManager =
new ShardManager(executor, serviceAddrStub, instrumentProvider, config.namespace());
var notificationManager =
new NotificationManager(executor, stubManager, shardManager, instrumentProvider);

OxiaStubProvider stubProvider =
new OxiaStubProvider(config.namespace(), stubManager, shardManager);

final var stubProvider = new OxiaStubProvider(config.namespace(), stubManager, shardManager);
shardManager.addCallback(notificationManager);
var readBatchManager =
BatchManager.newReadBatchManager(config, stubProvider, instrumentProvider);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -35,4 +35,6 @@ public record ClientConfig(
boolean enableTls,
@NonNull Duration connectionBackoffMinDelay,
@NonNull Duration connectionBackoffMaxDelay,
Duration connectionKeepAliveTime,
Duration connectionKeepAliveTimeout,
int maxConnectionPerNode) {}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,9 @@ public class OxiaClientBuilderImpl implements OxiaClientBuilder {
@NonNull protected Duration connectionBackoffMinDelay = Duration.ofMillis(100);
@NonNull protected Duration connectionBackoffMaxDelay = Duration.ofSeconds(30);

protected Duration connectionKeepAliveTime = Duration.ofSeconds(10);
protected Duration connectionKeepAliveTimeout = Duration.ofSeconds(5);

protected int maxConnectionsPerNode = DefaultMaxConnectionPerNode;

@Override
Expand Down Expand Up @@ -164,6 +167,18 @@ public OxiaClientBuilder maxConnectionPerNode(int connections) {
return this;
}

@Override
public OxiaClientBuilder connectionKeepAliveTimeout(Duration connectionKeepAliveTimeout) {
this.connectionKeepAliveTimeout = connectionKeepAliveTimeout;
return this;
}

@Override
public OxiaClientBuilder connectionKeepAliveTime(Duration keepAliveTime) {
this.connectionKeepAliveTime = keepAliveTime;
return this;
}

@Override
public OxiaClientBuilder authentication(String authPluginClassName, String authParamsString)
throws UnsupportedAuthenticationException {
Expand Down Expand Up @@ -238,23 +253,27 @@ public OxiaClientBuilder loadConfig(Properties properties) {

@Override
public @NonNull CompletableFuture<AsyncOxiaClient> asyncClient() {
var config =
new ClientConfig(
serviceAddress,
requestTimeout,
batchLinger,
maxRequestsPerBatch,
DefaultMaxBatchSize,
sessionTimeout,
clientIdentifierSupplier.get(),
openTelemetry,
namespace,
authentication,
enableTls,
connectionBackoffMinDelay,
connectionBackoffMaxDelay,
maxConnectionsPerNode);
return AsyncOxiaClientImpl.newInstance(config);
return AsyncOxiaClientImpl.newInstance(getClientConfig());
}

public ClientConfig getClientConfig() {
return new ClientConfig(
serviceAddress,
requestTimeout,
batchLinger,
maxRequestsPerBatch,
DefaultMaxBatchSize,
sessionTimeout,
clientIdentifierSupplier.get(),
openTelemetry,
namespace,
authentication,
enableTls,
connectionBackoffMinDelay,
connectionBackoffMaxDelay,
connectionKeepAliveTime,
connectionKeepAliveTimeout,
maxConnectionsPerNode);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -25,11 +25,13 @@
import io.grpc.ManagedChannel;
import io.grpc.TlsChannelCredentials;
import io.grpc.internal.BackoffPolicy;
import io.streamnative.oxia.client.ClientConfig;
import io.streamnative.oxia.client.api.Authentication;
import io.streamnative.oxia.proto.OxiaClientGrpc;

import java.lang.reflect.Field;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

import lombok.NonNull;
Expand All @@ -56,14 +58,15 @@ static ChannelCredentials getChannelCredential(String address, boolean tlsEnable

public OxiaStub(
String address,
@Nullable Authentication authentication,
boolean enableTls,
ClientConfig clientConfig,
@Nullable BackoffPolicy.Provider backoffProvider) {

this(Grpc.newChannelBuilder(getAddress(address), getChannelCredential(address, enableTls))
this(Grpc.newChannelBuilder(getAddress(address), getChannelCredential(address, clientConfig.enableTls()))
.keepAliveTime(clientConfig.connectionKeepAliveTime().toMillis(), MILLISECONDS)
.keepAliveTimeout(clientConfig.connectionKeepAliveTimeout().toMillis(), MILLISECONDS)
.keepAliveWithoutCalls(true)
.directExecutor()
.build(),
authentication, backoffProvider);
.build(), clientConfig.authentication(), backoffProvider);
}

public OxiaStub(ManagedChannel channel) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,30 +17,22 @@

import com.google.common.annotations.VisibleForTesting;
import io.grpc.internal.BackoffPolicy;
import io.streamnative.oxia.client.api.Authentication;
import io.streamnative.oxia.client.ClientConfig;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;

public class OxiaStubManager implements AutoCloseable {
@VisibleForTesting final Map<Key, OxiaStub> stubs = new ConcurrentHashMap<>();

@Nullable private final Authentication authentication;
private final boolean enableTls;
@Nullable private final BackoffPolicy.Provider backoffProvider;

private final BackoffPolicy.Provider backoffProvider;
private final int maxConnectionPerNode;
private final ClientConfig clientConfig;

public OxiaStubManager(
@Nullable Authentication authentication,
boolean enableTls,
@Nullable BackoffPolicy.Provider backoffProvider,
int maxConnectionPerNode) {
this.authentication = authentication;
this.enableTls = enableTls;
public OxiaStubManager(ClientConfig clientConfig, BackoffPolicy.Provider backoffProvider) {
this.backoffProvider = backoffProvider;
this.maxConnectionPerNode = maxConnectionPerNode;
this.clientConfig = clientConfig;
this.maxConnectionPerNode = clientConfig.maxConnectionPerNode();
}

public OxiaStub getStub(String address) {
Expand All @@ -50,8 +42,7 @@ public OxiaStub getStub(String address) {
modKey += maxConnectionPerNode;
}
return stubs.computeIfAbsent(
new Key(address, modKey),
key -> new OxiaStub(key.address, authentication, enableTls, backoffProvider));
new Key(address, modKey), key -> new OxiaStub(key.address, clientConfig, backoffProvider));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import io.streamnative.oxia.client.auth.TokenAuthentication;
import java.time.Duration;
import java.util.Properties;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

class OxiaClientBuilderTest {
Expand Down Expand Up @@ -132,4 +133,15 @@ void loadConfigFromFile() {
String token = metadata.get(Metadata.Key.of("Authorization", ASCII_STRING_MARSHALLER));
assertThat(token).isEqualTo("Bearer 1234");
}

@Test
void connectionKeepAlive() {
final var keepAliveTime = Duration.ofMillis(10);
final var keepAliveTimeout = Duration.ofMillis(10);
builder.connectionKeepAliveTime(keepAliveTime);
builder.connectionKeepAliveTimeout(keepAliveTimeout);
final var impl = (OxiaClientBuilderImpl) builder;
Assertions.assertEquals(keepAliveTimeout, impl.connectionKeepAliveTimeout);
Assertions.assertEquals(keepAliveTime, impl.connectionKeepAliveTime);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -104,6 +104,8 @@ class BatchTest {
authentication != null,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

private final OxiaClientImplBase serviceImpl =
Expand Down Expand Up @@ -498,6 +500,8 @@ class FactoryTests {
false,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

@Nested
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -66,6 +66,8 @@ class BatcherTest {
false,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

BatchedArrayBlockingQueue<Operation<?>> queue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,8 @@
*/
package io.streamnative.oxia.client.grpc;

import static io.streamnative.oxia.client.util.ConfigUtils.*;

import io.grpc.InsecureChannelCredentials;
import io.grpc.TlsChannelCredentials;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -52,9 +54,9 @@ public enum BackoffType {
public void testOxiaReconnectBackoff(BackoffType type) throws Exception {
final OxiaStubManager stubManager;
if (type == BackoffType.Oxia) {
stubManager = new OxiaStubManager(null, false, OxiaBackoffProvider.DEFAULT, 1);
stubManager = new OxiaStubManager(getDefaultClientConfig(), OxiaBackoffProvider.DEFAULT);
} else {
stubManager = new OxiaStubManager(null, false, null, 1);
stubManager = new OxiaStubManager(getDefaultClientConfig(), null);
}

final OxiaStub stub = stubManager.getStub(oxia.getServiceAddress());
Expand Down Expand Up @@ -131,9 +133,12 @@ public void onCompleted() {
@SneakyThrows
public void testMaxConnectionPerNode() {
final var maxConnectionPerNode = 10;
@Cleanup
var stubManager =
new OxiaStubManager(null, false, OxiaBackoffProvider.DEFAULT, maxConnectionPerNode);
final var clientConfig =
getDefaultClientConfig(
builder -> {
builder.maxConnectionPerNode(maxConnectionPerNode);
});
@Cleanup var stubManager = new OxiaStubManager(clientConfig, OxiaBackoffProvider.DEFAULT);
for (int i = 0; i < 1000; i++) {
stubManager.getStub(oxia.getServiceAddress());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
* Copyright © 2022-2025 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -82,6 +82,8 @@ void setup() throws IOException {
false,
Duration.ofMillis(100),
Duration.ofSeconds(30),
Duration.ofSeconds(10),
Duration.ofSeconds(5),
1);

String serverName = InProcessServerBuilder.generateName();
Expand Down
Loading
Loading