Skip to content

Commit

Permalink
Removed PosgresQueryQueue and refactored PosgresProfileStreamer to us…
Browse files Browse the repository at this point in the history
…e shared PostgresProfileQueryHandler
  • Loading branch information
joshhaug committed Feb 12, 2025
1 parent e112c6f commit 24221cb
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,22 @@
import static gov.nasa.jpl.aerie.merlin.server.remotes.postgres.PostgresParsers.realProfileTypeP;

/**
* Utility class to handle upload of resource profiles to the database. This class allows for each upload to have its
* own Connection object.
* Utility class to handle upload of resource profiles to the database.
* */
public class PostgresProfileQueryHandler implements AutoCloseable {
private final Connection connection;
private final HashMap<String, Integer> profileIds;
private final HashMap<String, Duration> profileDurations;

private PreparedStatement postProfileStatement;
private PreparedStatement postSegmentsStatement;
private PreparedStatement updateDurationStatement;

private long datasetId;
private final PreparedStatement postProfileStatement;
private final PreparedStatement postSegmentsStatement;
private final PreparedStatement updateDurationStatement;

public PostgresProfileQueryHandler(DataSource dataSource, long datasetId) throws SQLException {
this.connection = dataSource.getConnection();
connection = dataSource.getConnection();
profileIds = new HashMap<>();
profileDurations = new HashMap<>();
this.datasetId = datasetId;
prepareStatements();
}

public void prepareStatements() throws SQLException {
final String postProfilesSql =
//language=sql
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,33 +5,34 @@

import javax.sql.DataSource;
import java.sql.SQLException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

public class PostgresProfileStreamer implements Consumer<ResourceProfiles>, AutoCloseable {
private final DataSource dataSource;
private long datasetId;
private PostgresQueryQueue queryQueue;
private final ExecutorService queryQueue;
private final PostgresProfileQueryHandler queryHandler;

public PostgresProfileStreamer(DataSource dataSource, long datasetId) throws SQLException {
this.dataSource = dataSource;
this.datasetId = datasetId;
this.queryQueue = new PostgresQueryQueue();
this.queryQueue = Executors.newSingleThreadExecutor();
this.queryHandler = new PostgresProfileQueryHandler(dataSource, datasetId);
}

@Override
public void accept(final ResourceProfiles resourceProfiles) {
queryQueue.addToQueue(() -> {
try (var transaction = new PostgresProfileQueryHandler(dataSource, datasetId)) {
transaction.uploadResourceProfiles(resourceProfiles);
} catch (SQLException e) {
throw new DatabaseException("Exception occurred while posting profiles.", e);
}
queryQueue.submit(() -> {
queryHandler.uploadResourceProfiles(resourceProfiles);
});
}

@Override
public void close() {
queryQueue.shutdown();
try {
queryHandler.close();
} catch (SQLException e) {
throw new DatabaseException("Error occurred while attempting to close PostgresProfileQueryHandler", e);
}
}

}

This file was deleted.

0 comments on commit 24221cb

Please sign in to comment.