 This code is experimental and both APIs and code generated is liable to change in future versions.
 """
-from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, DoubleType, BooleanType, ShortType, \
-    TimestampType, DateType, DecimalType, ByteType, BinaryType, StructType, ArrayType, DataType
+import logging

 import pyspark.sql as ssql
-import pyspark.sql.functions as F
+from pyspark.sql.types import LongType, FloatType, IntegerType, StringType, DoubleType, BooleanType, ShortType, \
+    TimestampType, DateType, DecimalType, ByteType, BinaryType, StructType, ArrayType, DataType

-from .utils import strip_margins
 from .spark_singleton import SparkSingleton
+from .utils import strip_margins
+
+SUMMARY_FIELD_NAME = "summary"
+SUMMARY_FIELD_NAME_RENAMED = "__summary__"
+DATA_SUMMARY_FIELD_NAME = "__data_summary__"


 class DataAnalyzer:
@@ -23,6 +27,8 @@ class DataAnalyzer:

     :param df: Spark dataframe to analyze
     :param sparkSession: Spark session instance to use when performing spark operations
+    :param debug: If True, additional debug information is logged
+    :param verbose: If True, additional information is logged

     .. warning::
        Experimental
@@ -43,11 +49,17 @@ class DataAnalyzer:
         |# Column definitions are stubs only - modify to generate correct data
         |#""", '|')

-    def __init__(self, df=None, sparkSession=None):
+    def __init__(self, df=None, sparkSession=None, debug=False, verbose=False):
         """ Constructor:
         :param df: Dataframe to analyze
         :param sparkSession: Spark session to use
         """
+        # set up logging
+        self.verbose = verbose
+        self.debug = debug
+
+        self._setupLogger()
+
         assert df is not None, "dataframe must be supplied"

         self._df = df
@@ -58,6 +70,19 @@ def __init__(self, df=None, sparkSession=None):
         self._sparkSession = sparkSession
         self._dataSummary = None

+    def _setupLogger(self):
+        """Set up logging
+
+        This will set the logger at warning, info or debug levels depending on the instance construction parameters
+        """
+        self.logger = logging.getLogger("DataAnalyzer")
+        if self.debug:
+            self.logger.setLevel(logging.DEBUG)
+        elif self.verbose:
+            self.logger.setLevel(logging.INFO)
+        else:
+            self.logger.setLevel(logging.WARNING)
+
     def _displayRow(self, row):
         """Display details for row"""
         results = []
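A minimal usage sketch for the new constructor parameters, assuming the class is exposed as `dbldatagen.DataAnalyzer` (the import path and sample data here are illustrative): per `_setupLogger()`, `debug=True` maps to DEBUG, `verbose=True` to INFO, and otherwise the logger stays at WARNING.

    from pyspark.sql import SparkSession
    from dbldatagen import DataAnalyzer   # assumed import path for the class in this diff

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "code"])

    # debug=True -> DEBUG level; verbose=True -> INFO; neither -> WARNING
    analyzer = DataAnalyzer(df=df, sparkSession=spark, debug=True)
    summary_df = analyzer.summarizeToDF()
    summary_df.show()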
@@ -95,6 +120,31 @@ def _addMeasureToSummary(self, measureName, summaryExpr="''", fieldExprs=None, d

         return dfResult

+    def _get_dataframe_describe_stats(self, df):
+        """Get summary statistics for the dataframe, renaming the `summary` field if necessary"""
+        self.logger.debug("schema: %s", df.schema)
+
+        src_fields = [fld.name for fld in df.schema.fields]
+        self.logger.debug("src_fields: %s", src_fields)
+        renamed_summary = False
+
+        # get summary statistics handling the case where a field named 'summary' exists
+        # if the `summary` field name exists, we'll rename it to avoid a conflict
+        if SUMMARY_FIELD_NAME in src_fields:
+            renamed_summary = True
+            df = df.withColumnRenamed(SUMMARY_FIELD_NAME, SUMMARY_FIELD_NAME_RENAMED)
+
+        # The dataframe describe method produces a field named `summary`. We'll rename this to avoid conflict with
+        # any natural fields using the same name.
+        summary_df = df.describe().withColumnRenamed(SUMMARY_FIELD_NAME, DATA_SUMMARY_FIELD_NAME)
+
+        # If we renamed a field called `summary` in the data, rename it back.
+        # The summary field produced by the describe method has already been renamed, so there is no conflict.
+        if renamed_summary:
+            summary_df = summary_df.withColumnRenamed(SUMMARY_FIELD_NAME_RENAMED, SUMMARY_FIELD_NAME)
+
+        return summary_df
+
     def summarizeToDF(self):
         """ Generate summary analysis of data set as dataframe

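The rename logic above exists because `DataFrame.describe()` itself returns a label column named `summary`; if the analyzed data also contains a `summary` column, the describe output would carry two columns with the same name. A standalone sketch of the same workaround (the sample data is illustrative; the renamed names mirror the constants defined earlier):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # source data that already contains a column named 'summary' - the conflict case
    df = spark.createDataFrame([(1, "ok"), (2, "bad")], ["id", "summary"])

    stats = (df.withColumnRenamed("summary", "__summary__")       # move the natural column aside
               .describe()                                        # adds describe()'s own 'summary' column
               .withColumnRenamed("summary", "__data_summary__")  # rename describe()'s label column
               .withColumnRenamed("__summary__", "summary"))      # restore the original name

    stats.show()  # rows: count, mean, stddev, min, max, keyed by '__data_summary__'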
@@ -154,11 +204,12 @@ def summarizeToDF():
             dfData=self._df,
             dfSummary=dfDataSummary)

-        descriptionDf = self._df.describe().where("summary in ('mean', 'stddev')")
+        descriptionDf = (self._get_dataframe_describe_stats(self._df)
+                         .where(f"{DATA_SUMMARY_FIELD_NAME} in ('mean', 'stddev')"))
         describeData = descriptionDf.collect()

         for row in describeData:
-            measure = row['summary']
+            measure = row[DATA_SUMMARY_FIELD_NAME]

             values = {k[0]: '' for k in dtypes}

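For reference, `describe()` yields one row per statistic with the statistic name in its label column and all values rendered as strings; the change above filters that output down to the 'mean' and 'stddev' rows via the renamed label column. A small illustrative sketch with made-up data:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, 10.0), (2, 12.5), (3, 9.5)], ["id", "score"])

    stats = df.describe().withColumnRenamed("summary", "__data_summary__")
    for row in stats.where("__data_summary__ in ('mean', 'stddev')").collect():
        measure = row["__data_summary__"]        # 'mean' or 'stddev'
        print(measure, row["id"], row["score"])  # values are strings, e.g. '10.666666666666666'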
@@ -401,7 +452,12 @@ def scriptDataGeneratorFromData(self, suppressOutput=False, name=None):

         """
         assert self._df is not None
-        assert type(self._df) is ssql.DataFrame, "sourceDf must be a valid Pyspark dataframe"
+
+        if not isinstance(self._df, ssql.DataFrame):
+            self.logger.warning(strip_margins(
+                """The parameter `sourceDf` should be a valid Pyspark dataframe.
+                   |Note this warning may be spurious when using a remote connection to a Spark cluster""",
+                '|'))

         if self._dataSummary is None:
             df_summary = self.summarizeToDF()
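The strict assert is relaxed to a warning because, with a remote (Spark Connect) session, dataframes may be instances of the Spark Connect DataFrame class, which in some PySpark versions does not subclass `pyspark.sql.DataFrame`, so the type check can fail for a perfectly valid dataframe. A sketch of a more tolerant check, assuming PySpark 3.4+ where the Spark Connect client module may be present (the helper name is hypothetical):

    import pyspark.sql as ssql

    def _is_spark_dataframe(obj):
        """Return True for classic and, where available, Spark Connect dataframes."""
        candidate_types = [ssql.DataFrame]
        try:
            # only importable when the Spark Connect client is installed (PySpark 3.4+)
            from pyspark.sql.connect.dataframe import DataFrame as ConnectDataFrame
            candidate_types.append(ConnectDataFrame)
        except ImportError:
            pass
        return isinstance(obj, tuple(candidate_types))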