Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support thread scope distributed lock #183

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public interface LockManager extends Closeable {
*
* @param key the key associated with the lock
* @return an AsyncLock instance for the specified key
* @deprecated use {@link LockManager#getSharedLock(String)}
*/
@Deprecated
default AsyncLock getLightWeightLock(String key) {
return getLightWeightLock(key, OptionBackoff.DEFAULT);
}
Expand All @@ -35,6 +37,58 @@ default AsyncLock getLightWeightLock(String key) {
* @param key the key associated with the lock
* @param optionBackoff the backoff options to be used for lock acquisition retries
* @return an AsyncLock instance for the specified key
* @deprecated use {@link LockManager#getSharedLock(String, OptionBackoff)}
*/
AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff);
@Deprecated
default AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) {
return getSharedLock(key, optionBackoff);
}

/**
* Gets a shared asynchronous lock for the specified key with default backoff options. Note:
* "Shared" implies that a single lock key is shared among all threads. If different threads
* attempt to acquire a lock that has already been acquired by another thread, a
* IllegalLockStatusException from {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} will be
* raised.
*
* @param key the key associated with the lock
* @return an AsyncLock instance for the specified key
*/
default AsyncLock getSharedLock(String key) {
return getLightWeightLock(key, OptionBackoff.DEFAULT);
}

/**
* Gets a shared asynchronous lock for the specified key with custom backoff options. Note:
* "Shared" implies that a single lock key is shared among all threads. If different threads
* attempt to acquire a lock that has already been acquired by another thread, a
* IllegalLockStatusException from {@link
* io.streamnative.oxia.client.api.exceptions.LockException.IllegalLockStatusException} will be
* raised.
*
* @param key the key associated with the lock
* @param optionBackoff the backoff options to be used for lock acquisition retries
* @return an AsyncLock instance for the specified key
*/
AsyncLock getSharedLock(String key, OptionBackoff optionBackoff);

/**
* Gets a thread simple asynchronous lock for the specified key with default backoff options.
*
* @param key the key associated with the lock
* @return an AsyncLock instance for the specified key
*/
default AsyncLock getThreadSimpleLock(String key) {
return getThreadSimpleLock(key, OptionBackoff.DEFAULT);
}

/**
* Gets a thread simple asynchronous lock for the specified key with custom backoff options.
*
* @param key the key associated with the lock
* @param optionBackoff the backoff options to be used for lock acquisition retries
* @return an AsyncLock instance for the specified key
*/
AsyncLock getThreadSimpleLock(String key, OptionBackoff optionBackoff);
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,4 +212,137 @@ public void testCounterWithAsyncLock() throws InterruptedException {
});
}
}

@Test
public void testCounterWithReentrantSyncLock() throws InterruptedException {
final String lockKey = UUID.randomUUID().toString();
// 3 nodes with 10 threads.
@Cleanup("shutdown")
final ExecutorService service = Executors.newFixedThreadPool(10);
final Map<Integer, AsyncOxiaClient> clients = new ConcurrentHashMap<>();
final Map<Integer, LockManager> lockManager = new ConcurrentHashMap<>();
try {
final Function<Integer, AsyncOxiaClient> compute =
(threadName) ->
OxiaClientBuilder.create(oxia.getServiceAddress())
.clientIdentifier(threadName + "")
.openTelemetry(openTelemetry)
.asyncClient()
.join();
final var counter = new Counter(0, 3000);
final var latch = new CountDownLatch(counter.total);
for (int i = 0; i < counter.total; i++) {
service.execute(
() -> {
final String name = Thread.currentThread().getName();
int nodeId = name.hashCode() % 3;
final AsyncOxiaClient client = clients.computeIfAbsent(nodeId, compute);
final LockManager lm =
lockManager.computeIfAbsent(
nodeId,
(n) ->
LockManagers.createLockManager(
client,
openTelemetry,
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("oxia-lock-manager")),
OptionAutoRevalidate.DEFAULT));
final AsyncLock lock = lm.getThreadSimpleLock(lockKey);
lock.lock().join();
counter.increment();
lock.unlock().join();
log.info("counter : {}", counter.current);
latch.countDown();
});
}

latch.await();
Assertions.assertEquals(counter.current, counter.total);
metricReader.forceFlush();
var metrics = metricReader.collectAllMetrics();
var metricsByName =
metrics.stream().collect(Collectors.toMap(MetricData::getName, identity()));
System.out.println(metricsByName);
Assertions.assertTrue(metricsByName.containsKey("oxia.locks.status"));
} finally {
clients.forEach(
(s, c) -> {
try {
c.close();
} catch (Exception e) {
log.error("close oxia client failed", e);
}
});
}
}

