main.py
import argparse
import logging
from datetime import datetime

from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account
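# The script relies on the google-cloud-bigquery and google-auth client libraries
# (e.g. `pip install google-cloud-bigquery google-auth`); NotFound is used below to
# detect datasets and tables that don't exist yet.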


def authenticate_with_bigquery(project_id, service_account_file):
    """
    Authenticate with a BigQuery client using service account credentials
    """
    # The cloud-platform scope is broad; the service account itself only needs the
    # BigQuery roles listed in the entry-point docstring.
    credentials = service_account.Credentials.from_service_account_file(
        service_account_file,
        scopes=["https://www.googleapis.com/auth/cloud-platform"],
    )
    client = bigquery.Client(
        credentials=credentials,
        project=project_id,
    )
    return client


def create_util_dataset(client):
    """
    Create a utils dataset if it doesn't exist
    """
    dataset_id = "utils"
    # Prepares a reference to the new dataset
    dataset_ref = bigquery.DatasetReference(client.project, dataset_id)
    dataset = bigquery.Dataset(dataset_ref)
    try:
        # get_dataset() raises NotFound when the dataset doesn't exist yet
        client.get_dataset(dataset_ref)
        logging.info('Dataset {} already exists.'.format(dataset.dataset_id))
        return
    except NotFound:
        pass
    # Creates the new dataset
    dataset = client.create_dataset(dataset)
    logging.info('Dataset {} created.'.format(dataset.dataset_id))


def create_daily_storage_stats_table(client):
    """
    Create the daily_storage_stats table if it doesn't exist
    """
    table_id = client.project + ".utils.daily_storage_stats"
    # schema for the table
    schema = [
        bigquery.SchemaField("processing_time", "DATETIME", mode="REQUIRED"),
        bigquery.SchemaField("project_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("dataset_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("table_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("creation_time", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("last_modified_time", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("row_count", "INTEGER", mode="REQUIRED"),
        bigquery.SchemaField("size_bytes", "INTEGER", mode="REQUIRED"),
    ]
    table = bigquery.Table(table_id, schema=schema)
    # create the table if it doesn't exist
    try:
        # get_table() raises NotFound when the table is missing
        client.get_table(table_id)
        logging.info('Table {} already exists.'.format(table.table_id))
    except NotFound:
        table = client.create_table(table)  # Make an API request.
        logging.info("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))
    return "{}.{}.{}".format(table.project, table.dataset_id, table.table_id)
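# Note: the schema above mirrors the columns that main() reads from BigQuery's legacy
# `__TABLES__` metadata view; creation_time and last_modified_time are epoch timestamps
# in milliseconds, and size_bytes is the logical table size.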


def create_util_dataset_and_table(client):
    """
    Create the dataset and table if they don't exist
    """
    create_util_dataset(client)
    table = create_daily_storage_stats_table(client)
    return table


def main(project_id, service_account_file):
    """
    1. Authenticate with BigQuery client
    2. Create dataset and table in BigQuery project if they don't exist
    3. Scan through all datasets in a project and store stats in a destination table
    """
    # 1. Authenticate with BigQuery client
    client = authenticate_with_bigquery(project_id, service_account_file)
    # 2. Create dataset and table in BigQuery project if they don't exist
    destination_table = create_util_dataset_and_table(client)
    # 3. Scan through all datasets in a project and store stats in a destination table
    datasets = client.list_datasets()
    processing_timestamp = datetime.now()
    for dataset in datasets:
        # full_dataset_id is "project:dataset_id"; standard SQL references need "project.dataset_id"
        dataset_id = dataset.full_dataset_id.replace(":", ".")
        logging.info("Updating stats for : {}".format(dataset_id))
        query = (
            "INSERT `{destination}` (processing_time, project_id, dataset_id, table_id, "
            "creation_time, last_modified_time, row_count, size_bytes) "
            "SELECT DATETIME '{processing_time}' as processing_time, project_id, dataset_id, table_id, "
            "creation_time, last_modified_time, row_count, size_bytes "
            # type = 1 keeps regular tables only; views are excluded
            "FROM `{dataset}.__TABLES__` WHERE type = 1"
        ).format(
            destination=destination_table,
            processing_time=processing_timestamp,
            dataset=dataset_id,
        )
        query_job = client.query(query)
        # shouldn't return rows because all rows are getting inserted into a table, but this can be used for debugging
        for row in query_job:
            logging.info("{} | {} | {} | {} | {} | {} | {} | {}".format(
                row.processing_time,
                row.project_id,
                row.dataset_id,
                row.table_id,
                row.creation_time,
                row.last_modified_time,
                row.row_count,
                row.size_bytes,
            ))
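# Each run of main() appends one row per table, keyed by processing_time, so
# daily_storage_stats accumulates a history of table sizes over time. Presumably the
# script is meant to run on a schedule (e.g. daily via cron or Cloud Scheduler); this
# is an assumption based on the table name, not something stated in the code.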


if __name__ == '__main__':
    """
    Entry point for the application.
    --project_id: the project_id for which you are calculating stats
    --service_account_file: path to a service account key file. The service account needs the following roles:
        BigQuery Data Editor
        BigQuery Data Viewer
        BigQuery Job User
    Stats are also stored in the same project, in {PROJECT_ID}.utils.daily_storage_stats.
        utils = dataset name
        daily_storage_stats = table name
    """
    logging.basicConfig(level=logging.INFO)
    parser = argparse.ArgumentParser()
    parser.add_argument('--project_id', help='project_id for which you are calculating stats')
    parser.add_argument('--service_account_file', help='path to a service account key file')
    args = parser.parse_args()
    if args.project_id is None or args.service_account_file is None:
        print('Please provide project_id and service_account_file')
        exit(1)
    main(args.project_id, args.service_account_file)
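# Example invocation (the project ID and key-file path below are placeholders):
#   python main.py --project_id my-gcp-project --service_account_file ./service-account.json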