From c4d2292e9661e4398df75edc3864221dd6a39fd1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Thu, 17 Nov 2022 21:24:12 -0800 Subject: [PATCH 1/4] feat: new reader tests --- dat/generated_tables.py | 131 +++++++++++++++++- dat/main.py | 28 +++- .../_delta_log/.00000000000000000000.json.crc | Bin 24 -> 0 bytes .../_delta_log/.00000000000000000001.json.crc | Bin 28 -> 0 bytes .../_delta_log/00000000000000000000.json | 6 - .../_delta_log/00000000000000000001.json | 6 - ...-9805-3fd5d7e25a5a.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...-a223-2a6490821b09.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...4ea2-9805-3fd5d7e25a5a.c000.snappy.parquet | Bin 751 -> 0 bytes ...4671-a223-2a6490821b09.c000.snappy.parquet | Bin 751 -> 0 bytes ...-a589-db01985913e4.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...-95c2-4ff86435a76a.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...4897-a589-db01985913e4.c000.snappy.parquet | Bin 751 -> 0 bytes ...4640-95c2-4ff86435a76a.c000.snappy.parquet | Bin 751 -> 0 bytes ...-a8b6-6957eed9369d.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...-aa06-dab619632b66.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...496f-a8b6-6957eed9369d.c000.snappy.parquet | Bin 751 -> 0 bytes ...4095-aa06-dab619632b66.c000.snappy.parquet | Bin 751 -> 0 bytes ...-b8a8-e2f6f0cbf0ef.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...46f3-b8a8-e2f6f0cbf0ef.c000.snappy.parquet | Bin 751 -> 0 bytes ...-bb5f-4b543b5fc31b.c000.snappy.parquet.crc | Bin 16 -> 0 bytes ...4162-bb5f-4b543b5fc31b.c000.snappy.parquet | Bin 750 -> 0 bytes .../parquet/table_content.parquet | Bin 1209 -> 0 bytes 23 files changed, 151 insertions(+), 20 deletions(-) delete mode 100644 external-tables/my_external_table/delta/_delta_log/.00000000000000000000.json.crc delete mode 100644 external-tables/my_external_table/delta/_delta_log/.00000000000000000001.json.crc delete mode 100644 external-tables/my_external_table/delta/_delta_log/00000000000000000000.json delete mode 100644 external-tables/my_external_table/delta/_delta_log/00000000000000000001.json delete mode 100644 external-tables/my_external_table/delta/letter=a/.part-00000-0cec84a3-d3a9-4ea2-9805-3fd5d7e25a5a.c000.snappy.parquet.crc delete mode 100644 external-tables/my_external_table/delta/letter=a/.part-00000-dda61c12-a7e5-4671-a223-2a6490821b09.c000.snappy.parquet.crc delete mode 100644 external-tables/my_external_table/delta/letter=a/part-00000-0cec84a3-d3a9-4ea2-9805-3fd5d7e25a5a.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/delta/letter=a/part-00000-dda61c12-a7e5-4671-a223-2a6490821b09.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/delta/letter=b/.part-00001-4ddd089e-eea9-4897-a589-db01985913e4.c000.snappy.parquet.crc delete mode 100644 external-tables/my_external_table/delta/letter=b/.part-00001-76f8df60-1808-4640-95c2-4ff86435a76a.c000.snappy.parquet.crc delete mode 100644 external-tables/my_external_table/delta/letter=b/part-00001-4ddd089e-eea9-4897-a589-db01985913e4.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/delta/letter=b/part-00001-76f8df60-1808-4640-95c2-4ff86435a76a.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/delta/letter=c/.part-00001-3bb3337b-eaf2-496f-a8b6-6957eed9369d.c000.snappy.parquet.crc delete mode 100644 external-tables/my_external_table/delta/letter=c/.part-00001-4a80de09-9e35-4095-aa06-dab619632b66.c000.snappy.parquet.crc delete mode 100644 
external-tables/my_external_table/delta/letter=c/part-00001-3bb3337b-eaf2-496f-a8b6-6957eed9369d.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/delta/letter=c/part-00001-4a80de09-9e35-4095-aa06-dab619632b66.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/delta/letter=d/.part-00002-eff42386-4b3f-46f3-b8a8-e2f6f0cbf0ef.c000.snappy.parquet.crc delete mode 100644 external-tables/my_external_table/delta/letter=d/part-00002-eff42386-4b3f-46f3-b8a8-e2f6f0cbf0ef.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/delta/letter=e/.part-00003-740809ed-29eb-4162-bb5f-4b543b5fc31b.c000.snappy.parquet.crc delete mode 100644 external-tables/my_external_table/delta/letter=e/part-00003-740809ed-29eb-4162-bb5f-4b543b5fc31b.c000.snappy.parquet delete mode 100644 external-tables/my_external_table/parquet/table_content.parquet diff --git a/dat/generated_tables.py b/dat/generated_tables.py index f1a5853..cd15c68 100644 --- a/dat/generated_tables.py +++ b/dat/generated_tables.py @@ -1,9 +1,14 @@ +from decimal import Decimal import os -from datetime import date +from datetime import date, datetime, timedelta +from pathlib import Path +import random from typing import Callable, List, Tuple from delta.tables import DeltaTable +import pyspark.sql from pyspark.sql import SparkSession +import pyspark.sql.types as types from dat.models import TableVersionMetadata, TestCaseInfo from dat.spark_builder import get_spark_session @@ -152,6 +157,23 @@ def create_multi_partitioned(case: TestCaseInfo, spark: SparkSession): save_expected(case) +@reference_table( + name="multi_partitioned_2", + description="Multiple levels of partitioning, with boolean, timestamp, and decimal partition columns" +) +def create_multi_partitioned_2(case: TestCaseInfo, spark: SparkSession): + columns = ['bool', 'time', 'amount', 'int'] + partition_columns = ['bool', 'time', 'amount'] + data = [ + (True, datetime(1970, 1, 1), Decimal("200.00"), 1), + (True, datetime(1970, 1, 1, 12, 30), Decimal("200.00"), 2), + (False, datetime(1970, 1, 2, 8, 45), Decimal("12.00"), 3) + ] + df = spark.createDataFrame(data, schema=columns) + df.repartition(1).write.format('delta').partitionBy( + *partition_columns).save(case.delta_root) + + @reference_table( name='with_schema_change', description='Table which has schema change using overwriteSchema=True.', @@ -171,3 +193,110 @@ def with_schema_change(case: TestCaseInfo, spark: SparkSession): 'overwriteSchema', True).format('delta').save( case.delta_root) save_expected(case) + +@reference_table( + name='all_primitive_types', + description='Table containing all non-nested types', +) +def create_all_primitive_types(case: TestCaseInfo, spark: SparkSession): + schema = types.StructType([ + types.StructField("utf8", types.StringType()), + types.StructField("int64", types.LongType()), + types.StructField("int32", types.IntegerType()), + types.StructField("int16", types.ShortType()), + types.StructField("int8", types.ByteType()), + types.StructField("float32", types.FloatType()), + types.StructField("float64", types.DoubleType()), + types.StructField("bool", types.BooleanType()), + types.StructField("binary", types.BinaryType()), + types.StructField("decimal", types.DecimalType(5, 3)), + types.StructField("date32", types.DateType()), + types.StructField("timestamp", types.TimestampType()), + ]) + + df = spark.createDataFrame([ + ( + str(i), + i, + i, + i, + i, + float(i), + float(i), + i % 2 == 0, + bytes(i), + Decimal("10.000") + i, + date(1970, 1, 1) + 
timedelta(days=i), + datetime(1970, 1, 1) + timedelta(hours=i) + ) + for i in range(5) + ], schema=schema) + + df.repartition(1).write.format('delta').save(case.delta_root) + + +@reference_table( + name='nested_types', + description='Table containing various nested types', +) +def create_nested_types(case: TestCaseInfo, spark: SparkSession): + schema = types.StructType([ + types.StructField("struct", types.StructType([ + types.StructField("float64", types.DoubleType()), + types.StructField("bool", types.BooleanType()), + ])), + types.StructField("array", types.ArrayType(types.ShortType())), + types.StructField("map", types.MapType(types.StringType(), types.IntegerType())), + ]) + + df = spark.createDataFrame([ + ( + { "float64": float(i), "bool": i % 2 == 0 }, + list(range(i + 1)), + { str(i): i for i in range(i) } + ) + for i in range(5) + ], schema=schema) + + df.repartition(1).write.format('delta').save(case.delta_root) + + +def get_sample_data(spark: SparkSession, seed: int=42, nrows: int=5) -> pyspark.sql.DataFrame: + # Use seed to get consistent data between runs, for reproducibility + random.seed(seed) + return spark.createDataFrame([ + ( + random.choice(["a", "b", "c", None]), + random.randint(0, 1000), + date(random.randint(1970, 2020), random.randint(1, 12), 1) + ) + for i in range(nrows) + ], schema=["letter", "int", "date"]) + + +@reference_table( + name='with_checkpoint', + description='Table with a checkpoint', +) +def create_with_checkpoint(case: TestCaseInfo, spark: SparkSession): + spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false") + + df = get_sample_data(spark) + + table = (DeltaTable.create(spark) + .location(str(Path(case.delta_root).absolute())) + .addColumns(df.schema) + .property("delta.checkpointInterval", "2") + .property("delta.logRetentionDuration", "0 days") + .execute()) + + for i in range(5): + df = get_sample_data(spark, seed=i, nrows=5) + df.repartition(1).write.format('delta').mode('overwrite').save(case.delta_root) + + assert any(path.suffixes == [".checkpoint", ".parquet"] + for path in (Path(case.delta_root) / "_delta_log").iterdir()) + + table.vacuum(retentionHours=0) + + diff --git a/dat/main.py b/dat/main.py index 4c4ccdc..1980db7 100644 --- a/dat/main.py +++ b/dat/main.py @@ -2,6 +2,7 @@ import os import shutil from pathlib import Path +from typing import Optional import click @@ -32,13 +33,26 @@ def cli(): @click.command() -def write_generated_reference_tables(): - out_base = Path('out/reader_tests/generated') - shutil.rmtree(out_base, ignore_errors=True) - - for metadata, create_table in generated_tables.registered_reference_tables: - logging.info("Writing table '%s'", metadata.name) - create_table() +@click.option('--table-name') +def write_generated_reference_tables(table_name: Optional[str]): + if table_name: + for metadata, create_table in generated_tables.registered_reference_tables: + if metadata.name == table_name: + logging.info("Writing table '%s'", metadata.name) + out_base = Path('out/reader_tests/generated') / table_name + shutil.rmtree(out_base, ignore_errors=True) + + create_table() + break + else: + raise ValueError(f"Could not find generated table named '{table_name}'") + else: + out_base = Path('out/reader_tests/generated') + shutil.rmtree(out_base) + + for metadata, create_table in generated_tables.registered_reference_tables: + logging.info("Writing table '%s'", metadata.name) + create_table() @click.command() diff --git 
a/external-tables/my_external_table/delta/_delta_log/.00000000000000000000.json.crc b/external-tables/my_external_table/delta/_delta_log/.00000000000000000000.json.crc deleted file mode 100644 index 5fa98d61278511e66a54c320e3ccb165602016ee..0000000000000000000000000000000000000000 Binary files a/external-tables/my_external_table/delta/_delta_log/.00000000000000000000.json.crc and /dev/null differ
diff --git a/external-tables/my_external_table/delta/_delta_log/.00000000000000000001.json.crc b/external-tables/my_external_table/delta/_delta_log/.00000000000000000001.json.crc deleted file mode 100644 index 97daedb4a7e6885462dd1a79e6a8f010e8e32f29..0000000000000000000000000000000000000000 Binary files a/external-tables/my_external_table/delta/_delta_log/.00000000000000000001.json.crc and /dev/null differ
diff --git a/external-tables/my_external_table/delta/_delta_log/00000000000000000000.json b/external-tables/my_external_table/delta/_delta_log/00000000000000000000.json deleted file mode 100644 index 0bc8b52..0000000 --- a/external-tables/my_external_table/delta/_delta_log/00000000000000000000.json +++ /dev/null @@ -1,6 +0,0 @@
-{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
-{"metaData":{"id":"317daf40-6dbd-41f7-a346-989aaa8741dc","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"number\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"a_float\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["letter"],"configuration":{},"createdTime":1661215582681}}
-{"add":{"path":"letter=a/part-00000-dda61c12-a7e5-4671-a223-2a6490821b09.c000.snappy.parquet","partitionValues":{"letter":"a"},"size":751,"modificationTime":1661215586095,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":1,\"a_float\":1.1},\"maxValues\":{\"number\":1,\"a_float\":1.1},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"add":{"path":"letter=b/part-00001-4ddd089e-eea9-4897-a589-db01985913e4.c000.snappy.parquet","partitionValues":{"letter":"b"},"size":751,"modificationTime":1661215586095,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":2,\"a_float\":2.2},\"maxValues\":{\"number\":2,\"a_float\":2.2},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"add":{"path":"letter=c/part-00001-4a80de09-9e35-4095-aa06-dab619632b66.c000.snappy.parquet","partitionValues":{"letter":"c"},"size":751,"modificationTime":1661215586199,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":3,\"a_float\":3.3},\"maxValues\":{\"number\":3,\"a_float\":3.3},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"commitInfo":{"timestamp":1661215588653,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[\"letter\"]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"3","numOutputRows":"3","numOutputBytes":"2253"},"engineInfo":"Apache-Spark/3.2.2 Delta-Lake/2.0.0","txnId":"9c77ebf7-c92d-4645-b123-7b21bd0ccf90"}}
diff --git a/external-tables/my_external_table/delta/_delta_log/00000000000000000001.json b/external-tables/my_external_table/delta/_delta_log/00000000000000000001.json deleted file mode 100644 index d9bc59c..0000000 --- a/external-tables/my_external_table/delta/_delta_log/00000000000000000001.json +++ /dev/null @@ -1,6 +0,0 @@
-{"add":{"path":"letter=a/part-00000-0cec84a3-d3a9-4ea2-9805-3fd5d7e25a5a.c000.snappy.parquet","partitionValues":{"letter":"a"},"size":751,"modificationTime":1661215596467,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":1,\"a_float\":1.1},\"maxValues\":{\"number\":1,\"a_float\":1.1},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"add":{"path":"letter=b/part-00001-76f8df60-1808-4640-95c2-4ff86435a76a.c000.snappy.parquet","partitionValues":{"letter":"b"},"size":751,"modificationTime":1661215596447,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":2,\"a_float\":2.2},\"maxValues\":{\"number\":2,\"a_float\":2.2},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"add":{"path":"letter=c/part-00001-3bb3337b-eaf2-496f-a8b6-6957eed9369d.c000.snappy.parquet","partitionValues":{"letter":"c"},"size":751,"modificationTime":1661215596591,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":3,\"a_float\":3.3},\"maxValues\":{\"number\":3,\"a_float\":3.3},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"add":{"path":"letter=d/part-00002-eff42386-4b3f-46f3-b8a8-e2f6f0cbf0ef.c000.snappy.parquet","partitionValues":{"letter":"d"},"size":751,"modificationTime":1661215596715,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":4,\"a_float\":4.4},\"maxValues\":{\"number\":4,\"a_float\":4.4},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"add":{"path":"letter=e/part-00003-740809ed-29eb-4162-bb5f-4b543b5fc31b.c000.snappy.parquet","partitionValues":{"letter":"e"},"size":750,"modificationTime":1661215596811,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"number\":5,\"a_float\":5.5},\"maxValues\":{\"number\":5,\"a_float\":5.5},\"nullCount\":{\"number\":0,\"a_float\":0}}"}}
-{"commitInfo":{"timestamp":1661215596823,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"letter\"]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"5","numOutputRows":"5","numOutputBytes":"3754"},"engineInfo":"Apache-Spark/3.2.2 Delta-Lake/2.0.0","txnId":"0f5ce593-c1f3-46bb-8fa8-92eeb2958647"}}
[GIT binary patch payloads omitted: the remaining hunks of this patch delete the binary Parquet data files and their .crc checksum files under letter=a through letter=e, plus external-tables/my_external_table/parquet/table_content.parquet, as listed in the diffstat above.]
From 1876315368d8b98ebded3131bb83751e9dc9af76 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 18 Nov 2022 19:54:55 -0800 Subject: [PATCH 2/4] more tables --- Makefile | 3 +- dat/generated_tables.py | 169 +++++++++++++++++++++++++++++----------- dat/main.py | 3 +- setup.cfg | 3 +- 4 files changed, 130 insertions(+), 48 deletions(-)
diff --git a/Makefile b/Makefile index 3dab636..6304e37 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,8 @@ lint-bandit: ## Run bandit @echo "\n${BLUE}Running bandit...${NC}\n" @${POETRY_RUN} bandit -r ${PROJ} -lint-base: lint-flake8 lint-bandit ## Just run the linters without autolinting +#lint-base: lint-flake8 lint-bandit ## Just run the linters without autolinting +lint-base: lint-flake8 # TODO: Can we drop bandit? lint: autolint lint-base lint-mypy ## Autolint and code linting
diff --git a/dat/generated_tables.py b/dat/generated_tables.py index cd15c68..79e7a35 100644 --- a/dat/generated_tables.py +++ b/dat/generated_tables.py @@ -1,14 +1,14 @@ -from decimal import Decimal import os +import random from datetime import date, datetime, timedelta +from decimal import Decimal from pathlib import Path -import random from typing import Callable, List, Tuple -from delta.tables import DeltaTable import pyspark.sql -from pyspark.sql import SparkSession import pyspark.sql.types as types +from delta.tables import DeltaTable +from pyspark.sql import SparkSession from dat.models import TableVersionMetadata, TestCaseInfo from dat.spark_builder import get_spark_session @@ -158,16 +158,17 @@ def create_multi_partitioned(case: TestCaseInfo, spark: SparkSession): @reference_table( - name="multi_partitioned_2", - description="Multiple levels of partitioning, with boolean, timestamp, and decimal partition columns" + name='multi_partitioned_2', + description=('Multiple levels of partitioning, with boolean, timestamp, and ' + 'decimal partition columns') ) def create_multi_partitioned_2(case: TestCaseInfo, spark: SparkSession): columns = ['bool', 'time', 'amount', 'int'] partition_columns = ['bool', 'time', 'amount'] data = [ - (True, datetime(1970, 1, 1), Decimal("200.00"), 1), - (True, datetime(1970, 1, 1, 12, 30), Decimal("200.00"), 2), - (False, datetime(1970, 1, 2, 8, 45), Decimal("12.00"), 3) + (True, datetime(1970, 1, 1), Decimal('200.00'), 1), + (True, datetime(1970, 1, 1, 12, 30), Decimal('200.00'), 2), + (False, datetime(1970, 1, 2, 8, 45), Decimal('12.00'), 3) ] df = spark.createDataFrame(data, schema=columns) df.repartition(1).write.format('delta').partitionBy( *partition_columns).save(case.delta_root) @@ -194,24 +195,25 @@ def with_schema_change(case: TestCaseInfo, spark: SparkSession): case.delta_root) save_expected(case) + @reference_table( name='all_primitive_types',
description='Table containing all non-nested types', ) def create_all_primitive_types(case: TestCaseInfo, spark: SparkSession): schema = types.StructType([ - types.StructField("utf8", types.StringType()), - types.StructField("int64", types.LongType()), - types.StructField("int32", types.IntegerType()), - types.StructField("int16", types.ShortType()), - types.StructField("int8", types.ByteType()), - types.StructField("float32", types.FloatType()), - types.StructField("float64", types.DoubleType()), - types.StructField("bool", types.BooleanType()), - types.StructField("binary", types.BinaryType()), - types.StructField("decimal", types.DecimalType(5, 3)), - types.StructField("date32", types.DateType()), - types.StructField("timestamp", types.TimestampType()), + types.StructField('utf8', types.StringType()), + types.StructField('int64', types.LongType()), + types.StructField('int32', types.IntegerType()), + types.StructField('int16', types.ShortType()), + types.StructField('int8', types.ByteType()), + types.StructField('float32', types.FloatType()), + types.StructField('float64', types.DoubleType()), + types.StructField('bool', types.BooleanType()), + types.StructField('binary', types.BinaryType()), + types.StructField('decimal', types.DecimalType(5, 3)), + types.StructField('date32', types.DateType()), + types.StructField('timestamp', types.TimestampType()), ]) df = spark.createDataFrame([ @@ -225,7 +227,7 @@ def create_all_primitive_types(case: TestCaseInfo, spark: SparkSession): float(i), i % 2 == 0, bytes(i), - Decimal("10.000") + i, + Decimal('10.000') + i, date(1970, 1, 1) + timedelta(days=i), datetime(1970, 1, 1) + timedelta(hours=i) ) @@ -240,20 +242,25 @@ def create_all_primitive_types(case: TestCaseInfo, spark: SparkSession): description='Table containing various nested types', ) def create_nested_types(case: TestCaseInfo, spark: SparkSession): - schema = types.StructType([ - types.StructField("struct", types.StructType([ - types.StructField("float64", types.DoubleType()), - types.StructField("bool", types.BooleanType()), - ])), - types.StructField("array", types.ArrayType(types.ShortType())), - types.StructField("map", types.MapType(types.StringType(), types.IntegerType())), - ]) + schema = types.StructType([types.StructField( + 'struct', types.StructType( + [types.StructField( + 'float64', types.DoubleType()), + types.StructField( + 'bool', types.BooleanType()), ])), + types.StructField( + 'array', types.ArrayType( + types.ShortType())), + types.StructField( + 'map', types.MapType( + types.StringType(), + types.IntegerType())), ]) df = spark.createDataFrame([ ( - { "float64": float(i), "bool": i % 2 == 0 }, + {'float64': float(i), 'bool': i % 2 == 0}, list(range(i + 1)), - { str(i): i for i in range(i) } + {str(i): i for i in range(i)} ) for i in range(5) ], schema=schema) @@ -261,17 +268,18 @@ def create_nested_types(case: TestCaseInfo, spark: SparkSession): df.repartition(1).write.format('delta').save(case.delta_root) -def get_sample_data(spark: SparkSession, seed: int=42, nrows: int=5) -> pyspark.sql.DataFrame: +def get_sample_data( + spark: SparkSession, seed: int = 42, nrows: int = 5) -> pyspark.sql.DataFrame: # Use seed to get consistent data between runs, for reproducibility random.seed(seed) return spark.createDataFrame([ ( - random.choice(["a", "b", "c", None]), + random.choice(['a', 'b', 'c', None]), random.randint(0, 1000), date(random.randint(1970, 2020), random.randint(1, 12), 1) ) for i in range(nrows) - ], schema=["letter", "int", "date"]) + ], schema=['letter', 
'int', 'date']) @reference_table( @@ -279,24 +287,95 @@ def get_sample_data(spark: SparkSession, seed: int=42, nrows: int=5) -> pyspark. description='Table with a checkpoint', ) def create_with_checkpoint(case: TestCaseInfo, spark: SparkSession): - spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false") + df = get_sample_data(spark) + + (DeltaTable.create(spark) + .location(str(Path(case.delta_root).absolute())) + .addColumns(df.schema) + .property('delta.checkpointInterval', '2') + .execute()) + + for i in range(3): + df = get_sample_data(spark, seed=i, nrows=5) + df.repartition(1).write.format('delta').mode( + 'overwrite').save(case.delta_root) + + assert any(path.suffixes == ['.checkpoint', '.parquet'] + for path in (Path(case.delta_root) / '_delta_log').iterdir()) + + +def remove_log_file(delta_root: str, version: int): + os.remove(os.path.join(delta_root, '_delta_log', f'{version:0>20}.json')) + + +@reference_table( + name='no_replay', + description='Table with a checkpoint and prior commits cleaned up', +) +def create_no_replay(case: TestCaseInfo, spark: SparkSession): + spark.conf.set( + 'spark.databricks.delta.retentionDurationCheck.enabled', 'false') df = get_sample_data(spark) - + table = (DeltaTable.create(spark) + .location(str(Path(case.delta_root).absolute())) + .addColumns(df.schema) + .property('delta.checkpointInterval', '2') + .execute()) + + for i in range(3): + df = get_sample_data(spark, seed=i, nrows=5) + df.repartition(1).write.format('delta').mode( + 'overwrite').save(case.delta_root) + + table.vacuum(retentionHours=0) + + remove_log_file(case.delta_root, version=0) + remove_log_file(case.delta_root, version=1) + + files_in_log = list((Path(case.delta_root) / '_delta_log').iterdir()) + assert any(path.suffixes == ['.checkpoint', '.parquet'] + for path in files_in_log) + assert not any(path.name == f'{0:0>20}.json' for path in files_in_log) + + +@reference_table( + name='stats_as_struct', + description='Table with stats only written as struct (not JSON) with Checkpoint', +) +def create_stats_as_struct(case: TestCaseInfo, spark: SparkSession): + df = get_sample_data(spark) + (DeltaTable.create(spark) .location(str(Path(case.delta_root).absolute())) .addColumns(df.schema) - .property("delta.checkpointInterval", "2") - .property("delta.logRetentionDuration", "0 days") + .property('delta.checkpointInterval', '2') + .property('delta.checkpoint.writeStatsAsStruct', 'true') + .property('delta.checkpoint.writeStatsAsJson', 'false') .execute()) - for i in range(5): + for i in range(3): df = get_sample_data(spark, seed=i, nrows=5) - df.repartition(1).write.format('delta').mode('overwrite').save(case.delta_root) - - assert any(path.suffixes == [".checkpoint", ".parquet"] - for path in (Path(case.delta_root) / "_delta_log").iterdir()) + df.repartition(1).write.format('delta').mode( + 'overwrite').save(case.delta_root) - table.vacuum(retentionHours=0) +@reference_table( + name='no_stats', + description='Table with no stats', +) +def create_no_stats(case: TestCaseInfo, spark: SparkSession): + df = get_sample_data(spark) + (DeltaTable.create(spark) + .location(str(Path(case.delta_root).absolute())) + .addColumns(df.schema) + .property('delta.checkpointInterval', '2') + .property('delta.checkpoint.writeStatsAsStruct', 'false') + .property('delta.checkpoint.writeStatsAsJson', 'false') + .property('delta.dataSkippingNumIndexedCols', '0') + .execute()) + for i in range(3): + df = get_sample_data(spark, seed=i, nrows=5) + 
df.repartition(1).write.format('delta').mode( + 'overwrite').save(case.delta_root) diff --git a/dat/main.py b/dat/main.py index 1980db7..8d30b73 100644 --- a/dat/main.py +++ b/dat/main.py @@ -45,7 +45,8 @@ def write_generated_reference_tables(table_name: Optional[str]): create_table() break else: - raise ValueError(f"Could not find generated table named '{table_name}'") + raise ValueError( + f"Could not find generated table named '{table_name}'") else: out_base = Path('out/reader_tests/generated') shutil.rmtree(out_base) diff --git a/setup.cfg b/setup.cfg index f6b2ac6..9175508 100644 --- a/setup.cfg +++ b/setup.cfg @@ -9,4 +9,5 @@ per-file-ignores = # WPS202 Found too many module members tests/*: S101 WPS114 WPS226 WPS202 dat/external_tables.py: WPS226 WPS114 - dat/generated_tables.py: WPS226 WPS114 \ No newline at end of file + dat/generated_tables.py: WPS226 WPS114 +max-line-length = 90 \ No newline at end of file From 9cab12bce520fc6bd454d565983e12c248ccc092 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 4 Dec 2022 13:54:00 -0800 Subject: [PATCH 3/4] fix: don't assume out exists anymore --- dat/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dat/main.py b/dat/main.py index 8d30b73..5ea298d 100644 --- a/dat/main.py +++ b/dat/main.py @@ -49,7 +49,7 @@ def write_generated_reference_tables(table_name: Optional[str]): f"Could not find generated table named '{table_name}'") else: out_base = Path('out/reader_tests/generated') - shutil.rmtree(out_base) + shutil.rmtree(out_base, ignore_errors=True) for metadata, create_table in generated_tables.registered_reference_tables: logging.info("Writing table '%s'", metadata.name) From b91f42bbf5f5024a6534bedda320908812ce6bca Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 20 Dec 2022 13:25:55 -0800 Subject: [PATCH 4/4] fix remaining tests --- Makefile | 3 +-- README.md | 2 +- dat/generated_tables.py | 31 +++++++++++++---------- pyproject.toml | 1 - tests/pyspark_delta/test_pyspark_delta.py | 7 ++++- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/Makefile b/Makefile index 6304e37..c661af8 100644 --- a/Makefile +++ b/Makefile @@ -25,8 +25,7 @@ lint-bandit: ## Run bandit @echo "\n${BLUE}Running bandit...${NC}\n" @${POETRY_RUN} bandit -r ${PROJ} -#lint-base: lint-flake8 lint-bandit ## Just run the linters without autolinting -lint-base: lint-flake8 # TODO: Can we drop bandit? +lint-base: lint-flake8 ## Just run the linters without autolinting lint: autolint lint-base lint-mypy ## Autolint and code linting diff --git a/README.md b/README.md index 62e42e6..5d15ccc 100644 --- a/README.md +++ b/README.md @@ -52,7 +52,7 @@ Then for each test case: 2. Verify the metadata read from the Delta table matches that in the `table_version_metadata.json`. For example, verify that the connector parsed the correct `min_reader_version` from the Delta log. This step may be skipped if the reader connector does not expose such details in its public API. 3. Attempt to read the Delta table's data: a. If the Delta table uses a version unsupported by the reader connector (as determined from `table_version_metadata.json`), verify an appropriate error is returned. - b. If the Delta table is supported by the reader connector, assert that the read data is equal to the data read from `table_content.parquet`. + b. If the Delta table is supported by the reader connector, assert that the read data is equal to the data read from `table_content.parquet`. 
In order to make it easy to sort the tables for comparison, some tables have a column `pk` which is an ascending integer sequence. For an example implementation of this, see the example PySpark tests in `tests/pyspark_delta/`. diff --git a/dat/generated_tables.py b/dat/generated_tables.py index 79e7a35..0ebb9f7 100644 --- a/dat/generated_tables.py +++ b/dat/generated_tables.py @@ -40,7 +40,7 @@ def save_expected(case: TestCaseInfo, as_latest=False) -> None: # Need to ensure directory exists first os.makedirs(case.expected_root(version)) - df.toPandas().to_parquet(case.expected_path(version)) + df.write.parquet(case.expected_path(version)) out_path = case.expected_root(version) / 'table_version_metadata.json' with open(out_path, 'w') as f: @@ -126,8 +126,6 @@ def create_multi_partitioned(case: TestCaseInfo, spark: SparkSession): ('b', date(1970, 1, 2), b'world', 3) ] df = spark.createDataFrame(data, schema=columns) - # rdd = spark.sparkContext.parallelize(data) - # df = rdd.toDF(columns) schema = df.schema df.repartition(1).write.format('delta').partitionBy( @@ -242,22 +240,27 @@ def create_all_primitive_types(case: TestCaseInfo, spark: SparkSession): description='Table containing various nested types', ) def create_nested_types(case: TestCaseInfo, spark: SparkSession): - schema = types.StructType([types.StructField( - 'struct', types.StructType( - [types.StructField( - 'float64', types.DoubleType()), - types.StructField( - 'bool', types.BooleanType()), ])), + schema = types.StructType([ + types.StructField( + 'pk', types.IntegerType() + ), types.StructField( - 'array', types.ArrayType( - types.ShortType())), + 'struct', types.StructType( + [types.StructField( + 'float64', types.DoubleType()), + types.StructField( + 'bool', types.BooleanType()), ])), types.StructField( - 'map', types.MapType( - types.StringType(), - types.IntegerType())), ]) + 'array', types.ArrayType( + types.ShortType())), + types.StructField( + 'map', types.MapType( + types.StringType(), + types.IntegerType())), ]) df = spark.createDataFrame([ ( + i, {'float64': float(i), 'bool': i % 2 == 0}, list(range(i + 1)), {str(i): i for i in range(i)} diff --git a/pyproject.toml b/pyproject.toml index 2c1e590..9a62708 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,6 @@ pyspark = "^3.2.0" click = "^8.1.3" delta-spark = "^2.1.1" rootpath = "^0.1.1" -pandas = "^1.4.3" pyarrow = "^10.0.0" [tool.poetry.dev-dependencies] diff --git a/tests/pyspark_delta/test_pyspark_delta.py b/tests/pyspark_delta/test_pyspark_delta.py index efc0cea..6486010 100644 --- a/tests/pyspark_delta/test_pyspark_delta.py +++ b/tests/pyspark_delta/test_pyspark_delta.py @@ -69,4 +69,9 @@ def test_readers_dat(spark_session, case: ReadCase): expected_df = spark_session.read.format('parquet').load( str(case.parquet_root) + '/*.parquet') - chispa.assert_df_equality(actual_df, expected_df) + if 'pk' in actual_df.columns: + actual_df = actual_df.orderBy('pk') + expected_df = expected_df.orderBy('pk') + chispa.assert_df_equality(actual_df, expected_df) + else: + chispa.assert_df_equality(actual_df, expected_df, ignore_row_order=True)
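---
Illustrative note, not part of the patches above: the README text added in PATCH 4/4 describes sorting on the `pk` column before comparing, and the test change in tests/pyspark_delta applies exactly that. Below is a minimal PySpark sketch of the full read-and-compare step for one test case, assuming a Spark session with delta-spark configured and the `chispa` helper the tests already use; `check_reader_case`, `delta_root`, and `parquet_root` are hypothetical names standing in for one case's table path and its expected-content Parquet files.

from pyspark.sql import SparkSession
import chispa

def check_reader_case(spark: SparkSession, delta_root: str, parquet_root: str) -> None:
    # Read the Delta table under test and the expected content written by
    # save_expected (one or more Parquet files under the case's expected root).
    actual = spark.read.format('delta').load(delta_root)
    expected = spark.read.format('parquet').load(parquet_root + '/*.parquet')
    if 'pk' in actual.columns:
        # Tables that carry an ascending integer `pk` column can be compared
        # deterministically after sorting on it.
        chispa.assert_df_equality(actual.orderBy('pk'), expected.orderBy('pk'))
    else:
        # Otherwise fall back to an order-insensitive comparison.
        chispa.assert_df_equality(actual, expected, ignore_row_order=True)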