# Copyright 2017. All rights reserved. See AUTHORS.txt
# Licensed under the Apache License, Version 2.0 which is in LICENSE.txt
"""
Utilities to scrape data from URLs and databases.
File download function adapted from
http://stackoverflow.com/questions/16694907/how-to-download-large-file-in-python-with-requests-py
"""
import datetime
import getpass
import hashlib
import os
import zipfile

import pandas as pd
import psycopg2
import requests

download_metadata_fields = ('filename', 'url', 'download_timestamp_utc', 'sha1')

# A standard chunk size for streaming writes to disk: 64 KiB = 2**16 bytes
BLOCKSIZE = 65536

# Sentinel marking credentials that have not been entered yet; once prompted
# for, they are cached in these module-level variables for later calls.
_NOT_ENTERED = object()
saved_user = _NOT_ENTERED
saved_password = _NOT_ENTERED

def download_file(url, local_path):
    """
    Download the contents of a url to a local file, streaming in chunks.
    Return metadata suitable for a log file:
        (local_path, url, timestamp, sha1_hash)
    See also: download_metadata_fields
    """
    r = requests.get(url, stream=True)
    # Fail early on HTTP errors rather than saving an error page to disk.
    r.raise_for_status()
    hasher = hashlib.sha1()
    with open(local_path, 'wb') as f:
        for chunk in r.iter_content(chunk_size=BLOCKSIZE):
            if chunk:  # filter out keep-alive chunks
                f.write(chunk)
                hasher.update(chunk)
    timestamp = datetime.datetime.utcnow()
    return (local_path, url, timestamp, hasher.hexdigest())
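
# A minimal usage sketch, assuming network access; the URL and local path
# below are hypothetical examples, not files this project necessarily uses:
#
#   meta = download_file(
#       'https://www.eia.gov/electricity/data/eia860/xls/eia8602015.zip',
#       'downloads/eia8602015.zip')
#   print(dict(zip(download_metadata_fields, meta)))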

def unzip(file_list):
    for path in file_list:
        unzip_name = os.path.splitext(path)[0]
        # Skip unzipping if the destination directory already exists
        if not os.path.isdir(unzip_name):
            print("Unzipping " + path)
            with zipfile.ZipFile(path, 'r') as zip_ref:
                zip_ref.extractall(unzip_name)
        else:
            print("Skipping " + unzip_name + " because it was already unzipped.")

def connect_to_db_and_push_df(df, col_formats, table, database='postgres',
                              host='switch-db2.erg.berkeley.edu', port=5432,
                              user=None, password=None, quiet=False):
    """
    Insert every row of a dataframe into a database table. col_formats is a
    psycopg2 placeholder string with one %s per dataframe column. Credentials
    are prompted for once and cached at module level for subsequent calls.
    """
    global saved_user, saved_password
    if user is None:
        user = saved_user
    if password is None:
        password = saved_password
    if user is _NOT_ENTERED:
        saved_user = user = getpass.getpass(
            'Enter username for database {}:'.format(database))
    if password is _NOT_ENTERED:
        saved_password = password = getpass.getpass(
            'Enter database password for user {}:'.format(user))
    try:
        con = psycopg2.connect(database=database, user=user, host=host,
                               port=port, password=password)
        if not quiet:
            print("Connection to database established...")
    except Exception:
        print("Error connecting to database {} at host {}:{}.".format(
            database, host, port))
        raise
    cur = con.cursor()
    try:
        # mogrify() returns bytes; decode so the fragments can be joined into
        # one query string. The table name is interpolated directly, so it
        # must come from trusted code, never from user input.
        args_str = ','.join(
            cur.mogrify(col_formats, row).decode()
            for row in df.itertuples(index=False, name=None))
        cur.execute("INSERT INTO " + table + " VALUES " + args_str + ";")
        if not quiet:
            print("Successfully pushed values")
    except Exception as e:
        print('Query execution failed with error: {}'.format(e))
        raise
    finally:
        con.commit()
        cur.close()
        con.close()
        if not quiet:
            print('Database connection closed.')
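
# Usage sketch, assuming a table with three columns (the table and column
# names here are hypothetical); col_formats needs one %s placeholder per
# dataframe column:
#
#   df = pd.DataFrame({'plant_id': [1], 'year': [2015], 'capacity_mw': [100.0]})
#   connect_to_db_and_push_df(df, '(%s,%s,%s)', 'generation_plant_capacity')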

def connect_to_db_and_run_query(query, database='postgres',
                                host='switch-db2.erg.berkeley.edu', port=5432,
                                user=None, password=None, quiet=False):
    """
    Run a query against the database. Return the results as a dataframe, or
    None for statements that produce no result set. Credentials are prompted
    for once and cached at module level for subsequent calls.
    """
    global saved_user, saved_password
    if user is None:
        user = saved_user
    if password is None:
        password = saved_password
    if user is _NOT_ENTERED:
        saved_user = user = getpass.getpass(
            'Enter username for database {}:'.format(database))
    if password is _NOT_ENTERED:
        saved_password = password = getpass.getpass(
            'Enter database password for user {}:'.format(user))
    try:
        con = psycopg2.connect(database=database, user=user, host=host,
                               port=port, password=password)
        if not quiet:
            print("Connection to database established...")
    except Exception:
        print("Error connecting to database {} at host {}:{}.".format(
            database, host, port))
        raise
    cur = con.cursor()
    try:
        cur.execute(query)
        # fetchall() returns a list of tuples with the rows resulting from
        # the query; column names must be taken from the cursor's description.
        if cur.description is not None:
            df = pd.DataFrame(cur.fetchall(),
                              columns=[col[0] for col in cur.description])
            if not quiet:
                print('Successfully executed query: returning results.')
            return df
        else:
            if not quiet:
                print('Successfully executed query with no results.')
    except Exception as e:
        print('Query execution failed with error: {}'.format(e))
        raise
    finally:
        con.commit()
        cur.close()
        con.close()
        if not quiet:
            print('Database connection closed.')
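
# Usage sketch (the table name is hypothetical): statements that return rows
# come back as a dataframe; statements with no result set return None.
#
#   df = connect_to_db_and_run_query(
#       'SELECT * FROM generation_plant_capacity LIMIT 10;')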

def append_historic_output_to_csv(fpath, df):
    # Write the header only when creating the file; later calls append rows.
    write_header = not os.path.isfile(fpath)
    df.to_csv(fpath, sep='\t', mode='a', header=write_header,
              encoding='utf-8', index=False)
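
# Usage sketch (the path is hypothetical): repeated calls append rows, and
# the header row is only written when the file is first created.
#
#   append_historic_output_to_csv('outputs/historic_data.tab', df)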