diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
index 328c0660b9d..59acc0cb90b 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
@@ -341,6 +341,11 @@ default void fetchColumn(CharSequence colFam, CharSequence colQual) {
* scan, only how quickly it is returned.
*
*
+ * Using the hint {@code scan_type=} and documenting all of the types for your application
+ * is one strategy to consider. This allows administrators to adjust executor and prioritizer
+ * config for your application scan types without having to change the application source code.
+ *
+ *
* The default configuration for Accumulo will ignore hints. See {@link HintScanPrioritizer} and
* {@link SimpleScanDispatcher} for examples of classes that can react to hints.
*
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
index 22063efdf4c..f7138612acf 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java
@@ -24,14 +24,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
+
/**
* When configured for a scan executor, this prioritizer allows scanners to set priorities as
- * integers.
+ * integers. Lower integers result in higher priority.
*
*
- * Scanners should put the key/value {@code priority=} in the map passed to
- * {@link ScannerBase#setExecutionHints(Map)} to set the priority. Lower integers result in higher
- * priority.
+ * Scanners can put the key/values {@code priority=} and/or {@code scan_type=} in the
+ * map passed to {@link ScannerBase#setExecutionHints(Map)} to set the priority. When a
+ * {@code priority} hint is set it takes precedence and the value is used as the priority. When a
+ * {@code scan_type} hint is set the priority is looked up using the value.
*
*
* This prioritizer accepts the option {@code default_priority=} which determines what
@@ -45,6 +49,10 @@
* option silently ignores invalid hints.
*
*
+ * This prioritizer accepts the option {@code priority.=} which maps a scan type hint
+ * to a priority.
+ *
+ *
* When two scans have the same priority, the scan is prioritized based on last run time and then
* creation time.
*
@@ -61,11 +69,14 @@ public class HintScanPrioritizer implements ScanPrioritizer {
private static final Logger log = LoggerFactory.getLogger(HintScanPrioritizer.class);
+ private final String PRIO_PREFIX = "priority.";
+
private enum HintProblemAction {
NONE, LOG, FAIL
}
- private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa) {
+ private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa,
+ Map typePriorities) {
String prio = si.getExecutionHints().get("priority");
if (prio != null) {
try {
@@ -86,6 +97,16 @@ private static int getPriority(ScanInfo si, int defaultPriority, HintProblemActi
}
}
+ if (!typePriorities.isEmpty()) {
+ String scanType = si.getExecutionHints().get("scan_type");
+ if (scanType != null) {
+ Integer typePrio = typePriorities.get(scanType);
+ if (typePrio != null) {
+ return typePrio;
+ }
+ }
+ }
+
return defaultPriority;
}
@@ -94,10 +115,22 @@ public Comparator createComparator(CreateParameters params) {
int defaultPriority = Integer
.parseInt(params.getOptions().getOrDefault("default_priority", Integer.MAX_VALUE + ""));
+ Builder tpb = ImmutableMap.builder();
+
+ params.getOptions().forEach((k, v) -> {
+ if (k.startsWith(PRIO_PREFIX)) {
+ String type = k.substring(PRIO_PREFIX.length());
+ tpb.put(type, Integer.parseInt(v));
+ }
+ });
+
+ ImmutableMap typePriorities = tpb.build();
+
HintProblemAction hpa = HintProblemAction.valueOf(params.getOptions()
.getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
- Comparator cmp = Comparator.comparingInt(si -> getPriority(si, defaultPriority, hpa));
+ Comparator cmp = Comparator
+ .comparingInt(si -> getPriority(si, defaultPriority, hpa, typePriorities));
return cmp.thenComparingLong(si -> si.getLastRunTime().orElse(0))
.thenComparingLong(ScanInfo::getCreationTime);
diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
index 9653f4f91ec..4cba357cad0 100644
--- a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
+++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java
@@ -20,12 +20,10 @@
import java.util.Set;
import org.apache.accumulo.core.client.ScannerBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
/**
* If no options are given, then this will dispatch to an executor named {@code default}. This
@@ -38,21 +36,9 @@
* scans to the named executor.
* {@code table.scan.dispatcher.opts.single_executor=} : dispatches regular
* scans to the named executor.
- * {@code table.scan.dispatcher.opts.heed_hints=true|false} : This option defaults to false, so
- * by default execution hints are ignored. When set to true, the executor can be set on the scanner.
- * This is done by putting the key/value {@code executor=} in the map passed to
- * {@link ScannerBase#setExecutionHints(Map)}
- * {@code table.scan.dispatcher.opts.bad_hint_action=none|log|fail} : When
- * {@code heed_hints=true}, this option determines what to do if the executor in a hint does not
- * exist. The possible values for this option are {@code none}, {@code log}, or {@code error}.
- * Setting {@code none} will silently ignore invalid hints. Setting {@code log} will log a warning
- * for invalid hints. Setting {@code fail} will throw an exception likely causing the scan to fail.
- * For {@code log} and {@code none}, when there is an invalid hint it will fall back to the table
- * configuration. The default is {@code log}.
- * {@code table.scan.dispatcher.opts.ignored_hint_action=none|log|fail} : When
- * {@code heed_hints=false}, this option determines what to do if a hint specifies an executor. The
- * possible values for this option are {@code none}, {@code log}, or {@code fail}. The default is
- * {@code log}.
+ * {@code table.scan.dispatcher.opts.executor.=} : dispatches scans
+ * that set the hint {@code scan_type=} to the named executor. If this setting matches then it
+ * takes precedence over all other settings. See {@link ScannerBase#setExecutionHints(Map)}
*
*
*
@@ -62,74 +48,52 @@
public class SimpleScanDispatcher implements ScanDispatcher {
+ private final String EXECUTOR_PREFIX = "executor.";
+
private final Set VALID_OPTS = ImmutableSet.of("executor", "multi_executor",
- "single_executor", "heed_hints", "bad_hint_action", "ignored_hint_action");
+ "single_executor");
private String multiExecutor;
private String singleExecutor;
- private boolean heedHints;
- private HintProblemAction badHintAction;
- private HintProblemAction ignoredHintHaction;
-
- public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
- private static final Logger log = LoggerFactory.getLogger(SimpleScanDispatcher.class);
+ private Map typeExecutors;
- private enum HintProblemAction {
- NONE, LOG, FAIL
- }
+ public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default";
@Override
public void init(InitParameters params) {
Map options = params.getOptions();
- Set invalidOpts = Sets.difference(options.keySet(), VALID_OPTS);
- Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : %s", invalidOpts);
+
+ Builder teb = ImmutableMap.builder();
+
+ options.forEach((k, v) -> {
+ if (k.startsWith(EXECUTOR_PREFIX)) {
+ String type = k.substring(EXECUTOR_PREFIX.length());
+ teb.put(type, v);
+ } else if (!VALID_OPTS.contains(k)) {
+ throw new IllegalArgumentException("Invalid option " + k);
+ }
+ });
+
+ typeExecutors = teb.build();
String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME);
multiExecutor = options.getOrDefault("multi_executor", base);
singleExecutor = options.getOrDefault("single_executor", base);
- heedHints = Boolean.parseBoolean(options.getOrDefault("heed_hints", "false"));
- badHintAction = HintProblemAction.valueOf(
- options.getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase());
- ignoredHintHaction = HintProblemAction.valueOf(
- options.getOrDefault("ignored_hint_action", HintProblemAction.LOG.name()).toUpperCase());
+
}
@Override
public String dispatch(DispatchParmaters params) {
ScanInfo scanInfo = params.getScanInfo();
- if (heedHints) {
- String executor = scanInfo.getExecutionHints().get("executor");
- if (executor != null) {
- if (params.getScanExecutors().containsKey(executor)) {
+
+ if (!typeExecutors.isEmpty()) {
+ String scanType = scanInfo.getExecutionHints().get("scan_type");
+ if (scanType != null) {
+ String executor = typeExecutors.get(scanType);
+ if (executor != null) {
return executor;
- } else {
- switch (badHintAction) {
- case FAIL:
- throw new IllegalArgumentException(
- "Scan execution hint contained unknown executor " + executor);
- case LOG:
- log.warn("Scan execution hint contained unknown executor {} ", executor);
- break;
- case NONE:
- break;
- default:
- throw new IllegalStateException();
- }
}
}
- } else if (ignoredHintHaction != HintProblemAction.NONE
- && scanInfo.getExecutionHints().containsKey("executor")) {
- String executor = scanInfo.getExecutionHints().get("executor");
- switch (ignoredHintHaction) {
- case FAIL:
- throw new IllegalArgumentException(
- "Scan execution hint contained executor " + executor + " when heed_hints=false");
- case LOG:
- log.warn("Scan execution hint contained executor {} when heed_hints=false", executor);
- break;
- default:
- throw new IllegalStateException();
- }
}
switch (scanInfo.getScanType()) {
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
new file mode 100644
index 00000000000..0faa2f05f93
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.core.spi.scan;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.accumulo.core.spi.common.ServiceEnvironment;
+import org.apache.accumulo.core.spi.scan.ScanInfo.Type;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+
+public class HintScanPrioritizerTest {
+ @Test
+ public void testSort() {
+ long now = System.currentTimeMillis();
+
+ List scans = new ArrayList<>();
+
+ // Two following have never run, so oldest should go first
+ scans.add(new TestScanInfo("a", Type.SINGLE, now - 7));
+ scans.add(
+ new TestScanInfo("b", Type.SINGLE, now - 3).setExecutionHints("scan_type", "background"));
+ scans.add(
+ new TestScanInfo("c", Type.SINGLE, now - 4).setExecutionHints("scan_type", "background"));
+ scans.add(new TestScanInfo("d", Type.SINGLE, now - 3).setExecutionHints("scan_type", "isbn"));
+ scans.add(new TestScanInfo("e", Type.SINGLE, now - 5).setExecutionHints("scan_type", "isbn"));
+ scans.add(new TestScanInfo("f", Type.SINGLE, now - 1).setExecutionHints("priority", "35"));
+ scans.add(new TestScanInfo("g", Type.SINGLE, now - 2).setExecutionHints("priority", "25"));
+ scans.add(new TestScanInfo("h", Type.SINGLE, now - 3).setExecutionHints("priority", "15"));
+ scans.add(new TestScanInfo("i", Type.SINGLE, now - 4).setExecutionHints("priority", "5"));
+
+ Collections.shuffle(scans);
+
+ Comparator comparator = new HintScanPrioritizer()
+ .createComparator(new ScanPrioritizer.CreateParameters() {
+
+ @Override
+ public Map getOptions() {
+ return ImmutableMap.of("priority.isbn", "10", "priority.background", "30",
+ "default_priority", "20");
+ }
+
+ @Override
+ public ServiceEnvironment getServiceEnv() {
+ throw new UnsupportedOperationException();
+ }
+ });
+
+ Collections.sort(scans, comparator);
+
+ assertEquals("i", scans.get(0).testId);
+ assertEquals("e", scans.get(1).testId);
+ assertEquals("d", scans.get(2).testId);
+ assertEquals("h", scans.get(3).testId);
+ assertEquals("a", scans.get(4).testId);
+ assertEquals("g", scans.get(5).testId);
+ assertEquals("c", scans.get(6).testId);
+ assertEquals("b", scans.get(7).testId);
+ assertEquals("f", scans.get(8).testId);
+ }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
index a35e52712df..432e6dd7f9e 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java
@@ -125,29 +125,10 @@ public void testBasic() {
@Test
public void testHints() {
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), ImmutableMap.of(), "E1", "E1");
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"),
- ImmutableMap.of("executor", "E2"), "E2", "E2");
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"),
- ImmutableMap.of("executor", "E5"), "E1", "E1");
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "ignored_hint_action", "fail"),
- ImmutableMap.of("executor", "E5"), "E1", "E1");
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail",
- "ignored_hint_action", "fail"), ImmutableMap.of("executor", "E2"), "E2", "E2");
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false"),
- ImmutableMap.of("executor", "E2"), "E1", "E1");
- runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("executor", "E2"), "E1", "E1");
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testBadHint() {
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail"),
- ImmutableMap.of("executor", "E5"), "E2", "E2");
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void testIgnoredHint() {
- runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false", "ignored_hint_action", "fail"),
- ImmutableMap.of("executor", "E2"), "E1", "E1");
+ runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("scan_type", "quick"), "E1", "E1");
+ runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2"),
+ ImmutableMap.of("scan_type", "quick"), "E2", "E2");
+ runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2", "executor.slow", "E3"),
+ ImmutableMap.of("scan_type", "slow"), "E3", "E3");
}
}
diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
index a9c72f8cf21..4e140999305 100644
--- a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
+++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java
@@ -29,6 +29,8 @@
import org.apache.accumulo.core.spi.common.Stats;
import org.apache.accumulo.core.util.Stat;
+import com.google.common.collect.ImmutableMap;
+
public class TestScanInfo implements ScanInfo {
String testId;
@@ -56,6 +58,11 @@ public class TestScanInfo implements ScanInfo {
}
}
+ TestScanInfo setExecutionHints(String k, String v) {
+ this.executionHints = ImmutableMap.of(k, v);
+ return this;
+ }
+
@Override
public Type getScanType() {
return scanType;