Skip to content

Commit

Permalink
#141 Generalize consumers and event streams (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
vityaman authored Jun 14, 2024
1 parent 4c17fb8 commit 8044cd6
Show file tree
Hide file tree
Showing 41 changed files with 439 additions and 358 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package ru.vityaman.lms.botalka.app.spring.task
package ru.vityaman.lms.botalka.app.spring.event

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Qualifier
Expand All @@ -13,7 +13,7 @@ import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic

@Configuration
class KafkaConfig(private val jackson: ObjectMapper) {
@Qualifier(BeanName.PUBLICATION_TOPIC)
@Qualifier(BeanName.HOMEWORK_TOPIC)
@Bean
fun publicationTopic(
@Value("\${broker.topic.publication}") topic: String,
Expand All @@ -31,6 +31,6 @@ class KafkaConfig(private val jackson: ObjectMapper) {
)

object BeanName {
const val PUBLICATION_TOPIC = "publicationTopic"
const val HOMEWORK_TOPIC = "homeworkTopic"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package ru.vityaman.lms.botalka.app.spring.event.homework

class SpringConfig {
object BeanName {
const val KAFKA_EVENT_SOURCE = "kafkaHomeworkEventSource"
const val KAFKA_CONSUMER = "kafkaHomeworkConsumer"
const val TELEGRAM_CONSUMER = "telegramHomeworkConsumer"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ru.vityaman.lms.botalka.app.spring.event.homework

import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.app.spring.event.KafkaConfig
import ru.vityaman.lms.botalka.core.event.EventSource
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.storage.kafka.BasicKafkaConsumer
import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic

@Component
@Qualifier(SpringConfig.BeanName.KAFKA_EVENT_SOURCE)
class SpringEventSource(
@Value("\${broker.bootstrap-servers}")
bootstrapServers: String,

@Value("\${broker.consumer.publication.group}")
groupId: String,

@Qualifier(KafkaConfig.BeanName.HOMEWORK_TOPIC)
topic: KafkaTopic<Homework.Id, Homework>,
) : EventSource<Homework> by
BasicKafkaConsumer(
BasicKafkaConsumer.Config(
bootstrapServers = bootstrapServers,
groupId = groupId,
),
topic,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ru.vityaman.lms.botalka.app.spring.event.homework

import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.app.spring.event.KafkaConfig
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.storage.kafka.BasicKafkaProducer
import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic

@Component
@Qualifier(SpringConfig.BeanName.KAFKA_CONSUMER)
class SpringKafkaConsumer(
@Value("\${broker.bootstrap-servers}")
bootstrapServers: String,

@Qualifier(KafkaConfig.BeanName.HOMEWORK_TOPIC)
topic: KafkaTopic<Homework.Id, Homework>,
) : Consumer<Homework> by
BasicKafkaProducer(BasicKafkaProducer.Config(bootstrapServers), topic)
.asConsumerWithKey(Homework::id)
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ru.vityaman.lms.botalka.app.spring.task
package ru.vityaman.lms.botalka.app.spring.event.homework

import kotlinx.coroutines.runBlocking
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.app.spring.storage.MainR2dbcConfig
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.core.event.EventPublicationTask
import ru.vityaman.lms.botalka.core.event.domain.HomeworkEvents
import ru.vityaman.lms.botalka.core.event.loggingEventCallbacks
import ru.vityaman.lms.botalka.core.logging.Slf4jLog
import ru.vityaman.lms.botalka.core.publication.PublicationConsumer
import ru.vityaman.lms.botalka.core.publication.logging.loggingPublicationCallbacks
import ru.vityaman.lms.botalka.core.publication.task.PublicationTask
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.storage.HomeworkStorage
import ru.vityaman.lms.botalka.core.tx.TxEnv
import java.time.Clock
Expand All @@ -18,20 +20,20 @@ import java.util.concurrent.TimeUnit
class SpringPublicationTask(
homeworks: HomeworkStorage,

@Qualifier(SpringPublicationConfig.BeanName.KAFKA_CONSUMER)
consumer: PublicationConsumer,
@Qualifier(SpringConfig.BeanName.KAFKA_CONSUMER)
consumer: Consumer<Homework>,

@Qualifier(MainR2dbcConfig.BeanName.TX_ENV)
mainTx: TxEnv,

clock: Clock,
) {
private val logic = PublicationTask(
private val logic = EventPublicationTask(
consumer = consumer,
homeworks = homeworks,
txEnv = mainTx,
clock = clock,
callbacks = loggingPublicationCallbacks(Slf4jLog("PublicationTask")),
notifications = HomeworkEvents(homeworks, mainTx, clock),
callbacks = loggingEventCallbacks(
Slf4jLog("HomeworkNotifyPushTask"),
),
)

@Scheduled(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package ru.vityaman.lms.botalka.app.spring.event.homework

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.core.event.EventConsumingActor
import ru.vityaman.lms.botalka.core.event.EventSource
import ru.vityaman.lms.botalka.core.event.aspect.SlownessConfig
import ru.vityaman.lms.botalka.core.event.aspect.sequenced
import ru.vityaman.lms.botalka.core.event.aspect.slowed
import ru.vityaman.lms.botalka.core.logging.Slf4jLog
import ru.vityaman.lms.botalka.core.model.Homework

@Component
data class SpringTelegramConfig(
@Value("\${external.service.telegram.duration-seconds.retry.min}")
val retryDurationSecondsMin: Int,

@Value("\${external.service.telegram.duration-seconds.retry.max}")
val retryDurationSecondsMax: Int,

@Value("\${external.service.telegram.duration-seconds.relax.min}")
val relaxDurationSecondsMin: Int,

@Value("\${external.service.telegram.duration-seconds.relax.max}")
val relaxDurationSecondsMax: Int,
)

@Component
class SpringTelegramActor(
@Qualifier(SpringConfig.BeanName.KAFKA_EVENT_SOURCE)
mailbox: EventSource<Homework>,

@Qualifier(SpringConfig.BeanName.TELEGRAM_CONSUMER)
consumer: Consumer<Homework>,

config: SpringTelegramConfig,
) {
private val scope = CoroutineScope(Dispatchers.Default)
private val log = Slf4jLog("TelegramActor")
private val logic = EventConsumingActor(
mailbox = mailbox,
consumer = consumer,
callbacks = sequenced(
EventConsumingActor.Callbacks(
onStart = { log.info("Starting...") },
onSuccess = { log.info("Sent homework with id ${it.id}") },
onError = { log.info("Failed: ${it.message}") },
),
slowed(
SlownessConfig(
retryDurationSecondsMin = config.retryDurationSecondsMin,
retryDurationSecondsMax = config.retryDurationSecondsMax,
relaxDurationSecondsMin = config.relaxDurationSecondsMin,
relaxDurationSecondsMax = config.relaxDurationSecondsMax,
),
),
),
)

init {
scope.launch { logic.run() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package ru.vityaman.lms.botalka.app.spring.event.homework

import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.core.external.telegram.TelegramBot
import ru.vityaman.lms.botalka.core.external.telegram.TelegramChat
import ru.vityaman.lms.botalka.core.external.telegram.TelegramConsumer
import ru.vityaman.lms.botalka.core.model.Homework

@Component
@Qualifier(SpringConfig.BeanName.TELEGRAM_CONSUMER)
class SpringTelegramConsumer(
telegram: TelegramBot,

@Value("\${external.service.telegram.admin-chat-id}")
adminChatId: Long,
) : Consumer<Homework> by
TelegramConsumer(telegram, TelegramChat(adminChatId), {
buildString {
append("Published homework '${it.title.text}'!\n")
append("\n")
append("${it.description}\n")
append("\n")
append("MaxScore: ${it.maxScore.value}\n")
append("Deadline: ${it.deadlineMoment}\n")
append("Id: ${it.id.number}\n")
}
})

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 8044cd6

Please sign in to comment.