Skip to content

Commit

Permalink
Fail fast if cannot connect to the MongoDB server on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesChenX committed Aug 24, 2024
1 parent 2c8e98d commit 8347a6d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.mongodb.reactivestreams.client.MongoClients;
import com.mongodb.reactivestreams.client.MongoCollection;
import com.mongodb.reactivestreams.client.MongoDatabase;
import io.lettuce.core.internal.Futures;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,18 @@ private TurmsMongoClient(
verifyServers(descriptions, name, requiredClusterTypes);
connect.tryEmitValue(null);
} catch (Exception e) {
Sinks.EmitResult result = connect.tryEmitError(e);
if (result.isFailure()) {
LOGGER.fatal(e.getMessage());
}
connect.tryEmitError(e);
}
});
operations = new TurmsMongoOperations(context);
// Ping the server to check if the connection is available, and the server is alive quickly.
operations.ping()
.subscribe(null,
t -> connect.tryEmitError(new RuntimeException(
"Failed to ping the server for the mongo client: \""
+ name
+ "\"",
t)));
}

public Mono<Void> destroy(long timeoutMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,11 @@ Mono<Void> updateJsonSchema(

<T> Mono<T> inTransaction(Function<ClientSession, Mono<T>> execute);

Mono<Boolean> ping();

Mono<Void> disableBalancing(String collectionName);

Mono<Void> enableBalancing(String collectionName);

Mono<Boolean> isBalancerRunning();

}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ public class TurmsMongoOperations implements MongoOperationsSupport {
.append("as", new BsonString("a"))
.append("in", new BsonString("$$a.k")));

// Diagnostics
private static final BsonDocument PING_COMMAND =
new BsonDocument("ping", BsonPool.BSON_INT32_1);

private final MongoContext context;
private final Map<Class<?>, MongoOperationPublisher<?>> publisherMap =
new NonBlockingIdentityHashMap<>(32);
Expand Down Expand Up @@ -891,6 +895,16 @@ public <T> Mono<T> inTransaction(Function<ClientSession, Mono<T>> action) {
ClientSession::commitTransaction);
}

// Health Check

@Override
public Mono<Boolean> ping() {
return Mono.from(context.getDatabase()
.runCommand(PING_COMMAND, BsonDocument.class))
.map(document -> document.getNumber("ok")
.intValue() == 1);
}

// Balancer

@Override
Expand Down

0 comments on commit 8347a6d

Please sign in to comment.