diff --git a/centaur/src/main/resources/standardTestCases/dedup_localizations_papi_v2.test b/centaur/src/main/resources/standardTestCases/dedup_localizations_papi_v2.test new file mode 100644 index 00000000000..71edd6de8ea --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/dedup_localizations_papi_v2.test @@ -0,0 +1,13 @@ +name: dedup_localizations_papi_v2 +testFormat: workflowsuccess +backends: [Papiv2] + +files { + workflow: dedup_localizations_papi_v2/dedup_localizations_papi_v2.wdl +} + +metadata { + workflowName: dedup_localizations_papi_v2 + status: Succeeded + "outputs.dedup_localizations_papi_v2.check_log.num_input_localizations": 1 +} diff --git a/centaur/src/main/resources/standardTestCases/dedup_localizations_papi_v2/dedup_localizations_papi_v2.wdl b/centaur/src/main/resources/standardTestCases/dedup_localizations_papi_v2/dedup_localizations_papi_v2.wdl new file mode 100644 index 00000000000..25ecc045fb0 --- /dev/null +++ b/centaur/src/main/resources/standardTestCases/dedup_localizations_papi_v2/dedup_localizations_papi_v2.wdl @@ -0,0 +1,59 @@ +version 1.0 + +workflow dedup_localizations_papi_v2 { + call producer + call consumer { input: first = producer.data, second = producer.data } + call check_log { input: out_file_path = consumer.out, log_file_name = "consumer.log" } +} + +task producer { + command { + echo "Here is some data." 
> data.txt + } + + runtime { + docker: "ubuntu:latest" + } + + output { + File data = "data.txt" + } +} + +task consumer { + input { + File first + File second + } + + command { + # noop + } + + runtime { + docker: "ubuntu:latest" + } + + output { + File out = stdout() + } +} + +task check_log { + input { + String out_file_path + String log_file_name + } + String file_log = sub(out_file_path, "/stdout$", "/" + log_file_name) + command { + set -euo pipefail + gsutil cp ~{file_log} log.txt + set +e + grep 'Localizing input gs://cloud-cromwell-dev-self-cleaning/cromwell_execution/travis/dedup_localizations_papi_v2/' log.txt | grep -c "data.txt" + } + output { + File out = stdout() + Int num_input_localizations = read_int(stdout()) + } + runtime { docker: "google/cloud-sdk" } +} diff --git a/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala b/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala index 80d90c1dc7d..9cfa80a2447 100644 --- a/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/pipelines/v2alpha1/src/main/scala/cromwell/backend/google/pipelines/v2alpha1/PipelinesApiAsyncBackendJobExecutionActor.scala @@ -70,8 +70,10 @@ class PipelinesApiAsyncBackendJobExecutionActor(standardParams: StandardAsyncExe } val filesByContainerParentDirectory = filesWithSameNames.groupBy(_.containerPath.parent.toString) + // Deduplicate the inputs: parallel localization cannot handle the same file appearing more than once in a transfer bundle. 
+ val uniqueFilesByContainerParentDirectory = filesByContainerParentDirectory map { case (p, fs) => p -> fs.toSet } - val filesWithSameNamesTransferBundles: List[String] = filesByContainerParentDirectory.toList map { case (containerParent, filesWithSameParent) => + val filesWithSameNamesTransferBundles: List[String] = uniqueFilesByContainerParentDirectory.toList map { case (containerParent, filesWithSameParent) => val arrayIdentifier = s"files_to_localize_" + DigestUtils.md5Hex(bucket + containerParent) val entries = filesWithSameParent.map(_.cloudPath) mkString("\"", "\"\n| \"", "\"")