From e8dceece20d9a6805293e71694e241728c9cc8f5 Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Sat, 1 Feb 2025 11:55:18 -0800 Subject: [PATCH 1/3] use ConfigurationUtil for KafkaSourceConfig Signed-off-by: Yupeng Fu --- .../core/util/ConfigurationUtils.java | 184 ++++++++++++++++++ .../core/util/ConfigurationUtilsTests.java | 136 +++++++++++++ .../plugin/kafka/KafkaSourceConfig.java | 12 +- 3 files changed, 327 insertions(+), 5 deletions(-) create mode 100644 libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java create mode 100644 libs/core/src/test/java/org/opensearch/core/util/ConfigurationUtilsTests.java diff --git a/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java b/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java new file mode 100644 index 0000000000000..cba0557d92b9a --- /dev/null +++ b/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java @@ -0,0 +1,184 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.util; + +import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchParseException; + +import java.util.Map; + +/** + * Utility class for parsing configurations. + * + * @opensearch.internal + */ +public class ConfigurationUtils { + + private ConfigurationUtils() {} + + /** + * Returns and removes the specified optional property from the specified configuration map. + *

+ * If the property value isn't of type string a {@link OpenSearchParseException} is thrown. + */ + public static String readOptionalStringProperty(Map configuration, String propertyName) { + Object value = configuration.get(propertyName); + return readString(propertyName, value); + } + + /** + * Returns and removes the specified property from the specified configuration map. + *

+ * If the property value isn't of type string an {@link OpenSearchParseException} is thrown. + * If the property is missing an {@link OpenSearchParseException} is thrown + */ + public static String readStringProperty(Map configuration, String propertyName) { + return readStringProperty(configuration, propertyName, null); + } + + /** + * Returns the specified property from the specified configuration map. + *

+ * If the property value isn't of type string a {@link OpenSearchParseException} is thrown. + * If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown + */ + public static String readStringProperty(Map configuration, String propertyName, String defaultValue) { + Object value = configuration.get(propertyName); + if (value == null && defaultValue != null) { + return defaultValue; + } else if (value == null) { + throw newConfigurationException(propertyName, "required property is missing"); + } + return readString(propertyName, value); + } + + public static OpenSearchException newConfigurationException(String propertyName, String reason) { + String msg; + if (propertyName == null) { + msg = reason; + } else { + msg = "[" + propertyName + "] " + reason; + } + OpenSearchParseException exception = new OpenSearchParseException(msg); + addMetadataToException(exception, propertyName); + return exception; + } + + private static String readString(String propertyName, Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + return (String) value; + } + throw newConfigurationException(propertyName, "property isn't a string, but of type [" + value.getClass().getName() + "]"); + } + + private static void addMetadataToException(OpenSearchException exception, String propertyName) { + if (propertyName != null) { + exception.addMetadata("opensearch.property_name", propertyName); + } + } + + /** + * Returns the specified property from the specified configuration map. + *

+ * If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown. + * If the property is missing and no default value has been specified a {@link OpenSearchParseException} is thrown + */ + public static String readStringOrIntProperty(Map configuration, String propertyName, String defaultValue) { + Object value = configuration.get(propertyName); + if (value == null && defaultValue != null) { + return defaultValue; + } else if (value == null) { + throw newConfigurationException(propertyName, "required property is missing"); + } + return readStringOrInt(propertyName, value); + } + + private static String readStringOrInt(String propertyName, Object value) { + if (value == null) { + return null; + } + if (value instanceof String) { + return (String) value; + } else if (value instanceof Integer) { + return String.valueOf(value); + } + throw newConfigurationException(propertyName, "property isn't a string or int, but of type [" + value.getClass().getName() + "]"); + } + + /** + * Returns the specified property from the specified configuration map. + *

+ * If the property value isn't of type string or int a {@link OpenSearchParseException} is thrown. + */ + public static String readOptionalStringOrIntProperty(Map configuration, String propertyName) { + Object value = configuration.get(propertyName); + if (value == null) { + return null; + } + return readStringOrInt(propertyName, value); + } + + public static boolean readBooleanProperty(Map configuration, String propertyName, boolean defaultValue) { + Object value = configuration.get(propertyName); + if (value == null) { + return defaultValue; + } else { + return readBoolean(propertyName, value).booleanValue(); + } + } + + private static Boolean readBoolean(String propertyName, Object value) { + if (value == null) { + return null; + } + if (value instanceof Boolean) { + return (boolean) value; + } + throw newConfigurationException(propertyName, "property isn't a boolean, but of type [" + value.getClass().getName() + "]"); + } + + /** + * Returns the specified property from the specified configuration map. + *

+ * If the property value isn't of type int a {@link OpenSearchParseException} is thrown. + * If the property is missing an {@link OpenSearchParseException} is thrown + */ + public static Integer readIntProperty(Map configuration, String propertyName, Integer defaultValue) { + Object value = configuration.get(propertyName); + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value.toString()); + } catch (Exception e) { + throw newConfigurationException(propertyName, "property cannot be converted to an int [" + value + "]"); + } + } + + /** + * Returns the specified property from the specified configuration map. + *

+ * If the property value isn't of type int a {@link OpenSearchParseException} is thrown. + * If the property is missing an {@link OpenSearchParseException} is thrown + */ + public static Double readDoubleProperty(Map configuration, String propertyName) { + Object value = configuration.get(propertyName); + if (value == null) { + throw newConfigurationException(propertyName, "required property is missing"); + } + try { + return Double.parseDouble(value.toString()); + } catch (Exception e) { + throw newConfigurationException(propertyName, "property cannot be converted to a double [" + value + "]"); + } + } +} diff --git a/libs/core/src/test/java/org/opensearch/core/util/ConfigurationUtilsTests.java b/libs/core/src/test/java/org/opensearch/core/util/ConfigurationUtilsTests.java new file mode 100644 index 0000000000000..bdd978a77890e --- /dev/null +++ b/libs/core/src/test/java/org/opensearch/core/util/ConfigurationUtilsTests.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.core.util; + +import org.opensearch.OpenSearchParseException; +import org.opensearch.test.OpenSearchTestCase; +import org.junit.Before; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +public class ConfigurationUtilsTests extends OpenSearchTestCase { + private Map config; + + @Before + public void setConfig() { + config = new HashMap<>(); + config.put("foo", "bar"); + config.put("boolVal", true); + config.put("null", null); + config.put("arr", Arrays.asList("1", "2", "3")); + config.put("ip", "127.0.0.1"); + config.put("num", 1); + config.put("double", 1.0); + } + + public void testReadStringProperty() { + String val = ConfigurationUtils.readStringProperty(config, "foo"); + assertThat(val, equalTo("bar")); + String val1 = ConfigurationUtils.readStringProperty(config, "foo1", "none"); + assertThat(val1, equalTo("none")); + try { + ConfigurationUtils.readStringProperty(config, "foo1", null); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[foo1] required property is missing")); + } + } + + public void testOptionalReadStringProperty() { + String val = ConfigurationUtils.readOptionalStringProperty(config, "foo"); + assertThat(val, equalTo("bar")); + String val1 = ConfigurationUtils.readOptionalStringProperty(config, "foo1"); + assertThat(val, equalTo("bar")); + assertThat(val1, equalTo(null)); + } + + public void testReadStringPropertyInvalidType() { + try { + ConfigurationUtils.readStringProperty(config, "arr"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[arr] property isn't a string, but of type [java.util.Arrays$ArrayList]")); + } + } + + public void testReadBooleanProperty() { + Boolean val = ConfigurationUtils.readBooleanProperty(config, "boolVal", false); + assertThat(val, equalTo(true)); + } + + public void testReadNullBooleanProperty() { + Boolean val = ConfigurationUtils.readBooleanProperty(config, "null", false); + assertThat(val, equalTo(false)); + } + + public void testReadBooleanPropertyInvalidType() { + try { + ConfigurationUtils.readBooleanProperty(config, "arr", true); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[arr] property isn't a boolean, but of type [java.util.Arrays$ArrayList]")); + } + } + + public void testReadStringOrIntProperty() { + String val1 = ConfigurationUtils.readStringOrIntProperty(config, "foo", null); + String val2 = ConfigurationUtils.readStringOrIntProperty(config, "num", null); + assertThat(val1, equalTo("bar")); + assertThat(val2, equalTo("1")); + } + + public void testOptionalReadStringOrIntProperty() { + String val1 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "foo"); + String val2 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num"); + String val3 = ConfigurationUtils.readOptionalStringOrIntProperty(config, "num1"); + assertThat(val1, equalTo("bar")); + assertThat(val2, equalTo("1")); + assertThat(val3, equalTo(null)); + } + + public void testReadIntProperty() { + int val = ConfigurationUtils.readIntProperty(config, "num", null); + assertThat(val, equalTo(1)); + try { + ConfigurationUtils.readIntProperty(config, "foo", 2); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to an int [bar]")); + } + try { + ConfigurationUtils.readIntProperty(config, "foo1", 2); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("required property is missing")); + } + } + + public void testReadDoubleProperty() { + double val = ConfigurationUtils.readDoubleProperty(config, "double"); + assertThat(val, equalTo(1.0)); + try { + ConfigurationUtils.readDoubleProperty(config, "foo"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[foo] property cannot be converted to a double [bar]")); + } + try { + ConfigurationUtils.readDoubleProperty(config, "foo1"); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[foo1] required property is missing")); + } + } + + public void testReadStringOrIntPropertyInvalidType() { + try { + ConfigurationUtils.readStringOrIntProperty(config, "arr", null); + } catch (OpenSearchParseException e) { + assertThat(e.getMessage(), equalTo("[arr] property isn't a string or int, but of type [java.util.Arrays$ArrayList]")); + } + } + +} diff --git a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java index 099300c6e5767..722883d353ebf 100644 --- a/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java +++ b/plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaSourceConfig.java @@ -8,13 +8,17 @@ package org.opensearch.plugin.kafka; +import org.opensearch.core.util.ConfigurationUtils; + import java.util.Map; -import java.util.Objects; /** * Class encapsulating the configuration of a Kafka source. */ public class KafkaSourceConfig { + private final String PROP_TOPIC = "topic"; + private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers"; + private final String topic; private final String bootstrapServers; @@ -23,10 +27,8 @@ public class KafkaSourceConfig { * @param params the configuration parameters */ public KafkaSourceConfig(Map params) { - // TODO: better parsing and validation - this.topic = (String) Objects.requireNonNull(params.get("topic")); - this.bootstrapServers = (String) Objects.requireNonNull(params.get("bootstrap_servers")); - assert this.bootstrapServers != null; + this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC); + this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS); } /** From 9751cafb79dc9aefb55b9cedfc9673693197479f Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Sat, 1 Feb 2025 18:56:00 -0800 Subject: [PATCH 2/3] changelog Signed-off-by: Yupeng Fu --- CHANGELOG-3.0.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG-3.0.md b/CHANGELOG-3.0.md index 40a1950ed1c78..c0d89253d2ad0 100644 --- a/CHANGELOG-3.0.md +++ b/CHANGELOG-3.0.md @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow to pass the list settings through environment variables (like [], ["a", "b", "c"], ...) ([#10625](https://github.com/opensearch-project/OpenSearch/pull/10625)) - Views, simplify data access and manipulation by providing a virtual layer over one or more indices ([#11957](https://github.com/opensearch-project/OpenSearch/pull/11957)) - Added pull-based Ingestion (APIs, for ingestion source, a Kafka plugin, and IngestionEngine that pulls data from the ingestion source) ([#16958](https://github.com/opensearch-project/OpenSearch/pull/16958)) +- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223) ### Dependencies - Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366)) From 5fd8140d83617c827c6985e2c7fa10d26f778ffc Mon Sep 17 00:00:00 2001 From: Yupeng Fu Date: Tue, 4 Feb 2025 21:33:11 -0800 Subject: [PATCH 3/3] comment Signed-off-by: Yupeng Fu --- .../java/org/opensearch/core/util/ConfigurationUtils.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java b/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java index cba0557d92b9a..b077d11644ece 100644 --- a/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java +++ b/libs/core/src/main/java/org/opensearch/core/util/ConfigurationUtils.java @@ -10,15 +10,17 @@ import org.opensearch.OpenSearchException; import org.opensearch.OpenSearchParseException; +import org.opensearch.common.annotation.PublicApi; import java.util.Map; /** * Utility class for parsing configurations. * - * @opensearch.internal + * @opensearch.api */ -public class ConfigurationUtils { +@PublicApi(since = "3.0.0") +public final class ConfigurationUtils { private ConfigurationUtils() {}