Skip to content

Commit

Permalink
use db to persist latest reports; revisit the logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Maxime Chaillet committed Jan 31, 2025
1 parent e69bccd commit 6ae7b69
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 53 deletions.
4 changes: 2 additions & 2 deletions alerting/cron_aa_storm_alert_run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ docker compose run --entrypoint 'yarn aa-storm-alert-worker' alerting-node 2>&1
## To set up the cron job, run the following command on the server:
# crontab -e
## and then add the following line to the crontab file:
# 15 0,6,12,18 * * * ~/prism-app/alerting/cron_aa_storm_alert_run.sh
## This will run the alerting script every day at 0h15; 6h15; 12h15 and 18h15.
# 0 * * * * ~/prism-app/alerting/cron_aa_storm_alert_run.sh
## This will run the alerting script every hour at minute 0.
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { MigrationInterface, QueryRunner } from "typeorm";

export class CreateLatestAaStormReportsTable1738249210356
implements MigrationInterface
{
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`CREATE TABLE "latest_aa_storm_reports" (
"id" SERIAL NOT NULL,
"report_identifier" character varying NOT NULL,
CONSTRAINT "PK_ad91cad659a3536465d564a4b3a" PRIMARY KEY ("id")
)`);
}

public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE "latest_aa_storm_reports"`);
}
}
146 changes: 95 additions & 51 deletions alerting/src/aa-storm-alert-worker.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,7 @@
import { isNaN } from 'lodash';
import Bluebird from 'bluebird';
import nodeFetch from 'node-fetch';
import { createConnection, Repository } from 'typeorm';
import { API_URL } from './constants';
import { Alert } from './entities/alerts.entity';
import { calculateAlert } from './utils/analysis-utils';
import { sendEmail } from './utils/email';
import { fetchCoverageLayerDays, formatUrl, WMS } from 'prism-common';
import moment from 'moment';
import { StormDataResponseBody, WindState } from './types/rawStormDataTypes';
import { LatestAAStormReports } from './entities/latestAAStormReports.entity';

interface EmailPayload {}

Expand Down Expand Up @@ -95,66 +88,117 @@ function isEmailNeededByReport(report: StormDataResponseBody) {
return false;
}

async function buildEmailPayloads(): Promise<EmailPayload[]> {
// get all the reports
// fetch and extract the more recent short report for each reported storm
async function fetchLatestAvailableReports() {
// fetch all reports
const allReports = await fetchAllReports();
console.log('allReports', allReports);

if (!allReports) {
console.error('Error fetching all reports');
return [];
}

// filter today's reports
const today = moment.utc().format('YYYY-MM-DD');
const todaysReports = allReports[today];
if (!todaysReports) {
return [];
}
// filter latest reports for all storms using day
const latestReportsDate = Object.keys(allReports).reduce(
(latestDateReports, currentDateReports) =>
new Date(currentDateReports) > new Date(latestDateReports)
? currentDateReports
: latestDateReports,
);

const latestDayReports = allReports[latestReportsDate];

// for each storm of the last day, keep only the latest report by time

return Object.keys(latestDayReports).map((stormName) => {
const stormShortReports = latestDayReports[stormName];

// get the latest report for that storm
const latestReport = stormShortReports.reduce(
(latestShortReport, currentShortReport) =>
new Date(currentShortReport.ref_time) >
new Date(latestShortReport.ref_time)
? currentShortReport
: latestShortReport,
);
return latestReport;
});
}

async function filterAlreadyProcessedReports(
availableReports: ShortReport[],
latestStormReportsRepository: Repository<LatestAAStormReports>,
) {
// get the last reports which have been processed for email alert system
const latestProcessedReports = await latestStormReportsRepository.find();

const latestProcessedReportsPaths = latestProcessedReports.map(
(item) => item.reportIdentifier,
);

// for each today's storm
const falsyOrEmailPayloads = await Promise.all(
Object.keys(todaysReports).map(async (stormName) => {
const stormShortReports = todaysReports[stormName];

// get the latest detailed report for that storm
const latestReport = stormShortReports.reduce(
(latestShortReport, currentShortReport) =>
new Date(currentShortReport.ref_time) >
new Date(latestShortReport.ref_time)
? currentShortReport
: latestShortReport,
);

const detailedStormReport: StormDataResponseBody = await fetch(
`https://data.earthobservation.vam.wfp.org/public-share/aa/ts/outputs/${latestReport.path}?v2`,
).then((data) => data.json());

console.log('detailedstormreport', detailedStormReport);
// decide whether an email is required
const isEmailNeeded = isEmailNeededByReport(detailedStormReport);

if (isEmailNeeded) {
// TODO: add the missing items required to feed the email template
return {
cycloneName: detailedStormReport.forecast_details.cyclone_name,
};
}

return false;
}),
return availableReports.filter(
(availableReport) =>
!latestProcessedReportsPaths.includes(availableReport.path),
);
}

return falsyOrEmailPayloads.filter((payload) => payload);
async function buildEmailPayloads(
shortReports: ShortReport[],
): Promise<EmailPayload[]> {
try {
const emailPayload = await Promise.all(
shortReports.map(async (shortReport) => {
const detailedStormReport: StormDataResponseBody = await fetch(
`https://data.earthobservation.vam.wfp.org/public-share/aa/ts/outputs/${shortReport.path}?v2`,
).then((data) => data.json());

const isEmailNeeded = isEmailNeededByReport(detailedStormReport);

if (isEmailNeeded) {
// TODO: add the missing items required to feed the email template
return {
cycloneName: detailedStormReport.forecast_details.cyclone_name,
};
}

return false;
}),
);
return emailPayload.filter((payload) => payload);
} catch (e) {
console.error('Error while creating email payload');
return [];
}
}

async function run() {
//check whether an email should be sent
buildEmailPayloads().then((emailPayloads) => {
// create a connection to the remote db
const connection = await createConnection();
const latestStormReportsRepository =
connection.getRepository(LatestAAStormReports);

const latestAvailableReports = await fetchLatestAvailableReports();

// filter reports which have been already processed

const filteredAvailableReports = await filterAlreadyProcessedReports(
latestAvailableReports,
latestStormReportsRepository,
);

// check whether an email should be sent
buildEmailPayloads(filteredAvailableReports).then((emailPayloads) => {
console.log('emailPayload', emailPayloads);

// create templates
// send emails
});

// drop all latest storm reports stored to prevent accumulation of useless data in this table by time
await latestStormReportsRepository.clear();
await latestStormReportsRepository.save(
latestAvailableReports.map((report) => ({ reportIdentifier: report.path })),
);
}

run();
10 changes: 10 additions & 0 deletions alerting/src/entities/latestAAStormReports.entity.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Entity, PrimaryGeneratedColumn, Column } from 'typeorm';

@Entity()
export class LatestAAStormReports {
@PrimaryGeneratedColumn()
id: number;

@Column()
reportIdentifier: string;
}

0 comments on commit 6ae7b69

Please sign in to comment.