From b788b88d05e486babe460e4183fae53816fb440f Mon Sep 17 00:00:00 2001 From: Brandon Max Date: Sat, 25 Apr 2020 15:58:55 -0400 Subject: [PATCH] Add ability to customize bulk batch size --- docs/index.asciidoc | 124 ++++++++++-------- lib/logstash/outputs/elasticsearch.rb | 23 +++- .../outputs/elasticsearch/common_configs.rb | 7 +- .../outputs/elasticsearch/http_client.rb | 30 ++--- .../elasticsearch/http_client_builder.rb | 11 +- spec/integration/outputs/index_spec.rb | 63 ++++++--- .../outputs/elasticsearch/http_client_spec.rb | 44 +++++-- 7 files changed, 181 insertions(+), 121 deletions(-) diff --git a/docs/index.asciidoc b/docs/index.asciidoc index abdffbd11..ffb49b0b5 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -48,7 +48,7 @@ plugin to version 6.2.5 or higher. If you plan to use the Kibana web interface, use the Elasticsearch output plugin to get your log data into -Elasticsearch. +Elasticsearch. TIP: You can run Elasticsearch on your own hardware, or use our https://www.elastic.co/cloud/elasticsearch-service[hosted {es} Service] on @@ -58,7 +58,7 @@ Elastic Cloud. The Elasticsearch Service is available on both AWS and GCP. This output only speaks the HTTP protocol. HTTP is the preferred protocol for interacting with Elasticsearch as of Logstash 2.0. We strongly encourage the use of HTTP over the node protocol for a number of reasons. HTTP is only marginally slower, yet far easier to administer and work with. When using the HTTP protocol one may upgrade Elasticsearch versions without having -to upgrade Logstash in lock-step. +to upgrade Logstash in lock-step. You can learn more about Elasticsearch at @@ -82,7 +82,7 @@ the new template is installed. [NOTE] ================================================================================ -You cannot use dynamic variable substitution when `ilm_enabled` is `true` and +You cannot use dynamic variable substitution when `ilm_enabled` is `true` and when using `ilm_rollover_alias`. ================================================================================ @@ -106,7 +106,7 @@ Example: index => "%{[some_field][sub_field]}-%{+YYYY.MM.dd}" } } - + **What to do in case there is no field in the event containing the destination index prefix?** You can use the `mutate` filter and conditionals to add a `[@metadata]` field (see https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#metadata) to set @@ -142,7 +142,7 @@ HTTP requests to the bulk API are expected to return a 200 response code. All ot The following document errors are handled as follows: * 400 and 404 errors are sent to the dead letter queue (DLQ), if enabled. If a DLQ is not enabled, a log message will be emitted, and the event will be dropped. See <> for more info. - * 409 errors (conflict) are logged as a warning and dropped. + * 409 errors (conflict) are logged as a warning and dropped. Note that 409 exceptions are no longer retried. Please set a higher `retry_on_conflict` value if you experience 409 exceptions. It is more performant for Elasticsearch to retry these exceptions than this plugin. @@ -224,12 +224,12 @@ not reevaluate its DNS value while the keepalive is in effect. ==== HTTP Compression -This plugin supports request and response compression. Response compression is enabled by default and -for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for -it to send back compressed response. 
For versions before 5.0, `http.compression` must be set to `true` https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http[in +This plugin supports request and response compression. Response compression is enabled by default and +for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for +it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http[in Elasticsearch] to take advantage of response compression when using this plugin -For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` +For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` setting in their Logstash config file. ==== Authentication @@ -247,6 +247,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |a valid filesystem path|No | <> |<>|No | <> |<>|No @@ -308,7 +309,7 @@ output plugins.   [id="plugins-{type}s-{plugin}-action"] -===== `action` +===== `action` * Value type is <> * Default value is `"index"` @@ -339,7 +340,7 @@ Authenticate using Elasticsearch API key. Note that this option also requires en Format is `id:api_key` where `id` and `api_key` are as returned by the Elasticsearch https://www.elastic.co/guide/en/elasticsearch/reference/current/security-api-create-api-key.html[Create API key API]. [id="plugins-{type}s-{plugin}-bulk_path"] -===== `bulk_path` +===== `bulk_path` * Value type is <> * There is no default value for this setting. @@ -347,8 +348,17 @@ Format is `id:api_key` where `id` and `api_key` are as returned by the Elasticse HTTP Path to perform the _bulk requests to this defaults to a concatenation of the path parameter and "_bulk" +[id="plugins-{type}s-{plugin}-bulk_batch_size"] +===== `bulk_batch_size` + + * Value type is <> + * Default value is `20971520` (20 * 1024 * 1024 === 20 MiB) + +HTTP Path to perform the _bulk requests to +this defaults to a concatenation of the path parameter and "_bulk" + [id="plugins-{type}s-{plugin}-cacert"] -===== `cacert` +===== `cacert` * Value type is <> * There is no default value for this setting. @@ -376,7 +386,7 @@ Cloud ID, from the Elastic Cloud web console. If set `hosts` should not be used. For more details, check out the https://www.elastic.co/guide/en/logstash/current/connecting-to-cloud.html#_cloud_id[Logstash-to-Cloud documentation] [id="plugins-{type}s-{plugin}-doc_as_upsert"] -===== `doc_as_upsert` +===== `doc_as_upsert` * Value type is <> * Default value is `false` @@ -385,7 +395,7 @@ Enable `doc_as_upsert` for update mode. Create a new document with source if `document_id` doesn't exist in Elasticsearch [id="plugins-{type}s-{plugin}-document_id"] -===== `document_id` +===== `document_id` * Value type is <> * There is no default value for this setting. @@ -393,7 +403,7 @@ Create a new document with source if `document_id` doesn't exist in Elasticsearc The document ID for the index. Useful for overwriting existing entries in Elasticsearch with the same ID. [id="plugins-{type}s-{plugin}-document_type"] -===== `document_type` +===== `document_type` * Value type is <> * There is no default value for this setting. 
@@ -414,7 +424,7 @@ If you don't set a value for this option: - for elasticsearch clusters 5.x and below: the event's 'type' field will be used, if the field is not present the value of 'doc' will be used. [id="plugins-{type}s-{plugin}-failure_type_logging_whitelist"] -===== `failure_type_logging_whitelist` +===== `failure_type_logging_whitelist` * Value type is <> * Default value is `[]` @@ -435,7 +445,7 @@ an elasticsearch node. The headers will be used for any kind of request These custom headers will be overidden by settings like `http_compression`. [id="plugins-{type}s-{plugin}-healthcheck_path"] -===== `healthcheck_path` +===== `healthcheck_path` * Value type is <> * There is no default value for this setting. @@ -446,7 +456,7 @@ before it is once again eligible to service requests. If you have custom firewall rules you may need to change this [id="plugins-{type}s-{plugin}-hosts"] -===== `hosts` +===== `hosts` * Value type is <> * Default value is `[//127.0.0.1]` @@ -464,7 +474,7 @@ to prevent LS from sending bulk requests to the master nodes. So this parameter Any special characters present in the URLs here MUST be URL escaped! This means `#` should be put in as `%23` for instance. [id="plugins-{type}s-{plugin}-http_compression"] -===== `http_compression` +===== `http_compression` * Value type is <> * Default value is `false` @@ -527,7 +537,7 @@ NOTE: Updating the rollover alias will require the index template to be rewritte NOTE: `ilm_rollover_alias` does NOT support dynamic variable substitution as `index` does. [id="plugins-{type}s-{plugin}-index"] -===== `index` +===== `index` * Value type is <> * Default value is `"logstash-%{+yyyy.MM.dd}"` @@ -541,7 +551,7 @@ LS uses Joda to format the index pattern from event timestamp. Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here]. [id="plugins-{type}s-{plugin}-keystore"] -===== `keystore` +===== `keystore` * Value type is <> * There is no default value for this setting. @@ -550,7 +560,7 @@ The keystore used to present a certificate to the server. It can be either .jks or .p12 [id="plugins-{type}s-{plugin}-keystore_password"] -===== `keystore_password` +===== `keystore_password` * Value type is <> * There is no default value for this setting. @@ -558,7 +568,7 @@ It can be either .jks or .p12 Set the keystore password [id="plugins-{type}s-{plugin}-manage_template"] -===== `manage_template` +===== `manage_template` * Value type is <> * Default value is `true` @@ -577,7 +587,7 @@ field names) you should set `manage_template` to false and use the REST API to apply your templates manually. [id="plugins-{type}s-{plugin}-parameters"] -===== `parameters` +===== `parameters` * Value type is <> * There is no default value for this setting. @@ -587,7 +597,7 @@ to every host listed in the 'hosts' configuration. If the 'hosts' list contains urls that already have query strings, the one specified here will be appended. [id="plugins-{type}s-{plugin}-parent"] -===== `parent` +===== `parent` * Value type is <> * Default value is `nil` @@ -596,7 +606,7 @@ For child documents, ID of the associated parent. This can be dynamic using the `%{foo}` syntax. [id="plugins-{type}s-{plugin}-password"] -===== `password` +===== `password` * Value type is <> * There is no default value for this setting. @@ -604,7 +614,7 @@ This can be dynamic using the `%{foo}` syntax. 
Password to authenticate to a secure Elasticsearch cluster [id="plugins-{type}s-{plugin}-path"] -===== `path` +===== `path` * Value type is <> * There is no default value for this setting. @@ -615,7 +625,7 @@ Note that if you use paths as components of URLs in the 'hosts' field you may not also set this field. That will raise an error at startup [id="plugins-{type}s-{plugin}-pipeline"] -===== `pipeline` +===== `pipeline` * Value type is <> * Default value is `nil` @@ -624,7 +634,7 @@ Set which ingest pipeline you wish to execute for an event. You can also use eve here like `pipeline => "%{INGEST_PIPELINE}"` [id="plugins-{type}s-{plugin}-pool_max"] -===== `pool_max` +===== `pool_max` * Value type is <> * Default value is `1000` @@ -635,7 +645,7 @@ Setting this too low may mean frequently closing / opening connections which is bad. [id="plugins-{type}s-{plugin}-pool_max_per_route"] -===== `pool_max_per_route` +===== `pool_max_per_route` * Value type is <> * Default value is `100` @@ -646,7 +656,7 @@ Setting this too low may mean frequently closing / opening connections which is bad. [id="plugins-{type}s-{plugin}-proxy"] -===== `proxy` +===== `proxy` * Value type is <> * There is no default value for this setting. @@ -657,7 +667,7 @@ An empty string is treated as if proxy was not set. This is useful when using environment variables e.g. `proxy => '${LS_PROXY:}'`. [id="plugins-{type}s-{plugin}-resurrect_delay"] -===== `resurrect_delay` +===== `resurrect_delay` * Value type is <> * Default value is `5` @@ -667,7 +677,7 @@ Resurrection is the process by which backend endpoints marked 'down' are checked to see if they have come back to life [id="plugins-{type}s-{plugin}-retry_initial_interval"] -===== `retry_initial_interval` +===== `retry_initial_interval` * Value type is <> * Default value is `2` @@ -675,7 +685,7 @@ to see if they have come back to life Set initial interval in seconds between bulk retries. Doubled on each retry up to `retry_max_interval` [id="plugins-{type}s-{plugin}-retry_max_interval"] -===== `retry_max_interval` +===== `retry_max_interval` * Value type is <> * Default value is `64` @@ -683,7 +693,7 @@ Set initial interval in seconds between bulk retries. Doubled on each retry up t Set max interval in seconds between bulk retries. [id="plugins-{type}s-{plugin}-retry_on_conflict"] -===== `retry_on_conflict` +===== `retry_on_conflict` * Value type is <> * Default value is `1` @@ -693,7 +703,7 @@ See the https://www.elastic.co/guide/en/elasticsearch/guide/current/partial-upda for more info [id="plugins-{type}s-{plugin}-routing"] -===== `routing` +===== `routing` * Value type is <> * There is no default value for this setting. @@ -702,7 +712,7 @@ A routing override to be applied to all processed events. This can be dynamic using the `%{foo}` syntax. [id="plugins-{type}s-{plugin}-script"] -===== `script` +===== `script` * Value type is <> * Default value is `""` @@ -718,7 +728,7 @@ Example: } [id="plugins-{type}s-{plugin}-script_lang"] -===== `script_lang` +===== `script_lang` * Value type is <> * Default value is `"painless"` @@ -727,7 +737,7 @@ Set the language of the used script. If not set, this defaults to painless in ES When using indexed (stored) scripts on Elasticsearch 6 and higher, you must set this parameter to `""` (empty string). 
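As a minimal sketch of the stored-script case described above (the script name `my_indexed_script`, the `%{id}` document id field, and the host are illustrative placeholders, not defaults of this plugin), a scripted update against Elasticsearch 6 or higher could be configured along these lines:

    output {
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        action => "update"
        document_id => "%{id}"
        script_type => "indexed"
        script_lang => ""
        script => "my_indexed_script"
      }
    }
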
[id="plugins-{type}s-{plugin}-script_type"] -===== `script_type` +===== `script_type` * Value can be any of: `inline`, `indexed`, `file` * Default value is `["inline"]` @@ -738,7 +748,7 @@ Define the type of script referenced by "script" variable file : "script" contains the name of script stored in elasticsearch's config directory [id="plugins-{type}s-{plugin}-script_var_name"] -===== `script_var_name` +===== `script_var_name` * Value type is <> * Default value is `"event"` @@ -746,7 +756,7 @@ Define the type of script referenced by "script" variable Set variable name passed to script (scripted update) [id="plugins-{type}s-{plugin}-scripted_upsert"] -===== `scripted_upsert` +===== `scripted_upsert` * Value type is <> * Default value is `false` @@ -754,7 +764,7 @@ Set variable name passed to script (scripted update) if enabled, script is in charge of creating non-existent document (scripted update) [id="plugins-{type}s-{plugin}-sniffing"] -===== `sniffing` +===== `sniffing` * Value type is <> * Default value is `false` @@ -764,7 +774,7 @@ For Elasticsearch 1.x and 2.x any nodes with `http.enabled` (on by default) will For Elasticsearch 5.x and 6.x any nodes with `http.enabled` (on by default) will be added to the hosts list, excluding master-only nodes. [id="plugins-{type}s-{plugin}-sniffing_delay"] -===== `sniffing_delay` +===== `sniffing_delay` * Value type is <> * Default value is `5` @@ -772,7 +782,7 @@ For Elasticsearch 5.x and 6.x any nodes with `http.enabled` (on by default) will How long to wait, in seconds, between sniffing attempts [id="plugins-{type}s-{plugin}-sniffing_path"] -===== `sniffing_path` +===== `sniffing_path` * Value type is <> * There is no default value for this setting. @@ -783,7 +793,7 @@ if sniffing_path is set it will be used as an absolute path do not use full URL here, only paths, e.g. "/sniff/_nodes/http" [id="plugins-{type}s-{plugin}-ssl"] -===== `ssl` +===== `ssl` * Value type is <> * There is no default value for this setting. @@ -793,7 +803,7 @@ is specified in the URLs listed in 'hosts'. If no explicit protocol is specified If SSL is explicitly disabled here the plugin will refuse to start if an HTTPS URL is given in 'hosts' [id="plugins-{type}s-{plugin}-ssl_certificate_verification"] -===== `ssl_certificate_verification` +===== `ssl_certificate_verification` * Value type is <> * Default value is `true` @@ -803,7 +813,7 @@ For more information on disabling certificate verification please read https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf [id="plugins-{type}s-{plugin}-template"] -===== `template` +===== `template` * Value type is <> * There is no default value for this setting. @@ -812,7 +822,7 @@ You can set the path to your own template here, if you so desire. If not set, the included template will be used. [id="plugins-{type}s-{plugin}-template_name"] -===== `template_name` +===== `template_name` * Value type is <> * Default value is `"logstash"` @@ -826,7 +836,7 @@ change this, you will need to prune the old template manually, e.g. where `OldTemplateName` is whatever the former setting was. [id="plugins-{type}s-{plugin}-template_overwrite"] -===== `template_overwrite` +===== `template_overwrite` * Value type is <> * Default value is `false` @@ -843,7 +853,7 @@ template (logstash), setting this to true will make Logstash to overwrite the "logstash" template (i.e. 
removing all customized settings) [id="plugins-{type}s-{plugin}-timeout"] -===== `timeout` +===== `timeout` * Value type is <> * Default value is `60` @@ -852,7 +862,7 @@ Set the timeout, in seconds, for network operations and requests sent Elasticsea a timeout occurs, the request will be retried. [id="plugins-{type}s-{plugin}-truststore"] -===== `truststore` +===== `truststore` * Value type is <> * There is no default value for this setting. @@ -862,7 +872,7 @@ It can be either .jks or .p12. Use either `:truststore` or `:cacert`. [id="plugins-{type}s-{plugin}-truststore_password"] -===== `truststore_password` +===== `truststore_password` * Value type is <> * There is no default value for this setting. @@ -870,7 +880,7 @@ Use either `:truststore` or `:cacert`. Set the truststore password [id="plugins-{type}s-{plugin}-upsert"] -===== `upsert` +===== `upsert` * Value type is <> * Default value is `""` @@ -879,7 +889,7 @@ Set upsert content for update mode. Create a new document with this parameter as json string if `document_id` doesn't exists [id="plugins-{type}s-{plugin}-user"] -===== `user` +===== `user` * Value type is <> * There is no default value for this setting. @@ -887,7 +897,7 @@ Create a new document with this parameter as json string if `document_id` doesn' Username to authenticate to a secure Elasticsearch cluster [id="plugins-{type}s-{plugin}-validate_after_inactivity"] -===== `validate_after_inactivity` +===== `validate_after_inactivity` * Value type is <> * Default value is `10000` @@ -902,7 +912,7 @@ have become stale (half-closed) while kept inactive in the pool.' See https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/http/impl/conn/PoolingHttpClientConnectionManager.html#setValidateAfterInactivity(int)[these docs for more info] [id="plugins-{type}s-{plugin}-version"] -===== `version` +===== `version` * Value type is <> * There is no default value for this setting. @@ -911,7 +921,7 @@ The version to use for indexing. Use sprintf syntax like `%{my_version}` to use See https://www.elastic.co/blog/elasticsearch-versioning-support. [id="plugins-{type}s-{plugin}-version_type"] -===== `version_type` +===== `version_type` * Value can be any of: `internal`, `external`, `external_gt`, `external_gte`, `force` * There is no default value for this setting. diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch.rb index 5639bad3a..84297d69c 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch.rb @@ -75,12 +75,12 @@ # # ==== HTTP Compression # -# This plugin supports request and response compression. Response compression is enabled by default and -# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for -# it to send back compressed response. For versions before 5.0, `http.compression` must be set to `true` in +# This plugin supports request and response compression. Response compression is enabled by default and +# for Elasticsearch versions 5.0 and later, the user doesn't have to set any configs in Elasticsearch for +# it to send back compressed response. 
For versions before 5.0, `http.compression` must be set to `true` in # Elasticsearch[https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html#modules-http] to take advantage of response compression when using this plugin # -# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` +# For requests compression, regardless of the Elasticsearch version, users have to enable `http_compression` # setting in their Logstash config file. # class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base @@ -103,6 +103,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base config_name "elasticsearch" + DEFAULT_BATCH_SIZE = 20 * 1024 * 1024 # 20MiB + # The Elasticsearch action to perform. Valid actions are: # # - index: indexes a document (an event from Logstash). @@ -242,6 +244,19 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # Custom Headers to send on each request to elasticsearch nodes config :custom_headers, :validate => :hash, :default => {} + # Bulk batch size is used to determine at what point to send the bulk requests. + # The criteria used for default value is: + # 1. We need a number that's less than 100MiB because ES + # won't accept bulks larger than that. + # 2. It must be large enough to amortize the connection constant + # across multiple requests. + # 3. It must be small enough that even if multiple threads hit this size + # we won't use a lot of heap. + # + # We wound up agreeing that a number greater than 10 MiB and less than 100MiB + # made sense. We picked one on the lowish side to not use too much heap. + config :bulk_batch_size, :validate => :number, :default => DEFAULT_BATCH_SIZE + # @override to handle proxy => '' as if none was set def config_init(params) proxy = params['proxy'] diff --git a/lib/logstash/outputs/elasticsearch/common_configs.rb b/lib/logstash/outputs/elasticsearch/common_configs.rb index b686f774c..d7f4f94ca 100644 --- a/lib/logstash/outputs/elasticsearch/common_configs.rb +++ b/lib/logstash/outputs/elasticsearch/common_configs.rb @@ -19,8 +19,8 @@ def self.included(mod) # Joda formats are defined http://www.joda.org/joda-time/apidocs/org/joda/time/format/DateTimeFormat.html[here]. mod.config :index, :validate => :string, :default => DEFAULT_INDEX_NAME - mod.config :document_type, - :validate => :string, + mod.config :document_type, + :validate => :string, :deprecated => "Document types are being deprecated in Elasticsearch 6.0, and removed entirely in 7.0. You should avoid this feature" # From Logstash 1.3 onwards, a template is applied to Elasticsearch during @@ -69,7 +69,7 @@ def self.included(mod) # The version to use for indexing. Use sprintf syntax like `%{my_version}` to use a field value here. # See https://www.elastic.co/blog/elasticsearch-versioning-support. mod.config :version, :validate => :string - + # The version_type to use for indexing. # See https://www.elastic.co/blog/elasticsearch-versioning-support. 
# See also https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types @@ -145,7 +145,6 @@ def self.included(mod) # here like `pipeline => "%{INGEST_PIPELINE}"` mod.config :pipeline, :validate => :string, :default => nil - # ----- # ILM configurations (beta) # ----- diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb index 32a37e82a..1a1a34612 100644 --- a/lib/logstash/outputs/elasticsearch/http_client.rb +++ b/lib/logstash/outputs/elasticsearch/http_client.rb @@ -8,21 +8,6 @@ require 'stringio' module LogStash; module Outputs; class ElasticSearch; - # This is a constant instead of a config option because - # there really isn't a good reason to configure it. - # - # The criteria used are: - # 1. We need a number that's less than 100MiB because ES - # won't accept bulks larger than that. - # 2. It must be large enough to amortize the connection constant - # across multiple requests. - # 3. It must be small enough that even if multiple threads hit this size - # we won't use a lot of heap. - # - # We wound up agreeing that a number greater than 10 MiB and less than 100MiB - # made sense. We picked one on the lowish side to not use too much heap. - TARGET_BULK_BYTES = 20 * 1024 * 1024 # 20MiB - class HttpClient attr_reader :client, :options, :logger, :pool, :action_count, :recv_count # This is here in case we use DEFAULT_OPTIONS in the future @@ -52,6 +37,7 @@ class HttpClient def initialize(options={}) @logger = options[:logger] @metric = options[:metric] + @bulk_batch_size = options[:bulk_batch_size] @bulk_request_metrics = @metric.namespace(:bulk_requests) @bulk_response_metrics = @bulk_request_metrics.namespace(:responses) @@ -110,7 +96,7 @@ def bulk(actions) if http_compression body_stream.set_encoding "BINARY" stream_writer = Zlib::GzipWriter.new(body_stream, Zlib::DEFAULT_COMPRESSION, Zlib::DEFAULT_STRATEGY) - else + else stream_writer = body_stream end bulk_responses = [] @@ -119,7 +105,7 @@ def bulk(actions) action.map {|line| LogStash::Json.dump(line)}.join("\n") : LogStash::Json.dump(action) as_json << "\n" - if (body_stream.size + as_json.bytesize) > TARGET_BULK_BYTES + if (body_stream.size + as_json.bytesize) > @bulk_batch_size bulk_responses << bulk_send(body_stream) unless body_stream.size == 0 end stream_writer.write(as_json) @@ -215,7 +201,7 @@ def scheme else nil end - + calculated_scheme = calculate_property(uris, :scheme, explicit_scheme, sniffing) if calculated_scheme && calculated_scheme !~ /https?/ @@ -235,7 +221,7 @@ def port # Enter things like foo:123, bar and wind up with foo:123, bar:9200 calculate_property(uris, :port, nil, sniffing) || 9200 end - + def uris @options[:hosts] end @@ -254,7 +240,7 @@ def http_compression def build_adapter(options) timeout = options[:timeout] || 0 - + adapter_options = { :socket_timeout => timeout, :request_timeout => timeout, @@ -281,7 +267,7 @@ def build_adapter(options) adapter_class = ::LogStash::Outputs::ElasticSearch::HttpClient::ManticoreAdapter adapter = adapter_class.new(@logger, adapter_options) end - + def build_pool(options) adapter = build_adapter(options) @@ -331,7 +317,7 @@ def host_to_url(h) h.query end prefixed_raw_query = raw_query && !raw_query.empty? ? 
"?#{raw_query}" : nil - + raw_url = "#{raw_scheme}://#{postfixed_userinfo}#{raw_host}:#{raw_port}#{prefixed_raw_path}#{prefixed_raw_query}" ::LogStash::Util::SafeURI.new(raw_url) diff --git a/lib/logstash/outputs/elasticsearch/http_client_builder.rb b/lib/logstash/outputs/elasticsearch/http_client_builder.rb index fd8827f71..d2e90af84 100644 --- a/lib/logstash/outputs/elasticsearch/http_client_builder.rb +++ b/lib/logstash/outputs/elasticsearch/http_client_builder.rb @@ -11,13 +11,14 @@ def self.build(logger, hosts, params) :http_compression => params["http_compression"], :headers => params["custom_headers"] || {} } - + client_settings[:proxy] = params["proxy"] if params["proxy"] - + common_options = { :client_settings => client_settings, :metric => params["metric"], - :resurrect_delay => params["resurrect_delay"] + :resurrect_delay => params["resurrect_delay"], + :bulk_batch_size => params["bulk_batch_size"] } if params["sniffing"] @@ -65,7 +66,7 @@ def self.build(logger, hosts, params) LogStash::ConfigurationError, "External versioning requires the presence of a version number." ) if external_version_types.include?(params.fetch('version_type', '')) and params.fetch("version", nil) == nil - + # Create API setup raise( @@ -144,7 +145,7 @@ def self.setup_ssl(logger, params) def self.setup_basic_auth(logger, params) user, password = params["user"], params["password"] - + return {} unless user && password && password.value { diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index 710ba653e..2b5e07a4e 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -1,8 +1,8 @@ require_relative "../../../spec/es_spec_helper" require "logstash/outputs/elasticsearch" -describe "TARGET_BULK_BYTES", :integration => true do - let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } +describe "BATCH_BULK_SIZE", :integration => true do + let(:batch_bulk_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE } let(:event_count) { 1000 } let(:events) { event_count.times.map { event }.to_a } let(:config) { @@ -23,11 +23,11 @@ end describe "batches that are too large for one" do - let(:event) { LogStash::Event.new("message" => "a " * (((target_bulk_bytes/2) / event_count)+1)) } + let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) } it "should send in two batches" do expect(subject.client).to have_received(:bulk_send).twice do |payload| - expect(payload.size).to be <= target_bulk_bytes + expect(payload.size).to be <= batch_bulk_size end end @@ -38,7 +38,40 @@ it "should send in one batch" do expect(subject.client).to have_received(:bulk_send).once do |payload| - expect(payload.size).to be <= target_bulk_bytes + expect(payload.size).to be <= batch_bulk_size + end + end + end + end + + describe "custom bulk size set" do + let(:batch_bulk_size) { 5 * 1024 * 1024 } + let(:config) { + { + "hosts" => get_host_port, + "index" => index, + "bulk_batch_size" => batch_bulk_size + } + } + + describe "batches that are too large for one" do + let(:event) { LogStash::Event.new("message" => "a " * (((batch_bulk_size/2) / event_count)+1)) } + + it "should send in two batches" do + expect(subject.client).to have_received(:bulk_send).twice do |payload| + expect(payload.size).to be <= batch_bulk_size + end + end + + describe "batches that fit in one" do + # Normally you'd want to generate a request that's just 1 byte below the limit, but it's + # impossible to know how many 
bytes an event will serialize as with bulk proto overhead + let(:event) { LogStash::Event.new("message" => "a") } + + it "should send in one batch" do + expect(subject.client).to have_received(:bulk_send).once do |payload| + expect(payload.size).to be <= batch_bulk_size + end end end end @@ -53,7 +86,7 @@ let(:config) { "not implemented" } let(:events) { event_count.times.map { event }.to_a } subject { LogStash::Outputs::ElasticSearch.new(config) } - + let(:es_url) { "http://#{get_host_port}" } let(:index_url) {"#{es_url}/#{index}"} let(:http_client_options) { {} } @@ -65,7 +98,7 @@ subject.register subject.multi_receive([]) end - + shared_examples "an indexer" do |secure| it "ships events" do subject.multi_receive(events) @@ -85,13 +118,13 @@ expect(doc["_index"]).to eq(index) end end - + it "sets the correct content-type header" do expected_manticore_opts = {:headers => {"Content-Type" => "application/json"}, :body => anything} if secure expected_manticore_opts = { - :headers => {"Content-Type" => "application/json"}, - :body => anything, + :headers => {"Content-Type" => "application/json"}, + :body => anything, :auth => { :user => user, :password => password, @@ -146,7 +179,7 @@ :auth => { :user => user, :password => password - }, + }, :ssl => { :enabled => true, :ca_file => cacert @@ -154,14 +187,14 @@ } end it_behaves_like("an indexer", true) - + describe "with a password requiring escaping" do let(:user) { "f@ncyuser" } let(:password) { "ab%12#" } - + include_examples("an indexer", true) end - + describe "with a user/password requiring escaping in the URL" do let(:config) do { @@ -171,7 +204,7 @@ "index" => index } end - + include_examples("an indexer", true) end end diff --git a/spec/unit/outputs/elasticsearch/http_client_spec.rb b/spec/unit/outputs/elasticsearch/http_client_spec.rb index efb7ca7f7..d94305397 100644 --- a/spec/unit/outputs/elasticsearch/http_client_spec.rb +++ b/spec/unit/outputs/elasticsearch/http_client_spec.rb @@ -8,6 +8,7 @@ opts = { :hosts => [::LogStash::Util::SafeURI.new("127.0.0.1")], :logger => Cabin::Channel.get, + :bulk_batch_size => LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE, :metric => ::LogStash::Instrument::NullMetric.new(:dummy).namespace(:alsodummy) } @@ -30,7 +31,7 @@ let(:http_hostname_port) { ::LogStash::Util::SafeURI.new("http://#{hostname_port}") } let(:https_hostname_port) { ::LogStash::Util::SafeURI.new("https://#{hostname_port}") } let(:http_hostname_port_path) { ::LogStash::Util::SafeURI.new("http://#{hostname_port}/path") } - + shared_examples("proper host handling") do it "should properly transform a host:port string to a URL" do expect(subject.host_to_url(hostname_port_uri).to_s).to eq(http_hostname_port.to_s + "/") @@ -59,7 +60,7 @@ context "when SSL is false" do let(:ssl) { false } let(:base_options) { super.merge(:hosts => [https_hostname_port]) } - + it "should refuse to handle an https url" do expect { subject.host_to_url(https_hostname_port) @@ -73,13 +74,13 @@ subject expect(subject.host_to_url(https_hostname_port).to_s).to eq(https_hostname_port.to_s + "/") end - end + end end describe "path" do let(:url) { http_hostname_port_path } let(:base_options) { super.merge(:hosts => [url]) } - + it "should allow paths in a url" do expect(subject.host_to_url(url)).to eq(url) end @@ -93,12 +94,12 @@ }.to raise_error(LogStash::ConfigurationError) end end - + context "with a path missing a leading /" do let(:url) { http_hostname_port } let(:base_options) { super.merge(:client_settings => {:path => "otherpath"}) } - - + + it "should 
automatically insert a / in front of path overlays" do expected = url.clone expected.path = url.path + "/otherpath" @@ -190,14 +191,29 @@ ["index", {:_id=>nil, :_index=>"logstash"}, {"message"=> message}], ]} - context "if a message is over TARGET_BULK_BYTES" do - let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } - let(:message) { "a" * (target_bulk_bytes + 1) } + context "if a message is over DEFAULT_BATCH_SIZE" do + let(:default_batch_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE } + let(:message) { "a" * (default_batch_size + 1) } + + it "should be handled properly" do + allow(subject).to receive(:join_bulk_responses) + expect(subject).to receive(:bulk_send).once do |data| + expect(data.size).to be > default_batch_size + end + s = subject.send(:bulk, actions) + end + end + + context "if a message is over customized BATCH_SIZE" do + let(:batch_size) { 5 * 1024 * 1024 } + let(:base_options) { super.merge(:bulk_batch_size => batch_size) } + + let(:message) { "a" * (batch_size + 1) } it "should be handled properly" do allow(subject).to receive(:join_bulk_responses) expect(subject).to receive(:bulk_send).once do |data| - expect(data.size).to be > target_bulk_bytes + expect(data.size).to be > batch_size end s = subject.send(:bulk, actions) end @@ -216,9 +232,9 @@ s = subject.send(:bulk, actions) end - context "if one exceeds TARGET_BULK_BYTES" do - let(:target_bulk_bytes) { LogStash::Outputs::ElasticSearch::TARGET_BULK_BYTES } - let(:message1) { "a" * (target_bulk_bytes + 1) } + context "if one exceeds BULK_BATCH_SIZE" do + let(:default_batch_size) { LogStash::Outputs::ElasticSearch::DEFAULT_BATCH_SIZE } + let(:message1) { "a" * (default_batch_size + 1) } it "executes two bulk_send operations" do allow(subject).to receive(:join_bulk_responses) expect(subject).to receive(:bulk_send).twice
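
As a usage sketch for the option this patch introduces (the host, index name, and the 5 MiB value are illustrative; the default stays at 20 MiB), `bulk_batch_size` would be set alongside the other output options in the pipeline configuration:

    output {
      elasticsearch {
        hosts => ["127.0.0.1:9200"]
        index => "logstash-%{+yyyy.MM.dd}"
        # 5 * 1024 * 1024 bytes; keep this well below the 100 MiB bulk limit Elasticsearch enforces
        bulk_batch_size => 5242880
      }
    }

Lowering the value trades a little extra request overhead for a smaller per-flush heap footprint, which is the same trade-off the default tries to balance.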