Skip to content

Commit

Permalink
fix: Fix serialization problems with migration (#2331)
Browse files Browse the repository at this point in the history
  • Loading branch information
dominikriemer authored Dec 13, 2023
1 parent 5b18838 commit dcdea28
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

import java.net.URISyntaxException;
import java.util.List;
import java.util.NoSuchElementException;

/**
* This class is responsible for managing all the adapter instances which are executed on worker nodes
Expand Down Expand Up @@ -150,7 +151,11 @@ public void stopStreamAdapter(String elementId) throws AdapterException {

// remove the adapter from the metrics manager so that
// no metrics for this adapter are exposed anymore
adapterMetrics.remove(ad.getElementId(), ad.getName());
try {
adapterMetrics.remove(ad.getElementId(), ad.getName());
} catch (NoSuchElementException e) {
LOG.error("Could not remove adapter metrics for adapter {}", ad.getName());
}
}

public void startStreamAdapter(String elementId) throws AdapterException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,22 @@

package org.apache.streampipes.model.migration;

import org.apache.streampipes.model.base.NamedStreamPipesEntity;

/**
* Models the outcome of a migration.
* @param success whether the migration was successfully or not.
* @param element the migrated pipeline element or in case of a failure the original one
* @param message message that describes the outcome of the migration
* @param <T> type of the migration element
*/
public record MigrationResult<T> (boolean success, T element, String message){
public record MigrationResult<T extends NamedStreamPipesEntity> (boolean success, T element, String message){

public static <T> MigrationResult<T> failure(T element, String message) {
public static <T extends NamedStreamPipesEntity> MigrationResult<T> failure(T element, String message) {
return new MigrationResult<>(false, element, message);
}

public static <T> MigrationResult<T> success(T element) {
public static <T extends NamedStreamPipesEntity> MigrationResult<T> success(T element) {
return new MigrationResult<>(true, element, "SUCCESS");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ protected <T extends VersionedNamedStreamPipesEntity> MigrationResult<T> perform
TypeReference<MigrationResult<T>> typeReference = new TypeReference<>() {
};

String migrationResponseString = migrationResponse.returnContent().asString();
return JacksonSerializer
.getObjectMapper()
.readValue(migrationResponse.returnContent().asString(), typeReference);
.readValue(migrationResponseString, typeReference);
} catch (JsonProcessingException e) {
LOG.error(
"Migration of pipeline element failed before sending to the extensions service, "
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
import org.apache.streampipes.rest.extensions.WelcomePage;
import org.apache.streampipes.rest.shared.serializer.SpringJacksonSerializer;
import org.apache.streampipes.service.base.rest.ServiceHealthResource;
import org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler;
import org.apache.streampipes.service.extensions.security.WebSecurityConfig;
Expand All @@ -41,7 +40,7 @@

@Configuration
@EnableAutoConfiguration
@Import({WebSecurityConfig.class, WelcomePage.class, ServiceHealthResource.class, SpringJacksonSerializer.class})
@Import({WebSecurityConfig.class, WelcomePage.class, ServiceHealthResource.class})
@ComponentScan({"org.apache.streampipes.rest.extensions.*", "org.apache.streampipes.service.base.rest.*"})
public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServiceBase {

Expand Down

0 comments on commit dcdea28

Please sign in to comment.