diff --git a/podaac/forge_py/forge.py b/podaac/forge_py/forge.py
index 77a4bf6..b3e0260 100644
--- a/podaac/forge_py/forge.py
+++ b/podaac/forge_py/forge.py
@@ -12,6 +12,8 @@ def fit_footprint(lon, lat, thinning_fac=100, alpha=0.05, return_xythin=False, i
         Factor to thin out data by (makes alphashape fit faster).
     alpha: float
         The alpha parameter passed to alphashape.
+    is360: bool
+        Indicates whether the longitude data is in the 0-360 range
     """
     lon_array = lon
diff --git a/podaac/lambda_handler/lambda_handler.py b/podaac/lambda_handler/lambda_handler.py
index 31576e0..076157f 100644
--- a/podaac/lambda_handler/lambda_handler.py
+++ b/podaac/lambda_handler/lambda_handler.py
@@ -1,4 +1,4 @@
-"""lambda function used for image generation in aws lambda with cumulus"""
+"""lambda function used for footprint generation in aws lambda with cumulus"""
 import json
 import logging
@@ -43,13 +43,13 @@ def clean_tmp(remove_matlibplot=True):
 class FootprintGenerator(Process):
     """
-    Image generation class to generate image for a granule file and upload to s3
+    Footprint generation class to generate footprints for a granule file and upload to s3
     Attributes
     ----------
     processing_regex : str
-        regex for nc file to generate image
+        regex for nc file to generate footprint
     logger: logger
         cumulus logger
     config: dictionary
@@ -61,9 +61,9 @@ class FootprintGenerator(Process):
     upload_file_to_s3('/user/test/test.png', 's3://bucket/path/test.fp')
         uploads a local file to s3
     process()
-        main function ran for image generation
+        main function run for footprint generation
     get_config()
-        downloads configuration file for tig
+        downloads configuration file for footprint generation
     download_file_from_s3('s3://my-internal-bucket/dataset-config/MODIS_A.2019.cfg', '/tmp/workspace')
         downloads a file from s3 to a directory
     """
@@ -117,7 +117,7 @@ def upload_file_to_s3(self, filename, uri):
             raise ex
     def get_config(self):
-        """Get configuration file for image generations
+        """Get configuration file for footprint generation
         Returns
         ----------
         str
@@ -153,12 +153,12 @@ def footprint_generate(self, file_, config_file, granule_id):
         config_file: file path of configuration file that was downloaded from s3
         granule_id:
-            ganule_id of the granule the images are generated for
+            granule_id of the granule the footprints are generated for
         Returns
         ----------
         list
-            list of dictionary of the image information that was uploaded to s3
+            list of dictionaries of the footprint information that was uploaded to s3
         """
         collection = self.config.get('collection').get('name')
@@ -175,7 +175,7 @@ def footprint_generate(self, file_, config_file, granule_id):
         try:
             local_file = s3.download(input_file, path=self.path)
         except botocore.exceptions.ClientError as ex:
-            self.logger.error("Error downloading image from s3: {}".format(ex), exc_info=True)
+            self.logger.error("Error downloading granule from s3: {}".format(ex), exc_info=True)
             raise ex
         with open(config_file) as config_f:
@@ -220,7 +220,7 @@ def footprint_generate(self, file_, config_file, granule_id):
         return upload_file_dict
     def process(self):
-        """Main process to generate images for granules
+        """Main process to generate footprints for granules
         Returns
         ----------
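Reviewer note on the new `is360` parameter above: the body of `fit_footprint` is not part of this hunk, but the usual meaning of such a flag is a longitude-convention shift before the alphashape fit. A minimal sketch, assuming that convention (function name and behavior are illustrative, not the repository's implementation):

import numpy as np

def normalize_longitude(lon, is360=False):
    """Return longitudes in the -180..180 convention; shift 0-360 inputs."""
    lon = np.asarray(lon)
    if is360:
        lon = ((lon + 180) % 360) - 180
    return lon

print(normalize_longitude([0, 90, 180, 270, 359], is360=True))
# -> [0, 90, -180, -90, -1]
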
diff --git a/podaac/lambda_handler/lambda_handler_branch.py b/podaac/lambda_handler/lambda_handler_branch.py
new file mode 100644
index 0000000..39d4bc2
--- /dev/null
+++ b/podaac/lambda_handler/lambda_handler_branch.py
@@ -0,0 +1,194 @@
+"""lambda function used for forge branching in aws lambda with cumulus"""
+# pylint: disable=R0801
+
+import json
+import logging
+import os
+from shutil import rmtree
+import requests
+import botocore
+from cumulus_process import Process, s3
+from cumulus_logger import CumulusLogger
+
+
+cumulus_logger = CumulusLogger('forge_branching')
+
+
+def clean_tmp(remove_matlibplot=True):
+    """ Deletes everything in /tmp """
+    temp_folder = '/tmp'
+    temp_files = os.listdir(temp_folder)
+
+    cumulus_logger.info("Removing everything in tmp folder {}".format(temp_files))
+    for filename in os.listdir(temp_folder):
+        file_path = os.path.join(temp_folder, filename)
+        try:
+            if os.path.isfile(file_path) or os.path.islink(file_path):
+                os.unlink(file_path)
+            elif os.path.isdir(file_path):
+                if filename.startswith('matplotlib'):
+                    if remove_matlibplot:
+                        rmtree(file_path)
+                else:
+                    rmtree(file_path)
+        except OSError as ex:
+            cumulus_logger.error('Failed to delete %s. Reason: %s' % (file_path, ex))
+
+    temp_files = os.listdir(temp_folder)
+    cumulus_logger.info("After removing everything in tmp folder {}".format(temp_files))
+
+
+class FootprintBranch(Process):
+    """
+    Branching class to determine which forge version should process a granule
+
+
+    Attributes
+    ----------
+    processing_regex : str
+        regex for nc granule files
+    logger: logger
+        cumulus logger
+    config: dictionary
+        configuration from cumulus
+
+
+    Methods
+    -------
+    process()
+        main function run to determine the forge version
+    get_config()
+        downloads the collection configuration file
+    """
+
+    def __init__(self, *args, **kwargs):
+
+        self.processing_regex = '(.*\\.nc$)'
+        super().__init__(*args, **kwargs)
+        self.logger = cumulus_logger
+
+    def clean_all(self):
+        """ Removes anything saved to self.path """
+        rmtree(self.path)
+        clean_tmp()
+
+    def download_file_from_s3(self, s3file, working_dir):
+        """ Download s3 file to local
+
+        Parameters
+        ----------
+        s3file: str
+            path location of the file Ex. s3://my-internal-bucket/dataset-config/MODIS_A.2019.cfg
+        working_dir: str
+            local directory path where the s3 file should be downloaded to
+
+        Returns
+        ----------
+        str
+            full path of the downloaded file
+        """
+        try:
+            return s3.download(s3file, working_dir)
+        except botocore.exceptions.ClientError as ex:
+            self.logger.error("Error downloading file %s to %s" % (s3file, working_dir), exc_info=True)
+            raise ex
+
+    def get_config(self):
+        """Get the collection configuration file used to pick the forge version
+        Returns
+        ----------
+        str
+            string of the filepath to the configuration
+        """
+        config_url = os.environ.get("CONFIG_URL")
+        config_name = self.config['collection']['name']
+        config_bucket = os.environ.get('CONFIG_BUCKET')
+        config_dir = os.environ.get("CONFIG_DIR")
+
+        if config_url:
+            file_url = "{}/{}.cfg".format(config_url, config_name)
+            response = requests.get(file_url, timeout=60)
+            cfg_file_full_path = "{}/{}.cfg".format(self.path, config_name)
+            with open(cfg_file_full_path, 'wb') as file_:
+                file_.write(response.content)
+
+        elif config_bucket and config_dir:
+            config_s3 = 's3://{}.cfg'.format(os.path.join(config_bucket, config_dir, config_name))
+            cfg_file_full_path = self.download_file_from_s3(config_s3, self.path)
+        else:
+            raise ValueError('Environment variables to get configuration files were not set')
+
+        return cfg_file_full_path
+
+    def process(self):
+        """Main process to determine the forge version for granules
+
+        Returns
+        ----------
+        dict
+            Payload that is returned to the CMA, a dictionary with the list of granules
+        """
+
+        config_file_path = self.get_config()
+        with open(config_file_path) as config_f:
+            read_config = json.load(config_f)
+
+        forge_type = read_config.get('forge_version', 'java')
+        self.input['forge_version'] = forge_type
+        return self.input
+
+    @classmethod
+    def handler(cls, event, context=None, path=None, noclean=False):
+        """ General event handler """
+        return cls.run(path=path, noclean=noclean, context=context, **event)
+
+    @classmethod
+    def run(cls, *args, **kwargs):
+        """ Run this payload with the given Process class """
+        noclean = kwargs.pop('noclean', False)
+        process = cls(*args, **kwargs)
+        try:
+            output = process.process()
+        finally:
+            if not noclean:
+                process.clean_all()
+        return output
+
+
+def handler(event, context):
+    """handler that gets called by aws lambda
+
+    Parameters
+    ----------
+    event: dictionary
+        event from a lambda call
+    context: dictionary
+        context from a lambda call
+
+    Returns
+    ----------
+    dict
+        A CMA JSON message
+    """
+
+    levels = {
+        'critical': logging.CRITICAL,
+        'error': logging.ERROR,
+        'warn': logging.WARNING,
+        'warning': logging.WARNING,
+        'info': logging.INFO,
+        'debug': logging.DEBUG
+    }
+    logging_level = os.environ.get('LOGGING_LEVEL', 'info')
+    cumulus_logger.logger.level = levels.get(logging_level, logging.INFO)
+    cumulus_logger.setMetadata(event, context)
+    clean_tmp()
+    result = FootprintBranch.cumulus_handler(event, context=context)
+
+    result['meta']['collection']['meta']['workflowChoice']['forge_version'] = result['payload']['forge_version']
+    del result['payload']['forge_version']
+    return result
+
+
+if __name__ == "__main__":
+    FootprintBranch.cli()
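Reviewer note: the branching decision in lambda_handler_branch.py boils down to a single config lookup with a Java default. A distilled, self-contained sketch (the helper name and example path are illustrative, not part of the patch):

import json

def choose_forge_version(config_path):
    """Return 'java' unless the dataset config names another forge version."""
    with open(config_path) as config_f:
        return json.load(config_f).get('forge_version', 'java')

# e.g. choose_forge_version('/tmp/JASON-1_L2_OST_GPN_E.cfg') -> 'java'

The handler then copies that value into meta.collection.meta.workflowChoice and removes it from the payload, so a downstream Cumulus workflow choice state can branch on it.
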
"${aws_ecr_repository.lambda-image-repo.repository_url}:${local.ecr_image_tag}" role = var.lambda_role timeout = var.timeout @@ -52,3 +52,55 @@ resource "aws_cloudwatch_log_group" "forge_py_task" { retention_in_days = var.task_logs_retention_in_days tags = merge(var.tags, { Project = var.prefix }) } + + +resource "aws_lambda_function" "forge_branch_task" { + + depends_on = [ + null_resource.upload_ecr_image + ] + + function_name = "${var.prefix}-forge-branch-lambda" + image_uri = "${aws_ecr_repository.lambda-image-repo.repository_url}:${local.ecr_image_tag}" + role = var.lambda_role + timeout = var.timeout + memory_size = 256 + package_type = "Image" + + architectures = var.architectures + + image_config { + command = ["podaac.lambda_handler.lambda_handler_branch.handler"] + entry_point = var.entry_point + working_directory = var.working_directory + } + + environment { + variables = { + STACK_NAME = var.prefix + CMR_ENVIRONMENT = var.cmr_environment + CUMULUS_MESSAGE_ADAPTER_DIR = "/opt/" + REGION = var.region + CONFIG_BUCKET = var.config_bucket + CONFIG_DIR = var.config_dir + FOOTPRINT_OUTPUT_BUCKET = var.footprint_output_bucket + FOOTPRINT_OUTPUT_DIR = var.footprint_output_dir + CONFIG_URL = var.config_url + LOGGING_LEVEL = var.log_level + } + } + + vpc_config { + subnet_ids = var.subnet_ids + security_group_ids = var.security_group_ids + } + + tags = merge(var.tags, { Project = var.prefix }) +} + +resource "aws_cloudwatch_log_group" "forge_branch_task" { + name = "/aws/lambda/${aws_lambda_function.forge_branch_task.function_name}" + retention_in_days = var.task_logs_retention_in_days + tags = merge(var.tags, { Project = var.prefix }) +} + diff --git a/tests/input.txt b/tests/input.txt index f6ae34d..c85e9ae 100644 --- a/tests/input.txt +++ b/tests/input.txt @@ -42,7 +42,16 @@ ], "granuleId": "^JA1_GPN_2PeP([0-9]{3})_([0-9]{3})_([0-9]{8})_([0-9]{6})_([0-9]{8})_([0-9]{6})$", "granuleIdExtraction": "^(JA1_GPN_2PeP([0-9]{3})_([0-9]{3})_([0-9]{8})_([0-9]{6})_([0-9]{8})_([0-9]{6}))((\\.nc)|(\\.cmr\\.json))?$", - "name": "JASON-1_L2_OST_GPN_E" + "name": "JASON-1_L2_OST_GPN_E", + "meta": { + "workflowChoice": { + "compressed": false, + "convertNetCDF": false, + "dmrpp": true, + "glacier": true, + "readDataFileForMetadata": false + } + } } }, "payload":{ diff --git a/tests/test_footprint_branch.py b/tests/test_footprint_branch.py new file mode 100644 index 0000000..45850ee --- /dev/null +++ b/tests/test_footprint_branch.py @@ -0,0 +1,127 @@ +"""Test cases for forge-py lambda_handler""" + +import json +import os +import boto3 +import pytest +from jsonschema import validate + +from podaac.lambda_handler import lambda_handler_branch +from moto import mock_aws +from mock import patch, Mock +import xarray as xr +from podaac.forge_py import forge +from shapely.wkt import dumps + +file_schema = { + "type": "array", + "items": { + "additionalProperties": False, + "type": "object", + "required": [ + "bucket", + "key" + ], + "properties": { + "bucket": { + "description": "Bucket where file is archived in S3", + "type": "string" + }, + "checksum": { + "description": "Checksum value for file", + "type": "string" + }, + "checksumType": { + "description": "Type of checksum (e.g. md5, sha256, etc)", + "type": "string" + }, + "fileName": { + "description": "Name of file (e.g. 
diff --git a/tests/test_footprint_branch.py b/tests/test_footprint_branch.py
new file mode 100644
index 0000000..45850ee
--- /dev/null
+++ b/tests/test_footprint_branch.py
@@ -0,0 +1,124 @@
+"""Test cases for the forge-py lambda_handler_branch"""
+
+import json
+import os
+import boto3
+from jsonschema import validate
+
+from podaac.lambda_handler import lambda_handler_branch
+from moto import mock_aws
+from mock import patch
+
+file_schema = {
+    "type": "array",
+    "items": {
+        "additionalProperties": False,
+        "type": "object",
+        "required": [
+            "bucket",
+            "key"
+        ],
+        "properties": {
+            "bucket": {
+                "description": "Bucket where file is archived in S3",
+                "type": "string"
+            },
+            "checksum": {
+                "description": "Checksum value for file",
+                "type": "string"
+            },
+            "checksumType": {
+                "description": "Type of checksum (e.g. md5, sha256, etc)",
+                "type": "string"
+            },
+            "fileName": {
+                "description": "Name of file (e.g. file.txt)",
+                "type": "string"
+            },
+            "key": {
+                "description": "S3 Key for archived file",
+                "type": "string"
+            },
+            "size": {
+                "description": "Size of file (in bytes)",
+                "type": "number"
+            },
+            "source": {
+                "description": "Source URI of the file from origin system (e.g. S3, FTP, HTTP)",
+                "type": "string"
+            },
+            "type": {
+                "description": "Type of file (e.g. data, metadata, browse)",
+                "type": "string"
+            },
+            "description": {
+                "description": "variable values",
+                "type": "string"
+            }
+        }
+    }
+}
+
+class Context:
+    def __init__(self, aws_request_id):
+        self.aws_request_id = aws_request_id
+
+@mock_aws
+@patch('requests.get')
+def test_lambda_handler_cumulus(mocked_get):
+    """Test lambda handler to run through cumulus handler"""
+
+    test_dir = os.path.dirname(os.path.realpath(__file__))
+
+    bucket = "test-prefix-protected-test"
+    aws_s3 = boto3.resource('s3', region_name='us-east-1')
+    aws_s3.create_bucket(Bucket=bucket)
+
+    input_dir = f'{test_dir}/input'
+    config_dir = f'{test_dir}/configs'
+    nc_file = f'{input_dir}/measures_esdr_scatsat_l2_wind_stress_23433_v1.1_s20210228-054653-e20210228-072612.nc'
+    cfg_file = f'{config_dir}/PODAAC-CYGNS-C2H10.cfg'
+
+    with open(nc_file, 'rb') as data:
+        aws_s3.Bucket(bucket).put_object(Key='test_folder/test_granule.nc', Body=data)
+
+    s3_client = boto3.client('s3', region_name='us-east-1')
+
+    # Point the handler at the mocked S3 config location:
+    os.environ["CONFIG_BUCKET"] = "internal-bucket"
+    os.environ["CONFIG_DIR"] = "dataset-config"
+    os.environ["CONFIG_URL"] = ""
+    os.environ["FOOTPRINT_OUTPUT_BUCKET"] = "internal-bucket"
+    os.environ["FOOTPRINT_OUTPUT_DIR"] = "test"
+
+    aws_s3.create_bucket(Bucket='internal-bucket')
+
+    with open(cfg_file, 'rb') as data:
+        s3_client.put_object(Bucket='internal-bucket',
+                             Key='dataset-config/JASON-1_L2_OST_GPN_E.cfg',
+                             Body=data)
+
+    # Sanity check that the config object is retrievable from the mock bucket
+    s3_client.get_object(Bucket="internal-bucket",
+                         Key='dataset-config/JASON-1_L2_OST_GPN_E.cfg',
+                         )
+
+    dir_path = os.path.dirname(os.path.realpath(__file__))
+    input_file = dir_path + '/input.txt'
+
+    with open(input_file) as json_event:
+        event = json.load(json_event)
+        granules = event.get('payload').get('granules')
+        for granule in granules:
+            files = granule.get('files')
+            is_valid_schema = validate(instance=files, schema=file_schema)
+            assert is_valid_schema is None
+
+    context = Context("fake_request_id")
+    output = lambda_handler_branch.handler(event, context)
+
+    for granule in output.get('payload').get('granules'):
+        is_valid_schema = validate(instance=granule.get('files'), schema=file_schema)
+        assert is_valid_schema is None
+
+    assert output['meta']['collection']['meta']['workflowChoice']['forge_version'] == 'java'
\ No newline at end of file
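Reviewer note: the final assertion in the test mirrors how handler() moves forge_version out of the payload and into the collection metadata. A runnable distillation of that set-then-del step (illustrative dictionaries only):

result = {
    "meta": {"collection": {"meta": {"workflowChoice": {"dmrpp": True}}}},
    "payload": {"forge_version": "java", "granules": []},
}

# Equivalent to the set-then-del in handler(): move the key into workflowChoice.
result["meta"]["collection"]["meta"]["workflowChoice"]["forge_version"] = result["payload"].pop("forge_version")

assert result["meta"]["collection"]["meta"]["workflowChoice"]["forge_version"] == "java"
assert "forge_version" not in result["payload"]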