Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(22): lambda events #32

Merged
merged 18 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions .projenrc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const project = new awscdk.AwsCdkTypeScriptApp({
description: 'Use CDK to create Quilt packages from AWS HealthOmics',
name: solutionName,
projenrcTs: true,
eslint: true,
deps: [
'aws-lambda',
`@aws-cdk/aws-lambda-python-alpha@^${cdkVersion}-alpha.0`,
Expand All @@ -32,6 +33,8 @@ const project = new awscdk.AwsCdkTypeScriptApp({
'__pycache__', // Python
'*.pyc', // Python
'*_metadata.json', // Quilt
'/build/', // Makefile
'package-lock.json', // Node
],
});
override_file_key('.github/workflows/build.yml', 'jobs.build.env');
Expand All @@ -43,6 +46,7 @@ const appTestTask = project.addTask('pytest', {
const testTask = project.tasks.tryFind('test');
testTask?.spawn(appTestTask);
*/
project.addFields({ version: '0.1.1' });
project.synth();


Expand Down
2 changes: 1 addition & 1 deletion package.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,10 @@ export class Constants {
this.region = this.get('CDK_DEFAULT_REGION') || this.get('AWS_DEFAULT_REGION');
}

public getContext(): any {
return this.context;
}

public toDict(): KeyedConfig {
return {
app: this.app,
Expand Down
200 changes: 96 additions & 104 deletions src/omics-quilt.fastq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,121 +8,113 @@ import {
} from '@aws-sdk/client-omics';
import { v4 as uuidv4 } from 'uuid';
import { Constants, KeyedConfig } from './constants';
import { Vivos } from './vivos';

async function start_omics_run(options: StartRunCommandInput) {
const omicsClient = new OmicsClient();
const command = new StartRunCommand(options);
const response = await omicsClient.send(command);
return response;
}

export async function fastq_config_from_uri(uri: string) {
const params: Record<string, any> = {};
const sample: KeyedConfig = await Constants.LoadObjectURI(uri);
console.info(`Loaded JSON manifest:\n${JSON.stringify(sample, null, 2)}`);
params.sample_name = sample.sample_name;
params.fastq_pairs = [];
params.fastq_pairs.push({
read_group: sample.read_group as string,
fastq_1: sample.fastq_1 as string,
fastq_2: sample.fastq_2 as string,
platform: sample.platform as string,
});
return params;
export async function handler(event: any, context: any) {
const pipe = new OmicsQuiltFastq(event, context);
return pipe.exec();
}

export async function handler(event: any, context: any) {
const cc = new Constants(context);
console.debug('Received event: ' + JSON.stringify(event, null, 2));
export class OmicsQuiltFastq extends Vivos {
static async start_omics_run(options: StartRunCommandInput) {
const omicsClient = new OmicsClient();
const command = new StartRunCommand(options);
const response = await omicsClient.send(command);
return response;
}

const num_upload_records = event.Records.length;
let filename, bucket_arn, bucket_name;
if (num_upload_records === 1) {
filename = event.Records[0].s3.object.key;
bucket_arn = event.Records[0].s3.bucket.arn;
bucket_name = event.Records[0].s3.bucket.name;
console.info(`Processing ${filename} in ${bucket_arn}`);
} else if (num_upload_records === 0) {
throw new Error('No file detected for analysis!');
} else {
throw new Error('Multiple s3 files in event not yet supported');
static async fastq_config_from_uri(uri: string) {
const params: Record<string, any> = {};
const sample: KeyedConfig = await Constants.LoadObjectURI(uri);
console.info(`Loaded JSON manifest:\n${JSON.stringify(sample, null, 2)}`);
params.sample_name = sample.sample_name;
params.fastq_pairs = [];
params.fastq_pairs.push({
read_group: sample.read_group as string,
fastq_1: sample.fastq_1 as string,
fastq_2: sample.fastq_2 as string,
platform: sample.platform as string,
});
return params;
}
const uri = context.local_file || `s3://${bucket_name}/${filename}`;
const item = await fastq_config_from_uri(uri);
let error_count = 0;
error_count += await run_workflow(
item,
uri,
cc,
);

if (error_count > 0) {
throw new Error('Error launching some workflows, check logs');
constructor(event: any, context: any) {
super(event, context);
console.debug('Received event: ' + JSON.stringify(event, null, 2));
}
return { message: 'Success' };
}

export async function save_metadata(id: string, item: any, cc: Constants) {
const location = cc.get('OUTPUT_S3_LOCATION');
if (!location) {
console.info('No OUTPUT_S3_LOCATION, skipping metadata save');
return;
async exec() {
const uri = this.cc.get('local_file') || this.getEventObjectURI();
const item = await OmicsQuiltFastq.fastq_config_from_uri(uri);
let error_count = 0;
error_count += await this.run_workflow(item, uri);

if (error_count > 0) {
throw new Error('Error launching some workflows, check logs');
}
return { message: 'Success' };
}
const metadata_file = cc.get('INPUT_METADATA');
if (!metadata_file) {
console.info('No INPUT_METADATA, skipping metadata save');
return;

async save_metadata(id: string, item: any) {
const location = this.cc.get('OUTPUT_S3_LOCATION');
if (!location) {
console.info('No OUTPUT_S3_LOCATION, skipping metadata save');
return;
}
const metadata_file = this.cc.get('INPUT_METADATA');
if (!metadata_file) {
console.info('No INPUT_METADATA, skipping metadata save');
return;
}
const uri = `${location}/${id}/out/${metadata_file}`;
console.info(`Writing input to ${uri}`);
await Constants.SaveObjectURI(uri, item);
}
const uri = `${location}/${id}/out/${metadata_file}`;
console.info(`Writing input to ${uri}`);
await Constants.SaveObjectURI(uri, item);
}

export async function run_workflow(
item: Record<string, string>,
uri: string,
cc: Constants,
) {
const _samplename = item.sample_name;
console.info(`Starting workflow for sample: ${_samplename}`);
const uuid = cc.get('TEST_UUID') || uuidv4();
const run_name = `${_samplename}.${uuid}.`;
const workflow_type = 'READY2RUN' as WorkflowType;
const options = {
workflowType: workflow_type,
workflowId: cc.get('WORKFLOW_ID'),
name: run_name,
roleArn: cc.get('OMICS_ROLE'),
parameters: item,
logLevel: cc.get('LOG_LEVEL') as RunLogLevel,
outputUri: cc.get('OUTPUT_S3_LOCATION'),
tags: {
SOURCE: 'LAMBDA_FASTQ',
RUN_NAME: run_name,
SAMPLE_MANIFEST: uri,
VIVOS_ID: uuid,
},
requestId: uuid,
};
try {
console.debug(`Workflow options: ${JSON.stringify(options)}`);
if (cc.get('debug') === true) {
console.info(`Skipping with context: ${JSON.stringify(cc)}`);
} else {
const input: StartRunCommandInput = options;
const response = await start_omics_run(input);
console.info(`Workflow response: ${JSON.stringify(response)}`);
const run_metadata = {
sample: item,
run: response,
workflow: options,
};
const id = response.id!;
await save_metadata(id, run_metadata, cc);
async run_workflow(item: Record<string, string>, uri: string) {
const _samplename = item.sample_name;
console.info(`Starting workflow for sample: ${_samplename}`);
const uuid = this.cc.get('TEST_UUID') || uuidv4();
const run_name = `${_samplename}.${uuid}.`;
const workflow_type = 'READY2RUN' as WorkflowType;
const options = {
workflowType: workflow_type,
workflowId: this.cc.get('WORKFLOW_ID'),
name: run_name,
roleArn: this.cc.get('OMICS_ROLE'),
parameters: item,
logLevel: this.cc.get('LOG_LEVEL') as RunLogLevel,
outputUri: this.cc.get('OUTPUT_S3_LOCATION'),
tags: {
SOURCE: 'LAMBDA_FASTQ',
RUN_NAME: run_name,
SAMPLE_MANIFEST: uri,
VIVOS_ID: uuid,
},
requestId: uuid,
};
try {
console.debug(`Workflow options: ${JSON.stringify(options)}`);
if (this.cc.get('debug') === true) {
console.info(`Skipping with context: ${JSON.stringify(this.cc)}`);
} else {
const input: StartRunCommandInput = options;
const response = await OmicsQuiltFastq.start_omics_run(input);
console.info(`Workflow response: ${JSON.stringify(response)}`);
const run_metadata = {
sample: item,
run: response,
workflow: options,
};
const id = response.id!;
await this.save_metadata(id, run_metadata);
}
} catch (e: any) {
console.error('Error : ' + e.toString());
return 1;
}
} catch (e: any) {
console.error('Error : ' + e.toString());
return 1;
return 0;
}
return 0;
}

export default OmicsQuiltFastq;
55 changes: 32 additions & 23 deletions src/omics-quilt.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as python from '@aws-cdk/aws-lambda-python-alpha';
import { Duration, RemovalPolicy, Stack, type StackProps } from 'aws-cdk-lib';
import { Rule } from 'aws-cdk-lib/aws-events';
import { SnsTopic } from 'aws-cdk-lib/aws-events-targets';
import { SnsTopic, LambdaFunction } from 'aws-cdk-lib/aws-events-targets';
import {
AccountPrincipal,
ArnPrincipal,
Expand All @@ -11,12 +11,10 @@ import {
ServicePrincipal,
} from 'aws-cdk-lib/aws-iam';
import { Runtime } from 'aws-cdk-lib/aws-lambda';
import { S3EventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
import { NodejsFunction } from 'aws-cdk-lib/aws-lambda-nodejs';
import {
Bucket,
BlockPublicAccess,
EventType,
BucketEncryption,
} from 'aws-cdk-lib/aws-s3';
import { Topic } from 'aws-cdk-lib/aws-sns';
Expand Down Expand Up @@ -51,8 +49,8 @@ export class OmicsQuiltStack extends Stack {
public readonly outputBucket: Bucket;

public readonly manifest_prefix: string;
public readonly manifest_suffix: string;
public readonly packager_prefix: string;
// public readonly manifest_suffix: string;
// public readonly packager_prefix: string;
public readonly packager_suffix: string;

readonly cc: Constants;
Expand All @@ -66,8 +64,8 @@ export class OmicsQuiltStack extends Stack {
this.principal = new AccountPrincipal(this.cc.account);
const manifest_root = this.cc.get('MANIFEST_ROOT');
this.manifest_prefix = `${manifest_root}/${this.cc.region}`;
this.manifest_suffix = this.cc.get('MANIFEST_SUFFIX');
this.packager_prefix = this.cc.get('FASTQ_PREFIX');
// this.manifest_suffix = this.cc.get('MANIFEST_SUFFIX');
// this.packager_prefix = this.cc.get('FASTQ_PREFIX');
this.packager_suffix = this.cc.get('FASTQ_SUFFIX');

// Create Input/Output S3 buckets
Expand All @@ -88,24 +86,34 @@ export class OmicsQuiltStack extends Stack {
// Create Lambda function to submit initial HealthOmics workflow
const fastqLambda = this.makeLambda('fastq', {});
this.makeParameter('FASTQ_LAMBDA_ARN', fastqLambda.functionArn);
// Add S3 event source to Lambda
const fastqTrigger = new S3EventSource(this.inputBucket, {
events: [EventType.OBJECT_CREATED],
filters: [
{ prefix: this.manifest_prefix, suffix: this.manifest_suffix },
],
// Create EventBridge rule to trigger Lambda function
const fastqRule = new Rule(this, 'FastqRule', {
eventPattern: {
source: ['aws.s3'],
detailType: ['Object Created'],
detail: {
bucket: {
name: [this.inputBucket.bucketName],
},
object: {
key: [{ prefix: this.manifest_prefix }], // , suffix: this.manifest_suffix
},
},
},
});
fastqLambda.addEventSource(fastqTrigger);
fastqRule.addTarget(new LambdaFunction(fastqLambda));

const packagerLambda = this.makePythonLambda('packager', {});
// TODO: trigger on Omics completion event, not report file
const packagerTrigger = new S3EventSource(this.outputBucket, {
events: [EventType.OBJECT_CREATED],
filters: [
{ prefix: this.packager_prefix, suffix: this.packager_suffix },
],
const packagerRule = new Rule(this, 'PackagerRule', {
eventPattern: {
source: ['aws.omics'],
detailType: ['Run Status Change'],
detail: {
status: ['COMPLETED'],
},
},
});
packagerLambda.addEventSource(packagerTrigger);
packagerRule.addTarget(new LambdaFunction(packagerLambda));
}

private makeParameter(name: string, value: any) {
Expand Down Expand Up @@ -188,6 +196,7 @@ export class OmicsQuiltStack extends Stack {
enforceSSL: true,
removalPolicy: RemovalPolicy.DESTROY,
versioned: true,
eventBridgeEnabled: true,
};
const bucket = new Bucket(this, name, bucketOptions);
bucket.grantDelete(this.principal);
Expand All @@ -214,7 +223,7 @@ export class OmicsQuiltStack extends Stack {
return new python.PythonFunction(this, name, {
entry: PYTHON_FOLDER,
index: PYTHON_INDEX,
runtime: Runtime.PYTHON_3_11,
runtime: Runtime.PYTHON_3_12,
role: this.lambdaRole,
timeout: Duration.seconds(this.cc.timeout()),
memorySize: this.cc.get('MEMORY_SIZE'),
Expand All @@ -236,7 +245,7 @@ export class OmicsQuiltStack extends Stack {
LOG_LEVEL: 'ALL',
OMICS_ROLE: this.omicsRole.roleArn,
OUTPUT_S3_LOCATION: output.join('/'),
SENTINEL_PREFIX: this.packager_prefix,
// SENTINEL_PREFIX: this.packager_prefix,
SENTINEL_SUFFIX: this.packager_suffix,
INPUT_METADATA: this.cc.get('INPUT_METADATA'),
QUILT_METADATA: this.cc.get('QUILT_METADATA'),
Expand Down
Loading
Loading