-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy path: etl_pipe.py
234 lines (184 loc) · 10 KB
/
etl_pipe.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
``` import os
import pandas as pd
from pandas.io import sql
from sqlalchemy import create_engine
from sqlalchemy.pool import Pool
import mysql.connector
import json
## Changing the DIR
os.chdir("C:\\PYTHON")
try:
################################################################
# LOADING THE DATA
################################################################
print('###################################### \n')
print("STEP 1: EXTRACT DATA")
# Load the log file equipment_failure_sensors.log
df_failure_sensors = pd.read_csv("equipment_failure_sensors.log",
sep="\t",
engine='python',
names = ["log_date","ERROR", "sensor_id","to_drop_column","temperature","vibration"],
usecols=[0,1,2,3,4,5],
header=None,
index_col=False)
# Load the log file equipment_sensors.csv
df_equipment_sensors = pd.read_csv("equipment_sensors.csv",
sep=";",
usecols=['equipment_id', 'sensor_id'],
encoding='utf8')
# Load the log file equipment.json
arq=open('equipment.json').read()
json_object = json.loads(arq)
################################################################
# CLEANING THE DATA
################################################################
print('###################################### \n')
print("STEP 2: DATA WRANGLING \n")
# Remove special characters
df_failure_sensors = df_failure_sensors.replace(to_replace=r'[^0-9+-]',value='',regex=True)
# Remove unwanted columns
df_failure_sensors.drop(columns=['ERROR', 'to_drop_column'], inplace=True, axis=1)
# A simple adjust at LOG_DATE column
df_failure_sensors['log_date'] = df_failure_sensors['log_date'].str.replace(r'\D+','')
df_failure_sensors['log_date'] = pd.to_datetime(df_failure_sensors['log_date'], errors='coerce')
def check_missing_zero_values(df):
zero_val = (df == 0.00).astype(int).sum(axis=0)
mis_val = df.isnull().sum()
mis_val_percent = 100 * df.isnull().sum() / len(df)
mz_table = pd.concat([zero_val, mis_val, mis_val_percent], axis=1)
mz_table = mz_table.rename(
columns = {0 : 'Zero Values', 1 : 'Missing Values', 2 : '% of Total Values'})
mz_table['Total Zero Missing Values'] = mz_table['Zero Values'] + mz_table['Missing Values']
mz_table['% Total Zero Missing Values'] = 100 * mz_table['Total Zero Missing Values'] / len(df)
mz_table['Data Type'] = df.dtypes
mz_table = mz_table[mz_table.iloc[:,1] != 0].sort_values('% of Total Values', ascending=False).round(1)
print ("The dataframe has " + str(df.shape[1]) + " columns and " + str(df.shape[0]) + " Rows.\n"
"There are " + str(mz_table.shape[0]) +" columns that have missing values.")
return mz_table
print("## CHECK FOR MISSING OR ZERO VALUES\n")
check_missing_zero_values(df_failure_sensors)
print()
check_missing_zero_values(df_equipment_sensors)
print()
# Remove empy blank space
df_failure_sensors.columns = [x.strip().replace(' ', "") for x in df_failure_sensors.columns]
df_equipment_sensors.columns = [x.strip().replace(' ', "") for x in df_equipment_sensors.columns]
################################################################
# CREATING TABLES
################################################################
# Create a database connection -- DATABASE TARGET
db_connection = mysql.connector.connect(
host="localhost",
user="root",
password="password",
database="db_urano", # schema used to be load
auth_plugin='mysql_native_password'
)
# Create a MySQL cursor to process the steps
db_cursor = db_connection.cursor()
print('###################################### \n')
print("\nSTEP 3: CREATING TABLES ")
# Create tables
query=["CREATE TABLE equipment_failure_sensors (log_date VARCHAR(200), sensor_id VARCHAR(100), temperature VARCHAR(100), vibration VARCHAR(100))",
"CREATE TABLE equipment (equipment_id VARCHAR(100), code VARCHAR(100), group_name VARCHAR(100))",
"CREATE TABLE equipment_sensors (equipment_id VARCHAR(100), sensor_id VARCHAR(100))"]
for i in query:
db_cursor.execute(i)
db_connection.commit()
################################################################
# INSERTING RECORDS
################################################################
print('###################################### \n')
print("STEP 4: INSERTING RECORDS INTO THE TABLES")
try:
# Insert records from LOG file.
cols = "`,`".join([str(i) for i in df_failure_sensors.columns.tolist()])
for i,row in df_failure_sensors.iterrows():
sql = "INSERT INTO `equipment_failure_sensors` (`" +cols + "`) VALUES (" + "%s,"*(len(row)-1) + "%s)"
db_cursor.execute(sql, tuple(row))
# Insert records from JSON file.
for item in json_object:
equipment_id = item["equipment_id"]
code = item["code"]
group_name = item["group_name"]
db_cursor.execute("INSERT INTO `equipment` (`equipment_id`, `code`,`group_name`) VALUES (%s, %s, %s)",(equipment_id, code,group_name))
# Insert records from CSV file.
engine = create_engine('mysql+pymysql://root:password@localhost:3306/db_urano', echo_pool=True, pool_size=10, max_overflow=20 )
with engine.connect() as conn, conn.begin():
df_equipment_sensors.to_sql('equipment_sensors', conn, index=False, if_exists='append')
print("ALL RECORDS SUCCESSFULLY LOADED!\n")
################################################################
# RETRIEVING THE RECORDS
################################################################
# Total equipment failures ?
total = """SELECT SUM(TotalEquipmentFailures) AS TOTAL_OF_FAILURES
FROM(
SELECT COUNT(*) AS TotalEquipmentFailures
FROM equipment_failure_sensors as ef
LEFT JOIN equipment_sensors as es
ON ef.sensor_id = es.sensor_id
LEFT JOIN equipment as eq
ON eq.equipment_id=es.equipment_id
WHERE EXTRACT(YEAR_MONTH FROM log_date) = '202001'
GROUP BY eq.equipment_id
) as Failures;"""
db_cursor.execute(total)
records = db_cursor.fetchall()
print("All the answers are related to January 2020\n")
print("\nTotal equipment failures ?\n")
for row in records:
print("TOTAL OF FAILURES = ", row[0])
# Which equipment has the most failures?
code = """ SELECT max(CODE) as fault_equipment
FROM(
SELECT eq.CODE, COUNT(*) AS TotalEquipmentFailures
FROM equipment_failure_sensors as ef
LEFT JOIN equipment_sensors as es
ON ef.sensor_id = es.sensor_id
INNER JOIN equipment as eq
ON eq.equipment_id=es.equipment_id
WHERE EXTRACT(YEAR_MONTH FROM log_date) = '202001'
GROUP BY eq.CODE, eq.equipment_id
) as Failures
ORDER BY TotalEquipmentFailures DESC
LIMIT 1;"""
db_cursor.execute(code)
records_failures = db_cursor.fetchall()
print("\nWhich equipment has the most failures?\n")
for row in records_failures:
print("EQUIPMENT CODE = ", row[0])
# Average amount of failures across equipment group, ordering by the amount of failures in ascending order?
average = """
SELECT group_name as EQUIPMENT_GROUP,
round(AVG(TotalEquipmentFailures),2) as AVG_AMOUNT_FAILURES
FROM(
SELECT eq.group_name, ef.sensor_id, COUNT(*) AS TotalEquipmentFailures
FROM equipment_failure_sensors as ef
LEFT JOIN equipment_sensors as es
ON ef.sensor_id = es.sensor_id
INNER JOIN equipment as eq
ON eq.equipment_id=es.equipment_id
WHERE EXTRACT(YEAR_MONTH FROM log_date) = '202001'
GROUP BY es.equipment_id
) as Failures
group by group_name
order by count(sensor_id) ASC; """
db_cursor.execute(average)
average_failures = db_cursor.fetchall()
print("\nAverage amount of failures across equipment group, ordering by the amount of failures in ascending order?\n")
for row in average_failures:
print("EQUIPMENT GROUP = ", row[0],
"AVERAGE OF FAILURES = ", row[1])
except Exception as error:
print("Exception occurred: {}",format(error))
finally:
db_connection.commit()
except Exception as e:
print("Exception occurred: {}",format(e))
pass
finally:
db_connection.close()
if __name__ == "__main__":
print("\nPipeline ETL processed sucessfully\n")
else:
print("Error ocurred during the execution")