provision_index.py
#!/usr/bin/python
# coding: utf-8
# ---------------------------------------------------------------------------------------------------------------------
#
# Florida International University
#
# This software is a "Camilo Valdes Work" under the terms of the United States Copyright Act.
# Please cite the author(s) in any work or product based on this material.
#
# OBJECTIVE:
#       The purpose of this program is to copy the Bowtie2 index to all the worker nodes in the cluster. The goal
#       is to copy the index to every node in parallel, rather than sequentially, which would take far longer.
#
#
# NOTES:
# Please see the dependencies section below for the required libraries (if any).
#
# DEPENDENCIES:
# • Apache-Spark
# • Python
# • R
#
# You can check the python modules currently installed in your system by running: python -c "help('modules')"
#
# USAGE:
# Run the program with the "--help" flag to see usage instructions.
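#
#       Example invocation (the S3 path and shard count below are hypothetical placeholders):
#
#           spark-submit provision_index.py --shards 16 --bowtie2_index s3://your-bucket/bowtie2_index --verbose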
#
# AUTHOR:
# Camilo Valdes
# cvalde03@fiu.edu
# https://github.com/camilo-v
# Florida International University, FIU
# School of Computing and Information Sciences
# Bioinformatics Research Group, BioRG
# http://biorg.cs.fiu.edu/
#
#
# ---------------------------------------------------------------------------------------------------------------------
# Spark Modules
from pyspark import SparkConf, SparkContext
# Python Modules
import sys
import argparse
import time
import collections
from datetime import timedelta
import subprocess as sp
import shlex
# -------------------------------------------------------- Main -------------------------------------------------------
#
#
def main(args):
    """
    Main function of the app.

    Args:
        args: command line arguments.

    Returns:

    """

    #
    #   Command-line Arguments, and miscellaneous declarations.
    #
    parser = argparse.ArgumentParser()
    parser.add_argument("--shards", required=True, type=int, help="Number of Bowtie2 Index Shards.")
    parser.add_argument("--bowtie2_index", required=True, type=str, help="S3 Path of bucket.")
    parser.add_argument("--verbose", required=False, action="store_true", help="Wordy print statements.")

    args = parser.parse_args(args)

    index_location_s3 = args.bowtie2_index
    index_location_s3 = index_location_s3.strip()   # strip() returns a new string, so the result must be reassigned.
    number_of_index_shards = args.shards

    #
    #   Spark Configuration, and App declaration.
    #
    APP_NAME = "Index_Provisioning"
    conf = (SparkConf().setAppName(APP_NAME))
    conf.set("spark.default.parallelism", str(number_of_index_shards))
    conf.set("spark.executor.memoryOverhead", "1G")
    conf.set("spark.locality.wait", "1s")
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] Configuring Spark...")
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")
# Initialize the Spark context for this run.
sc = SparkContext(conf=conf)
    start_time = time.time()

    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] Starting Index Copy from: ")
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] " + index_location_s3)
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")

    # Prepare the locations that will be copied.
    list_of_index_shards = []
    range_end = number_of_index_shards + 1  # Range function does not include the end.

    for partition_id in range(1, range_end):
        s3_location = index_location_s3 + "/" + str(partition_id)
        list_of_index_shards.append(s3_location)
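
    # For example, a hypothetical "--bowtie2_index s3://your-bucket/bowtie2_index" with "--shards 3" yields the
    # locations s3://your-bucket/bowtie2_index/1, s3://your-bucket/bowtie2_index/2, and
    # s3://your-bucket/bowtie2_index/3.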
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] Index Shards to Copy:")
for location in list_of_index_shards:
print(location)
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")
index_shards = sc.parallelize(list_of_index_shards)
# index_shards = index_shards.repartition(number_of_index_shards)
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] No. RDD Partitions: " +
str(index_shards.getNumPartitions()))
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")
print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] Starting to copy...")
    #
    #   The acknowledge_RDD will dispatch the "copy_index_to_worker()" function to all the workers via the
    #   "mapPartitions()" function.
    #
    acknowledge_RDD = index_shards.mapPartitions(copy_index_to_worker)

    #
    #   Once "mapPartitions()" has executed (not immediately, since it is lazily evaluated), we'll collect the
    #   responses from all the nodes.
    #
    acknowledge_list = acknowledge_RDD.collect()
    acknowledge_list.sort()

    number_of_acknowledgements = len(acknowledge_list)

    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] No. of Workers that acknowledged: " +
          str(number_of_acknowledgements))
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")

    if args.verbose:
        for ack in acknowledge_list:
            print(ack)

    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] ")
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] Duplicate Worker IPs:")
    print([item for item, count in collections.Counter(acknowledge_list).items() if count > 1])
    end_time = time.time()
    run_time = end_time - start_time

    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ]")
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] Index Copy Time: " +
          str(timedelta(seconds=run_time)))
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ]")
    print("[ " + time.strftime('%d-%b-%Y %H:%M:%S', time.localtime()) + " ] Done.")

    # ----------------------------------------------- Spark Stop --------------------------------------------------
    #
    # Shut down the cluster once everything completes.
    sc.stop()


def copy_index_to_worker(iterator):
    """
    Function that runs on all the Worker nodes. It will copy the Index from the specified location into the worker's
    local filesystem.

    Args:
        iterator: S3 locations of the index shards assigned to this partition.

    Returns:
        An iterator of acknowledgement strings, one per copied shard.
    """
    return_data = []

    worker_node_ip = sp.check_output(["hostname"]).decode("utf-8").strip()
    local_index_path = "/mnt/bio_data/index"

    for s3_location in iterator:
        aws_copy_command = "aws s3 cp --recursive " + s3_location + " " + local_index_path

        aws_process = sp.Popen(shlex.split(aws_copy_command), stdout=sp.PIPE, stderr=sp.PIPE)
        stdout, stderr = aws_process.communicate()

        return_data.append(worker_node_ip + "\n" +
                           "STDERR: " + stderr.decode("utf-8") + "**********\n" +
                           "AWS COMMAND:\n" + aws_copy_command + "\n")

    return iter(return_data)
# ----------------------------------------------------------- Init ----------------------------------------------------
#
# App Initializer.
#
if __name__ == "__main__":
    main(sys.argv[1:])
# -------------------------------------------------------- End of Line ------------------------------------------------