diff --git a/evita_engine/src/main/java/io/evitadb/core/Evita.java b/evita_engine/src/main/java/io/evitadb/core/Evita.java index aede6aa592..8d41ebd0a8 100644 --- a/evita_engine/src/main/java/io/evitadb/core/Evita.java +++ b/evita_engine/src/main/java/io/evitadb/core/Evita.java @@ -54,7 +54,7 @@ import io.evitadb.api.requestResponse.schema.mutation.catalog.RemoveCatalogSchemaMutation; import io.evitadb.api.task.ServerTask; import io.evitadb.core.async.ClientRunnableTask; -import io.evitadb.core.async.ObservableExecutorService; +import io.evitadb.core.async.ObservableExecutorServiceWithHardDeadline; import io.evitadb.core.async.ObservableThreadExecutor; import io.evitadb.core.async.Scheduler; import io.evitadb.core.async.SessionKiller; @@ -165,12 +165,12 @@ public final class Evita implements EvitaContract { * Executor service that handles all requests to the Evita instance. */ @Getter - private final ObservableExecutorService requestExecutor; + private final ObservableExecutorServiceWithHardDeadline requestExecutor; /** * Executor service that handles transaction handling, once transaction gets committed. */ @Getter - private final ObservableExecutorService transactionExecutor; + private final ObservableExecutorServiceWithHardDeadline transactionExecutor; /** * Scheduler service for executing asynchronous service tasks. */ diff --git a/evita_engine/src/main/java/io/evitadb/core/async/ObservableExecutorServiceWithHardDeadline.java b/evita_engine/src/main/java/io/evitadb/core/async/ObservableExecutorServiceWithHardDeadline.java new file mode 100644 index 0000000000..31c355559e --- /dev/null +++ b/evita_engine/src/main/java/io/evitadb/core/async/ObservableExecutorServiceWithHardDeadline.java @@ -0,0 +1,130 @@ +/* + * + * _ _ ____ ____ + * _____ _(_) |_ __ _| _ \| __ ) + * / _ \ \ / / | __/ _` | | | | _ \ + * | __/\ V /| | || (_| | |_| | |_) | + * \___| \_/ |_|\__\__,_|____/|____/ + * + * Copyright (c) 2024 + * + * Licensed under the Business Source License, Version 1.1 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/FgForrest/evitaDB/blob/master/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.evitadb.core.async; + + +import javax.annotation.Nonnull; +import java.util.concurrent.Callable; + +/** + * This interface extends {@link ObservableExecutorService} and marks a service that actively cancels tasks that exceed + * their specified timeout duration. The default timeout duration for tasks submitted without an explicit timeout is + * specified by {@link #getDefaultTimeoutInMilliseconds()}. + * + * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 + */ +public interface ObservableExecutorServiceWithHardDeadline extends ObservableExecutorService { + + /** + * Retrieves the default timeout value in milliseconds for all tasks submitted without explicit timeout. + * + * @return the default timeout duration in milliseconds + */ + long getDefaultTimeoutInMilliseconds(); + + /** + * Creates a task with the given name and lambda function to be executed. + * + * @param name the name of the task + * @param lambda the task to be executed + * @return a Runnable representing the task + */ + @Nonnull + Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda); + + /** + * Creates a task to be executed from the given lambda. + * + * @param lambda the task to be executed + * @return a Runnable representing the task + */ + @Nonnull + Runnable createTask(@Nonnull Runnable lambda); + + /** + * Creates a task with the given name and lambda function, to be executed with a specified timeout. + * + * @param name the name of the task + * @param lambda the task to be executed + * @param timeoutInMilliseconds the timeout duration in milliseconds + * @return a Runnable representing the task + */ + @Nonnull + Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda, long timeoutInMilliseconds); + + /** + * Creates a task with the given lambda function, to be executed with a specified timeout. + * + * @param lambda the task to be executed + * @param timeoutInMilliseconds the timeout duration in milliseconds + * @return a Runnable representing the task + */ + @Nonnull + Runnable createTask(@Nonnull Runnable lambda, long timeoutInMilliseconds); + + /** + * Creates a task with the given name and lambda function to be executed. + * + * @param name the name of the task + * @param lambda the task to be executed + * @param the result type of method call + * @return a Callable representing the task + */ + @Nonnull + Callable createTask(@Nonnull String name, @Nonnull Callable lambda); + + /** + * Creates a task to be executed from the given lambda. + * + * @param lambda the task to be executed + * @param the result type of method call + * @return a Callable representing the task + */ + @Nonnull + Callable createTask(@Nonnull Callable lambda); + + /** + * Creates a task with the given name and lambda function, to be executed with a specified timeout. + * + * @param name the name of the task + * @param lambda the task to be executed + * @param timeoutInMilliseconds the timeout duration in milliseconds + * @param the result type of method call + * @return a Callable representing the task + */ + @Nonnull + Callable createTask(@Nonnull String name, @Nonnull Callable lambda, long timeoutInMilliseconds); + + /** + * Creates a task from the given lambda function, to be executed with a specified timeout. + * + * @param lambda the task to be executed + * @param timeoutInMilliseconds the timeout duration in milliseconds + * @param the result type of method call + * @return a Callable representing the task + */ + @Nonnull + Callable createTask(@Nonnull Callable lambda, long timeoutInMilliseconds); + +} diff --git a/evita_engine/src/main/java/io/evitadb/core/async/ObservableThreadExecutor.java b/evita_engine/src/main/java/io/evitadb/core/async/ObservableThreadExecutor.java index 9c28715c49..18cf5e406f 100644 --- a/evita_engine/src/main/java/io/evitadb/core/async/ObservableThreadExecutor.java +++ b/evita_engine/src/main/java/io/evitadb/core/async/ObservableThreadExecutor.java @@ -47,7 +47,7 @@ * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 */ @Slf4j -public class ObservableThreadExecutor implements ObservableExecutorService { +public class ObservableThreadExecutor implements ObservableExecutorServiceWithHardDeadline { private static final int BUFFER_CAPACITY = 512; /** * Buffer used for purging finished tasks. @@ -133,6 +133,11 @@ public ForkJoinPool getForkJoinPoolInternal() { return forkJoinPool; } + @Override + public long getDefaultTimeoutInMilliseconds() { + return this.timeoutInMilliseconds; + } + @Override public long getSubmittedTaskCount() { return this.submittedTaskCount.get(); @@ -357,6 +362,7 @@ private void cancelTimedOutTasks() { // if task is running / waiting longer than the threshold, cancel it and remove it from the queue if (task.isTimedOut(threshold)) { timedOutTasks++; + log.info("Cancelling timed out task: {}", task); task.cancel(); it.remove(); } else { @@ -409,10 +415,76 @@ private interface ObservableTask { } + @Override + @Nonnull + public Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda) { + return new ObservableRunnable(name, lambda, this.timeoutInMilliseconds); + } + + @Override + @Nonnull + public Runnable createTask(@Nonnull Runnable lambda) { + final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + return new ObservableRunnable( + stackTrace.length > 1 ? stackTrace[1].toString() : "Unknown", + lambda, this.timeoutInMilliseconds); + } + + @Override + @Nonnull + public Runnable createTask(@Nonnull String name, @Nonnull Runnable lambda, long timeoutInMilliseconds) { + return new ObservableRunnable(name, lambda, timeoutInMilliseconds); + } + + @Override + @Nonnull + public Runnable createTask(@Nonnull Runnable lambda, long timeoutInMilliseconds) { + final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + return new ObservableRunnable( + stackTrace.length > 1 ? stackTrace[1].toString() : "Unknown", + lambda, timeoutInMilliseconds); + } + + @Override + @Nonnull + public Callable createTask(@Nonnull String name, @Nonnull Callable lambda) { + return new ObservableCallable<>(name, lambda, this.timeoutInMilliseconds); + } + + @Override + @Nonnull + public Callable createTask(@Nonnull Callable lambda) { + final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + return new ObservableCallable<>( + stackTrace.length > 1 ? stackTrace[1].toString() : "Unknown", + lambda, this.timeoutInMilliseconds + ); + } + + @Override + @Nonnull + public Callable createTask(@Nonnull String name, @Nonnull Callable lambda, long timeoutInMilliseconds) { + return new ObservableCallable<>(name, lambda, timeoutInMilliseconds); + } + + @Override + @Nonnull + public Callable createTask(@Nonnull Callable lambda, long timeoutInMilliseconds) { + final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + return new ObservableCallable<>( + stackTrace.length > 1 ? stackTrace[1].toString() : "Unknown", + lambda, timeoutInMilliseconds + ); + } + /** * Wrapper around a {@link Runnable} that implements the {@link ObservableTask} interface. */ private static class ObservableRunnable implements Runnable, ObservableTask { + /** + * Name / description of the task. + */ + private final String name; /** * Delegate runnable that is being wrapped. */ @@ -427,6 +499,14 @@ private static class ObservableRunnable implements Runnable, ObservableTask { private final CompletableFuture future = new CompletableFuture<>(); public ObservableRunnable(@Nonnull Runnable delegate, long timeoutInMilliseconds) { + final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + this.name = stackTrace.length > 1 ? stackTrace[1].toString() : "Unknown"; + this.delegate = delegate; + this.timedOutAt = System.currentTimeMillis() + timeoutInMilliseconds; + } + + public ObservableRunnable(@Nonnull String name, @Nonnull Runnable delegate, long timeoutInMilliseconds) { + this.name = name; this.delegate = delegate; this.timedOutAt = System.currentTimeMillis() + timeoutInMilliseconds; } @@ -457,6 +537,11 @@ public void run() { throw e; } } + + @Override + public String toString() { + return this.name; + } } /** @@ -464,6 +549,10 @@ public void run() { * @param the type of the result */ private static class ObservableCallable implements Callable, ObservableTask { + /** + * Name / description of the task. + */ + private final String name; /** * Delegate callable that is being wrapped. */ @@ -478,6 +567,14 @@ private static class ObservableCallable implements Callable, ObservableTas private final CompletableFuture future = new CompletableFuture<>(); public ObservableCallable(@Nonnull Callable delegate, long timeout) { + final StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); + this.name = stackTrace.length > 1 ? stackTrace[1].toString() : "Unknown"; + this.delegate = delegate; + this.timedOutAt = System.currentTimeMillis() + timeout; + } + + public ObservableCallable(@Nonnull String name, @Nonnull Callable delegate, long timeout) { + this.name = name; this.delegate = delegate; this.timedOutAt = System.currentTimeMillis() + timeout; } @@ -509,6 +606,11 @@ public V call() throws Exception { throw e; } } + + @Override + public String toString() { + return this.name; + } } /** diff --git a/evita_engine/src/main/java/io/evitadb/core/async/Scheduler.java b/evita_engine/src/main/java/io/evitadb/core/async/Scheduler.java index 6caf0568c3..52bdf4cedc 100644 --- a/evita_engine/src/main/java/io/evitadb/core/async/Scheduler.java +++ b/evita_engine/src/main/java/io/evitadb/core/async/Scheduler.java @@ -487,6 +487,7 @@ private long purgeFinishedTasks() { final TaskSimplifiedState taskState = status.simplifiedState(); if (taskState == TaskSimplifiedState.FINISHED || taskState == TaskSimplifiedState.FAILED) { // if task is finished, remove it from the queue + log.info("Task {} is waiting for precondition for too long, removing it from the queue.", status.taskId()); it.remove(); // if its defense period hasn't perished add it to list, that might end up in the queue again if (status.finished().isAfter(threshold)) { diff --git a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClient.java b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClient.java index 3d972f49f1..7e4bcd76b7 100644 --- a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClient.java +++ b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClient.java @@ -23,6 +23,7 @@ package io.evitadb.driver; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.Empty; import com.linecorp.armeria.client.ClientFactory; import com.linecorp.armeria.client.ClientFactoryBuilder; @@ -43,6 +44,8 @@ import io.evitadb.api.requestResponse.schema.mutation.catalog.CreateCatalogSchemaMutation; import io.evitadb.api.requestResponse.system.SystemStatus; import io.evitadb.driver.config.EvitaClientConfiguration; +import io.evitadb.driver.exception.EvitaClientServerCallException; +import io.evitadb.driver.exception.EvitaClientTimedOutException; import io.evitadb.driver.exception.IncompatibleClientException; import io.evitadb.driver.interceptor.ClientSessionInterceptor; import io.evitadb.driver.trace.ClientTracingContext; @@ -54,7 +57,6 @@ import io.evitadb.exception.InvalidEvitaVersionException; import io.evitadb.externalApi.grpc.certificate.ClientCertificateManager; import io.evitadb.externalApi.grpc.generated.EvitaServiceGrpc.EvitaServiceFutureStub; -import io.evitadb.externalApi.grpc.generated.EvitaServiceGrpc.EvitaServiceStub; import io.evitadb.externalApi.grpc.generated.*; import io.evitadb.externalApi.grpc.requestResponse.EvitaEnumConverter; import io.evitadb.externalApi.grpc.requestResponse.schema.mutation.DelegatingTopLevelCatalogSchemaMutationConverter; @@ -85,9 +87,11 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; @@ -114,7 +118,14 @@ public class EvitaClient implements EvitaContract { static final Pattern ERROR_MESSAGE_PATTERN = Pattern.compile("(\\w+:\\w+:\\w+): (.*)"); - + /** + * Client call timeout. + */ + final ThreadLocal> timeout; + /** + * Created evita service stub that returns futures. + */ + private final StubTimeoutProxy evitaServiceFutureStub; /** * The configuration of the evitaDB client. */ @@ -152,18 +163,33 @@ public class EvitaClient implements EvitaContract { * Client implementation of management service. */ private final EvitaClientManagement management; + /** - * Client call timeout. - */ - final ThreadLocal> timeout; - /** - * Created evita service stub. - */ - final EvitaServiceStub evitaServiceStub; - /** - * Created evita service stub that returns futures. + * Transforms the given Throwable into a RuntimeException based on its type. + * + * @param ex The original exception to be transformed. Must not be null. + * @param onUnauthenticated A runnable to be executed if the exception indicates an unauthenticated status. Must not be null. + * @return A corresponding RuntimeException based on the type of the original exception. */ - final EvitaServiceFutureStub evitaServiceFutureStub; + @Nonnull + public static RuntimeException transformException( + @Nonnull Throwable ex, + @Nonnull Runnable onUnauthenticated + ) { + if (ex instanceof StatusRuntimeException statusRuntimeException) { + return transformStatusRuntimeException(statusRuntimeException, onUnauthenticated); + } else if (ex instanceof EvitaInvalidUsageException invalidUsageException) { + return invalidUsageException; + } else if (ex instanceof EvitaInternalError evitaInternalError) { + return evitaInternalError; + } else { + log.error("Unexpected internal Evita error occurred: {}", ex.getMessage(), ex); + return new EvitaClientServerCallException( + "Unexpected internal Evita error occurred.", + ex + ); + } + } @Nonnull private static ClientTracingContext getClientTracingContext(@Nonnull EvitaClientConfiguration configuration) { @@ -177,6 +203,45 @@ private static ClientTracingContext getClientTracingContext(@Nonnull EvitaClient return context; } + /** + * Handles a {@link StatusRuntimeException} by checking the status code and performing appropriate actions. + * + * @param statusRuntimeException the {@link StatusRuntimeException} to handle + * @param onUnauthenticated the action to perform when the status code is {@link Code#UNAUTHENTICATED} + */ + @Nonnull + private static RuntimeException transformStatusRuntimeException( + @Nonnull StatusRuntimeException statusRuntimeException, + @Nonnull Runnable onUnauthenticated + ) { + final Code statusCode = statusRuntimeException.getStatus().getCode(); + final String description = ofNullable(statusRuntimeException.getStatus().getDescription()) + .map(it -> statusCode.name() + ": " + it) + .orElseGet(statusCode::name); + if (statusCode == Code.UNAUTHENTICATED) { + onUnauthenticated.run(); + return new InstanceTerminatedException("session"); + } else if (statusCode == Code.INVALID_ARGUMENT) { + final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description); + if (expectedFormat.matches()) { + return EvitaInvalidUsageException.createExceptionWithErrorCode( + expectedFormat.group(2), expectedFormat.group(1) + ); + } else { + return new EvitaInvalidUsageException(description); + } + } else { + final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description); + if (expectedFormat.matches()) { + return GenericEvitaInternalError.createExceptionWithErrorCode( + expectedFormat.group(2), expectedFormat.group(1) + ); + } else { + return new GenericEvitaInternalError(description); + } + } + } + public EvitaClient(@Nonnull EvitaClientConfiguration configuration) { this(configuration, null); } @@ -208,8 +273,8 @@ public EvitaClient( .clientPrivateKeyPassword(configuration.certificateKeyPassword()) .build(); - clientFactoryBuilder.tlsCustomizer(tlsCustomizer -> { - clientCertificateManager.buildClientSslContext( + clientFactoryBuilder.tlsCustomizer( + tlsCustomizer -> clientCertificateManager.buildClientSslContext( (certificateType, certificate) -> { try { switch (certificateType) { @@ -227,8 +292,7 @@ public EvitaClient( } }, tlsCustomizer - ); - }); + )); } else { uriScheme = "http"; } @@ -236,7 +300,7 @@ public EvitaClient( this.executor = Executors.newCachedThreadPool(); this.clientFactory = clientFactoryBuilder.build(); final GrpcClientBuilder grpcClientBuilder = GrpcClients.builder(uriScheme + "://" + configuration.host() + ":" + configuration.port() + "/") - .factory(clientFactory) + .factory(this.clientFactory) .serializationFormat(GrpcSerializationFormats.PROTO) .intercept(new ClientSessionInterceptor(configuration)); @@ -247,8 +311,7 @@ public EvitaClient( ofNullable(grpcConfigurator).ifPresent(it -> it.accept(grpcClientBuilder)); this.grpcClientBuilder = grpcClientBuilder; - this.evitaServiceStub = grpcClientBuilder.build(EvitaServiceStub.class); - this.evitaServiceFutureStub = grpcClientBuilder.build(EvitaServiceFutureStub.class); + this.evitaServiceFutureStub = new StubTimeoutProxy<>(grpcClientBuilder.build(EvitaServiceFutureStub.class)); this.reflectionLookup = new ReflectionLookup(configuration.reflectionLookupBehaviour()); this.timeout = ThreadLocal.withInitial(() -> { final LinkedList timeouts = new LinkedList<>(); @@ -324,59 +387,43 @@ public EvitaClientSession createSession(@Nonnull SessionTraits traits) { if (traits.isReadWrite()) { if (traits.isBinary()) { grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.createBinaryReadWriteSession( - GrpcEvitaSessionRequest.newBuilder() - .setCatalogName(traits.catalogName()) - .setCommitBehavior(EvitaEnumConverter.toGrpcCommitBehavior(traits.commitBehaviour())) - .setDryRun(traits.isDryRun()) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.createBinaryReadWriteSession( + GrpcEvitaSessionRequest.newBuilder() + .setCatalogName(traits.catalogName()) + .setCommitBehavior(EvitaEnumConverter.toGrpcCommitBehavior(traits.commitBehaviour())) + .setDryRun(traits.isDryRun()) + .build() + ) ); } else { grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.createReadWriteSession( - GrpcEvitaSessionRequest.newBuilder() - .setCatalogName(traits.catalogName()) - .setCommitBehavior(EvitaEnumConverter.toGrpcCommitBehavior(traits.commitBehaviour())) - .setDryRun(traits.isDryRun()) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.createReadWriteSession( + GrpcEvitaSessionRequest.newBuilder() + .setCatalogName(traits.catalogName()) + .setCommitBehavior(EvitaEnumConverter.toGrpcCommitBehavior(traits.commitBehaviour())) + .setDryRun(traits.isDryRun()) + .build() + ) ); } } else { if (traits.isBinary()) { grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.createBinaryReadOnlySession( - GrpcEvitaSessionRequest.newBuilder() - .setCatalogName(traits.catalogName()) - .setDryRun(traits.isDryRun()) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.createBinaryReadOnlySession( + GrpcEvitaSessionRequest.newBuilder() + .setCatalogName(traits.catalogName()) + .setDryRun(traits.isDryRun()) + .build() + ) ); } else { grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.createReadOnlySession( - GrpcEvitaSessionRequest.newBuilder() - .setCatalogName(traits.catalogName()) - .setDryRun(traits.isDryRun()) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.createReadOnlySession( + GrpcEvitaSessionRequest.newBuilder() + .setCatalogName(traits.catalogName()) + .setDryRun(traits.isDryRun()) + .build() + ) ); } } @@ -432,12 +479,7 @@ public void terminateSession(@Nonnull EvitaSessionContract session) { public Set getCatalogNames() { assertActive(); final GrpcCatalogNamesResponse grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.getCatalogNames(Empty.newBuilder().build()) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.getCatalogNames(Empty.newBuilder().build()) ); return new LinkedHashSet<>( grpcResponse.getCatalogNamesList() @@ -467,12 +509,7 @@ public void renameCatalog(@Nonnull String catalogName, @Nonnull String newCatalo .setNewCatalogName(newCatalogName) .build(); final GrpcRenameCatalogResponse grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.renameCatalog(request) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.renameCatalog(request) ); final boolean success = grpcResponse.getSuccess(); if (success) { @@ -490,12 +527,7 @@ public void replaceCatalog(@Nonnull String catalogNameToBeReplacedWith, @Nonnull .build(); final GrpcReplaceCatalogResponse grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.replaceCatalog(request) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.replaceCatalog(request) ); final boolean success = grpcResponse.getSuccess(); if (success) { @@ -513,12 +545,7 @@ public boolean deleteCatalogIfExists(@Nonnull String catalogName) { .build(); final GrpcDeleteCatalogIfExistsResponse grpcResponse = executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.deleteCatalogIfExists(request) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.deleteCatalogIfExists(request) ); final boolean success = grpcResponse.getSuccess(); if (success) { @@ -540,12 +567,7 @@ public void update(@Nonnull TopLevelCatalogSchemaMutation... catalogMutations) { .build(); executeWithEvitaService( - this.evitaServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.timeout.get().peek(); - return evitaService.update(request) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.update(request) ); } @@ -715,11 +737,12 @@ public String getVersion() { */ @SuppressWarnings("unused") public void executeWithExtendedTimeout(@Nonnull Runnable lambda, long timeout, @Nonnull TimeUnit unit) { + final LinkedList callTimeouts = this.timeout.get(); try { - this.timeout.get().push(new Timeout(timeout, unit)); + callTimeouts.push(new Timeout(timeout, unit)); lambda.run(); } finally { - this.timeout.get().pop(); + callTimeouts.pop(); } } @@ -735,11 +758,12 @@ public void executeWithExtendedTimeout(@Nonnull Runnable lambda, long timeout, @ */ @SuppressWarnings("unused") public T executeWithExtendedTimeout(@Nonnull Supplier lambda, long timeout, @Nonnull TimeUnit unit) { + final LinkedList callTimeouts = this.timeout.get(); try { - this.timeout.get().push(new Timeout(timeout, unit)); + callTimeouts.push(new Timeout(timeout, unit)); return lambda.get(); } finally { - this.timeout.get().pop(); + callTimeouts.pop(); } } @@ -760,40 +784,25 @@ protected void assertActive() { * @param return type of the function * @return result of the applied function */ - static T executeWithEvitaService(@Nonnull S clientStub, @Nonnull AsyncCallFunction lambda) { + private T executeWithEvitaService( + @Nonnull AsyncCallFunction> lambda + ) { + final Timeout timeout = this.timeout.get().peek(); try { - return lambda.apply(clientStub); - } catch (StatusRuntimeException statusRuntimeException) { - final Code statusCode = statusRuntimeException.getStatus().getCode(); - final String description = ofNullable(statusRuntimeException.getStatus().getDescription()) - .orElse("No description."); - if (statusCode == Code.INVALID_ARGUMENT) { - final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description); - if (expectedFormat.matches()) { - throw EvitaInvalidUsageException.createExceptionWithErrorCode( - expectedFormat.group(2), expectedFormat.group(1) - ); - } else { - throw new EvitaInvalidUsageException(description); + return lambda.apply(this.evitaServiceFutureStub.get(timeout)) + .get(timeout.timeout(), timeout.timeoutUnit()); + } catch (ExecutionException e) { + throw EvitaClient.transformException( + e.getCause() == null ? e : e.getCause(), + () -> { } - } else { - final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description); - if (expectedFormat.matches()) { - throw GenericEvitaInternalError.createExceptionWithErrorCode( - expectedFormat.group(2), expectedFormat.group(1) - ); - } else { - throw new GenericEvitaInternalError(description); - } - } - } catch (EvitaInvalidUsageException | EvitaInternalError evitaError) { - throw evitaError; - } catch (Throwable e) { - log.error("Unexpected internal Evita error occurred: {}", e.getMessage(), e); - throw new GenericEvitaInternalError( - "Unexpected internal Evita error occurred: " + e.getMessage(), - "Unexpected internal Evita error occurred.", - e + ); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new EvitaClientServerCallException("Server call interrupted.", e); + } catch (TimeoutException e) { + throw new EvitaClientTimedOutException( + timeout.timeout(), timeout.timeoutUnit() ); } } diff --git a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientManagement.java b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientManagement.java index ec23babc92..61eec5ff1b 100644 --- a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientManagement.java +++ b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientManagement.java @@ -23,6 +23,7 @@ package io.evitadb.driver; +import com.google.common.util.concurrent.ListenableFuture; import com.google.protobuf.ByteString; import com.google.protobuf.Empty; import com.google.protobuf.StringValue; @@ -38,6 +39,8 @@ import io.evitadb.api.task.TaskStatus; import io.evitadb.api.task.TaskStatus.TaskSimplifiedState; import io.evitadb.dataType.PaginatedList; +import io.evitadb.driver.exception.EvitaClientServerCallException; +import io.evitadb.driver.exception.EvitaClientTimedOutException; import io.evitadb.exception.UnexpectedIOException; import io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter; import io.evitadb.externalApi.grpc.generated.EvitaManagementServiceGrpc.EvitaManagementServiceFutureStub; @@ -67,11 +70,12 @@ import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import static io.evitadb.driver.EvitaClient.executeWithEvitaService; import static io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter.toGrpcUuid; /** @@ -92,11 +96,11 @@ public class EvitaClientManagement implements EvitaManagementContract, Closeable /** * Created evita service stub. */ - private final EvitaManagementServiceStub evitaManagementServiceStub; + private final StubTimeoutProxy evitaManagementServiceStub; /** * Created evita service stub that returns futures. */ - private final EvitaManagementServiceFutureStub evitaManagementServiceFutureStub; + private final StubTimeoutProxy evitaManagementServiceFutureStub; public EvitaClientManagement(@Nonnull EvitaClient evitaClient, @Nonnull GrpcClientBuilder grpcClientBuilder) { this.evitaClient = evitaClient; @@ -105,8 +109,8 @@ public EvitaClientManagement(@Nonnull EvitaClient evitaClient, @Nonnull GrpcClie evitaClient.getConfiguration().trackedTaskLimit(), 2000 ); - this.evitaManagementServiceStub = grpcClientBuilder.build(EvitaManagementServiceStub.class); - this.evitaManagementServiceFutureStub = grpcClientBuilder.build(EvitaManagementServiceFutureStub.class); + this.evitaManagementServiceStub = new StubTimeoutProxy<>(grpcClientBuilder.build(EvitaManagementServiceStub.class)); + this.evitaManagementServiceFutureStub = new StubTimeoutProxy<>(grpcClientBuilder.build(EvitaManagementServiceFutureStub.class)); } @Nonnull @@ -115,12 +119,7 @@ public CatalogStatistics[] getCatalogStatistics() { this.evitaClient.assertActive(); final GrpcEvitaCatalogStatisticsResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - return evitaService.getCatalogStatistics(Empty.newBuilder().build()) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.getCatalogStatistics(Empty.newBuilder().build()) ); return response.getCatalogStatisticsList() @@ -152,8 +151,7 @@ public Task restoreCatalog( ) throws UnexpectedIOException { this.evitaClient.assertActive(); - return executeWithEvitaService( - this.evitaManagementServiceStub, + return executeWithEvitaBlockingService( evitaService -> { final CompletableFuture> result = new CompletableFuture<>(); final AtomicLong bytesSent = new AtomicLong(0); @@ -230,17 +228,12 @@ public void onCompleted() { public Task restoreCatalog(@Nonnull String catalogName, @Nonnull UUID fileId) throws FileForFetchNotFoundException { this.evitaClient.assertActive(); + final GrpcRestoreCatalogFromServerFileRequest request = GrpcRestoreCatalogFromServerFileRequest.newBuilder() + .setFileId(toGrpcUuid(fileId)) + .setCatalogName(catalogName) + .build(); final GrpcRestoreCatalogResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - return evitaService.restoreCatalogFromServerFile( - GrpcRestoreCatalogFromServerFileRequest.newBuilder() - .setFileId(toGrpcUuid(fileId)) - .setCatalogName(catalogName) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.restoreCatalogFromServerFile(request) ); //noinspection unchecked @@ -258,24 +251,21 @@ public Task restoreCatalog(@Nonnull String catalogName, @Nonnull UUID f ) { this.evitaClient.assertActive(); - final GrpcTaskStatusesResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - final GrpcTaskStatusesRequest.Builder builder = GrpcTaskStatusesRequest.newBuilder() - .setPageNumber(page) - .setPageSize(pageSize); - if (taskType != null) { - for (String theTaskType : taskType) { - builder.addTaskType(StringValue.of(theTaskType)); - } - } - for (TaskSimplifiedState state : states) { - builder.addSimplifiedState(EvitaEnumConverter.toGrpcSimplifiedStatus(state)); - } - return evitaService.listTaskStatuses(builder.build()) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); + final GrpcTaskStatusesRequest.Builder builder = GrpcTaskStatusesRequest.newBuilder() + .setPageNumber(page) + .setPageSize(pageSize); + if (taskType != null) { + for (String theTaskType : taskType) { + builder.addTaskType(StringValue.of(theTaskType)); } + } + for (TaskSimplifiedState state : states) { + builder.addSimplifiedState(EvitaEnumConverter.toGrpcSimplifiedStatus(state)); + } + final GrpcTaskStatusesRequest request = builder.build(); + + final GrpcTaskStatusesResponse response = executeWithEvitaService( + evitaService -> evitaService.listTaskStatuses(request) ); return new PaginatedList<>( @@ -294,16 +284,11 @@ public Task restoreCatalog(@Nonnull String catalogName, @Nonnull UUID f public Optional> getTaskStatus(@Nonnull UUID jobId) { this.evitaClient.assertActive(); + final GrpcTaskStatusRequest request = GrpcTaskStatusRequest.newBuilder() + .setTaskId(toGrpcUuid(jobId)) + .build(); final GrpcTaskStatusResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - return evitaService.getTaskStatus( - GrpcTaskStatusRequest.newBuilder() - .setTaskId(toGrpcUuid(jobId)) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.getTaskStatus(request) ); return response.hasTaskStatus() ? @@ -315,18 +300,13 @@ public Task restoreCatalog(@Nonnull String catalogName, @Nonnull UUID f public Collection> getTaskStatuses(@Nonnull UUID... jobId) { this.evitaClient.assertActive(); + final Builder builder = GrpcSpecifiedTaskStatusesRequest.newBuilder(); + for (UUID id : jobId) { + builder.addTaskIds(toGrpcUuid(id)); + } + final GrpcSpecifiedTaskStatusesRequest request = builder.build(); final GrpcSpecifiedTaskStatusesResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - final Builder builder = GrpcSpecifiedTaskStatusesRequest.newBuilder(); - for (UUID id : jobId) { - builder.addTaskIds(toGrpcUuid(id)); - } - return evitaService.getTaskStatuses( - builder.build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.getTaskStatuses(request) ); return response.getTaskStatusList() @@ -339,16 +319,13 @@ public Task restoreCatalog(@Nonnull String catalogName, @Nonnull UUID f public boolean cancelTask(@Nonnull UUID jobId) { this.evitaClient.assertActive(); + final GrpcCancelTaskRequest request = GrpcCancelTaskRequest.newBuilder() + .setTaskId(toGrpcUuid(jobId)) + .build(); final GrpcCancelTaskResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - return evitaService.cancelTask( - GrpcCancelTaskRequest.newBuilder() - .setTaskId(toGrpcUuid(jobId)) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.cancelTask( + request + ) ); return response.getSuccess(); @@ -359,17 +336,12 @@ public boolean cancelTask(@Nonnull UUID jobId) { public PaginatedList listFilesToFetch(int page, int pageSize, @Nullable String origin) { this.evitaClient.assertActive(); + final GrpcFilesToFetchRequest request = GrpcFilesToFetchRequest.newBuilder() + .setPageNumber(page) + .setPageSize(pageSize) + .build(); final GrpcFilesToFetchResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - return evitaService.listFilesToFetch( - GrpcFilesToFetchRequest.newBuilder() - .setPageNumber(page) - .setPageSize(pageSize) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.listFilesToFetch(request) ); return new PaginatedList<>( @@ -388,16 +360,11 @@ public PaginatedList listFilesToFetch(int page, int pageSize, @Nul public Optional getFileToFetch(@Nonnull UUID fileId) { this.evitaClient.assertActive(); + final GrpcFileToFetchRequest request = GrpcFileToFetchRequest.newBuilder() + .setFileId(toGrpcUuid(fileId)) + .build(); final GrpcFileToFetchResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - return evitaService.getFileToFetch( - GrpcFileToFetchRequest.newBuilder() - .setFileId(toGrpcUuid(fileId)) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.getFileToFetch(request) ); return response.hasFileToFetch() ? @@ -414,8 +381,7 @@ public InputStream fetchFile(@Nonnull UUID fileId) throws FileForFetchNotFoundEx CompletableFuture downloadFuture = new CompletableFuture<>(); // Download the file asynchronously - executeWithEvitaService( - this.evitaManagementServiceStub, + executeWithEvitaBlockingService( evitaService -> { evitaService.fetchFile( GrpcFetchFileRequest.newBuilder().setFileId(toGrpcUuid(fileId)).build(), @@ -470,16 +436,11 @@ public void close() throws IOException { public void deleteFile(@Nonnull UUID fileId) throws FileForFetchNotFoundException { this.evitaClient.assertActive(); + final GrpcDeleteFileToFetchRequest request = GrpcDeleteFileToFetchRequest.newBuilder() + .setFileId(toGrpcUuid(fileId)) + .build(); final GrpcDeleteFileToFetchResponse response = executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - return evitaService.deleteFile( - GrpcDeleteFileToFetchRequest.newBuilder() - .setFileId(toGrpcUuid(fileId)) - .build() - ).get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - } + evitaService -> evitaService.deleteFile(request) ); if (!response.getSuccess()) { @@ -492,21 +453,17 @@ public void deleteFile(@Nonnull UUID fileId) throws FileForFetchNotFoundExceptio public SystemStatus getSystemStatus() { this.evitaClient.assertActive(); - return executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - final GrpcEvitaServerStatusResponse response = evitaService.serverStatus(Empty.newBuilder().build()) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - return new SystemStatus( - response.getVersion(), - EvitaDataTypesConverter.toOffsetDateTime(response.getStartedAt()), - Duration.of(response.getUptime(), ChronoUnit.SECONDS), - response.getInstanceId(), - response.getCatalogsCorrupted(), - response.getCatalogsOk() - ); - } + final GrpcEvitaServerStatusResponse response = executeWithEvitaService( + evitaService -> evitaService.serverStatus(Empty.newBuilder().build()) + ); + + return new SystemStatus( + response.getVersion(), + EvitaDataTypesConverter.toOffsetDateTime(response.getStartedAt()), + Duration.of(response.getUptime(), ChronoUnit.SECONDS), + response.getInstanceId(), + response.getCatalogsCorrupted(), + response.getCatalogsOk() ); } @@ -515,15 +472,11 @@ public SystemStatus getSystemStatus() { public String getConfiguration() { this.evitaClient.assertActive(); - return executeWithEvitaService( - this.evitaManagementServiceFutureStub, - evitaService -> { - final Timeout timeoutToUse = this.evitaClient.timeout.get().peek(); - final GrpcEvitaConfigurationResponse response = evitaService.getConfiguration(Empty.newBuilder().build()) - .get(timeoutToUse.timeout(), timeoutToUse.timeoutUnit()); - return response.getConfiguration(); - } + final GrpcEvitaConfigurationResponse response = executeWithEvitaService( + evitaService -> evitaService.getConfiguration(Empty.newBuilder().build()) ); + + return response.getConfiguration(); } @Override @@ -546,4 +499,65 @@ public ClientTask createTask(@Nonnull TaskStatus taskStatus) return this.clientTaskTracker.createTask(taskStatus); } + /** + * Method that is called within the {@link EvitaClientSession} to apply the wanted logic on a channel retrieved + * from a channel pool. + * + * @param lambda function that holds a logic passed by the caller + * @param return type of the function + * @return result of the applied function + */ + private T executeWithEvitaBlockingService( + @Nonnull AsyncCallFunction lambda + ) { + final Timeout timeout = this.evitaClient.timeout.get().peek(); + try { + return lambda.apply( + this.evitaManagementServiceStub.get(timeout) + ); + } catch (ExecutionException e) { + throw EvitaClient.transformException( + e.getCause() == null ? e : e.getCause(), + () -> {} + ); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new EvitaClientServerCallException("Server call interrupted.", e); + } catch (TimeoutException e) { + throw new EvitaClientTimedOutException( + timeout.timeout(), timeout.timeoutUnit() + ); + } + } + + /** + * Method that is called within the {@link EvitaClientSession} to apply the wanted logic on a channel retrieved + * from a channel pool. + * + * @param lambda function that holds a logic passed by the caller + * @param return type of the function + * @return result of the applied function + */ + private T executeWithEvitaService( + @Nonnull AsyncCallFunction> lambda + ) { + final Timeout timeout = this.evitaClient.timeout.get().peek(); + try { + return lambda.apply(this.evitaManagementServiceFutureStub.get(timeout)) + .get(timeout.timeout(), timeout.timeoutUnit()); + } catch (ExecutionException e) { + throw EvitaClient.transformException( + e.getCause() == null ? e : e.getCause(), + () -> {} + ); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new EvitaClientServerCallException("Server call interrupted.", e); + } catch (TimeoutException e) { + throw new EvitaClientTimedOutException( + timeout.timeout(), timeout.timeoutUnit() + ); + } + } + } diff --git a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java index 6fe19f2960..bc9e580b9c 100644 --- a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java +++ b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/EvitaClientSession.java @@ -86,7 +86,6 @@ import io.evitadb.driver.exception.EvitaClientTimedOutException; import io.evitadb.driver.interceptor.ClientSessionInterceptor.SessionIdHolder; import io.evitadb.driver.requestResponse.schema.ClientCatalogSchemaDecorator; -import io.evitadb.exception.EvitaInternalError; import io.evitadb.exception.EvitaInvalidUsageException; import io.evitadb.exception.GenericEvitaInternalError; import io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter; @@ -107,8 +106,6 @@ import io.evitadb.utils.Assert; import io.evitadb.utils.ReflectionLookup; import io.grpc.ClientCall; -import io.grpc.Status.Code; -import io.grpc.StatusRuntimeException; import io.grpc.stub.ClientCalls; import io.grpc.stub.StreamObserver; import lombok.EqualsAndHashCode; @@ -134,7 +131,6 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; -import java.util.regex.Matcher; import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -143,7 +139,6 @@ import static io.evitadb.api.query.QueryConstraints.entityFetch; import static io.evitadb.api.query.QueryConstraints.require; import static io.evitadb.api.requestResponse.schema.ClassSchemaAnalyzer.extractEntityTypeFromClass; -import static io.evitadb.driver.EvitaClient.ERROR_MESSAGE_PATTERN; import static io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter.toGrpcOffsetDateTime; import static io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter.toOffsetDateTime; import static io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter.toTaskStatus; @@ -188,11 +183,11 @@ public class EvitaClientSession implements EvitaSessionContract { /** * Entity session service that works with futures. */ - private final EvitaSessionServiceFutureStub evitaSessionServiceFutureStub; + private final StubTimeoutProxy evitaSessionServiceFutureStub; /** * Entity session service. */ - private final EvitaSessionServiceStub evitaSessionServiceStub; + private final StubTimeoutProxy evitaSessionServiceStub; /** * Contains reference to the catalog name targeted by queries / mutations from this session. */ @@ -297,8 +292,8 @@ public EvitaClientSession( this.reflectionLookup = evita.getReflectionLookup(); this.proxyFactory = schemaCache.getProxyFactory(); this.schemaCache = schemaCache; - this.evitaSessionServiceFutureStub = grpcClientBuilder.build(EvitaSessionServiceFutureStub.class); - this.evitaSessionServiceStub = grpcClientBuilder.build(EvitaSessionServiceStub.class); + this.evitaSessionServiceFutureStub = new StubTimeoutProxy<>(grpcClientBuilder.build(EvitaSessionServiceFutureStub.class)); + this.evitaSessionServiceStub = new StubTimeoutProxy<>(grpcClientBuilder.build(EvitaSessionServiceStub.class)); this.catalogName = catalogName; this.catalogState = catalogState; this.commitBehaviour = commitBehaviour; @@ -1649,23 +1644,20 @@ private List queryListInternal( private T executeWithBlockingEvitaSessionService( @Nonnull AsyncCallFunction> lambda ) { - final Timeout timeout = callTimeout.peek(); + final Timeout timeout = this.callTimeout.peek(); try { - return executeWithEvitaSessionService( - lambda, this.evitaSessionServiceFutureStub.withDeadlineAfter( - timeout.timeout(), timeout.timeoutUnit() - ) - ).get(timeout.timeout(), timeout.timeoutUnit()); + SessionIdHolder.setSessionId(getId().toString()); + return lambda.apply(this.evitaSessionServiceFutureStub.get(timeout)) + .get(timeout.timeout(), timeout.timeoutUnit()); } catch (ExecutionException e) { - if (e.getCause() instanceof EvitaInvalidUsageException invalidUsageException) { - throw invalidUsageException; - } else if (e.getCause() instanceof EvitaInternalError internalError) { - throw internalError; - } else if (e.getCause() instanceof StatusRuntimeException statusRuntimeException) { - throw transformStatusRuntimeException(statusRuntimeException); - } else { - throw new EvitaClientServerCallException("Server call failed.", e.getCause()); - } + throw EvitaClient.transformException( + e.getCause() == null ? e : e.getCause(), + () -> { + // close session and rethrow + final CompletableFuture future = closeInternally(); + future.complete(0L); + } + ); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new EvitaClientServerCallException("Server call interrupted.", e); @@ -1673,6 +1665,8 @@ private T executeWithBlockingEvitaSessionService( throw new EvitaClientTimedOutException( timeout.timeout(), timeout.timeoutUnit() ); + } finally { + SessionIdHolder.reset(); } } @@ -1682,81 +1676,34 @@ private T executeWithBlockingEvitaSessionService( * * @param lambda function that holds a logic passed by the caller */ - private void executeWithAsyncEvitaSessionService( - @Nonnull AsyncCallFunction lambda - ) { - executeWithEvitaSessionService( - lambda, this.evitaSessionServiceStub - ); - } - - /** - * Method that is called within the {@link EvitaClientSession} to apply the wanted logic on a channel retrieved - * from a channel pool. - * - * @param lambda function that holds a logic passed by the caller - * @param return type of the function - * @param type of the expected stub - * @return result of the applied function - */ - private T executeWithEvitaSessionService( - @Nonnull AsyncCallFunction lambda, @Nonnull S stub + private T executeWithAsyncEvitaSessionService( + @Nonnull AsyncCallFunction lambda ) { + final Timeout timeout = this.callTimeout.peek(); try { SessionIdHolder.setSessionId(getId().toString()); - return lambda.apply(stub); - } catch (StatusRuntimeException statusRuntimeException) { - throw transformStatusRuntimeException(statusRuntimeException); - } catch (EvitaInvalidUsageException | EvitaInternalError evitaError) { - throw evitaError; - } catch (Throwable e) { - log.error("Unexpected internal Evita error occurred: {}", e.getMessage(), e); - throw new GenericEvitaInternalError( - "Unexpected internal Evita error occurred: " + e.getMessage(), - "Unexpected internal Evita error occurred.", - e + return lambda.apply(this.evitaSessionServiceStub.get(timeout)); + } catch (ExecutionException e) { + throw EvitaClient.transformException( + e.getCause() == null ? e : e.getCause(), + () -> { + // close session and rethrow + final CompletableFuture future = closeInternally(); + future.complete(0L); + } + ); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new EvitaClientServerCallException("Server call interrupted.", e); + } catch (TimeoutException e) { + throw new EvitaClientTimedOutException( + timeout.timeout(), timeout.timeoutUnit() ); } finally { SessionIdHolder.reset(); } } - /** - * Handles a {@link StatusRuntimeException} by checking the status code and performing appropriate actions. - * - * @param statusRuntimeException the {@link StatusRuntimeException} to handle - */ - @Nonnull - private RuntimeException transformStatusRuntimeException(@Nonnull StatusRuntimeException statusRuntimeException) { - final Code statusCode = statusRuntimeException.getStatus().getCode(); - final String description = ofNullable(statusRuntimeException.getStatus().getDescription()) - .orElse("No description."); - if (statusCode == Code.UNAUTHENTICATED) { - // close session and rethrow - final CompletableFuture future = closeInternally(); - future.complete(0L); - return new InstanceTerminatedException("session"); - } else if (statusCode == Code.INVALID_ARGUMENT) { - final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description); - if (expectedFormat.matches()) { - return EvitaInvalidUsageException.createExceptionWithErrorCode( - expectedFormat.group(2), expectedFormat.group(1) - ); - } else { - return new EvitaInvalidUsageException(description); - } - } else { - final Matcher expectedFormat = ERROR_MESSAGE_PATTERN.matcher(description); - if (expectedFormat.matches()) { - return GenericEvitaInternalError.createExceptionWithErrorCode( - expectedFormat.group(2), expectedFormat.group(1) - ); - } else { - return new GenericEvitaInternalError(description); - } - } - } - @Nonnull private Optional queryOneInternal( @Nonnull Query query, diff --git a/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/StubTimeoutProxy.java b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/StubTimeoutProxy.java new file mode 100644 index 0000000000..77b471d94f --- /dev/null +++ b/evita_external_api/evita_external_api_grpc/client/src/main/java/io/evitadb/driver/StubTimeoutProxy.java @@ -0,0 +1,83 @@ +/* + * + * _ _ ____ ____ + * _____ _(_) |_ __ _| _ \| __ ) + * / _ \ \ / / | __/ _` | | | | _ \ + * | __/\ V /| | || (_| | |_| | |_) | + * \___| \_/ |_|\__\__,_|____/|____/ + * + * Copyright (c) 2024 + * + * Licensed under the Business Source License, Version 1.1 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/FgForrest/evitaDB/blob/master/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.evitadb.driver; + + +import io.grpc.stub.AbstractStub; +import lombok.RequiredArgsConstructor; + +import javax.annotation.Nonnull; +import java.util.Objects; + +/** + * This proxy class is used to access gRPC stubs of particular type with a specific timeout. If the timeout is the same + * as the last one used, then the last stub is returned. Otherwise, a new stub with extended timeout is created. + * + * It is expected that the stubs with same timeouts will be reused, so the optimization for reusing the last used stub + * should be beneficial even if the stubs are quite cheap. + * + * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 + */ +@RequiredArgsConstructor +class StubTimeoutProxy> { + /** + * The gRPC stub instance that is used to create new stubs with extended timeouts. + */ + private final T stub; + /** + * The last used gRPC stub with its associated timeout configuration. + */ + private StubTimeout lastUsedStub; + + /** + * Retrieves a gRPC stub with the specified timeout. If the timeout is different from the last used + * timeout, a new stub with the extended timeout is created. Otherwise, the previously used stub + * is returned. + * + * @param timeout The timeout configuration to be applied to the gRPC stub. + * @return A gRPC stub with the specified timeout configuration. + */ + public T get(@Nonnull Timeout timeout) { + if (this.lastUsedStub == null || !Objects.equals(this.lastUsedStub.timeout(), timeout)) { + final T stubWithExtendedTimeout = this.stub.withDeadlineAfter(timeout.timeout(), timeout.timeoutUnit()); + this.lastUsedStub = new StubTimeout<>(stubWithExtendedTimeout, timeout); + return stubWithExtendedTimeout; + } else { + return this.lastUsedStub.stub(); + } + } + + /** + * A record that holds a gRPC stub and its associated timeout configuration. + * + * @param The type of the gRPC stub. + * @param stub The gRPC stub instance. + * @param timeout The timeout configuration to be applied to the gRPC stub. + */ + private record StubTimeout( + @Nonnull T stub, + @Nonnull Timeout timeout + ) { } + +} diff --git a/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java b/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java index 5438c75a26..f0f34a6b88 100644 --- a/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java +++ b/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaManagementService.java @@ -45,20 +45,16 @@ import io.evitadb.externalApi.api.system.ProbesProvider.ApiState; import io.evitadb.externalApi.api.system.ProbesProvider.Readiness; import io.evitadb.externalApi.configuration.AbstractApiConfiguration; -import io.evitadb.externalApi.grpc.constants.GrpcHeaders; import io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter; import io.evitadb.externalApi.grpc.generated.*; import io.evitadb.externalApi.grpc.generated.GrpcTaskStatusesResponse.Builder; import io.evitadb.externalApi.grpc.requestResponse.EvitaEnumConverter; -import io.evitadb.externalApi.grpc.services.interceptors.ServerSessionInterceptor; import io.evitadb.externalApi.http.ExternalApiProvider; import io.evitadb.externalApi.http.ExternalApiServer; -import io.evitadb.externalApi.trace.ExternalApiTracingContextProvider; import io.evitadb.utils.Assert; import io.evitadb.utils.ClassifierUtils; import io.evitadb.utils.ClassifierUtils.Keyword; import io.evitadb.utils.UUIDUtil; -import io.grpc.Metadata; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; @@ -75,7 +71,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicLong; import static io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter.toGrpcOffsetDateTime; @@ -83,6 +78,7 @@ import static io.evitadb.externalApi.grpc.dataType.EvitaDataTypesConverter.toUuid; import static io.evitadb.externalApi.grpc.requestResponse.EvitaEnumConverter.toGrpcHealthProblem; import static io.evitadb.externalApi.grpc.requestResponse.EvitaEnumConverter.toGrpcReadinessState; +import static io.evitadb.externalApi.grpc.services.EvitaService.executeWithClientContext; import static io.evitadb.externalApi.grpc.services.interceptors.GlobalExceptionHandlerInterceptor.sendErrorToClient; import static java.util.Optional.ofNullable; @@ -106,35 +102,6 @@ public class EvitaManagementService extends EvitaManagementServiceGrpc.EvitaMana */ @Nonnull private final EvitaManagement management; - /** - * Executes entire lambda function within the scope of a tracing context. - * - * @param lambda lambda function to be executed - * @param executor executor service to be used as a carrier for a lambda function - */ - private static void executeWithClientContext( - @Nonnull Runnable lambda, - @Nonnull ExecutorService executor, - @Nonnull StreamObserver responseObserver - ) { - final Metadata metadata = ServerSessionInterceptor.METADATA.get(); - ExternalApiTracingContextProvider.getContext() - .executeWithinBlock( - GrpcHeaders.getGrpcTraceTaskNameWithMethodName(metadata), - metadata, - () -> executor.execute( - () -> { - try { - lambda.run(); - } catch (RuntimeException exception) { - // Delegate exception handling to GlobalExceptionHandlerInterceptor - sendErrorToClient(exception, responseObserver); - } - } - ) - ); - } - /** * Deletes temporary file if it exists. * diff --git a/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaService.java b/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaService.java index eff2ddffab..d7a3b623aa 100644 --- a/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaService.java +++ b/evita_external_api/evita_external_api_grpc/server/src/main/java/io/evitadb/externalApi/grpc/services/EvitaService.java @@ -30,6 +30,7 @@ import io.evitadb.api.SessionTraits.SessionFlags; import io.evitadb.api.requestResponse.schema.mutation.TopLevelCatalogSchemaMutation; import io.evitadb.core.Evita; +import io.evitadb.core.async.ObservableExecutorServiceWithHardDeadline; import io.evitadb.externalApi.event.ReadinessEvent; import io.evitadb.externalApi.event.ReadinessEvent.Prospective; import io.evitadb.externalApi.event.ReadinessEvent.Result; @@ -41,6 +42,8 @@ import io.evitadb.externalApi.grpc.services.interceptors.ServerSessionInterceptor; import io.evitadb.externalApi.trace.ExternalApiTracingContextProvider; import io.evitadb.utils.UUIDUtil; +import io.grpc.Context; +import io.grpc.Deadline; import io.grpc.Metadata; import io.grpc.stub.StreamObserver; import lombok.extern.slf4j.Slf4j; @@ -49,7 +52,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; import static io.evitadb.externalApi.grpc.requestResponse.EvitaEnumConverter.toGrpcCatalogState; @@ -94,17 +97,19 @@ private static SessionFlags[] getSessionFlags(GrpcSessionType sessionType, boole * @param lambda lambda function to be executed * @param executor executor service to be used as a carrier for a lambda function */ - private static void executeWithClientContext( + public static void executeWithClientContext( @Nonnull Runnable lambda, - @Nonnull ExecutorService executor, + @Nonnull ObservableExecutorServiceWithHardDeadline executor, @Nonnull StreamObserver responseObserver ) { + // Retrieve the deadline from the context + final Deadline deadline = Context.current().getDeadline(); final Metadata metadata = ServerSessionInterceptor.METADATA.get(); - ExternalApiTracingContextProvider.getContext() - .executeWithinBlock( - GrpcHeaders.getGrpcTraceTaskNameWithMethodName(metadata), - metadata, - () -> executor.execute( + final String methodName = GrpcHeaders.getGrpcTraceTaskNameWithMethodName(metadata); + final Runnable theMethod = + () -> executor.execute( + executor.createTask( + methodName, () -> { try { lambda.run(); @@ -112,9 +117,18 @@ private static void executeWithClientContext( // Delegate exception handling to GlobalExceptionHandlerInterceptor GlobalExceptionHandlerInterceptor.sendErrorToClient(exception, responseObserver); } - } + }, + deadline == null ? + executor.getDefaultTimeoutInMilliseconds() : deadline.timeRemaining(TimeUnit.MILLISECONDS) ) ); + + ExternalApiTracingContextProvider.getContext() + .executeWithinBlock( + methodName, + metadata, + theMethod + ); } public EvitaService(@Nonnull Evita evita) { @@ -241,7 +255,8 @@ public void defineCatalog(GrpcDefineCatalogRequest request, StreamObserver lambda, - @Nonnull ExecutorService executor, + @Nonnull ObservableExecutorServiceWithHardDeadline executor, @Nonnull StreamObserver responseObserver ) { + // Retrieve the deadline from the context + final Deadline deadline = Context.current().getDeadline(); final Metadata metadata = ServerSessionInterceptor.METADATA.get(); - ExternalApiTracingContextProvider.getContext() - .executeWithinBlock( - GrpcHeaders.getGrpcTraceTaskNameWithMethodName(metadata), - metadata, - () -> { - final EvitaInternalSessionContract session = ServerSessionInterceptor.SESSION.get(); - executor.execute( + final String methodName = GrpcHeaders.getGrpcTraceTaskNameWithMethodName(metadata); + final Runnable theMethod = + () -> { + final EvitaInternalSessionContract session = ServerSessionInterceptor.SESSION.get(); + executor.execute( + executor.createTask( + methodName, () -> { try { lambda.accept(session); @@ -143,9 +148,18 @@ private static void executeWithClientContext( // Delegate exception handling to GlobalExceptionHandlerInterceptor GlobalExceptionHandlerInterceptor.sendErrorToClient(exception, responseObserver); } - } - ); - } + }, + deadline == null ? + executor.getDefaultTimeoutInMilliseconds() : deadline.timeRemaining(TimeUnit.MILLISECONDS) + ) + ); + }; + + ExternalApiTracingContextProvider.getContext() + .executeWithinBlock( + methodName, + metadata, + theMethod ); } @@ -314,9 +328,10 @@ private static void queryInternal( .setPrimaryKey(((EntityReference) e).getPrimaryKey()) .build()) ); - entityBuilder.setRecordPage(dataChunkBuilder - .addAllEntityReferences(entityReferences) - .build() + entityBuilder.setRecordPage( + dataChunkBuilder + .addAllEntityReferences(entityReferences) + .build() ) .build(); } diff --git a/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadOnlyTest.java b/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadOnlyTest.java index 468eae4c3b..e07826666d 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadOnlyTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadOnlyTest.java @@ -1413,7 +1413,7 @@ void shouldTranslateErrorCorrectlyAndLeaveSessionOpen(EvitaClient evitaClient) { clientSession.getEntity("nonExisting", 1, entityFetchAll().getRequirements()); } catch (EvitaInvalidUsageException ex) { assertTrue(clientSession.isActive()); - assertEquals("No collection found for entity type `nonExisting`!", ex.getPublicMessage()); + assertTrue(ex.getPublicMessage().contains("No collection found for entity type `nonExisting`!")); assertEquals(ex.getPrivateMessage(), ex.getPublicMessage()); assertNotNull(ex.getErrorCode()); } finally { diff --git a/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadWriteTest.java b/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadWriteTest.java index 9fc599fbd0..2d4ea4ca6c 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadWriteTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/driver/EvitaClientReadWriteTest.java @@ -1759,7 +1759,8 @@ private static EvitaManagementServiceFutureStub getManagementStubInternal( final EvitaManagementContract management = evitaClient.management(); final Field evitaManagementServiceStub = management.getClass().getDeclaredField("evitaManagementServiceFutureStub"); evitaManagementServiceStub.setAccessible(true); - return (EvitaManagementServiceFutureStub) evitaManagementServiceStub.get(management); + return ((StubTimeoutProxy) evitaManagementServiceStub.get(management)) + .get(evitaClient.timeout.get().peek()); } catch (Exception ex) { throw new RuntimeException(ex); } diff --git a/evita_functional_tests/src/test/java/io/evitadb/server/EvitaServerTest.java b/evita_functional_tests/src/test/java/io/evitadb/server/EvitaServerTest.java index 90ba7eee25..d1aa57d1ef 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/server/EvitaServerTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/server/EvitaServerTest.java @@ -341,7 +341,7 @@ void shouldRestrictAccessViaNonSupportedProtocolsAndPortsExceptLab() { evitaClientBadPort.getCatalogNames(); fail("gRPC call should have failed on bad port!"); } catch (Exception ex) { - assertEquals("io.grpc.StatusRuntimeException: UNIMPLEMENTED: HTTP status code 404", ex.getCause().getMessage()); + assertEquals("UNIMPLEMENTED: HTTP status code 404", ex.getMessage()); } final EvitaClient evitaClientBadScheme = new EvitaClient( @@ -356,7 +356,7 @@ void shouldRestrictAccessViaNonSupportedProtocolsAndPortsExceptLab() { evitaClientBadScheme.getCatalogNames(); fail("gRPC call should have failed on bad scheme!"); } catch (Exception ex) { - assertEquals("io.grpc.StatusRuntimeException: UNAVAILABLE", ex.getCause().getMessage()); + assertEquals("UNAVAILABLE", ex.getMessage()); } // we should be able to access gRCP via correct scheme and port