Skip to content

Commit

Permalink
Merge pull request #111 from tago-io/sse_others
Browse files Browse the repository at this point in the history
[SSE] Migrate SocketIO to SSE
  • Loading branch information
felipefdl authored Mar 20, 2024
2 parents 3927f75 + 136061c commit c3d4be0
Show file tree
Hide file tree
Showing 19 changed files with 1,613 additions and 1,982 deletions.
2 changes: 1 addition & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@

END OF TERMS AND CONDITIONS

Copyright 2020 Tago LLC
Copyright 2020 TagoIO Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down
3,158 changes: 1,428 additions & 1,730 deletions package-lock.json

Large diffs are not rendered by default.

48 changes: 24 additions & 24 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "@tago-io/sdk",
"version": "11.1.0",
"description": "TagoIO SDK for JavaScript in the browser and Node.js",
"author": "Tago LLC",
"author": "TagoIO Inc.",
"homepage": "https://tago.io",
"license": "Apache-2.0",
"repository": "tago-io/sdk-js.git",
Expand Down Expand Up @@ -33,34 +33,34 @@
"docs": "typedoc"
},
"devDependencies": {
"@swc/jest": "0.2.26",
"@types/express": "4.17.17",
"@types/jest": "29.5.2",
"@types/node": "20.2.5",
"@types/papaparse": "5.3.7",
"@types/qs": "6.9.7",
"@swc/jest": "0.2.36",
"@types/express": "4.17.21",
"@types/jest": "29.5.12",
"@types/node": "20.11.28",
"@types/papaparse": "5.3.14",
"@types/qs": "6.9.12",
"@types/socket.io-client": "1.4.36",
"@typescript-eslint/eslint-plugin": "5.59.9",
"@typescript-eslint/parser": "5.59.9",
"dts-bundle-generator": "8.0.1",
"eslint": "8.42.0",
"eslint-config-prettier": "8.8.0",
"eslint-plugin-prettier": "4.2.1",
"express": "4.18.2",
"jest": "29.5.0",
"prettier": "2.8.8",
"typedoc": "0.24.8",
"typedoc-plugin-extras": "2.3.3",
"typedoc-plugin-missing-exports": "2.0.0",
"typescript": "5.1.3"
"@typescript-eslint/eslint-plugin": "7.2.0",
"@typescript-eslint/parser": "7.2.0",
"dts-bundle-generator": "9.3.1",
"eslint": "8.57.0",
"eslint-config-prettier": "9.1.0",
"eslint-plugin-prettier": "5.1.3",
"express": "4.18.3",
"jest": "29.7.0",
"prettier": "3.2.5",
"typedoc": "0.25.12",
"typedoc-plugin-extras": "3.0.0",
"typedoc-plugin-missing-exports": "2.2.0",
"typescript": "5.4.2"
},
"dependencies": {
"axios": "1.4.0",
"axios": "1.6.8",
"form-data": "4.0.0",
"nanoid": "3.3.6",
"nanoid": "3.3.7",
"papaparse": "5.4.1",
"qs": "6.11.2",
"socket.io-client": "4.6.2"
"qs": "6.12.0",
"socket.io-client": "4.7.5"
},
"optionalDependencies": {
"eventsource": "2.0.2"
Expand Down
4 changes: 2 additions & 2 deletions src/common/common.types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ type RecursivePartial<T> = {
[P in keyof T]?: T[P] extends (infer U)[]
? RecursivePartial<U>[]
: T[P] extends object
? RecursivePartial<T[P]>
: T[P];
? RecursivePartial<T[P]>
: T[P];
};
interface Query<T, U> {
/**
Expand Down
38 changes: 26 additions & 12 deletions src/infrastructure/apiSSE.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
import { GenericModuleParams } from "../common/TagoIOModule";
import regions from "../regions";

const channels = {
deviceInspector: "device_inspector",
} as const;
const channelsWithID = ["device_inspector", "analysis_console", "ui_dashboard"] as const;
const channelsWithoutID = ["notification", "analysis_trigger", "ui"] as const;
const channels = [...channelsWithID, ...channelsWithoutID] as const;

type openSSEConfig = GenericModuleParams & {
channel: keyof typeof channels;
resource_id?: string;
type ChannelWithID = (typeof channelsWithID)[number];
type ChannelWithoutID = (typeof channelsWithoutID)[number];

type OpenSSEWithID = GenericModuleParams & {
channel: ChannelWithID;
resource_id: string;
};

type OpenSSEWithoutID = GenericModuleParams & {
channel: ChannelWithoutID;
};

type OpenSSEConfig = OpenSSEWithID | OpenSSEWithoutID;

function isChannelWithID(params: OpenSSEConfig): params is OpenSSEWithID {
return channelsWithID.includes(params.channel as ChannelWithID);
}

async function loadEventSourceLib(): Promise<typeof EventSource> {
if (globalThis.EventSource) {
return globalThis.EventSource;
Expand All @@ -19,15 +32,16 @@ async function loadEventSourceLib(): Promise<typeof EventSource> {
}
}

async function openSSEListening(params: openSSEConfig): Promise<EventSource> {
const { region, token, channel, resource_id } = params;
async function openSSEListening(params: OpenSSEConfig): Promise<EventSource> {
const { region, token } = params;

const url = new URL(regions(region).sse);
url.pathname = "/events";

if (params.resource_id) {
url.searchParams.set("channel", `${channels[channel]}::${resource_id}`);
if (isChannelWithID(params)) {
url.searchParams.set("channel", `${params.channel}.${params.resource_id}`);
} else {
url.searchParams.set("channel", `${channels[channel]}`);
url.searchParams.set("channel", `${params.channel}`);
}

url.searchParams.set("token", token);
Expand All @@ -39,4 +53,4 @@ async function openSSEListening(params: openSSEConfig): Promise<EventSource> {
}

export { openSSEListening, channels };
export type { openSSEConfig };
export type { OpenSSEConfig };
2 changes: 1 addition & 1 deletion src/infrastructure/envParams.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"version": "11.0.6"}
{"version": "11.1.0"}
49 changes: 39 additions & 10 deletions src/modules/Analysis/Analysis.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import TagoIOModule from "../../common/TagoIOModule";
import ConsoleService from "../Services/Console";
import apiSocket, { channels } from "../../infrastructure/apiSocket";
import { openSSEListening } from "../../infrastructure/apiSSE";
import { AnalysisConstructorParams, analysisFunction, AnalysisEnvironment } from "./analysis.types";
import { JSONParseSafe } from "../../common/JSONParseSafe";

Expand Down Expand Up @@ -104,24 +104,53 @@ class Analysis extends TagoIOModule<AnalysisConstructorParams> {
}
}

private localRuntime() {
private async localRuntime() {
if (this.params.token === "unknown") {
throw "To run analysis locally, you needs a token";
}

const socket = apiSocket(this.params);
const analysis = await this.doRequest<{ name: string; active: boolean; run_on: "external" | "tago" }>({
path: "/info",
method: "GET",
}).catch((_) => undefined);

socket.on("connect", () => console.info("Connected to TagoIO, Getting analysis information..."));
if (!analysis) {
console.error("¬ Error :: Analysis not found or not active.");
return;
}

socket.on("disconnect", () => console.info("Disconnected from TagoIO.\n\n"));
if (analysis.run_on !== "external") {
console.info("¬ Warning :: Analysis is not set to run on external");
}

socket.on("error", (e: Error) => console.error("Connection error", e));
const sse = await openSSEListening({
token: this.params.token,
region: this.params.region,
channel: "analysis_trigger",
});

socket.on("ready", (analysis: any) => console.info(`Analysis [${analysis.name}] Started.`));
const tokenEnd = this.params.token.slice(-5);

socket.on(channels.analysisTrigger, (scope: any) => {
this.runLocal(scope.environment, scope.data, scope.analysis_id, scope.token);
});
sse.onmessage = (event) => {
const data = JSONParseSafe(event?.data, {})?.payload;

if (!data) {
// console.log("Invalid data", event.data);
return;
}

this.runLocal(data.environment, data.data, data.analysis_id, data.token);
};

sse.onerror = (_error) => {
// console.debug(_error);
console.error("¬ Connection was closed, trying to reconnect...");
};

sse.onopen = () => {
console.info(`\n¬ Connected to TagoIO :: Analysis [${analysis.name}](${tokenEnd}) is ready.`);
console.info("¬ Waiting for analysis trigger...\n");
};
}

public static use(analysis: analysisFunction, params?: AnalysisConstructorParams) {
Expand Down
3 changes: 1 addition & 2 deletions src/modules/Device/Device.ts
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ class Device extends TagoIOModule<DeviceConstructorParams> {
}

/**
* Get parameters from device
* @param onlyUnRead set true to get only unread parameters
* Get paronlyUnReameters from device
* @example
* ```js
* const myDevice = new Device({ token: "my_device_token" });
Expand Down
1 change: 0 additions & 1 deletion src/modules/Dictionary/Dictionary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class Dictionary extends TagoIOModule<IDictionaryModuleParams> {
*
* @param language Language.
* @param dictionary ID or Slug.
* @param runURL URL for the Run to make anonymous request.
*/
public async getLanguagesData(dictionary: string, language = this.language): Promise<LanguageData> {
if (!language || !dictionary) {
Expand Down
Loading

0 comments on commit c3d4be0

Please sign in to comment.