@Test
public void testCounterWithReentrantAsyncLock() throws InterruptedException {
final String lockKey = UUID.randomUUID().toString();
// 3 nodes with 10 threads
@Cleanup("shutdown")
final ExecutorService service = Executors.newFixedThreadPool(10);
final Map<Integer, AsyncOxiaClient> clients = new ConcurrentHashMap<>();
final Map<Integer, LockManager> lockManager = new ConcurrentHashMap<>();
try {
final Function<Integer, AsyncOxiaClient> compute =
(nodeId) ->
OxiaClientBuilder.create(oxia.getServiceAddress())
.clientIdentifier(nodeId + "")
.openTelemetry(openTelemetry)
.asyncClient()
.join();
final var counter = new Counter(0, 3000);
final var latch = new CountDownLatch(counter.total);
for (int i = 0; i < counter.total; i++) {
service.execute(
() -> {
final String name = Thread.currentThread().getName();
final int nodeId = name.hashCode() % 3;
final AsyncOxiaClient client = clients.computeIfAbsent(nodeId, compute);
final LockManager lm =
lockManager.computeIfAbsent(
nodeId,
(id) ->
LockManagers.createLockManager(
client,
openTelemetry,
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("oxia-lock-manager")),
OptionAutoRevalidate.DEFAULT));
final AsyncLock lock = lm.getThreadSimpleLock(lockKey);
lock.lock()
.thenAccept(
__ -> {
counter.increment();
log.info("counter : {}", counter.current);
})
.thenCompose(__ -> lock.unlock())
.thenAccept(__ -> latch.countDown())
.exceptionally(
ex -> {
Assertions.fail("unexpected exception", ex);
return null;
});
});
}
latch.await();
Assertions.assertEquals(counter.current, counter.total);
metricReader.forceFlush();
var metrics = metricReader.collectAllMetrics();
var metricsByName =
metrics.stream().collect(Collectors.toMap(MetricData::getName, identity()));
System.out.println(metricsByName);
Assertions.assertTrue(metricsByName.containsKey("oxia.locks.status"));
} finally {
clients.forEach(
(s, c) -> {
try {
c.close();
} catch (Exception e) {
log.error("close oxia client failed", e);
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package io.streamnative.oxia.client.lock;

import static io.streamnative.oxia.client.lock.SharedSimpleLock.DEFAULT_RETRYABLE_EXCEPTIONS;

import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableLongGauge;
Expand All @@ -26,7 +28,6 @@
import io.streamnative.oxia.client.api.OptionBackoff;
import io.streamnative.oxia.client.metrics.Unit;
import io.streamnative.oxia.client.util.Backoff;
import java.time.Clock;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -35,7 +36,7 @@

final class LockManagerImpl implements LockManager, Consumer<Notification> {
private final AsyncOxiaClient client;
private final Map<String, LightWeightLock> locks;
private final Map<String, AsyncLock> locks;
private final ScheduledExecutorService executor;
private final OptionAutoRevalidate optionAutoRevalidate;
private final ObservableLongGauge gaugeOxiaLocksStatus;
Expand All @@ -59,8 +60,8 @@ final class LockManagerImpl implements LockManager, Consumer<Notification> {
.ofLongs()
.buildWithCallback(
(ob) -> {
final Set<Map.Entry<String, LightWeightLock>> entries = locks.entrySet();
for (Map.Entry<String, LightWeightLock> entry : entries) {
final Set<Map.Entry<String, AsyncLock>> entries = locks.entrySet();
for (Map.Entry<String, AsyncLock> entry : entries) {
ob.record(
1,
Attributes.builder()
Expand All @@ -73,11 +74,30 @@ final class LockManagerImpl implements LockManager, Consumer<Notification> {
}

@Override
public AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) {
public AsyncLock getSharedLock(String key, OptionBackoff optionBackoff) {
return locks.computeIfAbsent(
key,
(k) ->
new SharedSimpleLock(
client,
key,
executor,
new Backoff(
optionBackoff.initDelay(),
optionBackoff.initDelayUnit(),
optionBackoff.maxDelay(),
optionBackoff.maxDelayUnit(),
null),
optionAutoRevalidate,
DEFAULT_RETRYABLE_EXCEPTIONS));
}

@Override
public AsyncLock getThreadSimpleLock(String key, OptionBackoff optionBackoff) {
return locks.computeIfAbsent(
key,
(k) ->
new LightWeightLock(
new ThreadSimpleLock(
client,
key,
executor,
Expand All @@ -86,8 +106,9 @@ public AsyncLock getLightWeightLock(String key, OptionBackoff optionBackoff) {
optionBackoff.initDelayUnit(),
optionBackoff.maxDelay(),
optionBackoff.maxDelayUnit(),
Clock.systemUTC()),
optionAutoRevalidate));
null),
optionAutoRevalidate,
DEFAULT_RETRYABLE_EXCEPTIONS));
}

@Override
Expand All @@ -96,7 +117,9 @@ public void accept(Notification notification) {
if (lock == null) {
return;
}
lock.notifyStateChanged(notification);
if (lock instanceof NotificationReceiver receiver) {
receiver.notifyStateChanged(notification);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.lock;

import io.streamnative.oxia.client.api.Notification;

public interface NotificationReceiver {

void notifyStateChanged(Notification notification);
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@

@Slf4j
@ThreadSafe
final class LightWeightLock implements AsyncLock {
final class SharedSimpleLock implements AsyncLock, NotificationReceiver {

private static final Class<? extends Throwable>[] DEFAULT_RETRYABLE_EXCEPTIONS =
public static final Class<? extends Throwable>[] DEFAULT_RETRYABLE_EXCEPTIONS =
new Class[] {LockBusyException.class};
private static final byte[] DEFAULT_VALUE = new byte[0];
private static final AtomicReferenceFieldUpdater<LightWeightLock, LockStatus> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(LightWeightLock.class, LockStatus.class, "state");
private static final AtomicReferenceFieldUpdater<SharedSimpleLock, LockStatus> STATE_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(SharedSimpleLock.class, LockStatus.class, "state");

private final AsyncOxiaClient client;
private final String key;
Expand All @@ -77,17 +77,8 @@ final class LightWeightLock implements AsyncLock {
private final ScheduledExecutorService taskExecutor;
private final String clientIdentifier;

LightWeightLock(
AsyncOxiaClient client,
String key,
ScheduledExecutorService executorService,
Backoff backoff,
OptionAutoRevalidate revalidate) {
this(client, key, executorService, backoff, revalidate, DEFAULT_RETRYABLE_EXCEPTIONS);
}

@SafeVarargs
LightWeightLock(
SharedSimpleLock(
AsyncOxiaClient client,
String key,
ScheduledExecutorService executorService,
Expand Down Expand Up @@ -233,8 +224,8 @@ private CompletableFuture<Void> tryLock1(long version) {
.put(key, DEFAULT_VALUE, Set.of(PutOption.AsEphemeralRecord, versionOption))
.thenAccept(
result -> {
LightWeightLock.this.versionId = result.version().versionId();
LightWeightLock.this.sessionId = result.version().sessionId();
SharedSimpleLock.this.versionId = result.version().versionId();
SharedSimpleLock.this.sessionId = result.version().sessionId();
if (log.isDebugEnabled()) {
log.debug(
"Acquired Lock. key={} session={} client_id={}",
Expand Down Expand Up @@ -338,8 +329,8 @@ private CompletableFuture<Void> unlock0(ExecutorService executorService) {
sessionId,
clientIdentifier);
}
LightWeightLock.this.versionId = Version.KeyNotExists;
LightWeightLock.this.sessionId = Optional.empty();
SharedSimpleLock.this.versionId = Version.KeyNotExists;
SharedSimpleLock.this.sessionId = Optional.empty();
STATE_UPDATER.set(this, RELEASED);
},
executorService)
Expand Down Expand Up @@ -430,7 +421,7 @@ private void revalidate() {
private final MessagePassingQueue<Notification> revalidateQueue =
(MessagePassingQueue<Notification>) PlatformDependent.newMpscQueue();

void notifyStateChanged(Notification notification) {
public void notifyStateChanged(Notification notification) {
switch (STATE_UPDATER.get(this)) {
case INIT, RELEASING, RELEASED -> {
// no-op
Expand Down
Loading
Loading