From b9d61c72e79cd10650f35c1032f577d7d535fa47 Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 6 May 2024 19:31:39 +0200 Subject: [PATCH 1/8] update `pom.xml` with more details (URL, license, etc.) note that the pom so far claimed that the license was the Liquibase EULA, but this repository - and the extension - is licensed as Apache 2.0. --- pom.xml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pom.xml b/pom.xml index ac4d5e0..bdba79e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.liquibase liquibase-parent-pom - 0.4.5 + 0.4.5 org.liquibase.ext @@ -15,22 +15,22 @@ Liquibase Opensearch Extension jar - Describe your extension here. - https://docs.liquibase.com + Liquibase extension to manage changesets for OpenSearch. + https://github.com/liquibase/liquibase-opensearch - Liquibase EULA - https://www.liquibase.com/eula + https://www.apache.org/licenses/LICENSE-2.0 + Apache License, Version 2.0 - Your Name - youremail@example.com - Liquibase - https://www.liquibase.com/ + Ralph Ursprung + ralph.ursprung@avaloq.com + Avaloq Group AG + https://www.avaloq.com/ From c5ab605bf968822dc5b82b6790be60d8f7b70607 Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 13 May 2024 13:21:46 +0200 Subject: [PATCH 2/8] add `build/` to `.gitignore` this folder is generated by maven when running builds. --- .gitignore | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 0812805..f75e086 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,5 @@ buildNumber.properties # https://github.com/takari/maven-wrapper#usage-without-binary-jar .mvn/wrapper/maven-wrapper.jar *.iml -.idea \ No newline at end of file +.idea +build/ From a6c8be35d61250556a4d6ea45feea40d4fddc90c Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 6 May 2024 19:37:26 +0200 Subject: [PATCH 3/8] remove leftover example code --- .../example/change/ClearPasswordsChange.java | 57 ------------- .../change/PrefixedCreateTableChange.java | 32 -------- .../HasPasswordColumnPrecondition.java | 80 ------------------- .../META-INF/services/liquibase.change.Change | 2 - .../liquibase.precondition.Precondition | 1 - .../change/PrefixedCreateTableChangeTest.java | 50 ------------ 6 files changed, 222 deletions(-) delete mode 100644 src/main/java/com/example/change/ClearPasswordsChange.java delete mode 100644 src/main/java/com/example/change/PrefixedCreateTableChange.java delete mode 100644 src/main/java/com/example/precondition/HasPasswordColumnPrecondition.java delete mode 100644 src/main/resources/META-INF/services/liquibase.change.Change delete mode 100644 src/main/resources/META-INF/services/liquibase.precondition.Precondition delete mode 100644 src/test/java/com/example/change/PrefixedCreateTableChangeTest.java diff --git a/src/main/java/com/example/change/ClearPasswordsChange.java b/src/main/java/com/example/change/ClearPasswordsChange.java deleted file mode 100644 index 2197c7e..0000000 --- a/src/main/java/com/example/change/ClearPasswordsChange.java +++ /dev/null @@ -1,57 +0,0 @@ -package com.example.change; - -import liquibase.change.AbstractChange; -import liquibase.change.ChangeMetaData; -import liquibase.change.DatabaseChange; -import liquibase.change.DatabaseChangeProperty; -import liquibase.change.core.UpdateDataChange; -import liquibase.database.Database; -import liquibase.statement.SqlStatement; -import liquibase.statement.core.UpdateStatement; - -@DatabaseChange(name = "clearPasswords", description = "Clears all data in a 'password' column", priority = ChangeMetaData.PRIORITY_DEFAULT) -public class ClearPasswordsChange extends AbstractChange { - - private String tableName; - private String schemaName; - private String catalogName; - - @DatabaseChangeProperty - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - @DatabaseChangeProperty - public String getSchemaName() { - return schemaName; - } - - public void setSchemaName(String schemaName) { - this.schemaName = schemaName; - } - - @DatabaseChangeProperty - public String getCatalogName() { - return catalogName; - } - - public void setCatalogName(String catalogName) { - this.catalogName = catalogName; - } - - @Override - public String getConfirmationMessage() { - return "Passwords cleared"; - } - - @Override - public SqlStatement[] generateStatements(Database database) { - return new SqlStatement[] { - new UpdateStatement(getCatalogName(), getSchemaName(), getTableName()).addNewColumnValue("password", null) - }; - } -} diff --git a/src/main/java/com/example/change/PrefixedCreateTableChange.java b/src/main/java/com/example/change/PrefixedCreateTableChange.java deleted file mode 100644 index d5e7a25..0000000 --- a/src/main/java/com/example/change/PrefixedCreateTableChange.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.example.change; - -import liquibase.change.ChangeMetaData; -import liquibase.change.DatabaseChange; -import liquibase.change.DatabaseChangeProperty; -import liquibase.change.core.CreateTableChange; -import liquibase.statement.core.CreateTableStatement; - -@DatabaseChange(name = "createTable", description = "Create Table", priority = ChangeMetaData.PRIORITY_DATABASE + 50) -public class PrefixedCreateTableChange extends CreateTableChange { - - private String prefix; - - @DatabaseChangeProperty - public String getPrefix() { - return prefix; - } - - public void setPrefix(String prefix) { - this.prefix = prefix; - } - - @Override - protected CreateTableStatement generateCreateTableStatement() { - String prefix = getPrefix(); - if (prefix == null) { - prefix = "standard"; - } - - return new CreateTableStatement(getCatalogName(), getSchemaName(), prefix + "_" + getTableName(), getRemarks()); - } -} diff --git a/src/main/java/com/example/precondition/HasPasswordColumnPrecondition.java b/src/main/java/com/example/precondition/HasPasswordColumnPrecondition.java deleted file mode 100644 index 36f3460..0000000 --- a/src/main/java/com/example/precondition/HasPasswordColumnPrecondition.java +++ /dev/null @@ -1,80 +0,0 @@ -package com.example.precondition; - -import liquibase.changelog.ChangeSet; -import liquibase.changelog.DatabaseChangeLog; -import liquibase.changelog.visitor.ChangeExecListener; -import liquibase.database.Database; -import liquibase.exception.*; -import liquibase.precondition.AbstractPrecondition; -import liquibase.snapshot.SnapshotGeneratorFactory; -import liquibase.structure.core.Column; -import liquibase.structure.core.Schema; -import liquibase.structure.core.Table; - -public class HasPasswordColumnPrecondition extends AbstractPrecondition { - - private String catalogName; - private String schemaName; - private String tableName; - - public String getCatalogName() { - return catalogName; - } - - public void setCatalogName(String catalogName) { - this.catalogName = catalogName; - } - - public String getSchemaName() { - return schemaName; - } - - public void setSchemaName(String schemaName) { - this.schemaName = schemaName; - } - - public String getTableName() { - return tableName; - } - - public void setTableName(String tableName) { - this.tableName = tableName; - } - - @Override - public String getName() { - return "hasPasswordColumn"; - } - - @Override - public Warnings warn(Database database) { - return new Warnings(); - } - - @Override - public ValidationErrors validate(Database database) { - return new ValidationErrors() - .checkRequiredField("tableName", getTableName()); - } - - @Override - public void check(Database database, DatabaseChangeLog changeLog, ChangeSet changeSet, ChangeExecListener changeExecListener) throws PreconditionFailedException, PreconditionErrorException { - Column example = new Column(); - example.setRelation(new Table().setName(database.correctObjectName(getTableName(), Table.class)).setSchema(new Schema(getCatalogName(), getSchemaName()))); - example.setName(database.correctObjectName("password", Column.class)); - - try { - if (!SnapshotGeneratorFactory.getInstance().has(example, database)) { - throw new PreconditionFailedException("Table " + getTableName() + " does not have a password column", changeLog, this); - } - } catch (LiquibaseException e) { - throw new PreconditionErrorException(e, changeLog, this); - } - - } - - @Override - public String getSerializedObjectNamespace() { - return GENERIC_CHANGELOG_EXTENSION_NAMESPACE; - } -} diff --git a/src/main/resources/META-INF/services/liquibase.change.Change b/src/main/resources/META-INF/services/liquibase.change.Change deleted file mode 100644 index 03b5455..0000000 --- a/src/main/resources/META-INF/services/liquibase.change.Change +++ /dev/null @@ -1,2 +0,0 @@ -com.example.change.ClearPasswordsChange -com.example.change.PrefixedCreateTableChange diff --git a/src/main/resources/META-INF/services/liquibase.precondition.Precondition b/src/main/resources/META-INF/services/liquibase.precondition.Precondition deleted file mode 100644 index 5cd8490..0000000 --- a/src/main/resources/META-INF/services/liquibase.precondition.Precondition +++ /dev/null @@ -1 +0,0 @@ -com.example.precondition.HasPasswordColumnPrecondition diff --git a/src/test/java/com/example/change/PrefixedCreateTableChangeTest.java b/src/test/java/com/example/change/PrefixedCreateTableChangeTest.java deleted file mode 100644 index 2c969d4..0000000 --- a/src/test/java/com/example/change/PrefixedCreateTableChangeTest.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.example.change; - -import liquibase.statement.core.CreateTableStatement; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.*; - -class PrefixedCreateTableChangeTest { - - private PrefixedCreateTableChange prefixedCreateTableChange; - - @BeforeEach - void setUp() { - prefixedCreateTableChange = new PrefixedCreateTableChange(); - } - - @Test - void getPrefix_returnsNull_whenNotSet() { - assertNull(prefixedCreateTableChange.getPrefix()); - } - - @Test - void getPrefix_returnsPrefix_whenSet() { - String expectedPrefix = "testPrefix"; - prefixedCreateTableChange.setPrefix(expectedPrefix); - - assertEquals(expectedPrefix, prefixedCreateTableChange.getPrefix()); - } - - @Test - void generateCreateTableStatement_usesStandardPrefix_whenPrefixNotSet() { - prefixedCreateTableChange.setTableName("testTable"); - - CreateTableStatement statement = prefixedCreateTableChange.generateCreateTableStatement(); - - assertTrue(statement.getTableName().startsWith("standard_")); - } - - @Test - void generateCreateTableStatement_usesSetPrefix_whenPrefixSet() { - String expectedPrefix = "testPrefix"; - prefixedCreateTableChange.setPrefix(expectedPrefix); - prefixedCreateTableChange.setTableName("testTable"); - - CreateTableStatement statement = prefixedCreateTableChange.generateCreateTableStatement(); - - assertTrue(statement.getTableName().startsWith(expectedPrefix + "_")); - } -} From 242be1a2bbd4bc2e6f93014d3c446fd705f12219 Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 6 May 2024 20:09:38 +0200 Subject: [PATCH 4/8] add initial `database` implementation this already allows connecting to OpenSearch using basic authentication. note: the `liquibase.nosql` package has been copied from [`liquibase-mongodb`] and adapted where needed (it is not 100% generic). no authorship is claimed for this content! [`liquibase-mongodb`]: https://github.com/liquibase/liquibase-mongodb --- pom.xml | 68 +++++ .../database/OpenSearchClientDriver.java | 55 ++++ .../database/OpenSearchConnection.java | 183 ++++++++++++ .../database/OpenSearchLiquibaseDatabase.java | 58 ++++ .../database/AbstractNoSqlConnection.java | 76 +++++ .../nosql/database/AbstractNoSqlDatabase.java | 263 ++++++++++++++++++ .../services/liquibase.database.Database | 1 + .../liquibase.database.DatabaseConnection | 1 + .../AbstractOpenSearchLiquibaseIT.java | 64 +++++ .../ext/opensearch/OpenSearchLiquibaseIT.java | 20 ++ 10 files changed, 789 insertions(+) create mode 100644 src/main/java/liquibase/ext/opensearch/database/OpenSearchClientDriver.java create mode 100644 src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java create mode 100644 src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java create mode 100644 src/main/java/liquibase/nosql/database/AbstractNoSqlConnection.java create mode 100644 src/main/java/liquibase/nosql/database/AbstractNoSqlDatabase.java create mode 100644 src/main/resources/META-INF/services/liquibase.database.Database create mode 100644 src/main/resources/META-INF/services/liquibase.database.DatabaseConnection create mode 100644 src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java create mode 100644 src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java diff --git a/pom.xml b/pom.xml index bdba79e..b045395 100644 --- a/pom.xml +++ b/pom.xml @@ -61,12 +61,80 @@ liquibase-core ${liquibase.version} + + org.opensearch.client + opensearch-java + 2.14.0 + + + org.opensearch.client + opensearch-rest-client + + + + + org.projectlombok + lombok + 1.18.32 + provided + + + com.fasterxml.jackson.core + jackson-core + 2.17.2 + + + com.fasterxml.jackson.core + jackson-databind + 2.17.2 + + + org.apache.httpcomponents.core5 + httpcore5 + 5.2.5 + + + org.apache.httpcomponents.client5 + httpclient5 + 5.3.1 + + + + org.junit.jupiter + junit-jupiter + 5.10.3 + test + + + org.assertj + assertj-core + 3.26.3 + test + + + org.testcontainers + testcontainers + 1.20.1 + test + + + org.opensearch + opensearch-testcontainers + 2.1.0 + test + org.slf4j slf4j-api 2.0.16 test + + org.slf4j + slf4j-simple + 2.0.16 + test + diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchClientDriver.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchClientDriver.java new file mode 100644 index 0000000..a11c456 --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchClientDriver.java @@ -0,0 +1,55 @@ +package liquibase.ext.opensearch.database; + +import liquibase.Scope; +import liquibase.util.StringUtil; + +import java.sql.Connection; +import java.sql.Driver; +import java.sql.DriverPropertyInfo; +import java.sql.SQLException; +import java.util.Properties; +import java.util.logging.Logger; + +import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_PREFIX; + +public class OpenSearchClientDriver implements Driver { + @Override + public Connection connect(final String url, final Properties info) { + //Not applicable for non JDBC DBs + throw new UnsupportedOperationException("Cannot initiate a SQL Connection for a NoSql DB"); + } + + public static boolean isOpenSearchURL(final String url) { + return StringUtil.trimToEmpty(url).startsWith(OPENSEARCH_PREFIX); + } + + @Override + public boolean acceptsURL(final String url) { + return isOpenSearchURL(url); + } + + @Override + public DriverPropertyInfo[] getPropertyInfo(final String url, final Properties info) throws SQLException { + return new DriverPropertyInfo[0]; + } + + @Override + public int getMajorVersion() { + return 0; + } + + @Override + public int getMinorVersion() { + return 0; + } + + @Override + public boolean jdbcCompliant() { + return false; + } + + @Override + public Logger getParentLogger() { + return (Logger) Scope.getCurrentScope().getLog(getClass()); + } +} diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java new file mode 100644 index 0000000..12d35da --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchConnection.java @@ -0,0 +1,183 @@ +package liquibase.ext.opensearch.database; + +import liquibase.exception.DatabaseException; +import liquibase.nosql.database.AbstractNoSqlConnection; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.UsernamePasswordCredentials; +import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder; +import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder; +import org.apache.hc.client5.http.ssl.NoopHostnameVerifier; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.ssl.SSLContextBuilder; +import org.opensearch.client.json.jackson.JacksonJsonpMapper; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.OpenSearchVersionInfo; +import org.opensearch.client.transport.httpclient5.ApacheHttpClient5TransportBuilder; + +import javax.net.ssl.SSLContext; +import java.io.IOException; +import java.net.URI; +import java.security.KeyManagementException; +import java.security.KeyStoreException; +import java.security.NoSuchAlgorithmException; +import java.sql.Driver; +import java.util.Optional; +import java.util.Properties; + +import static liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase.OPENSEARCH_PREFIX; + +@Getter +@Setter +@NoArgsConstructor +public class OpenSearchConnection extends AbstractNoSqlConnection { + + private OpenSearchClient openSearchClient; + private Optional openSearchVersion = Optional.empty(); + + private URI uri; + private Properties connectionProperties; + + @Override + public boolean supports(final String url) { + if (url == null) { + return false; + } + return url.toLowerCase().startsWith(OPENSEARCH_PREFIX); + } + + @Override + public void open(final String url, final Driver driverObject, final Properties driverProperties) throws DatabaseException { + String realUrl = url; + if (realUrl.toLowerCase().startsWith(OPENSEARCH_PREFIX)) { + realUrl = realUrl.substring(OPENSEARCH_PREFIX.length()); + } + + this.connectionProperties = driverProperties; + + try { + this.uri = new URI(realUrl); + this.connect(this.uri, driverProperties); + } catch (final Exception e) { + throw new DatabaseException("Could not open connection to database: " + realUrl); + } + } + + @Override + public void close() throws DatabaseException { + this.openSearchClient = null; + this.connectionProperties = null; + this.uri = null; + } + + @Override + public String getCatalog() throws DatabaseException { + return null; // OpenSearch doesn't have catalogs (called schemas in various RDBMS) + } + + @Override + public String getDatabaseProductName() throws DatabaseException { + return OpenSearchLiquibaseDatabase.PRODUCT_NAME; + } + + @Override + public String getURL() { + return this.uri.toString(); + } + + @Override + public String getConnectionUserName() { + return this.connectionProperties.getProperty("username"); + } + + @Override + public boolean isClosed() throws DatabaseException { + return this.openSearchClient == null; + } + + private void connect(final URI uri, final Properties info) throws DatabaseException { + final HttpHost host = HttpHost.create(uri); + + final var transport = ApacheHttpClient5TransportBuilder + .builder(host) + .setHttpClientConfigCallback(httpClientBuilder -> { + // TODO: support other credential providers + final var username = Optional.ofNullable(info.getProperty("user")); + final var password = Optional.ofNullable(info.getProperty("password")); + + if (username.isPresent()) { + final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider(); + credentialsProvider.setCredentials(new AuthScope(host), + new UsernamePasswordCredentials(username.get(), password.orElse("").toCharArray())); + + httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); + } else if (password.isPresent()) { + throw new RuntimeException("password provided but username not set!"); + } + + final SSLContext sslcontext; + try { + sslcontext = SSLContextBuilder + .create() + .loadTrustMaterial(null, (chains, authType) -> true) + .build(); + } catch (final NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) { + throw new RuntimeException(e); + } + + final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create() + .setSslContext(sslcontext) + // disable the certificate since our testing cluster just uses the default security configuration + .setHostnameVerifier(NoopHostnameVerifier.INSTANCE) + // See https://issues.apache.org/jira/browse/HTTPCLIENT-2219 + .setTlsDetailsFactory(sslEngine -> new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol())) + .build(); + + final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create() + .setTlsStrategy(tlsStrategy) + .build(); + + return httpClientBuilder + .setConnectionManager(connectionManager); + }) + .setMapper(new JacksonJsonpMapper()) + .build(); + + this.openSearchClient = new OpenSearchClient(transport); + } + + @Override + public String getDatabaseProductVersion() throws DatabaseException { + return this.getOpenSearchVersion().number(); + } + + @Override + public int getDatabaseMajorVersion() throws DatabaseException { + final var version = this.getDatabaseProductVersion(); + return Integer.parseInt(version.split("\\.")[0]); + } + + @Override + public int getDatabaseMinorVersion() throws DatabaseException { + final var version = this.getDatabaseProductVersion(); + return Integer.parseInt(version.split("\\.")[1]); + } + + private OpenSearchVersionInfo getOpenSearchVersion() throws DatabaseException { + if (this.openSearchVersion.isEmpty()) { + try { + this.openSearchVersion = Optional.of(this.openSearchClient.info().version()); + } catch (IOException e) { + throw new DatabaseException(e); + } + } + return this.openSearchVersion.get(); + } + +} diff --git a/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java b/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java new file mode 100644 index 0000000..23a40fc --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/database/OpenSearchLiquibaseDatabase.java @@ -0,0 +1,58 @@ +package liquibase.ext.opensearch.database; + +import liquibase.CatalogAndSchema; +import liquibase.exception.LiquibaseException; +import liquibase.nosql.database.AbstractNoSqlDatabase; +import lombok.NoArgsConstructor; + +@NoArgsConstructor +public class OpenSearchLiquibaseDatabase extends AbstractNoSqlDatabase { + public static final String PRODUCT_NAME = "OpenSearch"; + public static final String PRODUCT_SHORT_NAME = "opensearch"; + public static final String OPENSEARCH_PREFIX = PRODUCT_SHORT_NAME + ":"; + + @Override + public void dropDatabaseObjects(final CatalogAndSchema schemaToDrop) throws LiquibaseException { + throw new UnsupportedOperationException(); + } + + @Override + public String getDefaultDriver(final String url) { + if (OpenSearchClientDriver.isOpenSearchURL(url)) { + return OpenSearchClientDriver.class.getName(); + } + return null; + } + + @Override + public String getDatabaseProductName() { + return PRODUCT_NAME; + } + + @Override + public String getShortName() { + return PRODUCT_SHORT_NAME; + } + + @Override + public Integer getDefaultPort() { + return 9200; + } + + @Override + protected String getDefaultDatabaseProductName() { + return PRODUCT_NAME; + } + + @Override + public String getDatabaseChangeLogTableName() { + // OpenSearch only supports lowercase index names + return super.getDatabaseChangeLogTableName().toLowerCase(); + } + + @Override + public String getDatabaseChangeLogLockTableName() { + // OpenSearch only supports lowercase index names + return super.getDatabaseChangeLogLockTableName().toLowerCase(); + } +} diff --git a/src/main/java/liquibase/nosql/database/AbstractNoSqlConnection.java b/src/main/java/liquibase/nosql/database/AbstractNoSqlConnection.java new file mode 100644 index 0000000..6ea419e --- /dev/null +++ b/src/main/java/liquibase/nosql/database/AbstractNoSqlConnection.java @@ -0,0 +1,76 @@ +package liquibase.nosql.database; + +/*- + * #%L + * Liquibase NoSql Extension + * %% + * Copyright (C) 2020 Mastercard + * %% + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import liquibase.database.Database; +import liquibase.database.DatabaseConnection; +import liquibase.exception.DatabaseException; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor() +public abstract class AbstractNoSqlConnection implements DatabaseConnection { + + @Override + public abstract boolean supports(String url); + + @Override + public int getPriority() { + return PRIORITY_DEFAULT + 500; + } + + @Override + public boolean getAutoCommit() throws DatabaseException { + // TODO: this is not applicable (OpenSearch doesn't support transactions) but this gets called from + // `#setConnection`, thus we can't just throw an exception. + return false; + } + + @Override + public void setAutoCommit(boolean autoCommit) throws DatabaseException { + // TODO: this is not applicable (OpenSearch doesn't support transactions) but this gets called from + // `#setConnection`, thus we can't just throw an exception. + } + + @Override + public String nativeSQL(String sql) { + return null; + } + + @Override + public void attached(final Database database) { + // Do nothing + } + + @Override + public void commit() throws DatabaseException { + // Do nothing + } + + @Override + public void rollback() throws DatabaseException { + // Do nothing + } + +} diff --git a/src/main/java/liquibase/nosql/database/AbstractNoSqlDatabase.java b/src/main/java/liquibase/nosql/database/AbstractNoSqlDatabase.java new file mode 100644 index 0000000..932d447 --- /dev/null +++ b/src/main/java/liquibase/nosql/database/AbstractNoSqlDatabase.java @@ -0,0 +1,263 @@ +package liquibase.nosql.database; + +/*- + * #%L + * Liquibase NoSql Extension + * %% + * Copyright (C) 2020 Mastercard + * %% + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import liquibase.CatalogAndSchema; +import liquibase.database.AbstractJdbcDatabase; +import liquibase.database.Database; +import liquibase.database.DatabaseConnection; +import liquibase.exception.DatabaseException; +import liquibase.exception.LiquibaseException; +import liquibase.exception.ValidationErrors; +import liquibase.statement.DatabaseFunction; +import liquibase.structure.DatabaseObject; +import lombok.NoArgsConstructor; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.List; + +import static java.util.Optional.ofNullable; + +/** + * {@link AbstractNoSqlDatabase} is extended by all supported NoSql databases as a facade to the underlying database. + * The physical connection can be retrieved from the {@link AbstractNoSqlDatabase} implementation, as well as any + * database-specific characteristics. + */ +@NoArgsConstructor +public abstract class AbstractNoSqlDatabase extends AbstractJdbcDatabase implements Database { + + @Override + public int getPriority() { + return PRIORITY_DATABASE; + } + + @Override + public boolean supportsInitiallyDeferrableColumns() { + return false; + } + + @Override + public boolean supportsSequences() { + return false; + } + + @Override + public boolean supportsDropTableCascadeConstraints() { + return false; + } + + @Override + public boolean supportsAutoIncrement() { + return false; + } + + @Override + public String getLineComment() { + throw new UnsupportedOperationException(); + } + + @Override + public String getAutoIncrementClause(final BigInteger startWith, final BigInteger incrementBy, final String generationType, final Boolean defaultOnNull) { + return null; + } + + @Override + public boolean isSystemObject(final DatabaseObject example) { + return false; + } + + @Override + public boolean isLiquibaseObject(final DatabaseObject object) { + return false; + } + + @Override + public String getViewDefinition(final CatalogAndSchema schema, final String name) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean supportsTablespaces() { + return false; + } + + @Override + public boolean supportsCatalogs() { + return false; + } + + @Override + public CatalogAndSchema.CatalogAndSchemaCase getSchemaAndCatalogCase() { + return CatalogAndSchema.CatalogAndSchemaCase.ORIGINAL_CASE; + } + + @Override + public boolean supportsSchemas() { + return false; + } + + @Override + public boolean supportsCatalogInObjectName(final Class type) { + return false; + } + + @Override + public String generatePrimaryKeyName(final String tableName) { + return null; + } + + @Override + public abstract void dropDatabaseObjects(final CatalogAndSchema schemaToDrop) throws LiquibaseException; + + @Override + public boolean supportsRestrictForeignKeys() { + return false; + } + + @Override + public List getDateFunctions() { + // irrelevant (will never be called as this is not SQL being processed) + throw new UnsupportedOperationException(); + } + + @Override + public boolean supportsForeignKeyDisable() { + return false; + } + + @Override + public boolean disableForeignKeyChecks() { + throw new UnsupportedOperationException(); + } + + @Override + public void enableForeignKeyChecks() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCaseSensitive() { + return true; + } + + @Override + public boolean isReservedWord(final String string) { + return false; + } + + @Override + public boolean isFunction(String string) { + return false; + } + + @Override + public int getDataTypeMaxParameters(String dataTypeName) { + return 0; + } + + @Override + public boolean dataTypeIsNotModifiable(String typeName) { + return false; + } + + @Override + public String generateDatabaseFunctionValue(DatabaseFunction databaseFunction) { + return null; + } + + @Override + public boolean createsIndexesForForeignKeys() { + //Not applicable + return false; + } + + @Override + public boolean supportsPrimaryKeyNames() { + //Not applicable + return false; + } + + @Override + public boolean supportsNotNullConstraintNames() { + //Not applicable + return false; + } + + @Override + public boolean supportsBatchUpdates() { + return false; + } + + @Override + public boolean requiresExplicitNullForColumns() { + //Not applicable + return false; + } + + @Override + public String getSystemSchema() { + return null; + } + + @Override + public ValidationErrors validate() { + return null; + } + + @Override + public abstract String getDefaultDriver(final String url); + + @Override + public boolean requiresUsername() { + return false; + } + + @Override + public boolean requiresPassword() { + return false; + } + + @Override + public boolean getAutoCommitMode() { + return false; + } + + @Override + public boolean supportsDDLInTransaction() { + return false; + } + + @Override + public abstract String getDatabaseProductName(); + + @Override + public boolean isCorrectDatabaseImplementation(final DatabaseConnection conn) throws DatabaseException { + return getDatabaseProductName().equals(conn.getDatabaseProductName()); + } + + @Override + public String toString() { + return getDatabaseProductName() + " : " + + ofNullable(getConnection()).map(DatabaseConnection::getURL).orElse("NOT CONNECTED"); + } + +} diff --git a/src/main/resources/META-INF/services/liquibase.database.Database b/src/main/resources/META-INF/services/liquibase.database.Database new file mode 100644 index 0000000..f1f9688 --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.database.Database @@ -0,0 +1 @@ +liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase diff --git a/src/main/resources/META-INF/services/liquibase.database.DatabaseConnection b/src/main/resources/META-INF/services/liquibase.database.DatabaseConnection new file mode 100644 index 0000000..432dde1 --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.database.DatabaseConnection @@ -0,0 +1 @@ +liquibase.ext.opensearch.database.OpenSearchConnection diff --git a/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java new file mode 100644 index 0000000..4edfc89 --- /dev/null +++ b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java @@ -0,0 +1,64 @@ +package liquibase.ext.opensearch; + +import liquibase.command.CommandScope; +import liquibase.command.core.UpdateCommandStep; +import liquibase.command.core.helpers.DbUrlConnectionCommandStep; +import liquibase.database.DatabaseFactory; +import liquibase.ext.opensearch.database.OpenSearchConnection; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import lombok.SneakyThrows; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInstance; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.indices.ExistsRequest; +import org.opensearch.testcontainers.OpensearchContainer; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.utility.DockerImageName; + +import java.time.Duration; + +@TestInstance(TestInstance.Lifecycle.PER_METHOD) +public abstract class AbstractOpenSearchLiquibaseIT { + final static OpensearchContainer container; + + protected OpenSearchLiquibaseDatabase database; + private OpenSearchConnection connection; + + static { + container = new OpensearchContainer<>(DockerImageName + .parse("opensearchproject/opensearch:2.16.0") + ) + .waitingFor(Wait.forHttp("/").forPort(9200)) + .withStartupTimeout(Duration.ofSeconds(120)); + container.start(); + } + + @SneakyThrows + @BeforeEach + protected void beforeEach() { + final String url = "opensearch:" + container.getHttpHostAddress(); + final String username = container.getUsername(); + final String password = container.getPassword(); + database = (OpenSearchLiquibaseDatabase) DatabaseFactory.getInstance().openDatabase(url, username, password, null, null); + connection = (OpenSearchConnection) this.database.getConnection(); + } + + protected OpenSearchClient getOpenSearchClient() { + return this.connection.getOpenSearchClient(); + } + + protected void doLiquibaseUpdate(final String changeLogFile) throws Exception { + new CommandScope(UpdateCommandStep.COMMAND_NAME) + .addArgumentValue(DbUrlConnectionCommandStep.DATABASE_ARG, this.database) + .addArgumentValue(UpdateCommandStep.CHANGELOG_FILE_ARG, changeLogFile) + .execute(); + } + + protected boolean indexExists(final String indexName) throws Exception { + final var request = new ExistsRequest.Builder() + .index(indexName) + .build(); + return this.getOpenSearchClient().indices().exists(request).value(); + } + +} diff --git a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java new file mode 100644 index 0000000..3f4b8e0 --- /dev/null +++ b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java @@ -0,0 +1,20 @@ +package liquibase.ext.opensearch; + +import lombok.SneakyThrows; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +public class OpenSearchLiquibaseIT extends AbstractOpenSearchLiquibaseIT { + + /** + * Self-test of the test - if this fails something is wrong with the test environment. + */ + @SneakyThrows + @Test + public void openSearchIsRunning() { + assertThat(this.getOpenSearchClient().info().clusterName()).isEqualTo("docker-cluster"); + assertThat(this.database.getDatabaseMajorVersion()).isEqualTo(2); + } + +} From bca7bb8a2a95b3499d8b6999cec5a55525dfc4b6 Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 6 May 2024 20:41:12 +0200 Subject: [PATCH 5/8] add initial support for `changelog` & `changeloglock` this also comes with various supporting infrastructure, e.g. the executor. this does not yet allow executing any changesets, but it allows liquibase to run with an empty changeset which will create the `changelog` and also manage the lock (which ensures that only one liquibase operation is running at a time). note: the `liquibase.nosql` package has been copied from [`liquibase-mongodb`] and adapted where needed (it is not 100% generic). no authorship is claimed for this content! it's important that all document operations being performed on the changelog & lock indices are done with `refresh=wait_for` to ensure that a subsequent read will see the correct data (otherwise a changeset might be marked as run but a subsequent run will not see this and try to run it again). [`liquibase-mongodb`]: https://github.com/liquibase/liquibase-mongodb --- pom.xml | 5 + .../changelog/OpenSearchHistoryService.java | 227 +++++++++++++ .../executor/OpenSearchExecutor.java | 157 +++++++++ .../executor/OpenSearchGenerator.java | 49 +++ .../lockservice/OpenSearchLockService.java | 156 +++++++++ .../AbstractOpenSearchStatement.java | 28 ++ .../AbstractNoSqlHistoryService.java | 287 ++++++++++++++++ .../lockservice/AbstractNoSqlLockService.java | 306 ++++++++++++++++++ .../snapshot/NoSqlSnapshotGenerator.java | 41 +++ ...iquibase.changelog.ChangeLogHistoryService | 1 + .../services/liquibase.executor.Executor | 1 + .../liquibase.lockservice.LockService | 1 + .../liquibase.snapshot.SnapshotGenerator | 1 + .../liquibase.sqlgenerator.SqlGenerator | 1 + .../i18n/liquibase-opensearch.properties | 1 + .../ext/opensearch/OpenSearchLiquibaseIT.java | 8 + .../OpenSearchHistoryServiceTest.java | 79 +++++ src/test/resources/liquibase.properties | 1 + .../liquibase/ext/changelog.empty.yaml | 1 + 19 files changed, 1351 insertions(+) create mode 100644 src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java create mode 100644 src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java create mode 100644 src/main/java/liquibase/ext/opensearch/executor/OpenSearchGenerator.java create mode 100644 src/main/java/liquibase/ext/opensearch/lockservice/OpenSearchLockService.java create mode 100644 src/main/java/liquibase/ext/opensearch/statement/AbstractOpenSearchStatement.java create mode 100644 src/main/java/liquibase/nosql/changelog/AbstractNoSqlHistoryService.java create mode 100644 src/main/java/liquibase/nosql/lockservice/AbstractNoSqlLockService.java create mode 100644 src/main/java/liquibase/nosql/snapshot/NoSqlSnapshotGenerator.java create mode 100644 src/main/resources/META-INF/services/liquibase.changelog.ChangeLogHistoryService create mode 100644 src/main/resources/META-INF/services/liquibase.executor.Executor create mode 100644 src/main/resources/META-INF/services/liquibase.lockservice.LockService create mode 100644 src/main/resources/META-INF/services/liquibase.snapshot.SnapshotGenerator create mode 100644 src/main/resources/META-INF/services/liquibase.sqlgenerator.SqlGenerator create mode 100644 src/main/resources/liquibase/i18n/liquibase-opensearch.properties create mode 100644 src/test/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryServiceTest.java create mode 100644 src/test/resources/liquibase.properties create mode 100644 src/test/resources/liquibase/ext/changelog.empty.yaml diff --git a/pom.xml b/pom.xml index b045395..aba9f51 100644 --- a/pom.xml +++ b/pom.xml @@ -138,6 +138,11 @@ + + + ${project.basedir}/src/main/resources + + org.sonarsource.scanner.maven diff --git a/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java b/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java new file mode 100644 index 0000000..c8b14a1 --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java @@ -0,0 +1,227 @@ +package liquibase.ext.opensearch.changelog; + +import liquibase.ChecksumVersion; +import liquibase.Scope; +import liquibase.change.CheckSum; +import liquibase.changelog.ChangeSet; +import liquibase.changelog.RanChangeSet; +import liquibase.database.Database; +import liquibase.exception.DatabaseException; +import liquibase.ext.opensearch.database.OpenSearchConnection; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.logging.Logger; +import liquibase.nosql.changelog.AbstractNoSqlHistoryService; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.FieldValue; +import org.opensearch.client.opensearch._types.Refresh; +import org.opensearch.client.opensearch._types.mapping.*; +import org.opensearch.client.opensearch.core.SearchRequest; +import org.opensearch.client.opensearch.core.search.Hit; +import org.opensearch.client.opensearch.indices.PutMappingRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class OpenSearchHistoryService extends AbstractNoSqlHistoryService { + + private final Logger log = Scope.getCurrentScope().getLog(getClass()); + + @Override + protected Logger getLogger() { + return log; + } + + private OpenSearchClient getOpenSearchClient() { + final var connection = (OpenSearchConnection) this.getNoSqlDatabase().getConnection(); + return connection.getOpenSearchClient(); + } + + @Override + protected boolean existsRepository() throws DatabaseException { + try { + return this.getOpenSearchClient().indices().exists(r -> r.index(this.getDatabaseChangeLogTableName())).value(); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void createRepository() throws DatabaseException { + // note: the mapping will be created in adjustRepository + + try { + this.getOpenSearchClient().indices().create(r -> r.index(this.getDatabaseChangeLogTableName())); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void adjustRepository() throws DatabaseException { + // properties must match RanChangeSet & CheckSum & ContextExpression (validated by matching tests) + final var request = new PutMappingRequest.Builder() + .index(this.getDatabaseChangeLogTableName()) + .properties("id", p -> p.keyword(k -> k)) + .properties("changeLog", p -> p.keyword(k -> k)) + .properties("storedChangeLog", p -> p.keyword(k -> k)) + .properties("author", p -> p.text(t -> t)) + .properties("lastCheckSum", p -> p.object(o -> { + o.properties("version", p2 -> p2.integer(i -> i)); + o.properties("storedCheckSum", p2 -> p2.keyword(k -> k)); + return o; + })) + .properties("dateExecuted", p -> p.date(d -> d)) + .properties("tag", p -> p.text(t -> t)) + .properties("execType", p -> p.keyword(k -> k)) + .properties("description", p -> p.text(t -> t)) + .properties("comments", p -> p.text(t -> t)) + .properties("orderExecuted", p -> p.integer(i -> i)) + .properties("contextExpression", p -> p.object(o -> { + o.properties("contexts", p2 -> p2.keyword(k -> k)); + o.properties("originalString", p2 -> p2.text(t -> t)); + return o; + })) + .properties("labels", p -> p.text(t -> t)) + .properties("deploymentId", p -> p.text(t -> t)) + .properties("liquibaseVersion", p -> p.text(t -> t)) + .build(); + + try { + this.getOpenSearchClient().indices().putMapping(request); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void dropRepository() throws DatabaseException { + try { + this.getOpenSearchClient().indices().delete(r -> r.index(this.getDatabaseChangeLogTableName())); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected List queryRanChangeSets() throws DatabaseException { + try { + final var response = this.getOpenSearchClient() + .search(s -> s.index(this.getDatabaseChangeLogTableName()), RanChangeSet.class); + return response.hits().hits().stream() + .map(Hit::source) + .collect(Collectors.toList()); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected int generateNextSequence() throws DatabaseException { + final var aggregationName = "max"; + final var request = new SearchRequest.Builder() + .index(this.getDatabaseChangeLogTableName()) + .aggregations(aggregationName, a -> a.max(m -> m.field("orderExecuted"))) + .build(); + try { + final var response = this.getOpenSearchClient().search(request, RanChangeSet.class); + return (int) response.aggregations().get(aggregationName).max().value(); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void markChangeSetRun(final ChangeSet changeSet, final ChangeSet.ExecType execType, final Integer nextSequenceValue) throws DatabaseException { + final var ranChangeSet = new RanChangeSet(changeSet, execType, null, null); + + try { + this.getOpenSearchClient() + .index(r -> r.index(this.getDatabaseChangeLogTableName()) + .id(ranChangeSet.getId()) + .document(ranChangeSet) + .refresh(Refresh.WaitFor)); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void removeRanChangeSet(final ChangeSet changeSet) throws DatabaseException { + try { + this.getOpenSearchClient() + .delete(r -> r.index(this.getDatabaseChangeLogTableName()) + .id(String.valueOf(changeSet.getId())) + .refresh(Refresh.WaitFor)); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + public void clearAllCheckSums() throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + protected long countTags(final String tag) throws DatabaseException { + final var request = new SearchRequest.Builder() + .index(this.getDatabaseChangeLogTableName()) + .query(q -> q.match(m -> m.field("tag").query(FieldValue.of(tag)))) + .build(); + try { + final var response = this.getOpenSearchClient().search(request, RanChangeSet.class); + return response.hits().total().value(); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void tagLast(final String tagString) throws DatabaseException { + // TODO + } + + @Override + protected long countRanChangeSets() throws DatabaseException { + return this.queryRanChangeSets().size(); + } + + @Override + protected void updateCheckSum(final ChangeSet changeSet) throws DatabaseException { + @AllArgsConstructor + @Getter + class CheckSumObj { + final CheckSum lastCheckSum; + } + final var currentChecksumVersion = Optional.ofNullable(changeSet.getStoredCheckSum()) + .map(cs -> ChecksumVersion.enumFromChecksumVersion(cs.getVersion())) + .orElse(ChecksumVersion.latest()); + final var checkSum = changeSet.generateCheckSum(currentChecksumVersion); + + try { + this.getOpenSearchClient() + .update(r -> r.index(this.getDatabaseChangeLogTableName()) + .id(changeSet.getId()) + .doc(new CheckSumObj(checkSum)) + .refresh(Refresh.WaitFor) + , RanChangeSet.class); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + public boolean supports(final Database database) { + return OpenSearchLiquibaseDatabase.PRODUCT_NAME.equals(database.getDatabaseProductName()); + } + + @Override + public boolean isDatabaseChecksumsCompatible() { + return true; + } + +} diff --git a/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java new file mode 100644 index 0000000..7a76fe6 --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java @@ -0,0 +1,157 @@ +package liquibase.ext.opensearch.executor; + +/*- + * #%L + * Liquibase CosmosDB Extension + * %% + * Copyright (C) 2020 Mastercard + * %% + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import liquibase.Scope; +import liquibase.database.Database; +import liquibase.exception.DatabaseException; +import liquibase.executor.AbstractExecutor; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.logging.Logger; +import liquibase.servicelocator.LiquibaseService; +import liquibase.sql.visitor.SqlVisitor; +import liquibase.statement.SqlStatement; +import lombok.NoArgsConstructor; + +import java.util.List; +import java.util.Map; + +import static java.util.Collections.emptyList; + +@LiquibaseService +@NoArgsConstructor +public class OpenSearchExecutor extends AbstractExecutor { + + public static final String EXECUTOR_NAME = "jdbc"; // needed because of AbstractJdbcDatabase#execute + private final Logger log = Scope.getCurrentScope().getLog(getClass()); + + @Override + public void setDatabase(final Database database) { + super.setDatabase(database); + } + + private OpenSearchLiquibaseDatabase getDatabase() { + return (OpenSearchLiquibaseDatabase)this.database; + } + + @Override + public String getName() { + return EXECUTOR_NAME; + } + + @Override + public int getPriority() { + return PRIORITY_SPECIALIZED; + } + + @Override + public boolean supports(final Database database) { + return OpenSearchLiquibaseDatabase.PRODUCT_NAME.equals(database.getDatabaseProductName()); + } + + @Override + public T queryForObject(final SqlStatement sql, final Class requiredType) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public T queryForObject(final SqlStatement sql, final Class requiredType, final List sqlVisitors) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public long queryForLong(final SqlStatement sql) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public long queryForLong(final SqlStatement sql, final List sqlVisitors) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public int queryForInt(final SqlStatement sql) { + throw new UnsupportedOperationException(); + } + + @Override + public int queryForInt(final SqlStatement sql, final List sqlVisitors) { + throw new UnsupportedOperationException(); + } + + @Override + public List queryForList(final SqlStatement sql, final Class elementType) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + @SuppressWarnings("unchecked") + public List queryForList(final SqlStatement sql, final Class elementType, final List sqlVisitors) throws DatabaseException { + throw new UnsupportedOperationException(); + } + + @Override + public List> queryForList(final SqlStatement sql) { + throw new UnsupportedOperationException(); + } + + @Override + public List> queryForList(final SqlStatement sql, final List sqlVisitors) { + throw new UnsupportedOperationException(); + } + + @Override + public void execute(final SqlStatement sql) throws DatabaseException { + this.execute(sql, emptyList()); + } + + @Override + public void execute(final SqlStatement sql, final List sqlVisitors) throws DatabaseException { + throw new DatabaseException("liquibase-opensearch extension cannot execute changeset \n" + + "Unknown type: " + sql.getClass().getName() + + "\nPlease check the following common causes:\n" + + "- Verify change set definitions for common error such as: changeType name, changeSet attributes spelling " + + "(such as runWith, context, etc.), and punctuation.\n" + + "- Verify that changesets have all the required changeset attributes and do not have invalid attributes for the designated change type.\n" + + "- Double-check to make sure your basic setup includes all needed extensions in your Java classpath"); + } + + @Override + public int update(final SqlStatement sql) throws DatabaseException { + return update(sql, emptyList()); + } + + @Override + public int update(final SqlStatement sql, final List sqlVisitors) throws DatabaseException { + throw new IllegalArgumentException(); + } + + @Override + public void comment(final String message) { + log.info(message); + } + + @Override + public boolean updatesDatabase() { + return true; + } + +} diff --git a/src/main/java/liquibase/ext/opensearch/executor/OpenSearchGenerator.java b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchGenerator.java new file mode 100644 index 0000000..053e3d6 --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchGenerator.java @@ -0,0 +1,49 @@ +package liquibase.ext.opensearch.executor; + +/*- + * #%L + * Liquibase NoSql Extension + * %% + * Copyright (C) 2020 Mastercard + * %% + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import liquibase.database.Database; +import liquibase.exception.ValidationErrors; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.ext.opensearch.statement.AbstractOpenSearchStatement; +import liquibase.sql.Sql; +import liquibase.sqlgenerator.SqlGeneratorChain; +import liquibase.sqlgenerator.core.AbstractSqlGenerator; + +public class OpenSearchGenerator extends AbstractSqlGenerator { + + @Override + public boolean supports(final AbstractOpenSearchStatement statement, final Database database) { + return OpenSearchLiquibaseDatabase.PRODUCT_NAME.equals(database.getDatabaseProductName()); + } + + @Override + public ValidationErrors validate(final AbstractOpenSearchStatement statement, final Database database, + final SqlGeneratorChain sqlGeneratorChain) { + return new ValidationErrors(); + } + + @Override + public Sql[] generateSql(final AbstractOpenSearchStatement statement, final Database database, final SqlGeneratorChain sqlGeneratorChain) { + return new Sql[0]; + } + +} diff --git a/src/main/java/liquibase/ext/opensearch/lockservice/OpenSearchLockService.java b/src/main/java/liquibase/ext/opensearch/lockservice/OpenSearchLockService.java new file mode 100644 index 0000000..acc01d5 --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/lockservice/OpenSearchLockService.java @@ -0,0 +1,156 @@ +package liquibase.ext.opensearch.lockservice; + +import liquibase.Scope; +import liquibase.database.Database; +import liquibase.exception.DatabaseException; +import liquibase.ext.opensearch.database.OpenSearchConnection; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.lockservice.DatabaseChangeLogLock; +import liquibase.logging.Logger; +import liquibase.nosql.lockservice.AbstractNoSqlLockService; +import liquibase.util.NetUtil; +import org.apache.hc.core5.http.HttpStatus; +import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.Refresh; +import org.opensearch.client.opensearch.core.search.Hit; +import org.opensearch.client.opensearch.indices.PutMappingRequest; +import org.opensearch.client.transport.httpclient5.ResponseException; + +import java.io.IOException; +import java.util.Date; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +public class OpenSearchLockService extends AbstractNoSqlLockService { + + /** + * Magic ID: there will only ever be 0 or 1 entries in the lock index (we use the `create` API to ensure that it fails if the ID already exists) + */ + private static final int LOCK_ENTRY_ID = 1; + + private final Logger log = Scope.getCurrentScope().getLog(getClass()); + + private OpenSearchClient getOpenSearchClient() { + final var connection = (OpenSearchConnection) this.getDatabase().getConnection(); + return connection.getOpenSearchClient(); + } + + @Override + protected Logger getLogger() { + return this.log; + } + + @Override + protected boolean existsRepository() throws DatabaseException { + try { + return this.getOpenSearchClient().indices().exists(r -> r.index(this.getDatabaseChangeLogLockTableName())).value(); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void createRepository() throws DatabaseException { + // note: the mapping will be created in adjustRepository + + try { + this.getOpenSearchClient().indices().create(r -> r.index(this.getDatabaseChangeLogLockTableName())); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void adjustRepository() throws DatabaseException { + // properties must match DatabaseChangeLogLock + final var request = new PutMappingRequest.Builder() + .index(this.getDatabaseChangeLogLockTableName()) + .properties("id", p -> p.keyword(k -> k)) + .properties("lockGranted", p -> p.date(d -> d)) + .properties("lockedBy", p -> p.text(t -> t)) + .build(); + + try { + this.getOpenSearchClient().indices().putMapping(request); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected void dropRepository() throws DatabaseException { + try { + this.getOpenSearchClient().indices().delete(r -> r.index(this.getDatabaseChangeLogLockTableName())); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected boolean isLocked() throws DatabaseException { + return !this.queryLocks().isEmpty(); // ignore the fact that there should be exactly 0 or 1 entry here to be more conservative + } + + @Override + protected boolean createLock() throws DatabaseException { + final var lockEntry = new DatabaseChangeLogLock(LOCK_ENTRY_ID, new Date(), getLockedBy()); + try { + this.getOpenSearchClient() + .create(r -> r.index(this.getDatabaseChangeLogLockTableName()) + .id(String.valueOf(LOCK_ENTRY_ID)) + .document(lockEntry) + .refresh(Refresh.WaitFor)); + } catch (final ResponseException e) { + if (e.status() == HttpStatus.SC_CONFLICT) { + return false; + } + throw new DatabaseException(e); + } catch (final IOException e) { + throw new DatabaseException(e); + } + return true; + } + + @Override + protected void removeLock() throws DatabaseException { + try { + this.getOpenSearchClient() + .delete(r -> r.index(this.getDatabaseChangeLogLockTableName()) + .id(String.valueOf(LOCK_ENTRY_ID)) + .refresh(Refresh.WaitFor)); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + protected List queryLocks() throws DatabaseException { + try { + final var response = this.getOpenSearchClient() + .search(s -> s.index(this.getDatabaseChangeLogLockTableName()), DatabaseChangeLogLock.class); + return response.hits().hits().stream() + .map(Hit::source) + .collect(Collectors.toList()); + } catch (final IOException e) { + throw new DatabaseException(e); + } + } + + @Override + public boolean supports(final Database database) { + return OpenSearchLiquibaseDatabase.PRODUCT_NAME.equals(database.getDatabaseProductName()); + } + + /** + * Logic taken from {@code LockDatabaseChangeLogGenerator} + * + * @return the string to be used in the {@code lockedBy} field + */ + private static String getLockedBy() { + return String.format("%s%s (%s)", + NetUtil.getLocalHostName(), + Optional.ofNullable(System.getProperty("liquibase.hostDescription")).map(s -> '#' + s).orElse(""), + NetUtil.getLocalHostAddress()); + } +} diff --git a/src/main/java/liquibase/ext/opensearch/statement/AbstractOpenSearchStatement.java b/src/main/java/liquibase/ext/opensearch/statement/AbstractOpenSearchStatement.java new file mode 100644 index 0000000..6ece79d --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/statement/AbstractOpenSearchStatement.java @@ -0,0 +1,28 @@ +package liquibase.ext.opensearch.statement; + +import liquibase.ext.opensearch.database.OpenSearchConnection; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.statement.AbstractSqlStatement; +import org.opensearch.client.opensearch.OpenSearchClient; + +public abstract class AbstractOpenSearchStatement extends AbstractSqlStatement { + + @Override + public boolean continueOnError() { + return false; + } + + @Override + public boolean skipOnUnsupported() { + return false; + } + + @Override + public abstract String toString(); + + protected OpenSearchClient getOpenSearchClient(final OpenSearchLiquibaseDatabase database) { + final var connection = (OpenSearchConnection)database.getConnection(); + return connection.getOpenSearchClient(); + } + +} diff --git a/src/main/java/liquibase/nosql/changelog/AbstractNoSqlHistoryService.java b/src/main/java/liquibase/nosql/changelog/AbstractNoSqlHistoryService.java new file mode 100644 index 0000000..f6a4852 --- /dev/null +++ b/src/main/java/liquibase/nosql/changelog/AbstractNoSqlHistoryService.java @@ -0,0 +1,287 @@ +package liquibase.nosql.changelog; + +/*- + * #%L + * Liquibase NoSql Extension + * %% + * Copyright (C) 2020 Mastercard + * %% + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import liquibase.Scope; +import liquibase.changelog.AbstractChangeLogHistoryService; +import liquibase.changelog.ChangeSet; +import liquibase.changelog.RanChangeSet; +import liquibase.exception.DatabaseException; +import liquibase.exception.DatabaseHistoryException; +import liquibase.exception.UnexpectedLiquibaseException; +import liquibase.executor.Executor; +import liquibase.executor.ExecutorService; +import liquibase.executor.LoggingExecutor; +import liquibase.ext.opensearch.executor.OpenSearchExecutor; +import liquibase.logging.Logger; +import liquibase.nosql.database.AbstractNoSqlDatabase; +import lombok.Getter; +import lombok.Setter; + +import java.time.Clock; +import java.util.Date; +import java.util.List; +import java.util.ResourceBundle; + +import static java.util.Collections.unmodifiableList; +import static java.util.Objects.isNull; + +public abstract class AbstractNoSqlHistoryService extends AbstractChangeLogHistoryService { + + @Getter + private List ranChangeSetList; + + private boolean serviceInitialized; + + private static final ResourceBundle resourceBundle = ResourceBundle.getBundle("liquibase/i18n/liquibase-opensearch"); + + @Getter + private Boolean hasDatabaseChangeLogTable; + + @Getter + private Integer lastChangeSetSequenceValue; + + @Getter + private Boolean adjustedChangeLogTable = false; + + /** + * Clock field in order to make it testable + */ + @Getter + @Setter + private Clock clock = Clock.systemDefaultZone(); + + public int getPriority() { + return PRIORITY_SPECIALIZED; + } + + public String getDatabaseChangeLogTableName() { + return getDatabase().getDatabaseChangeLogTableName(); + } + + public boolean canCreateChangeLogTable() { + return true; + } + + public boolean isServiceInitialized() { + return serviceInitialized; + } + + @SuppressWarnings("unchecked") + public D getNoSqlDatabase() { + return (D) getDatabase(); + } + + public OpenSearchExecutor getExecutor() throws DatabaseException { + Executor executor = Scope.getCurrentScope().getSingleton(ExecutorService.class).getExecutor(OpenSearchExecutor.EXECUTOR_NAME, getDatabase()); + if (executor instanceof LoggingExecutor) { + throw new DatabaseException(String.format(resourceBundle.getString("command.unsupported"), "*sql")); + } + return (OpenSearchExecutor) executor; + } + + @Override + public void reset() { + super.reset(); + this.ranChangeSetList = null; + this.serviceInitialized = false; + this.hasDatabaseChangeLogTable = null; + this.adjustedChangeLogTable = false; + } + + @Override + public void init() throws DatabaseException { + + if (this.serviceInitialized) { + return; + } + + if (!hasDatabaseChangeLogTable()) { + getLogger().info("Create Database Change Log Collection"); + + // If there is no table in the database for recording change history create one. + this.getLogger().info("Creating database history collection with name: " + + this.getDatabaseChangeLogTableName()); + createRepository(); + getLogger().info("Created database history collection : " + + this.getDatabaseChangeLogTableName()); + this.hasDatabaseChangeLogTable = true; + } + + if (!adjustedChangeLogTable) { + adjustRepository(); + adjustedChangeLogTable = true; + } + + this.serviceInitialized = true; + } + + public boolean hasDatabaseChangeLogTable() { + if (isNull(this.hasDatabaseChangeLogTable)) { + try { + this.hasDatabaseChangeLogTable = existsRepository(); + } catch (final Exception e) { + throw new UnexpectedLiquibaseException(e); + } + } + return this.hasDatabaseChangeLogTable; + } + + /** + * Returns the ChangeSets that have been run against the current getDatabase(). + */ + @Override + public List getRanChangeSets() throws DatabaseException { + + if (isNull(this.ranChangeSetList)) { + this.ranChangeSetList = queryRanChangeSets(); + } + return unmodifiableList(ranChangeSetList); + } + + @Override + public void replaceChecksum(final ChangeSet changeSet) throws DatabaseException { + + updateCheckSum(changeSet); + + getLogger().info(String.format("Replace checksum executed. ChangeSet: [filename: %s, id: %s, author: %s]" + , changeSet.getFilePath(), changeSet.getId(), changeSet.getAuthor())); + + reset(); + } + + @Override + public RanChangeSet getRanChangeSet(final ChangeSet changeSet) throws DatabaseException, DatabaseHistoryException { + if (!hasDatabaseChangeLogTable()) { + return null; + } + return super.getRanChangeSet(changeSet); + } + + @Override + public void setExecType(final ChangeSet changeSet, final ChangeSet.ExecType execType) throws DatabaseException { + + final Integer nextSequenceValue = getNextSequenceValue(); + + markChangeSetRun(changeSet, execType, nextSequenceValue); + + getDatabase().commit(); + if (this.ranChangeSetList != null) { + this.ranChangeSetList.add(new RanChangeSet(changeSet, execType, null, null)); + } + } + + @Override + public void removeFromHistory(final ChangeSet changeSet) throws DatabaseException { + + removeRanChangeSet(changeSet); + + if (this.ranChangeSetList != null) { + this.ranChangeSetList.remove(new RanChangeSet(changeSet)); + } + } + + @Override + public int getNextSequenceValue() throws DatabaseException { + if (isNull(this.lastChangeSetSequenceValue)) { + if (isNull(getDatabase().getConnection())) { + this.lastChangeSetSequenceValue = 0; + } else { + this.lastChangeSetSequenceValue = generateNextSequence(); + } + } + + this.lastChangeSetSequenceValue++; + + return this.lastChangeSetSequenceValue; + } + + /** + * Tags the database changelog with the given string. + */ + @Override + public void tag(final String tagString) throws DatabaseException { + final long totalRows = countRanChangeSets(); + if (totalRows == 0L) { + final ChangeSet emptyChangeSet = new ChangeSet(String.valueOf(new Date().getTime()), "liquibase", + false, false, "liquibase-internal", null, null, + getDatabase().getObjectQuotingStrategy(), null); + this.setExecType(emptyChangeSet, ChangeSet.ExecType.EXECUTED); + } + + tagLast(tagString); + + if (this.ranChangeSetList != null) { + ranChangeSetList.get(ranChangeSetList.size() - 1).setTag(tagString); + } + } + + @Override + public boolean tagExists(final String tag) throws DatabaseException { + final long count = countTags(tag); + return count > 0L; + } + + @Override + public void destroy() { + + try { + getLogger().info("Dropping Collection Database Change Log: " + getDatabaseChangeLogTableName()); + + if (existsRepository()) { + dropRepository(); + getLogger().info("Dropped Collection Database Change Log: " + getDatabaseChangeLogTableName()); + } else { + getLogger().warning("Cannot Drop Collection Database Change Log as not found: " + getDatabaseChangeLogTableName()); + } + reset(); + } catch (final DatabaseException e) { + throw new UnexpectedLiquibaseException(e); + } + } + + protected abstract Logger getLogger(); + + protected abstract boolean existsRepository() throws DatabaseException; + + protected abstract void createRepository() throws DatabaseException; + + protected abstract void adjustRepository() throws DatabaseException; + + protected abstract void dropRepository() throws DatabaseException; + + protected abstract List queryRanChangeSets() throws DatabaseException; + + protected abstract int generateNextSequence() throws DatabaseException; + + protected abstract void markChangeSetRun(ChangeSet changeSet, ChangeSet.ExecType execType, Integer nextSequenceValue) throws DatabaseException; + + protected abstract void removeRanChangeSet(ChangeSet changeSet) throws DatabaseException; + + protected abstract long countTags(String tag) throws DatabaseException; + + protected abstract void tagLast(String tagString) throws DatabaseException; + + protected abstract long countRanChangeSets() throws DatabaseException; + + protected abstract void updateCheckSum(ChangeSet changeSet) throws DatabaseException; + +} diff --git a/src/main/java/liquibase/nosql/lockservice/AbstractNoSqlLockService.java b/src/main/java/liquibase/nosql/lockservice/AbstractNoSqlLockService.java new file mode 100644 index 0000000..712df48 --- /dev/null +++ b/src/main/java/liquibase/nosql/lockservice/AbstractNoSqlLockService.java @@ -0,0 +1,306 @@ +package liquibase.nosql.lockservice; + +/*- + * #%L + * Liquibase NoSql Extension + * %% + * Copyright (C) 2020 Mastercard + * %% + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import liquibase.configuration.GlobalConfiguration; +import liquibase.configuration.LiquibaseConfiguration; +import liquibase.database.Database; +import liquibase.exception.DatabaseException; +import liquibase.exception.LockException; +import liquibase.exception.UnexpectedLiquibaseException; +import liquibase.lockservice.DatabaseChangeLogLock; +import liquibase.lockservice.LockService; +import liquibase.logging.Logger; +import liquibase.nosql.database.AbstractNoSqlDatabase; +import lombok.Getter; +import lombok.Setter; + +import java.text.DateFormat; +import java.time.Clock; +import java.util.List; +import java.util.ResourceBundle; + +import static java.util.Objects.isNull; +import static liquibase.plugin.Plugin.PRIORITY_SPECIALIZED; + +public abstract class AbstractNoSqlLockService implements LockService { + + private D database; + + private boolean hasChangeLogLock; + + private static final ResourceBundle i18nBundle = ResourceBundle.getBundle("liquibase/i18n/liquibase-opensearch"); + + private Long changeLogLockPollRate; + + private Long changeLogLockRecheckTime; + + @Getter + private Boolean hasDatabaseChangeLogLockTable; + + @Getter + private Boolean adjustedChangeLogLockTable = false; + + /** + * Clock field in order to make it testable + */ + @Getter + @Setter + private Clock clock = Clock.systemDefaultZone(); + + @Override + public int getPriority() { + return PRIORITY_SPECIALIZED; + } + + @Override + @SuppressWarnings("unchecked") + public void setDatabase(final Database database) { + this.database = (D) database; + } + + public D getDatabase() { + return database; + } + + @Override + public void init() throws DatabaseException { + if (!hasDatabaseChangeLogLockTable()) { + getLogger().info("Create Database Lock Collection: " + getDatabaseChangeLogLockTableName()); + createRepository(); + database.commit(); + getLogger().info("Created database lock Collection: " + getDatabaseChangeLogLockTableName()); + this.hasDatabaseChangeLogLockTable = true; + } + if (!adjustedChangeLogLockTable) { + adjustRepository(); + adjustedChangeLogLockTable = true; + } + } + + @Override + public boolean hasChangeLogLock() { + return hasChangeLogLock; + } + + @Override + public void waitForLock() throws LockException { + + boolean locked = false; + + final long timeToGiveUp = getClock().instant().plusSeconds(getChangeLogLockWaitTime() * 60).toEpochMilli(); + while (!locked && (getClock().instant().toEpochMilli() < timeToGiveUp)) { + locked = acquireLock(); + if (!locked) { + getLogger().info("Waiting for changelog lock...."); + try { + //noinspection BusyWait + Thread.sleep(getChangeLogLockRecheckTime() * 1000); + } catch (InterruptedException e) { + // Restore thread interrupt status + Thread.currentThread().interrupt(); + } + } + } + + if (!locked) { + DatabaseChangeLogLock[] locks = listLocks(); + String lockedBy; + if (locks.length > 0) { + DatabaseChangeLogLock lock = locks[0]; + lockedBy = lock.getLockedBy() + " since " + + DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.SHORT) + .format(lock.getLockGranted()); + } else { + lockedBy = "UNKNOWN"; + } + throw new LockException("Could not acquire change log lock. Currently locked by " + lockedBy); + } + } + + @Override + public boolean acquireLock() throws LockException { + if (hasChangeLogLock) { + return true; + } + + try { + database.rollback(); + this.init(); + + if (isLocked()) { + return false; + } else { + getLogger().info("Lock Database"); + + if (!createLock()) { + // another node was faster + return false; + } + + database.commit(); + getLogger().info("Successfully Acquired Change Log Lock"); + + this.hasChangeLogLock = true; + + return true; + } + } catch (final Exception e) { + throw new LockException(e); + } finally { + try { + database.rollback(); + } catch (final DatabaseException e) { + getLogger().severe("Error on acquire change log lock Rollback.", e); + } + } + } + + @Override + public void releaseLock() throws LockException { + + try { + if (hasDatabaseChangeLogLockTable()) { + + getLogger().info("Release Database Lock"); + + database.rollback(); + removeLock(); + database.commit(); + } + } catch (Exception e) { + throw new LockException(e); + } finally { + try { + this.hasChangeLogLock = false; + database.setCanCacheLiquibaseTableInfo(false); + getLogger().info("Successfully released change log lock"); + database.rollback(); + } catch (DatabaseException e) { + getLogger().severe("Error on released change log lock Rollback.", e); + } + } + } + + @Override + public DatabaseChangeLogLock[] listLocks() throws LockException { + try { + if (!this.hasDatabaseChangeLogLockTable()) { + return new DatabaseChangeLogLock[0]; + } + final List rows = queryLocks(); + return rows.stream().map(DatabaseChangeLogLock.class::cast).toArray(DatabaseChangeLogLock[]::new); + } catch (final Exception e) { + throw new LockException(e); + } + } + + @Override + public void forceReleaseLock() throws LockException, DatabaseException { + init(); + releaseLock(); + } + + @Override + public void reset() { + hasChangeLogLock = false; + hasDatabaseChangeLogLockTable = null; + adjustedChangeLogLockTable = false; + } + + @Override + public void destroy() { + try { + getLogger().info("Dropping Collection Database Change Log Lock: " + getDatabaseChangeLogLockTableName()); + dropRepository(); + getLogger().info("Dropped Collection Database Change Log Lock: " + getDatabaseChangeLogLockTableName()); + database.commit(); + reset(); + } catch (final DatabaseException e) { + throw new UnexpectedLiquibaseException(e); + } + } + + public String getDatabaseChangeLogLockTableName() { + return database.getDatabaseChangeLogLockTableName(); + } + + public Long getChangeLogLockRecheckTime() { + if (changeLogLockRecheckTime != null) { + return changeLogLockRecheckTime; + } + return LiquibaseConfiguration + .getInstance() + .getConfiguration(GlobalConfiguration.class) + .getDatabaseChangeLogLockPollRate(); + } + + @Override + public void setChangeLogLockRecheckTime(long changeLogLockRecheckTime) { + this.changeLogLockRecheckTime = changeLogLockRecheckTime; + } + + public Long getChangeLogLockWaitTime() { + if (changeLogLockPollRate != null) { + return changeLogLockPollRate; + } + return LiquibaseConfiguration + .getInstance() + .getConfiguration(GlobalConfiguration.class) + .getDatabaseChangeLogLockWaitTime(); + } + + @Override + public void setChangeLogLockWaitTime(long changeLogLockWaitTime) { + this.changeLogLockPollRate = changeLogLockWaitTime; + } + + private boolean hasDatabaseChangeLogLockTable() throws DatabaseException { + if (isNull(this.hasDatabaseChangeLogLockTable)) { + try { + this.hasDatabaseChangeLogLockTable = + existsRepository(); + } catch (final Exception e) { + throw new DatabaseException(e); + } + } + return this.hasDatabaseChangeLogLockTable; + } + + protected abstract Logger getLogger(); + + protected abstract boolean existsRepository() throws DatabaseException; + + protected abstract void createRepository() throws DatabaseException; + + protected abstract void adjustRepository() throws DatabaseException; + + protected abstract void dropRepository() throws DatabaseException; + + protected abstract boolean isLocked() throws DatabaseException; + + protected abstract boolean createLock() throws DatabaseException; + + protected abstract void removeLock() throws DatabaseException; + + protected abstract List queryLocks() throws DatabaseException; + +} diff --git a/src/main/java/liquibase/nosql/snapshot/NoSqlSnapshotGenerator.java b/src/main/java/liquibase/nosql/snapshot/NoSqlSnapshotGenerator.java new file mode 100644 index 0000000..1de936d --- /dev/null +++ b/src/main/java/liquibase/nosql/snapshot/NoSqlSnapshotGenerator.java @@ -0,0 +1,41 @@ +package liquibase.nosql.snapshot; + +import liquibase.database.Database; +import liquibase.exception.DatabaseException; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.snapshot.DatabaseSnapshot; +import liquibase.snapshot.InvalidExampleException; +import liquibase.snapshot.SnapshotGenerator; +import liquibase.snapshot.SnapshotGeneratorChain; +import liquibase.structure.DatabaseObject; + +import java.util.ResourceBundle; + +import static liquibase.plugin.Plugin.PRIORITY_SPECIALIZED; + +public class NoSqlSnapshotGenerator implements SnapshotGenerator { + private static final ResourceBundle resourceBundle = ResourceBundle.getBundle("liquibase/i18n/liquibase-opensearch"); + + @Override + public int getPriority(Class objectType, Database database) { + if (database instanceof OpenSearchLiquibaseDatabase) { + return PRIORITY_SPECIALIZED; + } + return PRIORITY_NONE; + } + + @Override + public T snapshot(T example, DatabaseSnapshot snapshot, SnapshotGeneratorChain chain) throws DatabaseException, InvalidExampleException { + throw new DatabaseException(String.format(resourceBundle.getString("command.unsupported"), "db-doc, diff*, generate-changelog, and snapshot*")); + } + + @Override + public Class[] addsTo() { + return new Class[0]; + } + + @Override + public Class[] replaces() { + return new Class[0]; + } +} diff --git a/src/main/resources/META-INF/services/liquibase.changelog.ChangeLogHistoryService b/src/main/resources/META-INF/services/liquibase.changelog.ChangeLogHistoryService new file mode 100644 index 0000000..11fe91d --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.changelog.ChangeLogHistoryService @@ -0,0 +1 @@ +liquibase.ext.opensearch.changelog.OpenSearchHistoryService diff --git a/src/main/resources/META-INF/services/liquibase.executor.Executor b/src/main/resources/META-INF/services/liquibase.executor.Executor new file mode 100644 index 0000000..e20f742 --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.executor.Executor @@ -0,0 +1 @@ +liquibase.ext.opensearch.executor.OpenSearchExecutor diff --git a/src/main/resources/META-INF/services/liquibase.lockservice.LockService b/src/main/resources/META-INF/services/liquibase.lockservice.LockService new file mode 100644 index 0000000..64dd8ac --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.lockservice.LockService @@ -0,0 +1 @@ +liquibase.ext.opensearch.lockservice.OpenSearchLockService diff --git a/src/main/resources/META-INF/services/liquibase.snapshot.SnapshotGenerator b/src/main/resources/META-INF/services/liquibase.snapshot.SnapshotGenerator new file mode 100644 index 0000000..c6ce01d --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.snapshot.SnapshotGenerator @@ -0,0 +1 @@ +liquibase.nosql.snapshot.NoSqlSnapshotGenerator diff --git a/src/main/resources/META-INF/services/liquibase.sqlgenerator.SqlGenerator b/src/main/resources/META-INF/services/liquibase.sqlgenerator.SqlGenerator new file mode 100644 index 0000000..31f19b6 --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.sqlgenerator.SqlGenerator @@ -0,0 +1 @@ +liquibase.ext.opensearch.executor.OpenSearchGenerator diff --git a/src/main/resources/liquibase/i18n/liquibase-opensearch.properties b/src/main/resources/liquibase/i18n/liquibase-opensearch.properties new file mode 100644 index 0000000..5dcea8a --- /dev/null +++ b/src/main/resources/liquibase/i18n/liquibase-opensearch.properties @@ -0,0 +1 @@ +command.unsupported=The Liquibase OpenSearch Extension does not support %s commands\nPlease refer to our documentation for the entire list of supported commands for OpenSearch diff --git a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java index 3f4b8e0..98728d9 100644 --- a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java +++ b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java @@ -17,4 +17,12 @@ public void openSearchIsRunning() { assertThat(this.database.getDatabaseMajorVersion()).isEqualTo(2); } + @SneakyThrows + @Test + public void itCreatesTheChangelogAndLockIndices() { + this.doLiquibaseUpdate("liquibase/ext/changelog.empty.yaml"); + assertThat(this.indexExists(this.database.getDatabaseChangeLogLockTableName())).isTrue(); + assertThat(this.indexExists(this.database.getDatabaseChangeLogTableName())).isTrue(); + } + } diff --git a/src/test/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryServiceTest.java b/src/test/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryServiceTest.java new file mode 100644 index 0000000..bc2729e --- /dev/null +++ b/src/test/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryServiceTest.java @@ -0,0 +1,79 @@ +package liquibase.ext.opensearch.changelog; + +import liquibase.ContextExpression; +import liquibase.change.CheckSum; +import liquibase.changelog.RanChangeSet; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.*; + +class OpenSearchHistoryServiceTest { + + /** + * {@link OpenSearchHistoryService#adjustRepository()} creates an OpenSearch index which contains the same fields as + * {@link RanChangeSet}. this test ensures that we cover all fields - every time a field is added or removed from the + * class this test will fail. if this happens you must adapt both {@link OpenSearchHistoryService#adjustRepository()} + * as well as this test. + */ + @Test + public void ensureThatAllRanChangeSetFieldsAreCovered() { + final var allFields = Arrays.stream(RanChangeSet.class.getDeclaredFields()) + .filter(f -> !Modifier.isStatic(f.getModifiers())) + .map(Field::getName); + assertThat(allFields).containsExactlyInAnyOrder( + "id", + "changeLog", + "storedChangeLog", + "author", + "lastCheckSum", + "dateExecuted", + "tag", + "execType", + "description", + "comments", + "orderExecuted", + "contextExpression", + "labels", + "deploymentId", + "liquibaseVersion" + ); + } + + /** + * {@link OpenSearchHistoryService#adjustRepository()} creates an OpenSearch index which contains the same fields as + * {@link CheckSum}. this test ensures that we cover all fields - every time a field is added or removed from the + * class this test will fail. if this happens you must adapt both {@link OpenSearchHistoryService#adjustRepository()} + * as well as this test. + */ + @Test + public void ensureThatAllCheckSumFieldsAreCovered() { + final var allFields = Arrays.stream(CheckSum.class.getDeclaredFields()) + .filter(f -> !Modifier.isStatic(f.getModifiers())) + .map(Field::getName); + assertThat(allFields).containsExactlyInAnyOrder( + "version", + "storedCheckSum" + ); + } + + /** + * {@link OpenSearchHistoryService#adjustRepository()} creates an OpenSearch index which contains the same fields as + * {@link ContextExpression}. this test ensures that we cover all fields - every time a field is added or removed from the + * class this test will fail. if this happens you must adapt both {@link OpenSearchHistoryService#adjustRepository()} + * as well as this test. + */ + @Test + public void ensureThatAllContextExpressionFieldsAreCovered() { + final var allFields = Arrays.stream(ContextExpression.class.getDeclaredFields()) + .filter(f -> !Modifier.isStatic(f.getModifiers())) + .map(Field::getName); + assertThat(allFields).containsExactlyInAnyOrder( + "contexts", + "originalString" + ); + } +} diff --git a/src/test/resources/liquibase.properties b/src/test/resources/liquibase.properties new file mode 100644 index 0000000..c4aeff6 --- /dev/null +++ b/src/test/resources/liquibase.properties @@ -0,0 +1 @@ +logLevel=DEBUG diff --git a/src/test/resources/liquibase/ext/changelog.empty.yaml b/src/test/resources/liquibase/ext/changelog.empty.yaml new file mode 100644 index 0000000..7935ec2 --- /dev/null +++ b/src/test/resources/liquibase/ext/changelog.empty.yaml @@ -0,0 +1 @@ +databaseChangeLog: From 773d1040691392792c131cf2ab45a5dffd21bf32 Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 13 May 2024 17:25:01 +0200 Subject: [PATCH 6/8] implement basic HTTP request support this adds a new change type which allows generic HTTP requests. while this isn't great to support working over multiple OpenSearch major releases (in case the REST API had breaking changes) it already offers an easy way to trigger any possible action on OpenSearch. latter versions might add additional change types for specific operations on OpenSearch which can then also abstract away from the exact API version (if possible). --- .../opensearch/change/HttpRequestChange.java | 41 +++++++++++++ .../executor/OpenSearchExecutor.java | 32 ++++++++--- .../statement/HttpRequestStatement.java | 57 +++++++++++++++++++ .../statement/OpenSearchExecuteStatement.java | 30 ++++++++++ .../META-INF/services/liquibase.change.Change | 1 + .../AbstractOpenSearchLiquibaseIT.java | 15 ++++- .../ext/opensearch/OpenSearchLiquibaseIT.java | 27 +++++++++ .../ext/changelog.httprequest.always.yaml | 36 ++++++++++++ .../ext/changelog.httprequest.contexts.yaml | 41 +++++++++++++ .../liquibase/ext/changelog.httprequest.yaml | 21 +++++++ 10 files changed, 292 insertions(+), 9 deletions(-) create mode 100644 src/main/java/liquibase/ext/opensearch/change/HttpRequestChange.java create mode 100644 src/main/java/liquibase/ext/opensearch/statement/HttpRequestStatement.java create mode 100644 src/main/java/liquibase/ext/opensearch/statement/OpenSearchExecuteStatement.java create mode 100644 src/main/resources/META-INF/services/liquibase.change.Change create mode 100644 src/test/resources/liquibase/ext/changelog.httprequest.always.yaml create mode 100644 src/test/resources/liquibase/ext/changelog.httprequest.contexts.yaml create mode 100644 src/test/resources/liquibase/ext/changelog.httprequest.yaml diff --git a/src/main/java/liquibase/ext/opensearch/change/HttpRequestChange.java b/src/main/java/liquibase/ext/opensearch/change/HttpRequestChange.java new file mode 100644 index 0000000..f84e3f3 --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/change/HttpRequestChange.java @@ -0,0 +1,41 @@ +package liquibase.ext.opensearch.change; + +import liquibase.change.AbstractChange; +import liquibase.change.ChangeMetaData; +import liquibase.change.DatabaseChange; +import liquibase.database.Database; +import liquibase.ext.opensearch.statement.HttpRequestStatement; +import liquibase.statement.SqlStatement; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.Optional; + +@DatabaseChange(name = "httpRequest", + description = "Execute an arbitrary HTTP request with the provided payload", + priority = ChangeMetaData.PRIORITY_DATABASE) +@NoArgsConstructor +@Getter +@Setter +public class HttpRequestChange extends AbstractChange { + + private String method; + private String path; + private String body; + + @Override + public String getConfirmationMessage() { + return String.format("executed the HTTP %s request against %s (with a body of size %d)", + this.getMethod(), + this.getPath(), + Optional.ofNullable(this.getBody()).map(String::length).orElse(0)); + } + + @Override + public SqlStatement[] generateStatements(final Database database) { + return new SqlStatement[] { + new HttpRequestStatement(this.getMethod(), this.getPath(), this.getBody()) + }; + } +} diff --git a/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java index 7a76fe6..beffe9c 100644 --- a/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java +++ b/src/main/java/liquibase/ext/opensearch/executor/OpenSearchExecutor.java @@ -25,12 +25,16 @@ import liquibase.exception.DatabaseException; import liquibase.executor.AbstractExecutor; import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.ext.opensearch.statement.OpenSearchExecuteStatement; import liquibase.logging.Logger; import liquibase.servicelocator.LiquibaseService; import liquibase.sql.visitor.SqlVisitor; import liquibase.statement.SqlStatement; import lombok.NoArgsConstructor; +import org.opensearch.client.opensearch.generic.Body; +import org.opensearch.client.opensearch.generic.OpenSearchClientException; +import java.io.IOException; import java.util.List; import java.util.Map; @@ -125,13 +129,25 @@ public void execute(final SqlStatement sql) throws DatabaseException { @Override public void execute(final SqlStatement sql, final List sqlVisitors) throws DatabaseException { - throw new DatabaseException("liquibase-opensearch extension cannot execute changeset \n" + - "Unknown type: " + sql.getClass().getName() + - "\nPlease check the following common causes:\n" + - "- Verify change set definitions for common error such as: changeType name, changeSet attributes spelling " + - "(such as runWith, context, etc.), and punctuation.\n" + - "- Verify that changesets have all the required changeset attributes and do not have invalid attributes for the designated change type.\n" + - "- Double-check to make sure your basic setup includes all needed extensions in your Java classpath"); + if (sql instanceof OpenSearchExecuteStatement) { + try { + ((OpenSearchExecuteStatement) sql).execute(getDatabase()); + } catch (final OpenSearchClientException e) { + try (var r = e.response()) { + throw new DatabaseException("Could not execute: %s".formatted(r.getBody().map(Body::bodyAsString).orElse("")), e); + } catch (IOException ex) { + throw new DatabaseException("Could not execute", e); + } + } + } else { + throw new DatabaseException("liquibase-opensearch extension cannot execute changeset \n" + + "Unknown type: " + sql.getClass().getName() + + "\nPlease check the following common causes:\n" + + "- Verify change set definitions for common error such as: changeType name, changeSet attributes spelling " + + "(such as runWith, context, etc.), and punctuation.\n" + + "- Verify that changesets have all the required changeset attributes and do not have invalid attributes for the designated change type.\n" + + "- Double-check to make sure your basic setup includes all needed extensions in your Java classpath"); + } } @Override @@ -141,7 +157,7 @@ public int update(final SqlStatement sql) throws DatabaseException { @Override public int update(final SqlStatement sql, final List sqlVisitors) throws DatabaseException { - throw new IllegalArgumentException(); + throw new UnsupportedOperationException("no update supported, use execute instead"); } @Override diff --git a/src/main/java/liquibase/ext/opensearch/statement/HttpRequestStatement.java b/src/main/java/liquibase/ext/opensearch/statement/HttpRequestStatement.java new file mode 100644 index 0000000..c0b087f --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/statement/HttpRequestStatement.java @@ -0,0 +1,57 @@ +package liquibase.ext.opensearch.statement; + +import liquibase.Scope; +import liquibase.exception.DatabaseException; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; +import liquibase.logging.Logger; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import org.opensearch.client.opensearch.generic.Bodies; +import org.opensearch.client.opensearch.generic.OpenSearchGenericClient.ClientOptions; +import org.opensearch.client.opensearch.generic.Requests; + +import java.io.IOException; +import java.util.Optional; + +@AllArgsConstructor +@Getter +@EqualsAndHashCode(callSuper = true) +public class HttpRequestStatement extends AbstractOpenSearchStatement implements OpenSearchExecuteStatement { + + private final Logger log = Scope.getCurrentScope().getLog(getClass()); + + private String method; + private String path; + private String body; + + @Override + public String toString() { + return String.format("HTTP %s request against %s (with a body of size %d)", + this.getMethod(), + this.getPath(), + Optional.ofNullable(this.getBody()).map(String::length).orElse(0)); + } + + @Override + public void execute(final OpenSearchLiquibaseDatabase database) throws DatabaseException { + log.info(this.toString()); + + final var httpClient = this.getOpenSearchClient(database).generic().withClientOptions(ClientOptions.throwOnHttpErrors()); + + final var request = Requests.builder() + .endpoint(this.getPath()) + .method(this.getMethod()) + .body(Bodies.json(this.getBody())) + .build(); + + try (final var response = httpClient.execute(request)) { + if (response.getStatus() >= 400) { + throw new DatabaseException(String.format("HTTP request failed with code %d: %s", response.getStatus(), response)); + } + } catch (final IOException e) { + throw new DatabaseException("failed to execute the HTTP request", e); + } + } + +} diff --git a/src/main/java/liquibase/ext/opensearch/statement/OpenSearchExecuteStatement.java b/src/main/java/liquibase/ext/opensearch/statement/OpenSearchExecuteStatement.java new file mode 100644 index 0000000..5661566 --- /dev/null +++ b/src/main/java/liquibase/ext/opensearch/statement/OpenSearchExecuteStatement.java @@ -0,0 +1,30 @@ +package liquibase.ext.opensearch.statement; + +/*- + * #%L + * Liquibase NoSql Extension + * %% + * Copyright (C) 2020 Mastercard + * %% + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ + +import liquibase.exception.DatabaseException; +import liquibase.ext.opensearch.database.OpenSearchLiquibaseDatabase; + +public interface OpenSearchExecuteStatement { + + void execute(final OpenSearchLiquibaseDatabase database) throws DatabaseException; + +} diff --git a/src/main/resources/META-INF/services/liquibase.change.Change b/src/main/resources/META-INF/services/liquibase.change.Change new file mode 100644 index 0000000..512ff5c --- /dev/null +++ b/src/main/resources/META-INF/services/liquibase.change.Change @@ -0,0 +1 @@ +liquibase.ext.opensearch.change.HttpRequestChange diff --git a/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java index 4edfc89..073eb12 100644 --- a/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java +++ b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java @@ -10,6 +10,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInstance; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch.core.CountRequest; import org.opensearch.client.opensearch.indices.ExistsRequest; import org.opensearch.testcontainers.OpensearchContainer; import org.testcontainers.containers.wait.strategy.Wait; @@ -47,13 +48,18 @@ protected OpenSearchClient getOpenSearchClient() { return this.connection.getOpenSearchClient(); } - protected void doLiquibaseUpdate(final String changeLogFile) throws Exception { + protected void doLiquibaseUpdate(final String changeLogFile, final String contexts) throws Exception { new CommandScope(UpdateCommandStep.COMMAND_NAME) .addArgumentValue(DbUrlConnectionCommandStep.DATABASE_ARG, this.database) .addArgumentValue(UpdateCommandStep.CHANGELOG_FILE_ARG, changeLogFile) + .addArgumentValue(UpdateCommandStep.CONTEXTS_ARG, contexts) .execute(); } + protected void doLiquibaseUpdate(final String changeLogFile) throws Exception { + this.doLiquibaseUpdate(changeLogFile, ""); + } + protected boolean indexExists(final String indexName) throws Exception { final var request = new ExistsRequest.Builder() .index(indexName) @@ -61,4 +67,11 @@ protected boolean indexExists(final String indexName) throws Exception { return this.getOpenSearchClient().indices().exists(request).value(); } + protected long getDocumentCount(final String indexName) throws Exception { + final var request = new CountRequest.Builder() + .index(indexName) + .build(); + return this.getOpenSearchClient().count(request).count(); + } + } diff --git a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java index 98728d9..22c6f0f 100644 --- a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java +++ b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java @@ -25,4 +25,31 @@ public void itCreatesTheChangelogAndLockIndices() { assertThat(this.indexExists(this.database.getDatabaseChangeLogTableName())).isTrue(); } + @SneakyThrows + @Test + public void itExecutesAHttpRequestAndCreatesTheIndex() { + this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.yaml"); + assertThat(this.indexExists("testindex")).isTrue(); + } + + @SneakyThrows + @Test + public void itHandlesReRuns() { + this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.always.yaml"); + assertThat(this.indexExists("testindex-always")).isTrue(); + assertThat(this.getDocumentCount("testindex-always")).isEqualTo(1); + this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.always.yaml"); + assertThat(this.getDocumentCount("testindex-always")).isEqualTo(2); + this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.always.yaml"); + assertThat(this.getDocumentCount("testindex-always")).isEqualTo(3); + } + + @SneakyThrows + @Test + public void itRespectsTheContextFilter() { + this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.contexts.yaml", "context1"); + assertThat(this.indexExists("testindex1")).isTrue(); + assertThat(this.indexExists("testindex2")).isFalse(); + } + } diff --git a/src/test/resources/liquibase/ext/changelog.httprequest.always.yaml b/src/test/resources/liquibase/ext/changelog.httprequest.always.yaml new file mode 100644 index 0000000..db48981 --- /dev/null +++ b/src/test/resources/liquibase/ext/changelog.httprequest.always.yaml @@ -0,0 +1,36 @@ +databaseChangeLog: + - changeSet: + id: 3000 + author: test + labels: httpRequestLabel + context: httpRequestContext + comment: httpRequestComment + changes: + - httpRequest: + method: PUT + path: /testindex-always + body: > + { + "mappings": { + "properties": { + "testfield": { + "type": "text" + } + } + } + } + - changeSet: + id: 3001 + author: test + labels: httpRequestLabel + context: httpRequestContext + comment: httpRequestComment + runAlways: true + changes: + - httpRequest: + method: POST + path: /testindex-always/_doc + body: > + { + "testfield": "test" + } diff --git a/src/test/resources/liquibase/ext/changelog.httprequest.contexts.yaml b/src/test/resources/liquibase/ext/changelog.httprequest.contexts.yaml new file mode 100644 index 0000000..71dace3 --- /dev/null +++ b/src/test/resources/liquibase/ext/changelog.httprequest.contexts.yaml @@ -0,0 +1,41 @@ +databaseChangeLog: + - changeSet: + id: 2000 + author: test + labels: httpRequestLabel + context: context1 + comment: httpRequestComment + changes: + - httpRequest: + method: PUT + path: /testindex1 + body: > + { + "mappings": { + "properties": { + "testfield": { + "type": "text" + } + } + } + } + - changeSet: + id: 2001 + author: test + labels: httpRequestLabel + context: context2 + comment: httpRequestComment + changes: + - httpRequest: + method: PUT + path: /testindex2 + body: > + { + "mappings": { + "properties": { + "testfield": { + "type": "text" + } + } + } + } diff --git a/src/test/resources/liquibase/ext/changelog.httprequest.yaml b/src/test/resources/liquibase/ext/changelog.httprequest.yaml new file mode 100644 index 0000000..4d0eba9 --- /dev/null +++ b/src/test/resources/liquibase/ext/changelog.httprequest.yaml @@ -0,0 +1,21 @@ +databaseChangeLog: + - changeSet: + id: 1000 + author: test + labels: httpRequestLabel + context: httpRequestContext + comment: httpRequestComment + changes: + - httpRequest: + method: PUT + path: /testindex + body: > + { + "mappings": { + "properties": { + "testfield": { + "type": "text" + } + } + } + } From 7b6825031ff6706dd82d1fe4207cce04f6dbc66b Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Mon, 3 Jun 2024 15:25:43 +0200 Subject: [PATCH 7/8] implement `clearAllCheckSums` this allows executing the [`clear-checksums`] liquibase action. [`clear-checksums`]: https://docs.liquibase.com/commands/utility/clear-checksums.html --- .../changelog/OpenSearchHistoryService.java | 9 +++++++- .../AbstractOpenSearchLiquibaseIT.java | 11 +++++++-- .../ext/opensearch/OpenSearchLiquibaseIT.java | 23 +++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java b/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java index c8b14a1..8b3c1c8 100644 --- a/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java +++ b/src/main/java/liquibase/ext/opensearch/changelog/OpenSearchHistoryService.java @@ -163,7 +163,14 @@ protected void removeRanChangeSet(final ChangeSet changeSet) throws DatabaseExce @Override public void clearAllCheckSums() throws DatabaseException { - throw new UnsupportedOperationException(); + try { + this.getOpenSearchClient() + .updateByQuery(r -> r.index(this.getDatabaseChangeLogTableName()) + .script(s -> s.inline(i -> i.source("ctx._source.lastCheckSum = null") + .lang("painless")))); + } catch (IOException e) { + throw new DatabaseException(e); + } } @Override diff --git a/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java index 073eb12..569e410 100644 --- a/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java +++ b/src/test/java/liquibase/ext/opensearch/AbstractOpenSearchLiquibaseIT.java @@ -2,6 +2,7 @@ import liquibase.command.CommandScope; import liquibase.command.core.UpdateCommandStep; +import liquibase.command.core.helpers.DbUrlConnectionArgumentsCommandStep; import liquibase.command.core.helpers.DbUrlConnectionCommandStep; import liquibase.database.DatabaseFactory; import liquibase.ext.opensearch.database.OpenSearchConnection; @@ -10,6 +11,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.TestInstance; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.client.opensearch._types.query_dsl.Query; import org.opensearch.client.opensearch.core.CountRequest; import org.opensearch.client.opensearch.indices.ExistsRequest; import org.opensearch.testcontainers.OpensearchContainer; @@ -50,7 +52,7 @@ protected OpenSearchClient getOpenSearchClient() { protected void doLiquibaseUpdate(final String changeLogFile, final String contexts) throws Exception { new CommandScope(UpdateCommandStep.COMMAND_NAME) - .addArgumentValue(DbUrlConnectionCommandStep.DATABASE_ARG, this.database) + .addArgumentValue(DbUrlConnectionArgumentsCommandStep.DATABASE_ARG, this.database) .addArgumentValue(UpdateCommandStep.CHANGELOG_FILE_ARG, changeLogFile) .addArgumentValue(UpdateCommandStep.CONTEXTS_ARG, contexts) .execute(); @@ -67,11 +69,16 @@ protected boolean indexExists(final String indexName) throws Exception { return this.getOpenSearchClient().indices().exists(request).value(); } - protected long getDocumentCount(final String indexName) throws Exception { + protected long getDocumentCount(final String indexName, final Query query) throws Exception { final var request = new CountRequest.Builder() .index(indexName) + .query(query) .build(); return this.getOpenSearchClient().count(request).count(); } + protected long getDocumentCount(final String indexName) throws Exception { + return this.getDocumentCount(indexName, null); + } + } diff --git a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java index 22c6f0f..06af9ac 100644 --- a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java +++ b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java @@ -1,7 +1,13 @@ package liquibase.ext.opensearch; +import liquibase.command.CommandScope; +import liquibase.command.core.ClearChecksumsCommandStep; +import liquibase.command.core.UpdateCommandStep; +import liquibase.command.core.helpers.DbUrlConnectionArgumentsCommandStep; +import liquibase.command.core.helpers.DbUrlConnectionCommandStep; import lombok.SneakyThrows; import org.junit.jupiter.api.Test; +import org.opensearch.client.opensearch._types.query_dsl.Query; import static org.assertj.core.api.Assertions.assertThat; @@ -52,4 +58,21 @@ public void itRespectsTheContextFilter() { assertThat(this.indexExists("testindex2")).isFalse(); } + @SneakyThrows + @Test + public void itCanClearAllChecksums() { + // run at least one change set + this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.yaml"); + + final var countBeforeClear = this.getDocumentCount("databasechangelog", new Query.Builder().exists(e -> e.field("lastCheckSum")).build()); + assertThat(countBeforeClear).isNotZero(); + + new CommandScope(ClearChecksumsCommandStep.COMMAND_NAME) + .addArgumentValue(DbUrlConnectionArgumentsCommandStep.DATABASE_ARG, this.database) + .execute(); + + final var countAfterClear = this.getDocumentCount("databasechangelog", new Query.Builder().exists(e -> e.field("lastCheckSum")).build()); + assertThat(countAfterClear).isZero(); + } + } From d7b8d162b910a280f2fe25255aac29572133ec50 Mon Sep 17 00:00:00 2001 From: Ralph Ursprung Date: Tue, 3 Sep 2024 15:07:03 +0200 Subject: [PATCH 8/8] add support for XML-based changelogs this provides the necessary XSD. note that the XSD has to be published at http://www.liquibase.org/xml/ns/opensearch/liquibase-opensearch-1.0.xsd for the reference to be valid for consumers. this seems to have been done for other plugins (e.g. mongodb) but is - presumably? - a manual process. --- .../opensearch/liquibase-opensearch-1.0.xsd | 33 +++++++++++++++++++ .../ext/opensearch/OpenSearchLiquibaseIT.java | 9 ++++- .../liquibase/ext/changelog.httprequest.xml | 27 +++++++++++++++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 src/main/resources/www.liquibase.org/xml/ns/opensearch/liquibase-opensearch-1.0.xsd create mode 100644 src/test/resources/liquibase/ext/changelog.httprequest.xml diff --git a/src/main/resources/www.liquibase.org/xml/ns/opensearch/liquibase-opensearch-1.0.xsd b/src/main/resources/www.liquibase.org/xml/ns/opensearch/liquibase-opensearch-1.0.xsd new file mode 100644 index 0000000..26813fe --- /dev/null +++ b/src/main/resources/www.liquibase.org/xml/ns/opensearch/liquibase-opensearch-1.0.xsd @@ -0,0 +1,33 @@ + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java index 06af9ac..43b169c 100644 --- a/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java +++ b/src/test/java/liquibase/ext/opensearch/OpenSearchLiquibaseIT.java @@ -33,11 +33,18 @@ public void itCreatesTheChangelogAndLockIndices() { @SneakyThrows @Test - public void itExecutesAHttpRequestAndCreatesTheIndex() { + public void itExecutesAHttpRequestAndCreatesTheIndexWithYAMLChangelog() { this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.yaml"); assertThat(this.indexExists("testindex")).isTrue(); } + @SneakyThrows + @Test + public void itExecutesAHttpRequestAndCreatesTheIndexWithXMLChangelog() { + this.doLiquibaseUpdate("liquibase/ext/changelog.httprequest.xml"); + assertThat(this.indexExists("xmltestindex")).isTrue(); + } + @SneakyThrows @Test public void itHandlesReRuns() { diff --git a/src/test/resources/liquibase/ext/changelog.httprequest.xml b/src/test/resources/liquibase/ext/changelog.httprequest.xml new file mode 100644 index 0000000..931e294 --- /dev/null +++ b/src/test/resources/liquibase/ext/changelog.httprequest.xml @@ -0,0 +1,27 @@ + + + + httpRequestComment + + PUT + /xmltestindex + + { + "mappings": { + "properties": { + "testfield": { + "type": "text" + } + } + } + } + + + + +