Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade packages, airgradient changes and publish method #34

Merged
merged 7 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
LCS_API=https://api.openaq.org
STACK=lcs-etl-pipeline
SECRET_STACK=lcs-etl-pipeline
BUCKET=openaq-fetches
VERBOSE=1
TOPIC_ARN=arn:aws:sns:us-east-1:470049585876:NewFetchResults
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ jobs:
steps:
- uses: actions/checkout@v2

- name: Use Node.js 12.x
- name: Use Node.js 20.x
uses: actions/setup-node@v1
with:
node-version: 12.x
node-version: 20.x

- run: npm install
- run: npm run lint
Expand Down
3 changes: 2 additions & 1 deletion cdk/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ const stack = new EtlPipeline(app, "lcs-etl-pipeline", {
schedulerModuleDir: "scheduler",
sources: require('../fetcher/sources'),
bucketName: process.env.BUCKET || 'openaq-fetches',
lcsApi: process.env.LCS_API || 'https://api.openaq.org'
lcsApi: process.env.LCS_API || 'https://api.openaq.org',
topicArn: process.env.TOPIC_ARN
});


Expand Down
19 changes: 16 additions & 3 deletions cdk/stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class EtlPipeline extends cdk.Stack {
sources,
lcsApi,
bucketName,
topicArn,
...props
}: StackProps
) {
Expand All @@ -36,6 +37,7 @@ export class EtlPipeline extends cdk.Stack {
queue,
bucket,
lcsApi,
topicArn,
});
this.buildSchedulerLambdas({
moduleDir: schedulerModuleDir,
Expand All @@ -49,20 +51,21 @@ export class EtlPipeline extends cdk.Stack {
queue: sqs.Queue;
bucket: s3.IBucket;
lcsApi: string;
topicArn: string;
}): lambda.Function {
this.prepareNodeModules(props.moduleDir);
const handler = new lambda.Function(this, 'Fetcher', {
description: 'Fetch a single source for a given time period',
runtime: lambda.Runtime.NODEJS_16_X,
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'index.handler',
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(900),
memorySize: 512,
environment: {
BUCKET: props.bucket.bucketName,
STACK: cdk.Stack.of(this).stackName,
VERBOSE: '1',
LCS_API: props.lcsApi,
TOPIC_ARN: props.topicArn,
},
});
handler.addEventSource(
Expand All @@ -72,6 +75,14 @@ export class EtlPipeline extends cdk.Stack {
);
props.queue.grantConsumeMessages(handler);
props.bucket.grantReadWrite(handler);
handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['sns:Publish'],
resources: [props.topicArn],
})
);

handler.addToRolePolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
Expand All @@ -86,6 +97,7 @@ export class EtlPipeline extends cdk.Stack {
],
})
);

return handler;
}

