Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flexsoft ignite3 #1

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@
<version>${spring.version}</version>
</dependency>

<dependency>
<groupId>tech.ydb.jdbc</groupId>
<artifactId>ydb-jdbc-driver-shaded</artifactId>
<version>2.1.0</version>
</dependency>
<!-- <dependency>
<groupId>com.yandex.ydb</groupId>
<artifactId>ydb-sdk-jdbc</artifactId>
<version>1.45.6</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-context</artifactId>
<version>1.43.2</version>
</dependency>-->

<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package org.apache.ignite.examples.datagrid.store.jdbc;

public class FlexHolder {
final int intHolder;
final String strHolder;

public FlexHolder(int val1, String val2) {
this.intHolder = val1;
this.strHolder = val2;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package org.apache.ignite.examples.datagrid.store.jdbc;

import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;

import java.util.List;
import java.util.UUID;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.FieldsQueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.examples.ExampleNodeStartup;
import org.apache.ignite.examples.ExamplesUtils;
import org.apache.ignite.transactions.Transaction;

/**
* Demonstrates usage of cache with underlying persistent store configured.
* <p>
* This example uses {@link CacheJdbcPersonStore} as a persistent store.
* <p>
* To start the example, you should:
* <ul>
* <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start example using {@link FlexYDBStoreExample}.</li>
* </ul>
* <p>
* Remote nodes can be started with {@link ExampleNodeStartup} in another JVM which will
* start node with {@code examples/config/example-ignite.xml} configuration.
*/
public class FlexYDBStoreExample {
/** Cache name. */
private static final String CACHE_NAME = CacheJdbcStoreExample.class.getSimpleName();

/** Heap size required to run this example. */
public static final int MIN_MEMORY = 1024 * 1024 * 1024;

/** Number of entries to load. */
private static final int ENTRY_COUNT = 100_000;

/** Global person ID to use across entire example. */
private static final Long id = Math.abs(UUID.randomUUID().getLeastSignificantBits());

/**
* Executes example.
*
* @param args Command line arguments, none required.
* @throws IgniteException If example execution failed.
*/
public static void main(String[] args) throws IgniteException {
ExamplesUtils.checkMinMemory(MIN_MEMORY);

// To start ignite with desired configuration uncomment the appropriate line.
try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
System.out.println();
System.out.println(">>> Cache store example started.");

CacheConfiguration cacheCfg = new CacheConfiguration<>(CACHE_NAME);

// Set atomicity as transaction, since we are showing transactions in example.
cacheCfg.setAtomicityMode(TRANSACTIONAL);

YDBStoreFactory<Integer, FlexHolder> factory = new YDBStoreFactory<>();
factory.setConnectionUrl("grpc://localhost:2136/local");
factory.setCacheName("SQL_PUBLIC_FLEX"); // can be autogenerated from sql "create table", hardcoded for now
cacheCfg.setCacheStoreFactory(factory);

cacheCfg.setReadThrough(true);
cacheCfg.setWriteThrough(true);

// Auto-close cache at the end of the example.
try (IgniteCache<Long, FlexHolder> cache = ignite.getOrCreateCache(cacheCfg)) {
ignite.cache(CACHE_NAME).query(new SqlFieldsQuery("drop table if exists flex;"));

FieldsQueryCursor<List<?>> res0 = ignite.cache(CACHE_NAME)
.query(new SqlFieldsQuery("create table if not exists flex(id long primary key, val int not null, sval varchar);"));

res0.getAll();

// Make initial cache loading from persistent store. This is a
// distributed operation and will call CacheStore.loadCache(...)
// method on all nodes in topology.
loadCache(cache);

// Start transaction and execute several cache operations with
// read/write-through to persistent store.
executeTransaction(cache);
}
finally {
// Distributed cache could be removed from cluster only by #destroyCache() call.
ignite.destroyCache(CACHE_NAME);
}
}
}

/**
* Makes initial cache loading.
*
* @param cache Cache to load.
*/
private static void loadCache(IgniteCache<Long, FlexHolder> cache) {
long start = System.currentTimeMillis();

// Start loading cache from persistent store on all caching nodes.
cache.loadCache(null, ENTRY_COUNT);

long end = System.currentTimeMillis();

System.out.println(">>> Loaded " + cache.size() + " keys with backups in " + (end - start) + "ms.");
}

/**
* Executes transaction with read/write-through to persistent store.
*
* @param cache Cache to execute transaction on.
*/
private static void executeTransaction(IgniteCache<Long, FlexHolder> cache) {
try (Transaction tx = Ignition.ignite().transactions().txStart()) {
FlexHolder val = cache.get(id);

System.out.println("Read value: " + val);

val = cache.getAndPut(id, new FlexHolder(100, "test"));

System.out.println("Overwrite old value: " + val);

val = cache.get(id);

System.out.println("Read value: " + val);

tx.commit();
}

// Clear entry from memory, but keep it in store.
cache.clear(id);

// Operations on this cache will not affect store.
IgniteCache<Long, FlexHolder> cacheSkipStore = cache.withSkipStore();

System.out.println("Read value skipping store (expecting null): " + cacheSkipStore.get(id));

System.out.println("Read value with store lookup (expecting NOT null): " + cache.get(id));

// Expecting not null, since entry should be in memory since last call.
System.out.println("Read value skipping store (expecting NOT null): " + cacheSkipStore.get(id));

// Clear entry from memory, and from store.
cache.remove(id);

System.out.println("Key removes elsewhere (expecting null): " + cache.get(id));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package org.apache.ignite.examples.datagrid.store.jdbc;

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.query.GridQueryProperty;
import org.apache.ignite.internal.processors.query.GridQueryTypeDescriptor;
import org.apache.ignite.resources.IgniteInstanceResource;
import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.table.SessionRetryContext;
import tech.ydb.table.TableClient;
import tech.ydb.table.description.TableDescription;
import tech.ydb.table.description.TableDescription.Builder;
import tech.ydb.table.query.DataQueryResult;
import tech.ydb.table.result.ResultSetReader;
import tech.ydb.table.transaction.TxControl;
import tech.ydb.table.values.PrimitiveType;

public class YDBCacheStore <K, V> extends CacheStoreAdapter<K, V> {
private final String cacheName;
private SessionRetryContext retryCtx;
private boolean initialized;
boolean dropBeforeCreate = true;

@IgniteInstanceResource
private Ignite ignite;

private String database;

public YDBCacheStore(String cacheName, String connUrl) {
this.cacheName = cacheName;

GrpcTransport transport = GrpcTransport.forConnectionString(connUrl)
//.withAuthProvider(CloudAuthHelper.getAuthProviderFromEnviron())
.build();

database = transport.getDatabase();

TableClient tableClient = TableClient.newClient(transport).build();
this.retryCtx = SessionRetryContext.create(tableClient).build();
}

private synchronized void init() {
if (!initialized) {
Builder ydbTableBuilder = TableDescription.newBuilder();

Collection<GridQueryTypeDescriptor> types = ((IgniteEx) ignite).context().query().types(cacheName);

Set<String> pkeys = null;
Map<String, Class<?>> fields = null;
Map<String, GridQueryProperty> fieldProperties = null;

assert types.size() == 1;

for (GridQueryTypeDescriptor type : types) {
pkeys = type.primaryKeyFields();

if (pkeys.isEmpty()) {
pkeys = Collections.singleton(type.keyFieldName());
}

fields = type.fields();
fieldProperties = type.properties();
}

for (Map.Entry<String, Class<?>> fldInfo : fields.entrySet()) {
PrimitiveType type0;
switch (fldInfo.getValue().getSimpleName()) {
case "Integer":
type0 = PrimitiveType.Int32;
break;
case "Long":
type0 = PrimitiveType.Int64;
break;
case "String":
type0 = PrimitiveType.Text;
break;
default:
throw new UnsupportedOperationException("Undefined mapping for [" + fldInfo.getValue().getSimpleName() + "] type");
}

GridQueryProperty fldProp = Objects.requireNonNull(fieldProperties.get(fldInfo.getKey()));

/* if (fldProp.notNull()) { // need to discuss with ya what the problem here
ydbTableBuilder.addNonnullColumn(fldInfo.getKey(), type0);
} else {
ydbTableBuilder.addNullableColumn(fldInfo.getKey(), type0);
}*/

ydbTableBuilder.addNullableColumn(fldInfo.getKey(), type0);
}
ydbTableBuilder.setPrimaryKeys(pkeys.toArray(new String[pkeys.size()]));

TableDescription table = ydbTableBuilder.build();

if (dropBeforeCreate) {
retryCtx.supplyStatus(session -> session.dropTable(database + "/" + cacheName)).join();
}

retryCtx.supplyStatus(session -> session.createTable(database + "/" + cacheName, table))
.join().expectSuccess();

initialized = true;
}
}

@Override
public V load(K k) throws CacheLoaderException {
init();

String query
= "SELECT ID, VAL, SVAL "
+ "FROM %s WHERE ID = " + k + ";";

String query0 = String.format(query, cacheName);

TxControl txControl = TxControl.serializableRw().setCommitTx(true);

Result<DataQueryResult> result = retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join();

DataQueryResult res = result.getValue();

ResultSetReader rs = res.getResultSet(0);

while (rs.next()) {
return (V) new FlexHolder(rs.getColumn("VAL").getInt32(), rs.getColumn("SVAL").getText());
}

return null;
}

@Override
public void write(Entry<? extends K, ? extends V> entry) throws CacheWriterException {
FlexHolder holder = (FlexHolder) entry.getValue();
String query
= "INSERT INTO %s (ID, VAL, SVAL) "
+ "VALUES (" + entry.getKey() + ", " + holder.intHolder + ", \"" + holder.strHolder + "\");";

String query0 = String.format(query, cacheName);

TxControl txControl = TxControl.serializableRw().setCommitTx(true);

retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join();
}

@Override
public void delete(Object key) throws CacheWriterException {
// key can be complex !!!
String query = "delete from %s where ID=" + key;

TxControl txControl = TxControl.serializableRw().setCommitTx(true);

String query0 = String.format(query, cacheName);

retryCtx.supplyResult(session -> session.executeDataQuery(query0, txControl)).join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package org.apache.ignite.examples.datagrid.store.jdbc;

import java.util.Objects;
import javax.cache.configuration.Factory;

public class YDBStoreFactory <K, V> implements Factory<YDBCacheStore<K, V>> {
private String connUrl = "grpc://localhost:2136/local";

private String cacheName;

public YDBStoreFactory<K, V> setConnectionUrl(String connUrl) {
this.connUrl = connUrl;

return this;
}

public YDBStoreFactory<K, V> setCacheName(String cacheName) {
this.cacheName = cacheName;

return this;
}

@Override
public YDBCacheStore<K, V> create() {
Objects.requireNonNull(cacheName);

return new YDBCacheStore<>(cacheName, connUrl);
}
}
Loading
Loading