From 7ca8de3694d6a26356930c51ebf7ec499e27013a Mon Sep 17 00:00:00 2001 From: muyangye Date: Fri, 17 Nov 2023 02:07:32 -0800 Subject: [PATCH 01/15] Serialize non-primitive types and store in Influx --- .../commons/influx/InfluxStore.java | 47 ++++++++++++++----- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java index 4adf3f153c..38cbbbc8a5 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java @@ -29,6 +29,8 @@ import org.apache.streampipes.vocabulary.SO; import org.apache.streampipes.vocabulary.XSD; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.influxdb.InfluxDB; import org.influxdb.dto.Point; import org.influxdb.dto.Pong; @@ -133,15 +135,13 @@ public void onEvent(Event event) throws SpRuntimeException { Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS); for (EventProperty ep : measure.getEventSchema().getEventProperties()) { - if (ep instanceof EventPropertyPrimitive) { - String runtimeName = ep.getRuntimeName(); - - // timestamp should not be added as a field - if (!measure.getTimestampField().endsWith(runtimeName)) { - String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName); - - try { - var field = event.getOptionalFieldByRuntimeName(runtimeName); + String runtimeName = ep.getRuntimeName(); + // timestamp should not be added as a field + if (!measure.getTimestampField().endsWith(runtimeName)) { + String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName); + var field = event.getOptionalFieldByRuntimeName(runtimeName); + try { + if (ep instanceof EventPropertyPrimitive) { if (field.isPresent()) { PrimitiveField eventPropertyPrimitiveField = field.get().getAsPrimitive(); if (eventPropertyPrimitiveField.getRawValue() == null) { @@ -162,10 +162,18 @@ public void onEvent(Event event) throws SpRuntimeException { } else { missingFields.add(runtimeName); } - } catch (SpRuntimeException iae) { - LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", - runtimeName, iae); + } else { + // Since InfluxDB can't store non-primitive types, store them as string + // and deserialize later in downstream processes + if (field.isPresent()) { + handleNonPrimitiveMeasurementProperty(point, event, sanitizedRuntimeName); + } else { + missingFields.add(runtimeName); + } } + } catch (SpRuntimeException iae) { + LOG.warn("Runtime exception while extracting field value of field {} - this field will be ignored", + runtimeName, iae); } } } @@ -218,6 +226,21 @@ private void handleMeasurementProperty(Point.Builder p, } } + private void handleNonPrimitiveMeasurementProperty(Point.Builder p, Event event, String preparedRuntimeName) { + var field = event.getRaw().get(preparedRuntimeName); + ObjectMapper mapper = new ObjectMapper(); + try { + String json = mapper.writeValueAsString(field); + p.addField(preparedRuntimeName, json); + // Deserialize in this way, tested the primitive types can be correctly deducted from Object +// TypeFactory typeFactory = mapper.getTypeFactory(); +// MapType mapType = typeFactory.constructMapType(HashMap.class, String.class, Object.class); +// Map map = mapper.readValue(json, mapType); + } catch (JsonProcessingException e) { + LOG.warn("Failed to serialize field {}, ignoring.", preparedRuntimeName); + } + } + /** * Shuts down the connection to the InfluxDB server */ From 36e79a80622a4cda70108242efe525d529e7ef93 Mon Sep 17 00:00:00 2001 From: muyangye Date: Sun, 26 Nov 2023 03:56:18 -0800 Subject: [PATCH 02/15] extract RawFieldSerializer --- .../commons/influx/InfluxStore.java | 39 +++++------ .../influx/serializer/RawFieldSerializer.java | 53 ++++++++++++++ .../serializer/TestRawEventSerializer.java | 70 +++++++++++++++++++ 3 files changed, 139 insertions(+), 23 deletions(-) create mode 100644 streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/RawFieldSerializer.java create mode 100644 streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java index 38cbbbc8a5..4abb33d679 100644 --- a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/InfluxStore.java @@ -20,20 +20,17 @@ import org.apache.streampipes.commons.environment.Environment; import org.apache.streampipes.commons.exceptions.SpRuntimeException; +import org.apache.streampipes.dataexplorer.commons.influx.serializer.RawFieldSerializer; import org.apache.streampipes.model.datalake.DataLakeMeasure; import org.apache.streampipes.model.runtime.Event; import org.apache.streampipes.model.runtime.field.PrimitiveField; -import org.apache.streampipes.model.schema.EventProperty; import org.apache.streampipes.model.schema.EventPropertyPrimitive; import org.apache.streampipes.model.schema.PropertyScope; import org.apache.streampipes.vocabulary.SO; import org.apache.streampipes.vocabulary.XSD; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import org.influxdb.InfluxDB; import org.influxdb.dto.Point; -import org.influxdb.dto.Pong; import org.influxdb.dto.Query; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -51,6 +48,8 @@ public class InfluxStore { Map sanitizedRuntimeNames = new HashMap<>(); private InfluxDB influxDb = null; + private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer(); + public InfluxStore(DataLakeMeasure measure, InfluxConnectionSettings settings) { this.measure = measure; @@ -78,7 +77,7 @@ private void connect(InfluxConnectionSettings settings) throws SpRuntimeExceptio influxDb = InfluxClientProvider.getInfluxDBClient(settings); // Checking, if server is available - Pong response = influxDb.ping(); + var response = influxDb.ping(); if (response.getVersion().equalsIgnoreCase("unknown")) { throw new SpRuntimeException("Could not connect to InfluxDb Server: " + settings.getConnectionUrl()); } @@ -92,8 +91,8 @@ private void connect(InfluxConnectionSettings settings) throws SpRuntimeExceptio // setting up the database influxDb.setDatabase(databaseName); - int batchSize = 2000; - int flushDuration = 500; + var batchSize = 2000; + var flushDuration = 500; influxDb.enableBatch(batchSize, flushDuration, TimeUnit.MILLISECONDS); } @@ -124,26 +123,26 @@ public void onEvent(Event event) throws SpRuntimeException { } // sanitize event - for (String key : event.getRaw().keySet()) { + for (var key : event.getRaw().keySet()) { if (InfluxDbReservedKeywords.KEYWORD_LIST.stream().anyMatch(k -> k.equalsIgnoreCase(key))) { event.renameFieldByRuntimeName(key, key + "_"); } } - Long timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong(); - Point.Builder point = + var timestampValue = event.getFieldBySelector(measure.getTimestampField()).getAsPrimitive().getAsLong(); + var point = Point.measurement(measure.getMeasureName()).time((long) timestampValue, TimeUnit.MILLISECONDS); - for (EventProperty ep : measure.getEventSchema().getEventProperties()) { - String runtimeName = ep.getRuntimeName(); + for (var ep : measure.getEventSchema().getEventProperties()) { + var runtimeName = ep.getRuntimeName(); // timestamp should not be added as a field if (!measure.getTimestampField().endsWith(runtimeName)) { - String sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName); + var sanitizedRuntimeName = sanitizedRuntimeNames.get(runtimeName); var field = event.getOptionalFieldByRuntimeName(runtimeName); try { if (ep instanceof EventPropertyPrimitive) { if (field.isPresent()) { - PrimitiveField eventPropertyPrimitiveField = field.get().getAsPrimitive(); + var eventPropertyPrimitiveField = field.get().getAsPrimitive(); if (eventPropertyPrimitiveField.getRawValue() == null) { nullFields.add(sanitizedRuntimeName); } else { @@ -197,7 +196,7 @@ private void handleMeasurementProperty(Point.Builder p, PrimitiveField eventPropertyPrimitiveField) { try { // Store property according to property type - String runtimeType = ep.getRuntimeType(); + var runtimeType = ep.getRuntimeType(); if (XSD.INTEGER.toString().equals(runtimeType)) { try { p.addField(preparedRuntimeName, eventPropertyPrimitiveField.getAsInt()); @@ -227,16 +226,10 @@ private void handleMeasurementProperty(Point.Builder p, } private void handleNonPrimitiveMeasurementProperty(Point.Builder p, Event event, String preparedRuntimeName) { - var field = event.getRaw().get(preparedRuntimeName); - ObjectMapper mapper = new ObjectMapper(); try { - String json = mapper.writeValueAsString(field); + var json = rawFieldSerializer.serialize(event.getRaw().get(preparedRuntimeName)); p.addField(preparedRuntimeName, json); - // Deserialize in this way, tested the primitive types can be correctly deducted from Object -// TypeFactory typeFactory = mapper.getTypeFactory(); -// MapType mapType = typeFactory.constructMapType(HashMap.class, String.class, Object.class); -// Map map = mapper.readValue(json, mapType); - } catch (JsonProcessingException e) { + } catch (SpRuntimeException e) { LOG.warn("Failed to serialize field {}, ignoring.", preparedRuntimeName); } } diff --git a/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/RawFieldSerializer.java b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/RawFieldSerializer.java new file mode 100644 index 0000000000..05380df10c --- /dev/null +++ b/streampipes-data-explorer-commons/src/main/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/RawFieldSerializer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer.commons.influx.serializer; + +import org.apache.streampipes.commons.exceptions.SpRuntimeException; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator; + +public class RawFieldSerializer { + protected ObjectMapper objectMapper; + + public RawFieldSerializer() { + this.objectMapper = new ObjectMapper().activateDefaultTyping( + BasicPolymorphicTypeValidator.builder() + .allowIfBaseType(Object.class) + .build(), + ObjectMapper.DefaultTyping.EVERYTHING); + } + + public String serialize(Object object) { + try { + return objectMapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + throw new SpRuntimeException(e.getCause()); + } + } + + public Object deserialize(String json) { + try { + return objectMapper.readValue(json, Object.class); + } catch (JsonProcessingException e) { + throw new SpRuntimeException(e.getCause()); + } + } +} diff --git a/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java b/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java new file mode 100644 index 0000000000..0123273579 --- /dev/null +++ b/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer.commons.influx.serializer; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestRawEventSerializer { + private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer(); + private Map primitives = new HashMap(); + + public TestRawEventSerializer() { + primitives.put("Integer", 1); + primitives.put("Long", 1L); + primitives.put("Float", 1.0f); + primitives.put("Double", 1.0d); + primitives.put("Boolean", true); + primitives.put("String", "1"); + } + + // Test able to deserialize back the original data + @Test + public void testRawEventFieldSerializerListInMap() { + var rawListField = new ArrayList(); + rawListField.addAll(primitives.values()); + + var rawNestedField = new HashMap(); + rawNestedField.putAll(primitives); + rawNestedField.put("List", rawListField); + + var json = rawFieldSerializer.serialize(rawNestedField); + + assertEquals(rawNestedField, rawFieldSerializer.deserialize(json)); + } + + @Test + public void testRawEventFieldSerializerMapInList() { + var rawNestedField = new HashMap(); + rawNestedField.putAll(primitives); + + var rawListField = new ArrayList(); + rawListField.addAll(primitives.values()); + rawListField.add(rawNestedField); + + var json = rawFieldSerializer.serialize(rawListField); + + assertEquals(rawListField, rawFieldSerializer.deserialize(json)); + } +} From c46814f9927d6fcffb1f3707f2b578b1f872c3aa Mon Sep 17 00:00:00 2001 From: muyangye Date: Sun, 26 Nov 2023 14:12:50 -0800 Subject: [PATCH 03/15] rename test --- .../serializer/TestRawFieldSerializer.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawFieldSerializer.java diff --git a/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawFieldSerializer.java b/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawFieldSerializer.java new file mode 100644 index 0000000000..c9b85695d1 --- /dev/null +++ b/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawFieldSerializer.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.streampipes.dataexplorer.commons.influx.serializer; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class TestRawFieldSerializer { + private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer(); + private Map primitives = new HashMap(); + + public TestRawFieldSerializer() { + primitives.put("Integer", 1); + primitives.put("Long", 1L); + primitives.put("Float", 1.0f); + primitives.put("Double", 1.0d); + primitives.put("Boolean", true); + primitives.put("String", "1"); + } + + // Test able to deserialize back the original data + @Test + public void testRawFieldSerializerListInMap() { + var rawListField = new ArrayList(); + rawListField.addAll(primitives.values()); + + var rawNestedField = new HashMap(); + rawNestedField.putAll(primitives); + rawNestedField.put("List", rawListField); + + var json = rawFieldSerializer.serialize(rawNestedField); + + assertEquals(rawNestedField, rawFieldSerializer.deserialize(json)); + } + + @Test + public void testRawFieldSerializerMapInList() { + var rawNestedField = new HashMap(); + rawNestedField.putAll(primitives); + + var rawListField = new ArrayList(); + rawListField.addAll(primitives.values()); + rawListField.add(rawNestedField); + + var json = rawFieldSerializer.serialize(rawListField); + + assertEquals(rawListField, rawFieldSerializer.deserialize(json)); + } +} From 16a19b8f87bec1feebd9ad8f4d30c862d21336a8 Mon Sep 17 00:00:00 2001 From: muyangye Date: Sun, 26 Nov 2023 14:14:27 -0800 Subject: [PATCH 04/15] delete old --- .../serializer/TestRawEventSerializer.java | 70 ------------------- 1 file changed, 70 deletions(-) delete mode 100644 streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java diff --git a/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java b/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java deleted file mode 100644 index 0123273579..0000000000 --- a/streampipes-data-explorer-commons/src/test/java/org/apache/streampipes/dataexplorer/commons/influx/serializer/TestRawEventSerializer.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package org.apache.streampipes.dataexplorer.commons.influx.serializer; - -import org.junit.Test; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Map; - -import static org.junit.Assert.assertEquals; - -public class TestRawEventSerializer { - private RawFieldSerializer rawFieldSerializer = new RawFieldSerializer(); - private Map primitives = new HashMap(); - - public TestRawEventSerializer() { - primitives.put("Integer", 1); - primitives.put("Long", 1L); - primitives.put("Float", 1.0f); - primitives.put("Double", 1.0d); - primitives.put("Boolean", true); - primitives.put("String", "1"); - } - - // Test able to deserialize back the original data - @Test - public void testRawEventFieldSerializerListInMap() { - var rawListField = new ArrayList(); - rawListField.addAll(primitives.values()); - - var rawNestedField = new HashMap(); - rawNestedField.putAll(primitives); - rawNestedField.put("List", rawListField); - - var json = rawFieldSerializer.serialize(rawNestedField); - - assertEquals(rawNestedField, rawFieldSerializer.deserialize(json)); - } - - @Test - public void testRawEventFieldSerializerMapInList() { - var rawNestedField = new HashMap(); - rawNestedField.putAll(primitives); - - var rawListField = new ArrayList(); - rawListField.addAll(primitives.values()); - rawListField.add(rawNestedField); - - var json = rawFieldSerializer.serialize(rawListField); - - assertEquals(rawListField, rawFieldSerializer.deserialize(json)); - } -} From a2acb0c189b95a369b3ac8b34e91dd3c0ff8e2c1 Mon Sep 17 00:00:00 2001 From: muyangye Date: Mon, 4 Dec 2023 03:02:39 -0800 Subject: [PATCH 05/15] temp --- .../streampipes/manager/file/FileManager.java | 13 ++--- .../rest/impl/PipelineElementFile.java | 10 ++++ .../src/lib/apis/files.service.ts | 6 +++ .../static-file-input.component.html | 17 ++++++- .../static-file-input.component.ts | 48 ++++++++++++------- .../file-upload-dialog.component.html | 45 ++++++++++++++++- .../file-upload-dialog.component.ts | 39 +++++++++++++-- 7 files changed, 146 insertions(+), 32 deletions(-) diff --git a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java index fa860de9f4..518d273c39 100644 --- a/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java +++ b/streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/file/FileManager.java @@ -29,7 +29,6 @@ import java.io.InputStream; import java.util.Arrays; import java.util.List; -import java.util.UUID; import java.util.stream.Collectors; public class FileManager { @@ -74,9 +73,8 @@ public static FileMetadata storeFile(String user, fileInputStream = cleanFile(fileInputStream, filetype); - String internalFilename = makeInternalFilename(filetype); - FileMetadata fileMetadata = makeFileMetadata(user, filename, internalFilename, filetype); - new FileHandler().storeFile(internalFilename, fileInputStream); + FileMetadata fileMetadata = makeFileMetadata(user, filename, filetype); + new FileHandler().storeFile(filename, fileInputStream); storeFileMetadata(fileMetadata); return fileMetadata; } @@ -119,23 +117,18 @@ private static IFileMetadataStorage getFileMetadataStorage() { private static FileMetadata makeFileMetadata(String user, String originalFilename, - String internalFilename, String filetype) { FileMetadata fileMetadata = new FileMetadata(); fileMetadata.setCreatedAt(System.currentTimeMillis()); fileMetadata.setCreatedByUser(user); fileMetadata.setFiletype(filetype); - fileMetadata.setInternalFilename(internalFilename); + fileMetadata.setInternalFilename(originalFilename); fileMetadata.setOriginalFilename(originalFilename); return fileMetadata; } - private static String makeInternalFilename(String filetype) { - return UUID.randomUUID() + "." + filetype; - } - private static List filterFiletypes(List allFiles, String filetypes) { return allFiles .stream() diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementFile.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementFile.java index 2c74d94043..68e574232c 100644 --- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementFile.java +++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/PipelineElementFile.java @@ -46,6 +46,7 @@ import java.io.IOException; import java.io.InputStream; +import java.util.stream.Collectors; @Path("/v2/files") @Component @@ -119,4 +120,13 @@ public Response getFile( } return ok(FileManager.getFile(filename)); } + + @GET + @Path("/allFilenames") + @Produces(MediaType.APPLICATION_JSON) + @PreAuthorize(AuthConstants.HAS_READ_FILE_PRIVILEGE) + public Response getAllFilenames() { + return ok(FileManager.getAllFiles().stream().map(fileMetadata -> fileMetadata.getOriginalFilename()).collect( + Collectors.toList())); + } } diff --git a/ui/projects/streampipes/platform-services/src/lib/apis/files.service.ts b/ui/projects/streampipes/platform-services/src/lib/apis/files.service.ts index 1439093ad8..e2c1423150 100644 --- a/ui/projects/streampipes/platform-services/src/lib/apis/files.service.ts +++ b/ui/projects/streampipes/platform-services/src/lib/apis/files.service.ts @@ -90,4 +90,10 @@ export class FilesService { }, ); } + + getAllFilenames(): Observable { + return this.http.get( + this.platformServicesCommons.apiBasePath + '/files/allFilenames', + ); + } } diff --git a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.html b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.html index 9fe022a957..6a0adc24cb 100644 --- a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.html +++ b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.html @@ -67,7 +67,7 @@ -
+
+
+

+ The file you uploaded already exists, please select it or upload + a file with a different name. +

+ +
diff --git a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts index 9ad435b11d..64ca72d4bb 100644 --- a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts +++ b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts @@ -41,6 +41,7 @@ export class StaticFileInputComponent public chooseExistingFileControl = new UntypedFormControl(); fileName: string; + fileAlreadyExists: boolean; selectedUploadFile: File; @@ -122,24 +123,35 @@ export class StaticFileInputComponent } upload() { - this.uploadStatus = 0; if (this.selectedUploadFile !== undefined) { - this.filesService.uploadFile(this.selectedUploadFile).subscribe( - event => { - if (event.type === HttpEventType.UploadProgress) { - this.uploadStatus = Math.round( - (100 * event.loaded) / event.total, - ); - } else if (event instanceof HttpResponse) { - const internalFilename = event.body.internalFilename; - this.parentForm.controls[this.fieldName].setValue( - internalFilename, + this.filesService.getAllFilenames().subscribe(allFileNames => { + if (!allFileNames.includes(this.selectedUploadFile.name)) { + this.uploadStatus = 0; + this.filesService + .uploadFile(this.selectedUploadFile) + .subscribe( + event => { + if ( + event.type === HttpEventType.UploadProgress + ) { + this.uploadStatus = Math.round( + (100 * event.loaded) / event.total, + ); + } else if (event instanceof HttpResponse) { + const internalFilename = + event.body.internalFilename; + this.parentForm.controls[ + this.fieldName + ].setValue(internalFilename); + this.fetchFileMetadata(internalFilename); + } + }, + error => {}, ); - this.fetchFileMetadata(internalFilename); - } - }, - error => {}, - ); + } else { + this.fileAlreadyExists = true; + } + }); } } @@ -163,4 +175,8 @@ export class StaticFileInputComponent this.staticProperty.locationPath = value.internalFilename; this.parentForm.updateValueAndValidity(); } + + reenableFileUpload() { + this.fileAlreadyExists = false; + } } diff --git a/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html b/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html index 7fc060395e..9013417527 100644 --- a/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html +++ b/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html @@ -18,7 +18,7 @@
-
+
+
+
+ +

