Skip to content

Commit

Permalink
Add error handling strategy to pull-based ingestion
Browse files Browse the repository at this point in the history
Signed-off-by: Varun Bharadwaj <varunbharadwaj1995@gmail.com>
  • Loading branch information
varunbharadwaj committed Feb 22, 2025
1 parent 8447737 commit dc4722f
Show file tree
Hide file tree
Showing 14 changed files with 376 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public KafkaOffset nextPointer() {
return new KafkaOffset(lastFetchedOffset + 1);
}

/**
 * Computes the pointer that immediately follows the supplied one.
 *
 * @param pointer the offset to advance from
 * @return a new {@code KafkaOffset} positioned one past {@code pointer}
 */
@Override
public KafkaOffset nextPointer(KafkaOffset pointer) {
    final long followingOffset = pointer.getOffset() + 1;
    return new KafkaOffset(followingOffset);
}

@Override
public IngestionShardPointer earliestPointer() {
long startOffset = AccessController.doPrivileged(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.indices.replication.SegmentReplicationSource;
import org.opensearch.indices.replication.common.ReplicationType;
Expand Down Expand Up @@ -731,6 +732,30 @@ public void validate(final String value, final Map<Setting<?>, Object> settings)
Property.Dynamic
);

/**
 * Key of the index setting that selects the error handling strategy for pull-based ingestion.
 */
public static final String SETTING_INGESTION_SOURCE_ERROR_STRATEGY = "index.ingestion_source.error.strategy";

/**
 * Index-scoped, dynamic setting naming the ingestion error strategy. Defaults to {@code DROP}.
 * A value is accepted when its upper-cased form (using {@link Locale#ROOT}) matches a constant of
 * {@link IngestionErrorStrategy.ErrorStrategy}; anything else is rejected with an
 * {@link IllegalArgumentException}.
 */
public static final Setting<String> INGESTION_SOURCE_ERROR_STRATEGY_SETTING = Setting.simpleString(
    SETTING_INGESTION_SOURCE_ERROR_STRATEGY,
    IngestionErrorStrategy.ErrorStrategy.DROP.name(),
    new Setting.Validator<>() {

        @Override
        public void validate(final String value) {
            try {
                IngestionErrorStrategy.ErrorStrategy.valueOf(value.toUpperCase(Locale.ROOT));
            } catch (IllegalArgumentException e) {
                // Re-throw with a setting-specific message, keeping the original failure as the cause.
                throw new IllegalArgumentException(
                    "Invalid value for " + SETTING_INGESTION_SOURCE_ERROR_STRATEGY + " [" + value + "]",
                    e
                );
            }
        }

        @Override
        public void validate(final String value, final Map<Setting<?>, Object> settings) {
            // Validation does not depend on other settings; delegate to the single-value check.
            validate(value);
        }
    },
    Property.IndexScope,
    Property.Dynamic
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
"index.ingestion_source.param.",
key -> new Setting<>(key, "", (value) -> {
Expand Down Expand Up @@ -955,8 +980,9 @@ public IngestionSource getIngestionSource() {
final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings);
if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) {
final String pointerInitReset = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings);
final String errorStrategy = INGESTION_SOURCE_ERROR_STRATEGY_SETTING.get(settings);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
return new IngestionSource(ingestionSourceType, pointerInitReset, errorStrategy, ingestionSourceParams);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@
public class IngestionSource {
private String type;
private String pointerInitReset;
private String errorStrategy;
private Map<String, Object> params;

public IngestionSource(String type, String pointerInitReset, Map<String, Object> params) {
public IngestionSource(String type, String pointerInitReset, String errorStrategy, Map<String, Object> params) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
this.errorStrategy = errorStrategy;
}

public String getType() {
Expand All @@ -36,6 +38,10 @@ public String getPointerInitReset() {
return pointerInitReset;
}

/**
 * @return the error strategy value this ingestion source was constructed with
 */
public String getErrorStrategy() {
return errorStrategy;
}

/**
 * @return the parameter map this ingestion source was constructed with (returned as-is, not copied)
 */
public Map<String, Object> params() {
return params;
}
Expand All @@ -47,16 +53,29 @@ public boolean equals(Object o) {
IngestionSource ingestionSource = (IngestionSource) o;
return Objects.equals(type, ingestionSource.type)
&& Objects.equals(pointerInitReset, ingestionSource.pointerInitReset)
&& Objects.equals(errorStrategy, ingestionSource.errorStrategy)
&& Objects.equals(params, ingestionSource.params);
}

@Override
public int hashCode() {
return Objects.hash(type, pointerInitReset, params);
return Objects.hash(type, pointerInitReset, params, errorStrategy);
}

@Override
public String toString() {
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
return "IngestionSource{"
+ "type='"
+ type
+ '\''
+ ",pointer_init_reset='"
+ pointerInitReset
+ '\''
+ ",error_strategy='"
+ errorStrategy
+ '\''
+ ", params="
+ params
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexMetadata.INGESTION_SOURCE_TYPE_SETTING,
IndexMetadata.INGESTION_SOURCE_POINTER_INIT_RESET_SETTING,
IndexMetadata.INGESTION_SOURCE_PARAMS_SETTING,
IndexMetadata.INGESTION_SOURCE_ERROR_STRATEGY_SETTING,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public M getMessage() {
*/
T nextPointer();

/**
 * Returns the pointer that immediately follows the provided pointer.
 *
 * @param startPointer the pointer to advance from
 * @return the immediate next pointer from the provided start pointer
 */
T nextPointer(T startPointer);

/**
* @return the earliest pointer in the shard
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.index.translog.TranslogManager;
import org.opensearch.index.translog.TranslogStats;
import org.opensearch.indices.pollingingest.DefaultStreamPoller;
import org.opensearch.indices.pollingingest.IngestionErrorStrategy;
import org.opensearch.indices.pollingingest.StreamPoller;
import org.opensearch.search.suggest.completion.CompletionStats;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -191,7 +192,19 @@ public void start() {
resetState = StreamPoller.ResetState.NONE;
}

streamPoller = new DefaultStreamPoller(startPointer, persistedPointers, ingestionShardConsumer, this, resetState);
IngestionErrorStrategy.ErrorStrategy errorStrategy = IngestionErrorStrategy.ErrorStrategy.valueOf(
ingestionSource.getErrorStrategy().toUpperCase(Locale.ROOT)
);
IngestionErrorStrategy ingestionErrorStratey = IngestionErrorStrategy.create(errorStrategy, ingestionSource.getType());

streamPoller = new DefaultStreamPoller(
startPointer,
persistedPointers,
ingestionShardConsumer,
this,
resetState,
ingestionErrorStratey
);
streamPoller.start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * This error handling strategy blocks on failures, preventing processing of remaining updates in the
 * ingestion source: every error is logged and {@link #shouldPauseIngestion} always answers {@code true}.
 */
public class BlockIngestionErrorStrategy implements IngestionErrorStrategy {
    private static final Logger logger = LogManager.getLogger(BlockIngestionErrorStrategy.class);
    private final String ingestionSource;

    /**
     * @param ingestionSource identifier of the ingestion source, included in log messages
     */
    public BlockIngestionErrorStrategy(String ingestionSource) {
        this.ingestionSource = ingestionSource;
    }

    /**
     * Logs the error together with its stack trace.
     *
     * @param e the error that was encountered
     * @param stage the stage in which the error occurred (currently not used by this implementation)
     */
    @Override
    public void handleError(Throwable e, ErrorStage stage) {
        // The throwable is passed once as text for the second placeholder and once more as a trailing
        // argument; Log4j only attaches a throwable (and prints its stack trace) when it is an extra
        // argument not consumed by a placeholder.
        logger.error("Error processing update from {}: {}", ingestionSource, String.valueOf(e), e);

        // todo: record blocking update and emit metrics
    }

    /**
     * @param e the error that was encountered
     * @param stage the stage in which the error occurred
     * @return always {@code true}; this strategy pauses ingestion on any error
     */
    @Override
    public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
        return true;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ public class DefaultStreamPoller implements StreamPoller {
// start of the batch, inclusive
private IngestionShardPointer batchStartPointer;

// indicates the last record successfully written to the blocking queue
private IngestionShardPointer lastSuccessfulPointer;

private ResetState resetState;

private Set<IngestionShardPointer> persistedPointers;
Expand All @@ -63,19 +66,23 @@ public class DefaultStreamPoller implements StreamPoller {
@Nullable
private IngestionShardPointer maxPersistedPointer;

private IngestionErrorStrategy errorStrategy;

public DefaultStreamPoller(
IngestionShardPointer startPointer,
Set<IngestionShardPointer> persistedPointers,
IngestionShardConsumer consumer,
IngestionEngine ingestionEngine,
ResetState resetState
ResetState resetState,
IngestionErrorStrategy errorStrategy
) {
this(
startPointer,
persistedPointers,
consumer,
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine),
resetState
new MessageProcessorRunnable(new ArrayBlockingQueue<>(100), ingestionEngine, errorStrategy),
resetState,
errorStrategy
);
}

Expand All @@ -84,7 +91,8 @@ public DefaultStreamPoller(
Set<IngestionShardPointer> persistedPointers,
IngestionShardConsumer consumer,
MessageProcessorRunnable processorRunnable,
ResetState resetState
ResetState resetState,
IngestionErrorStrategy errorStrategy
) {
this.consumer = Objects.requireNonNull(consumer);
this.resetState = resetState;
Expand All @@ -109,6 +117,7 @@ public DefaultStreamPoller(
String.format(Locale.ROOT, "stream-poller-processor-%d-%d", consumer.getShardId(), System.currentTimeMillis())
)
);
this.errorStrategy = errorStrategy;
}

@Override
Expand Down Expand Up @@ -188,6 +197,7 @@ protected void startPoll() {
continue;
}
blockingQueue.put(result);
lastSuccessfulPointer = result.getPointer();
logger.debug(
"Put message {} with pointer {} to the blocking queue",
String.valueOf(result.getMessage().getPayload()),
Expand All @@ -197,8 +207,18 @@ protected void startPoll() {
// update the batch start pointer to the next batch
batchStartPointer = consumer.nextPointer();
} catch (Throwable e) {
// TODO better error handling
logger.error("Error in polling the shard {}: {}", consumer.getShardId(), e);
errorStrategy.handleError(e, IngestionErrorStrategy.ErrorStage.POLLING);

if (errorStrategy.shouldPauseIngestion(e, IngestionErrorStrategy.ErrorStage.POLLING)) {
// Blocking error encountered. Pause poller to stop processing remaining updates.
pause();
} else {
// Advance the batch start pointer to ignore the error and continue from next record
batchStartPointer = lastSuccessfulPointer == null
? consumer.nextPointer(batchStartPointer)
: consumer.nextPointer(lastSuccessfulPointer);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * This error handling strategy drops failures and proceeds with remaining updates in the ingestion
 * source: every error is logged and {@link #shouldPauseIngestion} always answers {@code false}.
 */
public class DropIngestionErrorStrategy implements IngestionErrorStrategy {
    private static final Logger logger = LogManager.getLogger(DropIngestionErrorStrategy.class);
    private final String ingestionSource;

    /**
     * @param ingestionSource identifier of the ingestion source, included in log messages
     */
    public DropIngestionErrorStrategy(String ingestionSource) {
        this.ingestionSource = ingestionSource;
    }

    /**
     * Logs the error together with its stack trace.
     *
     * @param e the error that was encountered
     * @param stage the stage in which the error occurred (currently not used by this implementation)
     */
    @Override
    public void handleError(Throwable e, ErrorStage stage) {
        // The throwable is passed once as text for the second placeholder and once more as a trailing
        // argument; Log4j only attaches a throwable (and prints its stack trace) when it is an extra
        // argument not consumed by a placeholder.
        logger.error("Error processing update from {}: {}", ingestionSource, String.valueOf(e), e);

        // todo: record failed update stats and emit metrics
    }

    /**
     * @param e the error that was encountered
     * @param stage the stage in which the error occurred
     * @return always {@code false}; this strategy never pauses ingestion
     */
    @Override
    public boolean shouldPauseIngestion(Throwable e, ErrorStage stage) {
        return false;
    }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.pollingingest;

/**
 * Contract for reacting to errors raised either while polling records from an ingestion source or
 * while processing the records that were polled.
 */
public interface IngestionErrorStrategy {

    /**
     * Process and record the error.
     *
     * @param e the error that was encountered
     * @param stage the stage in which the error occurred
     */
    void handleError(Throwable e, ErrorStage stage);

    /**
     * Indicates if ingestion must be paused, blocking further writes.
     *
     * @param e the error that was encountered
     * @param stage the stage in which the error occurred
     * @return {@code true} when ingestion should be paused
     */
    boolean shouldPauseIngestion(Throwable e, ErrorStage stage);

    /**
     * Builds the strategy implementation matching the requested strategy type.
     *
     * @param errorStrategy the strategy type to instantiate; must not be {@code null}
     * @param ingestionSource value handed to the constructor of the created strategy
     * @return a {@link BlockIngestionErrorStrategy} for {@code BLOCK}, otherwise a
     *         {@link DropIngestionErrorStrategy}
     */
    static IngestionErrorStrategy create(ErrorStrategy errorStrategy, String ingestionSource) {
        // equals() rather than == so that a null strategy fails fast with a NullPointerException.
        if (errorStrategy.equals(ErrorStrategy.BLOCK)) {
            return new BlockIngestionErrorStrategy(ingestionSource);
        }
        // DROP, and any other value, falls back to dropping failed updates.
        return new DropIngestionErrorStrategy(ingestionSource);
    }

    /**
     * Available error handling strategies.
     */
    enum ErrorStrategy {
        DROP,
        BLOCK
    }

    /**
     * Stages at which an error can be encountered.
     */
    enum ErrorStage {
        POLLING,
        PROCESSING
    }

}
Loading

0 comments on commit dc4722f

Please sign in to comment.