Skip to content

Commit

Permalink
Fixed syntax errors
Browse files Browse the repository at this point in the history
  • Loading branch information
caparker committed Mar 6, 2024
1 parent c01a7e9 commit 0382372
Show file tree
Hide file tree
Showing 12 changed files with 476 additions and 150 deletions.
4 changes: 2 additions & 2 deletions fetcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ async function handler(event) {
if (!source) throw new Error(`Unable to find ${source_name} in sources.`);

const log = await providers.processor(source);
await providers.publish(log, 'fetcher/success');
await providers.publish(log, 'fetcher/success');
return log;
} catch (err) {
providers.publish(err, 'fetcher/error');
providers.publish(err, 'fetcher/error');
process.exit(1);
}
}
Expand Down
16 changes: 8 additions & 8 deletions fetcher/lib/measure.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ class Measures {
constructor(type) {
this.headers = [];
this.measures = [];
this.from = null;
this.to = null;
this.from = null;
this.to = null;

if (type === FixedMeasure) {
this.headers = ['sensor_id', 'measure', 'timestamp'];
Expand All @@ -18,12 +18,12 @@ class Measures {
}

push(measure) {
if(!this.to || measure.timestamp > this.to) {
this.to = measure.timestamp;
}
if(!this.from || measure.timestamp < this.from) {
this.from = measure.timestamp;
}
if (!this.to || measure.timestamp > this.to) {
this.to = measure.timestamp;
}
if (!this.from || measure.timestamp < this.from) {
this.from = measure.timestamp;
}
this.measures.push(measure);
}

Expand Down
11 changes: 5 additions & 6 deletions fetcher/lib/meta.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
const { VERBOSE } = require('./utils');
const { S3Client, GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");

const {
getObject,
putObject,
getObject,
putObject
} = require('./utils');

/**
Expand Down Expand Up @@ -35,9 +34,9 @@ class MetaDetails {

save(body) {
return putObject(
JSON.stringify(body),
this.props.Bucket,
this.props.Key,
JSON.stringify(body),
this.props.Bucket,
this.props.Key,
);
}
}
Expand Down
67 changes: 31 additions & 36 deletions fetcher/lib/providers.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
const fs = require('fs');
const path = require('path');
//const AWS = require('aws-sdk');
const { SNSClient, PublishCommand } = require("@aws-sdk/client-sns");
const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns');

const {
VERBOSE,
DRYRUN,
gzip,
unzip,
fetchSecret,
getObject,
putObject,
fetchSecret,
getObject,
putObject,
prettyPrintStation
} = require('./utils');

Expand All @@ -32,23 +30,22 @@ class Providers {
/**
* Given a source config file, choose the corresponding provider script to run
*
* @param {String} source_name
* @param {Object} source
*/
async processor(source) {
if(VERBOSE) console.debug('Processing', source.provider);
if (VERBOSE) console.debug('Processing', source.provider);
if (!this[source.provider]) throw new Error(`${source.provider} is not a supported provider`);
// fetch any secrets we may be storing for the provider
if(VERBOSE) console.debug('Fetching secret: ', source.secretKey);
const config = await fetchSecret(source);
// and combine them with the source config for more generic access
if(VERBOSE) console.log('Starting processor', { ...source, ...config });
// fetch any secrets we may be storing for the provider
if (VERBOSE) console.debug('Fetching secret: ', source.secretKey);
const config = await fetchSecret(source);
// and combine them with the source config for more generic access
if (VERBOSE) console.log('Starting processor', { ...source, ...config });
const log = await this[source.provider].processor({ ...source, ...config });
// source_name is more consistent with our db schema
if(typeof(log) == 'object' && !Array.isArray(log) && !log.source_name) {
log.source_name = source.provider;
}
return(log);
// source_name is more consistent with our db schema
if (typeof(log) == 'object' && !Array.isArray(log) && !log.source_name) {
log.source_name = source.provider;
}
return (log);
}

/**
Expand All @@ -57,20 +54,20 @@ class Providers {
* @param {Object} message
* @param {String} subject
*/
async publish(message, subject) {
console.log('Publishing:', subject, message);
if(process.env.TOPIC_ARN) {
const cmd = new PublishCommand({
TopicArn: process.env.TOPIC_ARN,
Subject: subject,
Message: JSON.stringify(message),
});
return await sns.send(cmd);
} else {
console.log('No publish topic', subject, message);
return {};
}
}
async publish(message, subject) {
console.log('Publishing:', subject, message);
if (process.env.TOPIC_ARN) {
const cmd = new PublishCommand({
TopicArn: process.env.TOPIC_ARN,
Subject: subject,
Message: JSON.stringify(message)
});
return await sns.send(cmd);
} else {
console.log('No publish topic', subject, message);
return {};
}
}

/**
* Push an array of stations to S3
Expand Down Expand Up @@ -112,9 +109,7 @@ class Providers {
prettyPrintStation(newData);
console.log('-----------------> from');
prettyPrintStation(currentData);
} //else {
// console.log(`Updating the station file: ${providerStation}`);
// }
}
} catch (err) {
if (err.statusCode !== 404) throw err;
}
Expand Down Expand Up @@ -157,7 +152,7 @@ class Providers {
}
if (VERBOSE) console.debug(`Saving measurements to ${Bucket}/${Key}`);

return s3.putObject({
return putObject({
Bucket,
Key,
Body: compressedString,
Expand Down
104 changes: 50 additions & 54 deletions fetcher/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ const zlib = require('zlib');
const { promisify } = require('util');
const request = promisify(require('request'));

const { SecretsManagerClient, GetSecretValueCommand } = require("@aws-sdk/client-secrets-manager");
const { S3Client, GetObjectCommand, PutObjectCommand } = require("@aws-sdk/client-s3");
const { SecretsManagerClient, GetSecretValueCommand } = require('@aws-sdk/client-secrets-manager');
const { S3Client, GetObjectCommand, PutObjectCommand } = require('@aws-sdk/client-s3');

const VERBOSE = !!process.env.VERBOSE;
const DRYRUN = !!process.env.DRYRUN;
Expand All @@ -12,56 +12,55 @@ const s3 = new S3Client({
maxRetries: 10
});


const gzip = promisify(zlib.gzip);
const unzip = promisify(zlib.unzip);

async function getObject(Bucket, Key) {
const cmd = new GetObjectCommand({
Bucket,
Key,
});
var stream = null;
const resp = await s3.send(cmd);
let currentData = null;
if(resp && resp.ContentEncoding == 'gzip') {
const ba = await resp.Body.transformToByteArray();
currentData = (await unzip(Buffer.from(ba))).toString('utf-8');
} else if(resp && resp.Body) {
currentData = await resp.Body.transformToString();
}
return currentData;
const cmd = new GetObjectCommand({
Bucket,
Key
});
const resp = await s3.send(cmd);
let currentData = null;
if (resp && resp.ContentEncoding === 'gzip') {
const ba = await resp.Body.transformToByteArray();
currentData = (await unzip(Buffer.from(ba))).toString('utf-8');
} else if (resp && resp.Body) {
currentData = await resp.Body.transformToString();
}
return currentData;
}

async function putObject(text, Bucket, Key, gzip=true, ContentType='application/json') {
let ContentEncoding = null;
if(gzip) {
text = compressedString = await gzip(text);
ContentEncoding = 'gzip';
}
const cmd = new PutObjectCommand({
Bucket,
Key,
async function putObject(text, Bucket, Key, gzip = true, ContentType = 'application/json') {
let ContentEncoding = null;
if (gzip) {
text = await gzip(text);
ContentEncoding = 'gzip';
}
const cmd = new PutObjectCommand({
Bucket,
Key,
Body: text,
ContentType,
ContentEncoding,
});
return await s3.send(cmd);
ContentEncoding
});
return await s3.send(cmd);
}

/**
* Retrieve secret from AWS Secrets Manager
* @param {string} source_name The source for which we are fetching a secret.
* @param {string} source The source object for which we are fetching a secret.
*
* @returns {object}
*/
async function fetchSecret(source) {
const key = source.secretKey;
if(!key) {
return {};
}
const key = source.secretKey;
if (!key) {
return {};
}
const secretsManager = new SecretsManagerClient({
region: process.env.AWS_DEFAULT_REGION || 'us-east-1',
maxAttemps: 1,

maxAttemps: 1
});

if (!process.env.STACK) throw new Error('STACK Env Var Required');
Expand All @@ -72,17 +71,18 @@ async function fetchSecret(source) {

if (VERBOSE) console.debug(`Fetching ${SecretId} secret...`);

const cmd = new GetSecretValueCommand({
SecretId
});
const cmd = new GetSecretValueCommand({
SecretId
});

const resp = await secretsManager
.send(cmd)
.catch(err => console.error(`Missing ${key} secret`));
if(resp && resp.SecretString) {
return JSON.parse(resp.SecretString);
} else {
return {};
}
.send(cmd)
.catch((err) => console.error(`Missing ${key} secret: ${err}`));
if (resp && resp.SecretString) {
return JSON.parse(resp.SecretString);
} else {
return {};
}
}

/**
Expand Down Expand Up @@ -138,6 +138,8 @@ function prettyPrintStation(station) {
* @param {array} data
* @param {timestamp} start_timestamp
* @param {timestamp} end_timestamp
*
* @returns {array}
*/
function checkResponseData(data, start_timestamp, end_timestamp) {
const n = data && data.length;
Expand Down Expand Up @@ -170,12 +172,6 @@ function checkResponseData(data, start_timestamp, end_timestamp) {
return fdata;
}

function publisher(subject, message, topic) {

}

const gzip = promisify(zlib.gzip);
const unzip = promisify(zlib.unzip);

module.exports = {
fetchSecret,
Expand All @@ -185,8 +181,8 @@ module.exports = {
unzip,
VERBOSE,
DRYRUN,
getObject,
putObject,
getObject,
putObject,
prettyPrintStation,
checkResponseData
};
Loading

0 comments on commit 0382372

Please sign in to comment.