diff --git a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala
index e6c876fb..dddfbd0b 100644
--- a/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala
+++ b/core/src/main/scala/za/co/absa/spline/harvester/plugin/embedded/RDDPlugin.scala
@@ -19,7 +19,7 @@ package za.co.absa.spline.harvester.plugin.embedded
import org.apache.spark.Partition
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources.FileScanRDD
+import org.apache.spark.sql.execution.datasources.{FileScanRDD, PartitionedFile}
import za.co.absa.spline.commons.reflect.ReflectionUtils
import za.co.absa.spline.harvester.builder._
import za.co.absa.spline.harvester.plugin.Plugin.{Precedence, ReadNodeInfo}
@@ -39,7 +39,8 @@ class RDDPlugin(
override def rddReadNodeProcessor: PartialFunction[RDD[_], ReadNodeInfo] = {
case fsr: FileScanRDD =>
- val uris = fsr.filePartitions.flatMap(_.files.map(_.filePath))
+ val files = fsr.filePartitions.flatMap(_.files)
+ val uris = files.map(extractPath(_))
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
case hr: HadoopRDD[_, _] =>
val partitions = ReflectionUtils.extractValue[Array[Partition]](hr, "partitions_")
@@ -47,6 +48,13 @@ class RDDPlugin(
ReadNodeInfo(SourceIdentifier(None, uris: _*), Map.empty)
}
+ private def extractPath(file: PartitionedFile): String = {
+ val path = ReflectionUtils.extractValue[AnyRef](file, "filePath")
+ // for Spark 3.3 and lower, the extracted path is a String
+ // for Spark 3.4 it is an org.apache.spark.paths.SparkPath
+ path.toString
+ }
+
private def hadoopPartitionToUriString(hadoopPartition: Partition): String = {
val inputSplit = ReflectionUtils.extractValue[AnyRef](hadoopPartition, "inputSplit")
val fileSplitT = ReflectionUtils.extractValue[AnyRef](inputSplit, "t")
@@ -56,5 +64,4 @@ class RDDPlugin(
uri.toString
}
-
}
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 3dcaadfc..8739193f 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -175,7 +175,7 @@
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-spark-${elasticsearch.spark.sufix}_${scala.binary.version}</artifactId>
-           <version>8.2.2</version>
+           <version>8.9.1</version>
            <scope>test</scope>
        </dependency>
@@ -267,6 +267,21 @@
+        <profile>
+            <id>spark-3.4</id>
+            <properties>
+                <guava.version>16.0.1</guava.version>
+                <elasticsearch.spark.sufix>30</elasticsearch.spark.sufix>
+            </properties>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.iceberg</groupId>
+                    <artifactId>iceberg-spark-runtime-3.4_${scala.binary.version}</artifactId>
+                    <version>1.3.1</version>
+                    <scope>test</scope>
+                </dependency>
+            </dependencies>
+        </profile>
diff --git a/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala b/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala
index 4e1c2b72..d8780c03 100644
--- a/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala
+++ b/integration-tests/src/test/scala/za/co/absa/spline/DeltaSpec.scala
@@ -37,7 +37,7 @@ class DeltaSpec extends AsyncFlatSpec
private val deltaPath = TempDirectory(prefix = "delta", pathOnly = true).deleteOnExit().toURI.toString
it should "support Delta Lake as a source" taggedAs
- ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
+ ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { captor =>
val testData: DataFrame = {
@@ -79,7 +79,7 @@ class DeltaSpec extends AsyncFlatSpec
}
it should "support insert into existing Delta Lake table" taggedAs
- ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2") in
+ ignoreIf(ver"$SPARK_VERSION" < ver"2.4.2" || ver"$SPARK_VERSION" >= ver"3.4.0") in
withNewSparkSession { implicit spark =>
withLineageTracking { lineageCaptor =>
val testData: DataFrame = {
diff --git a/pom.xml b/pom.xml
index 2d1c7079..ec8baaad 100644
--- a/pom.xml
+++ b/pom.xml
@@ -91,6 +91,7 @@
        <spark-31.version>3.1.3</spark-31.version>
        <spark-32.version>3.2.3</spark-32.version>
        <spark-33.version>3.3.1</spark-33.version>
+        <spark-34.version>3.4.1</spark-34.version>
@@ -100,6 +101,8 @@
        <delta-10.version>1.0.0</delta-10.version>
        <delta-20.version>2.0.0</delta-20.version>
        <delta-21.version>2.1.0</delta-21.version>
+        <delta-24.version>2.4.0</delta-24.version>
+
        <cassandra-connector.version>${cassandra-connector-24.version}</cassandra-connector.version>
@@ -108,6 +111,7 @@
        <cassandra-connector-31.version>3.1.0</cassandra-connector-31.version>
        <cassandra-connector-32.version>3.2.0</cassandra-connector-32.version>
        <cassandra-connector-33.version>3.3.0</cassandra-connector-33.version>
+        <cassandra-connector-34.version>3.4.1</cassandra-connector-34.version>
        <spark-excel.version>0.13.7</spark-excel.version>
@@ -815,6 +819,29 @@
+        <profile>
+            <id>spark-3.4</id>
+            <properties>
+                <spark.version>${spark-34.version}</spark.version>
+                <delta.version>${delta-24.version}</delta.version>
+                <spark-excel.version>${spark-34.version}_0.19.0</spark-excel.version>
+                <cassandra-connector.version>${cassandra-connector-34.version}</cassandra-connector.version>
+            </properties>
+            <dependencyManagement>
+                <dependencies>
+                    <dependency>
+                        <groupId>org.apache.spark</groupId>
+                        <artifactId>spark-parent_${scala.binary.version}</artifactId>
+                        <version>${spark.version}</version>
+                        <type>pom</type>
+                        <scope>import</scope>
+                    </dependency>
+                </dependencies>
+            </dependencyManagement>
+        </profile>