Skip to content

Commit

Permalink
0.1.1 bug fix
Browse files Browse the repository at this point in the history
  • Loading branch information
shawnrushefsky committed May 2, 2024
1 parent afa0ac9 commit 407b5c9
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 16 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kelpie",
"version": "0.1.0",
"version": "0.1.1",
"description": "A worker binary to coordinate long running jobs on salad. Works with Kelpie API",
"main": "dist/index.js",
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ FROM yourimage:yourtag
RUN apt-get update && install -y wget

# kelpie is a standalone x86-64 linux binary
RUN wget https://github.com/SaladTechnologies/kelpie/releases/download/0.1.0/kelpie -O /kelpie && chmod +x /kelpie
RUN wget https://github.com/SaladTechnologies/kelpie/releases/download/0.1.1/kelpie -O /kelpie && chmod +x /kelpie

CMD ["/kelpie"]
```
Expand Down
56 changes: 46 additions & 10 deletions src/files.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,41 @@
import chokidar, { FSWatcher } from "chokidar";
import { join } from "path";
import fs from "fs/promises";
import fsPromises from "fs/promises";
import { Stats } from "fs";
import fs from "fs";

// Function to check if the file has stopped changing
function waitForFileStability(filePath: string): Promise<void> {
return new Promise((resolve, reject) => {
let lastKnownSize = -1;
let retries = 0;

const checkFile = () => {
fs.stat(filePath, (err, stats) => {
if (err) {
reject(`Error accessing file: ${err}`);
return;
}

if (stats.size === lastKnownSize) {
if (retries >= 3) {
// Consider the file stable after 3 checks
resolve();
} else {
retries++;
setTimeout(checkFile, 100); // Shortened interval due to smaller file size
}
} else {
lastKnownSize = stats.size;
retries = 0;
setTimeout(checkFile, 100);
}
});
};

checkFile();
});
}

export class DirectoryWatcher {
private watcher: FSWatcher | null = null;
Expand All @@ -22,11 +56,13 @@ export class DirectoryWatcher {
persistent: true,
});

this.watcher.on("change", async (path: string, stats?: Stats) => {
console.log(`Event: change on ${path}`);
const task = forEachFile(path, "change").finally(() => {
this.activeTasks.delete(task);
});
this.watcher.on("add", async (path: string, stats?: Stats) => {
console.log(`Event: add on ${path}`);
const task = waitForFileStability(path)
.then(() => forEachFile(path, "add"))
.finally(() => {
this.activeTasks.delete(task);
});
this.activeTasks.add(task);
});

Expand Down Expand Up @@ -56,18 +92,18 @@ export async function recursivelyClearFilesInDirectory(
): Promise<void> {
try {
console.log(`Clearing files in directory: ${directory}`);
const files = await fs.readdir(directory);
const files = await fsPromises.readdir(directory);
await Promise.all(
files.map(async (file) => {
const filePath = join(directory, file);
const stats = await fs.stat(filePath);
const stats = await fsPromises.stat(filePath);
if (stats.isFile()) {
console.log(`Removing file: ${filePath}`);
await fs.unlink(filePath);
await fsPromises.unlink(filePath);
} else if (stats.isDirectory()) {
await recursivelyClearFilesInDirectory(filePath);
console.log(`Removing directory: ${filePath}`);
await fs.rmdir(filePath);
await fsPromises.rmdir(filePath);
}
})
);
Expand Down
4 changes: 2 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ async function main() {
checkpointWatcher.watchDirectory(
async (localFilePath: string, eventType: string) => {
const relativeFilename = path.relative(CHECKPOINT_DIR, localFilePath);
if (eventType === "change") {
if (eventType === "add") {
await uploadFile(
localFilePath,
work.checkpoint_bucket,
Expand All @@ -140,7 +140,7 @@ async function main() {
outputWatcher.watchDirectory(
async (localFilePath: string, eventType: string) => {
const relativeFilename = path.relative(OUTPUT_DIR, localFilePath);
if (eventType === "change") {
if (eventType === "add") {
await uploadFile(
localFilePath,
work.output_bucket,
Expand Down

0 comments on commit 407b5c9

Please sign in to comment.