Properly handle 413 Payload Too Large errors (#1199)
* Properly handle `413` Payload Too Large errors

Previously, when Elasticsearch responded with a 413 (Payload Too Large) status,
the manticore adapter raised an error before the response could be processed
by the bulk_send error handling. This commit refactors the way
`BadResponseCodeError` codes are handled. Previously we had logic in the manticore
adapter which special-cased raising errors on some codes. This commit refactors
such that the adapter raises on any error status and the caller is now
responsible for special-case handling of the code.

* 12.0.2 release prep

* Use `error_code` instead of `code` when handling BadResponseCodeError

Previously a few bugs spotted in code review were being obscured by the
combination of tests not running in CI and the incorrect method for retrieving
a code from a BadResponseCodeError. This commit updates the method names and
addresses the feedback from code review.
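
The division of responsibility described in the first item can be sketched in isolation. This is a minimal illustration under stated assumptions, not the plugin's code: `Response`, `perform_request`, `bulk_send`, and `exists?` below are simplified, hypothetical stand-ins that take a canned response instead of talking to a connection pool, and the error class is a reduced version that carries only `response_code`.

```ruby
# Hypothetical, simplified stand-in for the plugin's BadResponseCodeError.
class BadResponseCodeError < StandardError
  attr_reader :response_code

  def initialize(response_code)
    super("Got response code '#{response_code}'")
    @response_code = response_code
  end
end

# Hypothetical canned response, used instead of a real HTTP call.
Response = Struct.new(:code, :body)

# Adapter layer: raise on anything outside 2xx, with no per-code exceptions.
def perform_request(response)
  code = response.code
  raise BadResponseCodeError.new(code) if code < 200 || code > 299
  response
end

# Caller layer: bulk sends treat 413 as a per-document failure and
# re-raise every other error code.
def bulk_send(response)
  perform_request(response).body
rescue BadResponseCodeError => e
  raise e unless e.response_code == 413
  { "errors" => true, "status" => 413, "reason" => "payload_too_large" }
end

# Caller layer: existence checks treat 404 as "not found" rather than an error.
def exists?(response)
  perform_request(response)
  true
rescue BadResponseCodeError => e
  return false if e.response_code == 404
  raise e
end
```

Because the adapter no longer knows which codes are acceptable, each caller states its own exceptions (413 for bulk sends, 404 for existence checks), and any code a caller does not recognize still propagates as an error.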

---------

Co-authored-by: João Duarte <jsvduarte@gmail.com>
donoghuc and jsvd authored Jan 23, 2025
1 parent ec27add commit 3ef3c0c
Showing 7 changed files with 62 additions and 45 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
+## 12.0.2
+- Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)
+
 ## 12.0.1
 - Remove irrelevant log warning about elastic stack version [#1200](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1200)

17 changes: 16 additions & 1 deletion docs/index.asciidoc
@@ -196,7 +196,22 @@ This plugin uses the Elasticsearch bulk API to optimize its imports into Elastic
 either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint. Error codes for the HTTP
 request are handled differently than error codes for individual documents.

-HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely.
+
+HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely,
+including 413 (Payload Too Large) responses.
+
+If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead:
+
+[source,ruby]
+-----
+output {
+  elasticsearch {
+    hosts => ["localhost:9200"]
+    dlq_custom_codes => [413] # Send 413 errors to DLQ instead of retrying
+  }
+}
+-----
+
+This will capture oversized payloads in the DLQ for analysis rather than retrying them.

 The following document errors are handled as follows:

49 changes: 25 additions & 24 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
     def bulk_send(body_stream, batch_actions)
       params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}

-      response = @pool.post(@bulk_path, params, body_stream.string)
-
-      @bulk_response_metrics.increment(response.code.to_s)
-
-      case response.code
-      when 200 # OK
-        LogStash::Json.load(response.body)
-      when 413 # Payload Too Large
+      begin
+        response = @pool.post(@bulk_path, params, body_stream.string)
+        @bulk_response_metrics.increment(response.code.to_s)
+      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+        @bulk_response_metrics.increment(e.response_code.to_s)
+        raise e unless e.response_code == 413
+        # special handling for 413, treat it as a document level issue
         logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
-        emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
-      else
-        url = ::LogStash::Util::SafeURI.new(response.final_url)
-        raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
-          response.code, url, body_stream.to_s, response.body
-        )
+        return emulate_batch_error_response(batch_actions, 413, 'payload_too_large')
+      rescue => e # it may be a network issue instead, re-raise
+        raise e
       end
+
+      LogStash::Json.load(response.body)
     end

     def emulate_batch_error_response(actions, http_code, reason)
@@ -411,6 +409,9 @@ def host_to_url(h)
     def exists?(path, use_get=false)
       response = use_get ? @pool.get(path) : @pool.head(path)
       response.code >= 200 && response.code <= 299
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      return false if e.response_code == 404
+      raise e
     end

     def template_exists?(template_endpoint, name)
@@ -421,6 +422,8 @@ def template_put(template_endpoint, name, template)
       path = "#{template_endpoint}/#{name}"
       logger.info("Installing Elasticsearch template", name: name)
       @pool.put(path, nil, LogStash::Json.dump(template))
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      raise e unless e.response_code == 404
     end

     # ILM methods
@@ -432,17 +435,15 @@ def rollover_alias_exists?(name)

     # Create a new rollover alias
     def rollover_alias_put(alias_name, alias_definition)
-      begin
-        @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
-        logger.info("Created rollover alias", name: alias_name)
-        # If the rollover alias already exists, ignore the error that comes back from Elasticsearch
-      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-        if e.response_code == 400
-          logger.info("Rollover alias already exists, skipping", name: alias_name)
-          return
-        end
-        raise e
+      @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
+      logger.info("Created rollover alias", name: alias_name)
+      # If the rollover alias already exists, ignore the error that comes back from Elasticsearch
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      if e.response_code == 400
+        logger.info("Rollover alias already exists, skipping", name: alias_name)
+        return
       end
+      raise e
     end

     def get_xpack_info
5 changes: 1 addition & 4 deletions lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb
@@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil)
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
       end

-      # 404s are excluded because they are valid codes in the case of
-      # template installation. We might need a better story around this later
-      # but for our current purposes this is correct
       code = resp.code
-      if code < 200 || code > 299 && code != 404
+      if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret
         raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
       end

24 changes: 10 additions & 14 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -252,13 +252,11 @@ def get_license(url)
     def health_check_request(url)
       logger.debug("Running health check to see if an Elasticsearch connection is working",
                    :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
-      begin
-        response = perform_request_to_url(url, :head, @healthcheck_path)
-        return response, nil
-      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-        logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
-        return nil, e
-      end
+      response = perform_request_to_url(url, :head, @healthcheck_path)
+      return response, nil
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
+      return nil, e
     end

     def healthcheck!(register_phase = true)
@@ -311,13 +309,11 @@ def healthcheck!(register_phase = true)
     end

     def get_root_path(url, params={})
-      begin
-        resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
-        return resp, nil
-      rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
-        logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
-        return nil, e
-      end
+      resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
+      return resp, nil
+    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
+      logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
+      return nil, e
     end

     def test_serverless_connection(url, root_response)
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
 Gem::Specification.new do |s|
   s.name = 'logstash-output-elasticsearch'
-  s.version = '12.0.1'
+  s.version = '12.0.2'
   s.licenses = ['apache-2.0']
   s.summary = "Stores logs in Elasticsearch"
   s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
7 changes: 6 additions & 1 deletion spec/unit/outputs/elasticsearch_spec.rb
@@ -908,7 +908,12 @@
         allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
           if body.length > max_bytes
             max_bytes *= 2 # ensure a successful retry
-            double("Response", :code => 413, :body => "")
+            raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
+              413,
+              "test-url",
+              body,
+              ""
+            )
           else
             double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
           end
