-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathemr_lambda.py
118 lines (96 loc) · 3.46 KB
/
emr_lambda.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""Lambda to launch EMR and start map reduce for an input"""
import boto3
import logging
# Flip to True for DEBUG-level logging.
DEBUG = False

# Root logger; its level is chosen from the DEBUG flag above.
logger = logging.getLogger()
logger.setLevel(logging.INFO if not DEBUG else logging.DEBUG)

# Every S3 artifact used by this Lambda lives under this single bucket.
BUCKET = 'test-bucket'
_S3_ROOT = 's3://%s' % BUCKET

LOGS_PATH = _S3_ROOT + '/logs/emr_lambda/'
INITIALIZATION_SCRIPT_PATH = _S3_ROOT + '/initialize.sh'

# Mapper/reducer script names and their S3 locations.
MAPPER_FILE = 'mapper.py'
REDUCER_FILE = 'reducer.py'
MAPPER_PATH = '%s/code/%s' % (_S3_ROOT, MAPPER_FILE)
REDUCER_PATH = '%s/code/%s' % (_S3_ROOT, REDUCER_FILE)

# Job input/output locations (note: these carry no URI scheme).
INPUT_FILE = 'input.csv'
INPUT_PATH = 'data/' + INPUT_FILE
OUTPUT_PATH = 'output'

KEY_PAIR = 'key-pair'  # Name of an existing EC2 key pair, without file extension
REGION_NAME = 'us-east-1'
def get_emr_client(region_name=None):
    """
    Create a boto3 EMR client.

    :param region_name: AWS region for the client; when omitted, the
        module-level REGION_NAME is used (looked up at call time).
    :return: boto3 EMR client object
    :raises Exception: whatever ``boto3.client`` raises, re-raised after
        being logged with its traceback.
    """
    if region_name is None:
        region_name = REGION_NAME
    try:
        return boto3.client('emr', region_name=region_name)
    except Exception:
        # Previously this called exit(0): SystemExit with a *success*
        # status, which slips past callers' ``except Exception`` and hides
        # the failure. Log the full traceback and let the caller decide.
        logger.exception('Failed to create EMR client')
        raise
def _instances_config():
    """Return the ``Instances`` argument for ``run_job_flow``: 1 master + 2 core nodes."""
    return {
        'InstanceGroups': [
            {'Name': 'master',
             'InstanceRole': 'MASTER',
             'InstanceType': 'm3.xlarge',
             'InstanceCount': 1,
             },
            {'Name': 'core',
             'InstanceRole': 'CORE',
             'InstanceType': 'm3.xlarge',
             'InstanceCount': 2,
             },
        ],
        'Ec2KeyName': KEY_PAIR,  # This allows us to ssh with the keypair
        # Other available configurations for the instances:
        # 'KeepJobFlowAliveWhenNoSteps': True,
        # 'EmrManagedSlaveSecurityGroup': 'sg-1234',
        # 'EmrManagedMasterSecurityGroup': 'sg-1234',
        # 'Ec2SubnetId': 'subnet-1q234',
    }


def _bootstrap_actions():
    """Return the ``BootstrapActions`` list: run the S3 init script on every node."""
    return [
        {
            'Name': 'Install packages',
            'ScriptBootstrapAction': {
                'Path': INITIALIZATION_SCRIPT_PATH
            }
        }
    ]


def _streaming_steps():
    """Return the ``Steps`` list: one Hadoop streaming map-reduce step."""
    # NOTE(review): INPUT_PATH is both shipped via -files and passed to
    # -input. A scheme-less path in -files is presumably resolved against
    # the master's local filesystem, while -input resolves it against the
    # cluster's default filesystem -- confirm this is intended.
    files = '{},{},{}'.format(MAPPER_PATH, REDUCER_PATH, INPUT_PATH)
    return [
        {'Name': 'Name of the Step',
         'ActionOnFailure': 'TERMINATE_CLUSTER',
         'HadoopJarStep': {
             'Jar': 'command-runner.jar',
             'Args': [
                 'hadoop-streaming',
                 '-files', files,
                 '-mapper', MAPPER_FILE,
                 '-input', INPUT_PATH,
                 '-output', OUTPUT_PATH,
                 '-reducer', REDUCER_FILE,
             ]}
         }
    ]


def lambda_handler(event, context):
    """
    Launch an EMR cluster that runs a single Hadoop streaming step.

    :param event: Lambda event payload (not used).
    :param context: Lambda runtime context (not used).
    :return: 1 on success; on failure a ``(0, error_message)`` tuple.
    """
    try:
        response = get_emr_client().run_job_flow(
            Name='EMR Name',
            LogUri=LOGS_PATH,
            ReleaseLabel='emr-5.15.0',  # EMR version
            Instances=_instances_config(),
            # Install requirements on all nodes while the cluster is set up
            BootstrapActions=_bootstrap_actions(),
            Steps=_streaming_steps(),
            VisibleToAllUsers=True,
            JobFlowRole='EMR_EC2_DefaultRole',
            ServiceRole='EMR_DefaultRole',
        )
    except Exception as e:
        # Keep the traceback in the logs (logger.error(str(e)) dropped it).
        logger.exception('Failed to launch EMR cluster')
        return 0, str(e)
    # Record which cluster was started so it can be traced afterwards.
    logger.info('Launched EMR cluster %s', response.get('JobFlowId'))
    return 1