From eedae5903e0181353eec9308d75c2a8184d1560a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=9C=D0=B0=D1=80=D0=BA=D0=BE=20=D0=94=D0=BE=D1=98=D0=BA?= =?UTF-8?q?=D0=B8=D1=9B?= Date: Sun, 23 Jun 2024 22:02:01 +0200 Subject: [PATCH] Fixed issue with messages not being sent to MQTT and LogFile channels; Channel beans type changed to MessageChannel (additional cast to specific channels is now needed if required) --- .../SoftwareDevelopmentSimulationApp.java | 5 +- ...pringIntegrationMessageChannelsConfig.java | 97 +++++++++++-------- .../flow/FileHandlingFlow.java | 22 +++-- .../flow/PrintoutFlow.java | 25 +++-- src/main/resources/static/js/index.js | 7 +- 5 files changed, 92 insertions(+), 64 deletions(-) diff --git a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/SoftwareDevelopmentSimulationApp.java b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/SoftwareDevelopmentSimulationApp.java index f99fdc3..1487332 100755 --- a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/SoftwareDevelopmentSimulationApp.java +++ b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/SoftwareDevelopmentSimulationApp.java @@ -21,12 +21,9 @@ public static void main(String[] args) setupDataProvider(false); updateDevelopmentTeamsSetup(new DevelopmentTeamCreationParameters()); - Utilities.getIGateways().sendToInfo(""" - Welcome to Software development simulator™ - Developed by Ⓒ Marko Dojkić 2024$I hope you will enjoy using mine spring integration web based application"""); + Utilities.getIGateways().sendToInfo("Welcome to Software development simulator™ Developed by Ⓒ Marko Dojkić 2024$I hope you will enjoy using mine spring integration web based application"); //TODO: Correct JIRA activity stream timings - //TODO: Fix issues with some MQTT messages not reaching FE nor being saved in log file //GUI PLANS - thymeleaf diff --git a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/config/SpringIntegrationMessageChannelsConfig.java b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/config/SpringIntegrationMessageChannelsConfig.java index e493caa..f29306b 100755 --- a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/config/SpringIntegrationMessageChannelsConfig.java +++ b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/config/SpringIntegrationMessageChannelsConfig.java @@ -17,6 +17,7 @@ import org.springframework.integration.scheduling.PollerMetadata; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; import java.util.logging.Level; import java.util.logging.Logger; @@ -30,37 +31,52 @@ public class SpringIntegrationMessageChannelsConfig { private static final Logger logger = Logger.getLogger(SpringIntegrationMessageChannelsConfig.class.getName()); @Bean(name = "errorChannel") - public DirectChannel errorChannel() { + public MessageChannel errorChannel() { return new DirectChannel(); } - @Bean(name = "information.input") - public DirectChannel informationInput() { + @Bean(name = "errorChannel.mqtt.input") + public MessageChannel errorChannelMQTTInput() { return new DirectChannel(); } - @Bean(name = "jiraActivityStream.input") - public DirectChannel jiraActivityStreamInput() { + @Bean(name = "errorChannel.logFile.input") + public MessageChannel errorChannelLogFileInput() { + return new DirectChannel(); + } + + @Bean(name = "information.input") + public MessageChannel informationInput() { return new DirectChannel(); } @Bean(name = "information.mqtt.input") - public DirectChannel informationMQTTInput() { + public MessageChannel informationMQTTInput() { + return new DirectChannel(); + } + + @Bean(name = "information.logFile.input") + public MessageChannel informationLogFileInput() { return new DirectChannel(); } - @Bean(name = "error.mqtt.input") - public DirectChannel errorMQTTInput() { + @Bean(name = "jiraActivityStream.input") + public MessageChannel jiraActivityStreamInput() { return new DirectChannel(); } @Bean(name = "jiraActivityStream.mqtt.input") - public DirectChannel jiraActivityStreamMQTTInput() { + public MessageChannel jiraActivityStreamMQTTInput() { + return new DirectChannel(); + } + + @Bean(name = "jiraActivityStream.logFile.input") + public MessageChannel jiraActivityStreamLogFileInput() { return new DirectChannel(); } @Bean(name = "epicMessage.input") - public PriorityChannel epicInput(){ + public MessageChannel epicInput(){ PriorityChannel epicInputPriorityChannel = new PriorityChannel(0, (message1, message2) -> { Message baseTaskMessage1 = (Message) message1; Message baseTaskMessage2 = (Message) message2; @@ -75,161 +91,162 @@ public PriorityChannel epicInput(){ public IntegrationFlow assignEpicFlow(){ PollerMetadata pollerMetadata = new PollerMetadata(); pollerMetadata.setMaxMessagesPerPoll(1); - return IntegrationFlow.from("epicMessage.input").handle(message -> { + + return IntegrationFlow.from(epicInput()).handle(message -> { logger.log(Level.INFO, "{0} arrived - Current count: {1}", new String[]{((Epic) message.getPayload()).getId(), String.valueOf(IN_PROGRESS_EPICS_COUNT.incrementAndGet())}); - new Thread(() -> currentSprintEpic().send(MessageBuilder.withPayload(message.getPayload()).setHeader(ASSIGNED_DEVELOPMENT_TEAM_POSITION_NUMBER, DataProvider.getAvailableDevelopmentTeamIds().pop()).build())).start(); if(IN_PROGRESS_EPICS_COUNT.get() == Utilities.getTotalDevelopmentTeamsPresent()) controlBusInput().send(MessageBuilder.withPayload("@assignEpicFlow.stop()").build()); + new Thread(() -> currentSprintEpic().send(MessageBuilder.withPayload(message.getPayload()).setHeader(ASSIGNED_DEVELOPMENT_TEAM_POSITION_NUMBER, DataProvider.getAvailableDevelopmentTeamIds().pop()).build())).start(); }, sourcePoolingChannelAdapter -> sourcePoolingChannelAdapter.poller(pollerMetadata)).get(); } @Bean(name = "controlBus.input") - public DirectChannel controlBusInput() { + public MessageChannel controlBusInput() { return new DirectChannel(); } @Bean(name = "currentSprintEpic.input") - public DirectChannel currentSprintEpic(){ + public MessageChannel currentSprintEpic(){ DirectChannel currentSprintEpicChannel = new DirectChannel(); currentSprintEpicChannel.setDatatypes(Epic.class); return currentSprintEpicChannel; } @Bean(name = "inProgressEpic.intermediate") - public DirectChannel inProgressEpic(){ + public MessageChannel inProgressEpic(){ DirectChannel inProgressEpicChannel = new DirectChannel(); inProgressEpicChannel.setDatatypes(Epic.class); return inProgressEpicChannel; } @Bean(name = "doneEpics.output") - public DirectChannel doneEpics(){ + public MessageChannel doneEpics(){ DirectChannel doneEpicsChannel = new DirectChannel(); doneEpicsChannel.setDatatypes(Epic.class); return doneEpicsChannel; } @Bean(name = "inProgressUserStory.intermediate") - public DirectChannel inProgressUserStory(){ + public MessageChannel inProgressUserStory(){ DirectChannel inProgressUserStoryChannel = new DirectChannel(); inProgressUserStoryChannel.setDatatypes(UserStory.class); return inProgressUserStoryChannel; } @Bean(name = "currentSprintUserStories.preIntermediate") - public DirectChannel currentSprintUserStories(){ + public MessageChannel currentSprintUserStories(){ DirectChannel currentSprintUserStoriesChannel = new DirectChannel(); currentSprintUserStoriesChannel.setDatatypes(UserStory.class); return currentSprintUserStoriesChannel; } @Bean(name = "doneSprintUserStories.output") - public DirectChannel doneSprintUserStories(){ + public MessageChannel doneSprintUserStories(){ DirectChannel doneSprintUserStoriesChannel = new DirectChannel(); doneSprintUserStoriesChannel.setDatatypes(UserStory.class); return doneSprintUserStoriesChannel; } @Bean(name = "toDoTechnicalTasks.input") - public DirectChannel toDoTechnicalTasks(){ + public MessageChannel toDoTechnicalTasks(){ DirectChannel toDoTechnicalTasksChannel = new DirectChannel(); toDoTechnicalTasksChannel.setDatatypes(TechnicalTask.class); return toDoTechnicalTasksChannel; } @Bean(name = "trivialTechnicalTaskQueue.input") - public QueueChannel trivialTechnicalTaskQueue(){ + public MessageChannel trivialTechnicalTaskQueue(){ QueueChannel trivialTechnicalTaskQueueChannel = new QueueChannel(8); trivialTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class); return trivialTechnicalTaskQueueChannel; } @Bean(name = "trivialTechnicalTask.intermediate") - @BridgeFrom(value = "trivialTechnicalTaskQueue.input", poller = @Poller(fixedRate = "800")) - public DirectChannel trivialTechnicalTask(){ + @BridgeFrom(value = "trivialTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1800")) + public MessageChannel trivialTechnicalTask(){ DirectChannel trivialTechnicalTaskChannel = new DirectChannel(); trivialTechnicalTaskChannel.setDatatypes(TechnicalTask.class); return trivialTechnicalTaskChannel; } @Bean(name = "normalTechnicalTaskQueue.input") - public QueueChannel normalTechnicalTaskQueue(){ + public MessageChannel normalTechnicalTaskQueue(){ QueueChannel normalTechnicalTaskQueueChannel = new QueueChannel(6); normalTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class); return normalTechnicalTaskQueueChannel; } @Bean(name = "normalTechnicalTask.intermediate") - @BridgeFrom(value = "normalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "600")) - public DirectChannel normalTechnicalTask(){ + @BridgeFrom(value = "normalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1600")) + public MessageChannel normalTechnicalTask(){ DirectChannel normalTechnicalTaskChannel = new DirectChannel(); normalTechnicalTaskChannel.setDatatypes(TechnicalTask.class); return normalTechnicalTaskChannel; } @Bean(name = "minorTechnicalTaskQueue.input") - public QueueChannel minorTechnicalTaskQueue(){ + public MessageChannel minorTechnicalTaskQueue(){ QueueChannel minorTechnicalTaskQueueChannel = new QueueChannel(4); minorTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class); return minorTechnicalTaskQueueChannel; } @Bean(name = "minorTechnicalTask.intermediate") - @BridgeFrom(value = "minorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "400")) - public DirectChannel minorTechnicalTask(){ + @BridgeFrom(value = "minorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1400")) + public MessageChannel minorTechnicalTask(){ DirectChannel minorTechnicalTaskChannel = new DirectChannel(); minorTechnicalTaskChannel.setDatatypes(TechnicalTask.class); return minorTechnicalTaskChannel; } @Bean(name = "majorTechnicalTaskQueue.input") - public QueueChannel majorTechnicalTaskQueue(){ + public MessageChannel majorTechnicalTaskQueue(){ QueueChannel majorTechnicalTaskQueueChannel = new QueueChannel(2); majorTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class); return majorTechnicalTaskQueueChannel; } @Bean(name = "majorTechnicalTask.intermediate") - @BridgeFrom(value = "majorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "200")) - public DirectChannel majorTechnicalTask(){ + @BridgeFrom(value = "majorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1200")) + public MessageChannel majorTechnicalTask(){ DirectChannel majorTechnicalTaskChannel = new DirectChannel(); majorTechnicalTaskChannel.setDatatypes(TechnicalTask.class); return majorTechnicalTaskChannel; } @Bean(name = "criticalTechnicalTaskQueue.input") - public QueueChannel criticalTechnicalTaskQueue(){ + public MessageChannel criticalTechnicalTaskQueue(){ QueueChannel criticalTechnicalTaskQueueChannel = new QueueChannel(1); criticalTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class); return criticalTechnicalTaskQueueChannel; } @Bean(name = "criticalTechnicalTask.intermediate") - @BridgeFrom(value = "criticalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "100")) - public DirectChannel criticalTechnicalTask(){ + @BridgeFrom(value = "criticalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1100")) + public MessageChannel criticalTechnicalTask(){ DirectChannel criticalTechnicalTaskChannel = new DirectChannel(); criticalTechnicalTaskChannel.setDatatypes(TechnicalTask.class); return criticalTechnicalTaskChannel; } @Bean(name = "blockerTechnicalTaskQueue.input") - public QueueChannel blockerTechnicalTaskQueue(){ + public MessageChannel blockerTechnicalTaskQueue(){ QueueChannel blockerTechnicalTaskQueueChannel = new QueueChannel(1); blockerTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class); return blockerTechnicalTaskQueueChannel; } @Bean(name = "blockerTechnicalTask.intermediate") - @BridgeFrom(value = "blockerTechnicalTaskQueue.input", poller = @Poller(fixedRate = "100")) - public DirectChannel blockerTechnicalTask(){ + @BridgeFrom(value = "blockerTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1100")) + public MessageChannel blockerTechnicalTask(){ DirectChannel blockerTechnicalTaskChannel = new DirectChannel(); blockerTechnicalTaskChannel.setDatatypes(TechnicalTask.class); return blockerTechnicalTaskChannel; } @Bean(name = "doneTechnicalTasks.output") - public DirectChannel doneTechnicalTasks(){ + public MessageChannel doneTechnicalTasks(){ DirectChannel doneTechnicalTasksChannel = new DirectChannel(); doneTechnicalTasksChannel.setDatatypes(TechnicalTask.class); return doneTechnicalTasksChannel; } -} +} \ No newline at end of file diff --git a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/FileHandlingFlow.java b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/FileHandlingFlow.java index d7bb036..8e0f9ac 100644 --- a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/FileHandlingFlow.java +++ b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/FileHandlingFlow.java @@ -1,12 +1,14 @@ package dev.markodojkic.softwaredevelopmentsimulation.flow; import dev.markodojkic.softwaredevelopmentsimulation.util.Utilities; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.file.FileHeaders; import org.springframework.integration.file.dsl.Files; import org.springframework.integration.file.support.FileExistsMode; +import org.springframework.messaging.MessageChannel; import java.io.File; import java.nio.charset.StandardCharsets; @@ -15,22 +17,22 @@ @Configuration public class FileHandlingFlow { @Bean - public IntegrationFlow informationLogFileFlow() { - return configureLogFileFlow("information.logFile.input", "informationData.log"); + public IntegrationFlow informationLogFileFlow(@Qualifier("information.logFile.input") MessageChannel logFileMessageChannel) { + return configureLogFileFlow(logFileMessageChannel, "informationChannel.log"); } @Bean - public IntegrationFlow jiraActivityStreamLogFileFlow() { - return configureLogFileFlow("jiraActivityStream.logFile.input", "jiraActivityStreamData.log"); + public IntegrationFlow jiraActivityStreamLogFileFlow(@Qualifier("jiraActivityStream.logFile.input") MessageChannel logFileMessageChannel) { + return configureLogFileFlow(logFileMessageChannel, "jiraActivityStreamChannel.log"); } @Bean - public IntegrationFlow errorLogFileFlow() { - return configureLogFileFlow("error.logFile.input", "errorData.log"); + public IntegrationFlow errorLogFileFlow(@Qualifier("errorChannel.logFile.input") MessageChannel logFileMessageChannel) { + return configureLogFileFlow(logFileMessageChannel, "errorChannel.log"); } - private IntegrationFlow configureLogFileFlow(String inputChannel, String fileName) { - return IntegrationFlow.from(inputChannel) + private IntegrationFlow configureLogFileFlow(MessageChannel logFileMessageChannel, String fileName) { + return IntegrationFlow.from(logFileMessageChannel) .enrichHeaders(h -> h .header(FileHeaders.FILENAME, fileName) .header(FileHeaders.REMOTE_DIRECTORY, new File(Utilities.getCurrentApplicationLogsPath().toUri()))) @@ -38,10 +40,10 @@ private IntegrationFlow configureLogFileFlow(String inputChannel, String fileNam .handle(Files.outboundGateway(m -> m.getHeaders().get(FileHeaders.REMOTE_DIRECTORY)) .fileNameGenerator(m -> Objects.requireNonNull(m.getHeaders().get(FileHeaders.FILENAME)).toString()) .autoCreateDirectory(true) - .flushInterval(5000) + .flushInterval(1000) .fileExistsMode(FileExistsMode.APPEND_NO_FLUSH) .appendNewLine(true) .charset(StandardCharsets.UTF_8.name())) .get(); } -} +} \ No newline at end of file diff --git a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/PrintoutFlow.java b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/PrintoutFlow.java index fe5a0d9..bb2cdd0 100755 --- a/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/PrintoutFlow.java +++ b/src/main/java/dev/markodojkic/softwaredevelopmentsimulation/flow/PrintoutFlow.java @@ -1,35 +1,46 @@ package dev.markodojkic.softwaredevelopmentsimulation.flow; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.handler.LoggingHandler; +import org.springframework.messaging.MessageChannel; @Configuration public class PrintoutFlow { private static final String PRINTER_TRANSFORMER_BEAN = "printerTransformer"; @Bean - public IntegrationFlow informationPrintoutFlow(){ + public IntegrationFlow informationPrintoutFlow(@Qualifier("information.mqtt.input") MessageChannel mqttMessageChannel, @Qualifier("information.logFile.input") MessageChannel logFileMessageChannel) { return IntegrationFlow.from("information.input") .transform(PRINTER_TRANSFORMER_BEAN, "infoOutput") .log(LoggingHandler.Level.INFO, message -> System.lineSeparator().concat(message.getPayload().toString())) - .channel("information.mqtt.input").channel("information.logFile.input").get(); + .handle(message -> { + mqttMessageChannel.send(message); + logFileMessageChannel.send(message); + }).get(); } @Bean - public IntegrationFlow jiraActivityStreamPrintoutFlow(){ + public IntegrationFlow jiraActivityStreamPrintoutFlow(@Qualifier("jiraActivityStream.mqtt.input") MessageChannel mqttMessageChannel, @Qualifier("jiraActivityStream.logFile.input") MessageChannel logFileMessageChannel) { return IntegrationFlow.from("jiraActivityStream.input") .transform(PRINTER_TRANSFORMER_BEAN, "jiraActivityStreamOutput") .log(LoggingHandler.Level.INFO, message -> System.lineSeparator().concat(message.getPayload().toString())) - .channel("jiraActivityStream.mqtt.input").channel("jiraActivityStream.logFile.input").get(); + .handle(message -> { + mqttMessageChannel.send(message); + logFileMessageChannel.send(message); + }).get(); } @Bean - public IntegrationFlow errorPrintoutFlow(){ + public IntegrationFlow errorPrintoutFlow(@Qualifier("errorChannel.mqtt.input") MessageChannel mqttMessageChannel, @Qualifier("errorChannel.logFile.input") MessageChannel logFileMessageChannel) { return IntegrationFlow.from("errorChannel") .transform(PRINTER_TRANSFORMER_BEAN, "errorOutput") .log(LoggingHandler.Level.ERROR, message -> System.lineSeparator().concat(message.getPayload().toString())) - .channel("error.mqtt.input").channel("error.logFile.input").get(); + .handle(message -> { + mqttMessageChannel.send(message); + logFileMessageChannel.send(message); + }).get(); } -} +} \ No newline at end of file diff --git a/src/main/resources/static/js/index.js b/src/main/resources/static/js/index.js index 7b6e183..9e7124c 100755 --- a/src/main/resources/static/js/index.js +++ b/src/main/resources/static/js/index.js @@ -148,23 +148,24 @@ function generateUUID() { } function appendDataToMQTTTopicDivs(topicName, data) { + if(data.length === 0 || data === "\n") return; switch (topicName) { case "information-printout-topic": { const sanitizedData = sanitizeInformationData(data); - $("#informationLogs").append(`
${sanitizedData}
`); + $("#informationLogs").append(`
${sanitizedData.replace(/^/, '')}
`); break; } case "java-activity-stream-printout-topic": { const sanitizedData = sanitizeJavaActivityStreamData(data); - $("#jiraActivityStream div").prepend(sanitizedData); + $("#jiraActivityStream div").prepend(sanitizedData.replace(/^/, '')); break; } case "error-printout-topic": { const sanitizedData = sanitizeErrorData(data); - $("#errorLogs").append(`
${sanitizedData}
`); + $("#errorLogs").append(`
${sanitizedData.replace(/^/, '')}
`); break; } }