diff --git a/examples/pom.xml b/examples/pom.xml index 5746944a269b7..baf6965b0ac8f 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -63,6 +63,22 @@ ${spring.version} + + tech.ydb.jdbc + ydb-jdbc-driver-shaded + 2.1.0 + + + org.springframework spring-beans diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexHolder.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexHolder.java new file mode 100644 index 0000000000000..ab4a425002544 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexHolder.java @@ -0,0 +1,11 @@ +package org.apache.ignite.examples.datagrid.store.jdbc; + +public class FlexHolder { + final int intHolder; + final String strHolder; + + public FlexHolder(int val1, String val2) { + this.intHolder = val1; + this.strHolder = val2; + } +} diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexYDBStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexYDBStoreExample.java new file mode 100644 index 0000000000000..0d8b61305198e --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexYDBStoreExample.java @@ -0,0 +1,153 @@ +package org.apache.ignite.examples.datagrid.store.jdbc; + +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; + +import java.util.List; +import java.util.UUID; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteException; +import org.apache.ignite.Ignition; +import org.apache.ignite.cache.query.FieldsQueryCursor; +import org.apache.ignite.cache.query.SqlFieldsQuery; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.examples.ExampleNodeStartup; +import org.apache.ignite.examples.ExamplesUtils; +import org.apache.ignite.transactions.Transaction; + +/** + * Demonstrates usage of cache with underlying persistent store configured. + *

+ * This example uses {@link CacheJdbcPersonStore} as a persistent store. + *

+ * To start the example, you should: + *

+ *

+ * Remote nodes can be started with {@link ExampleNodeStartup} in another JVM which will + * start node with {@code examples/config/example-ignite.xml} configuration. + */ +public class FlexYDBStoreExample { + /** Cache name. */ + private static final String CACHE_NAME = CacheJdbcStoreExample.class.getSimpleName(); + + /** Heap size required to run this example. */ + public static final int MIN_MEMORY = 1024 * 1024 * 1024; + + /** Number of entries to load. */ + private static final int ENTRY_COUNT = 100_000; + + /** Global person ID to use across entire example. */ + private static final Long id = Math.abs(UUID.randomUUID().getLeastSignificantBits()); + + /** + * Executes example. + * + * @param args Command line arguments, none required. + * @throws IgniteException If example execution failed. + */ + public static void main(String[] args) throws IgniteException { + ExamplesUtils.checkMinMemory(MIN_MEMORY); + + // To start ignite with desired configuration uncomment the appropriate line. + try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) { + System.out.println(); + System.out.println(">>> Cache store example started."); + + CacheConfiguration cacheCfg = new CacheConfiguration<>(CACHE_NAME); + + // Set atomicity as transaction, since we are showing transactions in example. + cacheCfg.setAtomicityMode(TRANSACTIONAL); + + YDBStoreFactory factory = new YDBStoreFactory<>(); + factory.setConnectionUrl("grpc://localhost:2136/local"); + factory.setCacheName("SQL_PUBLIC_FLEX"); // can be autogenerated from sql "create table", hardcoded for now + cacheCfg.setCacheStoreFactory(factory); + + cacheCfg.setReadThrough(true); + cacheCfg.setWriteThrough(true); + + // Auto-close cache at the end of the example. + try (IgniteCache cache = ignite.getOrCreateCache(cacheCfg)) { + ignite.cache(CACHE_NAME).query(new SqlFieldsQuery("drop table if exists flex;")); + + FieldsQueryCursor> res0 = ignite.cache(CACHE_NAME) + .query(new SqlFieldsQuery("create table if not exists flex(id long primary key, val int not null, sval varchar);")); + + res0.getAll(); + + // Make initial cache loading from persistent store. This is a + // distributed operation and will call CacheStore.loadCache(...) + // method on all nodes in topology. + loadCache(cache); + + // Start transaction and execute several cache operations with + // read/write-through to persistent store. + executeTransaction(cache); + } + finally { + // Distributed cache could be removed from cluster only by #destroyCache() call. + ignite.destroyCache(CACHE_NAME); + } + } + } + + /** + * Makes initial cache loading. + * + * @param cache Cache to load. + */ + private static void loadCache(IgniteCache cache) { + long start = System.currentTimeMillis(); + + // Start loading cache from persistent store on all caching nodes. + cache.loadCache(null, ENTRY_COUNT); + + long end = System.currentTimeMillis(); + + System.out.println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms."); + } + + /** + * Executes transaction with read/write-through to persistent store. + * + * @param cache Cache to execute transaction on. + */ + private static void executeTransaction(IgniteCache cache) { + try (Transaction tx = Ignition.ignite().transactions().txStart()) { + FlexHolder val = cache.get(id); + + System.out.println("Read value: " + val); + + val = cache.getAndPut(id, new FlexHolder(100, "test")); + + System.out.println("Overwrite old value: " + val); + + val = cache.get(id); + + System.out.println("Read value: " + val); + + tx.commit(); + } + + // Clear entry from memory, but keep it in store. + cache.clear(id); + + // Operations on this cache will not affect store. + IgniteCache cacheSkipStore = cache.withSkipStore(); + + System.out.println("Read value skipping store (expecting null): " + cacheSkipStore.get(id)); + + System.out.println("Read value with store lookup (expecting NOT null): " + cache.get(id)); + + // Expecting not null, since entry should be in memory since last call. + System.out.println("Read value skipping store (expecting NOT null): " + cacheSkipStore.get(id)); + + // Clear entry from memory, and from store. + cache.remove(id); + + System.out.println("Key removes elsewhere (expecting null): " + cache.get(id)); + } +} diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBCacheStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBCacheStore.java new file mode 100644 index 0000000000000..2cd8b144dbca4 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBCacheStore.java @@ -0,0 +1,167 @@ +package org.apache.ignite.examples.datagrid.store.jdbc; + +import java.util.Collection; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.cache.Cache.Entry; +import javax.cache.integration.CacheLoaderException; +import javax.cache.integration.CacheWriterException; +import org.apache.ignite.Ignite; +import org.apache.ignite.cache.store.CacheStoreAdapter; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.query.GridQueryProperty; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; +import org.apache.ignite.resources.IgniteInstanceResource; +import tech.ydb.core.Result; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.table.SessionRetryContext; +import tech.ydb.table.TableClient; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.description.TableDescription.Builder; +import tech.ydb.table.query.DataQueryResult; +import tech.ydb.table.result.ResultSetReader; +import tech.ydb.table.transaction.TxControl; +import tech.ydb.table.values.PrimitiveType; + +public class YDBCacheStore extends CacheStoreAdapter { + private final String cacheName; + private SessionRetryContext retryCtx; + private boolean initialized; + boolean dropBeforeCreate = true; + + @IgniteInstanceResource + private Ignite ignite; + + private String database; + + public YDBCacheStore(String cacheName, String connUrl) { + this.cacheName = cacheName; + + GrpcTransport transport = GrpcTransport.forConnectionString(connUrl) + //.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron()) + .build(); + + database = transport.getDatabase(); + + TableClient tableClient = TableClient.newClient(transport).build(); + this.retryCtx = SessionRetryContext.create(tableClient).build(); + } + + private synchronized void init() { + if (!initialized) { + Builder ydbTableBuilder = TableDescription.newBuilder(); + + Collection types = ((IgniteEx) ignite).context().query().types(cacheName); + + Set pkeys = null; + Map> fields = null; + Map fieldProperties = null; + + assert types.size() == 1; + + for (GridQueryTypeDescriptor type : types) { + pkeys = type.primaryKeyFields(); + + if (pkeys.isEmpty()) { + pkeys = Collections.singleton(type.keyFieldName()); + } + + fields = type.fields(); + fieldProperties = type.properties(); + } + + for (Map.Entry> fldInfo : fields.entrySet()) { + PrimitiveType type0; + switch (fldInfo.getValue().getSimpleName()) { + case "Integer": + type0 = PrimitiveType.Int32; + break; + case "Long": + type0 = PrimitiveType.Int64; + break; + case "String": + type0 = PrimitiveType.Text; + break; + default: + throw new UnsupportedOperationException("Undefined mapping for [" + fldInfo.getValue().getSimpleName() + "] type"); + } + + GridQueryProperty fldProp = Objects.requireNonNull(fieldProperties.get(fldInfo.getKey())); + +/* if (fldProp.notNull()) { // need to discuss with ya what the problem here + ydbTableBuilder.addNonnullColumn(fldInfo.getKey(), type0); + } else { + ydbTableBuilder.addNullableColumn(fldInfo.getKey(), type0); + }*/ + + ydbTableBuilder.addNullableColumn(fldInfo.getKey(), type0); + } + ydbTableBuilder.setPrimaryKeys(pkeys.toArray(new String[pkeys.size()])); + + TableDescription table = ydbTableBuilder.build(); + + if (dropBeforeCreate) { + retryCtx.supplyStatus(session -> session.dropTable(database + "/" + cacheName)).join(); + } + + retryCtx.supplyStatus(session -> session.createTable(database + "/" + cacheName, table)) + .join().expectSuccess(); + + initialized = true; + } + } + + @Override + public V load(K k) throws CacheLoaderException { + init(); + + String query + = "SELECT ID, VAL, SVAL " + + "FROM %s WHERE ID = " + k + ";"; + + String query0 = String.format(query, cacheName); + + TxControl txControl = TxControl.serializableRw().setCommitTx(true); + + Result result = retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join(); + + DataQueryResult res = result.getValue(); + + ResultSetReader rs = res.getResultSet(0); + + while (rs.next()) { + return (V) new FlexHolder(rs.getColumn("VAL").getInt32(), rs.getColumn("SVAL").getText()); + } + + return null; + } + + @Override + public void write(Entry entry) throws CacheWriterException { + FlexHolder holder = (FlexHolder) entry.getValue(); + String query + = "INSERT INTO %s (ID, VAL, SVAL) " + + "VALUES (" + entry.getKey() + ", " + holder.intHolder + ", \"" + holder.strHolder + "\");"; + + String query0 = String.format(query, cacheName); + + TxControl txControl = TxControl.serializableRw().setCommitTx(true); + + retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join(); + } + + @Override + public void delete(Object key) throws CacheWriterException { + // key can be complex !!! + String query = "delete from %s where ID=" + key; + + TxControl txControl = TxControl.serializableRw().setCommitTx(true); + + String query0 = String.format(query, cacheName); + + retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join(); + } +} diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBStoreFactory.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBStoreFactory.java new file mode 100644 index 0000000000000..099a072c3af25 --- /dev/null +++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBStoreFactory.java @@ -0,0 +1,29 @@ +package org.apache.ignite.examples.datagrid.store.jdbc; + +import java.util.Objects; +import javax.cache.configuration.Factory; + +public class YDBStoreFactory implements Factory> { + private String connUrl = "grpc://localhost:2136/local"; + + private String cacheName; + + public YDBStoreFactory setConnectionUrl(String connUrl) { + this.connUrl = connUrl; + + return this; + } + + public YDBStoreFactory setCacheName(String cacheName) { + this.cacheName = cacheName; + + return this; + } + + @Override + public YDBCacheStore create() { + Objects.requireNonNull(cacheName); + + return new YDBCacheStore<>(cacheName, connUrl); + } +} diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java index b8ad5eb16210b..dae8317240041 100644 --- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java +++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java @@ -23,7 +23,10 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collection; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.LongAdder; @@ -35,11 +38,15 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cache.query.SqlFieldsQuery; import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.cache.store.CacheStoreAdapter; import org.apache.ignite.cache.store.CacheStoreSession; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata; +import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; @@ -51,6 +58,14 @@ import org.apache.ignite.resources.LoggerResource; import org.apache.ignite.transactions.Transaction; import org.jetbrains.annotations.Nullable; +import tech.ydb.auth.iam.CloudAuthHelper; +import tech.ydb.core.grpc.GrpcTransport; +import tech.ydb.table.SessionRetryContext; +import tech.ydb.table.TableClient; +import tech.ydb.table.description.TableDescription; +import tech.ydb.table.description.TableDescription.Builder; +import tech.ydb.table.rpc.grpc.GrpcTableRpc; +import tech.ydb.table.values.PrimitiveType; /** * {@link CacheStore} implementation backed by JDBC. This implementation @@ -90,8 +105,8 @@ public class CacheJdbcBlobStore extends CacheStoreAdapter { * Default create table query * (value is create table if not exists ENTRIES (akey binary primary key, val binary)). */ - public static final String DFLT_CREATE_TBL_QRY = "create table if not exists ENTRIES " + - "(akey binary primary key, val binary)"; + public static final String DFLT_CREATE_TBL_QRY = "create table ENTRIES " + + "(akey string, val string, PRIMARY KEY(akey))"; /** Default load entry query (value is select * from ENTRIES where akey=?). */ public static final String DFLT_LOAD_QRY = "select * from ENTRIES where akey=?"; @@ -268,7 +283,7 @@ public class CacheJdbcBlobStore extends CacheStoreAdapter { stmt.setObject(1, toBytes(val)); stmt.setObject(2, toBytes(key)); - if (stmt.executeUpdate() == 0) { + if (stmt.executeUpdate() == 1) { stmt.close(); stmt = conn.prepareStatement(insertQry); @@ -409,6 +424,39 @@ private void init() { return; } + IgniteEx ign = (IgniteEx) ignite; + + @Nullable List res = ign.context().query() + .getIndexing().resultMetaData("PUBLIC", new SqlFieldsQuery("select * from flex1")); + + Collection types = ign.context().query() + .types("SQL_PUBLIC_FLEX1"); + + Set pkeys = null; + for (GridQueryTypeDescriptor type : types) { + pkeys = type.primaryKeyFields(); + } + + GrpcTransport transport = GrpcTransport.forConnectionString("grpc://localhost:2136/local") + //.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron()) + .build(); + GrpcTableRpc rpc = GrpcTableRpc.ownTransport(transport); + TableClient tableClient = TableClient.newClient(transport).build(); + String database = transport.getDatabase(); + SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build(); + + Builder ydbTableBuilder = TableDescription.newBuilder(); + + for (GridQueryFieldMetadata meta : res) { + ydbTableBuilder.addNullableColumn(meta.fieldName(), PrimitiveType.Int32); + } + ydbTableBuilder.setPrimaryKeys(pkeys.toArray(new String[pkeys.size()])); + + TableDescription table = ydbTableBuilder.build(); + + retryCtx.supplyStatus(session -> session.createTable(database + "/series1", table)) + .join().expectSuccess(); + if (F.isEmpty(createTblQry)) throw new IgniteException("Failed to initialize cache store (create table query is not provided)."); @@ -421,6 +469,7 @@ private void init() { stmt = conn.createStatement(); + //stmt.execute("drop table ENTRIES;"); stmt.execute(createTblQry); conn.commit();