From e5859c6ee8850819617ef4540f923bb544b8b9de Mon Sep 17 00:00:00 2001 From: ntisseyre Date: Thu, 14 Nov 2024 18:43:02 -0600 Subject: [PATCH] Reindex subset of vertices Signed-off-by: ntisseyre --- docs/configs/janusgraph-cfg.md | 1 + .../graphdb/JanusGraphBaseTest.java | 2 +- .../graphdb/JanusGraphIndexTest.java | 89 ++++++++++ .../core/schema/JanusGraphManagement.java | 12 ++ .../common/DistributedStoreManager.java | 12 ++ .../keycolumnvalue/KeyColumnValueStore.java | 5 + .../scan/MultiThreadsRowsCollector.java | 13 +- .../keycolumnvalue/scan/ScanJob.java | 8 + .../scan/StandardScannerExecutor.java | 2 +- .../util/MetricInstrumentedStore.java | 12 ++ .../GraphDatabaseConfiguration.java | 4 + .../database/management/ManagementSystem.java | 13 +- .../graphdb/olap/AbstractScanJob.java | 2 +- .../graphdb/olap/VertexJobConverter.java | 27 ++- .../cql/CQLKeyColumnValueStore.java | 17 +- .../diskstorage/cql/CQLMapKeyIterator.java | 81 +++++++++ .../diskstorage/cql/CQLSubsetIterator.java | 69 ++++++++ .../cql/CQLSubsetIteratorTest.java | 163 ++++++++++++++++++ .../es/BerkeleyElasticsearchTest.java | 6 + .../hadoop/scan/HadoopVertexScanMapper.java | 2 +- .../lucene/BerkeleyLuceneTest.java | 7 + .../diskstorage/solr/BerkeleySolrTest.java | 8 + 22 files changed, 541 insertions(+), 14 deletions(-) create mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLMapKeyIterator.java create mode 100644 janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLSubsetIterator.java create mode 100644 janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLSubsetIteratorTest.java diff --git a/docs/configs/janusgraph-cfg.md b/docs/configs/janusgraph-cfg.md index dbc36fff57..283604f4ce 100644 --- a/docs/configs/janusgraph-cfg.md +++ b/docs/configs/janusgraph-cfg.md @@ -422,6 +422,7 @@ Configuration options for the storage backend. 
Some options are applicable only | storage.directory | Storage directory for those storage backends that require local storage. | String | (no default value) | LOCAL | | storage.drop-on-clear | Whether to drop the graph database (true) or delete rows (false) when clearing storage. Note that some backends always drop the graph database when clearing storage. Also note that indices are always dropped when clearing storage. | Boolean | true | MASKABLE | | storage.hostname | The hostname or comma-separated list of hostnames of storage backend servers. This is only applicable to some storage backends, such as cassandra and hbase. | String[] | 127.0.0.1 | LOCAL | +| storage.keys-size | The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request. | Integer | 100 | MASKABLE | | storage.num-mutations-parallel-threshold | This parameter determines the minimum number of mutations a transaction must have before parallel processing is applied during aggregation. Leveraging parallel processing can enhance the commit times for transactions involving a large number of mutations. However, it is advisable not to set the threshold too low (e.g., 0 or 1) due to the overhead associated with parallelism synchronization. This overhead is more efficiently offset in the context of larger transactions. | Integer | 100 | MASKABLE | | storage.page-size | JanusGraph break requests that may return many results from distributed storage backends into a series of requests for small chunks/pages of results, where each chunk contains up to this many elements. 
| Integer | 100 | MASKABLE | | storage.parallel-backend-ops | Whether JanusGraph should attempt to parallelize storage operations | Boolean | true | MASKABLE | diff --git a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java index a69af6d185..e5cf614628 100644 --- a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java +++ b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphBaseTest.java @@ -747,7 +747,7 @@ public static void evaluateQuery(JanusGraphQuery query, ElementCategory resultTy } protected ScanMetrics executeScanJob(VertexScanJob job) throws Exception { - return executeScanJob(VertexJobConverter.convert(graph,job)); + return executeScanJob(VertexJobConverter.convert(graph, job, null)); } protected ScanMetrics executeScanJob(ScanJob job) throws Exception { diff --git a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java index d66707855d..bdfdde3676 100644 --- a/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java +++ b/janusgraph-backend-testutils/src/main/java/org/janusgraph/graphdb/JanusGraphIndexTest.java @@ -102,6 +102,7 @@ import org.janusgraph.graphdb.types.vertices.JanusGraphSchemaVertex; import org.janusgraph.graphdb.vertices.CacheVertex; import org.janusgraph.testutil.TestGraphConfigs; +import org.javatuples.Pair; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInfo; @@ -1451,6 +1452,94 @@ public void testCompositeVsMixedIndexing() { assertTrue(tx.traversal().V().has("intId2", 234).hasNext()); } + @Test + public void testSubsetReindex() throws Exception { + + clopen(option(FORCE_INDEX_USAGE), true); + + mgmt.makeVertexLabel("cat").make(); + 
mgmt.makeVertexLabel("dog").make(); + + makeKey("id", Integer.class); + makeKey("name", String.class); + final PropertyKey typeKey = makeKey("type", String.class); + + String typeIndex = "searchByType"; + mgmt.buildIndex(typeIndex, Vertex.class) + .addKey(typeKey) + .buildCompositeIndex(); + mgmt.commit(); + + //Cats + int catsCount = 3; + for (int i = 0; i < catsCount; i++) { + Vertex v = tx.addVertex("cat"); + v.property("id", i); + v.property("name", "cat_" + i); + v.property("type", "cat"); + } + + //Dogs + for (int i = 0; i < 5; i++) { + Vertex v = tx.addVertex("dog"); + v.property("id", i); + v.property("name", "dog_" + i); + v.property("type", "dog"); + } + + tx.commit(); + + //Select a subset of vertices to index + clopen(option(FORCE_INDEX_USAGE), true); + List cats = tx.traversal().V().has("type", "cat").toList(); + assertEquals(catsCount, cats.size()); + String excludedCat = cats.get(cats.size() - 1).value("name"); + List> catsSubset = cats.subList(0, cats.size() - 1).stream() + .map(kitty -> new Pair(kitty.id(), kitty.value("name"))) + .collect(Collectors.toList()); + + List dogs = tx.traversal().V().has("type", "dog").toList(); + assertEquals(5, dogs.size()); + tx.rollback(); + + //Create new Index + graph.getOpenTransactions().forEach(JanusGraphTransaction::rollback); + mgmt = graph.openManagement(); + mgmt.getOpenInstances().stream().filter(i -> !i.contains("current")).forEach(i -> mgmt.forceCloseInstance(i)); + mgmt.commit(); + + String catsNameIndex = "searchByName_CatsOnly"; + mgmt = graph.openManagement(); + mgmt.buildIndex(catsNameIndex, Vertex.class) + .addKey(mgmt.getPropertyKey("name")) + .indexOnly(mgmt.getVertexLabel("cat")) + .buildCompositeIndex(); + mgmt.commit(); + + //Make Index as REGISTERED + mgmt = graph.openManagement(); + mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REGISTER_INDEX).get(); + mgmt.commit(); + ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.REGISTERED).call(); + + 
//Reindex a given subset + List reIndexOnlyIds = catsSubset.stream().map(Pair::getValue0).collect(Collectors.toList()); + mgmt = graph.openManagement(); + mgmt.updateIndex(mgmt.getGraphIndex(catsNameIndex), SchemaAction.REINDEX, reIndexOnlyIds).get(); + mgmt.commit(); + ManagementSystem.awaitGraphIndexStatus(graph, catsNameIndex).status(SchemaStatus.ENABLED).call(); + + clopen(option(FORCE_INDEX_USAGE), true); + catsSubset.forEach(kitty -> { + List catsByName = tx.traversal().V().hasLabel("cat").has("name", kitty.getValue1()).toList(); + assertEquals(1, catsByName.size()); + }); + + List catsByName = tx.traversal().V().hasLabel("cat").has("name", excludedCat).toList(); + assertEquals(0, catsByName.size()); + tx.rollback(); + } + @Test public void testIndexInlineProperties() throws NoSuchMethodException { diff --git a/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java b/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java index a2fc563e89..5da96a3751 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java +++ b/janusgraph-core/src/main/java/org/janusgraph/core/schema/JanusGraphManagement.java @@ -25,6 +25,7 @@ import org.janusgraph.diskstorage.keycolumnvalue.scan.ScanJobFuture; import java.time.Duration; +import java.util.List; import java.util.Set; /** @@ -341,6 +342,17 @@ interface IndexBuilder { */ ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads); + /** + * Updates the provided index according to the given {@link SchemaAction} for + * the given subset of vertices. 
+ * + * @param index + * @param updateAction + * @param vertexOnly Set of vertexIds that only should be considered for index update + * @return a future that completes when the index action is done + */ + ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List vertexOnly); + /** * If an index update job was triggered through {@link #updateIndex(Index, SchemaAction)} with schema actions * {@link org.janusgraph.core.schema.SchemaAction#REINDEX} or {@link org.janusgraph.core.schema.SchemaAction#DISCARD_INDEX} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java index 0896d8d3cd..89ed24d547 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/common/DistributedStoreManager.java @@ -28,6 +28,7 @@ import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_PASSWORD; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.AUTH_USERNAME; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.CONNECTION_TIMEOUT; +import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.KEYS_SIZE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.PAGE_SIZE; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_HOSTS; import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_PORT; @@ -69,6 +70,7 @@ public enum Deployment { protected final int port; protected final Duration connectionTimeoutMS; protected final int pageSize; + protected final int keysSize; protected final String username; protected final String password; @@ -83,6 +85,7 @@ public DistributedStoreManager(Configuration storageConfig, int portDefault) { else this.port = portDefault; 
this.connectionTimeoutMS = storageConfig.get(CONNECTION_TIMEOUT); this.pageSize = storageConfig.get(PAGE_SIZE); + this.keysSize = storageConfig.get(KEYS_SIZE); this.times = storageConfig.get(TIMESTAMP_PROVIDER); if (storageConfig.has(AUTH_USERNAME)) { @@ -121,6 +124,15 @@ public int getPageSize() { return pageSize; } + /** + * Returns the default configured keys size for this storage backend. The keys size is used to determine + * how many keys/partitions to request from storage within single request. + * @return + */ + public int getKeysSize() { + return keysSize; + } + /* * TODO this should go away once we have a JanusGraphConfig that encapsulates TimestampProvider */ diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java index 4973433406..6ec14ea850 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/KeyColumnValueStore.java @@ -14,6 +14,7 @@ package org.janusgraph.diskstorage.keycolumnvalue; +import org.apache.commons.lang.NotImplementedException; import org.janusgraph.diskstorage.BackendException; import org.janusgraph.diskstorage.Entry; import org.janusgraph.diskstorage.EntryList; @@ -181,6 +182,10 @@ default Map> getMultiSlices(MultiKeysQu */ void acquireLock(StaticBuffer key, StaticBuffer column, StaticBuffer expectedValue, StoreTransaction txh) throws BackendException; + default KeyIterator getKeys(final List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { + throw new NotImplementedException(); + } + /** * Returns a {@link KeyIterator} over all keys that fall within the key-range specified by the given query and have one or more columns matching the column-range. 
* Calling {@link KeyIterator#getEntries()} returns the list of all entries that match the column-range specified by the given query. diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java index 3e0c7f9bc4..5ddf82b313 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/MultiThreadsRowsCollector.java @@ -61,6 +61,7 @@ class MultiThreadsRowsCollector extends RowsCollector { private final StoreTransaction storeTx; private final List queries; private final Predicate keyFilter; + private final List keysToScan; private final Configuration graphConfiguration; private final DataPuller[] pullThreads; private final BlockingQueue[] dataQueues; @@ -72,6 +73,7 @@ class MultiThreadsRowsCollector extends RowsCollector { StoreTransaction storeTx, List queries, Predicate keyFilter, + List keysToScan, BlockingQueue rowQueue, Configuration graphConfiguration) throws BackendException { @@ -80,6 +82,7 @@ class MultiThreadsRowsCollector extends RowsCollector { this.storeTx = storeTx; this.queries = queries; this.keyFilter = keyFilter; + this.keysToScan = keysToScan; this.graphConfiguration = graphConfiguration; this.dataQueues = new BlockingQueue[queries.size()]; @@ -189,8 +192,14 @@ private void addDataPuller(SliceQuery sq, StoreTransaction stx, int pos) throws this.graphConfiguration.get(GraphDatabaseConfiguration.PAGE_SIZE)); dataQueues[pos] = queue; - DataPuller dp = new DataPuller(sq, queue, - KCVSUtil.getKeys(store,sq,storeFeatures, MAX_KEY_LENGTH,stx), keyFilter); + KeyIterator keyIterator; + if (keysToScan != null) { + keyIterator = store.getKeys(keysToScan, sq, stx); + } else { + keyIterator = KCVSUtil.getKeys(store, sq, storeFeatures, MAX_KEY_LENGTH, stx); 
+ } + + DataPuller dp = new DataPuller(sq, queue, keyIterator, keyFilter); pullThreads[pos] = dp; dp.setName("data-puller-" + pos); // setting the name for thread dumps! dp.start(); diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java index ed123b4352..043a138d81 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/ScanJob.java @@ -113,6 +113,14 @@ default void workerIterationEnd(ScanMetrics metrics) {} */ List getQueries(); + /** + * Get keys to scan by the job. If stream is empty, all keys will be scanned. + * @return + */ + default List getKeysToScan() { + return null; + } + /** * A predicate that determines whether * {@link #process(org.janusgraph.diskstorage.StaticBuffer, java.util.Map, ScanMetrics)} diff --git a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java index 8efd3ed2f4..3c2d681693 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java +++ b/janusgraph-core/src/main/java/org/janusgraph/diskstorage/keycolumnvalue/scan/StandardScannerExecutor.java @@ -167,7 +167,7 @@ private RowsCollector buildScanner(BlockingQueue processorQueue, List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { + return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> { + final KeyIterator ki = backend.getKeys(keys, query, txh); + if (txh.getConfiguration().hasGroupName()) { + return MetricInstrumentedIterator.of(ki, txh.getConfiguration().getGroupName(), metricsStoreName, M_GET_KEYS, M_ITERATOR); + } else { + return ki; + } + }); + } + @Override 
public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException { return runWithMetrics(txh, metricsStoreName, M_GET_KEYS, () -> { diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java index 8237e3a9d0..7c51ad087a 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/configuration/GraphDatabaseConfiguration.java @@ -830,6 +830,10 @@ public boolean apply(@Nullable String s) { "up to this many elements.", ConfigOption.Type.MASKABLE, 100); + public static final ConfigOption KEYS_SIZE = new ConfigOption<>(STORAGE_NS,"keys-size", + "The maximum amount of keys/partitions to retrieve from distributed storage system by JanusGraph in a single request.", + ConfigOption.Type.MASKABLE, 100); + public static final ConfigOption DROP_ON_CLEAR = new ConfigOption<>(STORAGE_NS, "drop-on-clear", "Whether to drop the graph database (true) or delete rows (false) when clearing storage. " + "Note that some backends always drop the graph database when clearing storage. 
Also note that indices are " + diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java index 302071428d..7925668bf7 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/database/management/ManagementSystem.java @@ -914,11 +914,20 @@ public JanusGraphIndex buildMixedIndex(String backingIndex) { --------------- */ @Override public ScanJobFuture updateIndex(Index index, SchemaAction updateAction) { - return updateIndex(index, updateAction, Runtime.getRuntime().availableProcessors()); + return updateIndex(index, updateAction, null, Runtime.getRuntime().availableProcessors()); } @Override public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int numOfThreads) { + return updateIndex(index, updateAction, null, numOfThreads); + } + + @Override + public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List vertexOnly) { + return updateIndex(index, updateAction, vertexOnly, Runtime.getRuntime().availableProcessors()); + } + + private ScanJobFuture updateIndex(Index index, SchemaAction updateAction, List vertexOnly, int numOfThreads) { Preconditions.checkArgument(index != null, "Need to provide an index"); Preconditions.checkArgument(updateAction != null, "Need to provide update action"); @@ -967,7 +976,7 @@ public ScanJobFuture updateIndex(Index index, SchemaAction updateAction, int num builder.setFinishJob(indexId.getIndexJobFinisher(graph, SchemaAction.ENABLE_INDEX)); builder.setJobId(indexId); builder.setNumProcessingThreads(numOfThreads); - builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName))); + builder.setJob(VertexJobConverter.convert(graph, new IndexRepairJob(indexId.indexName, indexId.relationTypeName), 
vertexOnly)); try { future = builder.execute(); } catch (BackendException e) { diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java index 34f3d0d3c7..62a060593b 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/AbstractScanJob.java @@ -28,7 +28,7 @@ public abstract class AbstractScanJob implements ScanJob { protected final GraphProvider graph; protected StandardJanusGraphTx tx; - private IDManager idManager; + protected IDManager idManager; public AbstractScanJob(JanusGraph graph) { this.graph = new GraphProvider(); diff --git a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java index 4f86ac3db6..72ac0f9151 100644 --- a/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java +++ b/janusgraph-core/src/main/java/org/janusgraph/graphdb/olap/VertexJobConverter.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.function.Predicate; +import java.util.stream.Collectors; /** * @author Matthias Broecheler (me@matthiasb.com) @@ -50,23 +51,31 @@ public class VertexJobConverter extends AbstractScanJob { protected final VertexScanJob job; + protected final List vertexIdsToScan; + protected VertexJobConverter(JanusGraph graph, VertexScanJob job) { + this(graph, job, null); + } + + protected VertexJobConverter(JanusGraph graph, VertexScanJob job, List vertexIdsToScan) { super(graph); Preconditions.checkArgument(job!=null); this.job = job; + this.vertexIdsToScan = vertexIdsToScan; } protected VertexJobConverter(VertexJobConverter copy) { super(copy); this.job = copy.job.clone(); + this.vertexIdsToScan = copy.vertexIdsToScan; } - public static ScanJob convert(JanusGraph graph, 
VertexScanJob vertexJob) { - return new VertexJobConverter(graph,vertexJob); + public static ScanJob convert(JanusGraph graph, VertexScanJob vertexJob, List vertexIdsToScan) { + return new VertexJobConverter(graph, vertexJob, vertexIdsToScan); } public static ScanJob convert(VertexScanJob vertexJob) { - return new VertexJobConverter(null,vertexJob); + return new VertexJobConverter(null, vertexJob, null); } @Override @@ -130,6 +139,18 @@ public List getQueries() { } } + @Override + public List getKeysToScan() { + if (this.vertexIdsToScan == null) { + return null; + } else { + return this.vertexIdsToScan + .stream() + .map(k -> idManager.getKey(k)) + .collect(Collectors.toList()); + } + } + @Override public Predicate getKeyFilter() { return buffer -> !IDManager.VertexIDType.Invisible.is(getVertexId(buffer)); diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java index b01136a176..f983205126 100644 --- a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java @@ -121,12 +121,12 @@ public class CQLKeyColumnValueStore implements KeyColumnValueStore { public static final Function EXCEPTION_MAPPER = cause -> { cause = CompletableFutureUtil.unwrapExecutionException(cause); - if(cause instanceof InterruptedException || cause.getCause() instanceof InterruptedException){ + if (cause instanceof InterruptedException || cause.getCause() instanceof InterruptedException) { Thread.currentThread().interrupt(); return new PermanentBackendException(cause instanceof InterruptedException ? 
cause : cause.getCause()); } - if(cause instanceof BackendException){ - return (BackendException) cause; + if (cause instanceof BackendException || cause.getCause() instanceof BackendException) { + return (BackendException) (cause instanceof BackendException ? cause : cause.getCause()); } return Match(cause).of( Case($(instanceOf(QueryValidationException.class)), PermanentBackendException::new), @@ -479,6 +479,17 @@ public void acquireLock(final StaticBuffer key, final StaticBuffer column, final } } + @Override + public KeyIterator getKeys(final List keys, final SliceQuery query, final StoreTransaction txh) throws BackendException { + return Try.of(() -> new CQLMapKeyIterator(new CQLSubsetIterator<>(keys, this.storeManager.getKeysSize(), (keysList) -> { + try { + return getSlice(keysList, query, txh).entrySet().iterator(); + } catch (BackendException e) { + throw new RuntimeException(e); + } + }))).getOrElseThrow(EXCEPTION_MAPPER); + } + @Override public KeyIterator getKeys(final KeyRangeQuery query, final StoreTransaction txh) throws BackendException { if (!this.storeManager.getFeatures().hasOrderedScan()) { diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLMapKeyIterator.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLMapKeyIterator.java new file mode 100644 index 0000000000..129c0f982c --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLMapKeyIterator.java @@ -0,0 +1,81 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql; + +import org.janusgraph.diskstorage.Entry; +import org.janusgraph.diskstorage.EntryList; +import org.janusgraph.diskstorage.StaticBuffer; +import org.janusgraph.diskstorage.keycolumnvalue.KeyIterator; +import org.janusgraph.diskstorage.util.RecordIterator; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +public class CQLMapKeyIterator implements KeyIterator { + + private Map.Entry currentEntry = null; + + private final Iterator> entriesIterator; + + public CQLMapKeyIterator(Iterator> entriesIterator) { + this.entriesIterator = entriesIterator; + } + + @Override + public RecordIterator getEntries() { + return new EntryRecordIterator(this.currentEntry.getValue()); + } + + @Override + public void close() throws IOException { + //NOP + } + + @Override + public boolean hasNext() { + return this.entriesIterator.hasNext(); + } + + @Override + public StaticBuffer next() { + this.currentEntry = this.entriesIterator.next(); + return currentEntry.getKey(); + } + + static class EntryRecordIterator implements RecordIterator { + + private final Iterator entryIterator; + + public EntryRecordIterator(EntryList entryList) { + this.entryIterator = entryList.iterator(); + } + + @Override + public void close() throws IOException { + //NOP + } + + @Override + public boolean hasNext() { + return entryIterator.hasNext(); + } + + @Override + public Entry next() { + return entryIterator.next(); + } + } +} diff --git a/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLSubsetIterator.java b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLSubsetIterator.java new file mode 100644 index 0000000000..8b601f7633 --- /dev/null +++ b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLSubsetIterator.java @@ -0,0 +1,69 @@ +// Copyright 2024 JanusGraph Authors +// +// Licensed 
under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.janusgraph.diskstorage.cql; + +import com.google.common.collect.Iterables; +import org.janusgraph.diskstorage.StaticBuffer; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; + +public class CQLSubsetIterator implements Iterator { + + private Iterator itemIterator; + private final int keysSliceSize; + private final Iterable keysIter; + private final Function, Iterator> iteratorSupplier; + + public CQLSubsetIterator(List keys, int keysSliceSize, Function, Iterator> iteratorSupplier) { + this.iteratorSupplier = iteratorSupplier; + Iterator keysConverted = keys.iterator(); + this.keysIter = () -> keysConverted; + this.keysSliceSize = keysSliceSize; + this.fetchSubset(); + } + + @Override + public boolean hasNext() { + while (!this.itemIterator.hasNext()) { + boolean hasKeys = fetchSubset(); + if (!hasKeys) { + return false; + } + } + return true; + } + + @Override + public T next() { + return this.itemIterator.next(); + } + + private boolean fetchSubset() { + List subset = new ArrayList<>(this.keysSliceSize); + Iterables.limit(this.keysIter, this.keysSliceSize).forEach(subset::add); + + if (subset.isEmpty()) { + this.itemIterator = Collections.emptyIterator(); + return false; + } else { + this.itemIterator = iteratorSupplier.apply(subset); + return true; + } + } +} diff --git 
a/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLSubsetIteratorTest.java b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLSubsetIteratorTest.java
new file mode 100644
index 0000000000..f682ceaa5a
--- /dev/null
+++ b/janusgraph-cql/src/test/java/org/janusgraph/diskstorage/cql/CQLSubsetIteratorTest.java
@@ -0,0 +1,163 @@
+// Copyright 2024 JanusGraph Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.janusgraph.diskstorage.cql;
+
+import com.google.common.collect.Lists;
+import io.vavr.collection.Array;
+import io.vavr.collection.Iterator;
+import org.janusgraph.diskstorage.StaticBuffer;
+import org.janusgraph.diskstorage.util.WriteByteBuffer;
+import org.junit.jupiter.api.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class CQLSubsetIteratorTest {
+
+    @Test
+    public void testIterator() {
+
+        int keysSliceSize = 2; // the supplier must receive the keys two at a time
+        List<String> inputKeys = Array.of("key1", "key2", "key3", "key4").asJava();
+        List<String> checkKeys = new ArrayList<>(4);
+
+        final List<StaticBuffer> keys = encodeKeys(inputKeys);
+
+        CQLSubsetIterator<Integer> subsetIterator = new CQLSubsetIterator<>(keys,
+            keysSliceSize,
+            (subList) -> {
+                assertEquals(2, subList.size());
+                subList.stream().map(this::decodeKey).forEach(checkKeys::add);
+                return Array.of(10, 20).iterator();
+            });
+
+        assertTrue(subsetIterator.hasNext());
+        assertEquals(Array.of(10, 20, 10, 20).asJava(), Lists.newArrayList(subsetIterator));
+        assertFalse(subsetIterator.hasNext());
+        assertEquals(inputKeys, checkKeys);
+    }
+
+    @Test
+    public void testIteratorNotEven() {
+
+        int keysSliceSize = 2; // 3 keys: the supplier runs twice (4 results) and must still see every key
+        List<String> inputKeys = Array.of("key1", "key2", "key3").asJava();
+        List<String> checkKeys = new ArrayList<>(3);
+
+        final List<StaticBuffer> keys = encodeKeys(inputKeys);
+
+        CQLSubsetIterator<Integer> subsetIterator = new CQLSubsetIterator<>(keys,
+            keysSliceSize,
+            (subList) -> {
+                subList.stream().map(this::decodeKey).forEach(checkKeys::add);
+                return Array.of(10, 20).iterator();
+            });
+
+        assertTrue(subsetIterator.hasNext());
+        assertEquals(Array.of(10, 20, 10, 20).asJava(), Lists.newArrayList(subsetIterator));
+        assertFalse(subsetIterator.hasNext());
+        assertEquals(inputKeys, checkKeys);
+    }
+
+    @Test
+    public void testPartialEmptyResults() {
+
+        int keysSliceSize = 1;
+        List<String> inputKeys = Array.of("key1", "key2", "key3", "key4").asJava();
+        List<String> checkKeys = new ArrayList<>(4);
+
+        final List<StaticBuffer> keys = encodeKeys(inputKeys);
+
+        CQLSubsetIterator<Integer> subsetIterator = new CQLSubsetIterator<>(keys,
+            keysSliceSize,
+            (subList) -> {
+                assertEquals(1, subList.size());
+                String key = this.decodeKey(subList.get(0));
+                checkKeys.add(key);
+                if (!key.equals("key3")) { // only the slice holding "key3" produces results
+                    return Iterator.empty();
+                } else {
+                    return Array.of(10, 20).iterator();
+                }
+            });
+
+        assertTrue(subsetIterator.hasNext());
+        assertEquals(Array.of(10, 20).asJava(), Lists.newArrayList(subsetIterator));
+        assertFalse(subsetIterator.hasNext());
+        assertEquals(inputKeys, checkKeys);
+    }
+
+    @Test
+    public void testEmptyResults() {
+
+        int keysSliceSize = 1;
+        List<String> inputKeys = Array.of("key1", "key2", "key3").asJava();
+        List<String> checkKeys = new ArrayList<>(3);
+
+        final List<StaticBuffer> keys = encodeKeys(inputKeys);
+
+        CQLSubsetIterator<Integer> subsetIterator = new CQLSubsetIterator<>(keys,
+            keysSliceSize,
+            (subList) -> {
+                assertEquals(1, subList.size());
+                String key = this.decodeKey(subList.get(0));
+                checkKeys.add(key);
+                return Iterator.empty(); // every slice is empty: hasNext() must be false once all keys were visited
+
+            });
+
+        assertFalse(subsetIterator.hasNext());
+        assertEquals(inputKeys, checkKeys);
+    }
+
+    @Test
+    public void testEmptyKeys() {
+
+        int keysSliceSize = 2;
+        final List<StaticBuffer> keys = Collections.emptyList();
+
+        CQLSubsetIterator<Integer> subsetIterator = new CQLSubsetIterator<>(keys,
+            keysSliceSize,
+            (subList) -> {
+                fail("Iterator should have never been called");
+                return Array.of(1, 2, 3, 4, 5, 6).iterator();
+            });
+
+        assertFalse(subsetIterator.hasNext());
+    }
+
+    private List<StaticBuffer> encodeKeys(List<String> keys) {
+        return keys.stream()
+            .map(keyStr -> {
+                byte[] bytes = keyStr.getBytes(StandardCharsets.UTF_8); // same charset that decodeKey uses
+                WriteByteBuffer bb = new WriteByteBuffer(bytes.length);
+                bb.putBytes(bytes);
+                return bb.getStaticBuffer();
+            })
+            .collect(Collectors.toList());
+    }
+
+    private String decodeKey(StaticBuffer key) {
+        return StandardCharsets.UTF_8.decode(key.asByteBuffer()).toString();
+    }
+}
diff --git a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java
index 45dca41fdb..60b8d57b94 100644
--- a/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java
+++ b/janusgraph-es/src/test/java/org/janusgraph/diskstorage/es/BerkeleyElasticsearchTest.java
@@ -21,6 +21,7 @@
 import org.janusgraph.example.GraphOfTheGodsFactory;
 import org.janusgraph.graphdb.JanusGraphIndexTest;
 import org.janusgraph.util.system.IOUtils;
+import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
@@ -75,4 +76,9 @@ public void testDisableAndDiscardManuallyAndDropEnabledIndex() throws Exception
     public void testDiscardAndDropRegisteredIndex() throws ExecutionException, InterruptedException {
         super.testDiscardAndDropRegisteredIndex();
     }
+
+    @Test
+    public void testSubsetReindex() {
+        Assert.assertThrows("Code is not implemented", ExecutionException.class, super::testSubsetReindex);
+    }
 }
diff --git a/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java b/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java
index 6282f2dea6..76970bb57a 100644
--- a/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java
+++ b/janusgraph-hadoop/src/main/java/org/janusgraph/hadoop/scan/HadoopVertexScanMapper.java
@@ -36,7 +36,7 @@ protected void setup(Context context) throws IOException, InterruptedException {
         VertexScanJob vertexScan = getVertexScanJob(scanConf);
         ModifiableConfiguration graphConf = getJanusGraphConfiguration(context);
         graph = JanusGraphFactory.open(graphConf);
-        job = VertexJobConverter.convert(graph, vertexScan);
+        job = VertexJobConverter.convert(graph, vertexScan, null);
         metrics = new HadoopContextScanMetrics(context);
         finishSetup(scanConf, graphConf);
     }
diff --git a/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java b/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java
index 332e80ccaa..b697ebb11a 100644
--- a/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java
+++ b/janusgraph-lucene/src/test/java/org/janusgraph/diskstorage/lucene/BerkeleyLuceneTest.java
@@ -20,8 +20,11 @@
 import org.janusgraph.diskstorage.configuration.WriteConfiguration;
 import org.janusgraph.example.GraphOfTheGodsFactory;
 import org.janusgraph.graphdb.JanusGraphIndexTest;
+import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 
+import java.util.concurrent.ExecutionException;
+
 import static org.janusgraph.BerkeleyStorageSetup.getBerkeleyJEConfiguration;
 import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_BACKEND;
 import static org.janusgraph.graphdb.configuration.GraphDatabaseConfiguration.INDEX_DIRECTORY;
@@ -156,4 +159,8 @@ public void testCreateMixedIndexThatPreviouslyExisted() {
         assertThrows(UnsupportedOperationException.class, super::testCreateMixedIndexThatPreviouslyExisted);
     }
 
+    @Test
+    public void testSubsetReindex() {
+        Assert.assertThrows("Code is not implemented", ExecutionException.class, super::testSubsetReindex);
+    }
 }
diff --git a/janusgraph-solr/src/test/java/org/janusgraph/diskstorage/solr/BerkeleySolrTest.java b/janusgraph-solr/src/test/java/org/janusgraph/diskstorage/solr/BerkeleySolrTest.java
index af38e73d4b..51186ff4e4 100644
--- a/janusgraph-solr/src/test/java/org/janusgraph/diskstorage/solr/BerkeleySolrTest.java
+++ b/janusgraph-solr/src/test/java/org/janusgraph/diskstorage/solr/BerkeleySolrTest.java
@@ -15,9 +15,12 @@
 package org.janusgraph.diskstorage.solr;
 
 import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
+import org.junit.Assert;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
+import java.util.concurrent.ExecutionException;
+
 import static org.janusgraph.BerkeleyStorageSetup.getBerkeleyJEConfiguration;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
@@ -45,4 +48,9 @@ public void testDiscardAndDropRegisteredIndex() {
     public void testCreateMixedIndexThatPreviouslyExisted() {
         assertThrows(UnsupportedOperationException.class, super::testCreateMixedIndexThatPreviouslyExisted);
     }
+
+    @Test
+    public void testSubsetReindex() {
+        Assert.assertThrows("Code is not implemented", ExecutionException.class, super::testSubsetReindex);
+    }
 }