diff --git a/client-api/src/main/java/io/streamnative/oxia/client/api/LockManager.java b/client-api/src/main/java/io/streamnative/oxia/client/api/LockManager.java index 870508fa..7c30e069 100644 --- a/client-api/src/main/java/io/streamnative/oxia/client/api/LockManager.java +++ b/client-api/src/main/java/io/streamnative/oxia/client/api/LockManager.java @@ -24,7 +24,9 @@ public interface LockManager extends Closeable { * * @param key the key associated with the lock * @return an AsyncLock instance for the specified key + * @deprecated use {@link LockManager#getSharedLock(String)} */ + @Deprecated default AsyncLock getLightWeightLock(String key) { return getLightWeightLock(key, OptionBackoff.DEFAULT); } @@ -35,6 +37,58 @@ default AsyncLock getLightWeightLock(String key) { * @param key the key associated with the lock * @param optionBackoff the backoff options to be used for lock acquisition retries * @return an AsyncLock instance for the specified key + * @deprecated use {@link LockManager#getSharedLock(String, OptionBackoff)} */ - AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff); + @Deprecated + default AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) { + return getSharedLock(key, optionBackoff); + } + + /** + * Gets a shared asynchronous lock for the specified key with default backoff options. Note: + * "Shared" implies that a single lock key is shared among all threads. If different threads + * attempt to acquire a lock that has already been acquired by another thread, a + * IllegalLockStatusException from {@link + * io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} will be + * raised. + * + * @param key the key associated with the lock + * @return an AsyncLock instance for the specified key + */ + default AsyncLock getSharedLock(String key) { + return getLightWeightLock(key, OptionBackoff.DEFAULT); + } + + /** + * Gets a shared asynchronous lock for the specified key with custom backoff options. Note: + * "Shared" implies that a single lock key is shared among all threads. If different threads + * attempt to acquire a lock that has already been acquired by another thread, a + * IllegalLockStatusException from {@link + * io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} will be + * raised. + * + * @param key the key associated with the lock + * @param optionBackoff the backoff options to be used for lock acquisition retries + * @return an AsyncLock instance for the specified key + */ + AsyncLock getSharedLock(String key, OptionBackoff optionBackoff); + + /** + * Gets a thread simple asynchronous lock for the specified key with default backoff options. + * + * @param key the key associated with the lock + * @return an AsyncLock instance for the specified key + */ + default AsyncLock getThreadSimpleLock(String key) { + return getThreadSimpleLock(key, OptionBackoff.DEFAULT); + } + + /** + * Gets a thread simple asynchronous lock for the specified key with custom backoff options. + * + * @param key the key associated with the lock + * @param optionBackoff the backoff options to be used for lock acquisition retries + * @return an AsyncLock instance for the specified key + */ + AsyncLock getThreadSimpleLock(String key, OptionBackoff optionBackoff); } diff --git a/client-it/src/test/java/io/streamnative/oxia/client/it/LockManagerIT.java b/client-it/src/test/java/io/streamnative/oxia/client/it/LockManagerIT.java index 38d4e67f..3edca302 100644 --- a/client-it/src/test/java/io/streamnative/oxia/client/it/LockManagerIT.java +++ b/client-it/src/test/java/io/streamnative/oxia/client/it/LockManagerIT.java @@ -212,4 +212,137 @@ public void testCounterWithAsyncLock() throws InterruptedException { }); } } + + @Test + public void testCounterWithReentrantSyncLock() throws InterruptedException { + final String lockKey = UUID.randomUUID().toString(); + // 3 nodes with 10 threads. + @Cleanup("shutdown") + final ExecutorService service = Executors.newFixedThreadPool(10); + final Map clients = new ConcurrentHashMap<>(); + final Map lockManager = new ConcurrentHashMap<>(); + try { + final Function compute = + (threadName) -> + OxiaClientBuilder.create(oxia.getServiceAddress()) + .clientIdentifier(threadName + "") + .openTelemetry(openTelemetry) + .asyncClient() + .join(); + final var counter = new Counter(0, 3000); + final var latch = new CountDownLatch(counter.total); + for (int i = 0; i < counter.total; i++) { + service.execute( + () -> { + final String name = Thread.currentThread().getName(); + int nodeId = name.hashCode() % 3; + final AsyncOxiaClient client = clients.computeIfAbsent(nodeId, compute); + final LockManager lm = + lockManager.computeIfAbsent( + nodeId, + (n) -> + LockManagers.createLockManager( + client, + openTelemetry, + Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("oxia-lock-manager")), + OptionAutoRevalidate.DEFAULT)); + final AsyncLock lock = lm.getThreadSimpleLock(lockKey); + lock.lock().join(); + counter.increment(); + lock.unlock().join(); + log.info("counter : {}", counter.current); + latch.countDown(); + }); + } + + latch.await(); + Assertions.assertEquals(counter.current, counter.total); + metricReader.forceFlush(); + var metrics = metricReader.collectAllMetrics(); + var metricsByName = + metrics.stream().collect(Collectors.toMap(MetricData::getName, identity())); + System.out.println(metricsByName); + Assertions.assertTrue(metricsByName.containsKey("oxia.locks.status")); + } finally { + clients.forEach( + (s, c) -> { + try { + c.close(); + } catch (Exception e) { + log.error("close oxia client failed", e); + } + }); + } + } + + @Test + public void testCounterWithReentrantAsyncLock() throws InterruptedException { + final String lockKey = UUID.randomUUID().toString(); + // 3 nodes with 10 threads + @Cleanup("shutdown") + final ExecutorService service = Executors.newFixedThreadPool(10); + final Map clients = new ConcurrentHashMap<>(); + final Map lockManager = new ConcurrentHashMap<>(); + try { + final Function compute = + (nodeId) -> + OxiaClientBuilder.create(oxia.getServiceAddress()) + .clientIdentifier(nodeId + "") + .openTelemetry(openTelemetry) + .asyncClient() + .join(); + final var counter = new Counter(0, 3000); + final var latch = new CountDownLatch(counter.total); + for (int i = 0; i < counter.total; i++) { + service.execute( + () -> { + final String name = Thread.currentThread().getName(); + final int nodeId = name.hashCode() % 3; + final AsyncOxiaClient client = clients.computeIfAbsent(nodeId, compute); + final LockManager lm = + lockManager.computeIfAbsent( + nodeId, + (id) -> + LockManagers.createLockManager( + client, + openTelemetry, + Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("oxia-lock-manager")), + OptionAutoRevalidate.DEFAULT)); + final AsyncLock lock = lm.getThreadSimpleLock(lockKey); + lock.lock() + .thenAccept( + __ -> { + counter.increment(); + log.info("counter : {}", counter.current); + }) + .thenCompose(__ -> lock.unlock()) + .thenAccept(__ -> latch.countDown()) + .exceptionally( + ex -> { + Assertions.fail("unexpected exception", ex); + return null; + }); + }); + } + latch.await(); + Assertions.assertEquals(counter.current, counter.total); + metricReader.forceFlush(); + var metrics = metricReader.collectAllMetrics(); + var metricsByName = + metrics.stream().collect(Collectors.toMap(MetricData::getName, identity())); + System.out.println(metricsByName); + Assertions.assertTrue(metricsByName.containsKey("oxia.locks.status")); + } finally { + clients.forEach( + (s, c) -> { + try { + c.close(); + } catch (Exception e) { + log.error("close oxia client failed", e); + } + }); + } + } } diff --git a/client/src/main/java/io/streamnative/oxia/client/lock/LockManagerImpl.java b/client/src/main/java/io/streamnative/oxia/client/lock/LockManagerImpl.java index 5a519498..b54443a2 100644 --- a/client/src/main/java/io/streamnative/oxia/client/lock/LockManagerImpl.java +++ b/client/src/main/java/io/streamnative/oxia/client/lock/LockManagerImpl.java @@ -15,6 +15,8 @@ */ package io.streamnative.oxia.client.lock; +import static io.streamnative.oxia.client.lock.SharedSimpleLock.DEFAULT_RETRYABLE_EXCEPTIONS; + import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongGauge; @@ -26,7 +28,6 @@ import io.streamnative.oxia.client.api.OptionBackoff; import io.streamnative.oxia.client.metrics.Unit; import io.streamnative.oxia.client.util.Backoff; -import java.time.Clock; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -35,7 +36,7 @@ final class LockManagerImpl implements LockManager, Consumer { private final AsyncOxiaClient client; - private final Map locks; + private final Map locks; private final ScheduledExecutorService executor; private final OptionAutoRevalidate optionAutoRevalidate; private final ObservableLongGauge gaugeOxiaLocksStatus; @@ -59,8 +60,8 @@ final class LockManagerImpl implements LockManager, Consumer { .ofLongs() .buildWithCallback( (ob) -> { - final Set> entries = locks.entrySet(); - for (Map.Entry entry : entries) { + final Set> entries = locks.entrySet(); + for (Map.Entry entry : entries) { ob.record( 1, Attributes.builder() @@ -73,11 +74,30 @@ final class LockManagerImpl implements LockManager, Consumer { } @Override - public AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) { + public AsyncLock getSharedLock(String key, OptionBackoff optionBackoff) { + return locks.computeIfAbsent( + key, + (k) -> + new SharedSimpleLock( + client, + key, + executor, + new Backoff( + optionBackoff.initDelay(), + optionBackoff.initDelayUnit(), + optionBackoff.maxDelay(), + optionBackoff.maxDelayUnit(), + null), + optionAutoRevalidate, + DEFAULT_RETRYABLE_EXCEPTIONS)); + } + + @Override + public AsyncLock getThreadSimpleLock(String key, OptionBackoff optionBackoff) { return locks.computeIfAbsent( key, (k) -> - new LightWeightLock( + new ThreadSimpleLock( client, key, executor, @@ -86,8 +106,9 @@ public AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) { optionBackoff.initDelayUnit(), optionBackoff.maxDelay(), optionBackoff.maxDelayUnit(), - Clock.systemUTC()), - optionAutoRevalidate)); + null), + optionAutoRevalidate, + DEFAULT_RETRYABLE_EXCEPTIONS)); } @Override @@ -96,7 +117,9 @@ public void accept(Notification notification) { if (lock == null) { return; } - lock.notifyStateChanged(notification); + if (lock instanceof NotificationReceiver receiver) { + receiver.notifyStateChanged(notification); + } } @Override diff --git a/client/src/main/java/io/streamnative/oxia/client/lock/NotificationReceiver.java b/client/src/main/java/io/streamnative/oxia/client/lock/NotificationReceiver.java new file mode 100644 index 00000000..d8db708a --- /dev/null +++ b/client/src/main/java/io/streamnative/oxia/client/lock/NotificationReceiver.java @@ -0,0 +1,23 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.lock; + +import io.streamnative.oxia.client.api.Notification; + +public interface NotificationReceiver { + + void notifyStateChanged(Notification notification); +} diff --git a/client/src/main/java/io/streamnative/oxia/client/lock/LightWeightLock.java b/client/src/main/java/io/streamnative/oxia/client/lock/SharedSimpleLock.java similarity index 94% rename from client/src/main/java/io/streamnative/oxia/client/lock/LightWeightLock.java rename to client/src/main/java/io/streamnative/oxia/client/lock/SharedSimpleLock.java index f9d4a8dc..9df65916 100644 --- a/client/src/main/java/io/streamnative/oxia/client/lock/LightWeightLock.java +++ b/client/src/main/java/io/streamnative/oxia/client/lock/SharedSimpleLock.java @@ -62,13 +62,13 @@ @Slf4j @ThreadSafe -final class LightWeightLock implements AsyncLock { +final class SharedSimpleLock implements AsyncLock, NotificationReceiver { - private static final Class[] DEFAULT_RETRYABLE_EXCEPTIONS = + public static final Class[] DEFAULT_RETRYABLE_EXCEPTIONS = new Class[] {LockBusyException.class}; private static final byte[] DEFAULT_VALUE = new byte[0]; - private static final AtomicReferenceFieldUpdater STATE_UPDATER = - AtomicReferenceFieldUpdater.newUpdater(LightWeightLock.class, LockStatus.class, "state"); + private static final AtomicReferenceFieldUpdater STATE_UPDATER = + AtomicReferenceFieldUpdater.newUpdater(SharedSimpleLock.class, LockStatus.class, "state"); private final AsyncOxiaClient client; private final String key; @@ -77,17 +77,8 @@ final class LightWeightLock implements AsyncLock { private final ScheduledExecutorService taskExecutor; private final String clientIdentifier; - LightWeightLock( - AsyncOxiaClient client, - String key, - ScheduledExecutorService executorService, - Backoff backoff, - OptionAutoRevalidate revalidate) { - this(client, key, executorService, backoff, revalidate, DEFAULT_RETRYABLE_EXCEPTIONS); - } - @SafeVarargs - LightWeightLock( + SharedSimpleLock( AsyncOxiaClient client, String key, ScheduledExecutorService executorService, @@ -233,8 +224,8 @@ private CompletableFuture tryLock1(long version) { .put(key, DEFAULT_VALUE, Set.of(PutOption.AsEphemeralRecord, versionOption)) .thenAccept( result -> { - LightWeightLock.this.versionId = result.version().versionId(); - LightWeightLock.this.sessionId = result.version().sessionId(); + SharedSimpleLock.this.versionId = result.version().versionId(); + SharedSimpleLock.this.sessionId = result.version().sessionId(); if (log.isDebugEnabled()) { log.debug( "Acquired Lock. key={} session={} client_id={}", @@ -338,8 +329,8 @@ private CompletableFuture unlock0(ExecutorService executorService) { sessionId, clientIdentifier); } - LightWeightLock.this.versionId = Version.KeyNotExists; - LightWeightLock.this.sessionId = Optional.empty(); + SharedSimpleLock.this.versionId = Version.KeyNotExists; + SharedSimpleLock.this.sessionId = Optional.empty(); STATE_UPDATER.set(this, RELEASED); }, executorService) @@ -430,7 +421,7 @@ private void revalidate() { private final MessagePassingQueue revalidateQueue = (MessagePassingQueue) PlatformDependent.newMpscQueue(); - void notifyStateChanged(Notification notification) { + public void notifyStateChanged(Notification notification) { switch (STATE_UPDATER.get(this)) { case INIT, RELEASING, RELEASED -> { // no-op diff --git a/client/src/main/java/io/streamnative/oxia/client/lock/ThreadSimpleLock.java b/client/src/main/java/io/streamnative/oxia/client/lock/ThreadSimpleLock.java new file mode 100644 index 00000000..addab387 --- /dev/null +++ b/client/src/main/java/io/streamnative/oxia/client/lock/ThreadSimpleLock.java @@ -0,0 +1,121 @@ +/* + * Copyright © 2022-2024 StreamNative Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.oxia.client.lock; + +import io.streamnative.oxia.client.api.AsyncLock; +import io.streamnative.oxia.client.api.AsyncOxiaClient; +import io.streamnative.oxia.client.api.Notification; +import io.streamnative.oxia.client.api.OptionAutoRevalidate; +import io.streamnative.oxia.client.api.exceptions.LockException; +import io.streamnative.oxia.client.util.Backoff; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Semaphore; + +final class ThreadSimpleLock implements AsyncLock, NotificationReceiver { + private final SharedSimpleLock distributedLock; + private final Semaphore memorySemaphore; + + @SafeVarargs + public ThreadSimpleLock( + AsyncOxiaClient client, + String key, + ScheduledExecutorService executorService, + Backoff backoff, + OptionAutoRevalidate optionAutoRevalidate, + Class... retryableExceptions) { + this.distributedLock = + new SharedSimpleLock( + client, key, executorService, backoff, optionAutoRevalidate, retryableExceptions); + this.memorySemaphore = new Semaphore(1); + } + + @Override + public LockStatus getStatus() { + return distributedLock.getStatus(); + } + + @Override + public CompletableFuture lock() { + return lock(ForkJoinPool.commonPool()); + } + + @Override + public CompletableFuture tryLock() { + return tryLock(ForkJoinPool.commonPool()); + } + + @Override + public CompletableFuture unlock() { + return unlock(ForkJoinPool.commonPool()); + } + + @Override + public CompletableFuture lock(ExecutorService executorService) { + try { + memorySemaphore.acquire(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + return CompletableFuture.failedFuture(ex); + } + return distributedLock + .lock(executorService) + .whenComplete( + (r, err) -> { + if (err != null) { + // lock distributed lock failed + // rollback the memory lock + memorySemaphore.release(); + } + }); + } + + @Override + public CompletableFuture tryLock(ExecutorService executorService) { + if (!memorySemaphore.tryAcquire()) { + return CompletableFuture.failedFuture(new LockException.LockBusyException()); + } + return distributedLock + .tryLock(executorService) + .whenComplete( + (r, err) -> { + if (err != null) { + // distributed lock failed, rollback the memory lock + memorySemaphore.release(); + } + }); + } + + @Override + public CompletableFuture unlock(ExecutorService executorService) { + return distributedLock + .unlock() + .whenComplete( + (r, err) -> { + if (err == null) { + // unlock memory lock only when distributed lock unlocked + memorySemaphore.release(); + } + }); + } + + @Override + public void notifyStateChanged(Notification notification) { + distributedLock.notifyStateChanged(notification); + } +}