diff --git a/fetcher/lib/providers.js b/fetcher/lib/providers.js index 16ed721..7d3cd85 100644 --- a/fetcher/lib/providers.js +++ b/fetcher/lib/providers.js @@ -45,7 +45,9 @@ class Providers { if(VERBOSE) console.log('Starting processor', { ...source, ...config }); const log = await this[source.provider].processor({ ...source, ...config }); // source_name is more consistent with our db schema - log.source_name = source.provider; + if(typeof(log) == 'object' && !Array.isArray(log) && !log.source_name) { + log.source_name = source.provider; + } return(log); } @@ -56,6 +58,7 @@ class Providers { * @param {String} subject */ async publish(message, subject) { + console.log('Publishing:', subject, message); if(process.env.TOPIC_ARN) { const cmd = new PublishCommand({ TopicArn: process.env.TOPIC_ARN, diff --git a/fetcher/providers/clarity.js b/fetcher/providers/clarity.js index 51800a2..c139740 100644 --- a/fetcher/providers/clarity.js +++ b/fetcher/providers/clarity.js @@ -5,8 +5,7 @@ */ const dayjs = require('dayjs'); -//const pLimit = require('p-limit'); -const { default: pLimit } = import('p-limit'); +const pLimit = require('p-limit'); const Providers = require('../lib/providers'); const { VERBOSE, request } = require('../lib/utils'); @@ -275,6 +274,8 @@ class ClarityApi { ...stations, Providers.put_measures(this.source.provider, measures) ]); + const source_name = `${this.source.provider}-${this.org.orgId}`; + return { source_name, locations: stations.length, measures: measures.length, from: measures.from, to: measures.to }; } } diff --git a/fetcher/providers/cmu.js b/fetcher/providers/cmu.js index b50d8fc..6b7c500 100644 --- a/fetcher/providers/cmu.js +++ b/fetcher/providers/cmu.js @@ -4,8 +4,7 @@ const dayjs = require('dayjs') .extend(require('dayjs/plugin/utc')) .extend(require('dayjs/plugin/timezone')) .extend(require('dayjs/plugin/customParseFormat')); -//const pLimit = require('p-limit'); -const { default: pLimit } = import('p-limit'); +const pLimit = require('p-limit'); const Providers = require('../lib/providers'); const { Sensor, SensorNode, SensorSystem } = require('../lib/station'); const { Measures, FixedMeasure } = require('../lib/measure'); diff --git a/fetcher/providers/purpleair.js b/fetcher/providers/purpleair.js index 0baa8d8..c62c5b3 100644 --- a/fetcher/providers/purpleair.js +++ b/fetcher/providers/purpleair.js @@ -85,6 +85,7 @@ async function processor(source) { await Providers.put_measures(source.provider, measures); if (VERBOSE) console.log(`ok - all ${measures.length} measurements pushed`); + return { locations: stations.length, measures: measures.length, from: measures.from, to: measures.to }; } async function fetchSensorData(source) { diff --git a/fetcher/providers/senstate.js b/fetcher/providers/senstate.js index 5a19233..76e514d 100644 --- a/fetcher/providers/senstate.js +++ b/fetcher/providers/senstate.js @@ -107,6 +107,7 @@ async function processor(source) { await Providers.put_measures(source.provider, measures, `senstate-${Math.floor(Date.now() / 1000)}-${Math.random().toString(36).substring(8)}`); console.log(`ok - all ${measures.length} measurements pushed`); + return { locations: stations.length, measures: measures.length, from: measures.from, to: measures.to }; } diff --git a/package.json b/package.json index c7e6aca..babcd85 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,7 @@ "dotenv": "^16.4.5", "geo-tz": "^8.0.1", "googleapis": "133", - "p-limit": "^5.0.0", + "p-limit": "3.1.0", "request": "^2.88.2" }, "devDependencies": { diff --git a/yarn.lock b/yarn.lock index b672857..a99d9c8 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4725,7 +4725,7 @@ optionator@^0.9.3: prelude-ls "^1.2.1" type-check "^0.4.0" -p-limit@^3.0.2: +p-limit@^3.0.2, p-limit@3.1.0: version "3.1.0" resolved "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz" integrity sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ== @@ -4739,13 +4739,6 @@ p-limit@^4.0.0: dependencies: yocto-queue "^1.0.0" -p-limit@^5.0.0: - version "5.0.0" - resolved "https://registry.npmjs.org/p-limit/-/p-limit-5.0.0.tgz" - integrity sha512-/Eaoq+QyLSiXQ4lyYV23f14mZRQcXnxfHrN0vCai+ak9G0pp9iEQukIIZq5NccEvwRB8PUnZT0KsOoDCINS1qQ== - dependencies: - yocto-queue "^1.0.0" - p-locate@^5.0.0: version "5.0.0" resolved "https://registry.npmjs.org/p-locate/-/p-locate-5.0.0.tgz"