diff --git a/evita_api/src/main/java/io/evitadb/api/EvitaSessionContract.java b/evita_api/src/main/java/io/evitadb/api/EvitaSessionContract.java index 1d220470aa..0b533fa2cd 100644 --- a/evita_api/src/main/java/io/evitadb/api/EvitaSessionContract.java +++ b/evita_api/src/main/java/io/evitadb/api/EvitaSessionContract.java @@ -1138,5 +1138,4 @@ default boolean isTransactionOpen() { */ @Nonnull ProxyFactory getProxyFactory(); - } diff --git a/evita_engine/src/main/java/io/evitadb/core/EvitaInternalSessionContract.java b/evita_engine/src/main/java/io/evitadb/core/EvitaInternalSessionContract.java index 79850666b2..b1f1d791e5 100644 --- a/evita_engine/src/main/java/io/evitadb/core/EvitaInternalSessionContract.java +++ b/evita_engine/src/main/java/io/evitadb/core/EvitaInternalSessionContract.java @@ -143,6 +143,14 @@ > T query(@Nonnull EvitaReque */ void execute(@Nonnull Consumer logic) throws TransactionException; + /** + * Returns true if there is active method invocation in place. When method is running, it is not possible to + * kill session due to inactivity. + * + * @return true if there is active method invocation in place + */ + boolean methodIsRunning(); + /** * Retrieves a CompletableFuture that represents the finalization status of a session. If the catalog is in * transactional mode, the future will respect the requested {@link CommitBehavior} bound to the current transaction. diff --git a/evita_engine/src/main/java/io/evitadb/core/EvitaSession.java b/evita_engine/src/main/java/io/evitadb/core/EvitaSession.java index 65b8766826..cd1938db1c 100644 --- a/evita_engine/src/main/java/io/evitadb/core/EvitaSession.java +++ b/evita_engine/src/main/java/io/evitadb/core/EvitaSession.java @@ -1218,14 +1218,14 @@ public Task backupCatalog(@Nullable OffsetDateTime pastMoment, @Nonnull @Override public Optional getOpenedTransactionId() { - return ofNullable(transactionAccessor.get()) + return ofNullable(this.transactionAccessor.get()) .filter(it -> !it.isClosed()) .map(Transaction::getTransactionId); } @Override public boolean isRollbackOnly() { - return ofNullable(transactionAccessor.get()) + return ofNullable(this.transactionAccessor.get()) .map(Transaction::isRollbackOnly) .orElse(false); } @@ -1243,22 +1243,22 @@ public void setRollbackOnly() { @Override public boolean isReadOnly() { - return !sessionTraits.isReadWrite(); + return !this.sessionTraits.isReadWrite(); } @Override public boolean isBinaryFormat() { - return sessionTraits.isBinary(); + return this.sessionTraits.isBinary(); } @Override public boolean isDryRun() { - return sessionTraits.isDryRun(); + return this.sessionTraits.isDryRun(); } @Override public long getInactivityDurationInSeconds() { - return (System.currentTimeMillis() - lastCall) / 1000; + return (System.currentTimeMillis() - this.lastCall) / 1000; } @Interruptible @@ -1335,6 +1335,11 @@ public void execute(@Nonnull Consumer logic) { ); } + @Override + public boolean methodIsRunning() { + return false; + } + /** * Retrieves a CompletableFuture that represents the finalization status of a session. If the catalog is in * transactional mode, the future will respect the requested {@link CommitBehavior} bound to the current transaction. diff --git a/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java b/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java index 0146d55c76..c340d2e680 100644 --- a/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java +++ b/evita_engine/src/main/java/io/evitadb/core/SessionRegistry.java @@ -68,6 +68,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Stream; @@ -259,9 +260,22 @@ private interface EvitaProxyFinalization { * supports JDK proxies out-of-the-box so this shouldn't be a problem in the future. */ private static class EvitaSessionProxy implements InvocationHandler { + private final static Method IS_METHOD_RUNNING; + private final static Method INACTIVITY_IN_SECONDS; private final EvitaSession evitaSession; private final TracingContext tracingContext; @Getter private final ClosedEvent sessionClosedEvent; + private final AtomicInteger insideInvocation = new AtomicInteger(0); + private final AtomicLong lastCall = new AtomicLong(System.currentTimeMillis()); + + static { + try { + IS_METHOD_RUNNING = EvitaInternalSessionContract.class.getMethod("methodIsRunning"); + INACTIVITY_IN_SECONDS = EvitaInternalSessionContract.class.getMethod("getInactivityDurationInSeconds"); + } catch (NoSuchMethodException ex) { + throw new GenericEvitaInternalError("Method not found.", ex); + } + } /** * Handles arguments printing. @@ -311,6 +325,10 @@ public Object invoke(Object proxy, Method method, Object[] args) { .finish((OffsetDateTime) args[0], (int) args[1]) .commit(); return null; + } else if (method.equals(INACTIVITY_IN_SECONDS)) { + return (System.currentTimeMillis() - this.lastCall.get()) / 1000; + } else if (method.equals(IS_METHOD_RUNNING)) { + return this.insideInvocation.get() > 0; } else { try { this.evitaSession.increaseNestLevel(); @@ -320,6 +338,8 @@ public Object invoke(Object proxy, Method method, Object[] args) { () -> { final Supplier invocation = () -> { try { + this.insideInvocation.incrementAndGet(); + this.lastCall.set(System.currentTimeMillis()); return method.invoke(evitaSession, args); } catch (InvocationTargetException ex) { // handle the error @@ -369,6 +389,9 @@ public Object invoke(Object proxy, Method method, Object[] args) { "Unexpected system error occurred.", ex ); + } finally { + this.insideInvocation.decrementAndGet(); + this.lastCall.set(System.currentTimeMillis()); } }; if (method.isAnnotationPresent(RepresentsQuery.class)) { diff --git a/evita_engine/src/main/java/io/evitadb/core/async/SessionKiller.java b/evita_engine/src/main/java/io/evitadb/core/async/SessionKiller.java index 60b270dc1c..5570442dc7 100644 --- a/evita_engine/src/main/java/io/evitadb/core/async/SessionKiller.java +++ b/evita_engine/src/main/java/io/evitadb/core/async/SessionKiller.java @@ -26,6 +26,7 @@ import io.evitadb.api.configuration.ServerOptions; import io.evitadb.api.exception.InstanceTerminatedException; import io.evitadb.core.Evita; +import io.evitadb.core.EvitaInternalSessionContract; import io.evitadb.core.metric.event.session.KilledEvent; import lombok.extern.slf4j.Slf4j; @@ -76,8 +77,13 @@ public SessionKiller(int allowedInactivityInSeconds, @Nonnull Evita evita, @Nonn public void run() { try { final AtomicInteger counter = new AtomicInteger(0); - evita.getActiveSessions() - .filter(session -> session.getInactivityDurationInSeconds() >= allowedInactivityInSeconds) + this.evita.getActiveSessions() + .map(EvitaInternalSessionContract.class::cast) + .filter(session -> { + final boolean sessionOld = session.getInactivityDurationInSeconds() >= this.allowedInactivityInSeconds; + final boolean methodRunning = session.methodIsRunning(); + return sessionOld && !methodRunning; + }) .forEach(session -> { try { final String catalogName = session.getCatalogName(); @@ -91,6 +97,7 @@ public void run() { evita.terminateSession(session); counter.incrementAndGet(); + log.info("Killed session " + session.getId() + " (" + this.allowedInactivityInSeconds + "s of inactivity)."); // emit the event new KilledEvent(catalogName).commit(); } catch (InstanceTerminatedException ex) { diff --git a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTest.java b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTest.java index 0371f54f3c..99454c0f7b 100644 --- a/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTest.java +++ b/evita_functional_tests/src/test/java/io/evitadb/api/EvitaTest.java @@ -579,7 +579,7 @@ void shouldKillInactiveSessionsAutomatically() { final long start = System.currentTimeMillis(); do { assertNotNull(sessionActive.getCatalogSchema()); - } while (!(System.currentTimeMillis() - start > 5000 || !sessionInactive.isActive())); + } while (!(System.currentTimeMillis() - start > 2000)); assertFalse(sessionInactive.isActive()); assertTrue(sessionActive.isActive()); diff --git a/evita_functional_tests/src/test/java/io/evitadb/core/async/SessionKillerTest.java b/evita_functional_tests/src/test/java/io/evitadb/core/async/SessionKillerTest.java new file mode 100644 index 0000000000..8b2d01754c --- /dev/null +++ b/evita_functional_tests/src/test/java/io/evitadb/core/async/SessionKillerTest.java @@ -0,0 +1,157 @@ +/* + * + * _ _ ____ ____ + * _____ _(_) |_ __ _| _ \| __ ) + * / _ \ \ / / | __/ _` | | | | _ \ + * | __/\ V /| | || (_| | |_| | |_) | + * \___| \_/ |_|\__\__,_|____/|____/ + * + * Copyright (c) 2024 + * + * Licensed under the Business Source License, Version 1.1 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://github.com/FgForrest/evitaDB/blob/master/LICENSE + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.evitadb.core.async; + +import io.evitadb.api.EvitaSessionContract; +import io.evitadb.api.configuration.EvitaConfiguration; +import io.evitadb.api.configuration.ServerOptions; +import io.evitadb.api.configuration.StorageOptions; +import io.evitadb.api.exception.CollectionNotFoundException; +import io.evitadb.api.query.Query; +import io.evitadb.api.query.QueryConstraints; +import io.evitadb.api.requestResponse.data.structure.EntityReference; +import io.evitadb.core.Evita; +import io.evitadb.test.EvitaTestSupport; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; + +import static graphql.Assert.assertFalse; +import static graphql.Assert.assertNotNull; +import static graphql.Assert.assertTrue; +import static io.evitadb.test.TestConstants.LONG_RUNNING_TEST; + +/** + * This test verifies the correct functionality of the {@link SessionKiller} class. + * + * @author Jan Novotný (novotny@fg.cz), FG Forrest a.s. (c) 2024 + */ +@DisplayName("Session killer functionality") +@Tag(LONG_RUNNING_TEST) +class SessionKillerTest implements EvitaTestSupport { + public static final String SUB_DIRECTORY = "SessionKillerTest"; + private Evita evita; + private SessionKiller sessionKiller; + + @BeforeEach + void setUp() throws IOException, NoSuchFieldException, IllegalAccessException { + cleanTestSubDirectory(SUB_DIRECTORY); + this.evita = new Evita( + EvitaConfiguration.builder() + .storage( + StorageOptions.builder() + .storageDirectory(getTestDirectory().resolve(SUB_DIRECTORY)) + .build() + ) + .server( + ServerOptions.builder() + .closeSessionsAfterSecondsOfInactivity(1) + .build() + ) + .build() + ); + final Field sessionKillerField = Evita.class.getDeclaredField("sessionKiller"); + sessionKillerField.setAccessible(true); + this.sessionKiller = (SessionKiller) sessionKillerField.get(this.evita); + } + + @Test + void shouldKillSessionAfterIntervalOfInactivity() throws InterruptedException { + this.evita.defineCatalog("test"); + final EvitaSessionContract session = this.evita.createReadOnlySession("test"); + synchronized (this.evita) { + this.evita.wait(2000); + } + this.sessionKiller.run(); + assertFalse(session.isActive()); + } + + @Test + void shouldNotKillSessionWhenThereAreInvocations() { + this.evita.defineCatalog("test"); + final EvitaSessionContract session = this.evita.createReadOnlySession("test"); + final long start = System.currentTimeMillis(); + while (System.currentTimeMillis() - start < 2000) { + assertNotNull(session.getCatalogName()); + Thread.onSpinWait(); + } + this.sessionKiller.run(); + assertTrue(session.isActive()); + } + + @Test + void shouldNotKillSessionWhenThereIsLongLastingInvocationCallActive() throws InterruptedException { + final AtomicBoolean finishMethodCall = new AtomicBoolean(false); + try { + this.evita.defineCatalog("test"); + final EvitaSessionContract session = this.evita.createReadOnlySession("test"); + final Runnable asyncCall = () -> { + final Query query = Mockito.mock(Query.class); + Mockito.when(query.normalizeQuery()).thenAnswer(invocation -> { + do { + Thread.onSpinWait(); + } while (!finishMethodCall.get()); + return Query.query(QueryConstraints.collection("unknownEntity")); + }); + try { + session.query( + query, EntityReference.class + ); + } catch (CollectionNotFoundException e) { + // expected + System.out.println("Async call finished"); + } + }; + final CompletableFuture future = CompletableFuture.runAsync(asyncCall); + synchronized (this.evita) { + this.evita.wait(2000); + } + + this.sessionKiller.run(); + + System.out.println("Finishing async call"); + finishMethodCall.set(true); + future.join(); + + assertTrue(session.isActive()); + + System.out.println("Waiting for session killer to finish"); + synchronized (this.evita) { + this.evita.wait(2000); + } + + this.sessionKiller.run(); + assertFalse(session.isActive()); + } finally { + finishMethodCall.set(true); + } + } +} \ No newline at end of file