diff --git a/.gitignore b/.gitignore
index 0812805..f75e086 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,4 +10,5 @@ buildNumber.properties
# https://github.com/takari/maven-wrapper#usage-without-binary-jar
.mvn/wrapper/maven-wrapper.jar
*.iml
-.idea
\ No newline at end of file
+.idea
+build/
diff --git a/pom.xml b/pom.xml
index ac4d5e0..aba9f51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
org.liquibase
liquibase-parent-pom
- 0.4.5
+ 0.4.5
org.liquibase.ext
@@ -15,22 +15,22 @@
Liquibase Opensearch Extension
jar
- Describe your extension here.
- https://docs.liquibase.com
+ Liquibase extension to manage changesets for OpenSearch.
+ https://github.com/liquibase/liquibase-opensearch
- Liquibase EULA
- https://www.liquibase.com/eula
+ https://www.apache.org/licenses/LICENSE-2.0
+ Apache License, Version 2.0
- Your Name
- youremail@example.com
- Liquibase
- https://www.liquibase.com/
+ Ralph Ursprung
+ ralph.ursprung@avaloq.com
+ Avaloq Group AG
+ https://www.avaloq.com/
@@ -61,15 +61,88 @@
liquibase-core
${liquibase.version}
+
+ org.opensearch.client
+ opensearch-java
+ 2.14.0
+
+
+ org.opensearch.client
+ opensearch-rest-client
+
+
+
+
+ org.projectlombok
+ lombok
+ 1.18.32
+ provided
+
+
+ com.fasterxml.jackson.core
+ jackson-core
+ 2.17.2
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+ 2.17.2
+
+
+ org.apache.httpcomponents.core5
+ httpcore5
+ 5.2.5
+
+
+ org.apache.httpcomponents.client5
+ httpclient5
+ 5.3.1
+
+
+
+ org.junit.jupiter
+ junit-jupiter
+ 5.10.3
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.26.3
+ test
+
+
+ org.testcontainers
+ testcontainers
+ 1.20.1
+ test
+
+
+ org.opensearch
+ opensearch-testcontainers
+ 2.1.0
+ test
+
org.slf4j
slf4j-api
2.0.16
test
+
+ org.slf4j
+ slf4j-simple
+ 2.0.16
+ test
+
+
+
+ ${project.basedir}/src/main/resources
+
+
org.sonarsource.scanner.maven
diff --git a/src/main/java/com/example/change/ClearPasswordsChange.java b/src/main/java/com/example/change/ClearPasswordsChange.java
deleted file mode 100644
index 2197c7e..0000000
--- a/src/main/java/com/example/change/ClearPasswordsChange.java
+++ /dev/null
@@ -1,57 +0,0 @@
-package com.example.change;
-
-import liquibase.change.AbstractChange;
-import liquibase.change.ChangeMetaData;
-import liquibase.change.DatabaseChange;
-import liquibase.change.DatabaseChangeProperty;
-import liquibase.change.core.UpdateDataChange;
-import liquibase.database.Database;
-import liquibase.statement.SqlStatement;
-import liquibase.statement.core.UpdateStatement;
-
-@DatabaseChange(name = "clearPasswords", description = "Clears all data in a 'password' column", priority = ChangeMetaData.PRIORITY_DEFAULT)
-public class ClearPasswordsChange extends AbstractChange {
-
- private String tableName;
- private String schemaName;
- private String catalogName;
-
- @DatabaseChangeProperty
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- @DatabaseChangeProperty
- public String getSchemaName() {
- return schemaName;
- }
-
- public void setSchemaName(String schemaName) {
- this.schemaName = schemaName;
- }
-
- @DatabaseChangeProperty
- public String getCatalogName() {
- return catalogName;
- }
-
- public void setCatalogName(String catalogName) {
- this.catalogName = catalogName;
- }
-
- @Override
- public String getConfirmationMessage() {
- return "Passwords cleared";
- }
-
- @Override
- public SqlStatement[] generateStatements(Database database) {
- return new SqlStatement[] {
- new UpdateStatement(getCatalogName(), getSchemaName(), getTableName()).addNewColumnValue("password", null)
- };
- }
-}
diff --git a/src/main/java/com/example/change/PrefixedCreateTableChange.java b/src/main/java/com/example/change/PrefixedCreateTableChange.java
deleted file mode 100644
index d5e7a25..0000000
--- a/src/main/java/com/example/change/PrefixedCreateTableChange.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package com.example.change;
-
-import liquibase.change.ChangeMetaData;
-import liquibase.change.DatabaseChange;
-import liquibase.change.DatabaseChangeProperty;
-import liquibase.change.core.CreateTableChange;
-import liquibase.statement.core.CreateTableStatement;
-
-@DatabaseChange(name = "createTable", description = "Create Table", priority = ChangeMetaData.PRIORITY_DATABASE + 50)
-public class PrefixedCreateTableChange extends CreateTableChange {
-
- private String prefix;
-
- @DatabaseChangeProperty
- public String getPrefix() {
- return prefix;
- }
-
- public void setPrefix(String prefix) {
- this.prefix = prefix;
- }
-
- @Override
- protected CreateTableStatement generateCreateTableStatement() {
- String prefix = getPrefix();
- if (prefix == null) {
- prefix = "standard";
- }
-
- return new CreateTableStatement(getCatalogName(), getSchemaName(), prefix + "_" + getTableName(), getRemarks());
- }
-}
diff --git a/src/main/java/com/example/precondition/HasPasswordColumnPrecondition.java b/src/main/java/com/example/precondition/HasPasswordColumnPrecondition.java
deleted file mode 100644
index 36f3460..0000000
--- a/src/main/java/com/example/precondition/HasPasswordColumnPrecondition.java
+++ /dev/null
@@ -1,80 +0,0 @@
-package com.example.precondition;
-
-import liquibase.changelog.ChangeSet;
-import liquibase.changelog.DatabaseChangeLog;
-import liquibase.changelog.visitor.ChangeExecListener;
-import liquibase.database.Database;
-import liquibase.exception.*;
-import liquibase.precondition.AbstractPrecondition;
-import liquibase.snapshot.SnapshotGeneratorFactory;
-import liquibase.structure.core.Column;
-import liquibase.structure.core.Schema;
-import liquibase.structure.core.Table;
-
-public class HasPasswordColumnPrecondition extends AbstractPrecondition {
-
- private String catalogName;
- private String schemaName;
- private String tableName;
-
- public String getCatalogName() {
- return catalogName;
- }
-
- public void setCatalogName(String catalogName) {
- this.catalogName = catalogName;
- }
-
- public String getSchemaName() {
- return schemaName;
- }
-
- public void setSchemaName(String schemaName) {
- this.schemaName = schemaName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- @Override
- public String getName() {
- return "hasPasswordColumn";
- }
-
- @Override
- public Warnings warn(Database database) {
- return new Warnings();
- }
-
- @Override
- public ValidationErrors validate(Database database) {
- return new ValidationErrors()
- .checkRequiredField("tableName", getTableName());
- }
-
- @Override
- public void check(Database database, DatabaseChangeLog changeLog, ChangeSet changeSet, ChangeExecListener changeExecListener) throws PreconditionFailedException, PreconditionErrorException {
- Column example = new Column();
- example.setRelation(new Table().setName(database.correctObjectName(getTableName(), Table.class)).setSchema(new Schema(getCatalogName(), getSchemaName())));
- example.setName(database.correctObjectName("password", Column.class));
-
- try {
- if (!SnapshotGeneratorFactory.getInstance().has(example, database)) {
- throw new PreconditionFailedException("Table " + getTableName() + " does not have a password column", changeLog, this);
- }
- } catch (LiquibaseException e) {
- throw new PreconditionErrorException(e, changeLog, this);
- }
-
- }
-
- @Override
- public String getSerializedObjectNamespace() {
- return GENERIC_CHANGELOG_EXTENSION_NAMESPACE;
- }
-}
diff --git a/src/main/java/liquibase/ext/opensearch/change/HttpRequestChange.java b/src/main/java/liquibase/ext/opensearch/change/HttpRequestChange.java
new file mode 100644
index 0000000..f84e3f3
--- /dev/null
+++ b/src/main/java/liquibase/ext/opensearch/change/HttpRequestChange.java
@@ -0,0 +1,41 @@
+package liquibase.ext.opensearch.change;
+
+import liquibase.change.AbstractChange;
+import liquibase.change.ChangeMetaData;
+import liquibase.change.DatabaseChange;
+import liquibase.database.Database;
+import liquibase.ext.opensearch.statement.HttpRequestStatement;
+import liquibase.statement.SqlStatement;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.Optional;
+
+@DatabaseChange(name = "httpRequest",
+ description = "Execute an arbitrary HTTP request with the provided payload",
+ priority = ChangeMetaData.PRIORITY_DATABASE)
+@NoArgsConstructor
+@Getter
+@Setter
+public class HttpRequestChange extends AbstractChange {
+
+ private String method;
+ private String path;
+ private String body;
+
+ @Override
+ public String getConfirmationMessage() {
+ return String.format("executed the HTTP %s request against %s (with a body of size %d)",
+ this.getMethod(),
+ this.getPath(),
+ Optional.ofNullable(this.getBody()).map(String::length).orElse(0));
+ }
+
+ @Override
+ public SqlStatement[] generateStatements(final Database database) {
+ return new SqlStatement[] {
+ new HttpRequestStatement(this.getMethod(), this.getPath(), this.getBody())
+ };
+ }
+}
diff --git a/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java b/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java
new file mode 100644
index 0000000..8b3c1c8
--- /dev/null
+++ b/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java
@@ -0,0 +1,234 @@
+package liquibase.ext.opensearch.changelog;
+
+import liquibase.ChecksumVersion;
+import liquibase.Scope;
+import liquibase.change.CheckSum;
+import liquibase.changelog.ChangeSet;
+import liquibase.changelog.RanChangeSet;
+import liquibase.database.Database;
+import liquibase.exception.DatabaseException;
+import liquibase.ext.opensearch.database.OpenSearchConnection;
+import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase;
+import liquibase.logging.Logger;
+import liquibase.nosql.changelog.AbstractNoSqlHistoryService;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.opensearch._types.FieldValue;
+import org.opensearch.client.opensearch._types.Refresh;
+import org.opensearch.client.opensearch._types.mapping.*;
+import org.opensearch.client.opensearch.core.SearchRequest;
+import org.opensearch.client.opensearch.core.search.Hit;
+import org.opensearch.client.opensearch.indices.PutMappingRequest;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class OpenSearchHistoryService extends AbstractNoSqlHistoryService {
+
+ private final Logger log = Scope.getCurrentScope().getLog(getClass());
+
+ @Override
+ protected Logger getLogger() {
+ return log;
+ }
+
+ private OpenSearchClient getOpenSearchClient() {
+ final var connection = (OpenSearchConnection) this.getNoSqlDatabase().getConnection();
+ return connection.getOpenSearchClient();
+ }
+
+ @Override
+ protected boolean existsRepository() throws DatabaseException {
+ try {
+ return this.getOpenSearchClient().indices().exists(r -> r.index(this.getDatabaseChangeLogTableName())).value();
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected void createRepository() throws DatabaseException {
+ // note: the mapping will be created in adjustRepository
+
+ try {
+ this.getOpenSearchClient().indices().create(r -> r.index(this.getDatabaseChangeLogTableName()));
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected void adjustRepository() throws DatabaseException {
+ // properties must match RanChangeSet & CheckSum & ContextExpression (validated by matching tests)
+ final var request = new PutMappingRequest.Builder()
+ .index(this.getDatabaseChangeLogTableName())
+ .properties("id", p -> p.keyword(k -> k))
+ .properties("changeLog", p -> p.keyword(k -> k))
+ .properties("storedChangeLog", p -> p.keyword(k -> k))
+ .properties("author", p -> p.text(t -> t))
+ .properties("lastCheckSum", p -> p.object(o -> {
+ o.properties("version", p2 -> p2.integer(i -> i));
+ o.properties("storedCheckSum", p2 -> p2.keyword(k -> k));
+ return o;
+ }))
+ .properties("dateExecuted", p -> p.date(d -> d))
+ .properties("tag", p -> p.text(t -> t))
+ .properties("execType", p -> p.keyword(k -> k))
+ .properties("description", p -> p.text(t -> t))
+ .properties("comments", p -> p.text(t -> t))
+ .properties("orderExecuted", p -> p.integer(i -> i))
+ .properties("contextExpression", p -> p.object(o -> {
+ o.properties("contexts", p2 -> p2.keyword(k -> k));
+ o.properties("originalString", p2 -> p2.text(t -> t));
+ return o;
+ }))
+ .properties("labels", p -> p.text(t -> t))
+ .properties("deploymentId", p -> p.text(t -> t))
+ .properties("liquibaseVersion", p -> p.text(t -> t))
+ .build();
+
+ try {
+ this.getOpenSearchClient().indices().putMapping(request);
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected void dropRepository() throws DatabaseException {
+ try {
+ this.getOpenSearchClient().indices().delete(r -> r.index(this.getDatabaseChangeLogTableName()));
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected List queryRanChangeSets() throws DatabaseException {
+ try {
+ final var response = this.getOpenSearchClient()
+ .search(s -> s.index(this.getDatabaseChangeLogTableName()), RanChangeSet.class);
+ return response.hits().hits().stream()
+ .map(Hit::source)
+ .collect(Collectors.toList());
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected int generateNextSequence() throws DatabaseException {
+ final var aggregationName = "max";
+ final var request = new SearchRequest.Builder()
+ .index(this.getDatabaseChangeLogTableName())
+ .aggregations(aggregationName, a -> a.max(m -> m.field("orderExecuted")))
+ .build();
+ try {
+ final var response = this.getOpenSearchClient().search(request, RanChangeSet.class);
+ return (int) response.aggregations().get(aggregationName).max().value();
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected void markChangeSetRun(final ChangeSet changeSet, final ChangeSet.ExecType execType, final Integer nextSequenceValue) throws DatabaseException {
+ final var ranChangeSet = new RanChangeSet(changeSet, execType, null, null);
+
+ try {
+ this.getOpenSearchClient()
+ .index(r -> r.index(this.getDatabaseChangeLogTableName())
+ .id(ranChangeSet.getId())
+ .document(ranChangeSet)
+ .refresh(Refresh.WaitFor));
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected void removeRanChangeSet(final ChangeSet changeSet) throws DatabaseException {
+ try {
+ this.getOpenSearchClient()
+ .delete(r -> r.index(this.getDatabaseChangeLogTableName())
+ .id(String.valueOf(changeSet.getId()))
+ .refresh(Refresh.WaitFor));
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ public void clearAllCheckSums() throws DatabaseException {
+ try {
+ this.getOpenSearchClient()
+ .updateByQuery(r -> r.index(this.getDatabaseChangeLogTableName())
+ .script(s -> s.inline(i -> i.source("ctx._source.lastCheckSum = null")
+ .lang("painless"))));
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected long countTags(final String tag) throws DatabaseException {
+ final var request = new SearchRequest.Builder()
+ .index(this.getDatabaseChangeLogTableName())
+ .query(q -> q.match(m -> m.field("tag").query(FieldValue.of(tag))))
+ .build();
+ try {
+ final var response = this.getOpenSearchClient().search(request, RanChangeSet.class);
+ return response.hits().total().value();
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ protected void tagLast(final String tagString) throws DatabaseException {
+ // TODO
+ }
+
+ @Override
+ protected long countRanChangeSets() throws DatabaseException {
+ return this.queryRanChangeSets().size();
+ }
+
+ @Override
+ protected void updateCheckSum(final ChangeSet changeSet) throws DatabaseException {
+ @AllArgsConstructor
+ @Getter
+ class CheckSumObj {
+ final CheckSum lastCheckSum;
+ }
+ final var currentChecksumVersion = Optional.ofNullable(changeSet.getStoredCheckSum())
+ .map(cs -> ChecksumVersion.enumFromChecksumVersion(cs.getVersion()))
+ .orElse(ChecksumVersion.latest());
+ final var checkSum = changeSet.generateCheckSum(currentChecksumVersion);
+
+ try {
+ this.getOpenSearchClient()
+ .update(r -> r.index(this.getDatabaseChangeLogTableName())
+ .id(changeSet.getId())
+ .doc(new CheckSumObj(checkSum))
+ .refresh(Refresh.WaitFor)
+ , RanChangeSet.class);
+ } catch (final IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+
+ @Override
+ public boolean supports(final Database database) {
+ return OpenSearchLiquibaseDatabase.PRODUCT_NAME.equals(database.getDatabaseProductName());
+ }
+
+ @Override
+ public boolean isDatabaseChecksumsCompatible() {
+ return true;
+ }
+
+}
diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchClientDriver.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchClientDriver.java
new file mode 100644
index 0000000..a11c456
--- /dev/null
+++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchClientDriver.java
@@ -0,0 +1,55 @@
+package liquibase.ext.opensearch.database;
+
+import liquibase.Scope;
+import liquibase.util.StringUtil;
+
+import java.sql.Connection;
+import java.sql.Driver;
+import java.sql.DriverPropertyInfo;
+import java.sql.SQLException;
+import java.util.Properties;
+import java.util.logging.Logger;
+
+import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_PREFIX;
+
+public class OpenSearchClientDriver implements Driver {
+ @Override
+ public Connection connect(final String url, final Properties info) {
+ //Not applicable for non JDBC DBs
+ throw new UnsupportedOperationException("Cannot initiate a SQL Connection for a NoSql DB");
+ }
+
+ public static boolean isOpenSearchURL(final String url) {
+ return StringUtil.trimToEmpty(url).startsWith(OPENSEARCH_PREFIX);
+ }
+
+ @Override
+ public boolean acceptsURL(final String url) {
+ return isOpenSearchURL(url);
+ }
+
+ @Override
+ public DriverPropertyInfo[] getPropertyInfo(final String url, final Properties info) throws SQLException {
+ return new DriverPropertyInfo[0];
+ }
+
+ @Override
+ public int getMajorVersion() {
+ return 0;
+ }
+
+ @Override
+ public int getMinorVersion() {
+ return 0;
+ }
+
+ @Override
+ public boolean jdbcCompliant() {
+ return false;
+ }
+
+ @Override
+ public Logger getParentLogger() {
+ return (Logger) Scope.getCurrentScope().getLog(getClass());
+ }
+}
diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java
new file mode 100644
index 0000000..12d35da
--- /dev/null
+++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java
@@ -0,0 +1,183 @@
+package liquibase.ext.opensearch.database;
+
+import liquibase.exception.DatabaseException;
+import liquibase.nosql.database.AbstractNoSqlConnection;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.hc.client5.http.auth.AuthScope;
+import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
+import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
+import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
+import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
+import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
+import org.apache.hc.core5.http.HttpHost;
+import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
+import org.apache.hc.core5.reactor.ssl.TlsDetails;
+import org.apache.hc.core5.ssl.SSLContextBuilder;
+import org.opensearch.client.json.jackson.JacksonJsonpMapper;
+import org.opensearch.client.opensearch.OpenSearchClient;
+import org.opensearch.client.opensearch._types.OpenSearchVersionInfo;
+import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder;
+
+import javax.net.ssl.SSLContext;
+import java.io.IOException;
+import java.net.URI;
+import java.security.KeyManagementException;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.sql.Driver;
+import java.util.Optional;
+import java.util.Properties;
+
+import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_PREFIX;
+
+@Getter
+@Setter
+@NoArgsConstructor
+public class OpenSearchConnection extends AbstractNoSqlConnection {
+
+ private OpenSearchClient openSearchClient;
+ private Optional openSearchVersion = Optional.empty();
+
+ private URI uri;
+ private Properties connectionProperties;
+
+ @Override
+ public boolean supports(final String url) {
+ if (url == null) {
+ return false;
+ }
+ return url.toLowerCase().startsWith(OPENSEARCH_PREFIX);
+ }
+
+ @Override
+ public void open(final String url, final Driver driverObject, final Properties driverProperties) throws DatabaseException {
+ String realUrl = url;
+ if (realUrl.toLowerCase().startsWith(OPENSEARCH_PREFIX)) {
+ realUrl = realUrl.substring(OPENSEARCH_PREFIX.length());
+ }
+
+ this.connectionProperties = driverProperties;
+
+ try {
+ this.uri = new URI(realUrl);
+ this.connect(this.uri, driverProperties);
+ } catch (final Exception e) {
+ throw new DatabaseException("Could not open connection to database: " + realUrl);
+ }
+ }
+
+ @Override
+ public void close() throws DatabaseException {
+ this.openSearchClient = null;
+ this.connectionProperties = null;
+ this.uri = null;
+ }
+
+ @Override
+ public String getCatalog() throws DatabaseException {
+ return null; // OpenSearch doesn't have catalogs (called schemas in various RDBMS)
+ }
+
+ @Override
+ public String getDatabaseProductName() throws DatabaseException {
+ return OpenSearchLiquibaseDatabase.PRODUCT_NAME;
+ }
+
+ @Override
+ public String getURL() {
+ return this.uri.toString();
+ }
+
+ @Override
+ public String getConnectionUserName() {
+ return this.connectionProperties.getProperty("username");
+ }
+
+ @Override
+ public boolean isClosed() throws DatabaseException {
+ return this.openSearchClient == null;
+ }
+
+ private void connect(final URI uri, final Properties info) throws DatabaseException {
+ final HttpHost host = HttpHost.create(uri);
+
+ final var transport = ApacheHttpClient5TransportBuilder
+ .builder(host)
+ .setHttpClientConfigCallback(httpClientBuilder -> {
+ // TODO: support other credential providers
+ final var username = Optional.ofNullable(info.getProperty("user"));
+ final var password = Optional.ofNullable(info.getProperty("password"));
+
+ if (username.isPresent()) {
+ final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(new AuthScope(host),
+ new UsernamePasswordCredentials(username.get(), password.orElse("").toCharArray()));
+
+ httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+ } else if (password.isPresent()) {
+ throw new RuntimeException("password provided but username not set!");
+ }
+
+ final SSLContext sslcontext;
+ try {
+ sslcontext = SSLContextBuilder
+ .create()
+ .loadTrustMaterial(null, (chains, authType) -> true)
+ .build();
+ } catch (final NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
+ throw new RuntimeException(e);
+ }
+
+ final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create()
+ .setSslContext(sslcontext)
+ // disable the certificate since our testing cluster just uses the default security configuration
+ .setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
+ // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219
+ .setTlsDetailsFactory(sslEngine -> new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol()))
+ .build();
+
+ final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
+ .setTlsStrategy(tlsStrategy)
+ .build();
+
+ return httpClientBuilder
+ .setConnectionManager(connectionManager);
+ })
+ .setMapper(new JacksonJsonpMapper())
+ .build();
+
+ this.openSearchClient = new OpenSearchClient(transport);
+ }
+
+ @Override
+ public String getDatabaseProductVersion() throws DatabaseException {
+ return this.getOpenSearchVersion().number();
+ }
+
+ @Override
+ public int getDatabaseMajorVersion() throws DatabaseException {
+ final var version = this.getDatabaseProductVersion();
+ return Integer.parseInt(version.split("\\.")[0]);
+ }
+
+ @Override
+ public int getDatabaseMinorVersion() throws DatabaseException {
+ final var version = this.getDatabaseProductVersion();
+ return Integer.parseInt(version.split("\\.")[1]);
+ }
+
+ private OpenSearchVersionInfo getOpenSearchVersion() throws DatabaseException {
+ if (this.openSearchVersion.isEmpty()) {
+ try {
+ this.openSearchVersion = Optional.of(this.openSearchClient.info().version());
+ } catch (IOException e) {
+ throw new DatabaseException(e);
+ }
+ }
+ return this.openSearchVersion.get();
+ }
+
+}
diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java
new file mode 100644
index 0000000..23a40fc
--- /dev/null
+++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java
@@ -0,0 +1,58 @@
+package liquibase.ext.opensearch.database;
+
+import liquibase.CatalogAndSchema;
+import liquibase.exception.LiquibaseException;
+import liquibase.nosql.database.AbstractNoSqlDatabase;
+import lombok.NoArgsConstructor;
+
+@NoArgsConstructor
+public class OpenSearchLiquibaseDatabase extends AbstractNoSqlDatabase {
+ public static final String PRODUCT_NAME = "OpenSearch";
+ public static final String PRODUCT_SHORT_NAME = "opensearch";
+ public static final String OPENSEARCH_PREFIX = PRODUCT_SHORT_NAME + ":";
+
+ @Override
+ public void dropDatabaseObjects(final CatalogAndSchema schemaToDrop) throws LiquibaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getDefaultDriver(final String url) {
+ if (OpenSearchClientDriver.isOpenSearchURL(url)) {
+ return OpenSearchClientDriver.class.getName();
+ }
+ return null;
+ }
+
+ @Override
+ public String getDatabaseProductName() {
+ return PRODUCT_NAME;
+ }
+
+ @Override
+ public String getShortName() {
+ return PRODUCT_SHORT_NAME;
+ }
+
+ @Override
+ public Integer getDefaultPort() {
+ return 9200;
+ }
+
+ @Override
+ protected String getDefaultDatabaseProductName() {
+ return PRODUCT_NAME;
+ }
+
+ @Override
+ public String getDatabaseChangeLogTableName() {
+ // OpenSearch only supports lowercase index names
+ return super.getDatabaseChangeLogTableName().toLowerCase();
+ }
+
+ @Override
+ public String getDatabaseChangeLogLockTableName() {
+ // OpenSearch only supports lowercase index names
+ return super.getDatabaseChangeLogLockTableName().toLowerCase();
+ }
+}
diff --git a/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java
new file mode 100644
index 0000000..beffe9c
--- /dev/null
+++ b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java
@@ -0,0 +1,173 @@
+package liquibase.ext.opensearch.executor;
+
+/*-
+ * #%L
+ * Liquibase CosmosDB Extension
+ * %%
+ * Copyright (C) 2020 Mastercard
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * #L%
+ */
+
+import liquibase.Scope;
+import liquibase.database.Database;
+import liquibase.exception.DatabaseException;
+import liquibase.executor.AbstractExecutor;
+import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase;
+import liquibase.ext.opensearch.statement.OpenSearchExecuteStatement;
+import liquibase.logging.Logger;
+import liquibase.servicelocator.LiquibaseService;
+import liquibase.sql.visitor.SqlVisitor;
+import liquibase.statement.SqlStatement;
+import lombok.NoArgsConstructor;
+import org.opensearch.client.opensearch.generic.Body;
+import org.opensearch.client.opensearch.generic.OpenSearchClientException;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Collections.emptyList;
+
+@LiquibaseService
+@NoArgsConstructor
+public class OpenSearchExecutor extends AbstractExecutor {
+
+ public static final String EXECUTOR_NAME = "jdbc"; // needed because of AbstractJdbcDatabase#execute
+ private final Logger log = Scope.getCurrentScope().getLog(getClass());
+
+ @Override
+ public void setDatabase(final Database database) {
+ super.setDatabase(database);
+ }
+
+ private OpenSearchLiquibaseDatabase getDatabase() {
+ return (OpenSearchLiquibaseDatabase)this.database;
+ }
+
+ @Override
+ public String getName() {
+ return EXECUTOR_NAME;
+ }
+
+ @Override
+ public int getPriority() {
+ return PRIORITY_SPECIALIZED;
+ }
+
+ @Override
+ public boolean supports(final Database database) {
+ return OpenSearchLiquibaseDatabase.PRODUCT_NAME.equals(database.getDatabaseProductName());
+ }
+
+ @Override
+ public T queryForObject(final SqlStatement sql, final Class requiredType) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T queryForObject(final SqlStatement sql, final Class requiredType, final List sqlVisitors) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long queryForLong(final SqlStatement sql) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long queryForLong(final SqlStatement sql, final List sqlVisitors) throws DatabaseException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int queryForInt(final SqlStatement sql) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int queryForInt(final SqlStatement sql, final List sqlVisitors) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List