Skip to content

Commit

Permalink
Change load scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
Andryss committed May 15, 2024
1 parent b1fcfa5 commit c5a4f52
Show file tree
Hide file tree
Showing 15 changed files with 239 additions and 215 deletions.
1 change: 1 addition & 0 deletions common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ kotlin {
dependencies {
implementation(libs.redisson)
implementation(libs.kotlinx.serialization.json)
implementation(libs.kotlinx.coroutines.core)
implementation(libs.org.eclipse.paho.client.mqttv3)
}
23 changes: 20 additions & 3 deletions common/src/main/kotlin/org/vivlaniv/nexohub/util/mqtt/Mqtt.kt
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
package org.vivlaniv.nexohub.util.mqtt

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.launch
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import org.eclipse.paho.client.mqttv3.MqttClient
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

val mqttLogger: Logger = LoggerFactory.getLogger("mqtt")

val mqttScope = CoroutineScope(
ThreadPoolExecutor(
4,
16,
60, TimeUnit.SECONDS,
LinkedBlockingQueue()
).asCoroutineDispatcher()
)

inline fun <reified T> MqttClient.subscribe(
topic: String,
crossinline onReceived: (String, T) -> Unit
crossinline onReceived: suspend CoroutineScope.(String, T) -> Unit
) {
subscribe(topic, 2) { tpc, msg ->
mqttLogger.info("got message on topic $tpc: $msg")
onReceived(tpc, Json.decodeFromString<T>(msg.payload.decodeToString()))
mqttScope.launch {
mqttLogger.info("got message on topic $tpc: $msg")
onReceived(tpc, Json.decodeFromString<T>(msg.payload.decodeToString()))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,7 @@ import org.vivlaniv.nexohub.handlers.configureSearchDeviceHandler
import org.vivlaniv.nexohub.handlers.configureSendDeviceSignalHandler
import org.vivlaniv.nexohub.handlers.configureSignInHandler
import org.vivlaniv.nexohub.handlers.configureSignUpHandler
import org.vivlaniv.nexohub.util.redis.publish
import org.vivlaniv.nexohub.util.redis.subscribe
import java.util.Properties
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Future
import java.util.concurrent.TimeoutException

class Application(
val log: Logger,
Expand Down Expand Up @@ -78,25 +72,3 @@ fun main() {

log.info("connection-service started")
}

val waitResponse = ConcurrentHashMap<String, CompletableFuture<TaskResult>>()

inline fun <reified T : Task> RedissonClient.sendAsync(topic: String, task: T): Future<TaskResult> {
val resultFuture = CompletableFuture<TaskResult>().completeAsync {
Thread.sleep(3_000)
waitResponse.remove(task.id)
throw TimeoutException()
}
waitResponse[task.id] = resultFuture
publish<T>(topic, task)
return resultFuture
}

inline fun <reified T : TaskResult> completeAsync(result: T): Boolean {
return waitResponse.remove(result.tid)?.complete(result) ?: false
}

inline fun <reified T : TaskResult> RedissonClient.requireAsyncCompleter(topic: String) {
if (getTopic(topic).countListeners() > 0) return
subscribe<T>(topic) { completeAsync<T>(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import org.vivlaniv.nexohub.GetDevicesPropertiesTaskResult
import org.vivlaniv.nexohub.GetSavedDevicesTask
import org.vivlaniv.nexohub.GetSavedDevicesTaskResult
import org.vivlaniv.nexohub.SaveSensorsTask
import org.vivlaniv.nexohub.requireAsyncCompleter
import org.vivlaniv.nexohub.sendAsync
import org.vivlaniv.nexohub.util.async.requireAsyncCompleter
import org.vivlaniv.nexohub.util.async.sendAsync
import org.vivlaniv.nexohub.util.mqtt.publish
import org.vivlaniv.nexohub.util.mqtt.subscribe
import org.vivlaniv.nexohub.util.redis.publish
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException

fun Application.configureFetchAllPropertiesHandler() {
val getSavedDevicesTopic = props.getProperty("topic.saved.devices", "devices/saved/get")
Expand All @@ -28,18 +28,18 @@ fun Application.configureFetchAllPropertiesHandler() {
val user = topic.substringBefore('/')

val response = try {
val savedDevicesIds = (redis.sendAsync(
val savedDevicesIds = (sendAsync(redis,
"$getSavedDevicesTopic/in",
GetSavedDevicesTask(user = user)
).get() as GetSavedDevicesTaskResult)
).await() as GetSavedDevicesTaskResult)
.devices.mapTo(mutableListOf()) { it.id }

request.include?.let { savedDevicesIds.retainAll(it) }

val getDevicesPropsResult = redis.sendAsync(
val getDevicesPropsResult = sendAsync(redis,
"$getDevicesPropertiesTopic/in",
GetDevicesPropertiesTask(user = user, include = savedDevicesIds)
).get() as GetDevicesPropertiesTaskResult
).await() as GetDevicesPropertiesTaskResult

if (getDevicesPropsResult.code != 0) {
FetchDevicesPropertiesTaskResult(request.id, getDevicesPropsResult.code, getDevicesPropsResult.errorMessage)
Expand All @@ -49,7 +49,7 @@ fun Application.configureFetchAllPropertiesHandler() {
SaveSensorsTask(sensors = getDevicesPropsResult.properties!!)
)
FetchDevicesPropertiesTaskResult(request.id, properties = getDevicesPropsResult.properties)
} catch (e: ExecutionException) {
} catch (e: TimeoutException) {
FetchDevicesPropertiesTaskResult(request.id, 1, "timeout occurred, try again later")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ import org.vivlaniv.nexohub.GetSavedDevicesTask
import org.vivlaniv.nexohub.GetSavedDevicesTaskResult
import org.vivlaniv.nexohub.SaveSensorsTask
import org.vivlaniv.nexohub.SavedDevice
import org.vivlaniv.nexohub.requireAsyncCompleter
import org.vivlaniv.nexohub.sendAsync
import org.vivlaniv.nexohub.util.async.requireAsyncCompleter
import org.vivlaniv.nexohub.util.async.sendAsync
import org.vivlaniv.nexohub.util.mqtt.publish
import org.vivlaniv.nexohub.util.mqtt.subscribe
import org.vivlaniv.nexohub.util.redis.publish
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException

fun Application.configureFetchSavedDevicesHandler() {
val getSavedDevicesTopic = props.getProperty("topic.saved.devices", "devices/saved/get")
Expand All @@ -28,18 +28,18 @@ fun Application.configureFetchSavedDevicesHandler() {
val user = topic.substringBefore('/')

val response = try {
val savedDevices = (redis.sendAsync(
val savedDevices = (sendAsync(redis,
"$getSavedDevicesTopic/in",
GetSavedDevicesTask(user = user)
).get() as GetSavedDevicesTaskResult)
).await() as GetSavedDevicesTaskResult)
.devices

val savedDevicesMap = mapOf(*savedDevices.map { it.id to it }.toTypedArray())

val getDevicesResult = redis.sendAsync(
val getDevicesResult = sendAsync(redis,
"$getDevicesTopic/in",
GetDevicesTask(user = user, include = savedDevicesMap.keys.toList())
).get() as GetDevicesTaskResult
).await() as GetDevicesTaskResult

if (getDevicesResult.code != 0) {
FetchSavedDevicesTaskResult(tid = request.id, code = getDevicesResult.code, errorMessage = getDevicesResult.errorMessage)
Expand All @@ -52,7 +52,7 @@ fun Application.configureFetchSavedDevicesHandler() {
val savedDevicesResponse = devices.map { SavedDevice(it.id, it.type,
savedDevicesMap[it.id]?.room, savedDevicesMap[it.id]?.alias, it.properties, it.signals) }
FetchSavedDevicesTaskResult(tid = request.id, devices = savedDevicesResponse)
} catch (e: ExecutionException) {
} catch (e: TimeoutException) {
FetchSavedDevicesTaskResult(request.id, 1, "timeout occurred, try again later")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import org.vivlaniv.nexohub.PutDevicePropertyTask
import org.vivlaniv.nexohub.PutDevicePropertyTaskResult
import org.vivlaniv.nexohub.SetDevicePropertyTask
import org.vivlaniv.nexohub.SetDevicePropertyTaskResult
import org.vivlaniv.nexohub.requireAsyncCompleter
import org.vivlaniv.nexohub.sendAsync
import org.vivlaniv.nexohub.util.async.requireAsyncCompleter
import org.vivlaniv.nexohub.util.async.sendAsync
import org.vivlaniv.nexohub.util.mqtt.publish
import org.vivlaniv.nexohub.util.mqtt.subscribe
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException

fun Application.configurePutDevicePropertyHandler() {
val getSavedDevicesTopic = props.getProperty("topic.saved.devices", "devices/saved/get")
Expand All @@ -26,28 +26,28 @@ fun Application.configurePutDevicePropertyHandler() {
val device = request.device

val response = try {
val savedDevicesIds = (redis.sendAsync(
val savedDevicesIds = (sendAsync(redis,
"$getSavedDevicesTopic/in",
GetSavedDevicesTask(user = user)
).get() as GetSavedDevicesTaskResult)
).await() as GetSavedDevicesTaskResult)
.devices.map { it.id }

if (device !in savedDevicesIds) {
PutDevicePropertyTaskResult(request.id, 1, "device $device not found", device)
}

val setDevicePropResult = redis.sendAsync(
val setDevicePropResult = sendAsync(redis,
"$setDevicePropertyTopic/in",
SetDevicePropertyTask(
user = user,
device = device,
name = request.property,
value = request.value
)
).get() as SetDevicePropertyTaskResult
).await() as SetDevicePropertyTaskResult

PutDevicePropertyTaskResult(request.id, setDevicePropResult.code, setDevicePropResult.errorMessage, device)
} catch (e: ExecutionException) {
} catch (e: TimeoutException) {
PutDevicePropertyTaskResult(request.id, 1, "timeout occurred, try again later", device)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import org.vivlaniv.nexohub.SaveDeviceTask
import org.vivlaniv.nexohub.SaveDeviceTaskResult
import org.vivlaniv.nexohub.SaveUserDeviceTask
import org.vivlaniv.nexohub.SaveUserDeviceTaskResult
import org.vivlaniv.nexohub.requireAsyncCompleter
import org.vivlaniv.nexohub.sendAsync
import org.vivlaniv.nexohub.util.async.requireAsyncCompleter
import org.vivlaniv.nexohub.util.async.sendAsync
import org.vivlaniv.nexohub.util.mqtt.publish
import org.vivlaniv.nexohub.util.mqtt.subscribe
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException

fun Application.configureSaveDeviceHandler() {
val getDeviceTopic = props.getProperty("topic.get.device", "device/get")
Expand All @@ -24,16 +24,16 @@ fun Application.configureSaveDeviceHandler() {
val user = topic.substringBefore('/')

val response = try {
val getDeviceResult = redis.sendAsync(
val getDeviceResult = sendAsync(redis,
"$getDeviceTopic/in",
GetDeviceTask(user = user, device = request.device)
).get() as GetDeviceTaskResult
).await() as GetDeviceTaskResult

if (getDeviceResult.code != 0) {
SaveDeviceTaskResult(request.id, getDeviceResult.code, getDeviceResult.errorMessage)
}

val saveDeviceResult = redis.sendAsync(
val saveDeviceResult = sendAsync(redis,
"$saveDeviceTopic/in",
SaveUserDeviceTask(
type = getDeviceResult.device!!.type,
Expand All @@ -42,10 +42,10 @@ fun Application.configureSaveDeviceHandler() {
room = request.room,
alias = request.alias
)
).get() as SaveUserDeviceTaskResult
).await() as SaveUserDeviceTaskResult

SaveDeviceTaskResult(request.id, saveDeviceResult.code, saveDeviceResult.errorMessage)
} catch (e: ExecutionException) {
} catch (e: TimeoutException) {
SaveDeviceTaskResult(request.id, 1, "timeout occurred, try again later")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import org.vivlaniv.nexohub.GetSavedDevicesTask
import org.vivlaniv.nexohub.GetSavedDevicesTaskResult
import org.vivlaniv.nexohub.SearchDevicesTask
import org.vivlaniv.nexohub.SearchDevicesTaskResult
import org.vivlaniv.nexohub.requireAsyncCompleter
import org.vivlaniv.nexohub.sendAsync
import org.vivlaniv.nexohub.util.async.requireAsyncCompleter
import org.vivlaniv.nexohub.util.async.sendAsync
import org.vivlaniv.nexohub.util.mqtt.publish
import org.vivlaniv.nexohub.util.mqtt.subscribe
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException

fun Application.configureSearchDeviceHandler() {
val getSavedDevicesTopic = props.getProperty("topic.saved.devices", "devices/saved/get")
Expand All @@ -24,20 +24,20 @@ fun Application.configureSearchDeviceHandler() {
val user = topic.substringBefore('/')

val response = try {
val savedDevicesIds = (redis.sendAsync(
val savedDevicesIds = (sendAsync(redis,
"$getSavedDevicesTopic/in",
GetSavedDevicesTask(user = user)
).get() as GetSavedDevicesTaskResult)
).await() as GetSavedDevicesTaskResult)
.devices.map { it.id }

val unknownDevices = (redis.sendAsync(
val unknownDevices = (sendAsync(redis,
"$getDevicesTopic/in",
GetDevicesTask(user = user, exclude = savedDevicesIds)
).get() as GetDevicesTaskResult)
).await() as GetDevicesTaskResult)
.devices

SearchDevicesTaskResult(request.id, devices = unknownDevices ?: emptyList())
} catch (e: ExecutionException) {
} catch (e: TimeoutException) {
SearchDevicesTaskResult(request.id, 1, "timeout occurred, try again later")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import org.vivlaniv.nexohub.SendDeviceSignalTask
import org.vivlaniv.nexohub.SendDeviceSignalTaskResult
import org.vivlaniv.nexohub.SignalDeviceTask
import org.vivlaniv.nexohub.SignalDeviceTaskResult
import org.vivlaniv.nexohub.requireAsyncCompleter
import org.vivlaniv.nexohub.sendAsync
import org.vivlaniv.nexohub.util.async.requireAsyncCompleter
import org.vivlaniv.nexohub.util.async.sendAsync
import org.vivlaniv.nexohub.util.mqtt.publish
import org.vivlaniv.nexohub.util.mqtt.subscribe
import java.util.concurrent.ExecutionException
import java.util.concurrent.TimeoutException

fun Application.configureSendDeviceSignalHandler() {
val getSavedDevicesTopic = props.getProperty("topic.saved.devices", "devices/saved/get")
Expand All @@ -24,23 +24,23 @@ fun Application.configureSendDeviceSignalHandler() {
val user = topic.substringBefore('/')

val response = try {
val savedDevicesIds = (redis.sendAsync(
val savedDevicesIds = (sendAsync(redis,
"$getSavedDevicesTopic/in",
GetSavedDevicesTask(user = user)
).get() as GetSavedDevicesTaskResult)
).await() as GetSavedDevicesTaskResult)
.devices.map { it.id }

if (request.device !in savedDevicesIds) {
SendDeviceSignalTaskResult(request.id, 1, "device ${request.device} not found")
}

val signalDeviceResult = redis.sendAsync(
val signalDeviceResult = sendAsync(redis,
"$signalDeviceTopic/in",
SignalDeviceTask(user = user, device = request.device, name = request.signal, args = request.arguments)
).get() as SignalDeviceTaskResult
).await() as SignalDeviceTaskResult

SendDeviceSignalTaskResult(request.id, signalDeviceResult.code, signalDeviceResult.errorMessage)
} catch (e: ExecutionException) {
} catch (e: TimeoutException) {
SendDeviceSignalTaskResult(request.id, 1, "timeout occurred, try again later")
}

Expand Down
Loading

0 comments on commit c5a4f52

Please sign in to comment.