diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md
index 40a1950ed1c78..c0d89253d2ad0 100644
--- a/CHANGELOG-3.0.md
+++ b/CHANGELOG-3.0.md
@@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625))
- Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957))
- Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958))
+- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
diff --git a/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java b/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java
new file mode 100644
index 0000000000000..b077d11644ece
--- /dev/null
+++ b/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java
@@ -0,0 +1,186 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.core.util;
+
+import org.opensearch.OpenSearchException;
+import org.opensearch.OpenSearchParseException;
+import org.opensearch.common.annotation.PublicApi;
+
+import java.util.Map;
+
+/**
+ * Utility class for parsing configurations.
+ *
+ * @opensearch.api
+ */
+@PublicApi(since = "3.0.0")
+public final class ConfigurationUtils {
+
+ private ConfigurationUtils() {}
+
+ /**
+ * Returns and removes the specified optional property from the specified configuration map.
+ *
+ * If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
+ */
+ public static String readOptionalStringProperty(Map configuration, String propertyName) {
+ Object value = configuration.get(propertyName);
+ return readString(propertyName, value);
+ }
+
+ /**
+ * Returns and removes the specified property from the specified configuration map.
+ *
+ * If the property value isn't of type string an {@link OpenSearchParseException} is thrown.
+ * If the property is missing an {@link OpenSearchParseException} is thrown
+ */
+ public static String readStringProperty(Map configuration, String propertyName) {
+ return readStringProperty(configuration, propertyName, null);
+ }
+
+ /**
+ * Returns the specified property from the specified configuration map.
+ *
+ * If the property value isn't of type string a {@link OpenSearchParseException} is thrown.
+ * If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
+ */
+ public static String readStringProperty(Map configuration, String propertyName, String defaultValue) {
+ Object value = configuration.get(propertyName);
+ if (value == null && defaultValue != null) {
+ return defaultValue;
+ } else if (value == null) {
+ throw newConfigurationException(propertyName, "required property is missing");
+ }
+ return readString(propertyName, value);
+ }
+
+ public static OpenSearchException newConfigurationException(String propertyName, String reason) {
+ String msg;
+ if (propertyName == null) {
+ msg = reason;
+ } else {
+ msg = "[" + propertyName + "] " + reason;
+ }
+ OpenSearchParseException exception = new OpenSearchParseException(msg);
+ addMetadataToException(exception, propertyName);
+ return exception;
+ }
+
+ private static String readString(String propertyName, Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof String) {
+ return (String) value;
+ }
+ throw newConfigurationException(propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]");
+ }
+
+ private static void addMetadataToException(OpenSearchException exception, String propertyName) {
+ if (propertyName != null) {
+ exception.addMetadata("opensearch.property_name", propertyName);
+ }
+ }
+
+ /**
+ * Returns the specified property from the specified configuration map.
+ *
+ * If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
+ * If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown
+ */
+ public static String readStringOrIntProperty(Map configuration, String propertyName, String defaultValue) {
+ Object value = configuration.get(propertyName);
+ if (value == null && defaultValue != null) {
+ return defaultValue;
+ } else if (value == null) {
+ throw newConfigurationException(propertyName, "required property is missing");
+ }
+ return readStringOrInt(propertyName, value);
+ }
+
+ private static String readStringOrInt(String propertyName, Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof String) {
+ return (String) value;
+ } else if (value instanceof Integer) {
+ return String.valueOf(value);
+ }
+ throw newConfigurationException(propertyName, "property isn't a string or int, but of type [" + value.getClass().getName() + "]");
+ }
+
+ /**
+ * Returns the specified property from the specified configuration map.
+ *
+ * If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown.
+ */
+ public static String readOptionalStringOrIntProperty(Map configuration, String propertyName) {
+ Object value = configuration.get(propertyName);
+ if (value == null) {
+ return null;
+ }
+ return readStringOrInt(propertyName, value);
+ }
+
+ public static boolean readBooleanProperty(Map configuration, String propertyName, boolean defaultValue) {
+ Object value = configuration.get(propertyName);
+ if (value == null) {
+ return defaultValue;
+ } else {
+ return readBoolean(propertyName, value).booleanValue();
+ }
+ }
+
+ private static Boolean readBoolean(String propertyName, Object value) {
+ if (value == null) {
+ return null;
+ }
+ if (value instanceof Boolean) {
+ return (boolean) value;
+ }
+ throw newConfigurationException(propertyName, "property isn't a boolean, but of type [" + value.getClass().getName() + "]");
+ }
+
+ /**
+ * Returns the specified property from the specified configuration map.
+ *
+ * If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
+ * If the property is missing an {@link OpenSearchParseException} is thrown
+ */
+ public static Integer readIntProperty(Map configuration, String propertyName, Integer defaultValue) {
+ Object value = configuration.get(propertyName);
+ if (value == null) {
+ return defaultValue;
+ }
+ try {
+ return Integer.parseInt(value.toString());
+ } catch (Exception e) {
+ throw newConfigurationException(propertyName, "property cannot be converted to an int [" + value + "]");
+ }
+ }
+
+ /**
+ * Returns the specified property from the specified configuration map.
+ *
+ * If the property value isn't of type int a {@link OpenSearchParseException} is thrown.
+ * If the property is missing an {@link OpenSearchParseException} is thrown
+ */
+ public static Double readDoubleProperty(Map configuration, String propertyName) {
+ Object value = configuration.get(propertyName);
+ if (value == null) {
+ throw newConfigurationException(propertyName, "required property is missing");
+ }
+ try {
+ return Double.parseDouble(value.toString());
+ } catch (Exception e) {
+ throw newConfigurationException(propertyName, "property cannot be converted to a double [" + value + "]");
+ }
+ }
+}
diff --git a/libs/core/src/test/java/org/opensearch/core/util/ConfigurationUtilsTests.java b/libs/core/src/test/java/org/opensearch/core/util/ConfigurationUtilsTests.java
new file mode 100644
index 0000000000000..bdd978a77890e
--- /dev/null
+++ b/libs/core/src/test/java/org/opensearch/core/util/ConfigurationUtilsTests.java
@@ -0,0 +1,136 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.core.util;
+
+import org.opensearch.OpenSearchParseException;
+import org.opensearch.test.OpenSearchTestCase;
+import org.junit.Before;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.equalTo;
+
+public class ConfigurationUtilsTests extends OpenSearchTestCase {
+ private Map config;
+
+ @Before
+ public void setConfig() {
+ config = new HashMap<>();
+ config.put("foo", "bar");
+ config.put("boolVal", true);
+ config.put("null", null);
+ config.put("arr", Arrays.asList("1", "2", "3"));
+ config.put("ip", "127.0.0.1");
+ config.put("num", 1);
+ config.put("double", 1.0);
+ }
+
+ public void testReadStringProperty() {
+ String val = ConfigurationUtils.readStringProperty(config, "foo");
+ assertThat(val, equalTo("bar"));
+ String val1 = ConfigurationUtils.readStringProperty(config, "foo1", "none");
+ assertThat(val1, equalTo("none"));
+ try {
+ ConfigurationUtils.readStringProperty(config, "foo1", null);
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
+ }
+ }
+
+ public void testOptionalReadStringProperty() {
+ String val = ConfigurationUtils.readOptionalStringProperty(config, "foo");
+ assertThat(val, equalTo("bar"));
+ String val1 = ConfigurationUtils.readOptionalStringProperty(config, "foo1");
+ assertThat(val, equalTo("bar"));
+ assertThat(val1, equalTo(null));
+ }
+
+ public void testReadStringPropertyInvalidType() {
+ try {
+ ConfigurationUtils.readStringProperty(config, "arr");
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]"));
+ }
+ }
+
+ public void testReadBooleanProperty() {
+ Boolean val = ConfigurationUtils.readBooleanProperty(config, "boolVal", false);
+ assertThat(val, equalTo(true));
+ }
+
+ public void testReadNullBooleanProperty() {
+ Boolean val = ConfigurationUtils.readBooleanProperty(config, "null", false);
+ assertThat(val, equalTo(false));
+ }
+
+ public void testReadBooleanPropertyInvalidType() {
+ try {
+ ConfigurationUtils.readBooleanProperty(config, "arr", true);
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[arr] property isn't a boolean, but of type [java.util.Arrays$ArrayList]"));
+ }
+ }
+
+ public void testReadStringOrIntProperty() {
+ String val1 = ConfigurationUtils.readStringOrIntProperty(config, "foo", null);
+ String val2 = ConfigurationUtils.readStringOrIntProperty(config, "num", null);
+ assertThat(val1, equalTo("bar"));
+ assertThat(val2, equalTo("1"));
+ }
+
+ public void testOptionalReadStringOrIntProperty() {
+ String val1 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "foo");
+ String val2 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num");
+ String val3 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num1");
+ assertThat(val1, equalTo("bar"));
+ assertThat(val2, equalTo("1"));
+ assertThat(val3, equalTo(null));
+ }
+
+ public void testReadIntProperty() {
+ int val = ConfigurationUtils.readIntProperty(config, "num", null);
+ assertThat(val, equalTo(1));
+ try {
+ ConfigurationUtils.readIntProperty(config, "foo", 2);
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to an int [bar]"));
+ }
+ try {
+ ConfigurationUtils.readIntProperty(config, "foo1", 2);
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("required property is missing"));
+ }
+ }
+
+ public void testReadDoubleProperty() {
+ double val = ConfigurationUtils.readDoubleProperty(config, "double");
+ assertThat(val, equalTo(1.0));
+ try {
+ ConfigurationUtils.readDoubleProperty(config, "foo");
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to a double [bar]"));
+ }
+ try {
+ ConfigurationUtils.readDoubleProperty(config, "foo1");
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[foo1] required property is missing"));
+ }
+ }
+
+ public void testReadStringOrIntPropertyInvalidType() {
+ try {
+ ConfigurationUtils.readStringOrIntProperty(config, "arr", null);
+ } catch (OpenSearchParseException e) {
+ assertThat(e.getMessage(), equalTo("[arr] property isn't a string or int, but of type [java.util.Arrays$ArrayList]"));
+ }
+ }
+
+}
diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java
index 099300c6e5767..722883d353ebf 100644
--- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java
+++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java
@@ -8,13 +8,17 @@
package org.opensearch.plugin.kafka;
+import org.opensearch.core.util.ConfigurationUtils;
+
import java.util.Map;
-import java.util.Objects;
/**
* Class encapsulating the configuration of a Kafka source.
*/
public class KafkaSourceConfig {
+ private final String PROP_TOPIC = "topic";
+ private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
+
private final String topic;
private final String bootstrapServers;
@@ -23,10 +27,8 @@ public class KafkaSourceConfig {
* @param params the configuration parameters
*/
public KafkaSourceConfig(Map params) {
- // TODO: better parsing and validation
- this.topic = (String) Objects.requireNonNull(params.get("topic"));
- this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrap_servers"));
- assert this.bootstrapServers != null;
+ this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
+ this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
}
/**