Setup versioning (#20)
* Cleaned up the local methods a bit

Still need to add a local get/put object

* Added test@local method

* Some cleanup while retesting the versioning branch

Mostly just improvements to verbose logging and the addition of a dryrun option

* Some dev improvements

* Deployment improvements

* Fixing README

* Added BOM check and some bug fixes

* Added senstate to versioning branch

Co-authored-by: Christian Parker <cparker@talloaks-environmental.com>
caparker authored Aug 30, 2022
1 parent 0322fa2 commit 68858ba
Showing 14 changed files with 577 additions and 121 deletions.
115 changes: 69 additions & 46 deletions README.md
@@ -24,29 +24,41 @@ yarn test

### Env Variables

-Configuration for the ingestion is provided via environment variables.
-
-- `BUCKET`: The bucket to which the ingested data should be written. **Required**
-- `SOURCE`: The [data source](#data-sources) to ingest. **Required**
-- `LCS_API`: The API used when fetching supported measurands. _Default: `'https://api.openaq.org'`_
-- `STACK`: The stack to which the ingested data should be associated. This is mainly used to apply a prefix to data uploaded to S3 in order to separate it from production data. _Default: `'local'`_
-- `SECRET_STACK`: The stack to which the used [Secrets](#provider-secrets) are associated. At times, a developer may want to use credentials relating to a different stack (e.g. a developer testing the script may want output data uploaded to the `local` stack while using the production stack's secrets). _Default: the value of the `STACK` env variable_
+Production configuration for the ingestion is provided via environment variables.
+
+- `BUCKET`: The bucket to which the data to be ingested should be written. **Required**
+- `API_URL`: The API used when fetching supported measurands. _Default: `'https://api.openaq.org'`_
+- `STACK`: The stack to which the ingested data should be associated. This is mainly used to apply a prefix to data uploaded to S3, separating it from production data, and to determine which secrets to pull down. _Default: `'local'`_
+
+Development configuration adds the following variables:
+
+- `SOURCE`: The [data source](#data-sources) to ingest. In production this is provided by the file object, but it must be set during development. Do not set this variable in production. **Required in development**
+- `SOURCE_TYPE`: Overrides the `type` value set in the source config.
+- `LOCAL_SOURCE_BUCKET`: The local directory path to pull files from. Very helpful when setting up a new source.
+- `LOCAL_DESTINATION_BUCKET`: The local path in which to save the transformed files. Also helpful when debugging pipelines.
+- `DRYRUN`: If set to a truthy value, prevents any uploading or moving of files while still allowing you to fetch them.
+- `VERBOSE`: Enable verbose logging. _Default: disabled_

### Running locally

-To run the ingestion script locally (useful for testing without deploying), see the following example:
+When running locally, you may either use a `.env` file or pass the variables in directly, as in the following example:

```sh
-LCS_API=https://api.openaq.org \
+API_URL=https://api.openaq.org \
+SOURCE=cac \
+SOURCE_TYPE=local \
STACK=my-dev-stack \
-SECRET_STACK=my-prod-stack \
BUCKET=openaq-fetches \
VERBOSE=1 \
-SOURCE=habitatmap \
+DRYRUN=1 \
node fetcher/index.js
```

You may also specify a specific `.env` file. For example, to load `.env.local` you would run
```sh
ENV=local node fetcher/index.js
```
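For reference, a development `.env.local` might look like the following; every value here is a placeholder, not a documented default:

```sh
# .env.local — example development configuration (placeholder values)
API_URL=https://api.openaq.org
STACK=my-dev-stack
BUCKET=my-ingest-bucket
SOURCE=cac
SOURCE_TYPE=local
LOCAL_SOURCE_BUCKET=./data/incoming
LOCAL_DESTINATION_BUCKET=./data/transformed
DRYRUN=1
VERBOSE=1
```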


## Data Sources

Data Sources can be configured by adding a config file & corresponding provider script. The two sections below
@@ -58,17 +70,30 @@ The first step for a new source is to add a JSON config file to the `fetcher/sources` directory:

```json
{
"name": "cac",
"schema": "v1",
"provider": "example",
"provider": "versioning",
"frequency": "hour",
"meta": {}
"type": "google-bucket",
"config": {
"bucket": "name-of-bucket",
"folder": "path/to/files",
...
},
"parameters": {
"pm25": ["pm25", "ppm"],
...
}
}
```

-| Attribute   | Note                       |
-| ----------- | -------------------------- |
-| `provider`  | Unique provider name       |
-| `frequency` | `day`, `hour`, or `minute` |
+Attributes:
+
+- `name`: A unique name for reference.
+- `provider`: The provider script to use.
+- `frequency`: How often to run the source (`day`, `hour`, or `minute`).
+- `type`: The source location type. Currently supports `google-bucket` and `local`.
+- `config`: Any config parameters needed for the source location.
+- `parameters`: A list of accepted measurands and their mapped values and units.

The config file can contain any properties that should be configurable via the
provider script. The list above, however, outlines the attributes that are required.
@@ -82,36 +107,25 @@ The script here should expose a function named `processor`. This function should

The script below is a basic example of a new source:

-```js
-const Providers = require("../providers");
-const { Sensor, SensorNode, SensorSystem } = require("../station");
-const { Measures, FixedMeasure, MobileMeasure } = require("../measure");
-
-async function processor(source_name, source) {
-    // Get Locations/Sensor Systems via http/s3 etc.
-    const locs = await get_locations();
-
-    // Map locations into SensorNodes
-    const station = new SensorNode();
-
-    await Providers.put_stations(source_name, [station]);
-
-    const fixed_measures = new Measures(FixedMeasure);
-    // or
-    const mobile_measures = new Measures(MobileMeasure);
-
-    fixed_measures.push(
-        new FixedMeasure({
-            sensor_id: "PurpleAir-123",
-            measure: 123,
-            timestamp: Math.floor(new Date() / 1000), // UNIX timestamp
-        })
-    );
-
-    await Providers.put_measures(source_name, fixed_measures);
-}
-
-module.exports = { processor };
-```
+```json
+{
+    "name": "source_name",
+    "schema": "v1",
+    "provider": "versioning",
+    "frequency": "minute",
+    "type": "google-bucket",
+    "config": {
+        "bucket": "location-bucket",
+        "folder": "pending"
+    },
+    "parameters": {
+        "pm25": ["pm25", "ppb"],
+        "pm10": ["pm10", "ppb"],
+        "temp": ["temperature", "c"],
+        "ws": ["wind_speed", "m/s"],
+        "wd": ["wind_direction", "deg"]
+    }
+}
+```
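The `parameters` map above appears to pair each raw key from the source with its reported measurand name and units. As a minimal sketch of how such a mapping could be applied to a raw reading (the `normalize` helper here is hypothetical, not part of the codebase):

```js
// Hypothetical sketch: applying a source's "parameters" map to raw readings.
const parameters = {
    pm25: ['pm25', 'ppb'],
    temp: ['temperature', 'c']
};

function normalize(rawKey, value) {
    const entry = parameters[rawKey];
    if (!entry) return null; // not an accepted measurand for this source
    const [measurand, units] = entry;
    return { measurand, units, value };
}

console.log(normalize('temp', 21.4));
// -> { measurand: 'temperature', units: 'c', value: 21.4 }
```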

### Provider Secrets
@@ -142,3 +156,12 @@ This should look something like the following and be stored in its entirety within
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/service-account-email"
}
```
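As a rough sketch of how such a secret might be fetched at runtime; this assumes the secret lives in AWS Secrets Manager under a `${STACK}/${provider}`-style name, which is an illustrative convention rather than one documented here:

```js
// Hypothetical sketch: fetching a provider secret at runtime.
// The Secrets Manager name `${STACK}/${provider}` is an assumption,
// not a convention documented in this repository.
const { SecretsManager } = require('aws-sdk');

async function getProviderSecret(provider) {
    const sm = new SecretsManager();
    const SecretId = `${process.env.STACK || 'local'}/${provider}`;
    const { SecretString } = await sm.getSecretValue({ SecretId }).promise();
    return JSON.parse(SecretString);
}
```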

### Deploying

The deployment method (`cdk/app.ts`) is written to be dynamic and pulls from environment variables to determine what gets deployed. There are four variables that can be modified when deploying; a sample invocation follows the list:

- `STACK`: Used as the ID for the stack.
- `BUCKET`: Defines the ingest bucket.
- `API_URL`: The URL of the API to use.
- `DEPLOYED_SOURCES`: A comma-separated string listing the sources that should be included in the deployment. Leaving this blank will deploy all sources. Names in the string must match the names in the source config files.
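As a sketch, a deployment using these variables might look like the following; the stack, bucket, and source names are placeholders, and it assumes the CDK app is deployed with the standard `cdk deploy` command from the `cdk/` directory:

```sh
STACK=cac-staging \
BUCKET=my-ingest-bucket \
API_URL=https://api.openaq.org \
DEPLOYED_SOURCES=cac,senstate \
npx cdk deploy
```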
19 changes: 15 additions & 4 deletions cdk/app.ts
@@ -1,20 +1,31 @@
#!/usr/bin/env node
+const path = process.env.ENV ? `../.env.${process.env.ENV}` : '.env';
+require('dotenv').config({ path });
+
import * as cdk from "@aws-cdk/core";
import "source-map-support/register";
import { EtlPipeline } from "./stack";

const app = new cdk.App();
+let sources = require('../fetcher/sources');
+
+if (process.env.DEPLOYED_SOURCES) {
+    const DEPLOYED_SOURCES = process.env.DEPLOYED_SOURCES.split(",");
+    sources = sources.filter((d: any) => DEPLOYED_SOURCES.includes(d.name));
+}
+
+const id = process.env.STACK || `cac-pipeline`;
+
-const stack = new EtlPipeline(app, `cac-pipeline`, {
-    description: `ETL Pipeline`,
+const stack = new EtlPipeline(app, id, {
+    description: `ETL Pipeline for ${id}`,
+    sources,
    fetcherModuleDir: "../fetcher",
    schedulerModuleDir: "../scheduler",
-    sources: require('../fetcher/sources'),
+    bucketName: process.env.BUCKET || 'talloaks-openaq-ingest',
+    fetcherEnv: {
+        API_URL: process.env.API_URL || 'https://aagsfsmu92.execute-api.us-west-2.amazonaws.com'
+    },
});

-cdk.Tags.of(stack).add('project', 'cac')
+cdk.Tags.of(stack).add('Project', 'cac')
2 changes: 1 addition & 1 deletion cdk/stack.ts
@@ -99,7 +99,7 @@ export class EtlPipeline extends cdk.Stack {
        sources: Source[];
    }): lambda.Function[] {
        const durations: Record<Interval, cdk.Duration> = {
-            minute: cdk.Duration.minutes(1),
+            minute: cdk.Duration.minutes(5),
            hour: cdk.Duration.hours(1),
            day: cdk.Duration.days(1),
        };
15 changes: 12 additions & 3 deletions fetcher/index.js
@@ -1,3 +1,6 @@
+const path = process.env.ENV ? `.env.${process.env.ENV}` : '.env';
+require('dotenv').config({ path });
+
const providers = new (require('./lib/providers'))();
const sources = require('./sources');

@@ -18,13 +21,19 @@ async function handler(event) {

        const source_name = process.env.SOURCE || event.Records[0].body;
        const source = sources.find((source) => source.name === source_name);

        if (!source) throw new Error(`Unable to find ${source_name} in sources.`);

-        console.log(`Processing ${process.env.STACK}: '${source.provider}/${source.name}'`);
+        // override the source type (for dev)
+        if (process.env.SOURCE_TYPE) {
+            source.type = process.env.SOURCE_TYPE;
+        }
+
+        console.log(
+            `Processing ${process.env.STACK}: ${source.type}/${source.provider}/${source.name}`
+        );
        await providers.processor(source);

        return {};
} catch (err) {
console.error(err);
process.exit(1);
14 changes: 9 additions & 5 deletions fetcher/lib/measurand.js
@@ -46,13 +46,14 @@
    static async getSupportedMeasurands(lookups) {
        // Fetch from API
        const supportedMeasurandParameters = [];
-        const url = new URL('/v2/parameters', process.env.API_URL || 'https://api.openaq.org');
-        let morePages;
-        let page = 1;
+        const baseurl = new URL('/v2/parameters', process.env.API_URL || 'https://api.openaq.org');
+        if (VERBOSE) console.debug(`Getting Supported Measurands - ${baseurl}`);
+        let morePages;
+        let page = 1;
        do {
            const url = new URL(
-                '/v2/parameters',
-                process.env.LCS_API || 'https://api.openaq.org'
+                baseurl,
            );
            url.searchParams.append('page', page++);
const {
@@ -62,14 +63,17 @@
            method: 'GET',
            url
        });
+
+        if (!results) throw new Error(`Could not connect to ${baseurl}`);
+
        for (const { name } of results) {
            supportedMeasurandParameters.push(name);
        }
        morePages = meta.found > meta.page * meta.limit;
    } while (morePages);
    if (VERBOSE)
        console.debug(
-            `Fetched ${supportedMeasurandParameters.length} supported measurement parameters.`
+            `Fetched ${supportedMeasurandParameters.length} supported measurement parameters from ${baseurl}.`
        );

// Filter provided lookups
12 changes: 6 additions & 6 deletions fetcher/lib/station.js
@@ -1,7 +1,8 @@

const {
    getObject,
-    putObject
+    putObject,
+    VERBOSE,
} = require('./utils');


@@ -48,7 +49,7 @@
            const k = this.map[key] || key;
            if (k) this[k] = value;
        }
-        console.debug('Created new sensor node', this.sensor_node_id);
+        if (VERBOSE) console.debug('Created new sensor node', this.sensor_node_id);
    }

addSensor(obj) {
@@ -98,15 +99,14 @@

    async get() {
        const key = this.key();
-        if (!this.stored) {
+        if (!this.stored && !process.env.DRYRUN) {
            this.stored = await getObject(key);
        }
        return this.stored;
    }

    async put() {
        const key = this.key();
-        //console.debug('PUTTING SENSOR NODE', key);
        const current = this.get();
        // make sure it's different first
        // if (this.different()) {
@@ -217,7 +217,7 @@
        this.provider = null;
        this.stored = null;
        Object.assign(this, p);
-        //console.debug('Created new version', this.sensor_id);
+        if (VERBOSE) console.debug('Created new version', this.sensor_id);
    }

different(obj) {
@@ -257,7 +257,7 @@

    async get() {
        const key = this.key();
-        if (!this.stored) {
+        if (!this.stored && !process.env.DRYRUN) {
            this.stored = await getObject(key);
        }
        return this.stored;