Skip to content

Commit

Permalink
Merge pull request #8 from Little709/master
Browse files Browse the repository at this point in the history
  • Loading branch information
fredrikburmester authored Jan 23, 2025
2 parents c26af72 + dafc9ec commit e8bc25e
Show file tree
Hide file tree
Showing 9 changed files with 304 additions and 78 deletions.
6 changes: 5 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# All these are optional - uncomment to use

# JELLYFIN_URL=http://your-jellyfin-url
# MAX_CONCURRENT_JOBS=1
# MAX_CONCURRENT_JOBS=1

# File-removal
# REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=true
# TIME_TO_KEEP_FILES=8 # Hours. Non-option when REMOVE_FILE_AFTER_RIGHT_DOWNLOAD=false
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"dotenv": "^16.4.5",
"reflect-metadata": "^0.2.0",
"rxjs": "^7.8.1",
"tree-kill": "^1.2.2",
"uuid": "^10.0.0"
},
"devDependencies": {
Expand Down
38 changes: 20 additions & 18 deletions src/app.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
import { Response } from 'express';
import * as fs from 'fs';
import { AppService, Job } from './app.service';
import { log } from 'console';

@Controller()
export class AppController {
Expand Down Expand Up @@ -92,7 +93,7 @@ export class AppController {
@Delete('cancel-job/:id')
async cancelJob(@Param('id') id: string) {
this.logger.log(`Cancellation request for job: ${id}`);

// this.appService.completeJob(id);
const result = this.appService.cancelJob(id);
if (result) {
return { message: 'Job cancelled successfully' };
Expand Down Expand Up @@ -127,26 +128,27 @@ export class AppController {
);

const fileStream = fs.createReadStream(filePath);
fileStream.pipe(res);

// Wait for the file to finish sending
await new Promise((resolve) => {
res.on('finish', resolve);
this.logger.log(`Download started for ${filePath}`)

return new Promise((resolve, reject) => {
fileStream.pipe(res);

fileStream.on('end', () => {
// File transfer completed
this.logger.log(`File transfer ended for: ${filePath}`)

resolve(null);
});

fileStream.on('error', (err) => {
// Handle errors during file streaming
this.logger.error(`Error streaming file ${filePath}: ${err.message}`);
reject(err);
});
});

// const fileName = basename(filePath);
// this.logger.log(`Download request for file: ${fileName}`);

// const mimeType = mime.lookup(filePath) || 'application/octet-stream';

// res.set({
// 'Content-Type': mimeType,
// 'Content-Disposition': `attachment; filename="${fileName}"`,
// });

// return new StreamableFile(fs.createReadStream(filePath));
}


@Delete('delete-cache')
async deleteCache() {
this.logger.log('Cache deletion request');
Expand Down
16 changes: 14 additions & 2 deletions src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,28 @@ import { AuthMiddleware } from './auth.middleware';
import { ConfigModule } from '@nestjs/config';
import { JellyfinAuthService } from './jellyfin-auth.service';
import { ScheduleModule } from '@nestjs/schedule';
import { CleanupService } from './cleanup/cleanup.service';
import { FileRemoval } from './cleanup/removalUtils';


@Module({
imports: [ScheduleModule.forRoot(), ConfigModule.forRoot({ isGlobal: true })],
controllers: [AppController],
providers: [AppService, Logger, JellyfinAuthService],
providers: [AppService, Logger, JellyfinAuthService, CleanupService, FileRemoval],
})
export class AppModule implements NestModule {
configure(consumer: MiddlewareConsumer) {
consumer
.apply(AuthMiddleware)
.forRoutes('optimize-version', 'download/:id', 'cancel-job/:id');
.forRoutes(
'optimize-version',
'download/:id',
'cancel-job/:id',
'statistics',
'job-status/:id',
'start-job/:id',
'all-jobs',
'delete-cache',
);
}
}
118 changes: 85 additions & 33 deletions src/app.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,13 @@ import * as path from 'path';
import { ConfigService } from '@nestjs/config';
import * as fs from 'fs';
import { promises as fsPromises } from 'fs';
import { CACHE_DIR } from './constants';
import { FileRemoval } from './cleanup/removalUtils';
import * as kill from 'tree-kill';

export interface Job {
id: string;
status: 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled';
status: 'queued' | 'optimizing' | 'completed' | 'failed' | 'cancelled' | 'ready-for-removal';
progress: number;
outputPath: string;
inputUrl: string;
Expand All @@ -32,21 +35,23 @@ export class AppService {
private jobQueue: string[] = [];
private maxConcurrentJobs: number;
private cacheDir: string;
private immediateRemoval: boolean;

constructor(
private logger: Logger,
private configService: ConfigService,
private readonly fileRemoval: FileRemoval

) {
this.cacheDir = path.join(process.cwd(), 'cache');
this.cacheDir = CACHE_DIR;
this.maxConcurrentJobs = this.configService.get<number>(
'MAX_CONCURRENT_JOBS',
1,
);

// Ensure the cache directory exists
if (!fs.existsSync(this.cacheDir)) {
fs.mkdirSync(this.cacheDir, { recursive: true });
}
this.immediateRemoval = this.configService.get<boolean>(
'REMOVE_FILE_AFTER_RIGHT_DOWNLOAD',
true,
);
}

async downloadAndCombine(
Expand Down Expand Up @@ -92,7 +97,7 @@ export class AppService {
if (!deviceId) {
return this.activeJobs;
}
return this.activeJobs.filter((job) => job.deviceId === deviceId);
return this.activeJobs.filter((job) => job.deviceId === deviceId && job.status !== 'ready-for-removal');
}

async deleteCache(): Promise<{ message: string }> {
Expand All @@ -110,23 +115,74 @@ export class AppService {
}
}

removeJob(jobId: string): void {
this.activeJobs = this.activeJobs.filter(job => job.id !== jobId);
this.logger.log(`Job ${jobId} removed.`);
}

cancelJob(jobId: string): boolean {
const job = this.activeJobs.find((job) => job.id === jobId);
this.completeJob(jobId);
const job = this.activeJobs.find(job => job.id === jobId);
const process = this.ffmpegProcesses.get(jobId);

const finalizeJobRemoval = () => {
if (job) {
this.jobQueue = this.jobQueue.filter(id => id !== jobId);

if (this.immediateRemoval === true || job.progress < 100) {
this.fileRemoval.cleanupReadyForRemovalJobs([job]);
this.activeJobs = this.activeJobs.filter(activeJob => activeJob.id !== jobId);
this.logger.log(`Job ${jobId} removed`);
}
else{
this.logger.log('Immediate removal is not allowed, cleanup service will take care in due time')
}
}
this.checkQueue();
};

if (process) {
process.kill('SIGKILL');
try {
this.logger.log(`Attempting to kill process tree for PID ${process.pid}`);
new Promise<void>((resolve, reject) => {
kill(process.pid, 'SIGINT', (err) => {
if (err) {
this.logger.error(`Failed to kill process tree for PID ${process.pid}: ${err.message}`);
reject(err);
} else {
this.logger.log(`Successfully killed process tree for PID ${process.pid}`);
resolve();
finalizeJobRemoval()
}
});
});
} catch (err) {
this.logger.error(`Error terminating process for job ${jobId}: ${err.message}`);
}
this.ffmpegProcesses.delete(jobId);
return true;
} else {
finalizeJobRemoval();
return true;
}

}

completeJob(jobId: string):void{
const job = this.activeJobs.find((job) => job.id === jobId);
if (job) {
this.jobQueue = this.jobQueue.filter((id) => id !== jobId);
this.activeJobs = this.activeJobs.filter((job) => job.id !== jobId);
job.status = 'ready-for-removal';
job.timestamp = new Date()
this.logger.log(`Job ${jobId} marked as completed and ready for removal.`);
} else {
this.logger.warn(`Job ${jobId} not found. Cannot mark as completed.`);
}
}

this.checkQueue();

this.logger.log(`Job ${jobId} canceled`);
return true;
cleanupJob(jobId: string): void {
const job = this.activeJobs.find((job) => job.id === jobId);
this.activeJobs = this.activeJobs.filter((job) => job.id !== jobId);
this.ffmpegProcesses.delete(jobId);
this.videoDurations.delete(jobId);
}

getTranscodedFilePath(jobId: string): string | null {
Expand All @@ -137,12 +193,6 @@ export class AppService {
return null;
}

cleanupJob(jobId: string): void {
this.activeJobs = this.activeJobs.filter((job) => job.id !== jobId);
this.ffmpegProcesses.delete(jobId);
this.videoDurations.delete(jobId);
}

getMaxConcurrentJobs(): number {
return this.maxConcurrentJobs;
}
Expand Down Expand Up @@ -219,24 +269,29 @@ export class AppService {
}

private checkQueue() {
const runningJobs = Array.from(this.activeJobs.values()).filter(
(job) => job.status === 'optimizing',
).length;
let runningJobs = this.activeJobs.filter((job) => job.status === 'optimizing')
.length;

while (runningJobs < this.maxConcurrentJobs && this.jobQueue.length > 0) {
const nextJobId = this.jobQueue.shift();
if (nextJobId) {
this.startJob(nextJobId);
runningJobs++; // Now we track the newly started job
}
}
}


private startJob(jobId: string) {
const job = this.activeJobs.find((job) => job.id === jobId);
if (job) {
job.status = 'optimizing';
const ffmpegArgs = this.getFfmpegArgs(job.inputUrl, job.outputPath);
this.startFFmpegProcess(jobId, ffmpegArgs);
this.startFFmpegProcess(jobId, ffmpegArgs)
.finally(() => {
// This runs after the returned Promise resolves or rejects.
this.checkQueue();
});
this.logger.log(`Started job ${jobId}`);
}
}
Expand All @@ -263,20 +318,19 @@ export class AppService {
await this.getVideoDuration(ffmpegArgs[1], jobId);

return new Promise((resolve, reject) => {
const ffmpegProcess = spawn('ffmpeg', ffmpegArgs);
const ffmpegProcess = spawn('ffmpeg', ffmpegArgs, { stdio: ['pipe', 'pipe', 'pipe']});
this.ffmpegProcesses.set(jobId, ffmpegProcess);

ffmpegProcess.stderr.on('data', (data) => {
this.updateProgress(jobId, data.toString());
});

ffmpegProcess.on('close', async (code) => {
this.ffmpegProcesses.delete(jobId);
this.videoDurations.delete(jobId);

const job = this.activeJobs.find((job) => job.id === jobId);
if (!job) {
// Job was cancelled and removed, just resolve
resolve();
return;
}
Expand Down Expand Up @@ -320,12 +374,10 @@ export class AppService {
if (job) {
job.status = 'failed';
}
} finally {
// Check queue after job completion or failure
this.checkQueue();
}
}


private async getVideoDuration(
inputUrl: string,
jobId: string,
Expand Down
Loading

0 comments on commit e8bc25e

Please sign in to comment.