Skip to content

Commit

Permalink
Get Dataset Lineage Details
Browse files Browse the repository at this point in the history
  • Loading branch information
knighto82 committed Jan 16, 2025
1 parent 92f6eae commit fe1e8c9
Show file tree
Hide file tree
Showing 12 changed files with 302 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.github.tomakehurst.wiremock.client.WireMock;
import io.github.jpmorganchase.fusion.model.*;
import io.github.jpmorganchase.fusion.test.TestUtils;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

Expand All @@ -13,8 +14,7 @@
import static com.github.tomakehurst.wiremock.client.WireMock.equalToJson;
import static io.github.jpmorganchase.fusion.test.TestUtils.listOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;

public class DatasetOperationsIT extends BaseOperationsIT {

Expand Down Expand Up @@ -150,7 +150,7 @@ public void testUpdateDatasetLineage() {


// When & Then
Assertions.assertDoesNotThrow(() -> dataset.createLineage(DatasetLineage.builder()
Assertions.assertDoesNotThrow(() -> dataset.createLineage(SourceDatasets.builder()
.source(new LinkedHashSet<>(Arrays.asList(
DatasetReference.builder().catalog("foo").dataset("d1").build(),
DatasetReference.builder().catalog("foo").dataset("d2").build(),
Expand All @@ -160,6 +160,59 @@ public void testUpdateDatasetLineage() {
.build()));
}

@Test
public void testGetDatasetLineage(){
// Given
wireMockRule.stubFor(WireMock.get(WireMock.urlEqualTo("/catalogs/common/datasets/SD0002/lineage"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("dataset/dataset-SD0002-get-lineage-response.json")));

//When
Dataset dataset = getSdk().builders().dataset()
.identifier("SD0002")
.catalogIdentifier("common")
.build();

DatasetLineage lineage = dataset.getLineage();

//Then
assertThat(lineage, notNullValue());
assertThat(lineage.getDatasets(), notNullValue());
assertThat(lineage.getDatasets().size(), Matchers.is(2));
assertThat(lineage.getRelations(), notNullValue());
assertThat(lineage.getRelations(), containsInAnyOrder(
relationship("common","SD0002", "common","SD0001"),
relationship("common","SD0003", "common","SD0002")
));

}

@Test
public void testGetDatasetLineageWithNoRelationships(){
// Given
wireMockRule.stubFor(WireMock.get(WireMock.urlEqualTo("/catalogs/common/datasets/SD0002/lineage"))
.willReturn(WireMock.aResponse()
.withHeader("Content-Type", "application/json")
.withStatus(200)
.withBodyFile("dataset/dataset-SD0002-get-lineage-empty-response.json")));

//When
Dataset dataset = getSdk().builders().dataset()
.identifier("SD0002")
.catalogIdentifier("common")
.build();

DatasetLineage lineage = dataset.getLineage();

//Then
assertThat(lineage, notNullValue());
assertThat(lineage.getDatasets(), Matchers.is(Matchers.empty()));
assertThat(lineage.getRelations(), Matchers.is(Matchers.empty()));

}

@Test
public void testUpdateDatasetRetrievedFromListDatasets() {
// Given
Expand Down Expand Up @@ -264,4 +317,17 @@ public void testListDatasetsUsingIdContains() {

}

private DatasetRelationship relationship(String srcCatalog, String srcDataset, String destCatalog, String destDataset) {
return DatasetRelationship.builder()
.source(DatasetReference.builder()
.catalog(srcCatalog)
.dataset(srcDataset)
.build())
.destination(DatasetReference.builder()
.catalog(destCatalog)
.dataset(destDataset)
.build())
.build();
}

}
16 changes: 16 additions & 0 deletions src/main/java/io/github/jpmorganchase/fusion/Fusion.java
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,22 @@ public Map<String, Map<String, Object>> datasetResources(String catalogName, Str
return this.callForMap(url);
}

/**
* Get the lineage for a dataset, in the specified catalog
* Currently this will always return a lineage.
*
* @param catalogName identifier of the catalog to be queried
* @param dataset a String representing the dataset identifier to query.
* @throws APICallException if the call to the Fusion API fails
* @throws ParsingException if the response from Fusion could not be parsed successfully
* @throws OAuthException if a token could not be retrieved for authentication
*/
public DatasetLineage getLineage(String catalogName, String dataset) {

String url = String.format("%1scatalogs/%2s/datasets/%3s/lineage", this.rootURL, catalogName, dataset);
return responseParser.parseDatasetLineage(this.api.callAPI(url), catalogName);
}

/**
* Get a filtered list of the reports in the specified catalog
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,14 @@ protected String getApiPathForLineage() {
*
* @param lineage the {@code DatasetLineage} object representing the dataset lineage to be created
*/
public void createLineage(DatasetLineage lineage) {
public void createLineage(SourceDatasets lineage) {
getFusion().create(getApiPathForLineage(), lineage);
}

public DatasetLineage getLineage() {
return getFusion().getLineage(this.getCatalogIdentifier(), this.getIdentifier());
}

@Override
public Set<String> getRegisteredAttributes() {
Set<String> exclusions = super.getRegisteredAttributes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
@Builder
public class DatasetLineage {

Set<DatasetReference> source;
Set<Dataset> datasets;
Set<DatasetRelationship> relations;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.jpmorganchase.fusion.model;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;

@Value
@EqualsAndHashCode
@ToString
@Builder
public class DatasetRelationship {

DatasetReference source;
DatasetReference destination;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package io.github.jpmorganchase.fusion.model;

import java.util.Set;
import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.ToString;
import lombok.Value;

@Value
@EqualsAndHashCode
@ToString
@Builder
public class SourceDatasets {

Set<DatasetReference> source;
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ public interface APIResponseParser {

Map<String, Dataset> parseDatasetResponse(String json, String catalog);

DatasetLineage parseDatasetLineage(String json, String catalog);

Map<String, Report> parseReportResponse(String json, String catalog);

Map<String, DataFlow> parseDataFlowResponse(String json, String catalog);
Expand All @@ -24,9 +26,6 @@ public interface APIResponseParser {

Map<String, Distribution> parseDistributionResponse(String json);

<T extends CatalogResource> T parseResourceFromResponse(
String json, Class<T> resourceClass, ResourceMutationFactory<T> mutator);

<T extends CatalogResource> Map<String, T> parseResourcesFromResponse(String json, Class<T> resourceClass);

<T extends CatalogResource> Map<String, T> parseResourcesWithVarArgsFromResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ public Map<String, Dataset> parseDatasetResponse(String json, String catalog) {
.build());
}

@Override
public DatasetLineage parseDatasetLineage(String json, String catalog) {

Map<String, Dataset> datasets = parseResourcesWithVarArgsFromResponse(
json, Dataset.class, "datasets", (resource, mc) -> resource.toBuilder()
.varArgs(mc.getVarArgs())
.fusion(fusion)
.build());

List<DatasetRelationship> relations = parseListOfResources(json, DatasetRelationship.class, "relations");

return DatasetLineage.builder()
.relations(new HashSet<>(relations))
.datasets(new HashSet<>(datasets.values()))
.build();
}

/**
* Parses a JSON response to extract a map of reports.
* <p>
Expand Down Expand Up @@ -171,21 +188,16 @@ public UploadedPart parseUploadPartResponse(String json) {
}

@Override
public <T extends CatalogResource> T parseResourceFromResponse(
public <T extends CatalogResource> Map<String, T> parseResourcesWithVarArgsFromResponse(
String json, Class<T> resourceClass, ResourceMutationFactory<T> mutator) {

Map<String, Object> responseMap = getMapFromJsonResponse(json);
T obj = gson.fromJson(json, resourceClass);

return parseResourceWithVarArgs(obj.getRegisteredAttributes(), obj, responseMap, mutator);
return parseResourcesWithVarArgsFromResponse(json, resourceClass, "resources", mutator);
}

@Override
public <T extends CatalogResource> Map<String, T> parseResourcesWithVarArgsFromResponse(
String json, Class<T> resourceClass, ResourceMutationFactory<T> mutator) {
String json, Class<T> resourceClass, String resourceAttribute, ResourceMutationFactory<T> mutator) {

Map<String, Map<String, Object>> untypedResources = parseResourcesUntyped(json);
JsonArray resources = getResources(json);
Map<String, Map<String, Object>> untypedResources = parseResourcesUntyped(json, resourceAttribute);
JsonArray resources = getResources(json, resourceAttribute);

List<T> resourceList = new ArrayList<>();
for (JsonElement element : resources) {
Expand All @@ -199,25 +211,34 @@ public <T extends CatalogResource> Map<String, T> parseResourcesWithVarArgsFromR

@Override
public <T extends CatalogResource> Map<String, T> parseResourcesFromResponse(String json, Class<T> resourceClass) {
return parseResourcesFromResponse(json, resourceClass, "resources");
}

JsonArray resources = getResources(json);
public <T> List<T> parseListOfResources(String json, Class<T> resourceClass, String resourceAttribute) {
JsonArray resources = getResources(json, resourceAttribute);

Type listType = TypeToken.getParameterized(List.class, resourceClass).getType();
List<T> resourceList = gson.fromJson(resources, listType);
return gson.fromJson(resources, listType);
}

return collectMapOfUniqueResources(resourceList);
public <T extends CatalogResource> Map<String, T> parseResourcesFromResponse(
String json, Class<T> resourceClass, String resourceAttribute) {
return collectMapOfUniqueResources(parseListOfResources(json, resourceClass, resourceAttribute));
}

@Override
public Map<String, Map<String, Object>> parseResourcesUntyped(String json) {
return parseResourcesUntyped(json, "resources");
}

public Map<String, Map<String, Object>> parseResourcesUntyped(String json, String resourceAttribute) {
Map<String, Object> responseMap = getMapFromJsonResponse(json);

Object resources = responseMap.get("resources");
Object resources = responseMap.get(resourceAttribute);
if (resources instanceof List) {
@SuppressWarnings("unchecked") // List<Object> is always safe, compiler disagrees
List<Object> resourceList = (List<Object>) resources;

if (resourceList.size() == 0) throw generateNoResourceException();
Map<String, Map<String, Object>> resourcesMap = new HashMap<>();
resourceList.forEach((o -> {
@SuppressWarnings("unchecked") // Output of GSON parsing will always be in this format
Expand All @@ -232,11 +253,11 @@ public Map<String, Map<String, Object>> parseResourcesUntyped(String json) {
}
}

private JsonArray getResources(String json) {
private JsonArray getResources(String json, String resourceAttribute) {
JsonObject obj = JsonParser.parseString(json).getAsJsonObject();
JsonArray array = obj.getAsJsonArray("resources");
if (array == null || array.size() == 0) {
throw generateNoResourceException();
JsonArray array = obj.getAsJsonArray(resourceAttribute);
if (array == null) {
array = new JsonArray();
}
return array;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static io.github.jpmorganchase.fusion.pact.util.RequestResponseHelper.getExpectation;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

import au.com.dius.pact.consumer.MockServer;
Expand All @@ -19,7 +20,6 @@
import io.github.jpmorganchase.fusion.model.*;
import io.github.jpmorganchase.fusion.oauth.provider.FusionTokenProvider;
import io.github.jpmorganchase.fusion.pact.util.FileHelper;
import io.github.jpmorganchase.fusion.parsing.ParsingException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
Expand Down Expand Up @@ -218,11 +218,8 @@ void testListCatalogsWhenNoneAreAvailable(MockServer mockServer) {

givenInstanceOfFusionSdk(mockServer);

ParsingException ex = Assertions.assertThrows(ParsingException.class, () -> fusion.listCatalogs());
assertThat(
"Exception message is incorrect",
ex.getMessage(),
is(equalTo("Failed to parse resources from JSON, none found")));
Map<String, Catalog> actual = fusion.listCatalogs();
assertThat("Empty map expected", actual, is(anEmptyMap()));
}

@Test
Expand Down Expand Up @@ -304,11 +301,8 @@ void testListProductsWhenNoneExist(MockServer mockServer) {

givenInstanceOfFusionSdk(mockServer);

ParsingException ex = Assertions.assertThrows(ParsingException.class, () -> fusion.listProducts("common"));
assertThat(
"Exception message is incorrect",
ex.getMessage(),
is(equalTo("Failed to parse resources from JSON, none found")));
Map<String, DataProduct> actual = fusion.listProducts("common");
assertThat("Exception message is incorrect", actual, is(anEmptyMap()));
}

@Test
Expand Down Expand Up @@ -338,11 +332,8 @@ void testListDatasetsWhenNoneExist(MockServer mockServer) {

givenInstanceOfFusionSdk(mockServer);

ParsingException ex = Assertions.assertThrows(ParsingException.class, () -> fusion.listDatasets("common"));
assertThat(
"Exception message is incorrect",
ex.getMessage(),
is(equalTo("Failed to parse resources from JSON, none found")));
Map<String, Dataset> actual = fusion.listDatasets("common");
assertThat("Empty map expected", actual, is(anEmptyMap()));
}

@Test
Expand Down Expand Up @@ -395,12 +386,8 @@ void testListDatasetMembersWhenNoneExist(MockServer mockServer) {

givenInstanceOfFusionSdk(mockServer);

ParsingException ex =
Assertions.assertThrows(ParsingException.class, () -> fusion.listDatasetMembers("common", "API_TEST"));
assertThat(
"Exception message is incorrect",
ex.getMessage(),
is(equalTo("Failed to parse resources from JSON, none found")));
Map<String, DatasetSeries> actual = fusion.listDatasetMembers("common", "API_TEST");
assertThat("Empty map expected", actual, is(anEmptyMap()));
}

@Test
Expand Down
Loading

0 comments on commit fe1e8c9

Please sign in to comment.