Skip to content

Commit

Permalink
CAC specific updates
Browse files Browse the repository at this point in the history
  • Loading branch information
caparker committed Feb 22, 2024
1 parent 68858ba commit 61ed5e4
Show file tree
Hide file tree
Showing 5 changed files with 255 additions and 210 deletions.
3 changes: 2 additions & 1 deletion fetcher/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const providers = new (require('./lib/providers'))();
const sources = require('./sources');

if (require.main === module) {
handler();
handler();

Check failure on line 8 in fetcher/index.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 4 spaces but found 3
}

async function handler(event) {
Expand All @@ -31,6 +31,7 @@ async function handler(event) {
console.log(

Check failure on line 31 in fetcher/index.js

View workflow job for this annotation

GitHub Actions / build

Expected indentation of 8 spaces but found 6
`Processing ${process.env.STACK}: ${source.type}/${source.provider}/${source.name}`
);

await providers.processor(source);

return {};
Expand Down
144 changes: 83 additions & 61 deletions fetcher/lib/measurand.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
const { request, VERBOSE } = require('./utils');

class Measurand {
constructor({ input_param, parameter, unit }) {
constructor({ input_param, parameter, unit, provider_unit }) {
// How a measurand is described by external source (e.g. "CO")
this.input_param = input_param;
// How a measurand is described internally (e.g. "co")
this.parameter = parameter;
// Unit for measurand (e.g "ppb")
this.unit = unit;
this.provider_unit = provider_unit
}

/**
Expand All @@ -17,84 +18,105 @@ class Measurand {
* @returns { Object } normalizer
*/
get _normalizer() {
return (
{
ppb: ['ppm', (val) => val / 1000],
'ng/m³': ['µg/m³', (val) => val / 1000],
pp100ml: ['particles/cm³', (val) => val / 100]
}[this.unit] || [this.unit, (val) => val]
);
// provider_units: { unit: conversion function }
return ({
ppb: {
ppm: (val) => val / 1000
},
ppm: {
ppb: (val) => val * 1000
},
f: {
c: (val) => (val - 32) * 5/9
},
'ng/m3': {
'ug/m3': (val) => val / 1000
},
pp100ml: {
'particles/cm³': (val) => val / 100
},
}[this.provider_unit][this.unit]) ?? ((val) => val);
;
}

get normalized_unit() {
return this._normalizer[0];
return this.unit;
}

get normalize_value() {
return this._normalizer[1];
return this._normalizer
}

/**
* Given a map of lookups from an input parameter (i.e. how a data provider
* identifies a measurand) to a tuple of a measurand parameter (i.e. how we
* identify a measurand internally) and a measurand unit, generate an array
* Measurand objects that are supported by the OpenAQ API.
*
* form -> { input_parameter : [ measurand_parameter, input_units ] }
*
* @param {*} lookups, e.g. {'CO': ['co', 'ppb'] }
* @returns { Measurand[] }
*/
static async getSupportedMeasurands(lookups) {
// Fetch from API
const supportedMeasurandParameters = [];
const baseurl = new URL('/v2/parameters', process.env.API_URL || 'https://api.openaq.org');
if (VERBOSE) console.debug(`Getting Supported Measurands - ${baseurl}`);
let morePages;
let page = 1;
do {
const url = new URL(
'/v2/parameters',
baseurl,
);
url.searchParams.append('page', page++);
const {
body: { meta, results }
} = await request({
json: true,
method: 'GET',
url
});

if(!results) throw new Error(`Could not connect to ${baseurl}`);

for (const { name } of results) {
supportedMeasurandParameters.push(name);
}
morePages = meta.found > meta.page * meta.limit;
} while (morePages);
if (VERBOSE)
console.debug(
`Fetched ${supportedMeasurandParameters.length} supported measurement parameters from ${baseurl}.`
);
// we are supporting everything in the fetcher
const supportedMeasurandParameters = {
ambient_temp: 'c',
bc: 'ug/m3',
bc_375: 'ug/m3',
bc_470: 'ug/m3',
bc_528: 'ug/m3',
bc_625: 'ug/m3',
bc_880: 'ug/m3',
ch4: 'ppm',
cl: 'ppb',
co2: 'ppm',
co: 'ppm',
ec: 'ppb',
humidity: '%',
no2: 'ppm',
no3: 'ppb',
no: 'ppm',
nox: 'ppm',
o3: 'ppm',
oc: 'ppb',
ozone: 'ppb',
pm100: 'ug/m3',
pm10: 'ug/m3',
pm1: 'ug/m3',
pm25: 'ug/m3',
pm4: 'ug/m3',
pm: 'ug/m3',
pressure: 'hpa',
relativehumidity: '%',
so2: 'ppm',
so4: 'ppb',
temperature: 'c',
ufp: 'particles/cm3',
um003: 'particles/cm3',
um005: 'particles/cm3',
um010: 'particles/cm3',
um025: 'particles/cm3',
um050: 'particles/cm3',
um100: 'particles/cm3',
v: 'ppb',
voc: 'iaq',
wind_direction: 'deg',
wind_speed: 'm/s',
};

// Filter provided lookups
const supportedLookups = Object.entries(lookups).filter(
// eslint-disable-next-line no-unused-vars
([input_param, [measurand_parameter, measurand_unit]]) =>
supportedMeasurandParameters.includes(measurand_parameter)
);

if (!supportedLookups.length) throw new Error('No measurands supported.');
if (VERBOSE) {
Object.values(lookups)
.map(([measurand_parameter]) => measurand_parameter)
.filter((measurand_parameter) => !supportedMeasurandParameters.includes(measurand_parameter))
.map((measurand_parameter) => console.warn(`ignoring unsupported parameter: ${measurand_parameter}`));
}

return supportedLookups.map(
([input_param, [parameter, unit]]) =>
new Measurand({ input_param, parameter, unit })
);
let supported = Object.entries(lookups)
.map(([input_param, [parameter, provider_unit]]) => {
return new Measurand({
input_param,
parameter,
unit: supportedMeasurandParameters[parameter],
provider_unit
})
}).filter( m => m.unit)
// ([input_param, [parameter, unit, provider_unit]]) =>
// new Measurand({ input_param, parameter, unit, provider_unit })
if (VERBOSE>1) console.log('Supported measurands', supported)
return supported
}

/**
Expand Down
35 changes: 24 additions & 11 deletions fetcher/lib/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ const applyCredentials = credentials => {
if(!storage && credentials) {
if (!credentials.client_email) throw new Error('client_email required');
if (!credentials.client_id) throw new Error('client_id required');
console.debug(`Initializing storage object '${credentials.project_id}' as ${credentials.client_email}`);
if (VERBOSE) console.debug(`Initializing storage object '${credentials.project_id}' as ${credentials.client_email}`);
let projectId = credentials.project_id;
// https://github.com/googleapis/google-cloud-node/blob/main/docs/authentication.md
storage = new Storage({
Expand Down Expand Up @@ -105,7 +105,7 @@ const putObject = async (data, Key) => {
data = await gzip(data);
}

if(!DRYRUN) {
//if(!DRYRUN) {
if (VERBOSE) console.debug(`Saving data to ${Key}`);
await s3.putObject({
Bucket,
Expand All @@ -116,10 +116,10 @@ const putObject = async (data, Key) => {
}).promise().catch( err => {
console.log('error putting object', err);
});
} else {
if (VERBOSE) console.debug(`Would have saved data to ${Key}`);
await writeJson(data, Key);
}
//} else {
// if (VERBOSE) console.debug(`Would have saved data to ${Key}`);
// await writeJson(data, Key);
//}
};

const writeJson = async (data, filepath) => {
Expand Down Expand Up @@ -233,11 +233,16 @@ const listFilesLocal = async (config) => {

const fetchFile = async file => {
const source = file.source;
var data;
if(source == 'google-bucket') {
return await fetchFileGoogleBucket(file);
data = await fetchFileGoogleBucket(file);
} else {
return await fetchFileLocal(file);
data = await fetchFileLocal(file);
}
if(DRYRUN) {
writeJson(data, `raw/${file.path}`)
}
return data;
};

const fetchFileLocal = async file => {
Expand Down Expand Up @@ -318,7 +323,8 @@ const writeError = async (file) => {
file.path = `${dir}/errors/${parsed.name}_error.txt`;

if (DRYRUN) {
return file;
return await writeErrorLocal(file);
//return file;
} else if(source == 'google-bucket') {
return await writeErrorGoogleBucket(file);
} else {
Expand All @@ -335,8 +341,15 @@ const writeErrorGoogleBucket = async (file) => {
};

const writeErrorLocal = async (file) => {
if (VERBOSE) console.debug(`Writing local error to ${file.path}`);
fs.writeFileSync(file.path, file.error);
const dir = process.env.LOCAL_DESTINATION_BUCKET || __dirname;
const fullpath = path.join(dir, file.path);
if (VERBOSE) console.debug(`Writing local error to ${fullpath}`, file);
fs.promises.mkdir(path.dirname(fullpath), {recursive: true})
.then( res => {
fs.writeFileSync(fullpath, file.error);
}).catch( err => {
console.warn(err);
});
return file;
};

Expand Down
Loading

0 comments on commit 61ed5e4

Please sign in to comment.