Skip to content

Commit

Permalink
Switch from KotlinLogging to SLF4J for logging
Browse files Browse the repository at this point in the history
Replaced KotlinLogging with SLF4J across multiple classes for more standardized logging. Improved log format consistency and removed redundant dependencies in build scripts.
  • Loading branch information
smyrgeorge committed Oct 15, 2024
1 parent fb044a2 commit 1d48eb0
Show file tree
Hide file tree
Showing 20 changed files with 133 additions and 130 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.github.smyrgeorge.actor4k.cluster

import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.smyrgeorge.actor4k.cluster.gossip.MessageHandler
import io.github.smyrgeorge.actor4k.cluster.gossip.Metadata
import io.github.smyrgeorge.actor4k.cluster.grpc.Envelope
Expand All @@ -20,6 +19,8 @@ import org.ishugaliy.allgood.consistent.hash.ConsistentHash
import org.ishugaliy.allgood.consistent.hash.HashRing
import org.ishugaliy.allgood.consistent.hash.hasher.DefaultHasher
import org.ishugaliy.allgood.consistent.hash.node.ServerNode
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.ConcurrentHashMap
import kotlin.jvm.optionals.getOrNull
import io.grpc.Server as GrpcServer
Expand All @@ -34,7 +35,7 @@ class ClusterImpl(
private val grpc: GrpcServer,
private val grpcService: GrpcService
) : Cluster {
private val log = KotlinLogging.logger {}
private val log: Logger = LoggerFactory.getLogger(this::class.java)

lateinit var raft: RaftNode
lateinit var raftManager: MemberManager
Expand Down Expand Up @@ -90,7 +91,7 @@ class ClusterImpl(
}

fun startRaft(initialGroupMembers: List<Endpoint>): ClusterImpl {
log.info { "Starting raft, initialGroupMembers=$initialGroupMembers" }
log.info("Starting raft, initialGroupMembers=$initialGroupMembers")

val endpoint = Endpoint(conf.alias, conf.host, conf.grpcPort)
val config: RaftConfig = RaftConfig
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package io.github.smyrgeorge.actor4k.cluster.gossip

import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.smyrgeorge.actor4k.cluster.ClusterImpl
import io.github.smyrgeorge.actor4k.cluster.raft.Endpoint
import io.github.smyrgeorge.actor4k.cluster.util.toInstance
import io.github.smyrgeorge.actor4k.system.ActorSystem
import io.github.smyrgeorge.actor4k.util.launchGlobal
import io.github.smyrgeorge.actor4k.util.retryBlocking
import io.github.smyrgeorge.actor4k.cluster.util.toInstance
import io.microraft.model.message.RaftMessage
import io.scalecube.cluster.membership.MembershipEvent
import io.scalecube.cluster.transport.api.Message
Expand All @@ -15,13 +14,15 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.reactive.awaitFirstOrNull
import kotlinx.coroutines.runBlocking
import org.ishugaliy.allgood.consistent.hash.node.ServerNode
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.io.Serializable
import kotlin.system.exitProcess
import io.scalecube.cluster.ClusterMessageHandler as ScaleCubeClusterMessageHandler

class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessageHandler {

private val log = KotlinLogging.logger {}
private val log: Logger = LoggerFactory.getLogger(this::class.java)
private val cluster: ClusterImpl by lazy {
ActorSystem.cluster as ClusterImpl
}
Expand All @@ -33,36 +34,36 @@ class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessa
@Suppress("unused")
private val job: Job = launchGlobal {
if (conf.nodeManagement == ClusterImpl.Conf.NodeManagement.STATIC) {
log.info { "Starting cluster in STATIC mode. Any changes to the cluster will not be applied." }
log.info("Starting cluster in STATIC mode. Any changes to the cluster will not be applied.")
conf.seedMembers.forEach {
cluster.ring.add(ServerNode(it.alias, it.address.host(), it.address.port()))
}
return@launchGlobal
}

for (i in 1..rounds) {
log.info { "Waiting for other nodes to appear... [$i/$rounds]" }
log.info("Waiting for other nodes to appear... [$i/$rounds]")
delay(delayPerRound)
val nodes = cluster.gossip.members()
// If ore than one node found continue.
if (nodes.size > 1) break
}

val nodes = cluster.gossip.members()
log.info { "We are a group of ${nodes.size} nodes (at least)." }
log.info("We are a group of ${nodes.size} nodes (at least).")

val initialGroupMembers = if (nodes.isEmpty()) {
log.error { "Sanity check failed :: No nodes found, but I was there..." }
log.error("Sanity check failed :: No nodes found, but I was there...")
exitProcess(1)
} else if (nodes.size == 1 && nodes.first().alias() == conf.alias) {
log.info { "Found ${nodes.size} node (myself). It's going to be a lonely day..." }
log.info("Found ${nodes.size} node (myself). It's going to be a lonely day...")
listOf(Endpoint(conf.alias, conf.host, conf.grpcPort))
} else {
log.info { "Requesting initial group members form the network." }
log.info("Requesting initial group members form the network.")
val self = cluster.gossip.member().address()
val msg = Message.builder().data(Protocol.Gossip.ReqInitialGroupMembers).sender(self).build()
for (i in 1..rounds) {
log.info { "Waiting for response... [$i/$rounds]" }
log.info("Waiting for response... [$i/$rounds]")
delay(delayPerRound)
cluster.gossip.spreadGossip(msg).awaitFirstOrNull()
if (initialGroupMembers.isNotEmpty()) break
Expand All @@ -71,13 +72,13 @@ class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessa
}

if (initialGroupMembers.isEmpty()) {
log.error { "No nodes found! Shutting down.." }
log.error("No nodes found! Shutting down..")
exitProcess(1)
}

var allConnected = false
for (i in 1..rounds) {
log.info { "Waiting connection with all initial group members... [$i/$rounds]" }
log.info("Waiting connection with all initial group members... [$i/$rounds]")
delay(delayPerRound)
allConnected = initialGroupMembers.all { m ->
cluster.gossip.members().any { m.alias == it.alias() }
Expand All @@ -86,7 +87,7 @@ class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessa
}

if (!allConnected) {
log.warn { "Could not establish connection with some of the initial group nodes." }
log.warn("Could not establish connection with some of the initial group nodes.")
exitProcess(1)
}

Expand All @@ -96,11 +97,11 @@ class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessa

override fun onGossip(g: Message) {
try {
log.debug { "Received gossip: $g" }
log.debug("Received gossip: {}", g)
when (val d = g.data<Protocol.Gossip>()) {
Protocol.Gossip.ReqInitialGroupMembers -> {
val members: List<Endpoint> = initialGroupMembers.ifEmpty {
log.info { "Raft is not started will send the members found from the gossip protocol." }
log.info("Raft is not started will send the members found from the gossip protocol.")
cluster.gossip.members().map {
val host = it.address().host()
val port = cluster.conf.grpcPort
Expand All @@ -121,13 +122,13 @@ class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessa
Protocol.Gossip.UnlockShards -> cluster.shardManager.unlockShards()
}
} catch (e: Exception) {
log.error(e) { e.message }
log.error(e.message, e)
}
}

override fun onMessage(m: Message) {
try {
log.debug { "Received message: $m" }
log.debug("Received message: {}", m)
when (val d = m.data<Protocol.Targeted>()) {
is Protocol.Targeted.ResInitialGroupMembers -> {
initialGroupMembers.ifEmpty { initialGroupMembers = d.members }
Expand All @@ -137,23 +138,23 @@ class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessa
try {
cluster.raft.handle(d.message)
} catch (e: UninitializedPropertyAccessException) {
log.debug { "Received message but the cluster is not yet initialized." }
log.debug("Received message but the cluster is not yet initialized.")
}
}

is Protocol.Targeted.ShardsLocked -> runBlocking { cluster.raftManager.send(d) }
is Protocol.Targeted.ShardedActorsFinished -> runBlocking { cluster.raftManager.send(d) }
}
} catch (e: Exception) {
log.error(e) { e.message }
log.error(e.message, e)
}
}

override fun onMembershipEvent(e: MembershipEvent) {
@Suppress("WHEN_ENUM_CAN_BE_NULL_IN_JAVA") when (e.type()) {
MembershipEvent.Type.ADDED -> {
val m = e.member()
log.info { "New node found: $m" }
log.info("New node found: $m")

val metadata = ByteArray(e.newMetadata().remaining())
.also { e.newMetadata().get(it) }
Expand All @@ -165,12 +166,12 @@ class MessageHandler(private val conf: ClusterImpl.Conf) : ScaleCubeClusterMessa
}

MembershipEvent.Type.REMOVED -> {
log.info { "Node removed: ${e.member().alias()}" }
log.info("Node removed: ${e.member().alias()}")
cluster.unregisterGrpcClient(e.member().alias())
}

MembershipEvent.Type.LEAVING -> log.info { "Node is leaving ${e.member().alias()}" }
MembershipEvent.Type.UPDATED -> log.info { "Node updated: ${e.member().alias()}" }
MembershipEvent.Type.LEAVING -> log.info("Node is leaving ${e.member().alias()}")
MembershipEvent.Type.UPDATED -> log.info("Node updated: ${e.member().alias()}")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package io.github.smyrgeorge.actor4k.cluster.grpc

import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.smyrgeorge.actor4k.actor.Actor
import io.github.smyrgeorge.actor4k.actor.ref.ActorRef
import io.github.smyrgeorge.actor4k.cluster.ClusterImpl
import io.github.smyrgeorge.actor4k.proto.NodeServiceGrpcKt
import io.github.smyrgeorge.actor4k.system.ActorSystem
import org.slf4j.LoggerFactory
import io.github.smyrgeorge.actor4k.proto.Cluster as ClusterProto

class GrpcService : NodeServiceGrpcKt.NodeServiceCoroutineImplBase() {

private val log = KotlinLogging.logger {}
private val log = LoggerFactory.getLogger(this::class.java)

suspend fun request(m: Envelope): Envelope.Response =
when (m) {
Expand All @@ -27,7 +27,7 @@ class GrpcService : NodeServiceGrpcKt.NodeServiceCoroutineImplBase() {
val res = actor.ask<Any>(msg)
Envelope.Response.ok(request.shard, res).toProto()
} catch (e: Exception) {
log.error(e) { e.message }
log.error(e.message, e)
e.toResponse(request.shard)
}
}
Expand All @@ -39,7 +39,7 @@ class GrpcService : NodeServiceGrpcKt.NodeServiceCoroutineImplBase() {
actor.tell(msg)
Envelope.Response.ok(request.shard, ".").toProto()
} catch (e: Exception) {
log.error(e) { e.message }
log.error(e.message, e)
e.toResponse(request.shard)
}
}
Expand All @@ -50,7 +50,7 @@ class GrpcService : NodeServiceGrpcKt.NodeServiceCoroutineImplBase() {
val res = Envelope.GetActor.Ref(request.shard, request.actorClazz, actor.name, actor.key)
Envelope.Response.ok(request.shard, res).toProto()
} catch (e: Exception) {
log.error(e) { e.message }
log.error(e.message, e)
e.toResponse(request.shard)
}
}
Expand Down
Loading

0 comments on commit 1d48eb0

Please sign in to comment.