Skip to content

Commit

Permalink
Don't add error log when it is ES version conflicts (cadence-workflow…
Browse files Browse the repository at this point in the history
  • Loading branch information
neil-xie authored Feb 5, 2025
1 parent 8a1058c commit f27fae8
Showing 1 changed file with 9 additions and 10 deletions.
19 changes: 9 additions & 10 deletions service/worker/indexer/esProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,6 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
continue
}
wid, rid, domainID := p.getMsgWithInfo(key)
p.logger.Error("ES request failed and is not retryable",
tag.ESResponseStatus(err.Status),
tag.ESRequest(request.String()),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))

// check if it is a delete request and status code
// 404 means the document does not exist
Expand All @@ -162,6 +156,7 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.ackKafkaMsg(key)
continue
} else if err.Status == 404 {
req, err := request.Source()
if err == nil && p.isDeleteRequest(req) {
Expand All @@ -171,15 +166,19 @@ func (p *ESProcessorImpl) bulkAfterAction(id int64, requests []bulk.GenericBulka
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.ackKafkaMsg(key)
continue
} else {
p.logger.Error("Get request source err.", tag.Error(err), tag.ESRequest(request.String()))
p.scope.IncCounter(metrics.ESProcessorCorruptedData)
p.nackKafkaMsg(key)
}
} else {
// For all other non-retryable errors, nack the message
p.nackKafkaMsg(key)
}
p.logger.Error("ES request failed and is not retryable",
tag.ESResponseStatus(err.Status),
tag.ESRequest(request.String()),
tag.WorkflowID(wid),
tag.WorkflowRunID(rid),
tag.WorkflowDomainID(domainID))
p.nackKafkaMsg(key)
}
p.scope.IncCounter(metrics.ESProcessorFailures)
}
Expand Down

0 comments on commit f27fae8

Please sign in to comment.