From 791aae1a832543db809d15928af3ad6e8143d7c3 Mon Sep 17 00:00:00 2001 From: Christian Parker Date: Wed, 27 Mar 2024 12:33:25 -0700 Subject: [PATCH] Updated airgradient adapter --- .env | 3 +-- cdk/app.ts | 3 ++- cdk/stack.ts | 15 ++++++++++++++- fetcher/lib/providers.js | 23 +++++++++-------------- fetcher/lib/utils.js | 13 ++++++++++--- fetcher/providers/airgradient.js | 25 ++++++++++++++++++------- fetcher/providers/habitatmap.js | 15 +++++++++------ fetcher/providers/purpleair.js | 2 +- fetcher/sources/airgradient.json | 3 ++- fetcher/sources/clarity.json | 15 ++++++++------- fetcher/sources/cmu.json | 13 +++++++------ fetcher/sources/habitatmap.json | 13 +++++++------ fetcher/sources/index.js | 3 ++- fetcher/sources/purpleair.json | 15 ++++++++------- fetcher/sources/senstate.json | 2 +- package.json | 1 + test/source.test.js | 6 +++--- 17 files changed, 103 insertions(+), 67 deletions(-) diff --git a/.env b/.env index b329103..1210a84 100644 --- a/.env +++ b/.env @@ -1,5 +1,4 @@ LCS_API=https://api.openaq.org STACK=lcs-etl-pipeline -SECRET_STACK=lcs-etl-pipeline BUCKET=openaq-fetches -VERBOSE=1 +TOPIC_ARN=arn:aws:sns:us-east-1:470049585876:NewFetchResults diff --git a/cdk/app.ts b/cdk/app.ts index f3945a4..469cb98 100644 --- a/cdk/app.ts +++ b/cdk/app.ts @@ -11,7 +11,8 @@ const stack = new EtlPipeline(app, "lcs-etl-pipeline", { schedulerModuleDir: "scheduler", sources: require('../fetcher/sources'), bucketName: process.env.BUCKET || 'openaq-fetches', - lcsApi: process.env.LCS_API || 'https://api.openaq.org' + lcsApi: process.env.LCS_API || 'https://api.openaq.org', + topicArn: process.env.TOPIC_ARN }); diff --git a/cdk/stack.ts b/cdk/stack.ts index cfe6926..f5880d8 100644 --- a/cdk/stack.ts +++ b/cdk/stack.ts @@ -20,6 +20,7 @@ export class EtlPipeline extends cdk.Stack { sources, lcsApi, bucketName, + topicArn, ...props }: StackProps ) { @@ -36,6 +37,7 @@ export class EtlPipeline extends cdk.Stack { queue, bucket, lcsApi, + topicArn, }); this.buildSchedulerLambdas({ moduleDir: schedulerModuleDir, @@ -49,6 +51,7 @@ export class EtlPipeline extends cdk.Stack { queue: sqs.Queue; bucket: s3.IBucket; lcsApi: string; + topicArn: string; }): lambda.Function { this.prepareNodeModules(props.moduleDir); const handler = new lambda.Function(this, 'Fetcher', { @@ -61,8 +64,8 @@ export class EtlPipeline extends cdk.Stack { environment: { BUCKET: props.bucket.bucketName, STACK: cdk.Stack.of(this).stackName, - VERBOSE: '1', LCS_API: props.lcsApi, + TOPIC_ARN: props.topicArn, }, }); handler.addEventSource( @@ -72,6 +75,14 @@ export class EtlPipeline extends cdk.Stack { ); props.queue.grantConsumeMessages(handler); props.bucket.grantReadWrite(handler); + handler.addToRolePolicy( + new iam.PolicyStatement({ + effect: iam.Effect.ALLOW, + actions: ['sns:Publish'], + resources: [props.topicArn], + }) + ); + handler.addToRolePolicy( new iam.PolicyStatement({ effect: iam.Effect.ALLOW, @@ -86,6 +97,7 @@ export class EtlPipeline extends cdk.Stack { ], }) ); + return handler; } @@ -147,6 +159,7 @@ interface StackProps extends cdk.StackProps { fetcherModuleDir: string; schedulerModuleDir: string; lcsApi: string; + topicArn: string; bucketName: string; sources: Source[]; } diff --git a/fetcher/lib/providers.js b/fetcher/lib/providers.js index 8984352..ca330d4 100644 --- a/fetcher/lib/providers.js +++ b/fetcher/lib/providers.js @@ -56,7 +56,7 @@ class Providers { */ async publish(message, subject) { console.log('Publishing:', subject, message); - if (process.env.TOPIC_ARN) { + if (process.env.TOPIC_ARN && message) { const cmd = new PublishCommand({ TopicArn: process.env.TOPIC_ARN, Subject: subject, @@ -111,20 +111,21 @@ class Providers { prettyPrintStation(currentData); } } catch (err) { - if (err.statusCode !== 404) throw err; + if (err.Code !== 'NoSuchKey') throw err; } const compressedString = await gzip(newData); if (!DRYRUN) { if (VERBOSE) console.debug(`Saving station to ${Bucket}/${Key}`); - await putObject({ + await putObject( + compressedString, Bucket, Key, - Body: compressedString, - ContentType: 'application/json', - ContentEncoding: 'gzip' - }).promise(); + false, + 'application/json', + 'gzip' + ); } if (VERBOSE) console.log(`finished station: ${providerStation}\n------------------------`); } @@ -152,13 +153,7 @@ class Providers { } if (VERBOSE) console.debug(`Saving measurements to ${Bucket}/${Key}`); - return putObject({ - Bucket, - Key, - Body: compressedString, - ContentType: 'text/csv', - ContentEncoding: 'gzip' - }).promise(); + return await putObject(compressedString, Bucket, Key, false, 'text/csv', 'gzip'); } } diff --git a/fetcher/lib/utils.js b/fetcher/lib/utils.js index 169aa9c..c0c8c26 100644 --- a/fetcher/lib/utils.js +++ b/fetcher/lib/utils.js @@ -4,12 +4,20 @@ const request = promisify(require('request')); const { SecretsManagerClient, GetSecretValueCommand } = require('@aws-sdk/client-secrets-manager'); const { S3Client, GetObjectCommand, PutObjectCommand } = require('@aws-sdk/client-s3'); +const { NodeHttpHandler } = require('@smithy/node-http-handler'); +const https = require('https'); const VERBOSE = !!process.env.VERBOSE; const DRYRUN = !!process.env.DRYRUN; const s3 = new S3Client({ - maxRetries: 10 + maxRetries: 10, + // requestHandler: new NodeHttpHandler({ + // httpsAgent: new https.Agent({ + // maxSockets: 1000 + // }), + // socketAcquisitionWarningTimeout: 6000, + // }) }); const gzip = promisify(zlib.gzip); @@ -31,8 +39,7 @@ async function getObject(Bucket, Key) { return currentData; } -async function putObject(text, Bucket, Key, gzip = true, ContentType = 'application/json') { - let ContentEncoding = null; +async function putObject(text, Bucket, Key, gzip = true, ContentType = 'application/json', ContentEncoding = null) { if (gzip) { text = await gzip(text); ContentEncoding = 'gzip'; diff --git a/fetcher/providers/airgradient.js b/fetcher/providers/airgradient.js index 4b1c8e5..c2b249d 100644 --- a/fetcher/providers/airgradient.js +++ b/fetcher/providers/airgradient.js @@ -15,7 +15,7 @@ const lookup = { 'pm01': ['pm1', 'µg/m³'], 'pm02': ['pm25', 'µg/m³'], 'pm10': ['pm10', 'µg/m³'], - 'pm003count': ['um003', 'particles/cm³'], + 'pm003Count': ['um003', 'particles/cm³'], 'rhum': ['relativehumidity', '%'], 'atmp': ['temperature', 'c'] }; @@ -47,15 +47,26 @@ function getLatestReading(sensorData) { // and the data we are looking for is not always ready when we first check // so we are going back 3 hrs in order to cover missing data // if we still see gaps we can increase the lag time - const d = new Date(); - d.setHours(d.getHours() - 3); - d.setMinutes(0); - d.setSeconds(0); - d.setMilliseconds(0); + const offset = 3; + const from = new Date(); + const to = new Date(); + from.setHours(from.getHours() - offset); + from.setMinutes(0); + from.setSeconds(0); + from.setMilliseconds(0); + + // the current hour is always wrong because its a rolling average + to.setHours(to.getHours() - 1); + to.setMinutes(0); + to.setSeconds(0); + to.setMilliseconds(0); const params = Object.keys(lookup); const measurements = sensorData - .filter((o) => new Date(o.date).getTime() >= d.getTime()) + .filter((o) => { + const now = new Date(o.date).getTime(); + return now >= from.getTime() && now <= to.getTime(); + }) .map((o) => { const timestamp = new Date(o.date); // convert to hour ending to match our system diff --git a/fetcher/providers/habitatmap.js b/fetcher/providers/habitatmap.js index 9b7df63..3ca689f 100644 --- a/fetcher/providers/habitatmap.js +++ b/fetcher/providers/habitatmap.js @@ -14,8 +14,9 @@ const lookup = { async function processor(source) { const measurands = await Measurand.getSupportedMeasurands(lookup); - await process_fixed_locations(source, measurands); - await process_mobile_locations(source, measurands); + const fixed = await process_fixed_locations(source, measurands); + const mobile = await process_mobile_locations(source, measurands); + return fixed; } async function process_fixed_locations(source, measurands) { @@ -64,10 +65,11 @@ async function process_fixed_locations(source, measurands) { } await Promise.all(stations); - console.log(`ok - all ${stations.length} fixed stations pushed`); + //console.log(`ok - all ${stations.length} fixed stations pushed`); await Providers.put_measures(source.provider, measures); - console.log(`ok - all ${measures.length} fixed measures pushed`); + //console.log(`ok - all ${measures.length} fixed measures pushed`); + return { locations: stations.length, measures: measures.length, from: measures.from, to: measures.to }; } async function process_mobile_locations(source, measurands) { @@ -119,10 +121,11 @@ async function process_mobile_locations(source, measurands) { } await Promise.all(stations); - console.log(`ok - all ${stations.length} mobile stations pushed`); + //console.log(`ok - all ${stations.length} mobile stations pushed`); await Providers.put_measures(source.provider, measures); - console.log(`ok - all ${measures.length} mobile measures pushed`); + //console.log(`ok - all ${measures.length} mobile measures pushed`); + return { locations: stations.length, measures: measures.length, from: measures.from, to: measures.to }; } async function fixed_locations(source) { diff --git a/fetcher/providers/purpleair.js b/fetcher/providers/purpleair.js index 3d1b7dd..11e321d 100644 --- a/fetcher/providers/purpleair.js +++ b/fetcher/providers/purpleair.js @@ -119,7 +119,7 @@ async function fetchSensorData(source) { // if we are looking for a specific sourceid lets not limit if (!process.env.SOURCEID) { // Filter results to only include sensors modified or updated within the last number of seconds. - url.searchParams.append('max_age', 75); + url.searchParams.append('max_age', 100); // Filter results to only include outdoor sensors. url.searchParams.append('location_type', 0); } diff --git a/fetcher/sources/airgradient.json b/fetcher/sources/airgradient.json index 4d275ba..ab69a18 100644 --- a/fetcher/sources/airgradient.json +++ b/fetcher/sources/airgradient.json @@ -2,7 +2,8 @@ "schema": "v1", "provider": "airgradient", "frequency": "hour", - "secretKey": "airgradient", + "secretKey": "airgradient", + "active": true, "meta": { "url": "https://api.airgradient.com" } diff --git a/fetcher/sources/clarity.json b/fetcher/sources/clarity.json index 3fedbef..584ad1a 100644 --- a/fetcher/sources/clarity.json +++ b/fetcher/sources/clarity.json @@ -1,9 +1,10 @@ { - "schema": "v1", - "provider": "clarity", - "frequency": "hour", - "secretKey": "clarity-keys", - "meta": { - "url": "https://clarity-data-api.clarity.io" - } + "schema": "v1", + "provider": "clarity", + "frequency": "hour", + "secretKey": "clarity-keys", + "active": true, + "meta": { + "url": "https://clarity-data-api.clarity.io" + } } diff --git a/fetcher/sources/cmu.json b/fetcher/sources/cmu.json index b2b973b..b8d18a6 100644 --- a/fetcher/sources/cmu.json +++ b/fetcher/sources/cmu.json @@ -1,8 +1,9 @@ { - "schema": "v1", - "provider": "cmu", - "frequency": "hour", - "meta": { - "folderId": "1Mp_a-OyGGlk5tGkezYK41iZ2qybnrPzp" - } + "schema": "v1", + "provider": "cmu", + "frequency": "hour", + "active": false, + "meta": { + "folderId": "1Mp_a-OyGGlk5tGkezYK41iZ2qybnrPzp" + } } diff --git a/fetcher/sources/habitatmap.json b/fetcher/sources/habitatmap.json index b379b76..85da341 100644 --- a/fetcher/sources/habitatmap.json +++ b/fetcher/sources/habitatmap.json @@ -1,8 +1,9 @@ { - "schema": "v1", - "provider": "habitatmap", - "frequency": "minute", - "meta": { - "url": "http://aircasting.habitatmap.org" - } + "schema": "v1", + "provider": "habitatmap", + "frequency": "minute", + "active": true, + "meta": { + "url": "http://aircasting.habitatmap.org" + } } diff --git a/fetcher/sources/index.js b/fetcher/sources/index.js index 0d914c2..9d57127 100644 --- a/fetcher/sources/index.js +++ b/fetcher/sources/index.js @@ -16,4 +16,5 @@ const fs = require('fs'); /** @type {Source[]} */ module.exports = fs.readdirSync(__dirname) .filter((f) => f.endsWith('.json')) - .map((f) => require(`./${f}`)); + .map((f) => require(`./${f}`)) + .filter((f) => f.active); diff --git a/fetcher/sources/purpleair.json b/fetcher/sources/purpleair.json index 8106922..dda6c2a 100644 --- a/fetcher/sources/purpleair.json +++ b/fetcher/sources/purpleair.json @@ -1,9 +1,10 @@ { - "schema": "v1", - "provider": "purpleair", - "frequency": "minute", - "secretKey": "purpleair", - "meta": { - "url": "https://api.purpleair.com/" - } + "schema": "v1", + "provider": "purpleair", + "frequency": "minute", + "secretKey": "purpleair", + "active": false, + "meta": { + "url": "https://api.purpleair.com/" + } } diff --git a/fetcher/sources/senstate.json b/fetcher/sources/senstate.json index 68a710a..7670e91 100644 --- a/fetcher/sources/senstate.json +++ b/fetcher/sources/senstate.json @@ -2,8 +2,8 @@ "schema": "v1", "provider": "senstate", "frequency": "minute", + "active": true, "meta": { "url": "https://open-data.senstate.cloud" } } - \ No newline at end of file diff --git a/package.json b/package.json index 6994be1..f5b552b 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@aws-sdk/client-secrets-manager": "^3.523.0", "@aws-sdk/client-sns": "^3.521.0", "@aws-sdk/client-sqs": "^3.525.0", + "@smithy/node-http-handler": "^2.4.1", "csv-parser": "^3.0.0", "csv-writer": "^1.6.0", "dayjs": "^1.11.10", diff --git a/test/source.test.js b/test/source.test.js index edfc9b7..2c0cd01 100644 --- a/test/source.test.js +++ b/test/source.test.js @@ -8,8 +8,8 @@ const ajv = new Ajv({ }); ajv.addMetaSchema( - require('ajv/lib/refs/json-schema-draft-04.json'), - 'http://json-schema.org/draft-04/schema#' + require('ajv/lib/refs/json-schema-draft-07.json'), + 'http://json-schema.org/draft-07/schema#' ); const validate = ajv.compile(schema); @@ -21,7 +21,7 @@ tape('validate', (t) => { // find all the sources, has to be synchronous for tape sources.forEach((source) => { - tape(`tests for ${source}`, (t) => { + tape(`tests for ${source}`, (t) => { try { const valid = validate(source);