Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/#91. Batch 모듈 추가, Job(질문발행알림) 추가 #99

Merged
merged 12 commits into from
May 21, 2024
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -42,5 +42,6 @@ build
### Configuration ###
**/src/**/*.yml
**/src/**/*.json
!src/**/application.yml
!**/*/application.yml
!**/*/*-test.yml
!src/test/**/*.yml
2 changes: 1 addition & 1 deletion adevspoon-api/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
FROM openjdk:17-jdk-slim
ARG JAR_FILE=./build/libs/mentos-api-0.0.1-SNAPSHOT.jar
ARG JAR_FILE=./build/libs/adevspoon-api-0.0.1-SNAPSHOT.jar
EXPOSE 80

COPY ${JAR_FILE} app.jar
Expand Down
2 changes: 1 addition & 1 deletion adevspoon-api/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ dependencies {
tasks.register("copyConfig", Copy::class) {
copy {
from("../adevspoon-config/backend/api")
include("*.yml", "*.xml")
include("*.yml", "*.xml", "*.json")
into("src/main/resources")
}
}
5 changes: 4 additions & 1 deletion adevspoon-api/src/main/resources/application-api-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,7 @@ jwt:

custom:
image:
temp-dir: /Users/rokwon/Downloads/image-temp
temp-dir: /Users/rokwon/Downloads/image-temp

firebase:
key-path: firebaseKey.json
1 change: 1 addition & 0 deletions adevspoon-api/src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ spring:
server:
servlet:
context-path: /api
shutdown: graceful

springdoc:
api-docs:
Expand Down
13 changes: 13 additions & 0 deletions adevspoon-batch/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# 빌드 파일 옮기기
FROM public.ecr.aws/sam/build-java17:1.116.0-20240430173307 as artifact-image
WORKDIR "/task"
ARG JAR_FILE=./adevspoon-batch/build/libs/adevspoon-batch-0.0.1-SNAPSHOT.jar
COPY ${JAR_FILE} app.jar

# Lambda Web Adatper 추가 및 실행
FROM public.ecr.aws/docker/library/amazoncorretto:17-al2023-headless
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.3 /lambda-adapter /opt/extensions/lambda-adapter
ENV PORT=8080
WORKDIR /opt
COPY --from=artifact-image /task/app.jar /opt
CMD ["java", "-jar", "app.jar", "--server.port=${PORT}", "-Duser.timezone=Awsia/Seoul", "-Dspring.profiles.active=prod"]
36 changes: 36 additions & 0 deletions adevspoon-batch/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* This file was generated by the Gradle 'init' task.
*/

tasks.getByName("bootJar") {
enabled = true
}

tasks.getByName("jar") {
enabled = false
}

plugins {
id("com.adevspoon.kotlin-common-conventions")
}

dependencies {
implementation(project(":adevspoon-common"))
implementation(project(":adevspoon-domain"))
implementation(project(":adevspoon-infrastructure"))

implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-batch")
implementation("org.springframework.boot:spring-boot-starter-data-jpa")

testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.batch:spring-batch-test")
}

tasks.register("copyConfig", Copy::class) {
copy {
from("../adevspoon-config/backend/batch")
include("*.yml", "*.xml", "*.json")
into("src/main/resources")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.adevspoon.batch

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication(scanBasePackages = ["com.adevspoon"])
class AdevspoonBatchServerApplication

fun main(args: Array<String>) {
runApplication<AdevspoonBatchServerApplication>(*args)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.adevspoon.batch.config

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.context.annotation.Primary
import org.springframework.core.task.TaskExecutor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor


@Configuration
class BatchTaskConfig {
@Bean
@Primary
fun batchTaskExecutor(): TaskExecutor = ThreadPoolTaskExecutor()
.apply {
corePoolSize = 5
maxPoolSize = 10
setThreadNamePrefix("batch-task-")
setWaitForTasksToCompleteOnShutdown(true)
initialize()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.adevspoon.batch.controller

import com.adevspoon.batch.job.JobExecutionFacade
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/events")
class EventProcessController(
private val jobExecutionFacade: JobExecutionFacade
) {
@PostMapping
fun processEvents(@RequestBody req: Map<String, Any>): ResponseEntity<String> {
jobExecutionFacade.executeJob()
return ResponseEntity.ok("Events processed")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.adevspoon.batch.job

import org.springframework.batch.core.Job
import org.springframework.batch.core.JobParametersBuilder
import org.springframework.batch.core.launch.JobLauncher
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher
import org.springframework.context.ApplicationContext
import org.springframework.core.task.TaskExecutor
import org.springframework.stereotype.Service
import java.time.LocalDateTime

@Service
class JobExecutionFacade(
private val jobLauncher: JobLauncher,
private val batchTaskExecutor: TaskExecutor,
private val applicationContext: ApplicationContext,
) {
fun executeJob() {
(jobLauncher as TaskExecutorJobLauncher).setTaskExecutor(batchTaskExecutor)

// TODO: 추후 Event Type에 따라 Job, JobParameter 나누기
val jobParameters = JobParametersBuilder()
.addString("date", LocalDateTime.now().toString())
.toJobParameters()
val job = applicationContext.getBean(QuestionPublishedNotification.JOB_NAME, Job::class.java)
jobLauncher.run(job, jobParameters)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

배치 처리할 때, 네트워크 오류가 있을 수도 있으니 여기에서도 retry 로직 추가하는게 어떨까요?
(찾아보니 RetryTemplate 혹은 Retryable 애노테이션이 있어요.)

Copy link
Member Author

@RokwonK RokwonK May 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Batch의 각 작업(Job)의 Step 내에서 retry를 지원해주기는 합니다.
그런데 제가 정의한 작업의 경우 멀티스레딩으로 처리하는데 이 경우 실패(오류) 시 재시도할 지점을 명확히 할 수 없다는 단점이 있습니다.(ex. 1~5 스레드 중 3번 처리는 무사히 끝났는데 1번 처리가 잘못되었을때, 만약 1번 처리부터 재시도하면 3번은 중복 처리는 되는 문제)

때문에 멀티스레딩의 경우 특별한 상황이 아니면 명시적으로 재시도를 막는게 좋습니다. 잘못하면 처리된 것들도 재시도를 할 수 있기 때문입니다. QuestionPublishedNotification 로직을 보시면 job() 메서드의 preventRestart 나 reader() 메서드 saveState() 메서드가 그러한 역할을 합니다.

실패지점을 기억하고 해당 지점부터 재시작할 수 있는 직렬처리가 아닌 멀티스레드로 처리한 이유는 내결함성이 있어야하는, 반드시 처리 되어야만하는 작업(단순 정보성 푸시 알림이기 때문)은 아니기 때문에 작업을 빠르게 처리할 수 있는 멀티스레드 방식을 선택했습니다.

추가적으로 Batch Job의 각 Step에서 재시도를 지원하는 메서드는 retryLimit입니다.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package com.adevspoon.batch.job

import com.adevspoon.domain.member.domain.UserEntity
import com.adevspoon.infrastructure.alarm.dto.AlarmType
import com.adevspoon.infrastructure.alarm.service.AlarmAdapter
import com.adevspoon.infrastructure.notification.dto.GroupNotificationInfo
import com.adevspoon.infrastructure.notification.dto.NotificationType
import com.adevspoon.infrastructure.notification.service.PushNotificationAdapter
import jakarta.persistence.EntityManagerFactory
import org.slf4j.LoggerFactory
import org.springframework.batch.core.*
import org.springframework.batch.core.configuration.annotation.JobScope
import org.springframework.batch.core.configuration.annotation.StepScope
import org.springframework.batch.core.job.builder.JobBuilder
import org.springframework.batch.core.repository.JobRepository
import org.springframework.batch.core.step.builder.StepBuilder
import org.springframework.batch.item.ExecutionContext
import org.springframework.batch.item.ItemWriter
import org.springframework.batch.item.database.JpaPagingItemReader
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.task.TaskExecutor
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.transaction.PlatformTransactionManager


@Configuration
class QuestionPublishedNotification(
private val transactionManager: PlatformTransactionManager,
private val entityManagerFactory: EntityManagerFactory,
private val pushNotificationAdapter: PushNotificationAdapter,
private val alarmAdapter: AlarmAdapter,
){
private val logger = LoggerFactory.getLogger(this.javaClass)!!
private val chunkSize = 500
private val successCountKey = "successCount"
private val failCountKey = "failCount"

companion object {
const val JOB_NAME = "질문발급_푸시알림"
}

@Bean(JOB_NAME)
fun job(jobRepository: JobRepository): Job {
return JobBuilder(JOB_NAME, jobRepository)
.preventRestart()
.start(notificationStep(jobRepository))
.listener(jobListener())
.build()
}

// 멀티쓰레드 처리
@Bean(JOB_NAME + "_step")
@JobScope
fun notificationStep(jobRepository: JobRepository): Step {
return StepBuilder(JOB_NAME + "_step", jobRepository)
.chunk<UserEntity, UserEntity>(chunkSize, transactionManager)
.reader(reader())
.writer(pushWriter(null))
.taskExecutor(executor())
.build()
}

@Bean(JOB_NAME + "_taskPool")
fun executor(): TaskExecutor {
val executor = ThreadPoolTaskExecutor()
executor.corePoolSize = 5
executor.maxPoolSize = 5
executor.setThreadNamePrefix("task-step-")
executor.setWaitForTasksToCompleteOnShutdown(true)
executor.initialize()
return executor
}

@Bean(JOB_NAME + "_reader")
@StepScope
fun reader(): JpaPagingItemReader<UserEntity> {
return JpaPagingItemReaderBuilder<UserEntity>()
.name(JOB_NAME + "_reader")
.queryString("SELECT u FROM UserEntity u WHERE u.fcmToken IS NOT NULL")
.pageSize(chunkSize)
.entityManagerFactory(entityManagerFactory)
.saveState(false)
.build()
}

@Bean
@StepScope
fun pushWriter(@Value("#{stepExecution.jobExecution.executionContext}") jobExecutionContext: ExecutionContext?): ItemWriter<UserEntity> {
return ItemWriter {
val notificationResponse = pushNotificationAdapter.sendMessageSync(
GroupNotificationInfo(
NotificationType.QUESTION_OPENED,
it.items.map { user -> user.fcmToken!! }
)
)

jobExecutionContext?.let {
synchronized(jobExecutionContext) {
val successCount = jobExecutionContext.getInt(successCountKey, 0)
val failCount = jobExecutionContext.getInt(failCountKey, 0)
jobExecutionContext.put(successCountKey, successCount + notificationResponse.success)
jobExecutionContext.put(failCountKey, failCount + notificationResponse.failure)
}
}
}
}

@Bean(JOB_NAME + "_listener")
fun jobListener(): JobExecutionListener {
return object : JobExecutionListener {
override fun afterJob(jobExecution: JobExecution) {
val pushSuccess = jobExecution.executionContext.getInt("successCount", -1)
val pushFail = jobExecution.executionContext.getInt("failCount", -1)
val jobInfo = mapOf<String, Any>(
"Batch Name" to JOB_NAME,
"Push Success" to pushSuccess,
"Push Fail" to pushFail,
)

if (jobExecution.status == BatchStatus.COMPLETED) {
logger.info("Push Finished - 성공: $pushSuccess, 실패: $pushFail")
alarmAdapter.sendAlarm(AlarmType.BATCH_COMPLETE, jobInfo)
} else {
logger.error("Push Failed - 성공: $pushSuccess, 실패: $pushFail")
alarmAdapter.sendAlarm(AlarmType.BATCH_ERROR, jobInfo)
}
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.adevspoon.batch.job

import jakarta.persistence.EntityManagerFactory
import org.springframework.context.annotation.Configuration

// TODO: 추후 예정
@Configuration
class QuestionReminderNotification(
private val entityManagerFactory: EntityManagerFactory
) {
private val chunkSize = 500

companion object {
const val JOB_NAME = "questionReminderNotificationJob"
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
spring:
batch:
jdbc:
initialize-schema: always
job:
enabled: false

firebase:
key-path: firebaseKey.json
13 changes: 13 additions & 0 deletions adevspoon-batch/src/main/resources/application.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
spring:
profiles:
default: local
group:
local:
- batch-local
dev:
- batch-dev
prod:
- batch-prod

server:
shutdown: graceful
8 changes: 0 additions & 8 deletions adevspoon-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@ tasks.getByName("jar") {
enabled = true
}

tasks.register("copyConfig", Copy::class) {
copy {
from("../adevspoon-config/backend/common")
include("*.yml", "*.xml")
into("src/main/resources")
}
}

plugins {
id("com.adevspoon.kotlin-common-conventions")
}
Expand Down
Empty file.
2 changes: 1 addition & 1 deletion adevspoon-config
Loading