+ The following files already exist. Please rename them. +

+
+
+
+ {{ name }}: + + + +
+
+
+
+ +
-
+
-
-

- The file you uploaded already exists, please select it or upload - a file with a different name. -

- -
diff --git a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts index 64ca72d4bb..079a7449f6 100644 --- a/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts +++ b/ui/src/app/core-ui/static-properties/static-file-input/static-file-input.component.ts @@ -16,7 +16,13 @@ * */ -import { Component, EventEmitter, OnInit, Output } from '@angular/core'; +import { + Component, + EventEmitter, + NgModule, + OnInit, + Output, +} from '@angular/core'; import { HttpEventType, HttpResponse } from '@angular/common/http'; import { FilesService, @@ -26,6 +32,8 @@ import { import { ConfigurationInfo } from '../../../connect/model/ConfigurationInfo'; import { AbstractValidatedStaticPropertyRenderer } from '../base/abstract-validated-static-property'; import { UntypedFormControl, ValidatorFn, Validators } from '@angular/forms'; +import { MatDialog, MatDialogRef } from '@angular/material/dialog'; +import { FileRenameDialogComponent } from '../../../files/dialog/file-rename/file-rename-dialog.component'; @Component({ selector: 'sp-static-file-input', @@ -40,8 +48,9 @@ export class StaticFileInputComponent public chooseExistingFileControl = new UntypedFormControl(); + dialogRef: MatDialogRef; + fileName: string; - fileAlreadyExists: boolean; selectedUploadFile: File; @@ -55,7 +64,10 @@ export class StaticFileInputComponent filesLoaded = false; - constructor(private filesService: FilesService) { + constructor( + private filesService: FilesService, + public dialog: MatDialog, + ) { super(); } @@ -124,34 +136,43 @@ export class StaticFileInputComponent upload() { if (this.selectedUploadFile !== undefined) { - this.filesService.getAllFilenames().subscribe(allFileNames => { - if (!allFileNames.includes(this.selectedUploadFile.name)) { - this.uploadStatus = 0; - this.filesService - .uploadFile(this.selectedUploadFile) - .subscribe( - event => { - if ( - event.type === HttpEventType.UploadProgress - ) { - this.uploadStatus = Math.round( - (100 * event.loaded) / event.total, - ); - } else if (event instanceof HttpResponse) { - const internalFilename = - event.body.internalFilename; - this.parentForm.controls[ - this.fieldName - ].setValue(internalFilename); - this.fetchFileMetadata(internalFilename); - } - }, - error => {}, - ); - } else { - this.fileAlreadyExists = true; - } - }); + this.filesService + .getAllOriginalFilenames() + .subscribe(allFileNames => { + if ( + !allFileNames.includes( + this.selectedUploadFile.name.toLowerCase(), + ) + ) { + this.uploadStatus = 0; + this.filesService + .uploadFile(this.selectedUploadFile) + .subscribe( + event => { + if ( + event.type === + HttpEventType.UploadProgress + ) { + this.uploadStatus = Math.round( + (100 * event.loaded) / event.total, + ); + } else if (event instanceof HttpResponse) { + const internalFilename = + event.body.internalFilename; + this.parentForm.controls[ + this.fieldName + ].setValue(internalFilename); + this.fetchFileMetadata( + internalFilename, + ); + } + }, + error => {}, + ); + } else { + this.openRenameDialog(); + } + }); } } @@ -176,7 +197,20 @@ export class StaticFileInputComponent this.parentForm.updateValueAndValidity(); } - reenableFileUpload() { - this.fileAlreadyExists = false; + openRenameDialog() { + this.dialogRef = this.dialog.open(FileRenameDialogComponent); + this.dialogRef.afterClosed().subscribe(data => { + if (data) { + this.fileName = data; + this.selectedUploadFile = new File( + [this.selectedUploadFile], + this.fileName, + { + type: this.selectedUploadFile.type, + lastModified: this.selectedUploadFile.lastModified, + }, + ); + } + }); } } diff --git a/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.html b/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.html new file mode 100644 index 0000000000..c29dd84698 --- /dev/null +++ b/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.html @@ -0,0 +1,70 @@ + + +
+
+
+
+
+
+ +

+ The file uploaded already exists, please choose + it from existing files or rename it: +

+
+
+ new filename: + + + +
+
+
+
+
+
+ +
+ + +
+
diff --git a/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.scss b/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.scss new file mode 100644 index 0000000000..704f843e4b --- /dev/null +++ b/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.scss @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +@import 'src/scss/sp/sp-dialog'; diff --git a/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.ts b/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.ts new file mode 100644 index 0000000000..24702ba3a2 --- /dev/null +++ b/ui/src/app/files/dialog/file-rename/file-rename-dialog.component.ts @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { Component, Inject } from '@angular/core'; +import { DialogRef } from '@angular/cdk/dialog'; +import { MAT_DIALOG_DATA } from '@angular/material/dialog'; + +@Component({ + selector: 'sp-file-rename-dialog-component', + templateUrl: './file-rename-dialog.component.html', + styleUrls: ['./file-rename-dialog.component.scss'], +}) +export class FileRenameDialogComponent { + constructor( + private dialogRef: DialogRef, + @Inject(MAT_DIALOG_DATA) public fileName: string, + ) {} + + cancel() { + this.dialogRef.close(); + } +} diff --git a/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html b/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html index 9013417527..bd20fc920b 100644 --- a/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html +++ b/ui/src/app/files/dialog/file-upload/file-upload-dialog.component.html @@ -18,7 +18,7 @@
-
+
@@ -94,10 +94,7 @@

{{ name }}: @@ -116,7 +113,7 @@