Expand All @@ -105,7 +117,7 @@ export class EtlPipeline extends cdk.Stack {
`${interval}Scheduler`,
{
description: `${interval}Scheduler`,
runtime: lambda.Runtime.NODEJS_16_X,
runtime: lambda.Runtime.NODEJS_20_X,
handler: 'index.handler',
code: lambda.Code.fromAsset(props.moduleDir),
timeout: cdk.Duration.seconds(25),
Expand Down Expand Up @@ -147,6 +159,7 @@ interface StackProps extends cdk.StackProps {
fetcherModuleDir: string;
schedulerModuleDir: string;
lcsApi: string;
topicArn: string;
bucketName: string;
sources: Source[];
}
11 changes: 6 additions & 5 deletions fetcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ require('dotenv').config({ path });
const providers = new (require('./lib/providers'))();
const sources = require('./sources');


if (require.main === module) {
handler();
}

async function handler(event) {
try {

if (!process.env.SOURCE && !event)
throw new Error('SOURCE env var or event required');

Expand All @@ -23,12 +25,11 @@ async function handler(event) {
const source = sources.find((source) => source.provider === source_name);
if (!source) throw new Error(`Unable to find ${source_name} in sources.`);

console.log(`Processing '${source_name}'`);
await providers.processor(source_name, source);

return {};
const log = await providers.processor(source);
await providers.publish(log, 'fetcher/success');
return log;
} catch (err) {
console.error(err);
providers.publish(err, 'fetcher/error');
process.exit(1);
}
}
Expand Down
8 changes: 8 additions & 0 deletions fetcher/lib/measure.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Measures {
constructor(type) {
this.headers = [];
this.measures = [];
this.from = null;
this.to = null;

if (type === FixedMeasure) {
this.headers = ['sensor_id', 'measure', 'timestamp'];
Expand All @@ -16,6 +18,12 @@ class Measures {
}

push(measure) {
if (!this.to || measure.timestamp > this.to) {
this.to = measure.timestamp;
}
if (!this.from || measure.timestamp < this.from) {
this.from = measure.timestamp;
}
this.measures.push(measure);
}

Expand Down
23 changes: 12 additions & 11 deletions fetcher/lib/meta.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
const { VERBOSE } = require('./utils');
const AWS = require('aws-sdk');
const s3 = new AWS.S3({
maxRetries: 10
});

const {
getObject,
putObject
} = require('./utils');

/**
* Helper to store metadata about a source in S3.
Expand All @@ -20,8 +21,8 @@ class MetaDetails {

async load() {
try {
const resp = await s3.getObject(this.props).promise();
return JSON.parse(resp.Body.toString('utf-8'));
const body = await getObject(this.props.Bucket, this.props.Key);
return JSON.parse(body);
} catch (err) {
if (err.statusCode !== 404)
throw err;
Expand All @@ -32,11 +33,11 @@ class MetaDetails {
}

save(body) {
return s3.putObject({
...this.props,
Body: JSON.stringify(body),
ContentType: 'application/json'
}).promise();
return putObject(
JSON.stringify(body),
this.props.Bucket,
this.props.Key,
);
}
}
exports.MetaDetails = MetaDetails;
75 changes: 49 additions & 26 deletions fetcher/lib/providers.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
const fs = require('fs');
const path = require('path');
const AWS = require('aws-sdk');
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');

const {
VERBOSE,
DRYRUN,
gzip,
unzip,
fetchSecret,
getObject,
putObject,
prettyPrintStation
} = require('./utils');

const s3 = new AWS.S3({
maxRetries: 10
});

const sns = new SNSClient();

/**
* Runtime handler for each of the custom provider scripts, as well
Expand All @@ -28,13 +30,42 @@ class Providers {
/**
* Given a source config file, choose the corresponding provider script to run
*
* @param {String} source_name
* @param {Object} source
*/
async processor(source_name, source) {
async processor(source) {
if (VERBOSE) console.debug('Processing', source.provider);
if (!this[source.provider]) throw new Error(`${source.provider} is not a supported provider`);
// fetch any secrets we may be storing for the provider
if (VERBOSE) console.debug('Fetching secret: ', source.secretKey);
const config = await fetchSecret(source);
// and combine them with the source config for more generic access
if (VERBOSE) console.log('Starting processor', { ...source, ...config });
const log = await this[source.provider].processor({ ...source, ...config });
// source_name is more consistent with our db schema
if (typeof(log) == 'object' && !Array.isArray(log) && !log.source_name) {
log.source_name = source.provider;
}
return (log);
}

await this[source.provider].processor(source_name, source);
/**
* Publish the results of the fetch to our SNS topic
*
* @param {Object} message
* @param {String} subject
*/
async publish(message, subject) {
console.log('Publishing:', subject, message);
if (process.env.TOPIC_ARN && message) {
const cmd = new PublishCommand({
TopicArn: process.env.TOPIC_ARN,
Subject: subject,
Message: JSON.stringify(message)
});
return await sns.send(cmd);
} else {
return {};
}
}

/**
Expand Down Expand Up @@ -67,8 +98,7 @@ class Providers {

// Diff data to minimize costly S3 PUT operations
try {
const resp = await s3.getObject({ Bucket, Key }).promise();
const currentData = (await unzip(resp.Body)).toString('utf-8');
const currentData = await getObject(Bucket, Key);
if (currentData === newData && !process.env.FORCE) {
if (VERBOSE) console.log(`station has not changed - station: ${providerStation}`);
return;
Expand All @@ -78,25 +108,23 @@ class Providers {
prettyPrintStation(newData);
console.log('-----------------> from');
prettyPrintStation(currentData);
} else {
console.log(`Updating the station file: ${providerStation}`);
}
} catch (err) {
if (err.statusCode !== 404) throw err;
if (err.Code !== 'NoSuchKey') throw err;
}

const compressedString = await gzip(newData);

if (!DRYRUN) {
if (VERBOSE) console.debug(`Saving station to ${Bucket}/${Key}`);

await s3.putObject({
await putObject(
compressedString,
Bucket,
Key,
Body: compressedString,
ContentType: 'application/json',
ContentEncoding: 'gzip'
}).promise();
false,
'application/json',
'gzip'
);
}
if (VERBOSE) console.log(`finished station: ${providerStation}\n------------------------`);
}
Expand All @@ -117,19 +145,14 @@ class Providers {
const Key = `${process.env.STACK}/measures/${provider}/${filename}.csv.gz`;
const compressedString = await gzip(measures.csv());


if (DRYRUN) {
console.log(`Would have saved ${measures.length} measurements to '${Bucket}/${Key}'`);
return new Promise((y) => y(true));
}
if (VERBOSE) console.debug(`Saving measurements to ${Bucket}/${Key}`);

return s3.putObject({
Bucket,
Key,
Body: compressedString,
ContentType: 'text/csv',
ContentEncoding: 'gzip'
}).promise();
return await putObject(compressedString, Bucket, Key, false, 'text/csv', 'gzip');
}
}

Expand Down
Loading
Loading