From 803be8cdf48900aa9a970514d2d6aee7ea68e4b1 Mon Sep 17 00:00:00 2001 From: Pierre Tomasina Date: Sun, 16 Jul 2017 15:33:29 +0200 Subject: [PATCH] run return promise and error class implemented --- README.md | 39 +++++++++++++++++++ package-lock.json | 10 +++++ package.json | 6 ++- src/main.js | 96 ++++++++++++++++++++++++++++++++++++++++------- 4 files changed, 135 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index e1d48aa..9f1af8a 100644 --- a/README.md +++ b/README.md @@ -10,3 +10,42 @@ This library let you batch a full DynamoDB table to dispatch JSON event to Kines Create a full new ElasticSearchIndex before enable DynamoDB Stream to ES +## Demo + +```javascript +'use strict' + +const DyKi = require('../src/main'); +const tableName = 'env.project.dynamodb.table'; //Related to your ARN name +const streamName = 'env.project.kinesis.stream'; //Related to your ARN name +const startKey = { + uuid: { + "S": 'b379abae-0fa5-48a5-8834-9130d502b4fc' + } +}; + +const client = new DyKi.Client(tableName, streamName, 'eu-west-1', { + delay: 500, + dyCapacityUnitLimit: 10, + progressCallbackInterval: 1500, +}); + +const progressCallback = function(info) { + console.log('In progress'); + console.log(`Last Key: ${info.lastEvaluatedKey.uuid.S}`); + console.log(`Total: ${info.total}`); + console.log(`Unit: ${info.consumedUnitCapacity}`); + console.log("\n"); +}; + +client.run(25, startKey, progressCallback).then(info => { + console.log('Finish !!'); + + console.log(`Last Key: ${info.lastEvaluatedKey.uuid.S}`); + console.log(`Total: ${info.total}`); + console.log(`Unit: ${info.consumedUnitCapacity}`); +}, err => { + console.log(err.message, err.code); +}); +``` + diff --git a/package-lock.json b/package-lock.json index 426a618..44318ed 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,11 @@ "resolved": "https://registry.npmjs.org/aws-sdk/-/aws-sdk-2.84.0.tgz", "integrity": "sha1-yHuwW8Q76mgcEVUD9zp+eEyb/rY=" }, + "base-64": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/base-64/-/base-64-0.1.0.tgz", + "integrity": "sha1-eAqZyE59YAJgNhURxId2E78k9rs=" + }, "base64-js": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/base64-js/-/base64-js-1.2.1.tgz", @@ -73,6 +78,11 @@ "resolved": "https://registry.npmjs.org/uuid/-/uuid-3.0.1.tgz", "integrity": "sha1-ZUS7ot/ajBzxfmKaOjBeK7H+5sE=" }, + "when": { + "version": "3.7.8", + "resolved": "https://registry.npmjs.org/when/-/when-3.7.8.tgz", + "integrity": "sha1-xxMLan6gRpPoQs3J56Hyqjmjn4I=" + }, "xml2js": { "version": "0.4.17", "resolved": "https://registry.npmjs.org/xml2js/-/xml2js-0.4.17.tgz", diff --git a/package.json b/package.json index b56ce6c..8169758 100644 --- a/package.json +++ b/package.json @@ -1,5 +1,5 @@ { - "name": "dynamodb-to-kinesis", + "name": "dyki", "version": "0.1.0", "description": "Extract DynamoDB items to dispatch into Kinesis Stream", "main": "src/main.js", @@ -9,6 +9,8 @@ "author": "Pierre Tomasina (continuousphp.com)", "license": "ISC", "dependencies": { - "aws-sdk": "^2.84.0" + "aws-sdk": "^2.84.0", + "base-64": "^0.1.0", + "when": "^3.7.8" } } diff --git a/src/main.js b/src/main.js index 3a68d8c..05e4f1c 100644 --- a/src/main.js +++ b/src/main.js @@ -1,12 +1,24 @@ 'use strict' +const when = require('when'); const AWS = require('aws-sdk'); const DyKi = {}; +DyKi.Config = { + dyEventName: 'INSERT', + dyCapacityUnitLimit: 5, + delay: 1000, + progressCallbackInterval: 1000, +}; + +DyKi.Error = class extends Error {}; + DyKi.Client = class { - constructor(tableName, streamName, region) { + constructor(tableName, streamName, region, config = DyKi.Config) { this.tableName = tableName; this.streamName = streamName; + this.config = Object.assign({}, DyKi.Config); + Object.assign(this.config, config); this.dynamodb = new AWS.DynamoDB({ apiVersion: '2012-08-10', @@ -17,28 +29,80 @@ DyKi.Client = class { apiVersion: '2013-12-02', region, }); + + this.lastCapacityUnit = 0; + this.counter = 0; + this.EOF = false; } - run(startKey, pageSize = 4) { + info() { + return { + consumedUnitCapacity: this.lastCapacityUnit, + total: this.counter, + lastEvaluatedKey: this.startKey, + }; + } + + async run(pageSize = 4, startKey = null, progressCallback = null) { this.startKey = startKey; this.pageSize = pageSize; - this.scan(); + this.lastCapacityUnit = 0; + this.counter = 0; + this.EOF = false; + + let progressInterval; + + if ('function' === typeof progressCallback) { + progressInterval = setInterval(() => { + progressCallback.call(null, this.info()); + }, this.config.progressCallbackInterval); + } + + while (!this.EOF && this.lastCapacityUnit <= this.config.dyCapacityUnitLimit) { + try { + await this.scan(); + } catch(e) { + clearInterval(progressInterval); + } + } + + clearInterval(progressInterval); + + if (this.EOF) { + return when(this.info()); + } + + if (this.lastCapacityUnit > this.config.dyCapacityUnitLimit) { + const err = new DyKi.Error(`Scan stoped due to CapacityUnitLimit reach ${this.lastCapacityUnit}`); + err.code = 'ConsumedCapacityUnitLimitExceeded'; + + return when(err).then(e => { + throw e + }); + } + + return when(new DyKi.Error('Unknown Internal Error')).then(e => { + throw e; + }); } scan() { const params = { TableName: this.tableName, - ExclusiveStartKey: { - uuid: { - S: this.startKey, - }, - }, ReturnConsumedCapacity: 'TOTAL', Limit: this.pageSize, }; - this.dynamodb.scan(params, this.scanResult.bind(this)); + if (this.startKey) { + params.ExclusiveStartKey = this.startKey; + } + + return when(params).then(p => { + this.dynamodb.scan(p, this.scanResult.bind(this)); + }, err => { + throw err; + }).delay(this.config.delay); } scanResult(err, response) { @@ -47,23 +111,28 @@ DyKi.Client = class { } let _this = this; + this.lastCapacityUnit = response.ConsumedCapacity.CapacityUnits; + this.counter += response.Count; + + if ('LastEvaluatedKey' in response) { + this.startKey = response.LastEvaluatedKey; + } else { + this.EOF = true; + } response.Items.forEach(function(item) { - console.log(item); _this.putKinesisRecord(item); }); } putKinesisRecord(item) { const record = { - eventName: 'UPDATE', + eventName: this.dyEventName, dynamodb: { NewImage: item, }, }; - console.log('Put Kinesis Record', record); - const params = { Data: JSON.stringify(record), PartitionKey: item.uuid['S'], @@ -74,7 +143,6 @@ DyKi.Client = class { if (err) { throw err; } - console.log('Kinesis PutRecord', data); }); } }