diff --git a/.gitignore b/.gitignore index 59d842ba..817ab924 100644 --- a/.gitignore +++ b/.gitignore @@ -1,28 +1,5 @@ -# Logs -logs +/coverage +/doc +/lib/jars +/node_modules *.log - -# Runtime data -pids -*.pid -*.seed - -# Directory for instrumented libs generated by jscoverage/JSCover -lib-cov - -# Coverage directory used by tools like istanbul -coverage - -# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files) -.grunt - -# Compiled binary addons (http://nodejs.org/api/addons.html) -build/Release - -# Dependency directory -# Commenting this out is preferred by some people, see -# https://www.npmjs.org/doc/misc/npm-faq.html#should-i-check-my-node_modules-folder-into-git- -node_modules - -# Users Environment Variables -.lock-wscript diff --git a/.npmignore b/.npmignore new file mode 100644 index 00000000..47ab24c1 --- /dev/null +++ b/.npmignore @@ -0,0 +1,10 @@ +/coverage +/doc +/lib/jars +/node_modules +*.log + +Gruntfile.js +/conf +/samples +/test diff --git a/Gruntfile.js b/Gruntfile.js new file mode 100644 index 00000000..6fe39cfc --- /dev/null +++ b/Gruntfile.js @@ -0,0 +1,114 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+***/ +'use strict'; + +/** Builds a mochaTest target that runs the test suite quietly with the given reporter and captures the report in outFile. */ +function mochaCoverageOptions(reporterName, outFile) { + return { + options: { + reporter: reporterName, + quiet: true, + captureFile: outFile + }, + src: ['test/**/*_tests.js'] + }; +} + +module.exports = function(grunt) { +/* Configures the jshint, clean, mochaTest and jsdoc tasks, loads their plugins and registers the task aliases. */ + grunt.initConfig({ + + jshint: { + options: { + jshintrc: 'conf/.jshintrc' + }, + gruntfile: { + src: 'Gruntfile.js' + }, + bin: { + src: ['bin/kcl-bootstrap'] + }, + lib: { + src: ['index.js', 'lib/**/*.js'] + }, + test: { + src: ['test/**/*.js'] + }, + samples: { + src: ['samples/**/*.js'] + } + }, + + clean: { + build: { + options: { + force: true + }, + src: ['build'] + }, + coverage: { + options: { + force: true + }, + src: ['coverage'] + }, + doc: { + options: { + force: true + }, + src: ['doc'] + } + }, + + mochaTest: { + test: { + options: { + reporter: 'spec', + require: ['test/unit_tests_bootstrap'], + clearRequireCache: true + }, + src: ['test/**/*_tests.js'] + }, + html: mochaCoverageOptions('html-cov', 'coverage/index.html'), + json: mochaCoverageOptions('json-cov', 'coverage/javascript.coverage.json'), + }, + + jsdoc : { + dist : { + src: ['index.js', 'lib/**/*.js', 'README.md'], + jsdoc: './node_modules/grunt-jsdoc/node_modules/jsdoc/jsdoc', + options: { + destination: 'doc', + configure: './conf/jsdoc.conf.json', + template: './node_modules/grunt-jsdoc/node_modules/ink-docstrap/template' + } + } + } + }); + + grunt.loadNpmTasks('grunt-contrib-clean'); + grunt.loadNpmTasks('grunt-contrib-jshint'); + grunt.loadNpmTasks('grunt-jsdoc'); + grunt.loadNpmTasks('grunt-mocha-test'); + + grunt.registerTask('default', ['jshint', 'mochaTest']); + grunt.registerTask('build', 'compile'); + grunt.registerTask('compile', ['jshint']); + // clean task already defined above.
+ grunt.registerTask('doc', 'jsdoc'); + grunt.registerTask('test', ['jshint', 'mochaTest']); + grunt.registerTask('release', ['jshint', 'mochaTest', 'jsdoc']); +}; diff --git a/LICENSE.txt b/LICENSE.txt new file mode 100644 index 00000000..a4a77545 --- /dev/null +++ b/LICENSE.txt @@ -0,0 +1,40 @@ + +Amazon Software License + +This Amazon Software License (“License”) governs your use, reproduction, and distribution of the accompanying software as specified below. +1. Definitions + +“Licensor” means any person or entity that distributes its Work. + +“Software” means the original work of authorship made available under this License. + +“Work” means the Software and any additions to or derivative works of the Software that are made available under this License. + +The terms “reproduce,” “reproduction,” “derivative works,” and “distribution” have the meaning as provided under U.S. copyright law; provided, however, that for the purposes of this License, derivative works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work. + +Works, including the Software, are “made available” under this License by including in or with the Work either (a) a copyright notice referencing the applicability of this License to the Work, or (b) a copy of this License. +2. License Grants + +2.1 Copyright Grant. Subject to the terms and conditions of this License, each Licensor grants to you a perpetual, worldwide, non-exclusive, royalty-free, copyright license to reproduce, prepare derivative works of, publicly display, publicly perform, sublicense and distribute its Work and any resulting derivative works in any form. + +2.2 Patent Grant. Subject to the terms and conditions of this License, each Licensor grants to you a perpetual, worldwide, non-exclusive, royalty-free patent license to make, have made, use, sell, offer for sale, import, and otherwise transfer its Work, in whole or in part.
The foregoing license applies only to the patent claims licensable by Licensor that would be infringed by Licensor’s Work (or portion thereof) individually and excluding any combinations with any other materials or technology. +3. Limitations + +3.1 Redistribution. You may reproduce or distribute the Work only if (a) you do so under this License, (b) you include a complete copy of this License with your distribution, and (c) you retain without modification any copyright, patent, trademark, or attribution notices that are present in the Work. + +3.2 Derivative Works. You may specify that additional or different terms apply to the use, reproduction, and distribution of your derivative works of the Work (“Your Terms”) only if (a) Your Terms provide that the use limitation in Section 3.3 applies to your derivative works, and (b) you identify the specific derivative works that are subject to Your Terms. Notwithstanding Your Terms, this License (including the redistribution requirements in Section 3.1) will continue to apply to the Work itself. + +3.3 Use Limitation. The Work and any derivative works thereof only may be used or intended for use with the web services, computing platforms or applications provided by Amazon.com, Inc. or its affiliates, including Amazon Web Services, Inc. + +3.4 Patent Claims. If you bring or threaten to bring a patent claim against any Licensor (including any claim, cross-claim or counterclaim in a lawsuit) to enforce any patents that you allege are infringed by any Work, then your rights under this License from such Licensor (including the grants in Sections 2.1 and 2.2) will terminate immediately. + +3.5 Trademarks. This License does not grant any rights to use any Licensor’s or its affiliates’ names, logos, or trademarks, except as necessary to reproduce the notices described in this License. + +3.6 Termination. 
If you violate any term of this License, then your rights under this License (including the grants in Sections 2.1 and 2.2) will terminate immediately. +4. Disclaimer of Warranty. + +THE WORK IS PROVIDED “AS IS” WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WARRANTIES OR CONDITIONS OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE OR NON-INFRINGEMENT. YOU BEAR THE RISK OF UNDERTAKING ANY ACTIVITIES UNDER THIS LICENSE. SOME STATES’ CONSUMER LAWS DO NOT ALLOW EXCLUSION OF AN IMPLIED WARRANTY, SO THIS DISCLAIMER MAY NOT APPLY TO YOU. +5. Limitation of Liability. + +EXCEPT AS PROHIBITED BY APPLICABLE LAW, IN NO EVENT AND UNDER NO LEGAL THEORY, WHETHER IN TORT (INCLUDING NEGLIGENCE), CONTRACT, OR OTHERWISE SHALL ANY LICENSOR BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF OR RELATED TO THIS LICENSE, THE USE OR INABILITY TO USE THE WORK (INCLUDING BUT NOT LIMITED TO LOSS OF GOODWILL, BUSINESS INTERRUPTION, LOST PROFITS OR DATA, COMPUTER FAILURE OR MALFUNCTION, OR ANY OTHER COMMERCIAL DAMAGES OR LOSSES), EVEN IF THE LICENSOR HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGES. + diff --git a/README.md b/README.md index 9db45fb0..79d827af 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,291 @@ -# amazon-kinesis-client-nodejs -Amazon Kinesis Client Library for Node.js +# Amazon Kinesis Client Library for Node.js + +This package provides an interface to the [Amazon Kinesis Client Library][amazon-kcl] (KCL) [MultiLangDaemon][multi-lang-daemon] for the Node.js framework. + +Developers can use the KCL to build distributed applications that process streaming data reliably at scale. The KCL takes care of many of the complex tasks associated with distributed computing, such as load-balancing across multiple instances, responding to instance failures, checkpointing processed records, and reacting to changes in stream volume.
+ +This package wraps and manages the interaction with the [MultiLangDaemon][multi-lang-daemon], which is provided as part of the [Amazon KCL for Java][amazon-kcl-github] so that developers can focus on implementing their record processing logic. + +A record processor in Node.js typically looks like the following: + +```javascript +var kcl = require('aws-kcl'); +var util = require('util'); + +/** + * The record processor must provide three functions: + * + * * `initialize` - called once + * * `processRecords` - called zero or more times + * * `shutdown` - called if this KCL instance loses the lease to this shard + * + * Notes: + * * All of the above functions take additional callback arguments. When one is + * done initializing, processing records, or shutting down, callback must be + * called (i.e., `completeCallback()`) in order to let the KCL know that the + * associated operation is complete. Without the invocation of the callback + * function, the KCL will not proceed further. + * * The application will terminate if any error is thrown from any of the + * record processor functions. Hence, if you would like to continue processing + * on exception scenarios, exceptions should be handled appropriately in + * record processor functions and should not be passed to the KCL library. The + * callback must also be invoked in this case to let the KCL know that it can + * proceed further. + */ +var recordProcessor = { + /** + * Called once by the KCL before any calls to processRecords. Any initialization + * logic for record processing can go here. + * + * @param {object} initializeInput - Initialization related information. + * Looks like - {"shardId":""} + * @param {callback} completeCallback - The callback that must be invoked + * once the initialization operation is complete. + */ + initialize: function(initializeInput, completeCallback) { + // Initialization logic ... 
+ + completeCallback(); + }, + + /** + * Called by KCL with a list of records to be processed and checkpointed. + * A record looks like: + * {"data":"","partitionKey":"someKey","sequenceNumber":"1234567890"} + * + * The checkpointer can optionally be used to checkpoint a particular sequence + * number (from a record). If checkpointing, the checkpoint must always be + * invoked before calling `completeCallback` for processRecords. Moreover, + * `completeCallback` should only be invoked once the checkpoint operation + * callback is received. + * + * @param {object} processRecordsInput - Process records information with + * array of records that are to be processed. Looks like - + * {"records":[, ], "checkpointer":} + * where format is specified above. + * @param {Checkpointer} processRecordsInput.checkpointer - A checkpointer + * which accepts a `string` or `null` sequence number and a + * callback. + * @param {callback} completeCallback - The callback that must be invoked + * once all records are processed and checkpoint (optional) is + * complete. + */ + processRecords: function(processRecordsInput, completeCallback) { + if (!processRecordsInput || !processRecordsInput.records) { + // Must call completeCallback to proceed further. + completeCallback(); + return; + } + + var records = processRecordsInput.records; + var record, sequenceNumber, partitionKey, data; + for (var i = 0 ; i < records.length ; ++i) { + record = records[i]; + sequenceNumber = record.sequenceNumber; + partitionKey = record.partitionKey; + // Note that "data" is a base64-encoded string. Buffer can be used to + // decode the data into a string. + data = new Buffer(record.data, 'base64').toString(); + + // Custom record processing logic ... + } + if (!sequenceNumber) { + // Must call completeCallback to proceed further. + completeCallback(); + return; + } + // If checkpointing, only call completeCallback once checkpoint operation + // is complete. 
+ processRecordsInput.checkpointer.checkpoint(sequenceNumber, + function(err, checkpointedSequenceNumber) { + // In this example, regardless of error, we mark processRecords + // complete to proceed further with more records. + completeCallback(); + } + ); + }, + + /** + * Called by KCL to indicate that this record processor should shut down. + * After shutdown operation is complete, there will not be any more calls to + * any other functions of this record processor. Note that reason + * could be either TERMINATE or ZOMBIE. If ZOMBIE, clients should not + * checkpoint because there is possibly another record processor which has + * acquired the lease for this shard. If TERMINATE, then + * `checkpointer.checkpoint()` should be called to checkpoint at the end of + * the shard so that this processor will be shut down and new processors + * will be created for the children of this shard. + * + * @param {object} shutdownInput - Shutdown information. Looks like - + * {"reason":"", "checkpointer":} + * @param {Checkpointer} shutdownInput.checkpointer - A checkpointer which + * accepts a `string` or `null` sequence number and a callback. + * @param {callback} completeCallback - The callback that must be invoked + * once shutdown-related operations are complete and checkpoint + * (optional) is complete. + */ + shutdown: function(shutdownInput, completeCallback) { + // Shutdown logic ... + + if (shutdownInput.reason !== 'TERMINATE') { + completeCallback(); + return; + } + // Since you are checkpointing, only call completeCallback once the checkpoint + // operation is complete. + shutdownInput.checkpointer.checkpoint(function(err) { + // In this example, regardless of error, we mark the shutdown operation + // complete. + completeCallback(); + }); + } +}; + +kcl(recordProcessor).run(); +``` + +## Before You Get Started + +### Prerequisite +Before you begin, Node.js and NPM must be installed on your system. 
For download instructions for your platform, see http://nodejs.org/download/. + +To get the sample KCL application and bootstrap script, you need git. + +Amazon KCL for Node.js uses [MultiLangDaemon][multi-lang-daemon] provided by [Amazon KCL for Java][amazon-kcl-github]. You also need Java version 1.7 or higher installed. + +### Setting Up the Environment +Before running the samples, make sure that your environment is configured to allow the samples to use your [AWS Security Credentials](http://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html), which are used by [MultiLangDaemon][multi-lang-daemon] to interact with AWS services. + +By default, the [MultiLangDaemon][multi-lang-daemon] uses the [DefaultAWSCredentialsProviderChain][DefaultAWSCredentialsProviderChain], so make your credentials available to one of the credentials providers in that provider chain. There are several ways to do this. You can provide credentials through a `~/.aws/credentials` file or through environment variables (**AWS\_ACCESS\_KEY\_ID** and **AWS\_SECRET\_ACCESS\_KEY**). If you're running on Amazon EC2, you can associate an IAM role with your instance with appropriate access. + +For more information about [Amazon Kinesis][amazon-kinesis] and the client libraries, see the +[Amazon Kinesis documentation][amazon-kinesis-docs] as well as the [Amazon Kinesis forums][kinesis-forum]. + +## Running the Sample + +The Amazon KCL for Node.js repository contains source code for the KCL, a sample data producer and data consumer (processor) application, and the bootstrap script. + +To run sample applications, you need to get all required NPM modules. **From the root of the repository**, execute the following command: + +`npm install` + +This downloads all dependencies for running the bootstrap script as well as the sample application. 
+ +The sample application consists of two components: + +* A data producer (`samples/basic_sample/producer/sample_kinesis_producer_app.js`): this script creates an [Amazon Kinesis][amazon-kinesis] stream and starts putting 10 random records into it. +* A data processor (`samples/basic_sample/consumer/sample_kcl_app.js`): this script is invoked by the [MultiLangDaemon][multi-lang-daemon], consumes the data from the [Amazon Kinesis][amazon-kinesis] stream, and stores received data into files (1 file per shard). + +The following defaults are used in the sample application: + +* *Stream name*: `kclnodejssample` +* *Number of shards*: 2 +* *Amazon KCL application name*: `kclnodejssample` +* *Amazon DynamoDB table for Amazon KCL application*: `kclnodejssample` + +### Running the Data Producer +To run the data producer, execute the following commands from the root of the repository: + +```sh + cd samples/basic_sample/producer + node sample_kinesis_producer_app.js +``` + +#### Notes +* The script `samples/basic_sample/producer/sample_kinesis_producer_app.js` takes several parameters that you can use to customize its behavior. To change default parameters, change values in the file `samples/basic_sample/producer/config.js`. + +### Running the Data Processor +To start the data processor, run the following command from the root of the repository: + +```sh + cd samples/basic_sample/consumer + ../../../bin/kcl-bootstrap --java /usr/bin/java -e -p ./sample.properties +``` + +#### Notes +* The Amazon KCL for Node.js uses stdin/stdout to interact with [MultiLangDaemon][multi-lang-daemon]. Do not point your application logs to stdout/stderr. If your logs point to stdout/stderr, log output gets mingled with [MultiLangDaemon][multi-lang-daemon], which makes it really difficult to find consumer-specific log events. This consumer uses a logging library to redirect all application logs to a file called application.log. 
Make sure to follow a similar pattern while developing consumer applications with the Amazon KCL for Node.js. For more information about the protocol between the MultiLangDaemon and the Amazon KCL for Node.js, go to [MultiLangDaemon][multi-lang-daemon]. +* The bootstrap script downloads [MultiLangDaemon][multi-lang-daemon] and its dependencies. +* The bootstrap script invokes the [MultiLangDaemon][multi-lang-daemon], which starts the Node.js consumer application as its child process. By default: + * The file `samples/basic_sample/consumer/sample.properties` controls which Amazon KCL for Node.js application is run. You can specify your own properties file with the `-p` or `--properties` argument. + * The bootstrap script uses `JAVA_HOME` to locate the java binary. To specify your own java home path, use the `-j` or `--java` argument when invoking the bootstrap script. +* To only print commands on the console to run the KCL application without actually running the KCL application, leave out the `-e` or `--execute` argument to the bootstrap script. +* You can also add REPOSITORY_ROOT/bin to your PATH so you can access kcl-bootstrap from anywhere. +* To find out all the options you can override when running the bootstrap script, run the following command: + +```sh + kcl-bootstrap --help +``` + +### Cleaning Up +This sample application creates an [Amazon Kinesis][amazon-kinesis] stream, sends data to it, and creates a DynamoDB table to track the KCL application state. This will incur nominal costs to your AWS account, and continue to do so even when the sample app is finished. To stop being charged, delete these resources. Specifically, the sample application creates following AWS resources: + +* An *Amazon Kinesis stream* named `kclnodejssample` +* An *Amazon DynamoDB table* named `kclnodejssample` + +You can delete these using the AWS Management Console. 
+ +## Running on Amazon EC2 +Log into an Amazon EC2 instance running Amazon Linux, then perform the following steps to prepare your environment for running the sample application. Note that the version of Java that ships with Amazon Linux can be found at `/usr/bin/java` and should be 1.7 or greater. + +```sh + # install node.js, npm and git + sudo yum install nodejs npm --enablerepo=epel + sudo yum install git + # clone the git repository to work with the samples + git clone https://github.com/awslabs/amazon-kinesis-client-nodejs.git kclnodejs + cd kclnodejs/samples/basic_sample/producer/ + # download dependencies + npm install + # run the sample producer + node sample_kinesis_producer_app.js & + + # ...and in another terminal, run the sample consumer + export PATH=$PATH:kclnodejs/bin + cd kclnodejs/samples/basic_sample/consumer/ + kcl-bootstrap --java /usr/bin/java -e -p ./sample.properties > consumer.out 2>&1 & +``` + +## NPM module +To get the Amazon KCL for Node.js module from NPM, use the following command: + +```sh + npm install aws-kcl +``` + +## Under the Hood: Supplemental information about the MultiLangDaemon + +Amazon KCL for Node.js uses [Amazon KCL for Java][amazon-kcl-github] internally. We have implemented a Java-based daemon, called the *[MultiLangDaemon][multi-lang-daemon]*, that does all the heavy lifting. The daemon launches the user-defined record processor script/program as a sub-process, and then communicates with this sub-process over standard input/output using a simple protocol. This allows support for any language. This approach enables the [Amazon KCL][amazon-kcl] to be language-agnostic, while providing identical features and a similar parallel processing model across all languages. + +At runtime, there will always be a one-to-one correspondence between a record processor, a child process, and an [Amazon Kinesis shard][amazon-kinesis-shard]. The [MultiLangDaemon][multi-lang-daemon] ensures that, without any developer intervention.
+ +In this release, we have abstracted these implementation details away and exposed an interface that enables you to focus on writing record processing logic in Node.js. + +## See Also + +* [Developing Processor Applications for Amazon Kinesis Using the Amazon Kinesis Client Library][amazon-kcl] +* [Amazon KCL for Java][amazon-kcl-github] +* [Amazon KCL for Python][amazon-kinesis-python-github] +* [Amazon KCL for Ruby][amazon-kinesis-ruby-github] +* [Amazon Kinesis documentation][amazon-kinesis-docs] +* [Amazon Kinesis forum][kinesis-forum] + + +## Release Notes + +### Release 0.5.0 (March 26, 2015) +* The `aws-kcl` npm module allows implementation of record processors in Node.js using the Amazon KCL [MultiLangDaemon][multi-lang-daemon]. +* The `samples` directory contains a sample producer and processing applications using the Amazon KCL for Node.js. + +[amazon-kinesis]: http://aws.amazon.com/kinesis +[amazon-kinesis-docs]: http://aws.amazon.com/documentation/kinesis/ +[amazon-kinesis-shard]: http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html +[amazon-kcl]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-app.html +[aws-sdk-node]: http://aws.amazon.com/sdk-for-node-js/ +[amazon-kcl-github]: https://github.com/awslabs/amazon-kinesis-client +[amazon-kinesis-python-github]: https://github.com/awslabs/amazon-kinesis-client-python +[amazon-kinesis-ruby-github]: https://github.com/awslabs/amazon-kinesis-client-ruby +[multi-lang-daemon]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java +[DefaultAWSCredentialsProviderChain]: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +[kinesis-forum]: http://developer.amazonwebservices.com/connect/forum.jspa?forumID=169 +[aws-console]: http://aws.amazon.com/console/ +[jvm]: http://java.com/en/download/ diff --git a/bin/kcl-bootstrap b/bin/kcl-bootstrap 
new file mode 100755 index 00000000..6ad196c5 --- /dev/null +++ b/bin/kcl-bootstrap @@ -0,0 +1,277 @@ +#!/usr/bin/env node + +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +var fs = require('fs'); +var http = require('http'); +var https = require('https'); +var path = require('path'); +var program = require('commander'); +var spawn = require('child_process').spawn; +var url = require('url'); +var util = require('util'); + + +var MAVEN_PACKAGE_LIST = [ + getMavenPackageInfo('commons-codec', 'commons-codec', '1.3'), + getMavenPackageInfo('joda-time', 'joda-time', '2.4'), + getMavenPackageInfo('com.amazonaws', 'aws-java-sdk', '1.7.13'), + getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-databind', '2.1.1'), + getMavenPackageInfo('commons-logging', 'commons-logging', '1.1.1'), + getMavenPackageInfo('com.amazonaws', 'amazon-kinesis-client', '1.2.0'), + getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-core', '2.1.1'), + getMavenPackageInfo('org.apache.httpcomponents', 'httpclient', '4.2'), + getMavenPackageInfo('org.apache.httpcomponents', 'httpcore', '4.2'), + getMavenPackageInfo('com.fasterxml.jackson.core', 'jackson-annotations', '2.1.1') +]; + +var DEFAULT_JAR_PATH = path.resolve(path.join(__dirname, '..', 'lib', 'jars')); +var MULTI_LANG_DAEMON_CLASS = 'com.amazonaws.services.kinesis.multilang.MultiLangDaemon'; +var MAX_HTTP_REDIRECT_FOLLOW = 3; + + +function bootstrap() { + var args = parseArguments(); + 
downloadMavenPackages(MAVEN_PACKAGE_LIST, args.jarPath, function(err) { + if (err) { + errorExit(util.format('Unable to download Multi-Language Daemon jar files from maven: %s', err)); + } + startKinesisClientLibraryApplication(args); + }); +} + +function parseArguments() { + program + .option('-p, --properties ', 'properties file with multi-language daemon options') + .option('-j, --java [java path]', 'path to java executable - defaults to using JAVA_HOME environment variable to get java path (optional)') + .option('-c, --jar-path [jar path]', 'path where all multi-language daemon jar files will be downloaded (optional)') + .option('-e, --execute', 'execute the KCL application') + .parse(process.argv); + + var args = { + 'properties': program.properties, + 'java': (program.java ? program.java : (process.env.JAVA_HOME ? path.join(process.env.JAVA_HOME, 'bin', 'java') : null)), + 'jarPath': (program.jarPath ? program.jarPath : DEFAULT_JAR_PATH), + 'execute': program.execute + }; + + if (!args.properties) { + invalidInvocationExit(program, 'Specify a valid --properties value.', true); + } + if (!isFile(args.properties)) { + invalidInvocationExit(program, args.properties + ' file does not exist. 
Specify a valid --properties value.', true); + } + if (!isFile(args.java)) { + invalidInvocationExit(program, 'Valid --java value is required or alternatively JAVA_HOME environment variable must be set.', true); + } + if (args.jarPath === DEFAULT_JAR_PATH) { + createDirectory(args.jarPath); + } + else if (!isDirectory(args.jarPath)) { + invalidInvocationExit(program, 'Path specified with --jar-path must already exist and must be a directory.', false); + } + return args; +} + +function startKinesisClientLibraryApplication(options) { + var classpath = getClasspath(options).join(getPathDelimiter()); + var java = options.java; + var args = ['-cp', classpath, MULTI_LANG_DAEMON_CLASS, options.properties]; + var cmd = java + ' ' + args.join(' '); + + console.log("=========================================================="); + console.log(cmd); + console.log("=========================================================="); + if (options.execute) { + console.log("Starting Multi-Lang Daemon ..."); + spawn(java, args, { stdio: 'inherit' }); + } +} + +function getClasspath(options) { + var classpath = []; + fs.readdirSync(options.jarPath).map(function (file) { + return path.join(options.jarPath, file); + }).filter(function (file) { + return isFile(file); + }).forEach(function (file) { + classpath.push(path.resolve(file)); + }); + classpath.push(path.resolve('.')); + classpath.push(path.dirname(path.resolve(options.properties))); + return classpath; +} + +function downloadMavenPackages(mavenPackages, destinationDirectory, callback) { + var remainingPackages = mavenPackages.length; + var callbackInvoked = false; + + var downloadMavenPackageCallback = function(err, filePath) { + remainingPackages = remainingPackages - 1; + if (!callbackInvoked) { + if (!err) { + console.log(filePath + ' downloaded. 
' + remainingPackages + ' files remain.'); + } + if (err || remainingPackages === 0) { + callbackInvoked = true; + callback(err); + return; + } + } + }; + + for (var i = 0 ; i < mavenPackages.length ; ++i) { + downloadMavenPackage(mavenPackages[i], destinationDirectory, downloadMavenPackageCallback); + } +} + +function downloadMavenPackage(mavenPackage, destinationDirectory, callback) { + process.nextTick(function() { + var mavenPackageUrlInfo = getMavenPackageUrlInfo(mavenPackage); + var destinationFile = path.join(destinationDirectory, mavenPackageUrlInfo.fileName); + if (fs.existsSync(destinationFile)) { + callback(null, destinationFile); + return; + } + httpDownloadFile(mavenPackageUrlInfo.url, destinationFile, 0, callback); + }); +} + +function httpDownloadFile(requestUrl, destinationFile, redirectCount, callback) { + if (redirectCount >= MAX_HTTP_REDIRECT_FOLLOW) { + callback('Reached maximum redirects. ' + requestUrl + ' could not be downloaded.'); + return; + } + var protocol = (url.parse(requestUrl).protocol === 'https' ? https : http); + var options = { + hostname: url.parse(requestUrl).hostname, + path: url.parse(requestUrl).path, + agent: false + }; + var request = protocol.get(options, function(response) { + // Non-2XX response. 
+ if (response.statusCode > 300) { + if (response.statusCode > 300 && response.statusCode < 400 && response.headers.location) { + httpDownloadFile(response.headers.location, destinationFile, redirectCount + 1, callback); + return; + } + else { + callback(requestUrl + ' could not be downloaded: ' + response.statusCode); + return; + } + } + else { + var destinationFileStream = fs.createWriteStream(destinationFile); + response.pipe(destinationFileStream); + + var callbackInvoked = false; + var destinationFileStreamFinishCallback = function() { + if (callbackInvoked) { + return; + } + callbackInvoked = true; + callback(null, destinationFile); + }; + destinationFileStream.on('finish', destinationFileStreamFinishCallback); + // Older Node.js version may not support 'finish' event. + destinationFileStream.on('close', destinationFileStreamFinishCallback); + } + }).on('error', function(err) { + fs.unlink(destinationFile); + callback(err); + }); +} + +function getMavenPackageUrlInfo(mavenPackage) { + var urlParts = []; + var fileName = util.format('%s-%s.jar', mavenPackage.artifactId, mavenPackage.version); + mavenPackage.groupId.split('.').forEach(function (part) { + urlParts.push(part); + }); + urlParts.push(mavenPackage.artifactId); + urlParts.push(mavenPackage.version); + urlParts.push(fileName); + return { + 'url': "http://search.maven.org/remotecontent?filepath=" + urlParts.join('/'), + 'fileName': fileName + }; +} + +function getMavenPackageInfo(groupId, artifactId, version) { + return { + 'groupId': groupId, + 'artifactId': artifactId, + 'version': version + }; +} + +function isDirectory(path) { + try { + return fs.statSync(path).isDirectory(); + } catch (e) { + // Path does not exist. + return false; + } +} + +function createDirectory(path) { + try { + fs.mkdirSync(path); + } catch(e) { + if (e.code !== 'EEXIST') { + throw e; + } + } +} + +function isFile(path) { + try { + return fs.statSync(path).isFile(); + } catch (e) { + // Path does not exist. 
/**
 * Returns the separator used between entries of a path list: path.delimiter when the running
 * Node.js provides it, otherwise ';' on platforms whose name starts with "win" and ':' elsewhere.
 * @return {string} Path list delimiter.
 */
function getPathDelimiter() {
  // Older Node.js version may not support path.delimiter.
  var fallbackDelimiter = /^win/.test(process.platform) ? ';' : ':';
  return path.delimiter || fallbackDelimiter;
}
+ + +node %~dp0\kcl-bootstrap %* diff --git a/conf/.jshintrc b/conf/.jshintrc new file mode 100644 index 00000000..0dd0f688 --- /dev/null +++ b/conf/.jshintrc @@ -0,0 +1,26 @@ +{ + "bitwise" : false, + "curly" : true, + "eqeqeq" : true, + "funcscope" : false, + "immed" : true, + "indent" : 2, + "latedef" : "nofunc", + "newcap" : true, + "noarg" : true, + "sub" : true, + "undef" : true, + "boss" : true, + "eqnull" : true, + "strict" : true, + "node" : true, + "globals" : { + /* MOCHA */ + "describe" : false, + "it" : false, + "before" : false, + "beforeEach" : false, + "after" : false, + "afterEach" : false + } +} diff --git a/conf/jsdoc.conf.json b/conf/jsdoc.conf.json new file mode 100644 index 00000000..3912195c --- /dev/null +++ b/conf/jsdoc.conf.json @@ -0,0 +1,32 @@ +{ + "tags" : { + "allowUnknownTags" : true + }, + "plugins" : ["plugins/markdown"], + "templates" : { + "cleverLinks" : false, + "monospaceLinks" : false, + "dateFormat" : "ddd MMM Do YYYY", + "outputSourceFiles" : false, + "outputSourcePath" : false, + "systemName" : "Amazon Kinesis Client Library in Node.js", + "footer" : "Amazon Kinesis Client Library in Node.js", + "copyright" : "Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.", + "navType" : "vertical", + "theme" : "cosmo", + "linenums" : true, + "collapseSymbols" : false, + "inverseNav" : true, + "highlightTutorialCode" : true, + "protocol": "fred://" + }, + "markdown" : { + "parser" : "gfm", + "hardwrap" : true + }, + "opts" : { + "private" : false, + "recurse" : true, + "lenient" : false + } +} diff --git a/index.js b/index.js new file mode 100644 index 00000000..e935b87d --- /dev/null +++ b/index.js @@ -0,0 +1,25 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. 
+A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +/** + * @fileoverview + * The main KCL namespace exports user-facing modules and public functions. + * Note that private modules and private functions may change at a future date without notice. + */ + +module.exports = require("./lib/kcl/kcl_process"); diff --git a/lib/kcl/action_handler.js b/lib/kcl/action_handler.js new file mode 100644 index 00000000..4b9be2e6 --- /dev/null +++ b/lib/kcl/action_handler.js @@ -0,0 +1,87 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +/** + * @fileoverview + * Marshals and unmarshals actions and delegates them back and forth between the I/O handler + * that talks to the MultiLangDaemon and the KCL manager. + */ + +var EventEmitter = require('events').EventEmitter; +var util = require('util'); + +/** + * Creates an instance of the action handler. + * @class ActionHandler + * @param {IOHandler} ioHandler - I/O handler instance that communicates with the MultiLangDaemon. 
/**
 * Creates an instance of the action handler and subscribes it to the 'line' and 'close' events
 * of the given I/O handler.
 * @class ActionHandler
 * @param {IOHandler} ioHandler - I/O handler instance that communicates with the MultiLangDaemon.
 */
function ActionHandler(ioHandler) {
  this._ioHandler = ioHandler;
  // Keep the bound functions so destroy() can remove exactly these listeners.
  this._onIOLineCallback = this._onIOLine.bind(this);
  this._onIOCloseCallback = this._onIOClose.bind(this);
  this._ioHandler.on('line', this._onIOLineCallback);
  this._ioHandler.on('close', this._onIOCloseCallback);
}

/** @extends EventEmitter */
util.inherits(ActionHandler, EventEmitter);

/**
 * Frees up any resources held by this instance.
 */
ActionHandler.prototype.destroy = function() {
  this._ioHandler.removeListener('line', this._onIOLineCallback);
  this._ioHandler.removeListener('close', this._onIOCloseCallback);
};

/**
 * Sends an action to the MultiLangDaemon.
 * @param {object} action - Action to send to the MultiLangDaemon; it is serialized as JSON.
 * @param {callback} callback - Callback that will be invoked when the action is sent to the MultiLangDaemon.
 */
ActionHandler.prototype.sendAction = function(action, callback) {
  this._ioHandler.writeLine(JSON.stringify(action), callback);
};

/**
 * Event handler when a new line is received from the MultiLangDaemon through the I/O handler.
 * Empty lines are ignored. Lines that are not valid JSON, or that do not carry an "action"
 * attribute, are reported through the I/O handler's error channel. Valid actions are emitted
 * as an 'action' event.
 * @param {string} line - New line received by IO handler.
 * @private
 */
ActionHandler.prototype._onIOLine = function(line) {
  if (!line) {
    return;
  }
  var action;
  try {
    action = JSON.parse(line);
  } catch (e) {
    // Malformed JSON is handled like any other invalid action instead of throwing out of the event handler.
    action = null;
  }
  if (!action || !action.action) {
    this._ioHandler.writeError(util.format('Invalid action received: %s', line));
    return;
  }
  this.emit('action', action);
};

/**
 * Event handler for the I/O close event. Following this event, no new lines will be received from the I/O handler.
 * @private
 */
ActionHandler.prototype._onIOClose = function() {
  this.emit('end');
};
/**
 * Creates an instance of the checkpointer.
 * @class Checkpointer
 * @param {KCLManager} kclManager - Main KCL manager instance that keeps track of current state and dispatches all
 *                                  processing functions.
 */
function Checkpointer(kclManager) {
  this._kclManager = kclManager;
  // Callback of the checkpoint request that is currently in flight, or null when there is none.
  this._callback = null;
}

/**
 * Checkpoints at a given sequence number. If the sequence number is not provided, the checkpoint will be at the end of
 * the most recently-delivered list of records. Only one checkpoint request may be outstanding at a time.
 * @param {string} [sequenceNumber] - Sequence number of the record to checkpoint; if this value is not provided, the
 *                                    latest retrieved record is checkpointed.
 * @param {callback} callback - Function that is invoked after the checkpoint operation completes.
 * @throws Rethrows any error thrown by the KCL manager while issuing the request.
 */
Checkpointer.prototype.checkpoint = function(sequenceNumber, callback) {
  if (typeof sequenceNumber === 'function') {
    callback = sequenceNumber;
    sequenceNumber = null;
  }
  if (typeof callback !== 'function') {
    // A missing callback must not crash when the response arrives.
    callback = function() {};
  }

  if (this._callback) {
    callback('Cannot checkpoint while another checkpoint is already in progress.');
    return;
  }
  this._callback = callback;
  try {
    this._kclManager.checkpoint(sequenceNumber);
  } catch (e) {
    // The request was never issued, so do not leave this instance marked as busy.
    this._callback = null;
    throw e;
  }
};

/**
 * Gets called by the KCL manager when an outstanding checkpoint request completes either successfully or with
 * an error. This function then invokes the callback passed by the user when the checkpoint was requested.
 * A response that arrives while no request is outstanding is ignored.
 * @param {string} err - Error message if the checkpoint request was unsuccessful.
 * @param {string} sequenceNumber - Sequence number for which the checkpoint response is received.
 * @ignore
 */
Checkpointer.prototype.onCheckpointerResponse = function(err, sequenceNumber) {
  var callback = this._callback;
  if (!callback) {
    return;
  }
  // Clear first so the callback itself may start a new checkpoint.
  this._callback = null;
  callback(err, sequenceNumber);
};
/**
 * Creates an instance of the I/O handler. Lines read from the input file are re-emitted as 'line'
 * events, and the end of input is re-emitted as a 'close' event.
 * @class IOHandler
 * @param {file} inputFile - A file to read input lines from.
 * @param {file} outputFile - A file to write output lines to.
 * @param {file} errorFile - A file to write error lines to.
 */
function IOHandler(inputFile, outputFile, errorFile) {
  this._inputFile = inputFile;
  this._outputFile = outputFile;
  this._errorFile = errorFile;

  this._readlineInterface = readline.createInterface(this._inputFile, this._outputFile);
  // Keep the bound functions so destroy() can remove exactly these listeners.
  this._onInputLineCallback = this._onInputLine.bind(this);
  this._onInputCloseCallback = this._onInputClose.bind(this);
  this._readlineInterface.on('line', this._onInputLineCallback);
  this._readlineInterface.on('close', this._onInputCloseCallback);
}

/** @extends EventEmitter */
util.inherits(IOHandler, EventEmitter);

/**
 * Frees up any resources held by this instance.
 */
IOHandler.prototype.destroy = function() {
  this._readlineInterface.removeListener('line', this._onInputLineCallback);
  this._readlineInterface.removeListener('close', this._onInputCloseCallback);
  this._readlineInterface.close();
};

/**
 * Sends the string message to the MultiLangDaemon using the output stream.
 * @param {string} line - Line to send to the MultiLangDaemon.
 * @param {callback} callback - Callback that gets invoked on completion.
 */
IOHandler.prototype.writeLine = function(line, callback) {
  // A false return value from write() only signals backpressure: the chunk is still queued and the
  // callback is still invoked by the stream, so it must not be reported as a failed write.
  this._outputFile.write(util.format('\n%s\n', line), 'utf8', callback);
};

/**
 * Logs an error.
 * @param {string} error - Error to log.
 */
IOHandler.prototype.writeError = function(error) {
  this._errorFile.write(util.format('%s\n', error));
};

/**
 * Event handler for when a new line is received from the MultiLangDaemon through the input stream.
 * @param {string} line - New line received.
 * @private
 */
IOHandler.prototype._onInputLine = function(line) {
  this.emit('line', line);
};

/**
 * Event handler for when the input stream is closed.
 * @private
 */
IOHandler.prototype._onInputClose = function() {
  this.emit('close');
};
/**
 * Builds an input handler that moves the state machine to the given state and accepts the input.
 * @param {string} stateName - Name of the state to transition to.
 * @return {function} Handler that transitions the context and returns true.
 * @private
 */
function advanceTo(stateName) {
  return function(context) {
    this.transition(context, stateName);
    return true;
  };
}

/**
 * Catch-all input handler: moves the state machine to the 'Error' state and rejects the input.
 * @param {object} context - Context the input was raised for.
 * @return {boolean} Always false.
 * @private
 */
function rejectInput(context) {
  this.transition(context, 'Error');
  return false;
}

/**
 * KCL state machine class responsible for maintaining the MultiLangDaemon protocol state.
 * Each handler returns true when the input is accepted in the current state and false otherwise.
 * @class KCLStateMachine
 * @private
 */
var KCLStateMachine = BehavioralFsm.extend({
  initialize: function(config) {
    this._actionHandler = config.actionHandler;
  },

  namespace: 'kcl-state-machine',
  initialState: 'Uninitialized',

  states: {
    // Every input is deferred until the machine leaves this state.
    Uninitialized: {
      '*': function(context) {
        this.deferUntilTransition(context);
        return true;
      }
    },
    Start: {
      beginInitialize: advanceTo('Initializing'),
      '*': rejectInput
    },
    Initializing: {
      finishInitialize: advanceTo('Ready'),
      '*': rejectInput
    },
    Ready: {
      beginProcessRecords: advanceTo('Processing'),
      beginShutdown: advanceTo('ShuttingDown'),
      '*': rejectInput
    },
    Processing: {
      beginCheckpoint: advanceTo('Checkpointing'),
      finishProcessRecords: advanceTo('Ready'),
      '*': rejectInput
    },
    Checkpointing: {
      finishCheckpoint: advanceTo('Processing'),
      '*': rejectInput
    },
    ShuttingDown: {
      beginCheckpoint: advanceTo('FinalCheckpointing'),
      finishShutdown: advanceTo('End'),
      '*': rejectInput
    },
    FinalCheckpointing: {
      finishCheckpoint: advanceTo('ShuttingDown'),
      '*': rejectInput
    },
    End: {
      // Accepted without leaving the End state.
      cleanup: function(context) {
        return true;
      },
      '*': rejectInput
    },
    // Every input is rejected once the machine is in the Error state.
    Error: {
      '*': function(context) {
        return false;
      }
    }
  }
});
/**
 * Event handler that gets invoked on a new action received from the MultiLangDaemon.
 * Record processor actions ('initialize', 'processRecords', 'shutdown') and checkpoint responses
 * ('checkpoint') are routed to their handlers; any other action type is written to the I/O
 * handler's error channel.
 * @param {object} action - Action received.
 * @private
 */
KCLManager.prototype._onAction = function(action) {
  var actionType = action.action;
  if (actionType === 'initialize' || actionType === 'processRecords' || actionType === 'shutdown') {
    this._onRecordProcessorAction(action);
  }
  else if (actionType === 'checkpoint') {
    this._onCheckpointAction(action);
  }
  else {
    // KCLManager has no _reportError method; report through the I/O handler's error channel instead.
    this._ioHandler.writeError(util.format('Invalid action received: %j', action));
  }
};
/**
 * Record processing related action handler. Feeds the matching "begin" input to the state machine,
 * then calls the record processor method named after the action with a copy of the action (minus
 * its "action" attribute) and a completion callback that feeds the matching "finish" input.
 * @param {object} action - Record processor related action.
 * @throws {Error} If the action is not 'initialize', 'processRecords' or 'shutdown'.
 * @private
 */
KCLManager.prototype._onRecordProcessorAction = function(action) {
  var self = this;
  var context = this._context;
  var processor = context.recordProcessor;
  var processorInput = cloneToInput(action);

  // State machine inputs per action, and whether the processor receives the checkpointer.
  var actionSpecs = {
    initialize: { begin: 'beginInitialize', finish: 'finishInitialize', withCheckpointer: false },
    processRecords: { begin: 'beginProcessRecords', finish: 'finishProcessRecords', withCheckpointer: true },
    shutdown: { begin: 'beginShutdown', finish: 'finishShutdown', withCheckpointer: true }
  };
  var actionType = action.action;

  // Should not occur.
  if (!Object.prototype.hasOwnProperty.call(actionSpecs, actionType)) {
    throw new Error(util.format('Invalid action for record processor: %j', action));
  }

  var spec = actionSpecs[actionType];
  if (spec.withCheckpointer) {
    processorInput.checkpointer = context.checkpointer;
  }
  // The record processor method carries the same name as the action type.
  var processorMethod = processor[actionType];

  // Before invoking the operation, first transition to make sure state is valid.
  this._handleStateInput(context, spec.begin);

  // Attach callback so user can mark that operation is complete, and KCL can proceed with new operation.
  processorMethod.call(processor, processorInput, function() {
    self._recordProcessorCallback(context, action, spec.finish);
  });
};
/**
 * Checkpoint response action handler. Feeds 'finishCheckpoint' to the state machine and then hands
 * the response's error and checkpoint values to the checkpointer.
 * @param {object} action - Checkpoint response action.
 * @private
 */
KCLManager.prototype._onCheckpointAction = function(action) {
  var context = this._context;

  // Before invoking the operation, first transition to make sure state is valid.
  this._handleStateInput(context, 'finishCheckpoint');

  context.checkpointer.onCheckpointerResponse(action.error, action.checkpoint);
};
/**
 * Clones the JSON action object into an input object that will be passed to the record processor function.
 * Note that only shallow copy is performed for efficiency, and only the action's own enumerable
 * attributes are copied.
 * @param {object} action - Record processor-related action.
 * @return {object} Returns the cloned action object without the "action" attribute.
 * @private
 */
function cloneToInput(action) {
  var input = {};
  for (var attr in action) {
    // Skip inherited attributes so additions to a prototype never leak into the processor input.
    if (attr !== 'action' && Object.prototype.hasOwnProperty.call(action, attr)) {
      input[attr] = action[attr];
    }
  }
  return input;
}
/**
 * Creates an instance of the KCL process.
 * @param {object} recordProcessor - A record processor to use for processing a shard. It must provide
 *                                   initialize, processRecords, and shutdown functions.
 * @param {file} inputFile - A file to read action messages from. Defaults to STDIN.
 * @param {file} outputFile - A file to write action messages to. Defaults to STDOUT.
 * @param {file} errorFile - A file to write error messages to. Defaults to STDERR.
 * @return {object} Object whose run() function starts the KCL processing.
 * @throws {Error} If the record processor is missing or does not implement the required functions.
 */
function KCLProcess(recordProcessor, inputFile, outputFile, errorFile) {
  // Check for a missing processor first so the caller gets the descriptive error below rather than
  // a TypeError from reading a property of undefined/null.
  if (!recordProcessor ||
      typeof recordProcessor.initialize !== 'function' ||
      typeof recordProcessor.processRecords !== 'function' ||
      typeof recordProcessor.shutdown !== 'function') {
    throw new Error('Record processor must implement initialize, processRecords, and shutdown functions.');
  }
  inputFile = typeof inputFile !== 'undefined' ? inputFile : process.stdin;
  outputFile = typeof outputFile !== 'undefined' ? outputFile : process.stdout;
  errorFile = typeof errorFile !== 'undefined' ? errorFile : process.stderr;

  var kclManager = new KCLManager(recordProcessor, inputFile, outputFile, errorFile);

  return {
    // For testing only.
    _kclManager: kclManager,

    /**
     * Starts this KCL process's main loop.
     */
    run: function() {
      kclManager.run();
    }
  };
}
+ "aws", + "big data", + "kinesis", + "kinesis client library", + "kcl", + "node.js" + ] +} diff --git a/samples/basic_sample/consumer/sample.properties b/samples/basic_sample/consumer/sample.properties new file mode 100644 index 00000000..00a1c8a3 --- /dev/null +++ b/samples/basic_sample/consumer/sample.properties @@ -0,0 +1,83 @@ +# The script that abides by the multi-language protocol. This script will +# be executed by the MultiLangDaemon, which will communicate with this script +# over STDIN and STDOUT according to the multi-language protocol. +executableName = node sample_kcl_app.js + +# The name of an Amazon Kinesis stream to process. +streamName = kclnodejssample + +# Used by the KCL as the name of this application. Will be used as the name +# of an Amazon DynamoDB table which will store the lease and checkpoint +# information for workers with this application name +applicationName = kclnodejssample + +# Users can change the credentials provider the KCL will use to retrieve credentials. +# The DefaultAWSCredentialsProviderChain checks several other providers, which is +# described here: +# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + +# Appended to the user agent of the KCL. Does not impact the functionality of the +# KCL in any other way. +processingLanguage = nodejs/0.10 + +# Valid options at TRIM_HORIZON or LATEST. +# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +initialPositionInStream = TRIM_HORIZON + +# The following properties are also available for configuring the KCL Worker that is created +# by the MultiLangDaemon. + +# The KCL defaults to us-east-1 +regionName = us-east-1 + +# Fail over time in milliseconds. 
A worker which does not renew its lease within this time interval +# will be regarded as having problems and its shards will be assigned to other workers. +# For applications that have a large number of shards, this may be set to a higher number to reduce +# the number of DynamoDB IOPS required for tracking leases +#failoverTimeMillis = 10000 + +# A worker id that uniquely identifies this worker among all workers using the same applicationName +# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. +#workerId = + +# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. +#shardSyncIntervalMillis = 60000 + +# Max records to fetch from Kinesis in a single GetRecords call. +#maxRecords = 10000 + +# Idle time between record reads in milliseconds. +#idleTimeBetweenReadsInMillis = 1000 + +# Enables applications to flush/checkpoint (if they have some data "in progress", but don't get new data for a while) +#callProcessRecordsEvenForEmptyRecordList = false + +# Interval in milliseconds between polling to check for parent shard completion. +# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on +# completion of parent shards). +#parentShardPollIntervalMillis = 10000 + +# Cleanup leases upon shards completion (don't wait until they expire in Kinesis). +# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try +# to delete the ones we don't need any longer. +#cleanupLeasesUponShardCompletion = true + +# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). +#taskBackoffTimeMillis = 500 + +# Buffer metrics for at most this long before publishing to CloudWatch. +#metricsBufferTimeMillis = 10000 + +# Buffer at most this many metrics before publishing to CloudWatch.
+#metricsMaxQueueSize = 10000 + +# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls +# to RecordProcessorCheckpointer#checkpoint(String) by default. +#validateSequenceNumberBeforeCheckpointing = true + +# The maximum number of active threads for the MultiLangDaemon to permit. +# If a value is provided then a FixedThreadPool is used with the maximum +# active threads set to the provided value. If a non-positive integer or no +# value is provided a CachedThreadPool is used. +#maxActiveThreads = 0 diff --git a/samples/basic_sample/consumer/sample_kcl_app.js b/samples/basic_sample/consumer/sample_kcl_app.js new file mode 100644 index 00000000..8afd8ff6 --- /dev/null +++ b/samples/basic_sample/consumer/sample_kcl_app.js @@ -0,0 +1,83 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +var fs = require('fs'); +var path = require('path'); +var util = require('util'); +var kcl = require('../../..'); +var logger = require('../../util/logger'); + +/** + * A simple implementation for the record processor (consumer) that simply writes the data to a log file. + * + * Be careful not to use the 'stderr'/'stdout'/'console' as log destination since it is used to communicate with the + * {https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java MultiLangDaemon}. 
/**
 * Creates the sample record processor handed to the KCL.
 *
 * The processor decodes every record it receives, writes it to the application
 * log and checkpoints at the last sequence number of each batch. Logging goes
 * through the sample logger rather than stdout/stderr.
 *
 * @returns {{initialize: function, processRecords: function, shutdown: function}}
 *   The three callbacks invoked by the KCL. Each one receives an input object
 *   and a completeCallback that must be invoked exactly once when it is done.
 */
function recordProcessor() {
  var log = logger().getLogger('recordProcessor');
  var shardId;

  return {

    /**
     * Remembers the shard this processor works on.
     *
     * @param {object} initializeInput - Carries the shardId.
     * @param {function} completeCallback - Invoked once initialization is done.
     */
    initialize: function(initializeInput, completeCallback) {
      shardId = initializeInput.shardId;

      completeCallback();
    },

    /**
     * Logs every record of the batch, then checkpoints at the last sequence number.
     *
     * @param {object} processRecordsInput - Carries records and a checkpointer.
     * @param {function} completeCallback - Invoked after the batch is handled and,
     *   when a checkpoint is made, only after that checkpoint has finished.
     */
    processRecords: function(processRecordsInput, completeCallback) {
      if (!processRecordsInput || !processRecordsInput.records) {
        completeCallback();
        return;
      }
      var records = processRecordsInput.records;
      var record, data, sequenceNumber, partitionKey;
      for (var i = 0 ; i < records.length ; ++i) {
        record = records[i];
        // Record payloads arrive base64-encoded.
        data = new Buffer(record.data, 'base64').toString();
        sequenceNumber = record.sequenceNumber;
        partitionKey = record.partitionKey;
        log.info(util.format('ShardID: %s, Record: %s, SeqenceNumber: %s, PartitionKey:%s', shardId, data, sequenceNumber, partitionKey));
      }
      // An empty batch leaves nothing to checkpoint.
      if (!sequenceNumber) {
        completeCallback();
        return;
      }
      // If checkpointing, completeCallback should only be called once checkpoint is complete.
      processRecordsInput.checkpointer.checkpoint(sequenceNumber, function(err, checkpointedSequenceNumber) {
        if (err) {
          // Report the failure instead of claiming success; processing still continues.
          log.error(util.format('Checkpoint failed. ShardID: %s, SeqenceNumber: %s, Error: %s', shardId, sequenceNumber, err));
        }
        else {
          log.info(util.format('Checkpoint successful. ShardID: %s, SeqenceNumber: %s', shardId, checkpointedSequenceNumber));
        }
        completeCallback();
      });
    },

    /**
     * Checkpoints when the shutdown reason is TERMINATE; does nothing otherwise.
     *
     * @param {object} shutdownInput - Carries the shutdown reason and a checkpointer.
     * @param {function} completeCallback - Invoked once shutdown handling is done.
     */
    shutdown: function(shutdownInput, completeCallback) {
      // Checkpoint should only be performed when shutdown reason is TERMINATE.
      if (shutdownInput.reason !== 'TERMINATE') {
        completeCallback();
        return;
      }
      // Whenever checkpointing, completeCallback should only be invoked once checkpoint is complete.
      shutdownInput.checkpointer.checkpoint(function(err) {
        if (err) {
          log.error(util.format('Checkpoint on shutdown failed. ShardID: %s, Error: %s', shardId, err));
        }
        completeCallback();
      });
    }
  };
}
+ +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + +var config = module.exports = { + kinesis : { + region : 'us-east-1' + }, + + sampleProducer : { + stream : 'kclnodejssample', + shards : 2, + waitBetweenDescribeCallsInSeconds : 5 + } +}; diff --git a/samples/basic_sample/producer/sample_kinesis_producer_app.js b/samples/basic_sample/producer/sample_kinesis_producer_app.js new file mode 100644 index 00000000..85a2007d --- /dev/null +++ b/samples/basic_sample/producer/sample_kinesis_producer_app.js @@ -0,0 +1,23 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
/**
 * Creates the sample producer: it makes sure the configured stream exists and is
 * ACTIVE, then writes ten random sensor readings to it.
 *
 * @param {object} kinesis - Client exposing createStream, describeStream and putRecord.
 * @param {object} config - Producer settings: stream, shards and
 *   waitBetweenDescribeCallsInSeconds.
 * @returns {{run: function}} Object whose run() starts the producer.
 */
function sampleProducer(kinesis, config) {
  var log = logger().getLogger('sampleProducer');

  /**
   * Creates the stream unless it already exists, then waits for it to become ACTIVE.
   * Calls back with an error when creation fails for any other reason.
   */
  function _createStreamIfNotCreated(callback) {
    var params = {
      ShardCount : config.shards,
      StreamName : config.stream
    };

    kinesis.createStream(params, function(err, data) {
      if (err) {
        // ResourceInUseException means the stream is already there; anything else is fatal.
        if (err.code !== 'ResourceInUseException') {
          callback(err);
          return;
        }
        else {
          log.info(util.format('%s stream is already created. Re-using it.', config.stream));
        }
      }
      else {
        log.info(util.format("%s stream doesn't exist. Created a new stream with that name ..", config.stream));
      }

      // Poll to make sure stream is in ACTIVE state before start pushing data.
      _waitForStreamToBecomeActive(callback);
    });
  }

  /**
   * Polls describeStream until the stream reports ACTIVE, waiting
   * config.waitBetweenDescribeCallsInSeconds between attempts.
   */
  function _waitForStreamToBecomeActive(callback) {
    kinesis.describeStream({StreamName : config.stream}, function(err, data) {
      if (err) {
        // Surface the failure; otherwise the caller would wait forever.
        callback(err);
        return;
      }
      log.info(util.format('Current status of the stream is %s.', data.StreamDescription.StreamStatus));
      if (data.StreamDescription.StreamStatus === 'ACTIVE') {
        callback(null);
      }
      else {
        setTimeout(function() {
          _waitForStreamToBecomeActive(callback);
        }, 1000 * config.waitBetweenDescribeCallsInSeconds);
      }
    });
  }

  /**
   * Sends one random sensor reading to the stream, using the sensor name as partition key.
   */
  function _writeToKinesis() {
    // Epoch milliseconds; getMilliseconds() would only give the 0-999 ms component.
    var currTime = Date.now();
    var sensor = 'sensor-' + Math.floor(Math.random() * 100000);
    var reading = Math.floor(Math.random() * 1000000);

    var record = JSON.stringify({
      time : currTime,
      sensor : sensor,
      reading : reading
    });

    var recordParams = {
      Data : record,
      PartitionKey : sensor,
      StreamName : config.stream
    };

    kinesis.putRecord(recordParams, function(err, data) {
      if (err) {
        log.error(err);
      }
      else {
        log.info('Successfully sent data to Kinesis.');
      }
    });
  }

  return {
    /**
     * Prepares the stream and schedules ten writes; logs and stops if the stream
     * cannot be prepared.
     */
    run: function() {
      _createStreamIfNotCreated(function(err) {
        if (err) {
          log.error(util.format('Error creating stream: %s', err));
          return;
        }
        var count = 0;
        while (count < 10) {
          // Pass the function itself; calling it here would run it immediately
          // and hand setTimeout an undefined callback.
          setTimeout(_writeToKinesis, 1000);
          count++;
        }
      });
    }
  };
}
The scenario for this README is to show how to ingest a stream of clickstream data and write a simple consumer using the KCL to process, batch, and upload data to Amazon S3 for further processing. This is a common use case for using Amazon Kinesis. + +You can work through this README on your desktop or laptop and run both the producer and consumer code on the same machine. You can also run this sample on Amazon EC2 using the Amazon CloudFormation template provided. + +Clickstream data is simulated in the sample code, and the clickstream data is evenly spread across all the shards of the Amazon Kinesis stream. + +**Note:** + +After you create a stream, your account incurs nominal charges for Amazon Kinesis usage because Amazon Kinesis is not eligible for the AWS free tier. After the consumer application starts, it also incurs nominal charges for Amazon DynamoDB usage. DynamoDB is used by the consumer application to track the processing state. When you are finished with this tutorial, delete your AWS resources to stop incurring charges. If you use the provided CloudFormation template to run this sample on Amazon EC2, the template takes care of cleaning up resources when you delete the associated CloudFormation stack. + +## Before you start + +* Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, go to [AWS Security Credentials](http://docs.aws.amazon.com/general/latest/gr/aws-security-credentials.html). +* Familiarize yourself with Amazon Kinesis concepts such as streams, shards, producers, and consumers. For more information, see [Amazon Kinesis concepts](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html) and the tutorials. +* To run the sample code, you need Node.js, NPM installed on your computer. The Amazon KCL for Node.js uses the [MultiLangDaemon][multi-lang-daemon] provided by [Amazon KCL for Java][amazon-kcl-github]. 
To run the Amazon KCL for Node.js samples, you also need to install the [Java JDK](http://www.oracle.com/technetwork/java/javase/downloads/index.html). + +## Producer + +This section explains how to implement an application to ingest a continuous stream of clickstream data to Amazon Kinesis. This role is known as the Amazon Kinesis producer. You need to create an Amazon Kinesis stream to allow the producer to ingest data into Amazon Kinesis. The producer application creates a stream based on the configuration values in the file producer/config.js, or you can create your own from the [Amazon Kinesis console](https://console.aws.amazon.com/kinesis). If you create your own stream with a different name than the default in the sample code, edit the stream name in producer/config.js and the producer application will pick up that change. + +### ClickStream producer + +* Reads configuration and creates an Amazon Kinesis stream if the specified stream doesn't exist in the specified region. +* Waits for the stream to become ACTIVE by polling Amazon Kinesis using the describeStream operation. +* Continuously retrieves random clickstream data records, batches them up to a value specified by config.clickStreamProducer.recordsToWritePerBatch, and makes a [PutRecords][nodejs-kinesis-putrecords] call to write all records to the Amazon Kinesis stream. + +```javascript +// Use putRecords API to batch more than one record. 
+for (var i = 0; i < totalRecords; i++) { + data = clickStreamGen.getRandomClickStreamData(); + + record = { + Data: JSON.stringify(data), + PartitionKey: data.resource + }; + + records.push(record); +} + +var recordsParams = { + Records: records, + StreamName: config.stream +}; + +kinesis.putRecords(recordsParams, function(err, data) { + if (err) { + console.log(err); + } + else { + console.log(util.format("Sent %d records with %d failures ..", records.length, data.FailedRecordCount)); + } +}); +``` + +### Clickstream records + +A clickstream record consists of a resource and a referrer. + +```javascript +var data = { + "resource": "resource-1", + "referrer": "http://www.amazon.com/" +}; +``` + +### Clickstream producer configuration + +The producer/config.js file contains configurations supported by the producer application. It exposes the following configurations. You can change any configuration values in producer/config.js as needed. + +```javascript +var config = module.exports = { + kinesis : { + // Region for the Amazon Kinesis stream. + region : 'us-east-1' + }, + + clickStreamProducer : { + // The Amazon Kinesis stream to ingest clickstream data into. If the specified + // stream doesn't exist, the producer application creates a new stream. + stream : 'kclnodejsclickstreamsample', + + // Total shards in the specified Amazon Kinesis stream. + shards : 2, + + // The producer application batches clickstream records in to the size specified + // here, and makes a single PutRecords API call to ingest all records to the + // stream. + recordsToWritePerBatch : 5, + + // If the producer application creates a stream, it has to wait for the stream to + // transition to ACTIVE state before it can start putting data in it. This + // specifies the wait time between consecutive describeStream calls. 
+ waitBetweenDescribeCallsInSeconds : 5, + + // Transactions per second for the PutRecords call to make sure the producer + // doesn't hit throughput limits enforced by Amazon Kinesis. + putRecordsTps : 20 + } +}; +``` +For more information about throughput limits, see [Amazon Kinesis Limits](http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html). + +### Run producer on a local computer + +To run the data producer, execute the following commands from the root of the repository: + +```sh + cd samples/click_stream_sample/producer + node click_stream_producer_app.js +``` + +**Note:** + +To run a sample application on Amazon EC2, see the section 'Running on Amazon EC2' later in this README. + +## Implement a basic processing application using the Amazon KCL for Node.js + +This basic application processes records from an Amazon Kinesis stream using [nodejs-kcl][nodejs-kcl], batches records up to 1 MB (configurable), and sends them to a specified Amazon S3 bucket for further offline processing. You can extend this application to perform some processing on the data (e.g., a rolling window count) before sending data to S3. For more information, see [developing-consumer-applications-with-kcl][amazon-kcl]. + +### Clickstream consumer configuration +The consumer/config.js file contains configurations supported by the consumer application. It exposes the following configurations. You can change any configuration values in consumer/config.js as needed. + +```javascript +var config = module.exports = { + s3 : { + // Region for Amazon S3. Defaults to us-east-1. + // region : '', + + // Amazon S3 bucket to store batched clickstream data. The consumer application + // may create a new bucket (based on S3.createBucketIfNotPresent value), + // if the specified bucket doesn't exist. + bucket : 'kinesis-clickstream-batchdata', + + // Enables the consumer application to create a new S3 bucket if the specified + // bucket doesn't exist. 
+ createBucketIfNotPresent : true + }, + + clickStreamProcessor : { + // Maximum batch size in bytes before sending data to S3. + maxBufferSize : 1024 * 1024 + } +}; +``` + +### The consumer/Amazon KCL interface +The Amazon KCL for Node.js expects applications to pass an object that implements the following three functions: + +* initialize +* processRecords +* shutdown + +**Note:** + +The Amazon KCL for Node.js uses stdin/stdout to interact with the [MultiLangDaemon][multi-lang-daemon]. Do not point your application logs to stdout/stderr. If your logs point to stdout/stderr, the log output will get mingled with [MultiLangDaemon][multi-lang-daemon], which makes it really difficult to find consumer-specific log events. This consumer uses a logging library to redirect all application logs to a file called application.log. Make sure to follow a similar pattern while developing consumer applications with the Amazon KCL for Node.js. For more information about the protocol between the MultiLangDaemon and Amazon KCL for Node.js, see [MultiLangDaemon][multi-lang-daemon]. + +```javascript +/** + * A simple implementation of RecordProcessor that accepts records from an Amazon + * Kinesis stream and batches them into 1 MB (configurable) datasets, then puts + * them in a configured S3 bucket for further offline processing. The object + * returned should implement the functions initialize, processRecords, and shutdown + * in order to enable the KCL to interact with MultiLangDaemon. + * MultiLangDaemon would create one child process (hence one RecordProcessor instance) + * per shard. A single shard will never be accessed by more than one + * RecordProcessor instance; e.g., if you run this sample on a single machine, + * against a stream with 2 shards, MultiLangDaemon would create 2 child + * Node.js processes (RecordProcessor), one for each shard. 
+ */ +function clickStreamProcessor(emitter, cfg) { + // return an object that implements the initialize, processRecords, and shutdown functions. +} +``` + +#### initialize(initializeInput, completeCallback) + +```javascript +/** + * This function is called by the KCL to allow application initialization before it + * starts processing Amazon Kinesis records. The KCL won't start processing records until the + * application is successfully initialized and completeCallback is called. + */ +initialize: function(initializeInput, completeCallback) { + shardId = initializeInput.shardId; + // The KCL for Node.js does not allow more than one outstanding checkpoint. So checkpoint must + // be done sequentially. Async queue with 1 concurrency will allow executing checkpoints + // one after another. + commitQueue = async.queue(_commit, 1); + + emitter.initialize(function(err) { + if (err) { + log.error(util.format('Error initializing emitter: %s', err)); + process.exit(1); + } + else { + log.info('Click stream processor successfully initialized.'); + completeCallback(); + } + }); +} +``` + +#### processRecords(processRecordsInput, completeCallback) + +```javascript +/** + * Called by the KCL with a list of records to be processed and a checkpointer. + * A record looks like - + * '{"data":"","partitionKey":"someKey","sequenceNumber":"1234567890"}' + * Note that "data" is a base64-encoded string. You can use the Buffer class to decode the data + * into a string. The checkpointer can be used to checkpoint a particular sequence number. + * Any checkpoint call should be made before calling completeCallback. The KCL ingests the next + * batch of records only after completeCallback is called. + */ +processRecords: function(processRecordsInput, completeCallback) { + // Record processing... + // Checkpoint if you need to. + // call completeCallback() to allow the KCL to ingest the next batch of records. 
+} +``` + +In this sample, processRecords performs the following tasks: + +* Receives one or more records from the KCL. +* Stores them in a local buffer +* Checks if the buffer has reached maxBufferSize; if yes, sends batched data to S3. +* Checkpoints after the data is successfully uploaded to S3. +* Calls completeCallback() after all records are stored in the buffer. +* Each call to processRecords may or may not call the checkpoint depending on whether the data was uploaded to S3. It checkpoints only after successfully uploading data to S3. This would be the most basic example of when an application should checkpoint after a unit of data is processed or persisted. + +#### shutdown(shutdownInput, completeCallback) + +```javascript +/** + * Called by the KCL to indicate that this record processor should shut down. + * After the shutdown operation is complete, there will not be any more calls to + * any other functions of this record processor. Note that the shutdown reason + * could be either TERMINATE or ZOMBIE. If ZOMBIE, clients should not + * checkpoint because there is possibly another record processor which has + * acquired the lease for this shard. If TERMINATE, then + * checkpointer.checkpoint() should be called to checkpoint at the end of + * the shard so that this processor will be shut down and new processors + * will be created for the children of this shard. + */ +shutdown: function(shutdownInput, completeCallback) { + if (shutdownInput.reason !== 'TERMINATE') { + completeCallback(); + return; + } + // Make sure to emit all remaining buffered data to S3 before shutting down. 
+ commitQueue.push({ + key: shardId + '/' + buffer.getFirstSequenceNumber() + '-' + buffer.getLastSequenceNumber(), + sequenceNumber: buffer.getLastSequenceNumber(), + data: buffer.readAndClearRecords(), + checkpointer: shutdownInput.checkpointer + }, function(error) { + if (error) { + log.error(util.format('Received error while shutting down: %s', error)); + } + completeCallback(); + }); +} +``` + +### Run the consumer on a local computer +Amazon KCL for Node.js uses the [MultiLangDaemon][multi-lang-daemon] provided by [Amazon KCL for Java][amazon-kcl-github]. For more information about how MultiLangDaemon interacts with the Amazon KCL for Node.js, see [MultiLangDaemon][multi-lang-daemon]. + +* By default, the MultiLangDaemon uses the [DefaultAWSCredentialsProviderChain][DefaultAWSCredentialsProviderChain], so you'll want to make your credentials available to one of the credentials providers in that provider chain. There are several ways to do this. You can provide credentials through a '~/.aws/credentials' file or through environment variables (**AWS\_ACCESS\_KEY\_ID** and **AWS\_SECRET\_ACCESS\_KEY**). If you're running on Amazon EC2, you can associate an IAM role with your instance with appropriate access to Amazon Kinesis. If you use the CloudFormation template provided with the sample application, it takes care of creating and associating the IAM role to your EC2 instances with the appropriate IAM policy. +* The kcl-bootstrap script at /bin/kcl-bootstrap downloads [MultiLangDaemon][multi-lang-daemon] and its dependencies. This bootstrap script invokes the [MultiLangDaemon][multi-lang-daemon], which starts the Node.js consumer application as its child process. By default, [MultiLangDaemon][multi-lang-daemon] uses a properties file to specify configurations for accessing the Amazon Kinesis stream. Take a look at the consumer/sample.properties file provided for a list of options. Use the '-p' or '--properties' option to specify the properties file to use. 
+* The kcl-bootstrap script uses "JAVA_HOME" to locate the java binary. To specify your own java path, use the '-j' or '--java' argument when invoking the bootstrap script. +* Skip the '-e' or '--execute' argument to the bootstrap script, and it will only print the commands on the console to run the KCL application without actually running the KCL application. +* Add REPOSITORY_ROOT/bin to your path to access kcl-bootstrap from anywhere. +* Run the following command to find out all the options you can override when running the bootstrap script: + +```sh + kcl-bootstrap --help +``` + +* Run the following command to start the consumer application: + +```sh + cd samples/click_stream_sample/consumer + kcl-bootstrap --java -p ./sample.properties -e +``` + +**Note:** + +To run a sample application on Amazon EC2, see the section 'Running on Amazon EC2' later in this README. + +### Cleaning up +This sample application creates an Amazon Kinesis stream, ingests data into it, and creates an Amazon DynamoDB table to track the KCL application state. It may also create an S3 bucket to store batched clickstream data. Your AWS account will incur nominal costs for these resources. After you are done, you can log in to the AWS Management Console and delete these resources. Specifically, the sample application creates the following AWS resources: + +* An *Amazon Kinesis stream* provided in the config.js file. +* An *Amazon DynamoDB table* with same name as applicationName provided in sample.properties. +* An *Amazon S3 bucket* provided in the config.js file. + +## Running on Amazon EC2 +To make running this sample on Amazon EC2 easier, we have provided an Amazon CloudFormation template that creates an Amazon Kinesis stream, an S3 bucket, an appropriate IAM role and policy, and Auto Scaling groups for consumers and producers. You can use this template to create a CloudFormation stack. 
Make sure to use same AWS region that you have specified in the config.js file (the region defaults to us-east-1, but you can use any region that supports Amazon Kinesis). This CloudFormation template also takes care of downloading and starting producer and consumer applications on EC2 instances. + +After the template is created, you can: + +* Log on to producer instances, go to samples/click_stream_sample/producer, and look at logs/application.log for logs. +* Log on to consumer instances, go to samples/click_stream_sample/consumer, and look at consumer.out for multi-lang-daemon logs and logs/application.log for consumer logs. +* View batched clickstream data in S3 under the specified S3 bucket. + +After you are done with testing the sample application, you can delete the CloudFormation script and it should take care of cleaning up the AWS resources created. Keep in mind the following: + +* Just as with the manually-run scenario, this stack ingests data into Amazon Kinesis, stores metadata in DynamoDB, and stores clickstream data in S3, all of which will result in a nominal AWS resource cost. This is especially important if you are planning to run the CloudFormation script for a longer duration. +* You can use ProducerPutRecordsBatchSize and ProducerPutRecordsTps to decide how fast to ingest data into Amazon Kinesis. A lower number for both of these parameters will result in a slower data ingestion rate. +* You must delete all files in the S3 bucket before deleting the CloudFormation script because CloudFormation only deletes empty S3 buckets. + +## Summary + +Processing a large amount of data in near real time does not require writing any complex code or developing a huge infrastructure. It is as simple as writing logic to process a small amount of data (like writing processRecord(Record)) and letting Amazon Kinesis scale for you so that it works for a large amount of streaming data. 
You don’t have to worry about how your processing would scale because Amazon Kinesis handles it for you. Spend your time designing and developing the logic for your ingestion (producer) and processing (consumer), and let Amazon Kinesis do the rest. + +## Next steps +* For more information about the KCL, see [Developing Consumer Applications for Amazon Kinesis using the Amazon Kinesis Client Library][amazon-kcl]. +* For more information about how to optimize your application, see [Advanced Topics for Amazon Kinesis Applications][advanced-kcl-topics]. + +[amazon-kinesis]: http://aws.amazon.com/kinesis +[amazon-kcl-github]: https://github.com/awslabs/amazon-kinesis-client +[amazon-kinesis-docs]: http://aws.amazon.com/documentation/kinesis/ +[amazon-kinesis-shard]: http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html +[amazon-kcl]: http://docs.aws.amazon.com/kinesis/latest/dev/developing-consumer-apps-with-kcl.html +[nodejs-kinesis-putrecords]: http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Kinesis.html#putRecords-property +[nodejs-kcl]: https://github.com/awslabs/amazon-kinesis-client-nodejs +[advanced-kcl-topics]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-advanced.html +[aws-sdk-node]: http://aws.amazon.com/sdk-for-node-js/ +[multi-lang-daemon]: https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java +[DefaultAWSCredentialsProviderChain]: http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +[kinesis-forum]: http://developer.amazonwebservices.com/connect/forum.jspa?forumID=169 +[aws-console]: http://aws.amazon.com/console/ diff --git a/samples/click_stream_sample/cloudformation/nodejs-kcl-clickstream.template b/samples/click_stream_sample/cloudformation/nodejs-kcl-clickstream.template new file mode 100644 index 00000000..3d168bdf --- /dev/null +++ 
b/samples/click_stream_sample/cloudformation/nodejs-kcl-clickstream.template @@ -0,0 +1,350 @@ +{ + "AWSTemplateFormatVersion" : "2010-09-09", + + "Description" : "The Amazon Kinesis click stream sample for Node.js KCL.", + + "Parameters" : { + "ProducerInstanceType" : { + "Description" : "EC2 instance type for producer node", + "Type" : "String", + "Default" : "t2.micro", + "AllowedValues" : [ "t2.micro", "t2.small", "t2.medium", "m3.medium", "m3.large", "m3.xlarge", "m3.2xlarge", "c3.large", "c3.xlarge", "c3.2xlarge", "c3.4xlarge", "c3.8xlarge" ], + "ConstraintDescription" : "must be a supported EC2 instance type for this template." + }, + + "ConsumerInstanceType" : { + "Description" : "EC2 instance type for consumer node", + "Type" : "String", + "Default" : "t2.micro", + "AllowedValues" : [ "t2.micro", "t2.small", "t2.medium", "m3.medium", "m3.large", "m3.xlarge", "m3.2xlarge", "c3.large", "c3.xlarge", "c3.2xlarge", "c3.4xlarge", "c3.8xlarge" ], + "ConstraintDescription" : "must be a supported EC2 instance type for this template." + }, + + "ProducerClusterSize" : { + "Description" : "Total producer instances", + "Type" : "Number", + "MinValue" : "1", + "Default" : "1" + }, + + "ConsumerClusterSize" : { + "Description" : "Total consumer instances", + "Type" : "Number", + "MinValue" : "1", + "Default" : "1" + }, + + "NumberOfShards" : { + "Description" : "Total shards for the kinesis stream this stack creates.", + "Type" : "Number", + "MinValue" : "1", + "Default" : "2" + }, + + "ProducerPutRecordsBatchSize" : { + "Description" : "Total number of records to batch per putRecords API call to Kinesis.", + "Type" : "Number", + "MinValue" : "1", + "Default" : "5" + }, + + "ProducerPutRecordsTps" : { + "Description" : "Transactions per second for PutRecords API.", + "Type" : "Number", + "MinValue" : "1", + "Default" : "20" + }, + + "S3BufferSizeInBytes" : { + "Description" : "Maximum buffer size on consumer before putting data in S3. 
Defaults to 1MB.", + "Type" : "Number", + "MinValue" : "1", + "Default" : "1048576" + }, + + "S3BucketName" : { + "Description" : "S3 bucket to send batch of processed click-stream data from consumer.", + "Type" : "String", + "Default" : "", + "MinLength" : "0", + "MaxLength" : "255", + "AllowedPattern" : "[\\x20-\\x7E]*", + "ConstraintDescription" : "can contain only ASCII characters." + }, + + "KeyName" : { + "Description" : "(Optional) Name of an existing EC2 KeyPair to enable SSH access to the instance. If this is not provided you will not be able to SSH on to the EC2 instance.", + "Type" : "String", + "Default" : "", + "MinLength" : "0", + "MaxLength" : "255", + "AllowedPattern" : "[\\x20-\\x7E]*", + "ConstraintDescription" : "can contain only ASCII characters." + }, + + "SSHLocation" : { + "Description" : "The IP address range that can be used to SSH to the EC2 instances", + "Type" : "String", + "MinLength" : "9", + "MaxLength" : "18", + "Default" : "0.0.0.0/0", + "AllowedPattern" : "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})/(\\d{1,2})", + "ConstraintDescription" : "must be a valid IP CIDR range of the form x.x.x.x/x." 
+ } + }, + + "Conditions": { + "UseEC2KeyName": {"Fn::Not": [{"Fn::Equals" : [{"Ref" : "KeyName"}, ""]}]} + }, + + "Mappings" : { + "AWSInstanceType2Arch" : { + "t2.micro" : { "Arch" : "HVM64" }, + "t2.small" : { "Arch" : "HVM64" }, + "t2.medium" : { "Arch" : "HVM64" }, + "m3.medium" : { "Arch" : "HVM64" }, + "m3.large" : { "Arch" : "HVM64" }, + "m3.xlarge" : { "Arch" : "HVM64" }, + "m3.2xlarge" : { "Arch" : "HVM64" }, + "c3.large" : { "Arch" : "HVM64" }, + "c3.xlarge" : { "Arch" : "HVM64" }, + "c3.2xlarge" : { "Arch" : "HVM64" }, + "c3.4xlarge" : { "Arch" : "HVM64" }, + "c3.8xlarge" : { "Arch" : "HVM64" } + }, + + "AWSRegionArch2AMI" : { + "us-east-1" : { "HVM64" : "ami-146e2a7c" }, + "us-west-1" : { "HVM64" : "ami-42908907" }, + "us-west-2" : { "HVM64" : "ami-dfc39aef" }, + "eu-west-1" : { "HVM64" : "ami-9d23aeea" }, + "eu-central-1" : { "HVM64" : "ami-04003319" }, + "ap-southeast-1" : { "HVM64" : "ami-96bb90c4" }, + "ap-southeast-2" : { "HVM64" : "ami-d50773ef" }, + "ap-northeast-1" : { "HVM64" : "ami-18869819" } + } + }, + + "Resources" : { + "Ec2SecurityGroup" : { + "Type" : "AWS::EC2::SecurityGroup", + "Properties" : { + "GroupDescription" : "Enable SSH access and HTTP access on the inbound port", + "SecurityGroupIngress" : + [{ "IpProtocol" : "tcp", "FromPort" : "22", "ToPort" : "22", "CidrIp" : { "Ref" : "SSHLocation"} }, + { "IpProtocol" : "tcp", "FromPort" : "80", "ToPort" : "80", "CidrIp" : "0.0.0.0/0"}] + } + }, + + "KinesisStream" : { + "Type" : "AWS::Kinesis::Stream", + "Properties" : { + "ShardCount" : { "Ref" : "NumberOfShards" } + } + }, + + "KCLDynamoDBTable" : { + "Type" : "AWS::DynamoDB::Table", + "Properties" : { + "AttributeDefinitions" : [ + { + "AttributeName" : "leaseKey", + "AttributeType" : "S" + } + ], + "KeySchema" : [ + { + "AttributeName" : "leaseKey", + "KeyType" : "HASH" + } + ], + "ProvisionedThroughput" : { + "ReadCapacityUnits" : "10", + "WriteCapacityUnits" : "5" + } + } + }, + + "S3Bucket": { + "Type" : "AWS::S3::Bucket", + 
"Properties" : { + "BucketName" : {"Ref":"S3BucketName"} + }, + "DeletionPolicy" : "Delete" + }, + + "RootRole": { + "Type" : "AWS::IAM::Role", + "Properties" : { + "AssumeRolePolicyDocument": { + "Version" : "2012-10-17", + "Statement" : [ { + "Effect" : "Allow", + "Principal" : { + "Service" : [ "ec2.amazonaws.com" ] + }, + "Action" : [ "sts:AssumeRole" ] + } ] + }, + "Path" : "/" + } + }, + + "RolePolicies" : { + "Type" : "AWS::IAM::Policy", + "Properties" : { + "PolicyName" : "root", + "PolicyDocument" : { + "Version" : "2012-10-17", + "Statement" : [ { + "Effect" : "Allow", + "Action" : "kinesis:*", + "Resource" : { "Fn::Join" : [ "", [ "arn:aws:kinesis:", { "Ref" : "AWS::Region" }, ":", { "Ref" : "AWS::AccountId" }, ":stream/", { "Ref" : "KinesisStream" } ]]} + }, { + "Effect" : "Allow", + "Action" : "dynamodb:*", + "Resource" : { "Fn::Join" : [ "", [ "arn:aws:dynamodb:", { "Ref" : "AWS::Region" }, ":", { "Ref" : "AWS::AccountId" }, ":table/", { "Ref" : "KCLDynamoDBTable" } ]]} + }, { + "Effect" : "Allow", + "Action" : "cloudwatch:*", + "Resource" : "*" + }, { + "Effect" : "Allow", + "Action" : "s3:*", + "Resource" : "*" + } + ] + }, + "Roles" : [ { "Ref": "RootRole" } ] + } + }, + + "RootInstanceProfile" : { + "Type" : "AWS::IAM::InstanceProfile", + "Properties" : { + "Path" : "/", + "Roles" : [ { "Ref": "RootRole" } ] + } + }, + + "ConsumerCluster" : { + "Type" : "AWS::AutoScaling::AutoScalingGroup", + "Properties" : { + "AvailabilityZones" : { "Fn::GetAZs" : { "Ref" : "AWS::Region" } }, + "LaunchConfigurationName" : { "Ref" : "ConsumerLaunchConfig" }, + "MinSize" : { "Ref" : "ConsumerClusterSize" }, + "MaxSize" : { "Ref" : "ConsumerClusterSize" }, + "DesiredCapacity" : { "Ref" : "ConsumerClusterSize" }, + "Tags" : [ + { "Key" : "ApplicationRole", "Value" : "NodeJSClickStreamConsumer", "PropagateAtLaunch" : "true" } + ] + } + }, + + "ConsumerLaunchConfig" : { + "Type" : "AWS::AutoScaling::LaunchConfiguration", + "Metadata" : { + "Comment:" : "Run consumer 
for kinesis NodeJS-KCL ClickStream example.", + "AWS::CloudFormation::Init" : { + "config" : { + } + } + }, + "Properties" : { + "KeyName" : { "Fn::If" : [ "UseEC2KeyName", { "Ref" : "KeyName" }, { "Ref" : "AWS::NoValue" } ]}, + "ImageId" : { "Fn::FindInMap" : [ "AWSRegionArch2AMI", { "Ref" : "AWS::Region" }, + { "Fn::FindInMap" : [ "AWSInstanceType2Arch", { "Ref" : "ConsumerInstanceType" }, + "Arch" ] } ] }, + "InstanceType" : { "Ref" : "ConsumerInstanceType" }, + "SecurityGroups" : [{ "Ref" : "Ec2SecurityGroup" }], + "IamInstanceProfile": { "Ref": "RootInstanceProfile" }, + "UserData" : { "Fn::Base64" : { "Fn::Join" : ["", [ + "#!/bin/bash\n", + "cd /home/ec2-user\n", + "yum install -y nodejs npm --enablerepo=epel\n", + "yum install -y git\n", + "git clone https://github.com/awslabs/amazon-kinesis-client-nodejs\n", + "chown -R ec2-user:ec2-user /home/ec2-user/amazon-kinesis-client-nodejs\n", + "cd amazon-kinesis-client-nodejs\n", + "npm install\n", + "cd samples/click_stream_sample/consumer\n", + "export NODE_LOG_DIR='logs'\n", + "mkdir logs\n", + "sed 's/streamName = kclnodejsclickstreamsample/streamName = ", { "Ref" : "KinesisStream"}, "/g' -i sample.properties\n", + "sed 's/applicationName = kclnodejsclickstreamsample/applicationName = ", { "Ref" : "KCLDynamoDBTable"}, "/g' -i sample.properties\n", + "sed 's/regionName = us-east-1/regionName = ", { "Ref" : "AWS::Region"}, "/g' -i sample.properties\n", + "sed 's/kinesis-clickstream-batchdata/", { "Ref" : "S3Bucket" }, "/g' -i config.js\n", + "sed 's/maxBufferSize : 1024 \\* 1024/maxBufferSize : ", { "Ref" : "S3BufferSizeInBytes" }, "/g' -i config.js\n", + "../../../bin/kcl-bootstrap -e --java /usr/bin/java --properties ./sample.properties > consumer.out 2>&1 &\n" + ]]}} + } + }, + + "ProducerCluster" : { + "Type" : "AWS::AutoScaling::AutoScalingGroup", + "Properties" : { + "AvailabilityZones" : { "Fn::GetAZs" : { "Ref" : "AWS::Region" } }, + "LaunchConfigurationName" : { "Ref" : "ProducerLaunchConfig" }, + 
"MinSize" : { "Ref" : "ProducerClusterSize" }, + "MaxSize" : { "Ref" : "ProducerClusterSize" }, + "DesiredCapacity" : { "Ref" : "ProducerClusterSize" }, + "Tags" : [ + { "Key" : "ApplicationRole", "Value" : "NodeJSClickStreamProducer", "PropagateAtLaunch" : "true" } + ] + } + }, + + "ProducerLaunchConfig" : { + "Type" : "AWS::AutoScaling::LaunchConfiguration", + "Metadata" : { + "Comment:" : "Run producer for kinesis NodeJS-KCL ClickStream example.", + "AWS::CloudFormation::Init" : { + "config" : { + } + } + }, + "Properties" : { + "KeyName" : { "Fn::If" : [ "UseEC2KeyName", { "Ref" : "KeyName" }, { "Ref" : "AWS::NoValue" } ]}, + "ImageId" : { "Fn::FindInMap" : [ "AWSRegionArch2AMI", { "Ref" : "AWS::Region" }, + { "Fn::FindInMap" : [ "AWSInstanceType2Arch", { "Ref" : "ProducerInstanceType" }, + "Arch" ] } ] }, + "InstanceType" : { "Ref" : "ProducerInstanceType" }, + "SecurityGroups" : [{ "Ref" : "Ec2SecurityGroup" }], + "IamInstanceProfile": { "Ref": "RootInstanceProfile" }, + "UserData" : { "Fn::Base64" : { "Fn::Join" : ["", [ + "#!/bin/bash\n", + "cd /home/ec2-user\n", + "yum install -y nodejs npm --enablerepo=epel\n", + "yum install -y git\n", + "git clone https://github.com/awslabs/amazon-kinesis-client-nodejs\n", + "chown -R ec2-user:ec2-user /home/ec2-user/amazon-kinesis-client-nodejs\n", + "cd amazon-kinesis-client-nodejs\n", + "npm install\n", + "cd samples/click_stream_sample/producer\n", + "export NODE_LOG_DIR='logs'\n", + "mkdir logs\n", + "sed 's/kclnodejsclickstreamsample/", { "Ref" : "KinesisStream" }, "/g' -i config.js\n", + "sed 's/us-east-1/", { "Ref" : "AWS::Region" }, "/g' -i config.js\n", + "sed 's/recordsToWritePerBatch : 5/recordsToWritePerBatch : ", { "Ref" : "ProducerPutRecordsBatchSize" }, "/g' -i config.js\n", + "sed 's/shards : 2/shards : ", { "Ref" : "NumberOfShards" }, "/g' -i config.js\n", + "sed 's/putRecordsTps : 20/putRecordsTps : ", { "Ref" : "ProducerPutRecordsTps" }, "/g' -i config.js\n", + "node click_stream_producer_app.js &\n" 
+ ]]}} + } + } + }, + "Outputs" : { + "StreamName" : { + "Description" : "The name of the Kinesis Stream. This was autogenerated by the Kinesis Resource named 'KinesisStream'", + "Value" : { "Ref" : "KinesisStream" } + }, + "DynamoDBTableForKCL" : { + "Description" : "The DynamoDB table name to store KCL metadata. This was autogenerated by the DynamoDB Resource named 'KCLDynamoDBTable'", + "Value" : { "Ref" : "KCLDynamoDBTable" } + }, + "S3Bucket" : { + "Description" : "The name of the bucket where click stream data is stored from consumer. This was autogenerated by the S3 Resource named 'S3Bucket'", + "Value" : { "Ref" : "S3Bucket" } + } + } +} diff --git a/samples/click_stream_sample/consumer/click_stream_consumer.js b/samples/click_stream_sample/consumer/click_stream_consumer.js new file mode 100644 index 00000000..182f6cad --- /dev/null +++ b/samples/click_stream_sample/consumer/click_stream_consumer.js @@ -0,0 +1,194 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+***/ + +'use strict'; + + +var async = require('async'); +var util = require('util'); +var config = require('./config'); +var kcl = require('../../..'); +var logger = require('../../util/logger'); +var recordBuffer = require('./record_buffer'); +var s3Emitter = require('./s3_emitter'); + + +/** + * A simple implementation of RecordProcessor that accepts records from an Amazon + * Kinesis stream and batches them into 1 MB (configurable) datasets, then puts + * them in a configured S3 bucket for further offline processing. The object + * returned should implement the functions initialize, processRecords, and shutdown + * in order to enable the KCL to interact with MultiLangDaemon. + * MultiLangDaemon would create one child process (hence one RecordProcessor instance) + * per shard. A single shard will never be accessed by more than one + * RecordProcessor instance; e.g., if you run this sample on a single machine, + * against a stream with 2 shards, MultiLangDaemon would create 2 child + * Node.js processes (RecordProcessor), one for each shard. 
/**
 * A simple RecordProcessor implementation that accepts records from an Amazon Kinesis
 * stream, accumulates them in an in-memory buffer until the configured size is reached,
 * hands each full batch to the emitter and then checkpoints at the last sequence number
 * of that batch. The returned object implements initialize, processRecords and shutdown,
 * which is what the KCL calls on a record processor.
 *
 * @param {object} emitter - Destination for batches; initialize(callback) and
 *   emit(key, data, callback) are called on it.
 * @param {object} cfg - Processor settings; cfg.maxBufferSize is passed to recordBuffer as
 *   the flush threshold.
 * @returns {object} Record processor exposing initialize, processRecords and shutdown.
 */
function clickStreamProcessor(emitter, cfg) {
  var buffer = recordBuffer(cfg.maxBufferSize);
  var log = logger().getLogger('clickStreamProcessor');
  var shardId = null;
  var commitQueue = null;

  // Decodes a base64 payload into a string. Buffer.from is used where the runtime has a real
  // implementation; older runtimes fall back to the (now deprecated) Buffer constructor.
  function _decodeBase64(encoded) {
    if (typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from) {
      return Buffer.from(encoded, 'base64').toString();
    }
    return new Buffer(encoded, 'base64').toString();
  }

  // Queue worker: emits one batch, then checkpoints at the batch's last sequence number.
  // The callback receives the emit error or the checkpoint error, if any.
  function _commit(commitInfo, callback) {
    var key = commitInfo.key;
    var sequenceNumber = commitInfo.sequenceNumber;

    emitter.emit(key, commitInfo.data, function(error) {
      if (error) {
        callback(error);
        return;
      }
      log.info(util.format('Successfully uploaded data to s3 file: %s', key));
      commitInfo.checkpointer.checkpoint(sequenceNumber, function(e) {
        if (!e) {
          log.info(util.format('Successful checkpoint at sequence number: %s', sequenceNumber));
        }
        callback(e);
      });
    });
  }

  // Buffers a single record. The callback fires immediately while the buffer is below the
  // threshold, or after the resulting commit has finished once the buffer is full.
  function _processRecord(record, checkpointer, callback) {
    var data = _decodeBase64(record.data);
    var sequenceNumber = record.sequenceNumber;

    // Add data to buffer until maxBufferSize.
    buffer.putRecord(data, sequenceNumber);

    if (!buffer.shouldFlush()) {
      callback(null);
      return;
    }
    // Buffer is full. Add commit to the queue.
    commitQueue.push({
      key: shardId + '/' + buffer.getFirstSequenceNumber() + '-' + buffer.getLastSequenceNumber(),
      sequenceNumber: buffer.getLastSequenceNumber(),
      data: buffer.readAndClearRecords(),
      checkpointer: checkpointer
    }, callback);
  }

  return {
    /**
     * Called by the KCL to allow application initialization before it starts processing
     * Amazon Kinesis records. Records the shard id, creates the commit queue and initializes
     * the emitter; completeCallback is invoked only after the emitter initialized
     * successfully. An emitter failure is logged and terminates the process.
     */
    initialize: function(initializeInput, completeCallback) {
      shardId = initializeInput.shardId;
      // The KCL for Node.js does not allow more than one outstanding checkpoint. So checkpoint
      // must be done sequentially. Async queue with 1 concurrency will allow executing
      // checkpoints one after another.
      commitQueue = async.queue(_commit, 1);

      emitter.initialize(function(err) {
        if (err) {
          log.error(util.format('Error initializing emitter: %s', err));
          process.exit(1);
        }
        else {
          log.info('Click stream processor successfully initialized.');
          completeCallback();
        }
      });
    },

    /**
     * Called by the KCL with a list of records to be processed and a checkpointer.
     * A record looks like -
     *     '{"data":"","partitionKey":"someKey","sequenceNumber":"1234567890"}'
     * Note that "data" is a base64-encoded string. completeCallback is invoked exactly once,
     * after every record of the batch has been handled (including any commit it triggered).
     * Per-record failures are logged and counted but do not prevent completion.
     */
    processRecords: function(processRecordsInput, completeCallback) {
      if (!processRecordsInput || !processRecordsInput.records) {
        completeCallback();
        return;
      }

      var records = processRecordsInput.records;
      // An empty batch produces no per-record callbacks, so complete right away instead of
      // waiting for a count that can never be reached.
      if (records.length === 0) {
        completeCallback();
        return;
      }

      var processedCount = 0;
      var errors = [];

      var onRecordProcessed = function(err) {
        if (err) {
          log.error(util.format('Received error while processing record: %s', err));
          errors.push(err);
        }

        processedCount++;
        // Call completeCallback only after we have processed all records.
        if (processedCount === records.length) {
          if (errors.length > 0) {
            log.info(util.format('%d records processed with %d errors.', records.length, errors.length));
          }
          completeCallback();
        }
      };

      for (var i = 0 ; i < records.length ; ++i) {
        _processRecord(records[i], processRecordsInput.checkpointer, onRecordProcessed);
      }
    },

    /**
     * Called by the KCL to indicate that this record processor should shut down.
     * For any reason other than TERMINATE nothing is emitted or checkpointed, because
     * another record processor may already own the lease for this shard. For TERMINATE the
     * remaining buffered data is queued for emit + checkpoint, and completeCallback is
     * invoked once that commit finished (an error is logged, not propagated).
     */
    shutdown: function(shutdownInput, completeCallback) {
      if (shutdownInput.reason !== 'TERMINATE') {
        completeCallback();
        return;
      }
      // Make sure to emit all remaining buffered data to S3 before shutting down.
      // NOTE(review): with an empty buffer this still emits an empty object and checkpoints
      // at the buffer's initial sequence number (0) - confirm that is intended.
      commitQueue.push({
        key: shardId + '/' + buffer.getFirstSequenceNumber() + '-' + buffer.getLastSequenceNumber(),
        sequenceNumber: buffer.getLastSequenceNumber(),
        data: buffer.readAndClearRecords(),
        checkpointer: shutdownInput.checkpointer
      }, function(error) {
        if (error) {
          log.error(util.format('Received error while shutting down: %s', error));
        }
        completeCallback();
      });
    }
  };
}
See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + +var config = module.exports = { + s3 : { + // Region for Amazon S3. Defaults to us-east-1. + // region : '', + + // Amazon S3 bucket to store batched clickstream data. The consumer application + // may create a new bucket (based on S3.createBucketIfNotPresent value), + // if the specified bucket doesn't exist. + bucket : 'kinesis-clickstream-batchdata', + + // Enables the consumer application to create a new S3 bucket if the specified + // bucket doesn't exist. + createBucketIfNotPresent : true + }, + + clickStreamProcessor : { + // Maximum batch size in bytes before sending data to S3. + maxBufferSize : 1024 * 1024 + } +}; diff --git a/samples/click_stream_sample/consumer/record_buffer.js b/samples/click_stream_sample/consumer/record_buffer.js new file mode 100644 index 00000000..2400ff37 --- /dev/null +++ b/samples/click_stream_sample/consumer/record_buffer.js @@ -0,0 +1,81 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + +// In memory buffer for storing kinesis records. 
/**
 * In-memory buffer for storing Kinesis records until a size threshold is reached.
 *
 * Every stored record is the record data followed by the current delimiter ('\n' unless
 * changed through setDelimiter). The buffer also tracks the sequence numbers of the first
 * and the most recent record it holds.
 *
 * @param {number} size - Threshold in bytes; shouldFlush() returns true once the buffered
 *   bytes reach this value.
 * @returns {object} Object exposing putRecord, readAndClearRecords, setDelimiter,
 *   getFirstSequenceNumber, getLastSequenceNumber and shouldFlush.
 */
function recordBuffer(size) {
  var buffer = [];
  var firstSequenceNumber = 0;
  var lastSequenceNumber = 0;
  var currentSize = 0;
  var delimiter = '\n';

  // Converts text to a Buffer. Buffer.from is used where the runtime has a real
  // implementation; older runtimes fall back to the (now deprecated) Buffer constructor.
  function _toBuffer(text) {
    if (typeof Buffer.from === 'function' && Buffer.from !== Uint8Array.from) {
      return Buffer.from(text);
    }
    return new Buffer(text);
  }

  // Drops all buffered records and resets the bookkeeping to its initial state.
  function _clear() {
    buffer.length = 0;
    firstSequenceNumber = 0;
    lastSequenceNumber = 0;
    currentSize = 0;
  }

  return {

    // Stores a single record in memory. Falsy data (e.g. an empty string) is ignored.
    // The third parameter is accepted for signature compatibility and is never invoked.
    putRecord: function(data, seq, callback) {
      if (!data) {
        return;
      }

      var record = _toBuffer(data + delimiter);
      // The first record stored after creation or after a clear defines the first sequence number.
      if (buffer.length === 0) {
        firstSequenceNumber = seq;
      }
      lastSequenceNumber = seq;

      currentSize += record.length;
      buffer.push(record);
    },

    // Bundles all records in a single Buffer and clears the local buffer.
    readAndClearRecords: function() {
      var buf = Buffer.concat(buffer, currentSize);
      _clear();
      return buf;
    },

    // Changes the delimiter appended to records stored from now on.
    setDelimiter: function(newDelimiter) {
      delimiter = newDelimiter;
    },

    // Sequence number of the first buffered record, or 0 when the buffer is empty.
    getFirstSequenceNumber: function() {
      return firstSequenceNumber;
    },

    // Sequence number of the most recently buffered record, or 0 when the buffer is empty.
    getLastSequenceNumber: function() {
      return lastSequenceNumber;
    },

    // True once the buffered bytes reached the configured size, false otherwise.
    shouldFlush: function() {
      return currentSize >= size;
    }
  };
}
/**
 * Creates an emitter that uploads data to the Amazon S3 bucket named in the configuration.
 *
 * @param {object} config - S3 settings: bucket (required), region (optional) and
 *   createBucketIfNotPresent (whether initialize may create the bucket).
 * @returns {object} Emitter exposing initialize(callback) and emit(key, value, callback).
 */
function s3Emitter(config) {
  var s3Client;
  var log = logger().getLogger('s3Emitter');
  var initializeRetryCount = 0;

  var self = {
    /**
     * Creates the S3 client and makes sure the destination bucket is usable. When the
     * headBucket call fails and bucket creation is enabled, the bucket is created; a failed
     * creation re-runs initialize after one second, for at most three attempts in total.
     *
     * @param {function} callback - Called with no error on success, otherwise with the error.
     */
    initialize: function(callback) {
      ++initializeRetryCount;

      s3Client = new AWS.S3({region: config.region});
      // Check if specified S3 bucket exists. If it does not, create one based on config.
      s3Client.headBucket({Bucket: config.bucket}, function(err) {
        if (!err) {
          log.info(util.format('Destination bucket: %s', config.bucket));
          callback(null);
          return;
        }
        if (!config.createBucketIfNotPresent) {
          callback(new Error('Specified bucket does not exist in S3. Enable bucket creation by setting config.s3.createBucketIfNotPresent to true.'));
          return;
        }

        var params = {
          Bucket: config.bucket
        };
        // A location constraint is only valid outside us-east-1; sending one for the default
        // region (or an undefined one) makes the request fail.
        if (config.region && config.region !== 'us-east-1') {
          params.CreateBucketConfiguration = {
            LocationConstraint: config.region
          };
        }

        s3Client.createBucket(params, function(createErr) {
          if (createErr && initializeRetryCount < 3) {
            setTimeout(function() {
              self.initialize(callback);
            }, 1000);
            return;
          }
          callback(createErr);
        });
      });
    },

    /**
     * Uploads one object to the configured bucket.
     *
     * @param {string} key - Object key.
     * @param {*} value - Object body handed to the S3 client.
     * @param {function} callback - Passed to the upload call; also receives an error when
     *   emit is used before initialize.
     */
    emit: function(key, value, callback) {
      if (!s3Client) {
        callback(new Error('s3Emitter.emit called before initialize.'));
        return;
      }
      var params = {
        Bucket: config.bucket,
        Key: key,
        Body: value
      };
      s3Client.upload(params, callback);
    }
  };
  return self;
}
+streamName = kclnodejsclickstreamsample + +# Used by the KCL as the name of this application. Will be used as the name +# of an Amazon DynamoDB table which will store the lease and checkpoint +# information for workers with this application name +applicationName = kclnodejsclickstreamsample + +# Users can change the credentials provider the KCL will use to retrieve credentials. +# The DefaultAWSCredentialsProviderChain checks several other providers, which is +# described here: +# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + +# Appended to the user agent of the KCL. Does not impact the functionality of the +# KCL in any other way. +processingLanguage = nodejs/0.10 + +# Valid options are TRIM_HORIZON or LATEST. +# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +initialPositionInStream = TRIM_HORIZON + +# The following properties are also available for configuring the KCL Worker that is created +# by the MultiLangDaemon. + +# The KCL defaults to us-east-1 +regionName = us-east-1 + +# Fail over time in milliseconds. A worker which does not renew its lease within this time interval +# will be regarded as having problems and its shards will be assigned to other workers. +# For applications that have a large number of shards, this may be set to a higher number to reduce +# the number of DynamoDB IOPS required for tracking leases +#failoverTimeMillis = 10000 + +# A worker id that uniquely identifies this worker among all workers using the same applicationName +# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. +#workerId = + +# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. +#shardSyncIntervalMillis = 60000 + +# Max records to fetch from Kinesis in a single GetRecords call. 
+#maxRecords = 10000 + +# Idle time between record reads in milliseconds. +#idleTimeBetweenReadsInMillis = 1000 + +# Enables applications to flush/checkpoint (if they have some data "in progress", but don't get new data for a while) +#callProcessRecordsEvenForEmptyRecordList = false + +# Interval in milliseconds between polling to check for parent shard completion. +# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on +# completion of parent shards). +#parentShardPollIntervalMillis = 10000 + +# Cleanup leases upon shards completion (don't wait until they expire in Kinesis). +# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try +# to delete the ones we don't need any longer. +#cleanupLeasesUponShardCompletion = true + +# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). +#taskBackoffTimeMillis = 500 + +# Buffer metrics for at most this long before publishing to CloudWatch. +#metricsBufferTimeMillis = 10000 + +# Buffer at most this many metrics before publishing to CloudWatch. +#metricsMaxQueueSize = 10000 + +# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls +# to RecordProcessorCheckpointer#checkpoint(String) by default. +#validateSequenceNumberBeforeCheckpointing = true + +# The maximum number of active threads for the MultiLangDaemon to permit. +# If a value is provided then a FixedThreadPool is used with the maximum +# active threads set to the provided value. If a non-positive integer or no +# value is provided a CachedThreadPool is used. 
/**
 * Creates a generator of fake click-stream events.
 *
 * @param {number} totalResources - Number of distinct resource names ('resource-0' ...
 *   'resource-N') to pick from. When no resource would be generated (e.g. 0 or undefined),
 *   a single 'resource-0' is used so that generated events always carry a resource.
 * @returns {object} Object exposing getRandomClickStreamData().
 */
function clickStreamGenerator(totalResources) {
  // Referrers to pick from when generating fake data.
  var referrers = [
    'http://www.amazon.com',
    'http://www.google.com',
    'http://www.yahoo.com',
    'http://bing.com',
    'http://stackoverflow.com',
    'http://reddit.com'
  ];

  // Resource names to pick from when generating fake data.
  var resources = [];
  for (var i = 0 ; i < totalResources ; i++) {
    resources.push('resource-' + i);
  }
  if (resources.length === 0) {
    resources.push('resource-0');
  }

  // Returns a uniformly chosen element of a non-empty list.
  function _pickRandom(list) {
    return list[Math.floor(Math.random() * list.length)];
  }

  return {
    // Returns one event of the form {resource: <name>, referrer: <url>}, both chosen at random.
    getRandomClickStreamData: function() {
      var referrer = _pickRandom(referrers);
      var resource = _pickRandom(resources);

      return {
        resource: resource,
        referrer: referrer
      };
    }
  };
}
/**
 * Creates a producer that keeps writing generated click-stream records to an Amazon
 * Kinesis stream, creating the stream first when it does not exist yet.
 *
 * @param {object} kinesis - Kinesis client; createStream, describeStream and putRecords
 *   are called on it.
 * @param {object} config - Producer settings: stream, shards, recordsToWritePerBatch,
 *   waitBetweenDescribeCallsInSeconds and, optionally, putRecordsTps.
 * @returns {object} Object exposing run().
 */
function clickStreamProducer(kinesis, config) {
  var clickStreamGen = clickStreamGenerator(config.shards);
  var log = logger().getLogger('producer');
  // Pause between PutRecords calls derived from the configured TPS; 50 ms when it is not set.
  var waitBetweenPutRecordsCallsInMilliseconds = config.putRecordsTps ? 1000 / config.putRecordsTps : 50;

  // Creates a new kinesis stream if one doesn't exist, then waits until it is ACTIVE.
  // The callback receives an error when creation or the status check fails.
  function _createStreamIfNotCreated(callback) {
    var params = {
      ShardCount: config.shards,
      StreamName: config.stream
    };

    kinesis.createStream(params, function(err) {
      // ResourceInUseException is returned when the stream is already created.
      if (err && err.code !== 'ResourceInUseException') {
        callback(err);
        return;
      }

      if (err) {
        log.info(util.format('%s stream is already created! Re-using it.', config.stream));
      }
      else {
        log.info(util.format('%s stream does not exist. Created a new stream with that name.', config.stream));
      }

      // Poll to make sure stream is in ACTIVE state before start pushing data.
      _waitForStreamToBecomeActive(callback);
    });
  }

  // Checks current status of the stream and polls until it is ACTIVE.
  function _waitForStreamToBecomeActive(callback) {
    kinesis.describeStream({StreamName: config.stream}, function(err, data) {
      if (err) {
        // Report the failure; dropping it would leave the caller waiting forever.
        callback(err);
        return;
      }

      var status = data.StreamDescription.StreamStatus;
      if (status === 'ACTIVE') {
        log.info('Current status of the stream is ACTIVE.');
        callback(null);
        return;
      }

      log.info(util.format('Current status of the stream is %s.', status));
      setTimeout(function() {
        _waitForStreamToBecomeActive(callback);
      }, 1000 * config.waitBetweenDescribeCallsInSeconds);
    });
  }

  // Sends batch of records to kinesis using putRecords API. done() is invoked as soon as the
  // request has been issued (not when it completes), so the send rate follows the timer.
  function _sendToKinesis(totalRecords, done) {
    if (totalRecords <= 0) {
      // done() is intentionally not called: with nothing to send the loop stops here.
      log.warn(util.format('Nothing to send: records per batch is %d. Producer loop stopped.', totalRecords));
      return;
    }

    var records = [];

    // Use putRecords API to batch more than one record.
    for (var i = 0 ; i < totalRecords ; i++) {
      var data = clickStreamGen.getRandomClickStreamData();
      records.push({
        Data: JSON.stringify(data),
        PartitionKey: data.resource
      });
    }

    var recordsParams = {
      Records: records,
      StreamName: config.stream
    };

    kinesis.putRecords(recordsParams, function(err, data) {
      if (err) {
        log.error(err);
      }
      else {
        log.info(util.format('Sent %d records with %d failures.', records.length, data.FailedRecordCount));
      }
    });

    done();
  }

  // Schedules one batch after the configured pause and re-arms itself once it was issued.
  function _sendToKinesisRecursively(totalRecords) {
    setTimeout(function() {
      _sendToKinesis(totalRecords, function() {
        _sendToKinesisRecursively(totalRecords);
      });
    }, waitBetweenPutRecordsCallsInMilliseconds);
  }

  return {
    // Ensures the stream exists and is ACTIVE, then starts the endless send loop.
    // A setup failure is logged and the loop is not started.
    run: function() {
      log.info(util.format('Configured wait between consecutive PutRecords call in milliseconds: %d',
          waitBetweenPutRecordsCallsInMilliseconds));
      _createStreamIfNotCreated(function(err) {
        if (err) {
          log.error(util.format('Error creating stream: %s', err));
          return;
        }
        _sendToKinesisRecursively(config.recordsToWritePerBatch);
      });
    }
  };
}
a/samples/click_stream_sample/producer/click_stream_producer_app.js b/samples/click_stream_sample/producer/click_stream_producer_app.js new file mode 100644 index 00000000..c17edf56 --- /dev/null +++ b/samples/click_stream_sample/producer/click_stream_producer_app.js @@ -0,0 +1,24 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +var AWS = require('aws-sdk'); +var config = require('./config'); +var producer = require('./click_stream_producer'); + +var kinesis = new AWS.Kinesis({region: config.kinesis.region}); +producer(kinesis, config.clickStreamProducer).run(); diff --git a/samples/click_stream_sample/producer/config.js b/samples/click_stream_sample/producer/config.js new file mode 100644 index 00000000..e6c5525c --- /dev/null +++ b/samples/click_stream_sample/producer/config.js @@ -0,0 +1,48 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+***/ + +'use strict'; + +var config = module.exports = { + kinesis : { + // Region for the Amazon Kinesis stream. + region : 'us-east-1' + }, + + clickStreamProducer : { + // The Amazon Kinesis stream to ingest clickstream data into. If the specified + // stream doesn't exist, the producer application creates a new stream. + stream : 'kclnodejsclickstreamsample', + + // Total shards in the specified Amazon Kinesis stream. + shards : 2, + + // The producer application groups clickstream records into batches of the size + // specified here, and makes a single PutRecords API call to ingest each batch + // into the stream. + recordsToWritePerBatch : 5, + + // If the producer application creates a stream, it has to wait for the stream to + // transition to ACTIVE state before it can start putting data in it. This + // specifies the wait time between consecutive describeStream calls. + waitBetweenDescribeCallsInSeconds : 5, + + // Transactions per second for the PutRecords call to make sure the producer + // doesn't hit throughput limits enforced by Amazon Kinesis. + // For more information about throughput limits, see: + // http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html + putRecordsTps : 20 + } +}; diff --git a/samples/util/logger.js b/samples/util/logger.js new file mode 100644 index 00000000..62e38a95 --- /dev/null +++ b/samples/util/logger.js @@ -0,0 +1,49 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+***/ + +'use strict'; + +var log4js = require('log4js'); + +function logger() { + var logDir = process.env.NODE_LOG_DIR !== undefined ? process.env.NODE_LOG_DIR : '.'; + + var config = { + "appenders": [ + { + "type": "file", + "filename": logDir + "/" + "application.log", + "pattern": "-yyyy-MM-dd", + "layout": { + "type": "pattern", + "pattern": "%d (PID: %x{pid}) %p %c - %m", + "tokens": { + "pid" : function() { return process.pid; } + } + } + } + ] + }; + + log4js.configure(config, {}); + + return { + getLogger: function(category) { + return log4js.getLogger(category); + } + }; +} + +module.exports = logger; diff --git a/test/kcl/action_handler_tests.js b/test/kcl/action_handler_tests.js new file mode 100644 index 00000000..a844cfd7 --- /dev/null +++ b/test/kcl/action_handler_tests.js @@ -0,0 +1,103 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +var chai = require('chai'); +var expect = chai.expect; +var sinon = require('sinon'); +var util = require('util'); + +var IOHandler = require('../../lib/kcl/io_handler'); +var ActionHandler = require('../../lib/kcl/action_handler'); + +// Local stub to capture stdout/stderr. 
+function captureStream(stream) { + var origWrite = stream.write; + var buffer = ''; + + stream.write = function(chunk, encoding, callback) { + buffer += chunk.toString(); + origWrite.apply(stream, arguments); + return true; + }; + + return { + unhook: function unhook() { + stream.write = origWrite; + }, + captured: function() { + return buffer; + }, + readLast: function() { + var lines = buffer.split('\n'); + return lines[lines.length - 2]; + } + }; +} + +describe('action_handler_tests', function() { + var stdoutHook = null; + var stderrHook = null; + var ioHandler = new IOHandler(process.stdin, process.stdout, process.stderr); + var actionHandler = new ActionHandler(ioHandler); + + beforeEach(function() { + stdoutHook = captureStream(process.stdout); + stderrHook = captureStream(process.stderr); + }); + + afterEach(function() { + stdoutHook.unhook(); + stderrHook.unhook(); + }); + + after(function() { + actionHandler.destroy(); + ioHandler.destroy(); + }); + + + it('should not emit action for an invalid action', function(done) { + ioHandler.emit('line', '{"shardId":"shardId-000001"}'); + expect(stderrHook.captured()).to.equal('Invalid action received: {"shardId":"shardId-000001"}\n'); + done(); + }); + + it('should emit action event for a valid action', function(done) { + actionHandler.on('action', function(action) { + expect(action.action).to.equal('initialize'); + expect(action.shardId).to.equal('shardId-000001'); + done(); + }); + ioHandler.emit('line', '{"action":"initialize","shardId":"shardId-000001"}'); + }); + + it('should write action to stdout', function(done) { + actionHandler.sendAction({action : 'initialize', shardId : 'shardId-000001'}, function(err) { + expect(err).to.be.undefined(); + expect(stdoutHook.readLast()).to.equal('{"action":"initialize","shardId":"shardId-000001"}'); + done(); + }); + }); + + it('should emit end event when IO handler is closed', function(done) { + actionHandler.on('end', function() { + done(); + }); + 
ioHandler.emit('close'); + }); +}); diff --git a/test/kcl/checkpointer_tests.js b/test/kcl/checkpointer_tests.js new file mode 100644 index 00000000..42c1b988 --- /dev/null +++ b/test/kcl/checkpointer_tests.js @@ -0,0 +1,96 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +var chai = require('chai'); +var expect = require('chai').expect; +var sinon = require('sinon'); +var util = require('util'); + +var Checkpointer = require('../../lib/kcl/checkpointer'); +var KCLManager = require('../../lib/kcl/kcl_manager'); + +describe('checkpointer_tests', function() { + var sandbox = null; + var kclManager = new KCLManager(null, process.stdin, process.stdout, process.stderr); + var checkpointer = new Checkpointer(kclManager); + + beforeEach(function() { + sandbox = sinon.sandbox.create(); + }); + + afterEach(function() { + sandbox.restore(); + }); + + after(function() { + kclManager._cleanup(); + }); + + it('should emit a checkpoint action and consume response action', function(done) { + var seq = Math.floor((Math.random() * 1000000)).toString(); + // Mock KCLManager checkpoint and short-circuit dummy response. 
+ sandbox.stub(kclManager, 'checkpoint', function(seq) { + checkpointer.onCheckpointerResponse(null, seq); + }); + + checkpointer.checkpoint(seq, function(err, seq) { + expect(err).to.be.null(); + done(); + }); + }); + + it('should emit a checkpoint action and consume response when no sequence number', function(done) { + sandbox.stub(kclManager, 'checkpoint', function(seq) { + expect(seq).to.be.null(); + checkpointer.onCheckpointerResponse(null, seq); + }); + + checkpointer.checkpoint(function(err) { + expect(err).to.be.null(); + done(); + }); + }); + + it('should raise an error when error is received from MultiLangDaemon', function(done) { + var seq = Math.floor((Math.random() * 1000000)).toString(); + // Mock KCLManager checkpoint and short-circuit dummy response. + sandbox.stub(kclManager, 'checkpoint', function(seq) { + checkpointer.onCheckpointerResponse('ThrottlingException', seq); + }); + + checkpointer.checkpoint(seq, function(err) { + expect(err).not.to.be.null(); + expect(err).to.equal('ThrottlingException'); + done(); + }); + }); + + it('should raise an error on checkpoint when previous checkpoint is not complete', function(done) { + var seq = Math.floor((Math.random() * 1000000)).toString(); + // Mock KCLManager checkpoint to have outstanding checkpoint. + sandbox.stub(kclManager, 'checkpoint', function(seq) { + }); + checkpointer.checkpoint(seq, function(err) { + }); + + checkpointer.checkpoint(seq, function(err) { + expect(err).to.equal('Cannot checkpoint while another checkpoint is already in progress.'); + done(); + }); + }); +}); diff --git a/test/kcl/io_handler_tests.js b/test/kcl/io_handler_tests.js new file mode 100644 index 00000000..897286f1 --- /dev/null +++ b/test/kcl/io_handler_tests.js @@ -0,0 +1,103 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. 
+A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. +***/ + +'use strict'; + + +var chai = require('chai'); +var expect = chai.expect; +var sinon = require('sinon'); +var util = require('util'); + +var IOHandler = require('../../lib/kcl/io_handler'); + +// Local stub to capture stdout/stderr. +function captureStream(stream) { + var origWrite = stream.write; + var buffer = ''; + + stream.write = function(chunk, encoding, callback) { + buffer += chunk.toString(); + origWrite.apply(stream, arguments); + return true; + }; + + return { + unhook: function unhook() { + stream.write = origWrite; + }, + captured: function() { + return buffer; + }, + readLast: function() { + var lines = buffer.split('\n'); + return lines[lines.length - 2]; + } + }; +} + +describe('io_handler_tests', function() { + var stdoutHook = null; + var stderrHook = null; + var ioHandler = new IOHandler(process.stdin, process.stdout, process.stderr); + + beforeEach(function() { + stdoutHook = captureStream(process.stdout); + stderrHook = captureStream(process.stderr); + }); + + afterEach(function() { + stdoutHook.unhook(); + stderrHook.unhook(); + }); + + after(function() { + ioHandler.destroy(); + }); + + it('should read line', function(done) { + ioHandler.on('line', function(line) { + expect(line).to.equal('line1'); + ioHandler.removeAllListeners('line'); + done(); + }); + process.stdin.emit('data', 'line1\n'); + }); + + it('should write to stdout', function(done) { + ioHandler.writeLine('{"action":"status","responseFor":"initialize"}', function(err) { + expect(stdoutHook.readLast()).to.equal('{"action":"status","responseFor":"initialize"}'); + done(); + }); + }); + + it('should write error messages to stderr', 
function(done) { + ioHandler.writeError('an error message'); + expect(stderrHook.captured()).to.equal('an error message\n'); + done(); + }); + + it('should not read line after IO handler is destroyed', function(done) { + var callback = sinon.spy(); + ioHandler.on('line', callback); + process.stdin.emit('data', 'line1\n'); + expect(callback.calledOnce).to.be.true(); + ioHandler.destroy(); + process.stdin.emit('data', 'line2\n'); + expect(callback.calledTwice).to.be.false(); + ioHandler.removeListener('line', callback); + done(); + }); +}); diff --git a/test/kcl/kcl_process_tests.js b/test/kcl/kcl_process_tests.js new file mode 100644 index 00000000..4b02c5f4 --- /dev/null +++ b/test/kcl/kcl_process_tests.js @@ -0,0 +1,193 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+***/ + +'use strict'; + + +var chai = require('chai'); +var expect = require('chai').expect; +var sinon = require('sinon'); +var util = require('util'); + +var kcl = require('../..'); + +function RecordProcessor() {} + +RecordProcessor.prototype.initialize = function(initializeInput, completeCallback) { + completeCallback(); +}; + +RecordProcessor.prototype.processRecords = function(processRecordsInput, completeCallback) { + if (!processRecordsInput || !processRecordsInput.records) { + completeCallback(); + return; + } + + var records = processRecordsInput.records; + var seq = records[0].sequenceNumber; + var checkpointer = processRecordsInput.checkpointer; + checkpointer.checkpoint(seq, function(err) { + if (err) { + checkpointer.checkpoint(seq, function(err) { + completeCallback(); + }); + } + else { + completeCallback(); + } + }); +}; + +RecordProcessor.prototype.shutdown = function(shutdownInput, completeCallback) { + completeCallback(); +}; + +describe('kcl_process_tests', function() { + var sandbox = null; + + beforeEach(function() { + sandbox = sinon.sandbox.create(); + }); + + afterEach(function() { + sandbox.restore(); + }); + + it('should initialize kcl and send back response', function(done) { + var kclProcess = kcl(new RecordProcessor(), process.stdin, process.stdout, process.stderr); + var inputSpec = {method : 'initialize', action : 'initialize', input : '{"action":"initialize","shardId":"shard-000001"}'}; + // Since we can't know when run() would finish processing all inputs, creating a stub for last call in the chain to force verification. + sandbox.stub(kclProcess._kclManager._actionHandler, 'sendAction', function(data, cb) { + // Verify that Initialize action was processed. 
+ expect(JSON.stringify(data)).to.equal(JSON.stringify({action : 'status', responseFor : inputSpec.action})); + cb(); + kclProcess._kclManager._cleanup(); + done(); + }); + + kclProcess.run(); + process.stdin.emit('data', inputSpec.input + '\n'); + }); + + it('should not process records if not initialized', function(done) { + var kclProcess = kcl(new RecordProcessor(), process.stdin, process.stdout, process.stderr); + var inputSpec = {method : 'processRecords', action : 'processRecords', input : '{"action":"processRecords","records":[{"data":"bWVvdw==","partitionKey":"cat","sequenceNumber":"456"}]}'}; + + try { + kclProcess.run(); + process.stdin.emit('data', inputSpec.input + '\n'); + } catch(err) { + kclProcess._kclManager._cleanup(); + expect(err.message).to.equal('Kinesis Client Library is in the invalid state. Cannot proceed further.'); + done(); + } + }); + + it('should not shutdown if not initialized', function(done) { + var kclProcess = kcl(new RecordProcessor(), process.stdin, process.stdout, process.stderr); + var inputSpec = {method : 'shutdown', action : 'shutdown', input : '{"action":"shutdown","reason":"TERMINATE"}'}; + + try { + kclProcess.run(); + process.stdin.emit('data', inputSpec.input + '\n'); + } catch(err) { + kclProcess._kclManager._cleanup(); + expect(err.message).to.equal('Kinesis Client Library is in the invalid state. 
Cannot proceed further.'); + done(); + } + }); + + it('should process Initialize, one or more processRecords and shutdown in order', function(done) { + var inputSpecs = { + init: {method : 'initialize', action : 'initialize', input : '{"action":"initialize","shardId":"shard-000001"}'}, + process: {method : 'processRecords', action : 'processRecords', input : '{"action":"processRecords","records":[{"data":"bWVvdw==","partitionKey":"cat","sequenceNumber":"456"}]}'}, + shutdown: {method : 'shutdown', action : 'shutdown', input : '{"action":"shutdown","reason":"TERMINATE"}'}, + }; + + var kclProcess = kcl(new RecordProcessor(), process.stdin, process.stdout, process.stderr); + // Since we can't know when run() would finish processing all inputs, creating a stub for last call in the chain to force verification ! + sandbox.stub(kclProcess._kclManager._actionHandler, 'sendAction', function(data, cb) { + cb(); + // MultiLangDaemon never sends a message until it receives reply for previous operation + // send next action based on previous response ! 
+ if (data.responseFor === 'initialize') { + process.stdin.emit('data', inputSpecs.process.input + '\n'); + } + else if (data.responseFor === 'processRecords') { + process.stdin.emit('data', inputSpecs.shutdown.input + '\n'); + } + else if (data.action === 'checkpoint') { + process.stdin.emit('data', '{"action":"checkpoint","checkpoint":"456"}' + '\n'); + } + else if (data.responseFor === 'shutdown') { + kclProcess._kclManager._cleanup(); + done(); + } + }); + + kclProcess.run(); + process.stdin.emit('data', inputSpecs.init.input + '\n'); + }); + + it('should process checkpoint error from MultiLangDaemon', function(done) { + var inputSpecs = { + init: {method : 'initialize', action : 'initialize', input : '{"action":"initialize","shardId":"shard-000001"}', resp : '{"action":"status","responseFor":"initialize"}'}, + process: {method : 'processRecords', action : 'processRecords', input : '{"action":"processRecords","records":[{"data":"bWVvdw==","partitionKey":"cat","sequenceNumber":"456"}]}', resp : '{"action":"status","responseFor":"processRecords"}'}, + checkpoint: {resp : '{"action":"checkpoint","checkpoint":"456"}'}, + shutdown: {method : 'shutdown', action : 'shutdown', input : '{"action":"shutdown","reason":"TERMINATE"}', resp : '{"action":"status","responseFor":"shutdown"}'}, + }; + + var kclProcess = kcl(new RecordProcessor(), process.stdin, process.stdout, process.stderr); + // Since we can't know when run() would finish processing all inputs, creating a stub for last call in the chain to force verification ! + sandbox.stub(kclProcess._kclManager._actionHandler, 'sendAction', function(data, cb) { + cb(); + console.log(JSON.stringify(data)); + // MultiLangDaemon never sends a message until it receives reply for previous operation. + // Send next action based on previous response. 
+ if (data.responseFor === 'initialize') { + expect(JSON.stringify(data)).to.equal(inputSpecs.init.resp); + process.stdin.emit('data', inputSpecs.process.input + '\n'); + } + else if (data.responseFor === 'processRecords') { + expect(JSON.stringify(data)).to.equal(inputSpecs.process.resp); + process.stdin.emit('data', inputSpecs.shutdown.input + '\n'); + } + else if (data.action === 'checkpoint') { + expect(JSON.stringify(data)).to.equal(inputSpecs.checkpoint.resp); + if (this.seen_checkpoint === undefined) { + this.seen_checkpoint = 1; + process.stdin.emit('data', '{"action":"checkpoint","checkpoint":"456","error":"ThrottlingException"}' + '\n'); + } + else { + this.seen_checkpoint++; + process.stdin.emit('data', '{"action":"checkpoint","checkpoint":"456"}' + '\n'); + } + } + else if (data.responseFor === 'shutdown') { + // Checkpoint should have been retried. + expect(this.seen_checkpoint).to.equal(2); + expect(JSON.stringify(data)).to.equal(inputSpecs.shutdown.resp); + kclProcess._kclManager._ioHandler.destroy(); + done(); + } + else { + done('Error - invalid action passed to action handler: ' + data); + } + }); + + kclProcess.run(); + process.stdin.emit('data', inputSpecs.init.input + '\n'); + }); +}); diff --git a/test/unit_tests_bootstrap.js b/test/unit_tests_bootstrap.js new file mode 100644 index 00000000..fa5a4273 --- /dev/null +++ b/test/unit_tests_bootstrap.js @@ -0,0 +1,23 @@ +/*** +Copyright 2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +Licensed under the Amazon Software License (the "License"). +You may not use this file except in compliance with the License. +A copy of the License is located at + +http://aws.amazon.com/asl/ + +or in the "license" file accompanying this file. This file is distributed +on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +express or implied. See the License for the specific language governing +permissions and limitations under the License. 
+***/ + +'use strict'; + +var path = require('path'); +var srcDir = path.join(__dirname, '..', 'lib'); + +require('blanket')({ + pattern: srcDir +});