diff --git a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java index 328c0660b9d..59acc0cb90b 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java @@ -341,6 +341,11 @@ default void fetchColumn(CharSequence colFam, CharSequence colQual) { * scan, only how quickly it is returned. * *

+ * Using the hint {@code scan_type=} and documenting all of the types for your application + * is one strategy to consider. This allows administrators to adjust executor and prioritizer + * config for your application scan types without having to change the application source code. + * + *

* The default configuration for Accumulo will ignore hints. See {@link HintScanPrioritizer} and * {@link SimpleScanDispatcher} for examples of classes that can react to hints. * diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java index 22063efdf4c..f7138612acf 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizer.java @@ -24,14 +24,18 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; + /** * When configured for a scan executor, this prioritizer allows scanners to set priorities as - * integers. + * integers. Lower integers result in higher priority. * *

- * Scanners should put the key/value {@code priority=} in the map passed to - * {@link ScannerBase#setExecutionHints(Map)} to set the priority. Lower integers result in higher - * priority. + * Scanners can put the key/values {@code priority=} and/or {@code scan_type=} in the + * map passed to {@link ScannerBase#setExecutionHints(Map)} to set the priority. When a + * {@code priority} hint is set it takes precedence and the value is used as the priority. When a + * {@code scan_type} hint is set the priority is looked up using the value. * *

* This prioritizer accepts the option {@code default_priority=} which determines what @@ -45,6 +49,10 @@ * option silently ignores invalid hints. * *

+ * This prioritizer accepts the option {@code priority.=} which maps a scan type hint + * to a priority. + * + *

* When two scans have the same priority, the scan is prioritized based on last run time and then * creation time. * @@ -61,11 +69,14 @@ public class HintScanPrioritizer implements ScanPrioritizer { private static final Logger log = LoggerFactory.getLogger(HintScanPrioritizer.class); + private final String PRIO_PREFIX = "priority."; + private enum HintProblemAction { NONE, LOG, FAIL } - private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa) { + private static int getPriority(ScanInfo si, int defaultPriority, HintProblemAction hpa, + Map typePriorities) { String prio = si.getExecutionHints().get("priority"); if (prio != null) { try { @@ -86,6 +97,16 @@ private static int getPriority(ScanInfo si, int defaultPriority, HintProblemActi } } + if (!typePriorities.isEmpty()) { + String scanType = si.getExecutionHints().get("scan_type"); + if (scanType != null) { + Integer typePrio = typePriorities.get(scanType); + if (typePrio != null) { + return typePrio; + } + } + } + return defaultPriority; } @@ -94,10 +115,22 @@ public Comparator createComparator(CreateParameters params) { int defaultPriority = Integer .parseInt(params.getOptions().getOrDefault("default_priority", Integer.MAX_VALUE + "")); + Builder tpb = ImmutableMap.builder(); + + params.getOptions().forEach((k, v) -> { + if (k.startsWith(PRIO_PREFIX)) { + String type = k.substring(PRIO_PREFIX.length()); + tpb.put(type, Integer.parseInt(v)); + } + }); + + ImmutableMap typePriorities = tpb.build(); + HintProblemAction hpa = HintProblemAction.valueOf(params.getOptions() .getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase()); - Comparator cmp = Comparator.comparingInt(si -> getPriority(si, defaultPriority, hpa)); + Comparator cmp = Comparator + .comparingInt(si -> getPriority(si, defaultPriority, hpa, typePriorities)); return cmp.thenComparingLong(si -> si.getLastRunTime().orElse(0)) .thenComparingLong(ScanInfo::getCreationTime); diff --git a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java index 9653f4f91ec..4cba357cad0 100644 --- a/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java +++ b/core/src/main/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcher.java @@ -20,12 +20,10 @@ import java.util.Set; import org.apache.accumulo.core.client.ScannerBase; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; /** * If no options are given, then this will dispatch to an executor named {@code default}. This @@ -38,21 +36,9 @@ * scans to the named executor. *

  • {@code table.scan.dispatcher.opts.single_executor=} : dispatches regular * scans to the named executor.
  • - *
  • {@code table.scan.dispatcher.opts.heed_hints=true|false} : This option defaults to false, so - * by default execution hints are ignored. When set to true, the executor can be set on the scanner. - * This is done by putting the key/value {@code executor=} in the map passed to - * {@link ScannerBase#setExecutionHints(Map)} - *
  • {@code table.scan.dispatcher.opts.bad_hint_action=none|log|fail} : When - * {@code heed_hints=true}, this option determines what to do if the executor in a hint does not - * exist. The possible values for this option are {@code none}, {@code log}, or {@code error}. - * Setting {@code none} will silently ignore invalid hints. Setting {@code log} will log a warning - * for invalid hints. Setting {@code fail} will throw an exception likely causing the scan to fail. - * For {@code log} and {@code none}, when there is an invalid hint it will fall back to the table - * configuration. The default is {@code log}. - *
  • {@code table.scan.dispatcher.opts.ignored_hint_action=none|log|fail} : When - * {@code heed_hints=false}, this option determines what to do if a hint specifies an executor. The - * possible values for this option are {@code none}, {@code log}, or {@code fail}. The default is - * {@code log}. + *
  • {@code table.scan.dispatcher.opts.executor.=} : dispatches scans + * that set the hint {@code scan_type=} to the named executor. If this setting matches then it + * takes precedence over all other settings. See {@link ScannerBase#setExecutionHints(Map)}
  • * * * @@ -62,74 +48,52 @@ public class SimpleScanDispatcher implements ScanDispatcher { + private final String EXECUTOR_PREFIX = "executor."; + private final Set VALID_OPTS = ImmutableSet.of("executor", "multi_executor", - "single_executor", "heed_hints", "bad_hint_action", "ignored_hint_action"); + "single_executor"); private String multiExecutor; private String singleExecutor; - private boolean heedHints; - private HintProblemAction badHintAction; - private HintProblemAction ignoredHintHaction; - - public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default"; - private static final Logger log = LoggerFactory.getLogger(SimpleScanDispatcher.class); + private Map typeExecutors; - private enum HintProblemAction { - NONE, LOG, FAIL - } + public static final String DEFAULT_SCAN_EXECUTOR_NAME = "default"; @Override public void init(InitParameters params) { Map options = params.getOptions(); - Set invalidOpts = Sets.difference(options.keySet(), VALID_OPTS); - Preconditions.checkArgument(invalidOpts.size() == 0, "Invalid options : %s", invalidOpts); + + Builder teb = ImmutableMap.builder(); + + options.forEach((k, v) -> { + if (k.startsWith(EXECUTOR_PREFIX)) { + String type = k.substring(EXECUTOR_PREFIX.length()); + teb.put(type, v); + } else if (!VALID_OPTS.contains(k)) { + throw new IllegalArgumentException("Invalid option " + k); + } + }); + + typeExecutors = teb.build(); String base = options.getOrDefault("executor", DEFAULT_SCAN_EXECUTOR_NAME); multiExecutor = options.getOrDefault("multi_executor", base); singleExecutor = options.getOrDefault("single_executor", base); - heedHints = Boolean.parseBoolean(options.getOrDefault("heed_hints", "false")); - badHintAction = HintProblemAction.valueOf( - options.getOrDefault("bad_hint_action", HintProblemAction.LOG.name()).toUpperCase()); - ignoredHintHaction = HintProblemAction.valueOf( - options.getOrDefault("ignored_hint_action", HintProblemAction.LOG.name()).toUpperCase()); + } @Override public String dispatch(DispatchParmaters params) { ScanInfo scanInfo = params.getScanInfo(); - if (heedHints) { - String executor = scanInfo.getExecutionHints().get("executor"); - if (executor != null) { - if (params.getScanExecutors().containsKey(executor)) { + + if (!typeExecutors.isEmpty()) { + String scanType = scanInfo.getExecutionHints().get("scan_type"); + if (scanType != null) { + String executor = typeExecutors.get(scanType); + if (executor != null) { return executor; - } else { - switch (badHintAction) { - case FAIL: - throw new IllegalArgumentException( - "Scan execution hint contained unknown executor " + executor); - case LOG: - log.warn("Scan execution hint contained unknown executor {} ", executor); - break; - case NONE: - break; - default: - throw new IllegalStateException(); - } } } - } else if (ignoredHintHaction != HintProblemAction.NONE - && scanInfo.getExecutionHints().containsKey("executor")) { - String executor = scanInfo.getExecutionHints().get("executor"); - switch (ignoredHintHaction) { - case FAIL: - throw new IllegalArgumentException( - "Scan execution hint contained executor " + executor + " when heed_hints=false"); - case LOG: - log.warn("Scan execution hint contained executor {} when heed_hints=false", executor); - break; - default: - throw new IllegalStateException(); - } } switch (scanInfo.getScanType()) { diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java new file mode 100644 index 00000000000..0faa2f05f93 --- /dev/null +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/HintScanPrioritizerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.spi.scan; + +import static org.junit.Assert.assertEquals; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.spi.common.ServiceEnvironment; +import org.apache.accumulo.core.spi.scan.ScanInfo.Type; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; + +public class HintScanPrioritizerTest { + @Test + public void testSort() { + long now = System.currentTimeMillis(); + + List scans = new ArrayList<>(); + + // Two following have never run, so oldest should go first + scans.add(new TestScanInfo("a", Type.SINGLE, now - 7)); + scans.add( + new TestScanInfo("b", Type.SINGLE, now - 3).setExecutionHints("scan_type", "background")); + scans.add( + new TestScanInfo("c", Type.SINGLE, now - 4).setExecutionHints("scan_type", "background")); + scans.add(new TestScanInfo("d", Type.SINGLE, now - 3).setExecutionHints("scan_type", "isbn")); + scans.add(new TestScanInfo("e", Type.SINGLE, now - 5).setExecutionHints("scan_type", "isbn")); + scans.add(new TestScanInfo("f", Type.SINGLE, now - 1).setExecutionHints("priority", "35")); + scans.add(new TestScanInfo("g", Type.SINGLE, now - 2).setExecutionHints("priority", "25")); + scans.add(new TestScanInfo("h", Type.SINGLE, now - 3).setExecutionHints("priority", "15")); + scans.add(new TestScanInfo("i", Type.SINGLE, now - 4).setExecutionHints("priority", "5")); + + Collections.shuffle(scans); + + Comparator comparator = new HintScanPrioritizer() + .createComparator(new ScanPrioritizer.CreateParameters() { + + @Override + public Map getOptions() { + return ImmutableMap.of("priority.isbn", "10", "priority.background", "30", + "default_priority", "20"); + } + + @Override + public ServiceEnvironment getServiceEnv() { + throw new UnsupportedOperationException(); + } + }); + + Collections.sort(scans, comparator); + + assertEquals("i", scans.get(0).testId); + assertEquals("e", scans.get(1).testId); + assertEquals("d", scans.get(2).testId); + assertEquals("h", scans.get(3).testId); + assertEquals("a", scans.get(4).testId); + assertEquals("g", scans.get(5).testId); + assertEquals("c", scans.get(6).testId); + assertEquals("b", scans.get(7).testId); + assertEquals("f", scans.get(8).testId); + } +} diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java index a35e52712df..432e6dd7f9e 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/SimpleScanDispatcherTest.java @@ -125,29 +125,10 @@ public void testBasic() { @Test public void testHints() { - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), ImmutableMap.of(), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), - ImmutableMap.of("executor", "E2"), "E2", "E2"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true"), - ImmutableMap.of("executor", "E5"), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "ignored_hint_action", "fail"), - ImmutableMap.of("executor", "E5"), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail", - "ignored_hint_action", "fail"), ImmutableMap.of("executor", "E2"), "E2", "E2"); - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false"), - ImmutableMap.of("executor", "E2"), "E1", "E1"); - runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("executor", "E2"), "E1", "E1"); - } - - @Test(expected = IllegalArgumentException.class) - public void testBadHint() { - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "true", "bad_hint_action", "fail"), - ImmutableMap.of("executor", "E5"), "E2", "E2"); - } - - @Test(expected = IllegalArgumentException.class) - public void testIgnoredHint() { - runTest(ImmutableMap.of("executor", "E1", "heed_hints", "false", "ignored_hint_action", "fail"), - ImmutableMap.of("executor", "E2"), "E1", "E1"); + runTest(ImmutableMap.of("executor", "E1"), ImmutableMap.of("scan_type", "quick"), "E1", "E1"); + runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2"), + ImmutableMap.of("scan_type", "quick"), "E2", "E2"); + runTest(ImmutableMap.of("executor", "E1", "executor.quick", "E2", "executor.slow", "E3"), + ImmutableMap.of("scan_type", "slow"), "E3", "E3"); } } diff --git a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java index a9c72f8cf21..4e140999305 100644 --- a/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java +++ b/core/src/test/java/org/apache/accumulo/core/spi/scan/TestScanInfo.java @@ -29,6 +29,8 @@ import org.apache.accumulo.core.spi.common.Stats; import org.apache.accumulo.core.util.Stat; +import com.google.common.collect.ImmutableMap; + public class TestScanInfo implements ScanInfo { String testId; @@ -56,6 +58,11 @@ public class TestScanInfo implements ScanInfo { } } + TestScanInfo setExecutionHints(String k, String v) { + this.executionHints = ImmutableMap.of(k, v); + return this; + } + @Override public Type getScanType() { return scanType;