
Refactor: migrate ImporterProcessor to Spring Kafka #174

Merged
mcook42 merged 123 commits into dev from mcook42/spring-kafka/importer-processor on Feb 20, 2025

Conversation

@mcook42 commented Jan 28, 2025

PR Details

Description

  • Extracted LogFileParser creation into LogFileParserFactory for testability and separation of concerns (a sketch of the factory shape follows this list)
  • Moved setting of recordType and fileName to the constructors of FileParser implementations to streamline code flow and prevent potential bugs
  • Added missing Javadocs to every file touched by the LogFileParserFactory extraction and constructor updates
  • Corrected style to align with checkstyle rules in all files touched by the refactor, including adding Javadocs to all public classes/methods
  • Refactored LogFileParser's usage of instanceof to deduplicate casting and improve readability
  • Renamed ParserStatus enum values to more accurately reflect their meanings
  • Extracted inline strings into test resource files to improve the readability and maintainability of tests
  • Removed unnecessary public modifiers from test methods
  • Migrated from JMockit to Mockito in the test files touched by this refactor, for improved testability and maintainability (Mockito is the de facto mocking framework for Spring Boot)
  • Replaced EventLogger with a class-specific logger wherever I changed code. This improves logging usability by including the class name in log entries, among other attributes (see logback.xml for more detail)
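
For illustration, here is a minimal sketch of the factory shape referenced in the first bullet. Only LogFileParserFactory.getLogFileParser(filename) appears in this PR; the file-name prefixes and concrete parser classes below are assumptions for the sketch.

// Hypothetical sketch: dispatches on the log file's name prefix.
// The real factory covers more record types and error handling.
public class LogFileParserFactory {

  private LogFileParserFactory() {
    // static factory; not instantiable
  }

  public static LogFileParser getLogFileParser(String fileName) {
    if (fileName.startsWith("bsmLogDuringEvent")) {
      return new BsmLogFileParser(fileName); // fileName set in the constructor
    }
    if (fileName.startsWith("rxMsg")) {
      return new RxMsgFileParser(fileName);
    }
    throw new IllegalArgumentException("Unsupported log file name: " + fileName);
  }
}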

Related Issue

N/A

Motivation and Context

Implementing Spring Kafka gives us better lifecycle management of producers and consumers, more reusable producer/consumer code, easier testability, and a more robust, production-ready Kafka library. This is part of a larger effort to replace our hand-rolled Kafka implementation with Spring Kafka. The previous changesets related to this effort are:

#118
#116
#123
#129
#131
#134
#162

How Has This Been Tested?

Aside from the new and existing unit tests, I also started the application via `make start`. While the application was running, I navigated to the ODE Demo UI and uploaded multiple files from the data/ directory (at least one of each file type: gzip, bin, json, zip). I added a ZIP file to verify that ZIP file processing still works as intended (I couldn't find an existing ZIP file anywhere). The files I used for testing were:

  • bsmLogDuringEvent.bin
  • rxMsg_TIM_GeneratedBy_RSU.zip
  • bsmLogDuringEvent_commsignia.gz
  • driverAlert.gz
  • rxMsg_BSM_and_TIM.gz
  • rxMsg_commsignia_map.gz
  • driverAlert.bin

During testing, I found that the dev branch failed to process the data/bsmLogDuringEvent.json file (and any .json files I tested). This branch behaves the same. The error we see when processing this file is:

FileParser$FileParserException: Data size of 8762 is larger than allocated buffer size of 4096

Types of changes

  • Defect fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that causes existing functionality to change)

Checklist:

  • I have added any new packages to the sonar-scanner.properties file
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
    ODE Contributing Guide
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Refactor: migrate ImporterProcessor to Spring Kafka

Removed `StompStringExporter` and its integration with the file upload controller, replacing it with Kafka-based message consumption using `@KafkaListener` (a minimal listener sketch appears after this commit list). Updated related tests to reflect the changes, simplifying the architecture and removing unused components.
Removed the `Exporter` and `StompStringMessageDistributor` classes along with their associated test files. These components were not in use and their removal helps in reducing code clutter and maintenance overhead.
Replaced JMockit with Mockito for cleaner and more modern testing. Added setup with mocked SimpMessagingTemplate and refactored test cases to verify behavior using Mockito assertions. Improved test clarity by focusing on specific functionalities like `listenUnfiltered` and `listenFiltered`.
Added Javadoc comments to enhance code clarity and provide detailed explanations for methods and constructors. Updated formatting and variable alignment for better readability. Minor logging improvements for exception handling in the file not found scenario.
Replaced direct instantiation of `ImporterDirectoryWatcher` with a Spring-managed bean for improved configurability and dependency injection. Added a new configuration class `ImporterDirectoryWatcherConfig` to define the bean. Simplifies the `FileUploadController` constructor and enhances code maintainability.
Replaced JMockit with Mockito for mocking and expectations in tests, improving test readability and consistency with modern testing practices. Simplified imports and removed unused dependencies to clean up the codebase. Updated setup and test methods to align with Mockito's syntax and functionality.
Introduced a conditional configuration to enable or disable the `StompStringExporter` based on the `ode.stomp-exporter.enabled` property (a sketch of this conditional bean wiring also follows this list). This change optimizes resource usage by allowing the exporter to be disabled in production environments while retaining functionality for the Demo Console. Updated configuration files and sample environment variables accordingly.
Removed `FileTopics` class and its associated test. Updated configuration keys to align with the `stomp-exporter` naming convention. Adjusted YAML files and `StompStringExporter` constructor for the updated configuration structure.
Implemented LogFileParserFactory to streamline parser creation based on log file prefixes. Updated relevant classes and tests to use the new factory, ensuring consistent instantiation of log file parsers. This change improves maintainability and readability of the codebase.
Replaced JUnit 4 imports and annotations with JUnit 5 equivalents across several test classes. Updated test methods to follow JUnit 5 method signatures and added necessary setup using @BeforeEach. This improves test consistency and ensures compatibility with modern testing practices.
Updated the publish method to explicitly accept a LogFileParser instance, allowing for improved flexibility and better separation of concerns. Adjusted related methods and tests to accommodate the new parameter.
Replaced JMockit with Mockito for mocking behavior in LogFileToAsn1CodecPublisherTest. Simplified dependency initialization and adjusted test methods for compatibility, improving test maintainability and consistency.
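
For context, a minimal sketch of the `@KafkaListener` consumption style these commits move to. The class name and topic property are assumptions; only the `listenFiltered`/`listenUnfiltered` method names come from the commits above.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoUiStompListener {

  private static final Logger log = LoggerFactory.getLogger(DemoUiStompListener.class);

  // Spring Kafka manages the consumer lifecycle; we only declare the handler.
  @KafkaListener(topics = "${ode.kafka.topics.filtered-output}")
  public void listenFiltered(String message) {
    log.debug("Consumed filtered message: {}", message);
  }
}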
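Similarly, the conditional exporter wiring could look roughly like this; the configuration class name and constructor arguments are assumptions, while the `ode.stomp-exporter.enabled` property key comes from the commit message above.

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.SimpMessagingTemplate;

@Configuration
public class StompExporterConfig {

  // The bean is only registered when ode.stomp-exporter.enabled=true,
  // so production deployments can leave the exporter off.
  @Bean
  @ConditionalOnProperty(value = "ode.stomp-exporter.enabled", havingValue = "true")
  public StompStringExporter stompStringExporter(SimpMessagingTemplate template) {
    return new StompStringExporter(template); // constructor args assumed
  }
}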
this.importerProcessor = new ImporterProcessor(
    new LogFileToAsn1CodecPublisher(kafkaTemplate, jsonTopics, rawEncodedJsonTopics),
    ImporterFileType.LOG_FILE,

note: There was a previous enum value for JSON_FILE, but as that enum class noted, only the LOG_FILE value is supported. To reduce confusion, I removed the JSON_FILE value and set this to the only supported type directly for clarity.

Comment on lines -102 to -108
try {
  if (bis != null) {
    bis.close();
  }
  if (inputStream != null) {
    inputStream.close();
  }

note: by switching to try-with-resources syntax, we let Java take care of closing the input streams once they are no longer in scope (i.e., once control leaves the try block).
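
For reference, a minimal sketch of the resulting shape (variable names and the surrounding publish call are illustrative, not the exact production code):

try (InputStream inputStream = Files.newInputStream(filePath);
     BufferedInputStream bis = new BufferedInputStream(inputStream)) {
  // Both streams are closed automatically when control leaves this block,
  // even if publish(...) throws.
  publisher.publish(bis, fileName, ImporterFileType.LOG_FILE, parser);
} catch (Exception e) {
  log.error("Unable to process file {}", filePath, e);
}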

Comment on lines -92 to 137

Removed (the old rootLocation-based methods):

@Override
public Path load(String filename) {
  return rootLocation.resolve(filename);
}

@Override
public Resource loadAsResource(String filename) {
  try {
    Path file = load(filename);
    Resource resource = new UrlResource(file.toUri());
    if (resource.exists() && resource.isReadable()) {
      return resource;
    } else {
      throw new StorageFileNotFoundException("Could not read file: " + filename);
    }
  } catch (MalformedURLException e) {
    throw new StorageFileNotFoundException("Could not read file: " + filename, e);
  }
}

@Override
public void deleteAll() {
  FileSystemUtils.deleteRecursively(rootLocation.toFile());
  EventLogger.logger.info("Deleting {}", this.rootLocation);
}

@Override
public void init() {
  try {
    Files.createDirectory(rootLocation);
  } catch (IOException e) {
    EventLogger.logger.error("Failed to initialize storage service {}", this.rootLocation);
    throw new StorageException("Failed to initialize storage service " + this.rootLocation, e);
  }
}

Added (the new logFileLocation-based implementation):

private final Path logFileLocation;

/**
 * Constructs a FileSystemStorageService instance and initializes the file storage locations
 * based on the provided properties.
 *
 * @param properties The configuration properties used to determine the root and OBU log file upload locations.
 */
@Autowired
public FileSystemStorageService(FileImporterProperties properties) {
  this.logFileLocation = Paths.get(properties.getUploadLocationRoot(),
      properties.getObuLogUploadLocation());
  log.info("Upload location (OBU log file): {}", this.logFileLocation);
}

/**
 * Stores a given MultipartFile in the appropriate directory based on the provided file type.
 *
 * <p>The method determines the destination path for the file based on its {@link LogFileType}.
 * It first verifies that the file is not empty and deletes any existing file with the same name
 * in the target directory. Once these checks are completed, it copies the file to the resolved path.
 * If any of these operations fail, a {@link StorageException} is thrown.
 *
 * @param file The file to be stored. Must not be empty.
 * @param type The type of the file, which determines the destination path. Expected types include
 *     {@code LogFileType.BSM}, {@code LogFileType.OBU}, or {@code LogFileType.UNKNOWN}.
 *
 * @throws StorageException If the file type is unknown, the file is empty, an error occurs
 *     while deleting an existing file, or an error occurs during file storage.
 */
@Override
public void store(MultipartFile file, LogFileType type) {
  // Discern the destination path via the file type (bsm or messageFrame)
  Path path;
  switch (type) {
    case BSM, OBU -> path = this.logFileLocation.resolve(Objects.requireNonNull(file.getOriginalFilename()));
    case UNKNOWN -> {
      log.error("File type unknown: {} {}", type, file.getName());
      throw new StorageException("File type unknown: " + type + " " + file.getName());
    }
    default -> throw new StorageException("File type unknown: " + type + " " + file.getName());
  }

  // Check file is not empty
  if (file.isEmpty()) {
    log.error("File is empty: {}", path);
    throw new StorageException("File is empty: " + path);
  }

  // Check file does not already exist (if so, delete existing)
  try {
    log.info("Deleting existing file: {}", path);
    Files.deleteIfExists(path);
  } catch (IOException e) {
    log.error("Failed to delete existing file: {} ", path);
    throw new StorageException("Failed to delete existing file: " + path, e);
  }

  // Copy the file to the relevant directory
  try {
    log.info("Copying file {} to {}", file.getOriginalFilename(), path);
    Files.copy(file.getInputStream(), path);
  } catch (Exception e) {
    log.error("Failed to store file in shared directory {}", path);
    throw new StorageException("Failed to store file in shared directory " + path, e);
  }
}

note: all of this code was exclusively used within tests. As it was test code, I moved it to the relevant test classes.

- Resource loadAsResource(String filename);
- void deleteAll();
+ void store(MultipartFile file, LogFileType type);

Note: store was the only method defined on this interface with a corresponding production-related implementation. The other methods were used solely for tests.


note: the ByteArrayPublisher was deleted, so this test was deleted, too (the same goes for all deleted *PublisherTest files).

@@ -303,14 +332,17 @@ void testPublishNonLearLogFile() throws Exception {
*/

List<OdeData> dataList =
    testLogFileToAsn1CodecPublisher.publish(bis, filename, ImporterFileType.JSON_FILE, LogFileParserFactory.getLogFileParser(filename));

note: JSON_FILE was being used as an indirect indicator that the file type was unsupported. Switching to UNKNOWN makes this intent clearer


@BeforeEach
void setup() throws IOException {
  dirToProcess = new File(System.getProperty("java.io.tmpdir") + "/filesToProcess").toPath();

note: java.io.tmpdir is built-in. From the File docs:

The default temporary-file directory is specified by the system property java.io.tmpdir. On UNIX systems the default value of this property is typically "/tmp" or "/var/tmp"; on Microsoft Windows systems it is typically "C:\WINNT\TEMP". A different value may be given to this system property when the Java virtual machine is invoked, but programmatic changes to this property are not guaranteed to have any effect upon the temporary directory used by this method.


note: this class is loaded by the test framework by adding it to the jpo-ode-svcs/src/test/resources/META-INF/services/java.nio.file.spi.FileTypeDetector file. It is needed, for some reason, for the filesystem-related tests to run successfully. It lives in the test package as it is exclusively intended for test usage.
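
For context, a detector registered this way only needs to extend the JDK SPI class; a minimal sketch (the class name and content-type mapping here are assumptions):

import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.spi.FileTypeDetector;

// Picked up via ServiceLoader because its fully qualified name is listed in
// META-INF/services/java.nio.file.spi.FileTypeDetector on the test classpath.
public class TestFileTypeDetector extends FileTypeDetector {
  @Override
  public String probeContentType(Path path) throws IOException {
    String name = path.getFileName().toString();
    return name.endsWith(".json") ? "application/json" : null; // null: defer to other detectors
  }
}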


Note: This class was deleted as it didn't add any value. It didn't provide any extensibility. It didn't offer any extra functionality. It only served as a wrapper for creating the LogFileAsn1CodecPublisher, which did all the work.

@@ -216,7 +214,7 @@ private void publishList(List<OdeData> dataList, LogFileParser fileParser) {

// This method will check if the next character is a newline character (0x0A in hex or 10 in converted decimal)
// or if the next character does not contain a newline character it will put that character back into the buffered input stream
- private BufferedInputStream removeNextNewLineCharacter(BufferedInputStream bis) {
+ private void removeNextNewLineCharacter(BufferedInputStream bis) {

note: the calls made in this method directly modify the BufferedInputStream passed to it. The operations don't make a copy of the input stream, so there's no need to return it.
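
A sketch of what the void variant can look like, assuming the peek-and-push-back behavior described in the code comment (the actual body may differ):

private void removeNextNewLineCharacter(BufferedInputStream bis) throws IOException {
  bis.mark(1);               // remember the current position
  int nextByte = bis.read(); // consume one byte to peek at it
  if (nextByte != 0x0A) {    // not a newline: push the byte back
    bis.reset();
  }
}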

@mcook42 marked this pull request as ready for review February 3, 2025 18:19
@mcook42 mentioned this pull request Feb 4, 2025
@dmccoystephenson (Member) left a comment:

Looks good & the unit tests pass! I just had some non-blocking questions.

@Michael7371 left a comment:

Looks good!

Passed Checks:

  1. Tests pass
  2. All services start up through docker-compose
  3. Log files process properly

@mcook42 merged commit 6206fc2 into dev on Feb 20, 2025
4 checks passed
@mcook42 deleted the mcook42/spring-kafka/importer-processor branch February 20, 2025 21:39