From 3cff7277ed3e606bc60435e804b11f89faf6a5b4 Mon Sep 17 00:00:00 2001 From: dilini-muthumala Date: Tue, 5 Jan 2021 21:39:51 +0530 Subject: [PATCH 1/6] Fix for https://github.com/wso2/streaming-integrator/issues/165 --- .../extension/execution/file/FileCopyExtension.java | 8 +++++--- .../extension/execution/file/FileMoveExtension.java | 4 ++-- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java index eab812e9..36771cc3 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileCopyExtension.java @@ -226,15 +226,17 @@ protected Object[] process(Object[] data) { } } } catch (FileSystemException e) { - throw new SiddhiAppRuntimeException("Exception occurred when getting the file type " + - uri, e); + log.error("Exception occurred when getting the file type " + uri, e); + return new Object[]{false}; } finally { if (rootFileObject != null) { try { rootFileObject.close(); } catch (FileSystemException e) { - throw new SiddhiAppRuntimeException("Exception occurred when closing file object for " + + log.error("Exception occurred when closing file object for " + rootFileObject.getName().getPath(), e); + return new Object[]{false}; + } } } diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java index 3324734f..6ac0191e 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java @@ -229,8 +229,8 @@ protected Object[] process(Object[] data) { } } } catch (FileSystemException e) { - throw new SiddhiAppRuntimeException("Exception occurred when getting the file type " + - uri, e); + log.error("Exception occurred when getting the file type " + uri, e); + return new Object[]{false}; } return new Object[]{true}; } From e73075ba85b4069201580226a7e6e6659ef97ff2 Mon Sep 17 00:00:00 2001 From: dilini-muthumala Date: Fri, 8 Jan 2021 23:03:44 +0530 Subject: [PATCH 2/6] Fix for https://github.com/wso2/streaming-integrator/issues/168 --- .../siddhi/extension/io/file/FileSource.java | 39 +++++++++----- .../io/file/listeners/FileSystemListener.java | 3 ++ .../extension/io/file/util/Constants.java | 2 + .../io/file/util/FileSourceConfiguration.java | 22 ++++++++ .../io/file/FileSourceLineModeTestCase.java | 52 +++++++++++++++++++ 5 files changed, 106 insertions(+), 12 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java index d21615f4..ef33b0c8 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/FileSource.java +++ b/component/src/main/java/io/siddhi/extension/io/file/FileSource.java @@ -378,7 +378,6 @@ public class FileSource extends Source { private FileSourceConfiguration fileSourceConfiguration; private RemoteFileSystemConnectorFactory fileSystemConnectorFactory; private FileSourceServiceProvider fileSourceServiceProvider; - private RemoteFileSystemServerConnector fileSystemServerConnector; private String filePointer = "0"; private String[] requiredProperties; private boolean isTailingEnabled = true; @@ -401,6 +400,7 @@ public class FileSource extends Source { private long timeout = 5000; private boolean fileServerConnectorStarted = false; private ScheduledFuture scheduledFuture; + private FileSourcePoller fileSourcePoller; private ConnectionCallback connectionCallback; private String headerPresent; private String readOnlyHeader; @@ -577,7 +577,6 @@ public void connect(ConnectionCallback connectionCallback, FileSourceState fileS @Override public void disconnect() { try { - fileSystemServerConnector = null; if (isTailingEnabled && fileSourceConfiguration.getFileServerConnector() != null) { fileSourceConfiguration.getFileServerConnector().stop(); fileSourceConfiguration.setFileServerConnector(null); @@ -618,11 +617,23 @@ public void pause() { } public void resume() { - try { - updateSourceConf(); - deployServers(); - } catch (ConnectionUnavailableException e) { - throw new SiddhiAppRuntimeException("Failed to resume siddhi app runtime.", e); + if (dirUri != null && scheduledFuture != null) { + this.scheduledFuture = siddhiAppContext.getScheduledExecutorService(). + scheduleAtFixedRate(fileSourcePoller, 0, 1, TimeUnit.SECONDS); + } + if (isTailingEnabled && fileSourceConfiguration.getFileServerConnector() != null) { + FileServerConnector fileServerConnector = fileSourceConfiguration.getFileServerConnector(); + Runnable runnableServer = () -> { + try { + fileServerConnector.start(); + } catch (ServerConnectorException e) { + log.error(String.format("For the siddhi app '" + siddhiAppContext.getName() + + ",' failed to resume the server for file '%s'." + + "Hence starting to process next file.", fileUri)); + } + }; + fileSourceConfiguration.getExecutorService().execute(runnableServer); + this.fileServerConnectorStarted = true; } } @@ -756,9 +767,11 @@ private void deployServers() throws ConnectionUnavailableException { FileSystemListener fileSystemListener = new FileSystemListener(sourceEventListener, fileSourceConfiguration, metrics, schemeFileOptions); try { - fileSystemServerConnector = fileSystemConnectorFactory.createServerConnector( - siddhiAppContext.getName(), properties, fileSystemListener); + RemoteFileSystemServerConnector fileSystemServerConnector = + fileSystemConnectorFactory.createServerConnector( + siddhiAppContext.getName(), properties, fileSystemListener); fileSourceConfiguration.setFileSystemServerConnector(fileSystemServerConnector); + FileSourcePoller.CompletionCallback fileSourceCompletionCallback = (Throwable error) -> { if (error.getClass().equals(RemoteFileSystemConnectorException.class)) { @@ -768,7 +781,7 @@ private void deployServers() throws ConnectionUnavailableException { throw new SiddhiAppRuntimeException("File Polling mode run failed.", error); } }; - FileSourcePoller fileSourcePoller = + this.fileSourcePoller = new FileSourcePoller(fileSystemServerConnector, siddhiAppContext.getName()); fileSourcePoller.setCompletionCallback(fileSourceCompletionCallback); this.scheduledFuture = siddhiAppContext.getScheduledExecutorService(). @@ -950,8 +963,8 @@ public Map snapshot() { filePointer = FileSource.this.fileSourceConfiguration.getFilePointer(); state.put(Constants.FILE_POINTER, fileSourceConfiguration.getFilePointer()); state.put(Constants.TAILED_FILE, fileSourceConfiguration.getTailedFileURIMap()); - state.put(Constants.TAILING_REGEX_STRING_BUILDER, - fileSourceConfiguration.getTailingRegexStringBuilder()); + state.put(Constants.TAILING_REGEX_STRING_BUILDER, fileSourceConfiguration.getTailingRegexStringBuilder()); + state.put(Constants.PROCESSED_FILE_LIST, fileSourceConfiguration.getProcessedFileList()); return state; } @@ -963,6 +976,8 @@ public void restore(Map map) { fileSourceConfiguration.setTailedFileURIMap(tailedFileURIMap); fileSourceConfiguration.updateTailingRegexStringBuilder( (StringBuilder) map.get(Constants.TAILING_REGEX_STRING_BUILDER)); + fileSourceConfiguration.setProcessedFileList( + (List) map.get(Constants.PROCESSED_FILE_LIST)); } } } diff --git a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java index 49128a4e..19ec1a49 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java +++ b/component/src/main/java/io/siddhi/extension/io/file/listeners/FileSystemListener.java @@ -81,6 +81,9 @@ public boolean onMessage(RemoteFileSystemBaseMessage remoteFileSystemBaseEvent) RemoteFileSystemEvent remoteFileSystemEvent = (RemoteFileSystemEvent) remoteFileSystemBaseEvent; for (int i = 0; i < remoteFileSystemEvent.getAddedFiles().size(); i++) { String fileURI = remoteFileSystemEvent.getAddedFiles().get(i).getPath(); + if (!fileSourceConfiguration.addFileToListIfAbsent(fileURI)) { + continue; + } VFSClientConnector vfsClientConnector; FileProcessor fileProcessor; fileSourceConfiguration.setCurrentlyReadingFileURI(fileURI); diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java index 7f013493..a9bab3b8 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java +++ b/component/src/main/java/io/siddhi/extension/io/file/util/Constants.java @@ -117,6 +117,8 @@ public class Constants { public static final String UTF_8 = "UTF-8"; + public static final String PROCESSED_FILE_LIST = "processedFileList"; + /*prometheus reporte values*/ public static final String PROMETHEUS_REPORTER_NAME = "prometheus"; } diff --git a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java index 2fd83cc3..aae8c817 100644 --- a/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java +++ b/component/src/main/java/io/siddhi/extension/io/file/util/FileSourceConfiguration.java @@ -48,6 +48,7 @@ public class FileSourceConfiguration { private FileServerConnector fileServerConnector; private RemoteFileSystemServerConnector fileSystemServerConnector; + private List processedFileList = new ArrayList<>(); private List tailedFileURIMap; private ExecutorService executorService = null; private String[] requiredProperties = null; @@ -307,4 +308,25 @@ public void setScheduler(Scheduler scheduler) { public Scheduler getScheduler() { return scheduler; } + + public List getProcessedFileList() { + return processedFileList; + } + + public void setProcessedFileList(List processedFileList) { + this.processedFileList = processedFileList; + } + + /** + * Maintains the processedFileList. Adds a file URL to the list if it has not being added already. + * @param fileURI the file URI which needs to be added to the list + * @return true if the fileURI is absent in the current list and adds to it; false if the URI is already present. + */ + public boolean addFileToListIfAbsent(String fileURI) { + if (processedFileList.contains(fileURI)) { + return false; + } + processedFileList.add(fileURI); + return true; + } } diff --git a/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java b/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java index 34f205f4..a5a9815d 100644 --- a/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/file/FileSourceLineModeTestCase.java @@ -1228,6 +1228,58 @@ public void receive(Event[] events) { siddhiAppRuntime.shutdown(); } + @Test + public void siddhiIOFileKeepWithStatePersistence() throws InterruptedException { + log.info("test SiddhiIOFile: Keep file with state persistence enabled"); + String streams = "" + + "@App:name('TestSiddhiApp')\n" + + "@source(type='file', mode='line', dir.uri='file:" + newRoot + "/line/header', " + + "read.only.header='true', action.after.process='keep', tailing='false', \n" + + "@map(type='csv', delimiter='|'))\n" + + "define stream FileReaderStream (code string, serialNo string, amount string);\n" + + "@sink(type='log')\n" + + "define stream FileResultStream (code string, serialNo string, amount string);\n"; + + String query = "" + + "from FileReaderStream\n" + + "select *\n" + + "insert into FileResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(streams + query); + SiddhiAppRuntime siddhiAppRuntime2 = siddhiManager.createSiddhiAppRuntime(streams + query); + siddhiAppRuntime.addCallback("FileResultStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + count.incrementAndGet(); + } + }); + siddhiAppRuntime.start(); + SiddhiTestHelper.waitForEvents(100, 1, count.get(), 3000); + byte[] snapshot = siddhiAppRuntime.snapshot(); + siddhiAppRuntime.shutdown(); + + Thread.sleep(1000); + + try { + siddhiAppRuntime2.restore(snapshot); + } catch (CannotRestoreSiddhiAppStateException e) { + log.error("Failed to restore siddhi app state. Reason: " + e.getMessage(), e); + AssertJUnit.fail("Failed to restore siddhi app state"); + } + siddhiAppRuntime2.start(); + Thread.sleep(1000); + siddhiAppRuntime2.addCallback("FileResultStream", new StreamCallback() { + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + AssertJUnit.fail("When state persistence is enabled, the file should not be processed again."); + } + }); + Thread.sleep(1000); + siddhiAppRuntime2.shutdown(); + } + @Test public void siddhiIOFileTestCronSupportForFile() throws InterruptedException { log.info("Siddhi IO File test for Cron support via file.uri"); From 58a0fef59c17478a5d9a38e2a140fd7165324543 Mon Sep 17 00:00:00 2001 From: dilini-muthumala Date: Wed, 13 Jan 2021 23:02:36 +0530 Subject: [PATCH 3/6] Return false when the file does not exist --- .../execution/file/FileMoveExtension.java | 10 +++-- .../io/file/FileFunctionsTestCase.java | 39 +++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java index 6ac0191e..cf368e0e 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java @@ -24,7 +24,6 @@ import io.siddhi.annotation.ReturnAttribute; import io.siddhi.annotation.util.DataType; import io.siddhi.core.config.SiddhiQueryContext; -import io.siddhi.core.exception.SiddhiAppRuntimeException; import io.siddhi.core.executor.ConstantExpressionExecutor; import io.siddhi.core.executor.ExpressionExecutor; import io.siddhi.core.query.processor.ProcessingMode; @@ -199,6 +198,9 @@ protected Object[] process(Object[] data) { } try { FileObject rootFileObject = Utils.getFileObject(uri, fileSystemOptions); + if (!rootFileObject.exists()) { + return new Object[]{false}; + } if (rootFileObject.getType().hasContent() && pattern.matcher(rootFileObject.getName().getBaseName()).lookingAt()) { moveFileToDestination(rootFileObject, destinationDirUri, pattern); @@ -250,7 +252,8 @@ public void stop() { } - private void moveFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern) { + private void moveFileToDestination(FileObject sourceFileObject, String destinationDirUri, Pattern pattern) + throws FileSystemException { try { String fileName = sourceFileObject.getName().getBaseName(); String destinationPath; @@ -280,8 +283,7 @@ private void moveFileToDestination(FileObject sourceFileObject, String destinati if (fileMoveMetrics != null) { fileMoveMetrics.getMoveMetric(0); } - throw new SiddhiAppRuntimeException("Exception occurred when doing file operations when moving for file: " + - sourceFileObject.getName().getPath(), e); + throw e; } } } diff --git a/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java b/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java index fdc6c08d..77d8cdc6 100644 --- a/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java @@ -707,6 +707,45 @@ public void receive(Event[] events) { AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/test.txt", false)); } + @Test + public void folderMoveNegativeTestcase() throws InterruptedException, IOException { + FileUtils.copyDirectory(sourceRoot, tempSource); + log.info("file:move() function should return false when the source folder is not found. " + + "This test case validates that"); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination", false)); + count.set(0); + String app = "" + + "@App:name('TestSiddhiApp')" + + "define stream MoveFileStream(sample string);\n" + + "from MoveFileStream" + + "#file:move('" + tempSource + "/archivee/', '" + sourceRoot + "/destination', '')\n" + + "select sample, isSuccess \n" + + "insert into ResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("MoveFileStream"); + siddhiAppRuntime.addCallback("ResultStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.getAndIncrement(); + for (Event event : events) { + if (n == 0) { + AssertJUnit.assertEquals("WSO2", event.getData(0)); + AssertJUnit.assertEquals(false, event.getData(1)); + } else { + AssertJUnit.fail("More events received than expected."); + } + } + } + }); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{"WSO2"}); + Thread.sleep(100); + siddhiAppRuntime.shutdown(); + } + @Test public void folderMoveWithRegexFunction() throws InterruptedException, IOException { FileUtils.copyDirectory(sourceRoot, tempSource); From 8481434e34b1537f290c688033687ec0cd64f0f9 Mon Sep 17 00:00:00 2001 From: ramindu Date: Tue, 19 Jan 2021 22:30:27 +0530 Subject: [PATCH 4/6] Resolves https://github.com/siddhi-io/siddhi-io-file/issues/124 Resolves following issues in file:move(): 'include.by.regexp' and 'exclude.root.dir' parameters in file:move() are not dynamic When 'exclude.root.dir' parameter is set to true, the destination path does not get created correctly When four parameters are given, the regex is always set to '' --- .../execution/file/FileMoveExtension.java | 21 +++-- .../io/file/FileFunctionsTestCase.java | 80 +++++++++++++++++++ 2 files changed, 89 insertions(+), 12 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java index cf368e0e..f76fb639 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java @@ -72,14 +72,16 @@ "Note: Add an empty string to match all files", type = DataType.STRING, optional = true, - defaultValue = "" + defaultValue = "", + dynamic = true ), @Parameter( name = "exclude.root.dir", description = "Exclude parent folder when moving the content.", type = DataType.BOOL, optional = true, - defaultValue = "false" + defaultValue = "false", + dynamic = true ), @Parameter( name = "file.system.options", @@ -192,6 +194,9 @@ protected Object[] process(Object[] data) { boolean excludeParentFolder = false; if (inputExecutorLength == 3) { regex = (String) data[2]; + } else if (inputExecutorLength == 4) { + regex = (String) data[2]; + excludeParentFolder = (Boolean) data[3]; } if (pattern == null) { pattern = Pattern.compile(regex); @@ -205,9 +210,6 @@ protected Object[] process(Object[] data) { pattern.matcher(rootFileObject.getName().getBaseName()).lookingAt()) { moveFileToDestination(rootFileObject, destinationDirUri, pattern); } else if (rootFileObject.getType().hasChildren()) { - if (inputExecutorLength == 4) { - excludeParentFolder = (Boolean) data[3]; - } if (!excludeParentFolder) { destinationDirUri = destinationDirUri.concat(File.separator + rootFileObject.getName().getBaseName()); @@ -218,13 +220,8 @@ protected Object[] process(Object[] data) { if (sourceFileObject.getType().hasContent() && pattern.matcher(sourceFileObject.getName().getBaseName()).lookingAt()) { String sourcePartialUri = sourceFileObject.getName().getPath(); - if (excludeParentFolder) { - sourcePartialUri = sourcePartialUri.replace(uri + - rootFileObject.getName().getBaseName(), ""); - } else { - sourcePartialUri = sourcePartialUri.replace(uri, ""). - replace(sourceFileObject.getName().getBaseName(), ""); - } + sourcePartialUri = sourcePartialUri.replace(uri, ""). + replace(sourceFileObject.getName().getBaseName(), ""); moveFileToDestination(sourceFileObject, destinationDirUri + sourcePartialUri, pattern); } diff --git a/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java b/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java index 77d8cdc6..850245bc 100644 --- a/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java +++ b/component/src/test/java/io/siddhi/extension/io/file/FileFunctionsTestCase.java @@ -786,6 +786,86 @@ public void receive(Event[] events) { AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/subFolder/test3.txt", false)); } + @Test + public void folderMoveWithDynamicParams() throws InterruptedException, IOException { + FileUtils.copyDirectory(sourceRoot, tempSource); + log.info("test Siddhi Io File move() allows dynamic params"); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination", false)); + String app = "" + + "@App:name('TestSiddhiApp')" + + "define stream CopyFileStream(regex string);\n" + + "from CopyFileStream#file:move" + + "('" + tempSource + "/archive', '" + sourceRoot + "/destination', regex)\n" + + "select *\n" + + "insert into ResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("CopyFileStream"); + siddhiAppRuntime.addCallback("ResultStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.getAndIncrement(); + for (Event event : events) { + if (n == 0) { + AssertJUnit.assertEquals(".*test3.txt$", event.getData(0)); + AssertJUnit.assertEquals(true, event.getData(1)); + } else { + AssertJUnit.fail("More events received than expected."); + } + } + } + }); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{".*test3.txt$"}); + Thread.sleep(100); + siddhiAppRuntime.shutdown(); + AssertJUnit.assertTrue(isFileExist(sourceRoot + "/destination/archive/subFolder/test3.txt", false)); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination/archive/test.txt", false)); + AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/subFolder/test3.txt", false)); + } + + @Test + public void folderMoveExcludingParentFolder() throws InterruptedException, IOException { + FileUtils.copyDirectory(sourceRoot, tempSource); + log.info("test Siddhi Io File move() with a dynamic value for exclude.root.dir parameter"); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination", false)); + String app = "" + + "@App:name('TestSiddhiApp')" + + "define stream CopyFileStream(regex string, excludeParent bool);\n" + + "from CopyFileStream#file:move" + + "('" + tempSource + "/archive', '" + sourceRoot + "/destination', regex, excludeParent)\n" + + "select *\n" + + "insert into ResultStream;"; + SiddhiManager siddhiManager = new SiddhiManager(); + SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(app); + InputHandler stockStream = siddhiAppRuntime.getInputHandler("CopyFileStream"); + siddhiAppRuntime.addCallback("ResultStream", new StreamCallback() { + + @Override + public void receive(Event[] events) { + EventPrinter.print(events); + int n = count.getAndIncrement(); + for (Event event : events) { + if (n == 0) { + AssertJUnit.assertEquals(".*test3.txt$", event.getData(0)); + AssertJUnit.assertEquals(true, event.getData(1)); + AssertJUnit.assertEquals(true, event.getData(2)); + } else { + AssertJUnit.fail("More events received than expected."); + } + } + } + }); + siddhiAppRuntime.start(); + stockStream.send(new Object[]{".*test3.txt$", true}); + Thread.sleep(1000); + siddhiAppRuntime.shutdown(); + AssertJUnit.assertTrue(isFileExist(sourceRoot + "/destination/subFolder/test3.txt", false)); + AssertJUnit.assertFalse(isFileExist(sourceRoot + "/destination/test.txt", false)); + AssertJUnit.assertFalse(isFileExist(tempSource + "/archive/subFolder/test3.txt", false)); + } @Test public void fileIsFileFunction() throws InterruptedException { From bab3d3a02b7bd1cefc231d58e0b439e8039b749e Mon Sep 17 00:00:00 2001 From: ramindu Date: Wed, 10 Feb 2021 20:03:01 +0530 Subject: [PATCH 5/6] Fix documentation issue which refer to the path as URI --- .../extension/execution/file/FileMoveExtension.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java index f76fb639..1367ee1e 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java @@ -54,13 +54,13 @@ description = "This function performs copying file from one directory to another.\n", parameters = { @Parameter( - name = "uri", + name = "path", description = "Absolute file or directory path.", type = DataType.STRING, dynamic = true ), @Parameter( - name = "destination.dir.uri", + name = "destination.dir.path", description = "Absolute file path to the destination directory.\n" + "Note: Parent folder structure will be created if it does not exist.", type = DataType.STRING, @@ -96,16 +96,16 @@ }, parameterOverloads = { @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri"} + parameterNames = {"path", "destination.dir.path"} ), @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri", "include.by.regexp"} + parameterNames = {"path", "destination.dir.path", "include.by.regexp"} ), @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri", "include.by.regexp", "exclude.root.dir"} + parameterNames = {"path", "destination.dir.path", "include.by.regexp", "exclude.root.dir"} ), @ParameterOverload( - parameterNames = {"uri", "destination.dir.uri", "include.by.regexp", "exclude.root.dir", + parameterNames = {"path", "destination.dir.path", "include.by.regexp", "exclude.root.dir", "file.system.options"} ) }, From c5f001752bd6f6825630b929d18e670d0bf2768d Mon Sep 17 00:00:00 2001 From: ramindu Date: Thu, 11 Feb 2021 19:13:56 +0530 Subject: [PATCH 6/6] Fix for https://github.com/wso2/streaming-integrator/issues/183 --- .../io/siddhi/extension/execution/file/FileMoveExtension.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java index 1367ee1e..e384762d 100644 --- a/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java +++ b/component/src/main/java/io/siddhi/extension/execution/file/FileMoveExtension.java @@ -220,7 +220,7 @@ protected Object[] process(Object[] data) { if (sourceFileObject.getType().hasContent() && pattern.matcher(sourceFileObject.getName().getBaseName()).lookingAt()) { String sourcePartialUri = sourceFileObject.getName().getPath(); - sourcePartialUri = sourcePartialUri.replace(uri, ""). + sourcePartialUri = sourcePartialUri.replace(rootFileObject.getName().getPath(), ""). replace(sourceFileObject.getName().getBaseName(), ""); moveFileToDestination(sourceFileObject, destinationDirUri + sourcePartialUri, pattern);