Skip to content

Commit

Permalink
Revert "Fix/md5 management (#108)"
Browse files Browse the repository at this point in the history
This reverts commit 7e098d6.
  • Loading branch information
lcardito committed Jul 29, 2024
1 parent 163e6dd commit 628d996
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 599 deletions.
78 changes: 0 additions & 78 deletions dice-where-downloader-lib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,55 +15,8 @@
<slf4j.version>1.7.30</slf4j.version>
<jackson.version>2.14.2</jackson.version>
<javax-bind.version>2.3.1</javax-bind.version>
<test-containers.version>1.19.8</test-containers.version>
<wiremock.version>3.0.1</wiremock.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.jacoco</groupId>
<artifactId>jacoco-maven-plugin</artifactId>
<version>${jacoco.version}</version>
<executions>
<execution>
<id>pre-unit-tests</id>
<goals>
<goal>prepare-agent</goal>
</goals>
</execution>
<execution>
<id>post-unit-tests</id>
<phase>test</phase>
<goals>
<goal>report</goal>
</goals>
</execution>
<execution>
<id>default-check</id>
<goals>
<goal>check</goal>
</goals>
<configuration>
<rules>
<rule implementation="org.jacoco.maven.RuleConfiguration">
<element>BUNDLE</element>
<limits>
<limit implementation="org.jacoco.report.check.Limit">
<counter>COMPLEXITY</counter>
<value>COVEREDRATIO</value>
<minimum>0.1</minimum>
</limit>
</limits>
</rule>
</rules>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
Expand Down Expand Up @@ -107,37 +60,6 @@
<version>${javax-bind.version}</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${test-containers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>${test-containers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<version>${test-containers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>nginx</artifactId>
<version>${test-containers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.wiremock</groupId>
<artifactId>wiremock</artifactId>
<version>${wiremock.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import technology.dice.dicewhere.downloader.source.FileSource;

public abstract class Download {

private static final Logger LOG = LoggerFactory.getLogger(Download.class);

protected final boolean noCheckMd5;
Expand Down Expand Up @@ -39,29 +38,25 @@ protected DownloadExecutionResult process(FileAcceptor<?> acceptor, FileSource f
result = processFileDoesNotExist(acceptor, fileSource, pathWritable);
}
LOG.info("A new file was" + (result.isNewFileDownloaded() ? "" : " not") + " downloaded");
// The ternary must be parenthesized on its own: '+' binds tighter than '?:', so the
// previous form evaluated to "un" alone on failure and logged "Download is un"
// instead of "Download is unsuccessful".
LOG.info("Download is " + (result.isSuccessful() ? "" : "un") + "successful");
return result;
}

private DownloadExecutionResult processFileDoesNotExist(
FileAcceptor<?> acceptor, FileSource fileSource, boolean pathWritable) {

if (pathWritable) {
final MD5Checksum md5Checksum = fileSource.produce(acceptor, noCheckMd5);
LOG.info("File transferred");
final MD5Checksum md5Checksum = fileSource.produce(acceptor);
LOG.info("File successfully transferred");
if (!noCheckMd5) {
boolean checksumMatches = md5Checksum.matches(fileSource.fileInfo().getMd5Checksum());
if (!checksumMatches) {
LOG.error(
LOG.warn(
"Local and remote files' MD5 do not match: "
+ md5Checksum.stringFormat()
+ " Vs. "
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
} else {
LOG.info("MD5 matches that of the remote file: "
+ md5Checksum.stringFormat()
+ " Vs. "
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
LOG.info("MD5 matches that of the remote file");
}
return new DownloadExecutionResult(
true, checksumMatches, md5Checksum, acceptor.getUri(), checksumMatches);
Expand Down Expand Up @@ -92,10 +87,7 @@ private DownloadExecutionResult processFileExists(
+ " Vs. "
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
} else {
LOG.info("MD5 matches that of the remote file: "
+ existingMd5.map(md5 -> md5.stringFormat()).orElse("?")
+ " Vs. "
+ fileSource.fileInfo().getMd5Checksum().stringFormat());
LOG.info("MD5 matches that of the remote file");
}
return new DownloadExecutionResult(
false,
Expand All @@ -114,8 +106,7 @@ private DownloadExecutionResult processFileExists(

protected abstract DownloadExecutionResult execute();

protected void checkNecessaryEnvironmentVariables() {
}
protected void checkNecessaryEnvironmentVariables() {}

public boolean isVerbose() {
return verbose;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package technology.dice.dicewhere.downloader.actions.ipinfo;

import java.net.URI;
import technology.dice.dicewhere.downloader.Download;
import technology.dice.dicewhere.downloader.PathUtils;

public abstract class IpInfoBaseDownload extends Download {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@
import technology.dice.dicewhere.downloader.stream.StreamConsumer;

public interface FileAcceptor<T> {

StreamConsumer<T> getStreamConsumer(MD5Checksum originalFileMd5, Instant originalFileTimestamp,
boolean noMd5Check);
StreamConsumer<T> getStreamConsumer(MD5Checksum originalFileMd5, Instant originalFileTimestamp);

boolean destinationExists();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import technology.dice.dicewhere.downloader.stream.StreamWithMD5Decorator;

public class LocalFileAcceptor implements FileAcceptor<Void> {

private static final Logger LOG = LoggerFactory.getLogger(LocalFileAcceptor.class);
public static final int BUFFER = 8192;

Expand All @@ -31,7 +30,7 @@ public LocalFileAcceptor(Path destination) {

@Override
public StreamConsumer<Void> getStreamConsumer(
MD5Checksum originalFileMd5, Instant originalFileTimestamp, boolean noMd5Check) {
MD5Checksum originalFileMd5, Instant originalFileTimestamp) {
return (stream, size) -> {
try {
Files.createDirectories(destination);
Expand All @@ -40,10 +39,6 @@ public StreamConsumer<Void> getStreamConsumer(
LOG.debug("Destination directory already exists");
}
Files.copy(stream, destination, StandardCopyOption.REPLACE_EXISTING);
if ((!noMd5Check) && (!originalFileMd5.matches(stream.md5()))) {
LOG.error("MD5 mismatch. Deleting destination file");
Files.delete(destination);
}
return null;
};
}
Expand Down Expand Up @@ -73,12 +68,11 @@ public Optional<MD5Checksum> existingFileMd5() {
BufferedInputStream bis = new BufferedInputStream(is);
StreamWithMD5Decorator md5Is = StreamWithMD5Decorator.of(bis)) {
byte[] buffer = new byte[BUFFER];
while ((md5Is.read(buffer)) != -1) {
}
while ((md5Is.read(buffer)) != -1) {}
return Optional.of(md5Is.md5());
} catch (IOException | NoSuchAlgorithmException e) {
throw new RuntimeException(
"Could not obtain md5 of the file existing at the target: " + destination,
"Could not obtain md5 of the file existing at the target: " + destination.toString(),
e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
Expand All @@ -25,8 +22,8 @@

public class S3FileAcceptor implements FileAcceptor<Void> {

private static final Logger LOG = LoggerFactory.getLogger(S3FileAcceptor.class);
private static final String LATEST_KEY = "latest";
public static final String MD5_METADATA_KEY = "md5";
public static final String TIMESTAMP_METADATA_KEY = "ts";
private final S3Client client;
private final String bucket;
Expand All @@ -45,10 +42,11 @@ public S3FileAcceptor(

@Override
public StreamConsumer<Void> getStreamConsumer(
MD5Checksum originalFileMd5, Instant originalFileTimestamp, boolean noMd5Check) {
MD5Checksum originalFileMd5, Instant originalFileTimestamp) {
return (StreamConsumer)
(stream, size) -> {
Map<String, String> objectMetadata = new HashMap<>();
objectMetadata.put(MD5_METADATA_KEY, originalFileMd5.stringFormat());
objectMetadata.put(
TIMESTAMP_METADATA_KEY, String.valueOf(originalFileTimestamp.toEpochMilli()));
PutObjectRequest putObjectRequest =
Expand All @@ -60,29 +58,22 @@ public StreamConsumer<Void> getStreamConsumer(
.storageClass(StorageClass.INTELLIGENT_TIERING)
.build();
client.putObject(putObjectRequest, RequestBody.fromInputStream(stream, size));

Latest latest = new Latest(clock.instant(), key);
String latestContent = mapper.writeValueAsString(latest);

if ((!noMd5Check) && (!originalFileMd5.matches(stream.md5()))) {
LOG.error("MD5 mismatch. Deleting destination file");
client.deleteObject(DeleteObjectRequest.builder()
.bucket(bucket)
.key(key)
.build());
} else {
PutObjectRequest putLatest =
PutObjectRequest.builder()
.key(Paths.get(key).getParent().toString() + "/" + LATEST_KEY)
.bucket(bucket)
.contentLength((long) latestContent.length())
.storageClass(StorageClass.INTELLIGENT_TIERING)
.build();
client.putObject(
putLatest,
RequestBody.fromInputStream(
new StringInputStream(latestContent), latestContent.length()));

PutObjectRequest putLatest =
PutObjectRequest.builder()
.key(Paths.get(key).getParent().toString() + "/" + LATEST_KEY)
.bucket(bucket)
.contentLength((long) latestContent.length())
.storageClass(StorageClass.INTELLIGENT_TIERING)
.build();
client.putObject(
putLatest,
RequestBody.fromInputStream(
new StringInputStream(latestContent), latestContent.length()));
}
return null;
};
}
Expand Down Expand Up @@ -112,7 +103,8 @@ public Optional<MD5Checksum> existingFileMd5() {

try {
final HeadObjectResponse headObjectResponse = client.headObject(headObjectRequest);
return Optional.ofNullable(headObjectResponse.eTag()).map(m -> MD5Checksum.of(m));
final Map<String, String> metadata = headObjectResponse.metadata();
return Optional.ofNullable(metadata.get(MD5_METADATA_KEY)).map(m -> MD5Checksum.of(m));
} catch (NoSuchKeyException e) {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import technology.dice.dicewhere.downloader.stream.StreamWithMD5Decorator;

public abstract class BaseUrlSource implements FileSource {

protected FileInfo fileInfo;
protected final URL dataFileLocation;

Expand All @@ -20,14 +19,14 @@ protected BaseUrlSource(URL dataFileLocation) {
}

@Override
public MD5Checksum produce(FileAcceptor acceptor, boolean noMd5Check) {
public MD5Checksum produce(FileAcceptor acceptor) {
try {
HttpURLConnection httpConnection = (HttpURLConnection) this.dataFileLocation.openConnection();
httpConnection.setRequestMethod("GET");

try (StreamWithMD5Decorator is = StreamWithMD5Decorator.of(httpConnection.getInputStream())) {
acceptor
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp(), noMd5Check)
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp())
.consume(is, fileInfo.getSize());
return is.md5();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import technology.dice.dicewhere.downloader.destination.FileAcceptor;

public interface FileSource {

FileInfo fileInfo();

MD5Checksum produce(FileAcceptor consumer, boolean noMd5Check);
MD5Checksum produce(FileAcceptor consumer);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import technology.dice.dicewhere.downloader.stream.StreamWithMD5Decorator;

public class S3Source implements FileSource {

private static Logger LOG = LoggerFactory.getLogger(S3Source.class);
public static final String MD5_METADATA_KEY = "md5";
public static final String TIMESTAMP_METADATA_KEY = "ts";
private final S3Client client;
private final String bucket;
Expand All @@ -43,11 +43,11 @@ public FileInfo fileInfo() {
HeadObjectRequest.builder().key(key).bucket(bucket).build();

final HeadObjectResponse headObjectResponse = client.headObject(headObjectRequest);
if (headObjectResponse.eTag() == null) {
final Map<String, String> metadata = headObjectResponse.metadata();
if (!metadata.containsKey(MD5_METADATA_KEY)) {
throw new DownloaderException(
"Remote file does not have md5 information. Please delete the file and re-upload");
}
final Map<String, String> metadata = headObjectResponse.metadata();
if (!metadata.containsKey(TIMESTAMP_METADATA_KEY)) {
LOG.warn("Timestamp not available at source. Using now as timestamp.");
}
Expand All @@ -59,20 +59,20 @@ public FileInfo fileInfo() {
Optional.ofNullable(metadata.get(TIMESTAMP_METADATA_KEY))
.map(m -> Instant.ofEpochMilli(Long.valueOf(m)))
.orElse(Instant.now()),
MD5Checksum.of(headObjectResponse.eTag()),
MD5Checksum.of(metadata.get(MD5_METADATA_KEY)),
size);
}

return this.fileInfo;
}

@Override
public MD5Checksum produce(FileAcceptor consumer, boolean noMd5Check) {
public MD5Checksum produce(FileAcceptor consumer) {
GetObjectRequest getObjectRequest = GetObjectRequest.builder().bucket(bucket).key(key).build();
try (final ResponseInputStream<GetObjectResponse> object = client.getObject(getObjectRequest);
StreamWithMD5Decorator is = StreamWithMD5Decorator.of(object)) {
consumer
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp(), noMd5Check)
.getStreamConsumer(fileInfo.getMd5Checksum(), fileInfo.getTimestamp())
.consume(is, fileInfo.getSize());
return is.md5();
} catch (IOException | NoSuchAlgorithmException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@

@FunctionalInterface
public interface StreamConsumer<T> {
T consume(StreamWithMD5Decorator stream, long size) throws IOException;
T consume(InputStream stream, long size) throws IOException;
}
Loading

0 comments on commit 628d996

Please sign in to comment.