From 61ed5e4abb92a056b01951a04bf8b59c1f590c99 Mon Sep 17 00:00:00 2001
From: Christian Parker
Date: Thu, 22 Feb 2024 05:20:48 -0800
Subject: [PATCH] CAC specific updates

---
 fetcher/index.js                |   3 +-
 fetcher/lib/measurand.js        | 144 +++++++++++--------
 fetcher/lib/utils.js            |  35 +++--
 fetcher/providers/versioning.js | 246 ++++++++++++++++----------------
 fetcher/sources/cac.json        |  37 ++---
 5 files changed, 255 insertions(+), 210 deletions(-)

diff --git a/fetcher/index.js b/fetcher/index.js
index bddab87..9b2e812 100644
--- a/fetcher/index.js
+++ b/fetcher/index.js
@@ -5,7 +5,7 @@ const providers = new (require('./lib/providers'))();
 const sources = require('./sources');
 
 if (require.main === module) {
-    handler();
+    handler();
 }
 
 async function handler(event) {
@@ -31,6 +31,7 @@ async function handler(event) {
         console.log(
             `Processing ${process.env.STACK}: ${source.type}/${source.provider}/${source.name}`
         );
+
         await providers.processor(source);
 
         return {};

diff --git a/fetcher/lib/measurand.js b/fetcher/lib/measurand.js
index 81ce610..48cb0f4 100644
--- a/fetcher/lib/measurand.js
+++ b/fetcher/lib/measurand.js
@@ -1,13 +1,14 @@
 const { request, VERBOSE } = require('./utils');
 
 class Measurand {
-    constructor({ input_param, parameter, unit }) {
+    constructor({ input_param, parameter, unit, provider_unit }) {
         // How a measurand is described by external source (e.g. "CO")
         this.input_param = input_param;
         // How a measurand is described internally (e.g. "co")
         this.parameter = parameter;
        // Unit for measurand (e.g "ppb")
         this.unit = unit;
+        this.provider_unit = provider_unit;
     }
 
     /**
      * @returns { Object } normalizer
      */
     get _normalizer() {
-        return (
-            {
-                ppb: ['ppm', (val) => val / 1000],
-                'ng/m³': ['µg/m³', (val) => val / 1000],
-                pp100ml: ['particles/cm³', (val) => val / 100]
-            }[this.unit] || [this.unit, (val) => val]
-        );
+        // provider_unit -> { normalized unit: conversion function }
+        return ({
+            ppb: {
+                ppm: (val) => val / 1000
+            },
+            ppm: {
+                ppb: (val) => val * 1000
+            },
+            f: {
+                c: (val) => (val - 32) * 5 / 9
+            },
+            'ng/m3': {
+                'ug/m3': (val) => val / 1000
+            },
+            pp100ml: {
+                'particles/cm3': (val) => val / 100
+            }
+        }[this.provider_unit]?.[this.unit]) ?? ((val) => val);
     }
 
     get normalized_unit() {
-        return this._normalizer[0];
+        return this.unit;
     }
 
     get normalize_value() {
-        return this._normalizer[1];
+        return this._normalizer;
     }
 
     /**
      * Given a map of lookups from an input parameter (i.e. how a data provider
      * identifies a measurand) to a tuple of a measurand parameter (i.e. how we
      * identify a measurand internally) and a measurand unit, generate an array
      * Measurand objects that are supported by the OpenAQ API.
-     *
+     * form -> { input_parameter: [ measurand_parameter, input_units ] }
+     *
      * @param {*} lookups, e.g. {'CO': ['co', 'ppb'] }
      * @returns { Measurand[] }
      */
     static async getSupportedMeasurands(lookups) {
-        // Fetch from API
-        const supportedMeasurandParameters = [];
-        const baseurl = new URL('/v2/parameters', process.env.API_URL || 'https://api.openaq.org');
-        if (VERBOSE) console.debug(`Getting Supported Measurands - ${baseurl}`);
-        let morePages;
-        let page = 1;
-        do {
-            const url = new URL(
-                '/v2/parameters',
-                baseurl,
-            );
-            url.searchParams.append('page', page++);
-            const {
-                body: { meta, results }
-            } = await request({
-                json: true,
-                method: 'GET',
-                url
-            });
-
-            if(!results) throw new Error(`Could not connect to ${baseurl}`);
-
-            for (const { name } of results) {
-                supportedMeasurandParameters.push(name);
-            }
-            morePages = meta.found > meta.page * meta.limit;
-        } while (morePages);
-        if (VERBOSE)
-            console.debug(
-                `Fetched ${supportedMeasurandParameters.length} supported measurement parameters from ${baseurl}.`
-            );
+        // we are supporting everything in the fetcher
+        const supportedMeasurandParameters = {
+            ambient_temp: 'c',
+            bc: 'ug/m3',
+            bc_375: 'ug/m3',
+            bc_470: 'ug/m3',
+            bc_528: 'ug/m3',
+            bc_625: 'ug/m3',
+            bc_880: 'ug/m3',
+            ch4: 'ppm',
+            cl: 'ppb',
+            co2: 'ppm',
+            co: 'ppm',
+            ec: 'ppb',
+            humidity: '%',
+            no2: 'ppm',
+            no3: 'ppb',
+            no: 'ppm',
+            nox: 'ppm',
+            o3: 'ppm',
+            oc: 'ppb',
+            ozone: 'ppb',
+            pm100: 'ug/m3',
+            pm10: 'ug/m3',
+            pm1: 'ug/m3',
+            pm25: 'ug/m3',
+            pm4: 'ug/m3',
+            pm: 'ug/m3',
+            pressure: 'hpa',
+            relativehumidity: '%',
+            so2: 'ppm',
+            so4: 'ppb',
+            temperature: 'c',
+            ufp: 'particles/cm3',
+            um003: 'particles/cm3',
+            um005: 'particles/cm3',
+            um010: 'particles/cm3',
+            um025: 'particles/cm3',
+            um050: 'particles/cm3',
+            um100: 'particles/cm3',
+            v: 'ppb',
+            voc: 'iaq',
+            wind_direction: 'deg',
+            wind_speed: 'm/s',
+        };
 
-        // Filter provided lookups
-        const supportedLookups = Object.entries(lookups).filter(
-            // eslint-disable-next-line no-unused-vars
-            ([input_param, [measurand_parameter, measurand_unit]]) =>
-                supportedMeasurandParameters.includes(measurand_parameter)
-        );
-
-        if (!supportedLookups.length) throw new Error('No measurands supported.');
-        if (VERBOSE) {
-            Object.values(lookups)
-                .map(([measurand_parameter]) => measurand_parameter)
-                .filter((measurand_parameter) => !supportedMeasurandParameters.includes(measurand_parameter))
-                .map((measurand_parameter) => console.warn(`ignoring unsupported parameter: ${measurand_parameter}`));
-        }
-
-        return supportedLookups.map(
-            ([input_param, [parameter, unit]]) =>
-                new Measurand({ input_param, parameter, unit })
-        );
+        const supported = Object.entries(lookups)
+            .map(([input_param, [parameter, provider_unit]]) => {
+                return new Measurand({
+                    input_param,
+                    parameter,
+                    unit: supportedMeasurandParameters[parameter],
+                    provider_unit
+                });
+            })
+            .filter((m) => m.unit);
+        if (VERBOSE > 1) console.log('Supported measurands', supported);
+        return supported;
     }
 
     /**

diff --git a/fetcher/lib/utils.js b/fetcher/lib/utils.js
index 4f5def7..f8a326d 100644
--- a/fetcher/lib/utils.js
+++ b/fetcher/lib/utils.js
@@ -32,7 +32,7 @@ const applyCredentials = credentials => {
     if(!storage && credentials) {
         if (!credentials.client_email) throw new Error('client_email required');
         if (!credentials.client_id) throw new Error('client_id required');
-        console.debug(`Initializing storage object '${credentials.project_id}' as ${credentials.client_email}`);
+        if (VERBOSE) console.debug(`Initializing storage object '${credentials.project_id}' as ${credentials.client_email}`);
         let projectId = credentials.project_id;
         // https://github.com/googleapis/google-cloud-node/blob/main/docs/authentication.md
         storage = new Storage({
@@ -105,7 +105,7 @@ const putObject = async (data, Key) => {
         data = await gzip(data);
     }
 
-    if(!DRYRUN) {
+    //if(!DRYRUN) {
         if (VERBOSE) console.debug(`Saving data to ${Key}`);
         await s3.putObject({
             Bucket,
@@ -116,10 +116,10 @@
         }).promise().catch( err => {
             console.log('error putting object', err);
         });
-    } else {
-        if (VERBOSE) console.debug(`Would have saved data to ${Key}`);
-        await writeJson(data, Key);
-    }
+    //} else {
+    //    if (VERBOSE) console.debug(`Would have saved data to ${Key}`);
+    //    await writeJson(data, Key);
+    //}
 };
 
 const writeJson = async (data, filepath) => {
@@ -233,11 +233,16 @@ const listFilesLocal = async (config) => {
 
 const fetchFile = async file => {
     const source = file.source;
+    let data;
     if(source == 'google-bucket') {
-        return await fetchFileGoogleBucket(file);
+        data = await fetchFileGoogleBucket(file);
     } else {
-        return await fetchFileLocal(file);
+        data = await fetchFileLocal(file);
     }
+    if(DRYRUN) {
+        await writeJson(data, `raw/${file.path}`);
+    }
+    return data;
 };
 
 const fetchFileLocal = async file => {
@@ -318,7 +323,8 @@ const writeError = async (file) => {
     file.path = `${dir}/errors/${parsed.name}_error.txt`;
 
     if (DRYRUN) {
-        return file;
+        return await writeErrorLocal(file);
+        //return file;
     } else if(source == 'google-bucket') {
         return await writeErrorGoogleBucket(file);
     } else {
@@ -335,8 +341,15 @@ const writeErrorGoogleBucket = async (file) => {
 };
 
 const writeErrorLocal = async (file) => {
-    if (VERBOSE) console.debug(`Writing local error to ${file.path}`);
-    fs.writeFileSync(file.path, file.error);
+    const dir = process.env.LOCAL_DESTINATION_BUCKET || __dirname;
+    const fullpath = path.join(dir, file.path);
+    if (VERBOSE) console.debug(`Writing local error to ${fullpath}`, file);
+    fs.promises.mkdir(path.dirname(fullpath), { recursive: true })
+        .then( res => {
+            fs.writeFileSync(fullpath, file.error);
+        }).catch( err => {
+            console.warn(err);
+        });
     return file;
 };

diff --git a/fetcher/providers/versioning.js b/fetcher/providers/versioning.js
index 7cec87e..a9d4c0b 100644
--- a/fetcher/providers/versioning.js
+++ b/fetcher/providers/versioning.js
@@ -51,7 +51,7 @@ var versions = {}; // for tracking versions
 var stations = {}; // for keeping track of new stations
 var sensors = {}; // for tracking new sensors
 var measures_list = [];
-
+const sep = '-'; // how to separate the parts of the names
 
 /**
  * Build sensor ID from a given sensor node ID and measurand parameter.
  *
  * @returns {string}
  */
 function getRootSensorId(sourceId, sensorNodeId, measurandParameter) {
-    return `${sourceId}-${sensorNodeId}-${measurandParameter}`;
+    return `${sourceId}${sep}${sensorNodeId}${sep}${measurandParameter}`;
 }
 
 /**
  * Build sensor ID from a given sensor node ID, measurand parameter, life cycle and version date.
  *
  * @returns {string}
  */
 function getSensorId(sourceId, sensorNodeId, measurandParameter, lifeCycle, versionDate) {
     const lifeCycleId = !!lifeCycle
-        ? `-${lifeCycle}`
+        ? `${sep}${lifeCycle}`
         : ''; // if no lifecyle value is provided we should assume this is raw data
+    // dashes in the date will cause issues later
     const versionId = versionDate && !!lifeCycle
-        ? `-${versionDate}`
+        ? `${sep}${versionDate.replace(/-/g, '')}`
         : '';
 
-    return `${sourceId}-${sensorNodeId}-${measurandParameter}${lifeCycleId}${versionId}`;
+    return `${sourceId}${sep}${sensorNodeId}${sep}${measurandParameter}${lifeCycleId}${versionId}`;
 }
 
 /**
@@ -121,7 +122,7 @@ async function processor(source) {
 
     if(credentials) {
         // add the credentials to the source config
-        if (VERBOSE) console.debug("Secret credentials", credentials);
+        if (VERBOSE>1) console.debug("Secret credentials", credentials);
         source.config.credentials = credentials;
     }
 
@@ -139,10 +140,10 @@ async function processor(source) {
             return res;
         })
         .catch( err => {
-            console.error(`File processing error: ${err.message}`);
+            console.error(`File processing error: ${file.path} - ${err.message}`);
             writeError({
                 ...file,
-                error: `**FILE ERROR**\nFILE: ${file.path}\nDETAILS: ${err.message}`,
+                error: `**FILE ERROR**\nFILE: ${file.path}\nDETAILS: ${err.message}\n${JSON.stringify(data)}`,
             });
             moveFile(file, 'errors');
         });
 
 /**
  * @returns {??}
  */
 async function processFile({ file, data, measurands }) {
-    if (VERBOSE) console.debug('Processing file', file.path);
-    const measures = new Measures(FixedMeasure, file);
-    const sourceId = 'versioning';
-
-    const undefines = [undefined, null, 'NaN', 'NA'];
-
-    // we are supporting a few different file structures at this point
-    // all of them are csv and so they should be arrays when we reach this point
-    // and so the best method for distinguishing what to do here is going to be
-    // based on the fields that are available in the array/row
-    for(const row of data) {
-        // every row we encounter has to include the station/location info
-        if(!row.location) throw new Error('No location field found');
-        if (VERBOSE) console.log(`Processing location: ${row.location}`, row);
-        let sensorNodeId = row.location;
-        let station = stations[sensorNodeId];
-
-        // Compile the station information and check if it exists
-        // If not this will add the sensor node to the stations directory
-        // if it does exist then it will compare the json strings and possibly update.
-        // Either a new file or an update will trigger an injest of the station data
-        if(!station) {
-            station = new SensorNode({
-                sensor_node_id: sensorNodeId,
-                sensor_node_site_name: sensorNodeId,
-                sensor_node_source_name: sourceId,
-                sensor_node_geometry: !undefines.includes(row.lat) ? [row.lng, row.lat] : null,
-                ...row,
-            });
-            stations[sensorNodeId] = station;
-        }
-
-        // Loop through the expected measurands and check
-        // to see if we have a column that matches, this would be for a measurement
-        // use this method for the versions and sensor files as well because
-        // its as good a method as any to match the parameter to the measurand
-        for (const measurand of measurands) {
-            // if we have a parameter column we assume this is either
-            // a versioning file or a sensor meta data file
-            // in either case each row will be a new sensor id
-            if(row.parameter && measurand.input_param!==row.parameter) {
-                //console.log('parameter file, skiping', parameter, measurand.input_param);
-                continue;
-            }
-
-            const measure = row[measurand.input_param];
-
-            // build the sensor id
-            const sensorId = getSensorId(
-                sourceId,
-                sensorNodeId,
-                measurand.parameter,
-                row.lifecycle,
-                row.version_date,
-            );
-
-            let sensor = sensors[sensorId];
-
-            if(!sensor) {
-                sensor = new Sensor({
-                    sensor_id: sensorId,
-                    measurand_parameter: measurand.parameter,
-                    measurand_unit: measurand.normalized_unit,
-                    ...row,
-                });
-                station.addSensor(sensor);
-                sensors[sensorId] = sensor;
-            }
-
-            // we should check for a version now as we could have a version without a measure
-            // Compile the version information and check if it exists
-            // if not the version will be added to the versions directory
-            // and trigger an import
-
-            if(row.lifecycle && (row.version_date || row.version)) {
-                if(!versions[sensorId]) {
-                    versions[sensorId] = new Version({
-                        parent_sensor_id: getSensorId(
-                            sourceId,
-                            sensorNodeId,
-                            measurand.parameter
-                        ),
-                        sensor_id: sensorId,
-                        version_id: row.version_date || row.version,
-                        life_cycle_id: row.lifecycle,
-                        parameter: measurand.parameter,
-                        filename: file.name,
-                        readme: row.readme,
-                        provider: sourceId,
-                    });
-                }
-            }
-            // Now we can check for a measure and potentially skip
-            if (undefines.includes(measure)) continue;
-            // make sure that we have this sensor
-
-            // add the measurement to the measures
-            measures.push({
-                sensor_id: sensorId,
-                measure: measurand.normalize_value(measure),
-                timestamp: row.datetime,
-            });
-        }
-    }
-
-    // And then we can add any measurements created
-    if(measures.length) {
-        measures_list.push(measures);
-    }
-
-    // what should we return to the processor??
-    return true;
+    if (VERBOSE) console.debug('Processing file', file.path);
+    const measures = new Measures(FixedMeasure, file);
+    const sourceId = 'versioning';
+
+    const undefines = [undefined, null, 'NaN', 'NA', '', '99999900'];
+
+    // we are supporting a few different file structures at this point
+    // all of them are csv and so they should be arrays when we reach this point
+    // and so the best method for distinguishing what to do here is going to be
+    // based on the fields that are available in the array/row
+    for(const row of data) {
+        // every row we encounter has to include the station/location info
+        if(!row.location) throw new Error('No location field found');
+        if (VERBOSE>1) console.log(`Processing location: ${row.location}`, row);
+
+        try {
+            let sensorNodeId = row.location;
+            let station = stations[sensorNodeId];
+
+            // Compile the station information and check if it exists
+            // If not this will add the sensor node to the stations directory
+            // if it does exist then it will compare the json strings and possibly update.
+            // Either a new file or an update will trigger an ingest of the station data
+            if(!station) {
+                station = new SensorNode({
+                    sensor_node_id: sensorNodeId,
+                    sensor_node_site_name: sensorNodeId,
+                    sensor_node_source_name: sourceId,
+                    sensor_node_geometry: !undefines.includes(row.lat) ? [row.lng, row.lat] : null,
+                    ...row,
+                });
+                stations[sensorNodeId] = station;
+            }
+
+            // Loop through the expected measurands and check
+            // to see if we have a column that matches, this would be for a measurement
+            // use this method for the versions and sensor files as well because
+            // its as good a method as any to match the parameter to the measurand
+            for (const measurand of measurands) {
+                // if we have a parameter column we assume this is either
+                // a versioning file or a sensor meta data file
+                // in either case each row will be a new sensor id
+                if(row.parameter && measurand.input_param!==row.parameter) {
+                    if (VERBOSE) console.log('parameter file, skipping', row.parameter, measurand.input_param);
+                    continue;
+                }
+
+                const measure = row[measurand.input_param];
+
+                // build the sensor id
+                const sensorId = getSensorId(
+                    sourceId,
+                    sensorNodeId,
+                    measurand.parameter,
+                    row.lifecycle,
+                    row.version_date,
+                );
+
+                let sensor = sensors[sensorId];
+
+                if(!sensor) {
+                    sensor = new Sensor({
+                        sensor_id: sensorId,
+                        measurand_parameter: measurand.parameter,
+                        measurand_unit: measurand.normalized_unit,
+                        ...row,
+                    });
+                    station.addSensor(sensor);
+                    sensors[sensorId] = sensor;
+                }
+
+                // we should check for a version now as we could have a version without a measure
+                // Compile the version information and check if it exists
+                // if not the version will be added to the versions directory
+                // and trigger an import
+
+                if(row.lifecycle && (row.version_date || row.version)) {
+                    if(!versions[sensorId]) {
+                        versions[sensorId] = new Version({
+                            parent_sensor_id: getSensorId(
+                                sourceId,
+                                sensorNodeId,
+                                measurand.parameter
+                            ),
+                            sensor_id: sensorId,
+                            version_id: row.version_date || row.version,
+                            life_cycle_id: row.lifecycle,
+                            parameter: measurand.parameter,
+                            filename: file.name,
+                            readme: row.readme,
+                            provider: sourceId,
+                        });
+                    }
+                }
+
+                // Now we can check for a measure and potentially skip
+                if (undefines.includes(measure)) continue;
+                // make sure that we have this sensor
+                // add the measurement to the measures
+                measures.push({
+                    sensor_id: sensorId,
+                    measure: measurand.normalize_value(measure),
+                    timestamp: row.datetime,
+                });
+            }
+        } catch (err) {
+            if (VERBOSE) console.error('Row processing error:', err);
+        }
+    }
+
+    // And then we can add any measurements created
+    if(measures.length) {
+        measures_list.push(measures);
+    }
+
+    // what should we return to the processor??
+    return true;
 }

diff --git a/fetcher/sources/cac.json b/fetcher/sources/cac.json
index da9331e..5244a8a 100644
--- a/fetcher/sources/cac.json
+++ b/fetcher/sources/cac.json
@@ -5,34 +5,37 @@
     "frequency": "minute",
     "type": "google-bucket",
     "config" : {
-        "bucket":"openaq-staging",
+        "bucket":"openaq-ingest",
         "folder":"pending"
     },
     "parameters": {
+        "as": ["as","ppb"],
+        "bc_375": ["bc_375","ng/m3"],
+        "bc_470": ["bc_470","ng/m3"],
+        "bc_528": ["bc_528","ng/m3"],
+        "bc_625": ["bc_625","ng/m3"],
+        "bc_880": ["bc_880","ng/m3"],
+        "cd": ["cd","ppb"],
+        "cl": ["cl","ppb"],
         "co": ["co", "ppb"],
-        "no": ["no", "ppb"],
+        "ec": ["ec","ppb"],
+        "fe": ["fe","ppb"],
+        "k": ["k","ppb"],
+        "ni": ["ni","ppb"],
         "no2": ["no2", "ppb"],
+        "no3": ["no3","ppb"],
         "o3": ["o3", "ppb"],
+        "oc": ["oc","ppb"],
         "p": ["pressure", "hpa"],
-        "pm25": ["pm25", "ppb"],
+        "pb": ["pb","ppb"],
         "pm10": ["pm10", "ppb"],
+        "pm025": ["pm25", "ppb"],
         "rh": ["relativehumidity", "%"],
         "so2": ["so2", "ppb"],
+        "so4": ["so4","ppb"],
         "temp": ["temperature", "c"],
-        "ws": ["wind_speed", "m/s"],
+        "v": ["v","ppb"],
         "wd": ["wind_direction", "deg"],
-        "bc": ["bc","ppb"],
-        "ec": ["ec","ppb"],
-        "oc": ["oc","ppb"],
-        "so4": ["so4","ppb"],
-        "cl": ["cl","ppb"],
-        "k": ["k","ppb"],
-        "no3": ["no3","ppb"],
-        "pb": ["pb","ppb"],
-        "as": ["as","ppb"],
-        "ca": ["ca","ppb"],
-        "fe": ["fe","ppb"],
-        "ni": ["ni","ppb"],
-        "v": ["v","ppb"]
+        "ws": ["wind_speed", "m/s"]
     }
 }
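
Note (not part of the patch): a minimal usage sketch of the new provider-unit conversion path in Measurand. It assumes the class is exported from fetcher/lib/measurand.js (the diff does not show the module's exports) and that the script is run from the repository root; both are assumptions, not shown in the change.

// Hypothetical usage sketch; export name and require path are assumptions.
const { Measurand } = require('./fetcher/lib/measurand');

// A CAC-style lookup entry: the provider reports "co" in ppb,
// while the fetcher stores co in ppm (see supportedMeasurandParameters).
const co = new Measurand({
    input_param: 'co',
    parameter: 'co',
    unit: 'ppm',          // normalized unit used internally
    provider_unit: 'ppb'  // unit as reported by the provider
});

console.log(co.normalized_unit);       // 'ppm'
console.log(co.normalize_value(500));  // 0.5, i.e. 500 ppb converted to ppm

// A pairing without a conversion entry falls back to the identity function.
const rh = new Measurand({
    input_param: 'rh',
    parameter: 'relativehumidity',
    unit: '%',
    provider_unit: '%'
});
console.log(rh.normalize_value(42));   // 42, unchanged

The sketch exercises the provider_unit -> unit lookup that the patched _normalizer builds, including the identity fallback used when no conversion is defined.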