diff --git a/starcluster/awsutils.py b/starcluster/awsutils.py index d3330cdff..8cea03b97 100644 --- a/starcluster/awsutils.py +++ b/starcluster/awsutils.py @@ -467,7 +467,7 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', availability_zone_group=None, placement=None, user_data=None, placement_group=None, block_device_map=None, subnet_id=None, - network_interfaces=None): + network_interfaces=None, iam_profile=None): """ Convenience method for running spot or flat-rate instances """ @@ -511,7 +511,8 @@ def request_instances(self, image_id, price=None, instance_type='m1.small', placement_group=placement_group, user_data=user_data, block_device_map=block_device_map, - network_interfaces=network_interfaces) + network_interfaces=network_interfaces, + iam_profile=iam_profile) if price: return self.request_spot_instances( price, image_id, @@ -532,7 +533,8 @@ def request_spot_instances(self, price, image_id, instance_type='m1.small', security_group_ids=None, subnet_id=None, placement=None, placement_group=None, user_data=None, block_device_map=None, - network_interfaces=None): + network_interfaces=None, iam_profile=None): kwargs = locals() kwargs.pop('self') + kwargs['instance_profile_name'] = kwargs.pop('iam_profile') return self.conn.request_spot_instances(**kwargs) @@ -605,7 +606,7 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1, max_count=1, key_name=None, security_groups=None, placement=None, user_data=None, placement_group=None, block_device_map=None, subnet_id=None, - network_interfaces=None): + network_interfaces=None, iam_profile=None): kwargs = dict( instance_type=instance_type, min_count=min_count, @@ -616,7 +617,8 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1, user_data=user_data, placement_group=placement_group, block_device_map=block_device_map, - network_interfaces=network_interfaces + network_interfaces=network_interfaces, + instance_profile_name=iam_profile ) if subnet_id: kwargs.update( diff --git a/starcluster/cluster.py
b/starcluster/cluster.py index 63d2fa7ab..925114b35 100644 --- a/starcluster/cluster.py +++ b/starcluster/cluster.py @@ -51,7 +51,7 @@ def __repr__(self): return "" % self.ec2.region.name def get_cluster(self, cluster_name, group=None, load_receipt=True, - load_plugins=True, load_volumes=True, require_keys=True): + load_plugins=True, load_volumes=True, require_keys=True, load_iam_profile=True): """ Returns a Cluster object representing an active cluster """ @@ -64,7 +64,8 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True, cluster_group=group) if load_receipt: cl.load_receipt(load_plugins=load_plugins, - load_volumes=load_volumes) + load_volumes=load_volumes, + load_iam_profile=load_iam_profile) try: cl.keyname = cl.keyname or cl.master_node.key_name key_location = self.cfg.get_key(cl.keyname).get('key_location') @@ -79,14 +80,15 @@ except exception.SecurityGroupDoesNotExist: raise exception.ClusterDoesNotExist(cluster_name) - def get_clusters(self, load_receipt=True, load_plugins=True): + def get_clusters(self, load_receipt=True, load_plugins=True, load_iam_profile=True): """ Returns a list of all active clusters """ cluster_groups = self.get_cluster_security_groups() clusters = [self.get_cluster(g.name, group=g, load_receipt=load_receipt, - load_plugins=load_plugins) + load_plugins=load_plugins, + load_iam_profile=load_iam_profile) for g in cluster_groups] return clusters @@ -169,24 +171,25 @@ def _get_cluster_name(self, cluster_name): def add_node(self, cluster_name, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None): + placement_group=None, spot_bid=None, + iam_profile=None): cl = self.get_cluster(cluster_name) return cl.add_node(alias=alias, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, - no_create=no_create, 
iam_profile=iam_profile) def add_nodes(self, cluster_name, num_nodes, aliases=None, no_create=False, image_id=None, instance_type=None, zone=None, - placement_group=None, spot_bid=None): + placement_group=None, spot_bid=None, iam_profile=None): """ Add one or more nodes to cluster """ cl = self.get_cluster(cluster_name) return cl.add_nodes(num_nodes, aliases=aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, spot_bid=spot_bid, - no_create=no_create) + no_create=no_create, iam_profile=iam_profile) def remove_node(self, cluster_name, alias=None, terminate=True, force=False): @@ -290,7 +295,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): tag = self.get_tag_from_sg(scg.name) try: cl = self.get_cluster(tag, group=scg, load_plugins=False, - load_volumes=False, require_keys=False) + load_volumes=False, require_keys=False, + load_iam_profile=True) except exception.IncompatibleCluster as e: sep = '*' * 60 log.error('\n'.join([sep, e.msg, sep]), @@ -319,6 +325,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False): print 'Subnet: %s' % getattr(n, 'subnet_id', 'N/A') print 'Zone: %s' % getattr(n, 'placement', 'N/A') print 'Keypair: %s' % getattr(n, 'key_name', 'N/A') + ipn = cl.iam_profile if cl.iam_profile else 'N/A' + print 'IAM instance profile: %s' % ipn ebs_vols = [] for node in nodes: devices = node.attached_vols @@ -412,6 +420,7 @@ def __init__(self, disable_cloudinit=False, subnet_id=None, public_ips=None, + iam_profile=None, **kwargs): # update class vars with given vars _vars = locals().copy() @@ -523,7 +532,8 @@ def load_volumes(self, vols): This method assigns the first volume to /dev/sdz, second to /dev/sdy, etc. 
for all volumes that do not include a device/partition setting """ - devices = ['/dev/sd%s' % s for s in string.lowercase] + #devices = ['/dev/sd%s' % s for s in string.lowercase] + devices = ['/dev/xvdb%s' % s for s in string.lowercase] devmap = {} for volname in vols: vol = vols.get(volname) @@ -571,7 +581,7 @@ def __str__(self): cfg = self.__getstate__() return pprint.pformat(cfg) - def load_receipt(self, load_plugins=True, load_volumes=True): + def load_receipt(self, load_plugins=True, load_volumes=True, load_iam_profile=True): """ Load the original settings used to launch this cluster into this Cluster object. Settings are loaded from cluster group tags and the @@ -587,7 +597,7 @@ def load_receipt(self, load_plugins=True, load_volumes=True): sep = '*' * 60 log.warn('\n'.join([sep, msg, sep]), extra={'__textwrap__': 1}) self.update(self._get_settings_from_tags()) - if not (load_plugins or load_volumes): + if not (load_plugins or load_volumes or load_iam_profile): return True try: master = self.master_node @@ -603,6 +613,8 @@ def load_receipt(self, load_plugins=True, load_volumes=True): self.plugins = self.load_plugins(master.get_plugins()) if load_volumes: self.volumes = master.get_volumes() + if load_iam_profile: + self.iam_profile = master.get_iam_profile() except exception.PluginError: log.error("An error occurred while loading plugins: ", exc_info=True) @@ -702,7 +714,8 @@ def _add_tags_to_sg(self, sg): disable_cloudinit=self.disable_cloudinit) user_settings = dict(cluster_user=self.cluster_user, cluster_shell=self.cluster_shell, - keyname=self.keyname, spot_bid=self.spot_bid) + keyname=self.keyname, spot_bid=self.spot_bid, + userdata_scripts=self.userdata_scripts) core = utils.dump_compress_encode(core_settings, use_json=True, chunk_size=static.MAX_TAG_LEN) self._add_chunked_tags(sg, core, static.CORE_TAG) @@ -887,11 +900,11 @@ def get_spot_requests_or_raise(self): return spots def create_node(self, alias, image_id=None, instance_type=None, zone=None, - 
placement_group=None, spot_bid=None, force_flat=False): + placement_group=None, spot_bid=None, force_flat=False, iam_profile=None): return self.create_nodes([alias], image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid, force_flat=force_flat)[0] + spot_bid=spot_bid, force_flat=force_flat, iam_profile=iam_profile)[0] def _get_cluster_userdata(self, aliases): alias_file = utils.string_to_file('\n'.join(['#ignored'] + aliases), @@ -913,7 +926,7 @@ def _get_cluster_userdata(self, aliases): def create_nodes(self, aliases, image_id=None, instance_type=None, zone=None, placement_group=None, spot_bid=None, - force_flat=False): + force_flat=False, iam_profile=None): """ Convenience method for requesting instances with this cluster's settings. All settings (kwargs) except force_flat default to cluster @@ -938,6 +951,7 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, image_id = image_id or self.node_image_id count = len(aliases) if not spot_bid else 1 user_data = self._get_cluster_userdata(aliases) + iam_profile = iam_profile or self.iam_profile kwargs = dict(price=spot_bid, instance_type=instance_type, min_count=count, max_count=count, count=count, key_name=self.keyname, @@ -945,7 +959,8 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, launch_group=cluster_sg, placement=zone or getattr(self.zone, 'name', None), user_data=user_data, - placement_group=placement_group) + placement_group=placement_group, + iam_profile=iam_profile) if self.subnet_id: netif = self.ec2.get_network_spec( device_index=0, associate_public_ip_address=self.public_ips, @@ -985,7 +1000,7 @@ def _get_next_node_num(self): def add_node(self, alias=None, no_create=False, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None): + spot_bid=None, iam_profile=None): """ Add a single node to this cluster """ @@ -993,11 +1008,12 @@ def add_node(self, alias=None, no_create=False, 
image_id=None, return self.add_nodes(1, aliases=aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid, no_create=no_create) + spot_bid=spot_bid, no_create=no_create, + iam_profile=iam_profile) def add_nodes(self, num_nodes, aliases=None, image_id=None, instance_type=None, zone=None, placement_group=None, - spot_bid=None, no_create=False): + spot_bid=None, no_create=False, iam_profile=None): """ Add new nodes to this cluster @@ -1030,7 +1046,7 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, resp = self.create_nodes(aliases, image_id=image_id, instance_type=instance_type, zone=zone, placement_group=placement_group, - spot_bid=spot_bid) + spot_bid=spot_bid, iam_profile=iam_profile) if spot_bid or self.spot_bid: self.ec2.wait_for_propagation(spot_requests=resp) else: @@ -1665,7 +1681,7 @@ def _setup_cluster(self): Runs the default StarCluster setup routines followed by any additional plugin setup routines. Does not wait for nodes to come up. 
""" - log.info("The master node is %s" % self.master_node.dns_name) + log.info("The master node is %s" % self.master_node.private_ip_address) log.info("Configuring cluster...") if self.volumes: self.attach_volumes_to_master() diff --git a/starcluster/node.py b/starcluster/node.py index 39d70a207..a5731108e 100644 --- a/starcluster/node.py +++ b/starcluster/node.py @@ -187,6 +187,14 @@ def get_volumes(self): payload = volstxt.split('\n', 2)[2] return utils.decode_uncompress_load(payload) + def get_iam_profile(self): + if self.instance.instance_profile: + arn = self.instance.instance_profile['arn'] + match = re.match(r'arn:aws:iam::\d{12}:instance-profile/(\S+)', arn) + return match.group(1) + else: + return None + def _remove_all_tags(self): tags = self.tags.keys()[:] for t in tags: @@ -237,6 +245,10 @@ def memory(self): "free -m | grep -i mem | awk '{print $2}'")[0]) return self._memory + @property + def instance_profile(self): + return self.instance.instance_profile + @property def ip_address(self): return self.instance.ip_address diff --git a/starcluster/plugins/mount_ephemeral.py b/starcluster/plugins/mount_ephemeral.py new file mode 100644 index 000000000..c8de826e8 --- /dev/null +++ b/starcluster/plugins/mount_ephemeral.py @@ -0,0 +1,19 @@ +import os +from starcluster.clustersetup import ClusterSetup +from starcluster.logger import log + +class MountEphemeralPlugin(ClusterSetup): + def __init__(self): + self.plugin_dir = os.path.dirname(os.path.realpath(__file__)) + + def mountEphemeralStorage(self, node): + log.info("Mounting ephemeral storage on %s" % node.alias) + node.ssh.put(self.plugin_dir + "/mount_ephemeral.sh", ".") + node.ssh.execute("sh ./mount_ephemeral.sh") + + def run(self, nodes, master, user, user_shell, volumes): + for node in nodes: + self.mountEphemeralStorage(node) + + def on_add_node(self, node, nodes, master, user, user_shell, volumes): + self.mountEphemeralStorage(node) diff --git a/starcluster/plugins/mount_ephemeral.sh 
b/starcluster/plugins/mount_ephemeral.sh new file mode 100644 index 000000000..c8c8002f8 --- /dev/null +++ b/starcluster/plugins/mount_ephemeral.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +VOLUMES="" +for device in `curl -s 169.254.169.254/latest/meta-data/block-device-mapping/` +do + if [[ $device == "ephemeral"* ]] + then + block=`curl -s 169.254.169.254/latest/meta-data/block-device-mapping/$device | awk -F/ '{print $NF}'`; + if [[ -e /dev/$block ]] + then + #if [ ! -e /mnt/$device ] + #then + # mkfs.ext3 /dev/$block + # mkdir /mnt/$device + # mount /dev/$block /mnt/$device + # chmod 1777 /mnt/$device + #fi + pvcreate /dev/$block + VOLUMES="${VOLUMES} /dev/$block" + fi + fi +done + +vgcreate vg_ephemeral $VOLUMES +SIZE=`vgdisplay vg_ephemeral | grep "Total PE" | awk '{print $3}'` +lvcreate -l $SIZE vg_ephemeral -n ephemerallv +mkfs.ext3 /dev/mapper/vg_ephemeral-ephemerallv +mkdir /scratch +mount /dev/mapper/vg_ephemeral-ephemerallv /scratch +chmod 1777 /scratch diff --git a/starcluster/plugins/tagger.py b/starcluster/plugins/tagger.py new file mode 100644 index 000000000..4a8711eda --- /dev/null +++ b/starcluster/plugins/tagger.py @@ -0,0 +1,15 @@ +from starcluster.clustersetup import ClusterSetup +from starcluster.logger import log + +class TaggerPlugin(ClusterSetup): + def __init__(self, tags): + self.tags = [t.strip() for t in tags.split(',')] + self.tags = dict([t.split('=') for t in self.tags]) + + def run(self, nodes, master, user, user_shell, volumes): + log.info("Tagging all nodes...") + for tag in self.tags: + val = self.tags.get(tag) + log.info("Applying tag - %s: %s" % (tag, val)) + for node in nodes: + node.add_tag(tag, val) diff --git a/starcluster/static.py b/starcluster/static.py index 4b62547f8..9138e6574 100644 --- a/starcluster/static.py +++ b/starcluster/static.py @@ -302,4 +302,5 @@ def create_sc_config_dirs(): 'force_spot_master': (bool, False, False, None, None), 'disable_cloudinit': (bool, False, False, None, None), 'dns_prefix': (bool, False, 
False, None, None), + 'iam_profile': (str, False, None, None, None), } diff --git a/starcluster/utils.py b/starcluster/utils.py index ec6f0bfff..900d8ad0a 100644 --- a/starcluster/utils.py +++ b/starcluster/utils.py @@ -129,7 +129,8 @@ def is_valid_device(dev): Checks that dev matches the following regular expression: /dev/sd[a-z]$ """ - regex = re.compile('/dev/sd[a-z]$') + #regex = re.compile('/dev/sd[a-z]$') + regex = re.compile('/dev/xvdb[a-z]$') try: return regex.match(dev) is not None except TypeError: