From 652026ce6360ecc8552946094ec75a4742c9b232 Mon Sep 17 00:00:00 2001 From: Christian Parker Date: Thu, 20 Jun 2024 04:54:41 -0700 Subject: [PATCH] Feature/update clarity (#35) * Incorporate clarity's updates * Added date to the key/file path * Some bug fixes and adding date to key path * Removing static path * Linting fixes --- fetcher/lib/measure.js | 1 + fetcher/lib/providers.js | 31 +++ fetcher/lib/utils.js | 18 ++ fetcher/providers/clarity.js | 400 +++++++++++++++-------------------- 4 files changed, 215 insertions(+), 235 deletions(-) diff --git a/fetcher/lib/measure.js b/fetcher/lib/measure.js index 95ac389..2bfe8f3 100644 --- a/fetcher/lib/measure.js +++ b/fetcher/lib/measure.js @@ -24,6 +24,7 @@ class Measures { if (!this.from || measure.timestamp < this.from) { this.from = measure.timestamp; } + this.measures.push(measure); } diff --git a/fetcher/lib/providers.js b/fetcher/lib/providers.js index c7c2c52..809fb46 100644 --- a/fetcher/lib/providers.js +++ b/fetcher/lib/providers.js @@ -1,5 +1,6 @@ const fs = require('fs'); const path = require('path'); +const dayjs = require('dayjs'); const { SNSClient, PublishCommand } = require('@aws-sdk/client-sns'); const { @@ -9,6 +10,7 @@ const { fetchSecret, getObject, putObject, + putFile, prettyPrintStation } = require('./utils'); @@ -148,12 +150,41 @@ class Providers { if (DRYRUN) { console.log(`Would have saved ${measures.length} measurements to '${Bucket}/${Key}'`); + putFile(compressedString, Key); return new Promise((y) => y(true)); } if (VERBOSE) console.debug(`Saving measurements to ${Bucket}/${Key}`); return await putObject(compressedString, Bucket, Key, false, 'text/csv', 'gzip'); } + + /** + * Given a measures object, save it to s3 + * + * @param {string} provider The name of the provider (ie purpleair) + * @param {Measures} data A object with measures + * @param {string} id An optional identifier to use when creating filename + */ + static async put_measures_json(provider, data, id) { + if (!data.measures.length && !data.locations.length) { + return console.warn('Nothing found, not uploading to S3.'); + } + const Bucket = process.env.BUCKET; + const today = dayjs().format('YYYY-MM-DD'); + const filename = id || `${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(8)}`; + const Key = `${process.env.STACK}/measures/${provider}/${today}/${filename}.json.gz`; + const compressedString = await gzip(JSON.stringify(data)); + + + if (DRYRUN) { + console.log(`Would have saved ${data.measures.length} measurements and ${data.locations.length} stations to '${Bucket}/${Key}'`); + putFile(compressedString, Key); + return new Promise((y) => y(true)); + } + if (VERBOSE) console.debug(`Saving measurements to ${Bucket}/${Key}`); + + return await putObject(compressedString, Bucket, Key, false, 'application/json', 'gzip'); + } } module.exports = Providers; diff --git a/fetcher/lib/utils.js b/fetcher/lib/utils.js index 72b05c7..95e9adf 100644 --- a/fetcher/lib/utils.js +++ b/fetcher/lib/utils.js @@ -1,6 +1,9 @@ const zlib = require('zlib'); const { promisify } = require('util'); const request = promisify(require('request')); +const fs = require('node:fs'); +const path = require('path'); +const homedir = require('os').homedir(); const { SecretsManagerClient, GetSecretValueCommand } = require('@aws-sdk/client-secrets-manager'); const { S3Client, GetObjectCommand, PutObjectCommand } = require('@aws-sdk/client-s3'); @@ -63,6 +66,20 @@ async function putObject(text, Bucket, Key, gzip = true, ContentType = 'applicat return await s3.send(cmd); } + +/** + * + * @param {string} text the string to be saved to a file + * @param {string} key the the file path. Usually the same key that would be used in the cloud storage + */ +async function putFile(text, key) { + const fpath = path.join(homedir, `Downloads/${key}`); + await fs.mkdirSync(path.dirname(fpath), { recursive: true }); + await fs.writeFileSync(fpath, text); +} + + + /** * Retrieve secret from AWS Secrets Manager * @param {string} source The source object for which we are fetching a secret. @@ -199,6 +216,7 @@ module.exports = { DRYRUN, getObject, putObject, + putFile, prettyPrintStation, checkResponseData }; diff --git a/fetcher/providers/clarity.js b/fetcher/providers/clarity.js index da6d806..10b2d7e 100644 --- a/fetcher/providers/clarity.js +++ b/fetcher/providers/clarity.js @@ -4,26 +4,12 @@ * with the Google OAuth Service Account. https://stackoverflow.com/a/49965912/728583 */ -const dayjs = require('dayjs'); -const pLimit = require('p-limit'); const Providers = require('../lib/providers'); -const { VERBOSE, request } = require('../lib/utils'); -const { Sensor, SensorNode, SensorSystem } = require('../lib/station'); +const { request } = require('../lib/utils'); const { Measures, FixedMeasure } = require('../lib/measure'); const { Measurand } = require('../lib/measurand'); -const lookup = { - relHumid: ['relativehumidity', '%'], // RelativeHumidity - temperature: ['temperature', 'c'], // Temperature - pm2_5ConcMass: ['pm25', 'μg/m3'], // PM2.5 mass concentration - pm1ConcMass: ['pm1', 'μg/m3'], // PM1 mass concentration - pm10ConcMass: ['pm10', 'μg/m3'], // PM10 mass concentration - no2Conc: ['no2', 'ppb'], // NO2 volume concentration - windSpeed: ['windspeed', 'm/s'], // Wind speed - windDirection: ['winddirection', 'degrees'] // Wind direction, compass degrees (0°=North, then clockwise) -}; - class ClarityApi { /** @@ -31,269 +17,213 @@ class ClarityApi { * @param {Source} source * @param {Organization} org */ - constructor(source, org) { + constructor(source) { + this.fetched = false; this.source = source; - this.org = org; + this._measurands = null; + this._measures = null; + this.datasources = {}; + this.missing_datasources = []; + this.parameters = { + pm2_5ConcMassIndividual: ['pm25', 'ug/m3'] + }; + // holder for the locations + this.measures = new Measures(FixedMeasure); + this.locations = []; } get apiKey() { - return this.org.apiKey; + return this.source.apiKey; } - get orgId() { - return this.org.orgId; + get provider() { + return this.source.provider; } get baseUrl() { - return this.source.meta.url; + return 'https://clarity-data-api.clarity.io'; } - /** - * - * @returns {Promise} - */ - listDatasources() { - return request({ - json: true, - method: 'GET', - headers: { 'X-API-Key': this.apiKey }, - url: new URL('v1/datasources', this.baseUrl) - }).then((response) => { - let ds = response.body; - - if (process.env.SOURCEID) { - ds = ds.filter((d) => d.deviceCode === process.env.SOURCEID); - } else { - ds = ds.filter((d)=>d.sourceType === 'CLARITY_NODE'); - } + async fetchMeasurands() { + this.measurands = await Measurand.getIndexedSupportedMeasurands(this.parameters); + } - if (VERBOSE) { - console.debug(`-------------------\nListing ${ds.length} sources for ${this.org.organizationName}`); - ds.map((d) => console.log(`${d.sourceType}: (${d.subscriptionStatus}) ${d.deviceCode} - ${d.name}`)); - } - return ds; - }); + addToMissingDatasources(ds) { + if (!this.missing_datasources.includes(ds.datasourceId)) { + console.warn('Adding to missing datasources', ds); + this.missing_datasources.push(ds.datasourceId); + } } + /** - * - * @returns {Promise} + * Fetch the list of datasources and convert to object for reference later + * @returns {array} a list of datasources */ - listDevices() { - return request({ + async fetchDatasources() { + const url = 'https://clarity-data-api.clarity.io/v1/open/datasources'; + const response = await request({ + url, json: true, method: 'GET', - headers: { 'X-API-Key': this.apiKey }, - url: new URL('v2/devices/nodes', this.baseUrl), - qs: { 'org': this.orgId } - }).then((response) => response.body).then((response) => { - if (process.env.SOURCEID) { - const sources = process.env.SOURCEID.split(','); - const total = response.length; - response = response.filter((d) => sources.includes(d.nodeId)); - console.debug(`Limiting sensors to ${process.env.SOURCEID}, found ${response.length} of ${total}`); - } - const working = response.filter((o) => o.lifeStage.stage === 'working'); - if (VERBOSE) { - console.debug(`-----------------\nListing devices for ${this.org.organizationName}\nFound ${response.length} total devices, ${working.length} working`); - response - .filter((d) => d.lifeStage.stage !== 'working') - .map((d) => console.log(`DEVICE: ${d.nodeId} - ${d.model} - ${d.lifeStage.stage}`)); - } - return working; + headers: { + 'X-API-Key': this.apiKey, + 'Accept-Encoding': 'gzip' + }, + gzip: true }); + // const arr = response.body.filter(d => d.dataOrigin!='Reference Site'); + // reshape to make it easier to use + console.debug(`Found ${Object.keys(response.body.datasources).length} datasources`); + this.datasources = response.body.datasources; // Object.assign({}, ...arr.map(item => ({[`${item.datasourceId}`]: item}))); + return this.datasources; } /** - * - * @returns {AugmentedDevice[]} + * Provide a sensor based ingest id + * @param {object} meas + * @param {object} measurand + * @returns {string} */ - async listAugmentedDevices() { - const [devices, datasources] = await Promise.all([ - this.listDevices(), - this.listDatasources() - ]); - - const indexedDatasources = Object.assign( - {}, - ...datasources.map((datasource) => ({ - [datasource.deviceCode]: datasource - })) - ); - - return devices.map((device) => ({ - ...indexedDatasources[device.nodeId], - ...device - })); + getSensorId(meas) { + const measurand = this.measurands[meas.metric]; + if (!measurand) { + throw new Error(`Could not find measurand for ${meas.metric}`); + } + return `clarity-${meas.datasourceId}-${measurand.parameter}`; } - /** - * - * @param {String} code - * @param {dayjs.Dayjs} since - * @param {dayjs.Dayjs} to - * @yields {Measurement} - */ - async *fetchMeasurements(code, since, to) { - if (VERBOSE) - console.log( - `--------------------\nFetching measurements for ${this.org.organizationName}/${code} since ${since} to ${to}` - ); - - const limit = 20000; - let offset = 0; - - const url = new URL('v1/measurements', this.baseUrl); - url.searchParams.set('code', code); - url.searchParams.set('limit', limit); - url.searchParams.set('startTime', since.toISOString()); - url.searchParams.set('endTime', to.toISOString()); - - while (true) { - url.searchParams.set('skip', offset); - if (VERBOSE) console.log(`Fetching ${url}&key=${this.apiKey}`); - const response = await request({ - url, - json: true, - method: 'GET', - headers: { 'X-API-Key': this.apiKey, 'Accept-Encoding': 'gzip' }, - gzip: true - }); - - if (response.statusCode !== 200) { - console.warn(`Fetch failed (${response.statusCode}) ${response.body.Message}: ${url}`); - break; - } + getLocationId(loc) { + return `clarity-${loc.datasourceId}`; + } - if (offset === 0 && response.body.length === 0) { - console.warn(`Fetch failed to return any data: ${code}`); - break; - } + getLabel(loc) { + const datasource = this.datasources[loc.datasourceId]; + if (!datasource) { + this.addToMissingDatasources(loc.datasourceId); + throw new Error(`Could not find datasource for ${loc.datasourceId}`); + } + // still return a label even if we are missing one + return datasource.name ? datasource.name : 'Missing device name'; + } - for (const measurement of response.body) { - yield measurement; - } + normalize(meas) { + const measurand = this.measurands[meas.metric]; + return measurand.normalize_value(meas.value); + } + + async fetchData() { + const dsurl = '/v1/open/all-recent-measurement/pm25/individual'; + const url = `${this.baseUrl}${dsurl}`; + + await this.fetchMeasurands(); + await this.fetchDatasources(); - // More data to fetch - if (response.body.length === limit) { - offset += limit; - continue; + const response = await request({ + url, + json: true, + method: 'GET', + headers: { + 'X-API-Key': this.apiKey, + 'Accept-Encoding': 'gzip' + }, + gzip: true + }); + + const measurements = response.body.data; + const datasources = response.body.locations; + + console.debug(`Found ${measurements.length} measurements for ${datasources.length} datasources`); + + // translate the dataources to locations + datasources.map((d) => { + try { + this.locations.push({ + location: this.getLocationId(d), + label: this.getLabel(d), + ismobile: false, + lon: d.lon, + lat: d.lat + }); + } catch (e) { + console.warn(`Error adding location: ${e.message}`); } + }); + - if (VERBOSE) - console.log( - `Got ${response.body.length} of ${limit} possible measurements for device ${code} at offset ${offset}, stopping pagination.` - ); + measurements.map( (m) => { + // really seems like the measures.push method + // should handle the sensor/ingest id + // and the normalizing?? + try { + this.measures.push({ + sensor_id: this.getSensorId(m), + measure: this.normalize(m), + timestamp: m.time, + flags: { 'clarity/qc': m.qc } + }); + } catch (e) { + // console.warn(`Error adding measurement: ${e.message}`); + } + }); - break; + if (this.missing_datasources.length) { + console.warn(`Could not find details for ${this.missing_datasources.length} datasources`, this.missing_datasources); } + + this.fetched = true; } - /** - * - * @param {*} supportedMeasurands - * @param {Dayjs} since - */ - async sync(supportedMeasurands, since) { - // get all the devices, even if expired - let devices = await this.listAugmentedDevices(); - - if (VERBOSE) { - console.debug(`-----------------------\n Syncing ${this.source.provider}/${this.org.organizationName}`, devices.length); - devices.map( (d) => { - if (d.pairedAccessoryModules.length > 0) { - console.debug(`--------------------\n Found device with ${d.length} modules`); - } - }); - } - // Create one station per device - const stations = devices.map((device) => - Providers.put_station( - this.source.provider, - new SensorNode({ - sensor_node_id: `${this.org.organizationName}-${device.nodeId}`, - sensor_node_site_name: device.name || device.nodeId, // fall back to code when missing name - sensor_node_geometry: device.location.coordinates, - sensor_node_status: device.subscriptionStatus, - sensor_node_source_name: this.source.provider, // - sensor_node_site_description: this.org.organizationName, - sensor_node_ismobile: false, // should remove this and just use instrument - // sensor_node_instrument: device.model, - sensor_node_deployed_date: device.lifeStage.when, - sensor_system: new SensorSystem({ - sensor_system_manufacturer_name: this.source.provider, - // Create one sensor per characteristic - sensors: [] // Object.values(supportedMeasurands) - .map( - (measurand) => - new Sensor({ - sensor_id: getSensorId(device, measurand), - measurand_parameter: measurand.parameter, - measurand_unit: measurand.normalized_unit - }) - ) - }) - }) - ) - ); - - if (VERBOSE) console.debug(`Fetching measurements for ${devices.length} devices`); - // now remove the expired ones - devices = devices.filter((d)=>d.subscriptionStatus === 'active'); - // Sequentially process readings for each device - const measures = new Measures(FixedMeasure); - let successes = 0; - for (const device of devices) { - let hasMeasures = 0; - const measurements = this.fetchMeasurements( - device.nodeId, - since.subtract(1.25, 'hour'), - since - ); - - for await (const measurement of measurements) { - const readings = Object.entries(measurement.characteristics); - for (const [type, { value }] of readings) { - const measurand = supportedMeasurands[type]; - if (!measurand) continue; - hasMeasures = 1; - measures.push({ - sensor_id: getSensorId(device, measurand), - measure: measurand.normalize_value(value), - timestamp: measurement.time - }); - } - } - successes += hasMeasures; + data() { + if (!this.fetched) { + console.warn('Data has not been fetched'); } + return { + meta: { + schema: 'v0.1', + source: 'clarity', + matching_method: 'ingest-id' + }, + measures: this.measures.measures, + locations: this.locations + }; + } - if (successes < devices.length) { - console.warn(`There were ${successes} successful requests out of ${devices.length}\n------------------------------`); + summary() { + if (!this.fetched) { + console.warn('Data has not been fetched'); + return { + source_name: this.source.provider, + message: 'Data has not been fetched' + }; + } else { + return { + source_name: this.source.provider, + locations: this.locations.length, + measures: this.measures.length, + from: this.measures.from, + to: this.measures.to + }; } - await Promise.all([ - ...stations, - Providers.put_measures(this.source.provider, measures) - ]); - const source_name = `${this.source.provider}-${this.org.orgId}`; - return { source_name, locations: stations.length, measures: measures.length, from: measures.from, to: measures.to }; } -} -function getSensorId(device, measurand) { - return `clarity-${device.nodeId}-${measurand.parameter}`; + } + + + module.exports = { async processor(source) { - const measurandsIndex = await Measurand.getIndexedSupportedMeasurands(lookup); - const now = dayjs(); - const limit = pLimit(10); // Limit to amount of orgs being processed at any given time - - return Promise.all( - source.organizations.map((org) => - limit(() => new ClarityApi(source, org).sync(measurandsIndex, now)) - ) - ); + + // create new clarity object + const client = new ClarityApi(source); + // fetch and process the data + await client.fetchData(); + // and then push it to the + Providers.put_measures_json(client.provider, client.data()); + + return client.summary(); } };