forked from tbnorth/sheet_stats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsheet_stats.py
276 lines (215 loc) · 8.01 KB
/
sheet_stats.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
# coding: utf-8
"""
sheet_stats.py - report column stats for spreadsheets
requires openpyxl and numpy
Terry N. Brown, terrynbrown@gmail.com, Fri Dec 16 13:20:47 2016
2017-01-02 Henry Helgen added dof=1 default
2016-12-26 Henry Helgen added average, variance, standard deviation,
coefficient of variation to output
2016-12-23 Henry Helgen updated to Python 3.5 syntax including print() and
writer = csv.writer(open(opt.output, 'w', newline=''))
"""
import csv
import argparse
import glob
import multiprocessing
import os
import sys
import time
from collections import namedtuple
from math import sqrt, isnan
# sentinel used both for "no value seen yet" (min/max tracking) and for
# statistics that are undefined (e.g. variance of a single value)
NAN = float('NAN')

from openpyxl import load_workbook

# True when running under Python 2.x
PYTHON_2 = sys.version_info[0] < 3
if not PYTHON_2:
    unicode = str  # Python 3: plain str is already unicode

FIELDS = [  # fields in output table
    'file', 'field', 'n', 'blank', 'bad', 'min', 'max', 'mean', 'std',
    'sum', 'sumsq', 'variance', 'coefvar'
]
class AttrDict(dict):
    """A dict whose keys are also readable/writable as attributes,
    i.e. ``d.attr`` works the same as ``d['attr']``.

    Recipe from http://stackoverflow.com/a/14620633
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        # point the instance __dict__ at the mapping itself, so attribute
        # access and item access share one underlying store
        self.__dict__ = self
def make_parser():
    """Construct the command-line argument parser.

    Don't call this directly -- client code should go through
    get_options() instead.

    :return: configured argparse.ArgumentParser
    """
    parser = argparse.ArgumentParser(
        description="Report column stats for spreadsheets",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        'files', type=str, nargs='+',
        help="Files to process, '*' patterns expanded.",
    )
    named = parser.add_argument_group('required named arguments')
    named.add_argument(
        "--output", metavar='FILE',
        help="Path to .csv file for output, will be overwritten",
    )
    return parser
def get_options(args=None):
    """
    get_options - use argparse to parse args, and return a
    argparse.Namespace, possibly with some changes / expansions /
    validations.

    Client code should call this method with args as per sys.argv[1:],
    rather than calling make_parser() directly.

    :param [str] args: arguments to parse
    :return: options with modifications / validations
    :rtype: argparse.Namespace
    """
    opt = make_parser().parse_args(args)

    # modifications / validations go here
    if not opt.output:
        # diagnostics belong on stderr, and sys.exit() is the correct
        # way to terminate (the exit() builtin is for interactive use
        # and may not exist in all runtime environments)
        sys.stderr.write("No --output supplied\n")
        sys.exit(10)

    return opt
def get_aggregate(psumsqn, psumn, pcountn, pdof=1):
    """Compute mean, variance, standard deviation and coefficient of
    variation from running totals.

    Used instead of numpy.mean / numpy.var / numpy.std because the sum,
    sum of squares and count are already available when this is called,
    avoiding an extra pass through the data.

    Note pcountn is the full list n, not a sample n - 1.  The degrees of
    freedom default to n - 1 to match the value that Oracle variance
    uses.  Naive algorithm from
    https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance

        Var = (SumSq - (Sum * Sum) / n) / (n - dof)

    :param float psumsqn: sum of squares
    :param float psumn: sum
    :param int pcountn: count of values
    :param int pdof: degrees of freedom, default 1 (sample, n - 1)
    :return: Agg namedtuple (mean, variance, std, coefvar); all NAN when
             pcountn - pdof <= 0
    """
    Agg = namedtuple("Agg", "mean variance std coefvar")
    nan = float('NAN')

    # guard: with count <= dof the variance divisor would be <= 0
    if pcountn - pdof <= 0:
        return Agg(nan, nan, nan, nan)

    mean = psumn / pcountn  # mean uses n, not n - dof
    # variance straight from the running sums -- mean not needed while summing
    variance = (psumsqn - psumn * psumn / pcountn) / (pcountn - pdof)
    # negative variance is numerically impossible input; flag with NAN
    std = sqrt(variance) if variance >= 0 else nan
    # coefficient of variation is undefined for a zero mean
    coefvar = std / mean if mean != 0 else nan
    return Agg(mean, variance, std, coefvar)
def proc_file(filepath):
    """
    proc_file - process one .xlsx file, accumulating per-column counts
    and running sums, then deriving mean / variance / std / coefvar.

    :param str filepath: path to file
    :return: dict with 'filepath' and 'fields' (field name -> AttrDict
             of the stats listed in FIELDS), or None if aborted via the
             "STOP" file
    """
    print(filepath)

    # get the first sheet
    book = load_workbook(filename=filepath, read_only=True)
    # .sheetnames replaces the deprecated Workbook.get_sheet_names()
    sheets = book.sheetnames
    sheet = book[sheets[0]]
    row_source = sheet.rows
    row0 = next(row_source)

    # get field names from the first row
    fields = [i.value for i in row0]

    data = {
        'filepath': filepath,
        'fields': {field: AttrDict({f: 0 for f in FIELDS})
                   for field in fields},
    }

    for field in fields:
        # init. mins/maxs with invalid value for later calc.
        data['fields'][field].update(dict(
            min=NAN,
            max=NAN,
            field=field,
            file=filepath,
        ))

    rows = 0
    for row in row_source:
        if rows % 1000 == 0:  # feedback every 1000 rows
            print(rows)
            # Much cleaner to exit by creating a file called "STOP" in the
            # local directory than to try and use Ctrl-C, when using
            # multiprocessing.  Save time by checking only every 1000 rows.
            if os.path.exists("STOP"):
                print("Process aborting because of './STOP' file.")
                return
        rows += 1

        for cell_n, cell in enumerate(row):
            d = data['fields'][fields[cell_n]]
            if cell.value is None or unicode(cell.value).strip() == '':
                d.blank += 1
            else:
                try:
                    x = float(cell.value)
                    d.sum += x
                    d.sumsq += x * x
                    d.n += 1
                    # min is x if no value seen yet, else min(prev-min, x)
                    if isnan(d.min):
                        d.min = x
                    else:
                        d.min = min(d.min, x)
                    # as for min
                    if isnan(d.max):
                        d.max = x
                    else:
                        d.max = max(d.max, x)
                except ValueError:
                    # non-blank cell that doesn't parse as a number
                    d.bad += 1

    # sanity check: every cell was counted exactly once
    assert sum(d.n + d.blank + d.bad
               for d in data['fields'].values()) == rows * len(fields)

    # compute the derived values
    for field in data['fields']:
        d = data['fields'][field]
        d.update(get_aggregate(d.sumsq, d.sum, d.n)._asdict().items())

    return data
def get_answers(opt=None, **kwargs):
    """get_answers - process files

    :param argparse.Namespace opt: options
    :return: list of answers from proc_file
    """
    if opt is None:  # API call rather than command line
        opt = type("opt", (), kwargs)

    # pass filenames through glob() to expand "2017_*.xlsx" etc.
    files = []
    for filepath in opt.files:
        files.extend(glob.glob(filepath))

    # leave one core free, but never ask for zero workers --
    # Pool(0) raises ValueError on a single-core machine
    workers = max(1, multiprocessing.cpu_count() - 1)
    # create a pool of processors
    pool = multiprocessing.Pool(workers)
    try:
        # process file list with processor pool
        return pool.map(proc_file, files)
    finally:
        # shut the worker processes down (the original leaked them)
        pool.close()
        pool.join()
def get_table_rows(answers):
    """Generator turning get_answers() output into csv.writer-ready rows.

    :param list answers: output from get_answers()
    :return: yields the FIELDS header row, then one row per
             (file, field) pair
    """
    yield FIELDS
    for answer in answers:
        stats = answer['fields']
        for name in stats:
            values = [stats[name][key] for key in FIELDS]
            if not PYTHON_2:
                yield values
            else:
                # Python 2 csv wants encoded byte strings
                yield [unicode(item).encode('utf-8') for item in values]
def main():
    """main() - when invoked directly"""
    opt = get_options()

    # csv.writer does its own EOL handling, hence binary mode on
    # Python 2 and newline='' on Python 3,
    # see https://docs.python.org/3/library/csv.html#csv.reader
    if PYTHON_2:
        out_file = open(opt.output, 'wb')
    else:
        out_file = open(opt.output, 'w', newline='')

    started = time.time()
    with out_file:
        writer = csv.writer(out_file)
        writer.writerows(get_table_rows(get_answers(opt)))
    print("%d seconds" % (time.time() - started))
# command-line entry point
if __name__ == '__main__':
    main()