
Commit 4206b5c

ghanse, ronanstokes-db, and nfx authored
Feature standard datasets - part 2 (#286)
* work in progress * work in progress * work in progress * wip * wip * added implementations for Datasets describe and listing * bumpedBuild * bumpedBuild * bumpedBuild * bumpedBuild * fixed dataset provider imports * fixed dataset provider imports * fixed dataset provider imports * fixed dataset provider imports * fixed dataset provider imports * fixed dataset provider imports * wip * wip * initial working version * initial working version * initial working version * initial working version * initial working version * initial working version * initial working version * initial working version * initial working version * added telephony plans * added telephony plans * added telephony plans * initial working version added plugin mechanics, initial user table and part of telephony plans * Added tokei.rs badge (#253) [![lines of code](https://tokei.rs/b1/github/databrickslabs/dbldatagen)]([https://codecov.io/github/databrickslabs/dbldatagen](https://github.com/databrickslabs/dbldatagen)) * Prep for release 036 (#251) * prep for version 0.3.6 * added telephony plans * initial implementation * added basic/iot dataset * wip * work in progress * wip * wip * wip * wip * wip * wip * work in progress * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * wip * additional coverage tests * additional coverage * additional coverage * additional coverage * additional coverage * Add some standard datasets * Fix a bug with the default number of groups * Test changes * Updated datasets * Added more unit tests and safety * Added more tests * Update test_standard_dataset_providers.py added space to line 147 to pass lint checks --------- Co-authored-by: ronanstokes-db <ronan.stokes@databricks.com> Co-authored-by: Ronan Stokes <42389040+ronanstokes-db@users.noreply.github.com> Co-authored-by: Serge Smertin <259697+nfx@users.noreply.github.com>
1 parent da1df6b commit 4206b5c

7 files changed (+691, -1 lines changed)

CHANGELOG.md (+1 line)

@@ -10,6 +10,7 @@ All notable changes to the Databricks Labs Data Generator will be documented in
 * Updated documentation for generating text data.
 * Modified data distributions to use abstract base classes
 * migrated data distribution tests to use `pytest`
+* Additional standard datasets
 
 #### Added
 * Added classes for constraints on the data generation via new package `dbldatagen.constraints`

dbldatagen/datasets/__init__.py (+8 lines)

@@ -1,8 +1,16 @@
 from .dataset_provider import DatasetProvider, dataset_definition
+from .basic_geometries import BasicGeometriesProvider
+from .basic_process_historian import BasicProcessHistorianProvider
+from .basic_telematics import BasicTelematicsProvider
 from .basic_user import BasicUserProvider
+from .benchmark_groupby import BenchmarkGroupByProvider
 from .multi_table_telephony_provider import MultiTableTelephonyProvider
 
 __all__ = ["dataset_provider",
+           "basic_geometries",
+           "basic_process_historian",
+           "basic_telematics",
            "basic_user",
+           "benchmark_groupby",
            "multi_table_telephony_provider"
            ]
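For context, each provider above is decorated with `dataset_definition(..., autoRegister=True)`, so importing the package registers it under its dataset name. Below is a minimal retrieval sketch; it assumes the `Datasets` facade from this feature series exposes a `get()` method that resolves a provider by name and returns a `DataGenerator` (the exact entry point may differ at this commit).

import dbldatagen as dg

# `spark` is an active SparkSession; "basic/user" is one of the registered providers.
# Assumption: dg.Datasets(...).get(...) resolves the provider by name and forwards
# keyword options to its getTableGenerator() method.
dataspec = dg.Datasets(spark, "basic/user").get(rows=100_000)
df = dataspec.build()   # DataGenerator.build() materializes the Spark DataFrame
df.show(5)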
dbldatagen/datasets/basic_geometries.py (new file, +111 lines)

from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/geometries",
                    summary="Geometry WKT dataset",
                    autoRegister=True,
                    supportsStreaming=True)
class BasicGeometriesProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
    """
    Basic Geometry WKT Dataset
    ==========================

    This is a basic geospatial dataset with WKT strings representing `POINT`, `LINESTRING`,
    and `POLYGON` geometries.

    It takes the following options when retrieving the table:
        - random: if True, generates random data
        - rows: number of rows to generate
        - partitions: number of partitions to use
        - geometryType: one of `point`, `lineString`, or `polygon`, default is `point`
        - maxVertices: number of points in each `lineString` or `polygon`

    As the data specification is a DataGenerator object, you can add further columns to the data set and
    add constraints (when the feature is available).

    Note that this dataset does not use any features that would prevent it from being used as a source for a
    streaming dataframe, and so the flag `supportsStreaming` is set to True.

    """
    MIN_LOCATION_ID = 1000000
    MAX_LOCATION_ID = 9223372036854775807
    COLUMN_COUNT = 2
    ALLOWED_OPTIONS = [
        "geometryType",
        "maxVertices",
        "random"
    ]

    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
                          **options):
        import dbldatagen as dg  # import locally to avoid circular imports
        import warnings as w

        generateRandom = options.get("random", False)
        geometryType = options.get("geometryType", "point")
        maxVertices = options.get("maxVertices", 1 if geometryType == "point" else 3)

        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
        if rows is None or rows < 0:
            rows = DatasetProvider.DEFAULT_ROWS
        if partitions is None or partitions < 0:
            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)

        df_spec = (
            dg.DataGenerator(sparkSession=sparkSession, name="test_data_set1", rows=rows,
                             partitions=partitions, randomSeedMethod="hash_fieldname")
            .withColumn("location_id", "long", minValue=self.MIN_LOCATION_ID, maxValue=self.MAX_LOCATION_ID,
                        uniqueValues=rows, random=generateRandom)
        )
        if geometryType == "point":
            if maxVertices > 1:
                w.warn('Ignoring property maxVertices for point geometries')
            df_spec = (
                df_spec.withColumn("lat", "float", minValue=-90.0, maxValue=90.0,
                                   step=1e-5, random=generateRandom, omit=True)
                .withColumn("lon", "float", minValue=-180.0, maxValue=180.0,
                            step=1e-5, random=generateRandom, omit=True)
                .withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')")
            )
        elif geometryType == "lineString":
            if maxVertices < 2:
                maxVertices = 2
                w.warn("Parameter maxVertices must be >=2 for 'lineString' geometries; Setting to 2")
            j = 0
            while j < maxVertices:
                df_spec = (
                    df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
                                       step=1e-5, random=generateRandom, omit=True)
                    .withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
                                step=1e-5, random=generateRandom, omit=True)
                )
                j = j + 1
            concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in range(maxVertices)]
            concatPairsExpr = f"replace(concat('LINESTRING(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
            df_spec = (
                df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
            )
        elif geometryType == "polygon":
            if maxVertices < 3:
                maxVertices = 3
                w.warn("Parameter maxVertices must be >=3 for 'polygon' geometries; Setting to 3")
            j = 0
            while j < maxVertices:
                df_spec = (
                    df_spec.withColumn(f"lat_{j}", "float", minValue=-90.0, maxValue=90.0,
                                       step=1e-5, random=generateRandom, omit=True)
                    .withColumn(f"lon_{j}", "float", minValue=-180.0, maxValue=180.0,
                                step=1e-5, random=generateRandom, omit=True)
                )
                j = j + 1
            vertexIndices = list(range(maxVertices)) + [0]
            concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in vertexIndices]
            concatPairsExpr = f"replace(concat('POLYGON(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
            df_spec = (
                df_spec.withColumn("wkt", "string", expr=concatPairsExpr)
            )
        else:
            raise ValueError("geometryType must be 'point', 'lineString', or 'polygon'")

        return df_spec
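To make the WKT assembly in the polygon branch concrete, the standalone snippet below rebuilds the SQL expression exactly as the provider does for maxVertices=3 and prints the string handed to `withColumn`; the trailing `replace(..., ', )', ')')` trims the comma left after the last coordinate pair.

# Standalone illustration of the polygon WKT expression built above (maxVertices=3).
maxVertices = 3
vertexIndices = list(range(maxVertices)) + [0]   # repeat vertex 0 to close the ring
concatCoordinatesExpr = [f"concat(lon_{j}, ' ', lat_{j}, ', ')" for j in vertexIndices]
concatPairsExpr = f"replace(concat('POLYGON(', {', '.join(concatCoordinatesExpr)}, ')'), ', )', ')')"
print(concatPairsExpr)
# Prints: replace(concat('POLYGON(', concat(lon_0, ' ', lat_0, ', '), concat(lon_1, ' ', lat_1, ', '),
#   concat(lon_2, ' ', lat_2, ', '), concat(lon_0, ' ', lat_0, ', '), ')'), ', )', ')')
# At runtime the expression yields strings such as POLYGON(10.1 20.2, 11.3 21.4, 12.5 22.6, 10.1 20.2)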
dbldatagen/datasets/basic_process_historian.py (new file, +110 lines)

from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/process_historian",
                    summary="Basic Historian Data for Process Manufacturing",
                    autoRegister=True,
                    supportsStreaming=True)
class BasicProcessHistorianProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
    """
    Basic Process Historian Dataset
    ===============================

    This is a basic process historian data set with fields like plant ID, device ID, tag name, timestamp,
    value, and units of measure.

    It takes the following options when retrieving the table:
        - random: if True, generates random data
        - rows: number of rows to generate
        - partitions: number of partitions to use
        - numDevices: number of unique device IDs
        - numPlants: number of unique plant IDs
        - numTags: number of unique tag names
        - startTimestamp: earliest timestamp for IOT time series data
        - endTimestamp: latest timestamp for IOT time series data
        - dataQualityRatios: dictionary with `pctQuestionable`, `pctSubstituted`, and `pctAnnotated`
          values corresponding to the percentage of values to be marked `is_questionable`, `is_substituted`,
          or `is_annotated`, respectively

    As the data specification is a DataGenerator object, you can add further columns to the data set and
    add constraints (when the feature is available).

    Note that this dataset does not use any features that would prevent it from being used as a source for a
    streaming dataframe, and so the flag `supportsStreaming` is set to True.

    """
    MIN_DEVICE_ID = 0x100000000
    MAX_DEVICE_ID = 9223372036854775807
    MIN_PROPERTY_VALUE = 50.0
    MAX_PROPERTY_VALUE = 60.0
    DEFAULT_NUM_DEVICES = 10000
    DEFAULT_NUM_PLANTS = 100
    DEFAULT_NUM_TAGS = 10
    DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
    DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
    COLUMN_COUNT = 10
    ALLOWED_OPTIONS = [
        "numDevices",
        "numPlants",
        "numTags",
        "startTimestamp",
        "endTimestamp",
        "dataQualityRatios",
        "random"
    ]

    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1, **options):
        import dbldatagen as dg  # import locally to avoid circular imports
        import numpy as np

        generateRandom = options.get("random", False)
        numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
        numPlants = options.get("numPlants", self.DEFAULT_NUM_PLANTS)
        numTags = options.get("numTags", self.DEFAULT_NUM_TAGS)
        startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
        endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
        dataQualityRatios = options.get("dataQualityRatios", None)

        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
        if rows is None or rows < 0:
            rows = DatasetProvider.DEFAULT_ROWS
        if partitions is None or partitions < 0:
            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)

        tag_names = [f"HEX-{str(j).zfill(int(np.ceil(np.log10(numTags))))}_INLET_TMP" for j in range(numTags)]
        plant_ids = [f"PLANT-{str(j).zfill(int(np.ceil(np.log10(numPlants))))}" for j in range(numPlants)]
        testDataSpec = (
            dg.DataGenerator(sparkSession, name="process_historian_data", rows=rows,
                             partitions=partitions,
                             randomSeedMethod="hash_fieldname")
            .withColumn("internal_device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
                        uniqueValues=numDevices, omit=True, baseColumnType="hash")
            .withColumn("device_id", "string", format="0x%09x", baseColumn="internal_device_id")
            .withColumn("plant_id", "string", values=plant_ids, baseColumn="internal_device_id")
            .withColumn("tag_name", "string", values=tag_names, baseColumn="internal_device_id")
            .withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
                        interval="1 second", random=generateRandom)
            .withColumn("value", "float", minValue=self.MIN_PROPERTY_VALUE, maxValue=self.MAX_PROPERTY_VALUE,
                        step=1e-3, random=generateRandom)
            .withColumn("engineering_units", "string", expr="'Deg.F'")
        )
        # Add the data quality columns if they were provided
        if dataQualityRatios is not None:
            if "pctQuestionable" in dataQualityRatios:
                testDataSpec = testDataSpec.withColumn(
                    "is_questionable", "boolean",
                    expr=f"rand() < {dataQualityRatios['pctQuestionable']}"
                )
            if "pctSubstituted" in dataQualityRatios:
                testDataSpec = testDataSpec.withColumn(
                    "is_substituted", "boolean",
                    expr=f"rand() < {dataQualityRatios['pctSubstituted']}"
                )
            if "pctAnnotated" in dataQualityRatios:
                testDataSpec = testDataSpec.withColumn(
                    "is_annotated", "boolean",
                    expr=f"rand() < {dataQualityRatios['pctAnnotated']}"
                )

        return testDataSpec
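As a hedged usage sketch, assuming the same `Datasets` facade as for the other providers and that options are forwarded as keyword arguments, the historian table could be retrieved with explicit data-quality ratios like so:

import dbldatagen as dg

# Assumption: dg.Datasets(...).get() forwards these keyword options to
# BasicProcessHistorianProvider.getTableGenerator().
dataspec = dg.Datasets(spark, "basic/process_historian").get(
    rows=1_000_000,
    numDevices=500,
    numTags=20,
    dataQualityRatios={"pctQuestionable": 0.01, "pctSubstituted": 0.005, "pctAnnotated": 0.02},
    random=True,
)
df = dataspec.build()
df.select("device_id", "plant_id", "tag_name", "ts", "value", "is_questionable").show(5)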
dbldatagen/datasets/basic_telematics.py (new file, +131 lines)

from .dataset_provider import DatasetProvider, dataset_definition


@dataset_definition(name="basic/telematics",
                    summary="Telematics dataset for GPS tracking",
                    autoRegister=True,
                    supportsStreaming=True)
class BasicTelematicsProvider(DatasetProvider.NoAssociatedDatasetsMixin, DatasetProvider):
    """
    Basic Telematics Dataset
    ========================

    This is a basic telematics dataset with time-series `lat`, `lon`, and `heading` values.

    It takes the following options when retrieving the table:
        - random: if True, generates random data
        - rows: number of rows to generate
        - partitions: number of partitions to use
        - numDevices: number of unique device IDs
        - startTimestamp: earliest timestamp for IOT time series data
        - endTimestamp: latest timestamp for IOT time series data
        - minLat: minimum latitude
        - maxLat: maximum latitude
        - minLon: minimum longitude
        - maxLon: maximum longitude
        - generateWkt: if `True`, generates the well-known text representation of the location

    As the data specification is a DataGenerator object, you can add further columns to the data set and
    add constraints (when the feature is available).

    Note that this dataset does not use any features that would prevent it from being used as a source for a
    streaming dataframe, and so the flag `supportsStreaming` is set to True.

    """
    MIN_DEVICE_ID = 1000000
    MAX_DEVICE_ID = 9223372036854775807
    DEFAULT_NUM_DEVICES = 1000
    DEFAULT_START_TIMESTAMP = "2024-01-01 00:00:00"
    DEFAULT_END_TIMESTAMP = "2024-02-01 00:00:00"
    DEFAULT_MIN_LAT = -90.0
    DEFAULT_MAX_LAT = 90.0
    DEFAULT_MIN_LON = -180.0
    DEFAULT_MAX_LON = 180.0
    COLUMN_COUNT = 6
    ALLOWED_OPTIONS = [
        "numDevices",
        "startTimestamp",
        "endTimestamp",
        "minLat",
        "maxLat",
        "minLon",
        "maxLon",
        "generateWkt",
        "random"
    ]

    @DatasetProvider.allowed_options(options=ALLOWED_OPTIONS)
    def getTableGenerator(self, sparkSession, *, tableName=None, rows=-1, partitions=-1,
                          **options):
        import dbldatagen as dg  # import locally to avoid circular imports
        import warnings as w

        generateRandom = options.get("random", False)
        numDevices = options.get("numDevices", self.DEFAULT_NUM_DEVICES)
        startTimestamp = options.get("startTimestamp", self.DEFAULT_START_TIMESTAMP)
        endTimestamp = options.get("endTimestamp", self.DEFAULT_END_TIMESTAMP)
        minLat = options.get("minLat", self.DEFAULT_MIN_LAT)
        maxLat = options.get("maxLat", self.DEFAULT_MAX_LAT)
        minLon = options.get("minLon", self.DEFAULT_MIN_LON)
        maxLon = options.get("maxLon", self.DEFAULT_MAX_LON)
        generateWkt = options.get("generateWkt", False)

        assert tableName is None or tableName == DatasetProvider.DEFAULT_TABLE_NAME, "Invalid table name"
        if rows is None or rows < 0:
            rows = DatasetProvider.DEFAULT_ROWS
        if partitions is None or partitions < 0:
            partitions = self.autoComputePartitions(rows, self.COLUMN_COUNT)
        if minLat < -90.0:
            minLat = -90.0
            w.warn("Received an invalid minLat value; Setting to -90.0")
        if minLat > 90.0:
            minLat = 89.0
            w.warn("Received an invalid minLat value; Setting to 89.0")
        if maxLat < -90:
            maxLat = -89.0
            w.warn("Received an invalid maxLat value; Setting to -89.0")
        if maxLat > 90.0:
            maxLat = 90.0
            w.warn("Received an invalid maxLat value; Setting to 90.0")
        if minLon < -180.0:
            minLon = -180.0
            w.warn("Received an invalid minLon value; Setting to -180.0")
        if minLon > 180.0:
            minLon = 179.0
            w.warn("Received an invalid minLon value; Setting to 179.0")
        if maxLon < -180.0:
            maxLon = -179.0
            w.warn("Received an invalid maxLon value; Setting to -179.0")
        if maxLon > 180.0:
            maxLon = 180.0
            w.warn("Received an invalid maxLon value; Setting to 180.0")
        if minLon > maxLon:
            (minLon, maxLon) = (maxLon, minLon)
            w.warn("Received minLon > maxLon; Swapping values")
        if minLat > maxLat:
            (minLat, maxLat) = (maxLat, minLat)
            w.warn("Received minLat > maxLat; Swapping values")
        df_spec = (
            dg.DataGenerator(sparkSession=sparkSession, rows=rows,
                             partitions=partitions, randomSeedMethod="hash_fieldname")
            .withColumn("device_id", "long", minValue=self.MIN_DEVICE_ID, maxValue=self.MAX_DEVICE_ID,
                        uniqueValues=numDevices, random=generateRandom)
            .withColumn("ts", "timestamp", begin=startTimestamp, end=endTimestamp,
                        interval="1 second", random=generateRandom)
            .withColumn("base_lat", "float", minValue=minLat, maxValue=maxLat, step=0.5,
                        baseColumn='device_id', omit=True)
            .withColumn("base_lon", "float", minValue=minLon, maxValue=maxLon, step=0.5,
                        baseColumn='device_id', omit=True)
            .withColumn("unv_lat", "float", expr="base_lat + (0.5-format_number(rand(), 3))*1e-3", omit=True)
            .withColumn("unv_lon", "float", expr="base_lon + (0.5-format_number(rand(), 3))*1e-3", omit=True)
            .withColumn("lat", "float", expr=f"""CASE WHEN unv_lat > {maxLat} THEN {maxLat}
                        ELSE CASE WHEN unv_lat < {minLat} THEN {minLat}
                        ELSE unv_lat END END""")
            .withColumn("lon", "float", expr=f"""CASE WHEN unv_lon > {maxLon} THEN {maxLon}
                        ELSE CASE WHEN unv_lon < {minLon} THEN {minLon}
                        ELSE unv_lon END END""")
            .withColumn("heading", "integer", minValue=0, maxValue=359, step=1, random=generateRandom)
            .withColumn("wkt", "string", expr="concat('POINT(', lon, ' ', lat, ')')", omit=not generateWkt)
        )

        return df_spec
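Similarly, here is a hedged sketch of retrieving the telematics table restricted to a bounding box with the optional WKT column enabled; the extra `speed_kmh` column illustrates the docstring's point that the returned `DataGenerator` can be extended before building. The facade call and the added column are assumptions for illustration, not part of this commit.

import dbldatagen as dg

# Assumption: the Datasets facade resolves "basic/telematics" to the provider above
# and returns its DataGenerator specification.
dataspec = (
    dg.Datasets(spark, "basic/telematics").get(
        rows=2_000_000,
        numDevices=5_000,
        minLat=45.0, maxLat=49.0,          # clamp positions to a Pacific Northwest box
        minLon=-124.0, maxLon=-117.0,
        generateWkt=True,
        random=True,
    )
    # Further columns can be added to the returned DataGenerator before building.
    .withColumn("speed_kmh", "float", minValue=0.0, maxValue=120.0, random=True)
)
df = dataspec.build()
df.select("device_id", "ts", "lat", "lon", "heading", "wkt", "speed_kmh").show(5)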
