Skip to content

Commit

Permalink
0.0.9
Browse files Browse the repository at this point in the history
  • Loading branch information
shawnrushefsky committed Apr 26, 2024
1 parent 3745cd7 commit 6710248
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 57 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "kelpie",
"version": "0.0.8",
"version": "0.0.9",
"description": "A worker binary to coordinate long running jobs on salad. Works with Kelpie API",
"main": "dist/index.js",
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ FROM yourimage:yourtag
RUN apt-get update && apt-get install -y wget

# kelpie is a standalone x86-64 linux binary
RUN wget https://github.com/SaladTechnologies/kelpie/releases/download/0.0.8/kelpie -O /kelpie && chmod +x /kelpie
RUN wget https://github.com/SaladTechnologies/kelpie/releases/download/0.0.9/kelpie -O /kelpie && chmod +x /kelpie

CMD ["/kelpie"]
```
Expand Down
2 changes: 2 additions & 0 deletions src/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ export async function sendHeartbeat(
}

export async function reportFailed(jobId: string): Promise<void> {
console.log(`Reporting job ${jobId} as failed`);
await fetchUpToNTimes(
`${KELPIE_API_URL}/jobs/${jobId}/failed`,
{
Expand All @@ -133,6 +134,7 @@ export async function reportFailed(jobId: string): Promise<void> {
}

export async function reportCompleted(jobId: string): Promise<void> {
console.log(`Reporting job ${jobId} as completed`);
await fetchUpToNTimes(
`${KELPIE_API_URL}/jobs/${jobId}/completed`,
{
Expand Down
41 changes: 33 additions & 8 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,34 +44,58 @@ process.on("SIGINT", () => {

process.on("SIGTERM", () => {
keepAlive = false;
console.log("Received SIGTERM, stopping...");
process.exit();
});

async function main() {
await clearAllDirectories();

while (keepAlive) {
const work = await getWork();
let work;
try {
work = await getWork();
} catch (e: any) {
console.error("Error fetching work: ", e);
await sleep(10000);
continue;
}

if (!work) {
console.log("No work available, sleeping for 10 seconds...");
await sleep(10000);
continue;
}
console.log(`Received work: ${work.id}`);

// Download required files
await Promise.all([
downloadAllFilesFromPrefix(
try {
await downloadAllFilesFromPrefix(
work.input_bucket,
work.input_prefix,
INPUT_DIR
),
downloadAllFilesFromPrefix(
);
} catch (e: any) {
console.error("Error downloading input files: ", e);
// await reportFailed(work.id);
continue;
}

try {
await downloadAllFilesFromPrefix(
work.checkpoint_bucket,
work.checkpoint_prefix,
CHECKPOINT_DIR
),
]);
);
} catch (e: any) {
console.error("Error downloading checkpoint files: ", e);
// await reportFailed(work.id);
continue;
}

console.log(
"All files downloaded successfully, starting directory watchers..."
);
const checkpointWatcher = new DirectoryWatcher(CHECKPOINT_DIR);
checkpointWatcher.watchDirectory(async (localFilePath) => {
const relativeFilename = path.relative(CHECKPOINT_DIR, localFilePath);
Expand All @@ -92,6 +116,7 @@ async function main() {
);
});

console.log("Starting heartbeat manager...");
heartbeatManager.startHeartbeat(
work.id,
work.heartbeat_interval,
Expand Down Expand Up @@ -139,4 +164,4 @@ async function main() {
}
}

main();
main().then(() => console.log("Kelpie Exiting"));
144 changes: 99 additions & 45 deletions src/s3.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,20 @@
import {
S3Client,
GetObjectCommand,
ListObjectsCommand,
PutObjectCommand,
ListObjectsV2Command,
ListObjectsV2CommandInput,
} from "@aws-sdk/client-s3";
import { Progress, Upload } from "@aws-sdk/lib-storage";
import fs from "fs";
import { pipeline, Readable } from "stream";
import { promisify } from "util";
import { Readable } from "stream";

import path from "path";
import fsPromises from "fs/promises";

const { AWS_REGION, AWS_DEFAULT_REGION } = process.env;

const s3Client = new S3Client({ region: AWS_REGION || AWS_DEFAULT_REGION });

const pipelineAsync = promisify(pipeline);

export async function uploadFile(
localFilePath: string,
bucketName: string,
Expand Down Expand Up @@ -73,52 +71,103 @@ export async function downloadFile(
// Perform the download
const data = await s3Client.send(new GetObjectCommand(downloadParams));

if (data.Body) {
const readableStream = new Readable().wrap(
data.Body as NodeJS.ReadableStream
);
const fileStream = fs.createWriteStream(localFilePath);
await pipelineAsync(readableStream, fileStream);
}

console.log("Download completed successfully");
} catch (err) {
return new Promise((resolve, reject) => {
if (data.Body instanceof Readable) {
// Loop through body chunks and write to file
const writeStream = fs.createWriteStream(localFilePath);
data.Body.pipe(writeStream)
.on("error", (err: any) => reject(err))
.on("close", () => resolve());
}
});
} catch (err: any) {
console.error("Error downloading file: ", err);
throw err;
}
}

export async function downloadAllFilesFromPrefix(
/**
 * List every object key in a bucket under an optional prefix, transparently
 * following ListObjectsV2 pagination until the listing is exhausted.
 *
 * @param bucketName - Source S3 bucket.
 * @param prefix - Optional key prefix to filter the listing.
 * @returns All matching object keys, in listing order.
 */
async function listAllS3Objects(
  bucketName: string,
  prefix?: string
): Promise<string[]> {
  const keys: string[] = [];
  let token: string | undefined;

  // Each response returns at most one page; keep requesting pages until
  // the service stops handing back a continuation token.
  while (true) {
    const response = await s3Client.send(
      new ListObjectsV2Command({
        Bucket: bucketName,
        Prefix: prefix,
        ContinuationToken: token,
      })
    );

    for (const item of response.Contents ?? []) {
      if (item.Key) {
        keys.push(item.Key);
      }
    }

    token = response.NextContinuationToken;
    if (!token) {
      break;
    }
  }

  return keys;
}

/**
 * Download one batch of S3 keys concurrently into a local directory.
 *
 * Each key has `prefix` stripped to form its path under `outputDir`.
 * Failures are logged per-key and do not abort the batch (best-effort);
 * a summary of successes/failures is logged at the end.
 *
 * @param batch - Object keys to download in this batch.
 * @param bucket - Source S3 bucket.
 * @param prefix - Key prefix to strip when computing local filenames.
 * @param outputDir - Local directory receiving the files.
 */
async function processBatch(
  batch: string[],
  bucket: string,
  prefix: string,
  outputDir: string
) {
  const outcomes = await Promise.allSettled(
    batch.map((key) =>
      downloadFile(bucket, key, path.join(outputDir, key.replace(prefix, "")))
    )
  );

  let succeeded = 0;
  outcomes.forEach((outcome, i) => {
    if (outcome.status === "rejected") {
      console.error(`Download failed for ${batch[i]}: ${outcome.reason}`);
    } else {
      succeeded += 1;
    }
  });

  console.log(
    `Batch processed with ${succeeded} successes and ${
      outcomes.length - succeeded
    } failures.`
  );
}

export async function downloadAllFilesFromPrefix(
bucket: string,
prefix: string,
outputDir: string,
batchSize: number = 10
): Promise<void> {
try {
console.log(
`Downloading all files with prefix ${prefix} from storage bucket: ${bucket}`
);
const listObjectsParams = {
Bucket: bucket,
Prefix: prefix,
};
const allKeys = await listAllS3Objects(bucket, prefix);
console.log(`Found ${allKeys.length} files to download`);

const data = await s3Client.send(new ListObjectsCommand(listObjectsParams));

if (data.Contents) {
await Promise.all(
data.Contents.map(async (object) => {
const key = object.Key!;
// The filename should remove the prefix from the key
const filename = key.replace(prefix, "");
if (!filename) {
return;
}
const localFilePath = path.join(outputDir, filename);
await downloadFile(bucket, key, localFilePath);
})
);
// Download files in batches
for (let i = 0; i < allKeys.length; i += batchSize) {
const batch = allKeys.slice(i, i + batchSize);
await processBatch(batch, bucket, prefix, outputDir);
}

console.log("Download completed successfully");
console.log(
`All files from s3://${bucket}/${prefix} downloaded to ${outputDir} successfully`
);
} catch (err) {
console.error("Error downloading files: ", err);
}
Expand All @@ -127,20 +176,25 @@ export async function downloadAllFilesFromPrefix(
export async function uploadDirectory(
directory: string,
bucket: string,
prefix: string
prefix: string,
batchSize: number = 10
): Promise<void> {
try {
console.log(
`Uploading directory ${directory} to storage bucket: ${bucket}`
);
const fileList = await getAllFilePaths(directory);
await Promise.all(
fileList.map(async (filePath) => {
const localFilePath = path.join(directory, filePath);
const key = prefix + filePath;
await uploadFile(localFilePath, bucket, key);
})
);
console.log(`Found ${fileList.length} files to upload`);
for (let i = 0; i < fileList.length; i += batchSize) {
const batch = fileList.slice(i, i + batchSize);
await Promise.all(
batch.map(async (filePath) => {
const localFilePath = path.join(directory, filePath);
const key = prefix + filePath;
return await uploadFile(localFilePath, bucket, key);
})
);
}
console.log("Directory uploaded successfully");
} catch (err) {
console.error("Error uploading directory: ", err);
Expand Down

0 comments on commit 6710248

Please sign in to comment.