org.springframework
spring-beans
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexHolder.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexHolder.java
new file mode 100644
index 0000000000000..ab4a425002544
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexHolder.java
@@ -0,0 +1,23 @@
+package org.apache.ignite.examples.datagrid.store.jdbc;
+
+/**
+ * Simple value holder used by {@link FlexYDBStoreExample} to demonstrate
+ * read/write-through with a YDB-backed cache store.
+ */
+public class FlexHolder {
+    /** Integer payload (mapped to the VAL column by the example store). */
+    final int intHolder;
+
+    /** String payload (mapped to the SVAL column by the example store). */
+    final String strHolder;
+
+    public FlexHolder(int val1, String val2) {
+        this.intHolder = val1;
+        this.strHolder = val2;
+    }
+
+    /** Human-readable form so the example's println shows field values, not an identity hash. */
+    @Override public String toString() {
+        return "FlexHolder [intHolder=" + intHolder + ", strHolder=" + strHolder + "]";
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexYDBStoreExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexYDBStoreExample.java
new file mode 100644
index 0000000000000..0d8b61305198e
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/FlexYDBStoreExample.java
@@ -0,0 +1,152 @@
+package org.apache.ignite.examples.datagrid.store.jdbc;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.examples.ExampleNodeStartup;
+import org.apache.ignite.examples.ExamplesUtils;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Demonstrates usage of a cache with an underlying YDB persistent store configured.
+ *
+ * This example uses {@link YDBCacheStore} (created via {@link YDBStoreFactory}) as a persistent store.
+ *
+ * To start the example, you should:
+ *
+ * - Start a few nodes using {@link ExampleNodeStartup}.
+ * - Start example using {@link FlexYDBStoreExample}.
+ *
+ * Remote nodes can be started with {@link ExampleNodeStartup} in another JVM which will
+ * start node with {@code examples/config/example-ignite.xml} configuration.
+ */
+public class FlexYDBStoreExample {
+    /** Cache name. */
+    private static final String CACHE_NAME = FlexYDBStoreExample.class.getSimpleName();
+
+    /** Heap size required to run this example. */
+    public static final int MIN_MEMORY = 1024 * 1024 * 1024;
+
+    /** Number of entries to load. */
+    private static final int ENTRY_COUNT = 100_000;
+
+    /** Global person ID to use across entire example. */
+    private static final Long id = Math.abs(UUID.randomUUID().getLeastSignificantBits());
+
+    /**
+     * Executes example.
+     *
+     * @param args Command line arguments, none required.
+     * @throws IgniteException If example execution failed.
+     */
+    public static void main(String[] args) throws IgniteException {
+        ExamplesUtils.checkMinMemory(MIN_MEMORY);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            System.out.println();
+            System.out.println(">>> Cache store example started.");
+
+            CacheConfiguration<Long, FlexHolder> cacheCfg = new CacheConfiguration<>(CACHE_NAME);
+
+            // Set atomicity as transactional, since we are showing transactions in the example.
+            cacheCfg.setAtomicityMode(TRANSACTIONAL);
+
+            YDBStoreFactory<Long, FlexHolder> factory = new YDBStoreFactory<>();
+            factory.setConnectionUrl("grpc://localhost:2136/local");
+            factory.setCacheName("SQL_PUBLIC_FLEX"); // Can be autogenerated from SQL "create table", hardcoded for now.
+            cacheCfg.setCacheStoreFactory(factory);
+
+            // Enable read/write-through to the underlying store.
+            cacheCfg.setReadThrough(true);
+            cacheCfg.setWriteThrough(true);
+
+            // Auto-close cache at the end of the example.
+            try (IgniteCache<Long, FlexHolder> cache = ignite.getOrCreateCache(cacheCfg)) {
+                ignite.cache(CACHE_NAME).query(new SqlFieldsQuery("drop table if exists flex;"));
+
+                FieldsQueryCursor<List<?>> res0 = ignite.cache(CACHE_NAME)
+                    .query(new SqlFieldsQuery("create table if not exists flex(id long primary key, val int not null, sval varchar);"));
+
+                res0.getAll();
+
+                // Make initial cache loading from persistent store. This is a
+                // distributed operation and will call CacheStore.loadCache(...)
+                // method on all nodes in topology.
+                loadCache(cache);
+
+                // Start transaction and execute several cache operations with
+                // read/write-through to persistent store.
+                executeTransaction(cache);
+            }
+            finally {
+                // Distributed cache could be removed from cluster only by #destroyCache() call.
+                ignite.destroyCache(CACHE_NAME);
+            }
+        }
+    }
+
+    /**
+     * Makes initial cache loading.
+     *
+     * @param cache Cache to load.
+     */
+    private static void loadCache(IgniteCache<Long, FlexHolder> cache) {
+        long start = System.currentTimeMillis();
+
+        // Start loading cache from persistent store on all caching nodes.
+        cache.loadCache(null, ENTRY_COUNT);
+
+        long end = System.currentTimeMillis();
+
+        System.out.println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms.");
+    }
+
+    /**
+     * Executes transaction with read/write-through to persistent store.
+     *
+     * @param cache Cache to execute transaction on.
+     */
+    private static void executeTransaction(IgniteCache<Long, FlexHolder> cache) {
+        try (Transaction tx = Ignition.ignite().transactions().txStart()) {
+            FlexHolder val = cache.get(id);
+
+            System.out.println("Read value: " + val);
+
+            val = cache.getAndPut(id, new FlexHolder(100, "test"));
+
+            System.out.println("Overwrite old value: " + val);
+
+            val = cache.get(id);
+
+            System.out.println("Read value: " + val);
+
+            tx.commit();
+        }
+
+        // Clear entry from memory, but keep it in store.
+        cache.clear(id);
+
+        // Operations on this cache will not affect store.
+        IgniteCache<Long, FlexHolder> cacheSkipStore = cache.withSkipStore();
+
+        System.out.println("Read value skipping store (expecting null): " + cacheSkipStore.get(id));
+
+        System.out.println("Read value with store lookup (expecting NOT null): " + cache.get(id));
+
+        // Expecting not null, since entry should be in memory since last call.
+        System.out.println("Read value skipping store (expecting NOT null): " + cacheSkipStore.get(id));
+
+        // Clear entry from memory, and from store.
+        cache.remove(id);
+
+        System.out.println("Key removed everywhere (expecting null): " + cache.get(id));
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBCacheStore.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBCacheStore.java
new file mode 100644
index 0000000000000..2cd8b144dbca4
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBCacheStore.java
@@ -0,0 +1,195 @@
+package org.apache.ignite.examples.datagrid.store.jdbc;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import javax.cache.Cache.Entry;
+import javax.cache.integration.CacheLoaderException;
+import javax.cache.integration.CacheWriterException;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.query.GridQueryProperty;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import tech.ydb.core.Result;
+import tech.ydb.core.grpc.GrpcTransport;
+import tech.ydb.table.SessionRetryContext;
+import tech.ydb.table.TableClient;
+import tech.ydb.table.description.TableDescription;
+import tech.ydb.table.description.TableDescription.Builder;
+import tech.ydb.table.query.DataQueryResult;
+import tech.ydb.table.result.ResultSetReader;
+import tech.ydb.table.transaction.TxControl;
+import tech.ydb.table.values.PrimitiveType;
+
+/**
+ * {@link CacheStoreAdapter} implementation backed by YDB. On first use the store creates a YDB
+ * table derived from the Ignite SQL schema of {@code cacheName}, then reads and writes rows via YQL.
+ *
+ * NOTE(review): queries are built by string concatenation and are not injection-safe; switch to
+ * YDB query parameters before any production use. The GrpcTransport/TableClient created in the
+ * constructor are never closed — consider a close/stop hook.
+ */
+public class YDBCacheStore<K, V> extends CacheStoreAdapter<K, V> {
+    /** Ignite cache (SQL schema) name this store is bound to. */
+    private final String cacheName;
+
+    /** Retry context used for all YDB session operations. */
+    private final SessionRetryContext retryCtx;
+
+    /** Set once the backing YDB table has been created; guarded by {@code init()}'s monitor. */
+    private boolean initialized;
+
+    /** Whether to drop a pre-existing table before creating it. */
+    boolean dropBeforeCreate = true;
+
+    /** Auto-injected Ignite instance. */
+    @IgniteInstanceResource
+    private Ignite ignite;
+
+    /** YDB database path parsed from the connection string. */
+    private final String database;
+
+    public YDBCacheStore(String cacheName, String connUrl) {
+        this.cacheName = cacheName;
+
+        GrpcTransport transport = GrpcTransport.forConnectionString(connUrl)
+            //.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
+            .build();
+
+        database = transport.getDatabase();
+
+        TableClient tableClient = TableClient.newClient(transport).build();
+        this.retryCtx = SessionRetryContext.create(tableClient).build();
+    }
+
+    /**
+     * Lazily creates the backing YDB table from the Ignite SQL metadata of the cache.
+     * Safe to call repeatedly; only the first invocation does any work.
+     */
+    private synchronized void init() {
+        if (initialized)
+            return;
+
+        Builder ydbTableBuilder = TableDescription.newBuilder();
+
+        Collection<GridQueryTypeDescriptor> types = ((IgniteEx) ignite).context().query().types(cacheName);
+
+        Set<String> pkeys = null;
+        Map<String, Class<?>> fields = null;
+        Map<String, GridQueryProperty> fieldProperties = null;
+
+        assert types.size() == 1;
+
+        for (GridQueryTypeDescriptor type : types) {
+            pkeys = type.primaryKeyFields();
+
+            // Single-column keys are not listed in primaryKeyFields().
+            if (pkeys.isEmpty())
+                pkeys = Collections.singleton(type.keyFieldName());
+
+            fields = type.fields();
+            fieldProperties = type.properties();
+        }
+
+        for (Map.Entry<String, Class<?>> fldInfo : fields.entrySet()) {
+            PrimitiveType type0;
+
+            // Map Java field types to YDB column types.
+            switch (fldInfo.getValue().getSimpleName()) {
+                case "Integer":
+                    type0 = PrimitiveType.Int32;
+                    break;
+                case "Long":
+                    type0 = PrimitiveType.Int64;
+                    break;
+                case "String":
+                    type0 = PrimitiveType.Text;
+                    break;
+                default:
+                    throw new UnsupportedOperationException("Undefined mapping for [" + fldInfo.getValue().getSimpleName() + "] type");
+            }
+
+            GridQueryProperty fldProp = Objects.requireNonNull(fieldProperties.get(fldInfo.getKey()));
+
+/*          if (fldProp.notNull()) { // need to discuss with ya what the problem here
+                ydbTableBuilder.addNonnullColumn(fldInfo.getKey(), type0);
+            } else {
+                ydbTableBuilder.addNullableColumn(fldInfo.getKey(), type0);
+            }*/
+
+            ydbTableBuilder.addNullableColumn(fldInfo.getKey(), type0);
+        }
+
+        ydbTableBuilder.setPrimaryKeys(pkeys.toArray(new String[pkeys.size()]));
+
+        TableDescription table = ydbTableBuilder.build();
+
+        if (dropBeforeCreate)
+            retryCtx.supplyStatus(session -> session.dropTable(database + "/" + cacheName)).join();
+
+        retryCtx.supplyStatus(session -> session.createTable(database + "/" + cacheName, table))
+            .join().expectSuccess();
+
+        initialized = true;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public V load(K k) throws CacheLoaderException {
+        init();
+
+        // NOTE(review): key is concatenated into the query — not injection-safe.
+        String query
+            = "SELECT ID, VAL, SVAL "
+            + "FROM %s WHERE ID = " + k + ";";
+
+        String query0 = String.format(query, cacheName);
+
+        TxControl<?> txControl = TxControl.serializableRw().setCommitTx(true);
+
+        Result<DataQueryResult> result = retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join();
+
+        ResultSetReader rs = result.getValue().getResultSet(0);
+
+        if (rs.next())
+            return (V) new FlexHolder(rs.getColumn("VAL").getInt32(), rs.getColumn("SVAL").getText());
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void write(Entry<? extends K, ? extends V> entry) throws CacheWriterException {
+        // Write-through may be the first store operation — make sure the table exists.
+        init();
+
+        FlexHolder holder = (FlexHolder) entry.getValue();
+
+        String query
+            = "INSERT INTO %s (ID, VAL, SVAL) "
+            + "VALUES (" + entry.getKey() + ", " + holder.intHolder + ", \"" + holder.strHolder + "\");";
+
+        String query0 = String.format(query, cacheName);
+
+        TxControl<?> txControl = TxControl.serializableRw().setCommitTx(true);
+
+        retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void delete(Object key) throws CacheWriterException {
+        init();
+
+        // NOTE(review): key can be complex — only simple single-column keys are handled here.
+        String query = "delete from %s where ID=" + key;
+
+        TxControl<?> txControl = TxControl.serializableRw().setCommitTx(true);
+
+        String query0 = String.format(query, cacheName);
+
+        retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join();
+    }
+}
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBStoreFactory.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBStoreFactory.java
new file mode 100644
index 0000000000000..099a072c3af25
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/store/jdbc/YDBStoreFactory.java
@@ -0,0 +1,37 @@
+package org.apache.ignite.examples.datagrid.store.jdbc;
+
+import java.util.Objects;
+import javax.cache.configuration.Factory;
+
+/**
+ * {@link Factory} that produces {@link YDBCacheStore} instances for cache configuration.
+ * The cache name is mandatory; the connection URL defaults to a local YDB instance.
+ */
+public class YDBStoreFactory<K, V> implements Factory<YDBCacheStore<K, V>> {
+    /** YDB connection string; defaults to a local single-node setup. */
+    private String connUrl = "grpc://localhost:2136/local";
+
+    /** Ignite SQL cache (schema) name the store will be bound to. Mandatory. */
+    private String cacheName;
+
+    /** Sets YDB connection URL. @return {@code this} for chaining. */
+    public YDBStoreFactory<K, V> setConnectionUrl(String connUrl) {
+        this.connUrl = connUrl;
+
+        return this;
+    }
+
+    /** Sets cache name. @return {@code this} for chaining. */
+    public YDBStoreFactory<K, V> setCacheName(String cacheName) {
+        this.cacheName = cacheName;
+
+        return this;
+    }
+
+    /** {@inheritDoc} */
+    @Override public YDBCacheStore<K, V> create() {
+        Objects.requireNonNull(cacheName, "cacheName must be set before creating the store");
+
+        return new YDBCacheStore<>(cacheName, connUrl);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java
index b8ad5eb16210b..dae8317240041 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/jdbc/CacheJdbcBlobStore.java
@@ -23,7 +23,10 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
@@ -35,11 +38,15 @@
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.cache.store.CacheStoreSession;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.query.GridQueryFieldMetadata;
+import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -51,6 +58,14 @@
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.transactions.Transaction;
import org.jetbrains.annotations.Nullable;
+import tech.ydb.auth.iam.CloudAuthHelper;
+import tech.ydb.core.grpc.GrpcTransport;
+import tech.ydb.table.SessionRetryContext;
+import tech.ydb.table.TableClient;
+import tech.ydb.table.description.TableDescription;
+import tech.ydb.table.description.TableDescription.Builder;
+import tech.ydb.table.rpc.grpc.GrpcTableRpc;
+import tech.ydb.table.values.PrimitiveType;
/**
* {@link CacheStore} implementation backed by JDBC. This implementation
@@ -90,8 +105,8 @@ public class CacheJdbcBlobStore extends CacheStoreAdapter {
* Default create table query
* (value is create table if not exists ENTRIES (akey binary primary key, val binary)).
*/
- public static final String DFLT_CREATE_TBL_QRY = "create table if not exists ENTRIES " +
- "(akey binary primary key, val binary)";
+ public static final String DFLT_CREATE_TBL_QRY = "create table ENTRIES " +
+ "(akey string, val string, PRIMARY KEY(akey))"; // NOTE(review): "if not exists" was dropped — init() now fails when ENTRIES already exists; javadoc above is stale.
/** Default load entry query (value is select * from ENTRIES where akey=?). */
public static final String DFLT_LOAD_QRY = "select * from ENTRIES where akey=?";
@@ -268,7 +283,7 @@ public class CacheJdbcBlobStore extends CacheStoreAdapter {
stmt.setObject(1, toBytes(val));
stmt.setObject(2, toBytes(key));
- if (stmt.executeUpdate() == 0) {
+ if (stmt.executeUpdate() == 1) { // NOTE(review): inverted vs. upstream ("== 0" meant "no row updated -> fall through to insert"); with "== 1" a successful update is followed by an insert — confirm this is really intended.
stmt.close();
stmt = conn.prepareStatement(insertQry);
@@ -409,6 +424,39 @@ private void init() {
return;
}
+ IgniteEx ign = (IgniteEx) ignite;
+
+ @Nullable List res = ign.context().query()
+ .getIndexing().resultMetaData("PUBLIC", new SqlFieldsQuery("select * from flex1")); // NOTE(review): table name "flex1" is hardcoded — derive from the cache being initialized.
+
+ Collection types = ign.context().query()
+ .types("SQL_PUBLIC_FLEX1"); // NOTE(review): hardcoded schema name.
+
+ Set pkeys = null;
+ for (GridQueryTypeDescriptor type : types) {
+ pkeys = type.primaryKeyFields();
+ }
+
+ GrpcTransport transport = GrpcTransport.forConnectionString("grpc://localhost:2136/local") // NOTE(review): hardcoded endpoint — should come from configuration.
+ //.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
+ .build();
+ GrpcTableRpc rpc = GrpcTableRpc.ownTransport(transport); // NOTE(review): 'rpc' is never used — remove.
+ TableClient tableClient = TableClient.newClient(transport).build();
+ String database = transport.getDatabase();
+ SessionRetryContext retryCtx = SessionRetryContext.create(tableClient).build();
+
+ Builder ydbTableBuilder = TableDescription.newBuilder();
+
+ for (GridQueryFieldMetadata meta : res) {
+ ydbTableBuilder.addNullableColumn(meta.fieldName(), PrimitiveType.Int32); // NOTE(review): every column is forced to Int32 regardless of its metadata type.
+ }
+ ydbTableBuilder.setPrimaryKeys(pkeys.toArray(new String[pkeys.size()])); // NOTE(review): NPE if 'types' was empty (pkeys stays null) — TODO confirm it cannot be.
+
+ TableDescription table = ydbTableBuilder.build();
+
+ retryCtx.supplyStatus(session -> session.createTable(database + "/series1", table)) // NOTE(review): table name "series1" is hardcoded.
+ .join().expectSuccess();
+
if (F.isEmpty(createTblQry))
throw new IgniteException("Failed to initialize cache store (create table query is not provided).");
@@ -421,6 +469,7 @@ private void init() {
stmt = conn.createStatement();
+ //stmt.execute("drop table ENTRIES;"); // NOTE(review): leftover debug code — remove before merge.
stmt.execute(createTblQry);
conn.commit();