Properly handle 413 Payload Too Large errors #1199

Merged
merged 3 commits into from
Jan 23, 2025
Changes from 2 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,6 @@
## 12.0.2
- Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)

## 12.0.1
- Remove irrelevant log warning about elastic stack version [#1200](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1200)

17 changes: 16 additions & 1 deletion docs/index.asciidoc
@@ -196,7 +196,22 @@ This plugin uses the Elasticsearch bulk API to optimize its imports into Elastic
either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint. Error codes for the HTTP
request are handled differently than error codes for individual documents.

HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely.

HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely,
including 413 (Payload Too Large) responses.

If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead:

[source,ruby]
-----
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    dlq_custom_codes => [413] # Send 413 errors to DLQ instead of retrying
  }
}
-----

This will capture oversized payloads in the DLQ for analysis rather than retrying them.

The following document errors are handled as follows:

52 changes: 27 additions & 25 deletions lib/logstash/outputs/elasticsearch/http_client.rb
@@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
def bulk_send(body_stream, batch_actions)
params = compression_level? ? {:headers => {"Content-Encoding" => "gzip"}} : {}

response = @pool.post(@bulk_path, params, body_stream.string)

@bulk_response_metrics.increment(response.code.to_s)

case response.code
when 200 # OK
LogStash::Json.load(response.body)
when 413 # Payload Too Large
begin
response = @pool.post(@bulk_path, params, body_stream.string)
@bulk_response_metrics.increment(response.code.to_s)
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
@bulk_response_metrics.increment(e.response_code.to_s)
raise e unless e.response_code == 413
# special handling for 413, treat it as a document level issue
logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size)
emulate_batch_error_response(batch_actions, response.code, 'payload_too_large')
else
url = ::LogStash::Util::SafeURI.new(response.final_url)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
response.code, url, body_stream.to_s, response.body
)
return emulate_batch_error_response(batch_actions, 413, 'payload_too_large')
rescue => e # it may be a network issue instead, re-raise
raise e
end

LogStash::Json.load(response.body)
end
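
The reworked `bulk_send` rescues the pool's error, re-raises anything that is not a 413, and converts a 413 into a synthetic bulk response so each action can be handled like a document-level failure. A minimal, self-contained sketch of that flow follows. The error class is a hypothetical stand-in that models only `response_code`, and the body of `emulate_batch_error_response` is an assumption, since the diff shows only its signature.

```ruby
# Hypothetical stand-in for the plugin's BadResponseCodeError; only the
# response_code reader is modelled here.
class BadResponseCodeError < StandardError
  attr_reader :response_code

  def initialize(response_code)
    super("Got response code '#{response_code}'")
    @response_code = response_code
  end
end

# Assumed shape: one bulk-style item per action, each carrying the HTTP status.
def emulate_batch_error_response(actions, http_code, reason)
  items = actions.map do |action_name, _params, _doc|
    { action_name => { "status" => http_code, "error" => { "type" => reason } } }
  end
  { "errors" => true, "items" => items }
end

# post is any callable that returns a parsed body or raises BadResponseCodeError.
def bulk_send_sketch(post, batch_actions)
  post.call
rescue BadResponseCodeError => e
  # Only 413 is treated as a per-document problem; everything else propagates.
  raise e unless e.response_code == 413
  emulate_batch_error_response(batch_actions, 413, "payload_too_large")
end

too_large = -> { raise BadResponseCodeError.new(413) }
result = bulk_send_sketch(too_large, [["index", {}, { "message" => "a" }]])
```

With this shape, a caller that already inspects per-item statuses can process an oversized request without a separate code path for the HTTP-level failure.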

def emulate_batch_error_response(actions, http_code, reason)
@@ -411,6 +409,9 @@ def host_to_url(h)
def exists?(path, use_get=false)
response = use_get ? @pool.get(path) : @pool.head(path)
response.code >= 200 && response.code <= 299
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return true if e.code == 404
raise e
Member

This should return false if the code is 404, right? Interesting that the tests didn't catch this.

Suggested change
response = use_get ? @pool.get(path) : @pool.head(path)
response.code >= 200 && response.code <= 299
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return true if e.code == 404
raise e
use_get ? @pool.get(path) : @pool.head(path)
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return false if e.code == 404
raise e
else
true

Contributor Author

I was confused about that! I had copied it from https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1203/files#diff-4de2e59688ce60550472a447d9534d75298d14b845b5c77293f5a255359efac1R413

The previous behavior was that nothing was raised on a 404 and `exists?` returned false. Your suggestion preserves this behavior. I'll look into where we have a test hole here.

end
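
The behavior discussed in this thread (404 means "does not exist", any other error propagates, success means "exists") can be sketched with a method-level `rescue`/`else`. This is an illustration under assumptions, not the plugin's code: the error class is a hypothetical stand-in exposing `response_code`, the reader used elsewhere in this diff, and `request` is a callable standing in for `@pool.head` or `@pool.get`.

```ruby
# Hypothetical stand-in for the plugin's BadResponseCodeError.
class BadResponseCodeError < StandardError
  attr_reader :response_code

  def initialize(response_code)
    super("Got response code '#{response_code}'")
    @response_code = response_code
  end
end

# request is any callable standing in for @pool.head / @pool.get.
def exists_sketch?(request)
  request.call
rescue BadResponseCodeError => e
  # A 404 is an answer, not a failure: the resource is simply absent.
  return false if e.response_code == 404
  raise e
else
  # Reached only when no exception was raised, i.e. a 2xx response.
  true
end

found   = exists_sketch?(-> { :ok })
missing = exists_sketch?(-> { raise BadResponseCodeError.new(404) })
```

The `else` branch matters because the method's value would otherwise be whatever the request returned, rather than an explicit boolean.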

def template_exists?(template_endpoint, name)
@@ -420,7 +421,10 @@ def template_exists?(template_endpoint, name)
def template_put(template_endpoint, name, template)
path = "#{template_endpoint}/#{name}"
logger.info("Installing Elasticsearch template", name: name)
@pool.put(path, nil, LogStash::Json.dump(template))
response = @pool.put(path, nil, LogStash::Json.dump(template))
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return response if e.code == 404
raise e
Member

We're not doing anything with the return value, and we can't guarantee `response` will be defined in the rescue block:

Suggested change
response = @pool.put(path, nil, LogStash::Json.dump(template))
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
return response if e.code == 404
raise e
@pool.put(path, nil, LogStash::Json.dump(template))
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
raise e if e.code != 404

Contributor Author

Totally, I had misunderstood something earlier and added that by mistake. Updated in 93eddbc

end
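
The concern raised in this thread can be reproduced in isolation. In Ruby, a local whose assignment raised is still a known local inside `rescue`, but it holds `nil`, so returning it conveys nothing. The sketch below shows that, followed by the suggested shape that swallows a 404 and re-raises everything else. All names are hypothetical, and the error class is a stand-in that models only `response_code`.

```ruby
# Hypothetical stand-in for the plugin's BadResponseCodeError.
class BadResponseCodeError < StandardError
  attr_reader :response_code

  def initialize(response_code)
    super("Got response code '#{response_code}'")
    @response_code = response_code
  end
end

# Shows what a rescue block sees when the assignment itself raised.
def local_after_failed_assignment(request)
  response = request.call
  response
rescue StandardError
  # The assignment never completed, so the local exists but holds nil.
  response.inspect
end

# Suggested shape: ignore a 404, propagate any other bad response code.
def template_put_sketch(request)
  request.call
rescue BadResponseCodeError => e
  raise e if e.response_code != 404
end

seen_in_rescue = local_after_failed_assignment(-> { raise "boom" })
ignored_404    = template_put_sketch(-> { raise BadResponseCodeError.new(404) })
```

Here `seen_in_rescue` is the string `"nil"` and `ignored_404` is `nil`, which is harmless as long as callers do not rely on the return value.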

# ILM methods
@@ -432,17 +436,15 @@ def rollover_alias_exists?(name)

# Create a new rollover alias
def rollover_alias_put(alias_name, alias_definition)
begin
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
logger.info("Created rollover alias", name: alias_name)
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if e.response_code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
raise e
@pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition))
logger.info("Created rollover alias", name: alias_name)
# If the rollover alias already exists, ignore the error that comes back from Elasticsearch
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
if e.response_code == 400
logger.info("Rollover alias already exists, skipping", name: alias_name)
return
end
raise e
end

def get_xpack_info
@@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil)
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string)
end

# 404s are excluded because they are valid codes in the case of
# template installation. We might need a better story around this later
# but for our current purposes this is correct
code = resp.code
if code < 200 || code > 299 && code != 404
if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body)
end
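
The effect of dropping the 404 exclusion can be checked by comparing the two conditions side by side. The sketch below wraps each condition from this hunk in a predicate with a hypothetical name. Because `&&` binds tighter than `||` in Ruby, the old condition reads as `code < 200 || (code > 299 && code != 404)`.

```ruby
# Old condition: 404 was let through so template checks could inspect it.
def error_before?(code)
  code < 200 || code > 299 && code != 404
end

# New condition: anything outside 2xx is raised for the caller to interpret.
def error_after?(code)
  code < 200 || code > 299
end

changed = [200, 404, 413].select { |code| error_before?(code) != error_after?(code) }
```

Only 404 changes classification, which is why callers such as `exists?` and `template_put` now need their own `rescue` for that code.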

24 changes: 10 additions & 14 deletions lib/logstash/outputs/elasticsearch/http_client/pool.rb
@@ -253,13 +253,11 @@ def get_license(url)
def health_check_request(url)
logger.debug("Running health check to see if an Elasticsearch connection is working",
:healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path)
begin
response = perform_request_to_url(url, :head, @healthcheck_path)
return response, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
return nil, e
end
response = perform_request_to_url(url, :head, @healthcheck_path)
return response, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message)
return nil, e
end

def healthcheck!(register_phase = true)
@@ -312,13 +310,11 @@ def healthcheck!(register_phase = true)
end

def get_root_path(url, params={})
begin
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
return resp, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
return nil, e
end
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params)
return resp, nil
rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body)
return nil, e
end

def test_serverless_connection(url, root_response)
2 changes: 1 addition & 1 deletion logstash-output-elasticsearch.gemspec
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-elasticsearch'
s.version = '12.0.1'
s.version = '12.0.2'
s.licenses = ['apache-2.0']
s.summary = "Stores logs in Elasticsearch"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
7 changes: 6 additions & 1 deletion spec/unit/outputs/elasticsearch_spec.rb
@@ -915,7 +915,12 @@
allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body|
if body.length > max_bytes
max_bytes *= 2 # ensure a successful retry
double("Response", :code => 413, :body => "")
raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(
413,
"test-url",
body,
""
)
else
double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}')
end