From dcdea28e792c7f91e1f02fe8aa5d9dfa0d03d167 Mon Sep 17 00:00:00 2001 From: Dominik Riemer Date: Wed, 13 Dec 2023 22:36:45 +0100 Subject: [PATCH] fix: Fix serialization problems with migration (#2331) --- .../management/AdapterMasterManagement.java | 7 +++- .../model/migration/MigrationResult.java | 8 +++-- .../migration/AbstractMigrationManager.java | 3 +- .../serializer/SpringJacksonSerializer.java | 34 ------------------- .../extensions/ExtensionsModelSubmitter.java | 3 +- 5 files changed, 14 insertions(+), 41 deletions(-) delete mode 100644 streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/serializer/SpringJacksonSerializer.java diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java index 8df3721dec..ec87909021 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java @@ -39,6 +39,7 @@ import java.net.URISyntaxException; import java.util.List; +import java.util.NoSuchElementException; /** * This class is responsible for managing all the adapter instances which are executed on worker nodes @@ -150,7 +151,11 @@ public void stopStreamAdapter(String elementId) throws AdapterException { // remove the adapter from the metrics manager so that // no metrics for this adapter are exposed anymore - adapterMetrics.remove(ad.getElementId(), ad.getName()); + try { + adapterMetrics.remove(ad.getElementId(), ad.getName()); + } catch (NoSuchElementException e) { + LOG.error("Could not remove adapter metrics for adapter {}", ad.getName()); + } } public void startStreamAdapter(String elementId) throws AdapterException { diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/migration/MigrationResult.java b/streampipes-model/src/main/java/org/apache/streampipes/model/migration/MigrationResult.java index 805dc65ea4..f25557e76f 100644 --- a/streampipes-model/src/main/java/org/apache/streampipes/model/migration/MigrationResult.java +++ b/streampipes-model/src/main/java/org/apache/streampipes/model/migration/MigrationResult.java @@ -18,6 +18,8 @@ package org.apache.streampipes.model.migration; +import org.apache.streampipes.model.base.NamedStreamPipesEntity; + /** * Models the outcome of a migration. * @param success whether the migration was successfully or not. @@ -25,13 +27,13 @@ * @param message message that describes the outcome of the migration * @param type of the migration element */ -public record MigrationResult (boolean success, T element, String message){ +public record MigrationResult (boolean success, T element, String message){ - public static MigrationResult failure(T element, String message) { + public static MigrationResult failure(T element, String message) { return new MigrationResult<>(false, element, message); } - public static MigrationResult success(T element) { + public static MigrationResult success(T element) { return new MigrationResult<>(true, element, "SUCCESS"); } } diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java index 3641f0b7c7..ce5ede3511 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AbstractMigrationManager.java @@ -80,9 +80,10 @@ protected MigrationResult perform TypeReference> typeReference = new TypeReference<>() { }; + String migrationResponseString = migrationResponse.returnContent().asString(); return JacksonSerializer .getObjectMapper() - .readValue(migrationResponse.returnContent().asString(), typeReference); + .readValue(migrationResponseString, typeReference); } catch (JsonProcessingException e) { LOG.error( "Migration of pipeline element failed before sending to the extensions service, " diff --git a/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/serializer/SpringJacksonSerializer.java b/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/serializer/SpringJacksonSerializer.java deleted file mode 100644 index 164309c3cb..0000000000 --- a/streampipes-rest-shared/src/main/java/org/apache/streampipes/rest/shared/serializer/SpringJacksonSerializer.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.rest.shared.serializer; - -import org.apache.streampipes.serializers.json.JacksonSerializer; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Primary; - -public class SpringJacksonSerializer { - - @Bean - @Primary - public ObjectMapper objectMapper() { - return JacksonSerializer.getObjectMapper(); - } -} diff --git a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java index 72557f6b8a..ff7c721524 100644 --- a/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java +++ b/streampipes-service-extensions/src/main/java/org/apache/streampipes/service/extensions/ExtensionsModelSubmitter.java @@ -25,7 +25,6 @@ import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag; import org.apache.streampipes.rest.extensions.WelcomePage; -import org.apache.streampipes.rest.shared.serializer.SpringJacksonSerializer; import org.apache.streampipes.service.base.rest.ServiceHealthResource; import org.apache.streampipes.service.extensions.function.StreamPipesFunctionHandler; import org.apache.streampipes.service.extensions.security.WebSecurityConfig; @@ -41,7 +40,7 @@ @Configuration @EnableAutoConfiguration -@Import({WebSecurityConfig.class, WelcomePage.class, ServiceHealthResource.class, SpringJacksonSerializer.class}) +@Import({WebSecurityConfig.class, WelcomePage.class, ServiceHealthResource.class}) @ComponentScan({"org.apache.streampipes.rest.extensions.*", "org.apache.streampipes.service.base.rest.*"}) public abstract class ExtensionsModelSubmitter extends StreamPipesExtensionsServiceBase {