From 18aa63ae0fffc510c865b51adcc916b6fbab547e Mon Sep 17 00:00:00 2001 From: Tim Bossenmaier Date: Mon, 11 Mar 2024 09:04:50 +0100 Subject: [PATCH 1/2] fix: improve core start up behavior --- .../management/health/AdapterHealthCheck.java | 21 +++++- .../AdapterDescriptionMigration093.java | 73 ------------------- .../rest/impl/admin/MigrationResource.java | 42 +++++++++-- .../service/core/PostStartupTask.java | 20 +++++ 4 files changed, 77 insertions(+), 79 deletions(-) delete mode 100644 streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index 1d8843a62c..9410d89608 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -16,6 +16,24 @@ * */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.streampipes.connect.management.health; import org.apache.streampipes.commons.exceptions.connect.AdapterException; @@ -222,7 +240,8 @@ public Map getAdaptersToRecover( adapterDescription.getElementId())); } catch (AdapterException e) { LOG.info( - "Could not recover adapter at endpoint {} due to {}", + "Could not recover adapter at endpoint {} - " + + "marking it as requested to recover (reason: {})", adapterEndpointUrl, e.getMessage() ); diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java deleted file mode 100644 index d10d279993..0000000000 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/migration/AdapterDescriptionMigration093.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.manager.migration; - -import org.apache.streampipes.commons.exceptions.SepaParseException; -import org.apache.streampipes.manager.endpoint.HttpJsonParser; -import org.apache.streampipes.manager.operations.Operations; -import org.apache.streampipes.manager.util.AuthTokenUtils; -import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; -import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; -import org.apache.streampipes.storage.api.IAdapterStorage; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; - -import static org.apache.streampipes.manager.migration.MigrationUtils.getRequestUrl; - -public class AdapterDescriptionMigration093 extends AbstractMigrationManager { - - private static final Logger LOG = LoggerFactory.getLogger(AdapterDescriptionMigration093.class); - - private final IAdapterStorage adapterDescriptionStorage; - - public AdapterDescriptionMigration093(IAdapterStorage adapterDescriptionStorage) { - this.adapterDescriptionStorage = adapterDescriptionStorage; - } - - public void reinstallAdapters(SpServiceRegistration extensionsServiceConfig) { - var migrationProvider = AdapterDescriptionMigration093Provider.INSTANCE; - if (migrationProvider.hasAppIdsToReinstall()) { - var appIdsToReinstall = migrationProvider.getAppIdsToReinstall(); - var serviceUrl = extensionsServiceConfig.getServiceUrl(); - extensionsServiceConfig.getTags() - .stream() - .filter(tag -> tag.getPrefix() == SpServiceTagPrefix.ADAPTER) - .filter(tag -> appIdsToReinstall.contains(tag.getValue())) - .forEach(tag -> { - var appId = tag.getValue(); - try { - if (adapterDescriptionStorage.getAdaptersByAppId(appId).isEmpty()) { - var requestUrl = getRequestUrl(SpServiceTagPrefix.ADAPTER, appId, serviceUrl); - var entityPayload = HttpJsonParser.getContentFromUrl(URI.create(requestUrl)); - Operations.verifyAndAddElement( - entityPayload, - AuthTokenUtils.getAuthTokenForCurrentUser(), - true); - } - } catch (IOException | SepaParseException e) { - LOG.warn("Could not reinstall adapter description {}", appId); - } - }); - } - } -} diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java index abca2da422..6bbdfe15e0 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -16,13 +16,48 @@ * */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.streampipes.rest.impl.admin; import org.apache.streampipes.connect.management.management.AdapterMigrationManager; import org.apache.streampipes.manager.health.CoreInitialInstallationProgress; import org.apache.streampipes.manager.health.CoreServiceStatusManager; import org.apache.streampipes.manager.health.ServiceRegistrationManager; -import org.apache.streampipes.manager.migration.AdapterDescriptionMigration093; import org.apache.streampipes.manager.migration.PipelineElementMigrationManager; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceRegistration; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceStatus; @@ -64,8 +99,6 @@ public class MigrationResource extends AbstractAuthGuardedRestResource { private final CRUDStorage extensionsServiceStorage = getNoSqlStorage().getExtensionsServiceStorage(); - - private final IAdapterStorage adapterDescriptionStorage = getNoSqlStorage().getAdapterDescriptionStorage(); private final IAdapterStorage adapterStorage = getNoSqlStorage().getAdapterInstanceStorage(); private final IDataProcessorStorage dataProcessorStorage = getNoSqlStorage().getDataProcessorStorage(); @@ -104,13 +137,12 @@ public ResponseEntity performMigrations( try { var extensionsServiceConfig = serviceManager.getService(serviceId); if (!CoreInitialInstallationProgress.INSTANCE.isInitiallyInstalling()) { - new AdapterDescriptionMigration093(adapterDescriptionStorage).reinstallAdapters(extensionsServiceConfig); if (!migrationConfigs.isEmpty()) { var anyServiceMigrating = serviceManager.isAnyServiceMigrating(); var coreReady = isCoreReady(); if (anyServiceMigrating || !coreReady) { LOG.info( - "Refusing migration request since precondition is not met (anyServiceMigratione={}, coreReady={}.", + "Refusing migration request since precondition is not met (anyServiceMigrating={}, coreReady={}.", anyServiceMigrating, coreReady ); diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java index 76ba815dcc..1514013ee7 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java @@ -16,10 +16,29 @@ * */ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + package org.apache.streampipes.service.core; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager; import org.apache.streampipes.connect.management.management.WorkerAdministrationManagement; +import org.apache.streampipes.manager.health.ServiceHealthCheck; import org.apache.streampipes.manager.operations.Operations; import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTagPrefix; import org.apache.streampipes.model.pipeline.Pipeline; @@ -65,6 +84,7 @@ public PostStartupTask(IPipelineStorage pipelineStorage) { @Override public void run() { + new ServiceHealthCheck().run(); performAdapterAssetUpdate(); startAllPreviouslyStoppedPipelines(); startAdapters(); From ab1bff730b364b5c5d2b40a24875bd2fcab52ba0 Mon Sep 17 00:00:00 2001 From: Tim Bossenmaier Date: Mon, 11 Mar 2024 09:31:50 +0100 Subject: [PATCH 2/2] fix: remove duplicate file headers --- .../management/health/AdapterHealthCheck.java | 18 ---------- .../rest/impl/admin/MigrationResource.java | 36 ------------------- .../service/core/PostStartupTask.java | 18 ---------- 3 files changed, 72 deletions(-) diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java index 9410d89608..210f6e6674 100644 --- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java +++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/health/AdapterHealthCheck.java @@ -16,24 +16,6 @@ * */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - package org.apache.streampipes.connect.management.health; import org.apache.streampipes.commons.exceptions.connect.AdapterException; diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java index 6bbdfe15e0..aa3a509b80 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/admin/MigrationResource.java @@ -16,42 +16,6 @@ * */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - package org.apache.streampipes.rest.impl.admin; import org.apache.streampipes.connect.management.management.AdapterMigrationManager; diff --git a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java index 1514013ee7..8e042c72f6 100644 --- a/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java +++ b/streampipes-service-core/src/main/java/org/apache/streampipes/service/core/PostStartupTask.java @@ -16,24 +16,6 @@ * */ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - package org.apache.streampipes.service.core; import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;