Skip to content

Commit

Permalink
Upgraded to node 20 and aws sdk v3, fixed airgradient
Browse files Browse the repository at this point in the history
  Also cleaned up the method we were using to get secrets to make it
  the same for all providers
  • Loading branch information
caparker committed Feb 29, 2024
1 parent fff5f84 commit 0005e31
Show file tree
Hide file tree
Showing 16 changed files with 4,625 additions and 5,210 deletions.
11 changes: 6 additions & 5 deletions fetcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ require('dotenv').config({ path });
const providers = new (require('./lib/providers'))();
const sources = require('./sources');


if (require.main === module) {
handler();
}

async function handler(event) {
try {

if (!process.env.SOURCE && !event)
throw new Error('SOURCE env var or event required');

Expand All @@ -23,12 +25,11 @@ async function handler(event) {
const source = sources.find((source) => source.provider === source_name);
if (!source) throw new Error(`Unable to find ${source_name} in sources.`);

console.log(`Processing '${source_name}'`);
await providers.processor(source_name, source);

return {};
const log = await providers.processor(source);
await providers.publish(log, 'fetcher/success');

Check failure on line 29 in fetcher/index.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 4 tabs
return log;
} catch (err) {
console.error(err);
providers.publish(err, 'fetcher/error');

Check failure on line 32 in fetcher/index.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 4 tabs
process.exit(1);
}
}
Expand Down
8 changes: 8 additions & 0 deletions fetcher/lib/measure.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ class Measures {
constructor(type) {
this.headers = [];
this.measures = [];
this.from = null;

Check failure on line 10 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 4 tabs
this.to = null;

Check failure on line 11 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 4 tabs

if (type === FixedMeasure) {
this.headers = ['sensor_id', 'measure', 'timestamp'];
Expand All @@ -16,6 +18,12 @@ class Measures {
}

push(measure) {
if(!this.to || measure.timestamp > this.to) {

Check failure on line 21 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 4 tabs

Check failure on line 21 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected space(s) after "if"
this.to = measure.timestamp;

Check failure on line 22 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 12 spaces but found 6 tabs
}

Check failure on line 23 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 4 tabs
if(!this.from || measure.timestamp < this.from) {

Check failure on line 24 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 4 tabs

Check failure on line 24 in fetcher/lib/measure.js

View workflow job for this annotation

GitHub Actions / build

Expected space(s) after "if"
this.from = measure.timestamp;
}
this.measures.push(measure);
}

Expand Down
24 changes: 13 additions & 11 deletions fetcher/lib/meta.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
const { VERBOSE } = require('./utils');
const AWS = require('aws-sdk');
const s3 = new AWS.S3({
maxRetries: 10
});
const { S3Client, GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");

const {
getObject,
putObject,
} = require('./utils');

/**
* Helper to store metadata about a source in S3.
Expand All @@ -20,8 +22,8 @@ class MetaDetails {

async load() {
try {
const resp = await s3.getObject(this.props).promise();
return JSON.parse(resp.Body.toString('utf-8'));
const body = await getObject(this.props.Bucket, this.props.Key);
return JSON.parse(body);
} catch (err) {
if (err.statusCode !== 404)
throw err;
Expand All @@ -32,11 +34,11 @@ class MetaDetails {
}

save(body) {
return s3.putObject({
...this.props,
Body: JSON.stringify(body),
ContentType: 'application/json'
}).promise();
return putObject(
JSON.stringify(body),
this.props.Bucket,
this.props.Key,
);
}
}
exports.MetaDetails = MetaDetails;
59 changes: 45 additions & 14 deletions fetcher/lib/providers.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
const fs = require('fs');
const path = require('path');
const AWS = require('aws-sdk');
//const AWS = require('aws-sdk');
const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");

const {
VERBOSE,
DRYRUN,
gzip,
unzip,
fetchSecret,
getObject,
putObject,
prettyPrintStation
} = require('./utils');

const s3 = new AWS.S3({
maxRetries: 10
});

const sns = new SNSClient();

/**
* Runtime handler for each of the custom provider scripts, as well
Expand All @@ -31,12 +35,40 @@ class Providers {
* @param {String} source_name

Check warning on line 35 in fetcher/lib/providers.js

View workflow job for this annotation

GitHub Actions / build

Expected JSDoc for 'source' but found 'source_name'
* @param {Object} source
*/
async processor(source_name, source) {
async processor(source) {
if(VERBOSE) console.debug('Processing', source.provider);
if (!this[source.provider]) throw new Error(`${source.provider} is not a supported provider`);

await this[source.provider].processor(source_name, source);
// fetch any secrets we may be storing for the provider
if(VERBOSE) console.debug('Fetching secret: ', source.secretKey);
const config = await fetchSecret(source);
// and combine them with the source config for more generic access
if(VERBOSE) console.log('Starting processor', { ...source, ...config });
const log = await this[source.provider].processor({ ...source, ...config });
// source_name is more consistent with our db schema
log.source_name = source.provider;
return(log);
}

/**
* Publish the results of the fetch to our SNS topic
*
* @param {Object} message
* @param {String} subject
*/
async publish(message, subject) {
if(process.env.TOPIC_ARN) {
const cmd = new PublishCommand({
TopicArn: process.env.TOPIC_ARN,
Subject: subject,
Message: JSON.stringify(message),
});
return await sns.send(cmd);
} else {
console.log('No publish topic', subject, message);
return {};
}
}

/**
* Push an array of stations to S3
*
Expand Down Expand Up @@ -67,8 +99,7 @@ class Providers {

// Diff data to minimize costly S3 PUT operations
try {
const resp = await s3.getObject({ Bucket, Key }).promise();
const currentData = (await unzip(resp.Body)).toString('utf-8');
const currentData = await getObject(Bucket, Key);
if (currentData === newData && !process.env.FORCE) {
if (VERBOSE) console.log(`station has not changed - station: ${providerStation}`);
return;
Expand All @@ -78,9 +109,9 @@ class Providers {
prettyPrintStation(newData);
console.log('-----------------> from');
prettyPrintStation(currentData);
} else {
console.log(`Updating the station file: ${providerStation}`);
}
} //else {
// console.log(`Updating the station file: ${providerStation}`);
// }
} catch (err) {
if (err.statusCode !== 404) throw err;
}
Expand All @@ -89,8 +120,7 @@ class Providers {

if (!DRYRUN) {
if (VERBOSE) console.debug(`Saving station to ${Bucket}/${Key}`);

await s3.putObject({
await putObject({
Bucket,
Key,
Body: compressedString,
Expand All @@ -117,6 +147,7 @@ class Providers {
const Key = `${process.env.STACK}/measures/${provider}/${filename}.csv.gz`;
const compressedString = await gzip(measures.csv());


if (DRYRUN) {
console.log(`Would have saved ${measures.length} measurements to '${Bucket}/${Key}'`);
return new Promise((y) => y(true));
Expand Down
81 changes: 69 additions & 12 deletions fetcher/lib/utils.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,88 @@
const zlib = require('zlib');
const { promisify } = require('util');
const request = promisify(require('request'));
const AWS = require('aws-sdk');

const { SecretsManagerClient, GetSecretValueCommand } = require("@aws-sdk/client-secrets-manager");
const { S3Client, GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");

const VERBOSE = !!process.env.VERBOSE;
const DRYRUN = !!process.env.DRYRUN;

const s3 = new S3Client({
maxRetries: 10
});



async function getObject(Bucket, Key) {
const cmd = new GetObjectCommand({
Bucket,
Key,
});
var stream = null;
const resp = await s3.send(cmd);
let currentData = null;
if(resp && resp.ContentEncoding == 'gzip') {
const ba = await resp.Body.transformToByteArray();
currentData = (await unzip(Buffer.from(ba))).toString('utf-8');
} else if(resp && resp.Body) {
currentData = await resp.Body.transformToString();
}
return currentData;
}

async function putObject(text, Bucket, Key, gzip=true, ContentType='application/json') {
let ContentEncoding = null;
if(gzip) {
text = compressedString = await gzip(text);
ContentEncoding = 'gzip';
}
const cmd = new PutObjectCommand({
Bucket,
Key,
Body: text,
ContentType,
ContentEncoding,
});
return await s3.send(cmd);
}

/**
* Retrieve secret from AWS Secrets Manager
* @param {string} source_name The source for which we are fetching a secret.

Check warning on line 52 in fetcher/lib/utils.js

View workflow job for this annotation

GitHub Actions / build

Expected JSDoc for 'source' but found 'source_name'
*
* @returns {object}
*/
async function fetchSecret(source_name) {
const secretsManager = new AWS.SecretsManager({
region: process.env.AWS_DEFAULT_REGION || 'us-east-1'
async function fetchSecret(source) {
const key = source.secretKey;
if(!key) {
return {};
}
const secretsManager = new SecretsManagerClient({
region: process.env.AWS_DEFAULT_REGION || 'us-east-1',
maxAttemps: 1,

});

if (!process.env.STACK) throw new Error('STACK Env Var Required');

const SecretId = `${
process.env.SECRET_STACK || process.env.STACK
}/${source_name}`;
}/${key}`;

if (VERBOSE) console.debug(`Fetching ${SecretId}...`);
if (VERBOSE) console.debug(`Fetching ${SecretId} secret...`);

const { SecretString } = await secretsManager
.getSecretValue({
const cmd = new GetSecretValueCommand({
SecretId
})
.promise();

return JSON.parse(SecretString);
});
const resp = await secretsManager
.send(cmd)
.catch(err => console.error(`Missing ${key} secret`));
if(resp && resp.SecretString) {
return JSON.parse(resp.SecretString);
} else {
return {};
}
}

/**
Expand Down Expand Up @@ -119,6 +170,10 @@ function checkResponseData(data, start_timestamp, end_timestamp) {
return fdata;
}

function publisher(subject, message, topic) {

}

const gzip = promisify(zlib.gzip);
const unzip = promisify(zlib.unzip);

Expand All @@ -130,6 +185,8 @@ module.exports = {
unzip,
VERBOSE,
DRYRUN,
getObject,
putObject,
prettyPrintStation,
checkResponseData
};
Loading

0 comments on commit 0005e31

Please sign in to comment.