Skip to content

Commit

Permalink
chore: initial worker message channel impl
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzianis Dashkevich committed Oct 22, 2024
1 parent 5b8e30d commit dc2c210
Show file tree
Hide file tree
Showing 10 changed files with 279 additions and 57 deletions.
49 changes: 42 additions & 7 deletions packages/playback/src/lib/player/base/base-player.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { PlayerEventType } from '../../consts/events';
// types
import type { IPlayerSource } from '../../types/source.declarations';
import type { IInterceptorsStorage } from '../../types/interceptors.declarations';
import type { IInterceptorsStorage, Interceptor } from '../../types/interceptors.declarations';
import type { ILogger } from '../../types/logger.declarations';
import type { PlayerConfiguration } from '../../types/configuration.declarations';
import type { DeepPartial } from '../../types/utility.declarations';
Expand All @@ -16,14 +17,16 @@ import { ConfigurationChangedEvent, LoggerLevelChangedEvent, VolumeChangedEvent
// errors
// pipelines
import { PipelineLoaderFactoryStorage } from './pipeline-loader-factory-storage';
import type { InterceptorType } from '../../consts/interceptor-type';
import type { InterceptorTypeToInterceptorPayloadMap } from '../../types/mappers/interceptor-type-to-interceptor-map.declarations';

declare const __COMMIT_HASH: string;
declare const __VERSION: string;
declare const __EXPERIMENTAL: boolean;

export interface PlayerDependencies {
readonly logger: ILogger;
readonly interceptorsStorage: IInterceptorsStorage;
readonly interceptorsStorage: IInterceptorsStorage<InterceptorTypeToInterceptorPayloadMap>;
readonly configurationManager: IStore<PlayerConfiguration>;
readonly eventEmitter: IEventEmitter<EventTypeToEventMap>;
}
Expand Down Expand Up @@ -91,13 +94,13 @@ export abstract class BasePlayer {
protected readonly eventEmitter_: IEventEmitter<EventTypeToEventMap>;

/**
* MARK: PRIVATE INSTANCE MEMBERS
* internal interceptor's storage service
*/
protected readonly interceptorsStorage_: IInterceptorsStorage<InterceptorTypeToInterceptorPayloadMap>;

/**
* internal interceptor's storage service
* MARK: PRIVATE INSTANCE MEMBERS
*/
private readonly interceptorsStorage_: IInterceptorsStorage;

/**
* internal configuration manager service
Expand All @@ -116,10 +119,42 @@ export abstract class BasePlayer {
*/

/**
* interceptor's storage getter
* add new interceptor for a specific type
* @param interceptorType - specific interceptor type
* @param interceptor - interceptor
*/
public addInterceptor<K extends InterceptorType>(
interceptorType: K,
interceptor: Interceptor<InterceptorTypeToInterceptorPayloadMap[K]>
): void {
this.interceptorsStorage_.addInterceptor(interceptorType, interceptor);
}

/**
* remove specific interceptor for a specific type
* @param interceptorType - specific interceptor type
* @param interceptor - interceptor
*/
public getInterceptorsStorage(): IInterceptorsStorage {
return this.interceptorsStorage_;
public removeInterceptor<K extends InterceptorType>(
interceptorType: K,
interceptor: Interceptor<InterceptorTypeToInterceptorPayloadMap[K]>
): void {
this.interceptorsStorage_.removeInterceptor(interceptorType, interceptor);
}

/**
* remove all interceptors for a specific type
* @param interceptorType - specific interceptor type
*/
public removeAllInterceptorsForType<K extends InterceptorType>(interceptorType: K): void {
this.interceptorsStorage_.removeAllInterceptorsForType(interceptorType);
}

/**
* remove all interceptors
*/
public removeAllInterceptors(): void {
this.interceptorsStorage_.removeAllInterceptors();
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export enum MainToWorkerMessageType {
SetLoggerLevel = 'SetLoggerLevel',
UpdateConfiguration = 'UpdateConfiguration',
InterceptorsExecutionResult = 'InterceptorsExecutionResult',
Stop = 'Stop',
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum WorkerToMainMessageType {
EmitEvent = 'EmitEvent',
RunInterceptors = 'RunInterceptors',
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import type { LoggerLevel } from '../../../consts/logger-level';
import type { PlayerConfiguration } from '../../../types/configuration.declarations';
import { MainToWorkerMessageType } from '../consts/main-to-worker-message-type';
import type { InterceptorTypeToInterceptorPayloadMap } from '../../../types/mappers/interceptor-type-to-interceptor-map.declarations';
import type { InterceptorType } from '../../../consts/interceptor-type';

export abstract class MainToWorkerMessage {
public abstract readonly type: MainToWorkerMessageType;
}

export class SetLoggerLevelMessage extends MainToWorkerMessage {
public readonly type = MainToWorkerMessageType.SetLoggerLevel;
public readonly level: LoggerLevel;

public constructor(level: LoggerLevel) {
super();
this.level = level;
}
}

export class UpdateConfigurationMessage extends MainToWorkerMessage {
public readonly type = MainToWorkerMessageType.UpdateConfiguration;
public readonly configuration: PlayerConfiguration;

public constructor(configuration: PlayerConfiguration) {
super();
this.configuration = configuration;
}
}

export class InterceptorsExecutionResultMessage extends MainToWorkerMessage {
public readonly type = MainToWorkerMessageType.InterceptorsExecutionResult;
public readonly executionId: string;
public readonly result: InterceptorTypeToInterceptorPayloadMap[InterceptorType];

public constructor(executionId: string, result: InterceptorTypeToInterceptorPayloadMap[InterceptorType]) {
super();
this.executionId = executionId;
this.result = result;
}
}

export class StopMessage extends MainToWorkerMessage {
public readonly type = MainToWorkerMessageType.Stop;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import { WorkerToMainMessageType } from '../consts/worker-to-main-message-type';
import type { PlayerEvent } from '../../../events/base-player-event';
import type { InterceptorType } from '../../../consts/interceptor-type';
import type { InterceptorTypeToInterceptorPayloadMap } from '../../../types/mappers/interceptor-type-to-interceptor-map.declarations';

export abstract class WorkerToMainMessage {
public abstract readonly type: WorkerToMainMessageType;
}

export class EmitEventMessage extends WorkerToMainMessage {
public readonly type = WorkerToMainMessageType.EmitEvent;
public readonly event: PlayerEvent;

public constructor(event: PlayerEvent) {
super();
this.event = event;
}
}

export class RunInterceptorsMessage extends WorkerToMainMessage {
public readonly type = WorkerToMainMessageType.RunInterceptors;
public readonly interceptorType: InterceptorType;
public readonly payload: InterceptorTypeToInterceptorPayloadMap[InterceptorType];
public readonly executionId: string = String(Date.now() + Math.random());

public constructor(
interceptorType: InterceptorType,
payload: InterceptorTypeToInterceptorPayloadMap[InterceptorType]
) {
super();
this.interceptorType = interceptorType;
this.payload = payload;
}
}
82 changes: 63 additions & 19 deletions packages/playback/src/lib/player/worker-thread/player-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,100 @@ import type { LoggerLevel } from '../../consts/logger-level';
// types
import type { DeepPartial } from '../../types/utility.declarations';
import type { PlayerConfiguration } from '../../types/configuration.declarations';
import { ServiceLocator } from '../../service-locator';
import type { MainToWorkerMessage } from './messages/main-to-worker-messages';
import { StopMessage } from './messages/main-to-worker-messages';
import { InterceptorsExecutionResultMessage } from './messages/main-to-worker-messages';
import { UpdateConfigurationMessage } from './messages/main-to-worker-messages';
import { SetLoggerLevelMessage } from './messages/main-to-worker-messages';
import type { EmitEventMessage, RunInterceptorsMessage, WorkerToMainMessage } from './messages/worker-to-main-messages';
import { WorkerToMainMessageType } from './consts/worker-to-main-message-type';

declare const __WORKER_CODE: string;

interface PlayerWorkerDependencies extends PlayerDependencies {
readonly worker: Worker;
readonly workerScriptBlobUrl: string;
}

export class Player extends BasePlayer {
public static create(): Player {
const serviceLocator = new ServiceLocator();

const workerScriptBlob = new Blob([__WORKER_CODE], { type: 'application/javascript' });
const workerScriptBlobUrl = URL.createObjectURL(workerScriptBlob);
const worker = new Worker(workerScriptBlobUrl);

return new Player({
...serviceLocator,
worker,
workerScriptBlobUrl: workerScriptBlobUrl,
});
}

private readonly worker_: Worker;
private readonly workerScriptBlobUrl_: string;

public constructor(dependencies: PlayerDependencies) {
public constructor(dependencies: PlayerWorkerDependencies) {
super(dependencies);

const workerCodeBlob = new Blob([__WORKER_CODE], { type: 'application/javascript' });
// TODO dispose worker blob url
const workerCodeBlobUrl = URL.createObjectURL(workerCodeBlob);
// TODO: I would assume worker should be injected in the constructor for testing purposes
this.worker_ = new Worker(workerCodeBlobUrl);
// TODO, worker terminate + cleanup
this.worker_ = dependencies.worker;
this.workerScriptBlobUrl_ = dependencies.workerScriptBlobUrl;
this.worker_.addEventListener('message', this.onWorkerMessage_);
}

private readonly onWorkerMessage_ = (event: MessageEvent): void => {
this.logger_.debug('received message from worker', event.data);
// TODO: EMIT_EVENT --> eventEmitter.emit(event)
// TODO: RUN_INTERCEPTORS --> execute interceptors and post message (INTERCEPTORS_EXECUTED) with result
private readonly onWorkerMessage_ = (event: MessageEvent<WorkerToMainMessage>): void => {
switch (event.data.type) {
case WorkerToMainMessageType.EmitEvent:
return this.handleEmitEventMessage_(event.data as EmitEventMessage);
case WorkerToMainMessageType.RunInterceptors:
return this.handleRunInterceptorsMessage_(event.data as RunInterceptorsMessage);
default:
return this.handleUnknownMessage_(event.data);
}
};

private handleEmitEventMessage_(message: EmitEventMessage): void {
this.eventEmitter_.emitEvent(message.event);
}

private handleRunInterceptorsMessage_(message: RunInterceptorsMessage): void {
this.interceptorsStorage_.executeInterceptors(message.interceptorType, message.payload).then((result) => {
this.sendMessageToWorkerThread_(new InterceptorsExecutionResultMessage(message.executionId, result));
});
}

private handleUnknownMessage_(message: WorkerToMainMessage): void {
this.logger_.warn('Unknown message received from worker: ', message);
}

public setLoggerLevel(loggerLevel: LoggerLevel): void {
super.setLoggerLevel(loggerLevel);
// TODO: post message
this.worker_.postMessage({});
this.sendMessageToWorkerThread_(new SetLoggerLevelMessage(loggerLevel));
}

public updateConfiguration(configurationChunk: DeepPartial<PlayerConfiguration>): void {
super.updateConfiguration(configurationChunk);
// TODO: post message
this.worker_.postMessage({});
this.sendMessageToWorkerThread_(new UpdateConfigurationMessage(this.getConfigurationSnapshot()));
}

public resetConfiguration(): void {
super.resetConfiguration();
// TODO: post message
this.worker_.postMessage({});
this.sendMessageToWorkerThread_(new UpdateConfigurationMessage(this.getConfigurationSnapshot()));
}

public stop(reason: string): void {
super.stop(reason);
// TODO: post message
this.worker_.postMessage({});
this.sendMessageToWorkerThread_(new StopMessage());
}

private sendMessageToWorkerThread_(message: MainToWorkerMessage): void {
this.worker_.postMessage(message);
}

public dispose(): void {
super.dispose();
this.worker_.terminate();
URL.revokeObjectURL(this.workerScriptBlobUrl_);
}
}
46 changes: 46 additions & 0 deletions packages/playback/src/lib/player/worker-thread/worker-bridge.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/**
* This file should be entry point for worker bundle
*/
import type { WorkerToMainMessage } from './messages/worker-to-main-messages';
import type { MainToWorkerMessage } from './messages/main-to-worker-messages';
import { MainToWorkerMessageType } from './consts/main-to-worker-message-type';

interface WorkerBridgeDependencies {
readonly globalScope: Window & typeof globalThis;
}

class WorkerBridge {
public static create(): WorkerBridge {
return new WorkerBridge({
globalScope: self,
});
}

private readonly globalScope_: Window & typeof globalThis;

public constructor(dependencies: WorkerBridgeDependencies) {
this.globalScope_ = dependencies.globalScope;
// We don't care about clean-up, since terminate() call on main thread should fully destroy worker
this.globalScope_.addEventListener('message', this.onMessageFromMainThread_);
}

private readonly onMessageFromMainThread_ = (event: MessageEvent<MainToWorkerMessage>): void => {
switch (event.data.type) {
case MainToWorkerMessageType.SetLoggerLevel: {
break;
}
case MainToWorkerMessageType.UpdateConfiguration: {
break;
}
default: {
break;
}
}
};

private sendMessageToMainThread_(message: WorkerToMainMessage): void {
this.globalScope_.postMessage(message);
}
}

WorkerBridge.create();
16 changes: 6 additions & 10 deletions packages/playback/src/lib/types/interceptors.declarations.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
import type { InterceptorType } from '../consts/interceptor-type';
import type { InterceptorTypeToInterceptorMap } from './mappers/interceptor-type-to-interceptor-map.declarations';
export type Interceptor<T> = (payload: T) => Promise<T>;

export interface IInterceptorsStorage {
addInterceptor<K extends InterceptorType>(interceptorType: K, interceptor: InterceptorTypeToInterceptorMap[K]): void;
removeInterceptor<K extends InterceptorType>(
interceptorType: K,
interceptor: InterceptorTypeToInterceptorMap[K]
): void;
getInterceptorsSet<K extends InterceptorType>(interceptorType: K): Set<InterceptorTypeToInterceptorMap[K]>;
removeAllInterceptorsForType<K extends InterceptorType>(interceptorType: K): void;
export interface IInterceptorsStorage<M> {
addInterceptor<K extends keyof M>(interceptorType: K, interceptor: Interceptor<M[K]>): void;
removeInterceptor<K extends keyof M>(interceptorType: K, interceptor: Interceptor<M[K]>): void;
executeInterceptors<K extends keyof M>(interceptorType: K, payload: M[K]): Promise<M[K]>;
removeAllInterceptorsForType<K extends keyof M>(interceptorType: K): void;
removeAllInterceptors(): void;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { InterceptorType } from '../../consts/interceptor-type';

export interface InterceptorTypeToInterceptorMap {
[InterceptorType.NetworkRequest]: (request: Request) => Promise<Request>;
[InterceptorType.HlsPlaylistParse]: (playlist: Uint8Array) => Promise<Uint8Array>;
export interface InterceptorTypeToInterceptorPayloadMap {
[InterceptorType.NetworkRequest]: Request;
[InterceptorType.HlsPlaylistParse]: Uint8Array;
}
Loading

0 comments on commit dc2c210

Please sign in to comment.