Skip to content

Commit

Permalink
Merge pull request #9 from Little709/max_cache_per_user
Browse files Browse the repository at this point in the history
Max cache per user
  • Loading branch information
fredrikburmester authored Jan 23, 2025
2 parents e8bc25e + fc00a5f commit b092d1e
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 15 deletions.
5 changes: 4 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,7 @@

# File-removal
# REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=true
# TIME_TO_KEEP_FILES=8 # Hours. Non-option when REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=false
# TIME_TO_KEEP_FILES=8 # Hours. Non-option when REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=false

# File management
# MAX_CACHED_PER_USER=10
100 changes: 86 additions & 14 deletions src/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import * as kill from 'tree-kill';

export interface Job {
id: string;
status: 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal';
status: 'queued' | 'optimizing' | 'pending downloads limit' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal';
progress: number;
outputPath: string;
inputUrl: string;
Expand All @@ -30,10 +30,12 @@ export interface Job {
@Injectable()
export class AppService {
private activeJobs: Job[] = [];
private optimizationHistory: Job[] = [];
private ffmpegProcesses: Map<string, ChildProcess> = new Map();
private videoDurations: Map<string, number> = new Map();
private jobQueue: string[] = [];
private maxConcurrentJobs: number;
private maxCachedPerUser: number;
private cacheDir: string;
private immediateRemoval: boolean;

Expand All @@ -48,6 +50,10 @@ export class AppService {
'MAX_CONCURRENT_JOBS',
1,
);
this.maxCachedPerUser = this.configService.get<number>(
'MAX_CACHED_PER_USER',
10,
);
this.immediateRemoval = this.configService.get<boolean>(
'REMOVE_FILE_AFTER_RIGHT_DOWNLOAD',
true,
Expand Down Expand Up @@ -128,7 +134,6 @@ export class AppService {
const finalizeJobRemoval = () => {
if (job) {
this.jobQueue = this.jobQueue.filter(id => id !== jobId);

if (this.immediateRemoval === true || job.progress < 100) {
this.fileRemoval.cleanupReadyForRemovalJobs([job]);
this.activeJobs = this.activeJobs.filter(activeJob => activeJob.id !== jobId);
Expand All @@ -138,6 +143,9 @@ export class AppService {
this.logger.log('Immediate removal is not allowed, cleanup service will take care in due time')
}
}
this.activeJobs
.filter((nextjob) => nextjob.deviceId === job.deviceId && nextjob.status === 'pending downloads limit')
.forEach((job) => job.status = 'queued')
this.checkQueue();
};

Expand Down Expand Up @@ -169,6 +177,7 @@ export class AppService {

completeJob(jobId: string):void{
const job = this.activeJobs.find((job) => job.id === jobId);

if (job) {
job.status = 'ready-for-removal';
job.timestamp = new Date()
Expand Down Expand Up @@ -260,7 +269,35 @@ export class AppService {
}

private getCompletedJobs(): number {
return this.activeJobs.filter((job) => job.status === 'completed').length;
return this.activeJobs.filter((job) => job.status === 'ready-for-removal').length;
}

private isDeviceIdInOptimizeHistory(job:Job){
const uniqueDeviceIds: string[] = [...new Set(this.optimizationHistory.map((job: Job) => job.deviceId))];
const result = uniqueDeviceIds.includes(job.deviceId); // Check if job.deviceId is in uniqueDeviceIds
this.logger.log(`Device ID ${job.deviceId} is ${result ? 'in' : 'not in'} the finished jobs. Optimizing ${result ? 'Allowed' : 'not Allowed'}`);
return result
}

private getActiveJobDeviceIds(): string[]{
const uniqueDeviceIds: string[] = [
...new Set(
this.activeJobs
.filter((job: Job) => job.status === 'queued') // Filter jobs with status 'queued'
.map((job: Job) => job.deviceId) // Extract deviceId
)
];
return uniqueDeviceIds
}

private handleOptimizationHistory(job: Job): void{
// create a finished jobs list to make sure every device gets equal optimizing time
this.optimizationHistory.push(job) // push the newest job to the finished jobs list
const amountOfActiveDeviceIds = this.getActiveJobDeviceIds().length // get the amount of active queued job device ids
while(amountOfActiveDeviceIds <= this.optimizationHistory.length && this.optimizationHistory.length > 0){ // the finished jobs should always be lower than the amount of active jobs. This is to push out the last deviceid: FIFO
this.optimizationHistory.shift() // shift away the oldest job.
}
this.logger.log(`${this.optimizationHistory.length} deviceIDs have recently finished a job`)
}

private getUniqueDevices(): number {
Expand All @@ -269,23 +306,56 @@ export class AppService {
}

private checkQueue() {
let runningJobs = this.activeJobs.filter((job) => job.status === 'optimizing')
.length;

while (runningJobs < this.maxConcurrentJobs && this.jobQueue.length > 0) {
const nextJobId = this.jobQueue.shift();
if (nextJobId) {
this.startJob(nextJobId);
runningJobs++; // Now we track the newly started job
let runningJobs = this.activeJobs.filter((job) => job.status === 'optimizing').length;

this.logger.log(
`${runningJobs} active jobs running and ${this.jobQueue.length} items in the queue`,
);

for (const index in this.jobQueue) {
if (runningJobs >= this.maxConcurrentJobs) {
break; // Stop if max concurrent jobs are reached
}
const nextJobId = this.jobQueue[index]; // Access job ID by index
let nextJob: Job = this.activeJobs.find((job) => job.id === nextJobId);

if (!this.userTooManyCachedItems(nextJobId) ) {
nextJob.status = 'pending downloads limit'
// Skip this job if user cache limits are reached
continue;
}
if(this.isDeviceIdInOptimizeHistory(nextJob)){
// Skip this job if deviceID is in the recently finished jobs
continue
}
// Start the job and remove it from the queue
this.startJob(nextJobId);
this.jobQueue.splice(Number(index), 1); // Remove the started job from the queue
runningJobs++; // Increment running jobs
}
}

private userTooManyCachedItems(jobid): boolean{
if(this.maxCachedPerUser == 0){
return false
}
const theNewJob: Job = this.activeJobs.find((job) => job.id === jobid)
let completedUserJobs = this.activeJobs.filter((job) => (job.status === "completed" || job.status === 'optimizing') && job.deviceId === theNewJob.deviceId)
if((completedUserJobs.length >= this.maxCachedPerUser)){
this.logger.log(`Waiting for items to be downloaded - device ${theNewJob.deviceId} has ${completedUserJobs.length} downloads waiting `);
return false
}
else{
this.logger.log(`Optimizing - device ${theNewJob.deviceId} has ${completedUserJobs.length} downloads waiting`);
return true
}
}

private startJob(jobId: string) {
const job = this.activeJobs.find((job) => job.id === jobId);
if (job) {
job.status = 'optimizing';
this.handleOptimizationHistory(job)
const ffmpegArgs = this.getFfmpegArgs(job.inputUrl, job.outputPath);
this.startFFmpegProcess(jobId, ffmpegArgs)
.finally(() => {
Expand All @@ -310,6 +380,7 @@ export class AppService {
];
}


private async startFFmpegProcess(
jobId: string,
ffmpegArgs: string[],
Expand All @@ -336,6 +407,7 @@ export class AppService {
}

if (code === 0) {

job.status = 'completed';
job.progress = 100;
// Update the file size
Expand All @@ -357,15 +429,15 @@ export class AppService {
this.logger.error(
`Job ${jobId} failed with exit code ${code}. Input URL: ${job.inputUrl}`,
);
reject(new Error(`FFmpeg process failed with exit code ${code}`));
// reject(new Error(`FFmpeg process failed with exit code ${code}`));
}
});

ffmpegProcess.on('error', (error) => {
this.logger.error(
`FFmpeg process error for job ${jobId}: ${error.message}`,
);
reject(error);
// reject(error);
});
});
} catch (error) {
Expand All @@ -377,7 +449,7 @@ export class AppService {
}
}


private async getVideoDuration(
inputUrl: string,
jobId: string,
Expand Down

0 comments on commit b092d1e

Please sign in to comment.