-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor: migrate ImporterProcessor to Spring Kafka #174
Conversation
…ing Kafka Removed `StompStringExporter` and its integration with the file upload controller, replacing it with Kafka-based message consumption using `@KafkaListener`. Updated related tests to reflect the changes, simplifying the architecture and removing unused components.
Removed the `Exporter` and `StompStringMessageDistributor` classes along with their associated test files. These components were not in use and their removal helps in reducing code clutter and maintenance overhead.
Replaced JMockit with Mockito for cleaner and more modern testing. Added setup with mocked SimpMessagingTemplate and refactored test cases to verify behavior using Mockito assertions. Improved test clarity by focusing on specific functionalities like `listenUnfiltered` and `listenFiltered`.
Added Javadoc comments to enhance code clarity and provide detailed explanations for methods and constructors. Updated formatting and variable alignment for better readability. Minor logging improvements for exception handling in the file not found scenario.
Replaced direct instantiation of `ImporterDirectoryWatcher` with a Spring-managed bean for improved configurability and dependency injection. Added a new configuration class `ImporterDirectoryWatcherConfig` to define the bean. Simplifies the `FileUploadController` constructor and enhances code maintainability.
Replaced JMockit with Mockito for mocking and expectations in tests, improving test readability and consistency with modern testing practices. Simplified imports and removed unused dependencies to clean up the codebase. Updated setup and test methods to align with Mockito's syntax and functionality.
Introduce a conditional configuration to enable or disable the `StompStringExporter` based on the `ode.stomp-exporter.enabled` property. This change optimizes resource usage by allowing the exporter to be disabled in production environments while retaining functionality for the Demo Console. Updated configuration files and sample environment variables accordingly.
Removed `FileTopics` class and its associated test. Updated configuration keys to align with the `stomp-exporter` naming convention. Adjusted YAML files and `StompStringExporter` constructor for the updated configuration structure.
Implemented LogFileParserFactory to streamline parser creation based on log file prefixes. Updated relevant classes and tests to use the new factory, ensuring consistent instantiation of log file parsers. This change improves maintainability and readability of the codebase.
Replaced JUnit 4 imports and annotations with JUnit 5 equivalents. Updated test methods to follow JUnit 5 method signatures and added necessary setup using @beforeeach. This improves test consistency and ensures compatibility with modern testing practices.
Replaced JUnit 4 imports and annotations with JUnit 5 equivalents. Updated test methods to follow JUnit 5 method signatures and added necessary setup using @beforeeach. This improves test consistency and ensures compatibility with modern testing practices.
Replaced JUnit 4 imports and annotations with JUnit 5 equivalents. Updated test methods to follow JUnit 5 method signatures and added necessary setup using @beforeeach. This improves test consistency and ensures compatibility with modern testing practices.
Replaced JUnit 4 imports and annotations with JUnit 5 equivalents. Updated test methods to follow JUnit 5 method signatures and added necessary setup using @beforeeach. This improves test consistency and ensures compatibility with modern testing practices.
Replaced JUnit 4 imports and annotations with JUnit 5 equivalents. Updated test methods to follow JUnit 5 method signatures and added necessary setup using @beforeeach. This improves test consistency and ensures compatibility with modern testing practices.
Updated the publish method to explicitly accept a LogFileParser instance, allowing for improved flexibility and better separation of concerns. Adjusted related methods and tests to accommodate the new parameter.
Replaced JMockit with Mockito for mocking behavior in LogFileToAsn1CodecPublisherTest. Simplified dependency initialization and adjusted test methods for compatibility, improving test maintainability and consistency.
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/ImporterDirectoryWatcher.java
Show resolved
Hide resolved
…/spring-kafka/importer-processor
} | ||
this.importerProcessor = new ImporterProcessor( | ||
new LogFileToAsn1CodecPublisher(kafkaTemplate, jsonTopics, rawEncodedJsonTopics), | ||
ImporterFileType.LOG_FILE, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: There was a previous enum value for JSON_FILE, but as that enum class noted, only the LOG_FILE enum is supported. To reduce confusion, I removed the JSON_FILE enum and set this to the only supported type directly for clarity.
try { | ||
if (bis != null) { | ||
bis.close(); | ||
} | ||
if (inputStream != null) { | ||
inputStream.close(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: by switching to try-with-resources syntax, we let Java take care of closing the input streams once they are no longer in scope (once the code leaves the try-catch block)
} | ||
|
||
@Override | ||
public Path load(String filename) { | ||
return rootLocation.resolve(filename); | ||
private final Path logFileLocation; | ||
|
||
/** | ||
* Constructs a FileSystemStorageService instance and initializes the file storage locations | ||
* based on the provided properties. | ||
* | ||
* @param properties The configuration properties used to determine the root and OBU log file upload locations. | ||
*/ | ||
@Autowired | ||
public FileSystemStorageService(FileImporterProperties properties) { | ||
|
||
this.logFileLocation = Paths.get(properties.getUploadLocationRoot(), | ||
properties.getObuLogUploadLocation()); | ||
|
||
log.info("Upload location (OBU log file): {}", this.logFileLocation); | ||
} | ||
|
||
/** | ||
* Stores a given MultipartFile in the appropriate directory based on the provided file type. | ||
* | ||
* <p>The method determines the destination path for the file based on its {@link LogFileType}. | ||
* It first verifies that the file is not empty and deletes any existing file with the same name | ||
* in the target directory. Once these checks are completed, it copies the file to the resolved path. | ||
* If any of these operations fail, a {@link StorageException} is thrown. | ||
* | ||
* @param file The file to be stored. Must not be empty. | ||
* @param type The type of the file, which determines the destination path. Expected types include | ||
* {@code LogFileType.BSM}, {@code LogFileType.OBU}, or {@code LogFileType.UNKNOWN}. | ||
* | ||
* @throws StorageException If the file type is unknown, the file is empty, an error occurs | ||
* while deleting an existing file, or an error occurs during file storage. | ||
*/ | ||
@Override | ||
public void store(MultipartFile file, LogFileType type) { | ||
|
||
// Discern the destination path via the file type (bsm or messageFrame) | ||
Path path; | ||
switch (type) { | ||
case BSM, OBU -> path = this.logFileLocation.resolve(Objects.requireNonNull(file.getOriginalFilename())); | ||
case UNKNOWN -> { | ||
log.error("File type unknown: {} {}", type, file.getName()); | ||
throw new StorageException("File type unknown: " + type + " " + file.getName()); | ||
} | ||
default -> throw new StorageException("File type unknown: " + type + " " + file.getName()); | ||
} | ||
|
||
@Override | ||
public Resource loadAsResource(String filename) { | ||
try { | ||
Path file = load(filename); | ||
Resource resource = new UrlResource(file.toUri()); | ||
if (resource.exists() && resource.isReadable()) { | ||
return resource; | ||
} else { | ||
throw new StorageFileNotFoundException("Could not read file: " + filename); | ||
} | ||
} catch (MalformedURLException e) { | ||
throw new StorageFileNotFoundException("Could not read file: " + filename, e); | ||
} | ||
// Check file is not empty | ||
if (file.isEmpty()) { | ||
log.error("File is empty: {}", path); | ||
throw new StorageException("File is empty: " + path); | ||
} | ||
|
||
@Override | ||
public void deleteAll() { | ||
FileSystemUtils.deleteRecursively(rootLocation.toFile()); | ||
EventLogger.logger.info("Deleting {}", this.rootLocation); | ||
// Check file does not already exist (if so, delete existing) | ||
try { | ||
log.info("Deleting existing file: {}", path); | ||
Files.deleteIfExists(path); | ||
} catch (IOException e) { | ||
log.error("Failed to delete existing file: {} ", path); | ||
throw new StorageException("Failed to delete existing file: " + path, e); | ||
} | ||
|
||
@Override | ||
public void init() { | ||
try { | ||
Files.createDirectory(rootLocation); | ||
} catch (IOException e) { | ||
EventLogger.logger.error("Failed to initialize storage service {}", this.rootLocation); | ||
throw new StorageException("Failed to initialize storage service " + this.rootLocation, e); | ||
} | ||
// Copy the file to the relevant directory | ||
try { | ||
log.debug("Copying file {} to {}", file.getOriginalFilename(), path); | ||
log.info("Copying file {} to {}", file.getOriginalFilename(), path); | ||
Files.copy(file.getInputStream(), path); | ||
} catch (Exception e) { | ||
log.error("Failed to store file in shared directory {}", path); | ||
throw new StorageException("Failed to store file in shared directory " + path, e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: all of this code was exclusively used within tests. As it was test code, I moved it to the relevant test classes
Resource loadAsResource(String filename); | ||
|
||
void deleteAll(); | ||
void store(MultipartFile file, LogFileType type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: store
was the only method defined on this interface with a corresponding production-related implementation. The other methods were used solely for tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: the ByteArrayPublisher was deleted, so this test was deleted, too (the same goes for all deleted *PublisherTest
files)
@@ -303,14 +332,17 @@ void testPublishNonLearLogFile() throws Exception { | |||
*/ | |||
|
|||
List<OdeData> dataList = | |||
testLogFileToAsn1CodecPublisher.publish(bis, filename, ImporterFileType.JSON_FILE, LogFileParserFactory.getLogFileParser(filename)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: JSON_FILE
was being used as an indirect indicator that the file type was unsupported. Switching to UNKNOWN
makes this intent clearer
|
||
@BeforeEach | ||
void setup() throws IOException { | ||
dirToProcess = new File(System.getProperty("java.io.tmpdir") + "/filesToProcess").toPath(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: java.io.tmpdir
is built-in. From the File docs:
The default temporary-file directory is specified by the system property java.io.tmpdir. On UNIX systems the default value of this property is typically "/tmp" or "/var/tmp"; on Microsoft Windows systems it is typically "C:\WINNT\TEMP". A different value may be given to this system property when the Java virtual machine is invoked, but programmatic changes to this property are not guaranteed to have any effect upon the temporary directory used by this method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: loaded by the test framework by adding this class to the jpo-ode-svcs/src/test/resources/META-INF/services/java.nio.file.spi.FileTypeDetector
file. It is needed, for some reason, for the Filesystem-related tests to run successfully. It lives in the test package as it is exclusively intended for test usage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: This class was deleted as it didn't add any value. It didn't provide any extensibility. It didn't offer any extra functionality. It only served as a wrapper for creating the LogFileAsn1CodecPublisher, which did all the work.
@@ -216,7 +214,7 @@ private void publishList(List<OdeData> dataList, LogFileParser fileParser) { | |||
|
|||
// This method will check if the next character is a newline character (0x0A in hex or 10 in converted decimal) | |||
// or if the next character does not contain a newline character it will put that character back into the buffered input stream | |||
private BufferedInputStream removeNextNewLineCharacter(BufferedInputStream bis) { | |||
private void removeNextNewLineCharacter(BufferedInputStream bis) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
note: the calls made in this method directly modified the BufferedInputStream passed it. The operations don't make a copy of the input stream, so we don't need to return the input stream.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good & the unit tests pass! I just had some non-blocking questions
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/importer/ImporterDirectoryWatcher.java
Show resolved
Hide resolved
jpo-ode-svcs/src/main/java/us/dot/its/jpo/ode/upload/FileUploadController.java
Show resolved
Hide resolved
jpo-ode-svcs/src/test/resources/META-INF/services/java.nio.file.spi.FileTypeDetector
Show resolved
Hide resolved
…/spring-kafka/importer-processor
…backupFileShouldThrowExceptionUnableToMoveFile
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
Passed Checks:
- tests pass
- All services start up through docker-compose
- log files process properly
PR Details
Description
instanceof
to deduplicate casting and improve readabilityRelated Issue
N/A
Motivation and Context
Implementing Spring Kafka gives us better lifecycle management of producers and consumers, more reusable producer/consumer code, easier testability, and a more robust production-ready Kafka library. This is part of a more significant effort to replace our hand-rolled Kafka implementation with Spring Kafka. The previous changesets related to this effort are:
#118
#116
#123
#129
#131
#134
#162
How Has This Been Tested?
Aside from the new and existing unit tests, I also started up the application via
make start
. While the application was running, I navigated to the ODE Demo UI and uploaded multiple files located in thedata/
directory (at least one of each file type: gzip, bin, json, zip). I added a ZIP file to verify the ZIP file processing still works as intended (there wasn't a ZIP file anywhere I could find). The files I used for testing were:During testing, I found that the dev branch failed to process the
data/bsmLogDuringEvent.json
file (and any.json
files I tested). This branch behaves the same. The error we see when processing this file is:Types of changes
Checklist:
ODE Contributing Guide