Skip to content

Commit

Permalink
Fixed issue with messages not being sent to MQTT and LogFile channels…
Browse files Browse the repository at this point in the history
…; Channel beans type changed to MessageChannel (additional cast to specific channels is now needed if required)
  • Loading branch information
MarkoDojkic committed Jun 23, 2024
1 parent 71a9485 commit eedae59
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@ public static void main(String[] args)
setupDataProvider(false);
updateDevelopmentTeamsSetup(new DevelopmentTeamCreationParameters());

Utilities.getIGateways().sendToInfo("""
Welcome to Software development simulator™
Developed by Ⓒ Marko Dojkić 2024$I hope you will enjoy using mine spring integration web based application""");
Utilities.getIGateways().sendToInfo("Welcome to Software development simulator™ Developed by Ⓒ Marko Dojkić 2024$I hope you will enjoy using mine spring integration web based application");

//TODO: Correct JIRA activity stream timings
//TODO: Fix issues with some MQTT messages not reaching FE nor being saved in log file

//GUI PLANS - thymeleaf

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;

import java.util.logging.Level;
import java.util.logging.Logger;
Expand All @@ -30,37 +31,52 @@ public class SpringIntegrationMessageChannelsConfig {
private static final Logger logger = Logger.getLogger(SpringIntegrationMessageChannelsConfig.class.getName());

@Bean(name = "errorChannel")
public DirectChannel errorChannel() {
public MessageChannel errorChannel() {
return new DirectChannel();
}

@Bean(name = "information.input")
public DirectChannel informationInput() {
@Bean(name = "errorChannel.mqtt.input")
public MessageChannel errorChannelMQTTInput() {
return new DirectChannel();
}

@Bean(name = "jiraActivityStream.input")
public DirectChannel jiraActivityStreamInput() {
@Bean(name = "errorChannel.logFile.input")
public MessageChannel errorChannelLogFileInput() {
return new DirectChannel();
}

@Bean(name = "information.input")
public MessageChannel informationInput() {
return new DirectChannel();
}

@Bean(name = "information.mqtt.input")
public DirectChannel informationMQTTInput() {
public MessageChannel informationMQTTInput() {
return new DirectChannel();
}

@Bean(name = "information.logFile.input")
public MessageChannel informationLogFileInput() {
return new DirectChannel();
}

@Bean(name = "error.mqtt.input")
public DirectChannel errorMQTTInput() {
@Bean(name = "jiraActivityStream.input")
public MessageChannel jiraActivityStreamInput() {
return new DirectChannel();
}

@Bean(name = "jiraActivityStream.mqtt.input")
public DirectChannel jiraActivityStreamMQTTInput() {
public MessageChannel jiraActivityStreamMQTTInput() {
return new DirectChannel();
}

@Bean(name = "jiraActivityStream.logFile.input")
public MessageChannel jiraActivityStreamLogFileInput() {
return new DirectChannel();
}

@Bean(name = "epicMessage.input")
public PriorityChannel epicInput(){
public MessageChannel epicInput(){
PriorityChannel epicInputPriorityChannel = new PriorityChannel(0, (message1, message2) -> {
Message<BaseTask> baseTaskMessage1 = (Message<BaseTask>) message1;
Message<BaseTask> baseTaskMessage2 = (Message<BaseTask>) message2;
Expand All @@ -75,161 +91,162 @@ public PriorityChannel epicInput(){
public IntegrationFlow assignEpicFlow(){
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setMaxMessagesPerPoll(1);
return IntegrationFlow.from("epicMessage.input").handle(message -> {

return IntegrationFlow.from(epicInput()).handle(message -> {
logger.log(Level.INFO, "{0} arrived - Current count: {1}", new String[]{((Epic) message.getPayload()).getId(), String.valueOf(IN_PROGRESS_EPICS_COUNT.incrementAndGet())});
new Thread(() -> currentSprintEpic().send(MessageBuilder.withPayload(message.getPayload()).setHeader(ASSIGNED_DEVELOPMENT_TEAM_POSITION_NUMBER, DataProvider.getAvailableDevelopmentTeamIds().pop()).build())).start();
if(IN_PROGRESS_EPICS_COUNT.get() == Utilities.getTotalDevelopmentTeamsPresent()) controlBusInput().send(MessageBuilder.withPayload("@assignEpicFlow.stop()").build());
new Thread(() -> currentSprintEpic().send(MessageBuilder.withPayload(message.getPayload()).setHeader(ASSIGNED_DEVELOPMENT_TEAM_POSITION_NUMBER, DataProvider.getAvailableDevelopmentTeamIds().pop()).build())).start();
}, sourcePoolingChannelAdapter -> sourcePoolingChannelAdapter.poller(pollerMetadata)).get();
}

@Bean(name = "controlBus.input")
public DirectChannel controlBusInput() {
public MessageChannel controlBusInput() {
return new DirectChannel();
}

@Bean(name = "currentSprintEpic.input")
public DirectChannel currentSprintEpic(){
public MessageChannel currentSprintEpic(){
DirectChannel currentSprintEpicChannel = new DirectChannel();
currentSprintEpicChannel.setDatatypes(Epic.class);
return currentSprintEpicChannel;
}

@Bean(name = "inProgressEpic.intermediate")
public DirectChannel inProgressEpic(){
public MessageChannel inProgressEpic(){
DirectChannel inProgressEpicChannel = new DirectChannel();
inProgressEpicChannel.setDatatypes(Epic.class);
return inProgressEpicChannel;
}

@Bean(name = "doneEpics.output")
public DirectChannel doneEpics(){
public MessageChannel doneEpics(){
DirectChannel doneEpicsChannel = new DirectChannel();
doneEpicsChannel.setDatatypes(Epic.class);
return doneEpicsChannel;
}

@Bean(name = "inProgressUserStory.intermediate")
public DirectChannel inProgressUserStory(){
public MessageChannel inProgressUserStory(){
DirectChannel inProgressUserStoryChannel = new DirectChannel();
inProgressUserStoryChannel.setDatatypes(UserStory.class);
return inProgressUserStoryChannel;
}

@Bean(name = "currentSprintUserStories.preIntermediate")
public DirectChannel currentSprintUserStories(){
public MessageChannel currentSprintUserStories(){
DirectChannel currentSprintUserStoriesChannel = new DirectChannel();
currentSprintUserStoriesChannel.setDatatypes(UserStory.class);
return currentSprintUserStoriesChannel;
}

@Bean(name = "doneSprintUserStories.output")
public DirectChannel doneSprintUserStories(){
public MessageChannel doneSprintUserStories(){
DirectChannel doneSprintUserStoriesChannel = new DirectChannel();
doneSprintUserStoriesChannel.setDatatypes(UserStory.class);
return doneSprintUserStoriesChannel;
}

@Bean(name = "toDoTechnicalTasks.input")
public DirectChannel toDoTechnicalTasks(){
public MessageChannel toDoTechnicalTasks(){
DirectChannel toDoTechnicalTasksChannel = new DirectChannel();
toDoTechnicalTasksChannel.setDatatypes(TechnicalTask.class);
return toDoTechnicalTasksChannel;
}

@Bean(name = "trivialTechnicalTaskQueue.input")
public QueueChannel trivialTechnicalTaskQueue(){
public MessageChannel trivialTechnicalTaskQueue(){
QueueChannel trivialTechnicalTaskQueueChannel = new QueueChannel(8);
trivialTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class);
return trivialTechnicalTaskQueueChannel;
}

@Bean(name = "trivialTechnicalTask.intermediate")
@BridgeFrom(value = "trivialTechnicalTaskQueue.input", poller = @Poller(fixedRate = "800"))
public DirectChannel trivialTechnicalTask(){
@BridgeFrom(value = "trivialTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1800"))
public MessageChannel trivialTechnicalTask(){
DirectChannel trivialTechnicalTaskChannel = new DirectChannel();
trivialTechnicalTaskChannel.setDatatypes(TechnicalTask.class);
return trivialTechnicalTaskChannel;
}

@Bean(name = "normalTechnicalTaskQueue.input")
public QueueChannel normalTechnicalTaskQueue(){
public MessageChannel normalTechnicalTaskQueue(){
QueueChannel normalTechnicalTaskQueueChannel = new QueueChannel(6);
normalTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class);
return normalTechnicalTaskQueueChannel;
}

@Bean(name = "normalTechnicalTask.intermediate")
@BridgeFrom(value = "normalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "600"))
public DirectChannel normalTechnicalTask(){
@BridgeFrom(value = "normalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1600"))
public MessageChannel normalTechnicalTask(){
DirectChannel normalTechnicalTaskChannel = new DirectChannel();
normalTechnicalTaskChannel.setDatatypes(TechnicalTask.class);
return normalTechnicalTaskChannel;
}

@Bean(name = "minorTechnicalTaskQueue.input")
public QueueChannel minorTechnicalTaskQueue(){
public MessageChannel minorTechnicalTaskQueue(){
QueueChannel minorTechnicalTaskQueueChannel = new QueueChannel(4);
minorTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class);
return minorTechnicalTaskQueueChannel;
}

@Bean(name = "minorTechnicalTask.intermediate")
@BridgeFrom(value = "minorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "400"))
public DirectChannel minorTechnicalTask(){
@BridgeFrom(value = "minorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1400"))
public MessageChannel minorTechnicalTask(){
DirectChannel minorTechnicalTaskChannel = new DirectChannel();
minorTechnicalTaskChannel.setDatatypes(TechnicalTask.class);
return minorTechnicalTaskChannel;
}

@Bean(name = "majorTechnicalTaskQueue.input")
public QueueChannel majorTechnicalTaskQueue(){
public MessageChannel majorTechnicalTaskQueue(){
QueueChannel majorTechnicalTaskQueueChannel = new QueueChannel(2);
majorTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class);
return majorTechnicalTaskQueueChannel;
}

@Bean(name = "majorTechnicalTask.intermediate")
@BridgeFrom(value = "majorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "200"))
public DirectChannel majorTechnicalTask(){
@BridgeFrom(value = "majorTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1200"))
public MessageChannel majorTechnicalTask(){
DirectChannel majorTechnicalTaskChannel = new DirectChannel();
majorTechnicalTaskChannel.setDatatypes(TechnicalTask.class);
return majorTechnicalTaskChannel;
}

@Bean(name = "criticalTechnicalTaskQueue.input")
public QueueChannel criticalTechnicalTaskQueue(){
public MessageChannel criticalTechnicalTaskQueue(){
QueueChannel criticalTechnicalTaskQueueChannel = new QueueChannel(1);
criticalTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class);
return criticalTechnicalTaskQueueChannel;
}

@Bean(name = "criticalTechnicalTask.intermediate")
@BridgeFrom(value = "criticalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "100"))
public DirectChannel criticalTechnicalTask(){
@BridgeFrom(value = "criticalTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1100"))
public MessageChannel criticalTechnicalTask(){
DirectChannel criticalTechnicalTaskChannel = new DirectChannel();
criticalTechnicalTaskChannel.setDatatypes(TechnicalTask.class);
return criticalTechnicalTaskChannel;
}

@Bean(name = "blockerTechnicalTaskQueue.input")
public QueueChannel blockerTechnicalTaskQueue(){
public MessageChannel blockerTechnicalTaskQueue(){
QueueChannel blockerTechnicalTaskQueueChannel = new QueueChannel(1);
blockerTechnicalTaskQueueChannel.setDatatypes(TechnicalTask.class);
return blockerTechnicalTaskQueueChannel;
}

@Bean(name = "blockerTechnicalTask.intermediate")
@BridgeFrom(value = "blockerTechnicalTaskQueue.input", poller = @Poller(fixedRate = "100"))
public DirectChannel blockerTechnicalTask(){
@BridgeFrom(value = "blockerTechnicalTaskQueue.input", poller = @Poller(fixedRate = "1100"))
public MessageChannel blockerTechnicalTask(){
DirectChannel blockerTechnicalTaskChannel = new DirectChannel();
blockerTechnicalTaskChannel.setDatatypes(TechnicalTask.class);
return blockerTechnicalTaskChannel;
}

@Bean(name = "doneTechnicalTasks.output")
public DirectChannel doneTechnicalTasks(){
public MessageChannel doneTechnicalTasks(){
DirectChannel doneTechnicalTasksChannel = new DirectChannel();
doneTechnicalTasksChannel.setDatatypes(TechnicalTask.class);
return doneTechnicalTasksChannel;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package dev.markodojkic.softwaredevelopmentsimulation.flow;

import dev.markodojkic.softwaredevelopmentsimulation.util.Utilities;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.MessageChannel;

import java.io.File;
import java.nio.charset.StandardCharsets;
Expand All @@ -15,33 +17,33 @@
@Configuration
public class FileHandlingFlow {
@Bean
public IntegrationFlow informationLogFileFlow() {
return configureLogFileFlow("information.logFile.input", "informationData.log");
public IntegrationFlow informationLogFileFlow(@Qualifier("information.logFile.input") MessageChannel logFileMessageChannel) {
return configureLogFileFlow(logFileMessageChannel, "informationChannel.log");
}

@Bean
public IntegrationFlow jiraActivityStreamLogFileFlow() {
return configureLogFileFlow("jiraActivityStream.logFile.input", "jiraActivityStreamData.log");
public IntegrationFlow jiraActivityStreamLogFileFlow(@Qualifier("jiraActivityStream.logFile.input") MessageChannel logFileMessageChannel) {
return configureLogFileFlow(logFileMessageChannel, "jiraActivityStreamChannel.log");
}

@Bean
public IntegrationFlow errorLogFileFlow() {
return configureLogFileFlow("error.logFile.input", "errorData.log");
public IntegrationFlow errorLogFileFlow(@Qualifier("errorChannel.logFile.input") MessageChannel logFileMessageChannel) {
return configureLogFileFlow(logFileMessageChannel, "errorChannel.log");
}

private IntegrationFlow configureLogFileFlow(String inputChannel, String fileName) {
return IntegrationFlow.from(inputChannel)
private IntegrationFlow configureLogFileFlow(MessageChannel logFileMessageChannel, String fileName) {
return IntegrationFlow.from(logFileMessageChannel)
.enrichHeaders(h -> h
.header(FileHeaders.FILENAME, fileName)
.header(FileHeaders.REMOTE_DIRECTORY, new File(Utilities.getCurrentApplicationLogsPath().toUri())))
.transform(String.class, message -> message.concat("%$"))
.handle(Files.outboundGateway(m -> m.getHeaders().get(FileHeaders.REMOTE_DIRECTORY))
.fileNameGenerator(m -> Objects.requireNonNull(m.getHeaders().get(FileHeaders.FILENAME)).toString())
.autoCreateDirectory(true)
.flushInterval(5000)
.flushInterval(1000)
.fileExistsMode(FileExistsMode.APPEND_NO_FLUSH)
.appendNewLine(true)
.charset(StandardCharsets.UTF_8.name()))
.get();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,46 @@
package dev.markodojkic.softwaredevelopmentsimulation.flow;

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.MessageChannel;

@Configuration
public class PrintoutFlow {
private static final String PRINTER_TRANSFORMER_BEAN = "printerTransformer";

@Bean
public IntegrationFlow informationPrintoutFlow(){
public IntegrationFlow informationPrintoutFlow(@Qualifier("information.mqtt.input") MessageChannel mqttMessageChannel, @Qualifier("information.logFile.input") MessageChannel logFileMessageChannel) {
return IntegrationFlow.from("information.input")
.transform(PRINTER_TRANSFORMER_BEAN, "infoOutput")
.log(LoggingHandler.Level.INFO, message -> System.lineSeparator().concat(message.getPayload().toString()))
.channel("information.mqtt.input").channel("information.logFile.input").get();
.handle(message -> {
mqttMessageChannel.send(message);
logFileMessageChannel.send(message);
}).get();
}

@Bean
public IntegrationFlow jiraActivityStreamPrintoutFlow(){
public IntegrationFlow jiraActivityStreamPrintoutFlow(@Qualifier("jiraActivityStream.mqtt.input") MessageChannel mqttMessageChannel, @Qualifier("jiraActivityStream.logFile.input") MessageChannel logFileMessageChannel) {
return IntegrationFlow.from("jiraActivityStream.input")
.transform(PRINTER_TRANSFORMER_BEAN, "jiraActivityStreamOutput")
.log(LoggingHandler.Level.INFO, message -> System.lineSeparator().concat(message.getPayload().toString()))
.channel("jiraActivityStream.mqtt.input").channel("jiraActivityStream.logFile.input").get();
.handle(message -> {
mqttMessageChannel.send(message);
logFileMessageChannel.send(message);
}).get();
}

@Bean
public IntegrationFlow errorPrintoutFlow(){
public IntegrationFlow errorPrintoutFlow(@Qualifier("errorChannel.mqtt.input") MessageChannel mqttMessageChannel, @Qualifier("errorChannel.logFile.input") MessageChannel logFileMessageChannel) {
return IntegrationFlow.from("errorChannel")
.transform(PRINTER_TRANSFORMER_BEAN, "errorOutput")
.log(LoggingHandler.Level.ERROR, message -> System.lineSeparator().concat(message.getPayload().toString()))
.channel("error.mqtt.input").channel("error.logFile.input").get();
.handle(message -> {
mqttMessageChannel.send(message);
logFileMessageChannel.send(message);
}).get();
}
}
}
Loading

0 comments on commit eedae59

Please sign in to comment.