Skip to content

Commit

Permalink
Merge pull request forcedotcom#129 from tonyhuang/master
Browse files Browse the repository at this point in the history
SkipRangeParallelIteratorRegionSplitter and tests.
  • Loading branch information
jtaylor-sfdc committed Apr 9, 2013
2 parents aed6bc7 + b32bb8b commit 68304f2
Show file tree
Hide file tree
Showing 10 changed files with 460 additions and 450 deletions.
59 changes: 48 additions & 11 deletions src/main/java/com/salesforce/phoenix/compile/ScanRanges.java
Original file line number Diff line number Diff line change
@@ -1,16 +1,46 @@
/*******************************************************************************
* Copyright (c) 2013, Salesforce.com, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
* Neither the name of Salesforce.com nor the names of its contributors may
* be used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
******************************************************************************/
package com.salesforce.phoenix.compile;

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

import com.salesforce.phoenix.query.KeyRange;
import com.salesforce.phoenix.schema.RowKeySchema;
import com.salesforce.phoenix.schema.ValueBitSet;
import com.salesforce.phoenix.util.ByteUtil;
import com.salesforce.phoenix.util.ScanUtil;


public class ScanRanges {
// Slot list containing no slots at all (an empty outer list).
// NOTE(review): presumably backs the shared EVERYTHING instance - confirm against its declaration.
private static final List<List<KeyRange>> EVERYTHING_RANGES = Collections.<List<KeyRange>>emptyList();
// Slot list made of exactly one slot whose only entry is KeyRange.EMPTY_RANGE.
// create() maps an input of this same shape to the shared NOTHING instance.
private static final List<List<KeyRange>> NOTHING_RANGES = Collections.<List<KeyRange>>singletonList(Collections.<KeyRange>singletonList(KeyRange.EMPTY_RANGE));
Expand All @@ -23,13 +53,12 @@ public static ScanRanges create(List<List<KeyRange>> ranges, RowKeySchema schema
} else if (ranges.size() == 1 && ranges.get(0).size() == 1 && ranges.get(0).get(0) == KeyRange.EMPTY_RANGE) {
return NOTHING;
}

return new ScanRanges(ranges, schema);
}

private final List<List<KeyRange>> ranges;
private final RowKeySchema schema;

private ScanRanges (List<List<KeyRange>> ranges, RowKeySchema schema) {
this.ranges = ranges;
this.schema = schema;
Expand All @@ -42,11 +71,11 @@ public List<List<KeyRange>> getRanges() {
/**
 * Returns the row key schema this instance was constructed with.
 *
 * @return the schema passed to the constructor
 */
public RowKeySchema getSchema() {
    return this.schema;
}

/**
 * Tells whether this object is the shared {@code EVERYTHING} instance.
 * The check is an identity comparison, not a comparison of the ranges.
 *
 * @return {@code true} only when this object is {@code EVERYTHING}
 */
public boolean isEverything() {
    return EVERYTHING == this;
}

/**
 * Tells whether this object is the shared {@code NOTHING} instance.
 * The check is an identity comparison, not a comparison of the ranges.
 *
 * @return {@code true} only when this object is {@code NOTHING}
 */
public boolean isDegenerate() {
    return NOTHING == this;
}
Expand Down Expand Up @@ -89,10 +118,10 @@ public boolean isSingleRowScan() {
}

public void setScanStartStopRow(Scan scan) {
if (this == EVERYTHING) {
if (isEverything()) {
return;
}
if (this == NOTHING) {
if (isDegenerate()) {
scan.setStartRow(KeyRange.EMPTY_RANGE.getLowerRange());
scan.setStopRow(KeyRange.EMPTY_RANGE.getUpperRange());
return;
Expand All @@ -111,14 +140,22 @@ public void setScanStartStopRow(Scan scan) {

// KeyRange.UNBOUND_LOWER wrapped once in an ImmutableBytesWritable so the wrapper can be shared.
private static final ImmutableBytesWritable UNBOUND_LOWER = new ImmutableBytesWritable(KeyRange.UNBOUND_LOWER);
// KeyRange.UNBOUND_UPPER wrapped once in an ImmutableBytesWritable so the wrapper can be shared.
private static final ImmutableBytesWritable UNBOUND_UPPER = new ImmutableBytesWritable(KeyRange.UNBOUND_UPPER);
public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
if (this == EVERYTHING) {

public boolean intersect(KeyRange keyRange) {
if (isEverything()) {
return true;
}
if (this == NOTHING) {
if (isDegenerate()) {
return false;
}
byte[] lowerInclusiveKey = keyRange.getLowerRange();
if (!keyRange.isLowerInclusive() && !Bytes.equals(lowerInclusiveKey, KeyRange.UNBOUND_LOWER)) {
lowerInclusiveKey = ByteUtil.nextKey(lowerInclusiveKey);
}
byte[] upperExclusiveKey = keyRange.getUpperRange();
if (keyRange.isUpperInclusive()) {
upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey);
}
int i = 0;
int[] position = new int[ranges.size()];

Expand Down
113 changes: 0 additions & 113 deletions src/main/java/com/salesforce/phoenix/filter/SkipScanFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.hadoop.io.WritableUtils;

import com.google.common.base.Objects;
import com.google.common.collect.Lists;
import com.google.common.hash.*;
import com.salesforce.phoenix.query.KeyRange;
import com.salesforce.phoenix.query.KeyRange.Bound;
Expand Down Expand Up @@ -70,7 +69,6 @@ public class SkipScanFilter extends FilterBase {
private int startKeyLength;
private byte[] endKey;
private int endKeyLength;
private int estimateSplitNum;

private final ImmutableBytesWritable ptr = new ImmutableBytesWritable();

Expand Down Expand Up @@ -107,50 +105,6 @@ private void init(List<List<KeyRange>> slots, RowKeySchema schema, int maxKeyLen
endKeyLength = 0;
// Start key for the scan will initially be set to start at the right place
// We just need to set the end key for when we need to calculate the next skip hint
this.estimateSplitNum = estimateSplitNum();
}

/**
 * Estimates how many splits the slots would produce by counting the current
 * position plus every further position that {@code incrementKey()} can reach
 * before it reports that it has run out.
 * <p>
 * Side effect: the slot position cursor is advanced by the {@code incrementKey()} calls.
 *
 * @return one plus the number of successful {@code incrementKey()} calls
 */
private int estimateSplitNum() {
    // The starting position always counts as one split.
    int count = 1;
    while (incrementKey()) {
        count++;
    }
    return count;
}

/**
 * Generates the key ranges that make up the splits for the slots.
 * Used externally by region splitters to generate all split ranges.
 * <p>
 * Each generated range starts at the key built for the current slot position and ends at
 * the key built after advancing the position by a fixed number of steps, so that several
 * adjacent positions are chunked into one range. The method mutates {@code position}
 * directly and through {@code incrementKey}, and reads {@code startKey}/{@code endKey}
 * after calling {@code setStartKey()}/{@code setEndKey()}.
 *
 * @param maxConcurrency divisor applied to {@code estimateSplitNum} to decide how many
 *        positions are chunked into each range.
 *        NOTE(review): zero or negative values are not guarded against here - confirm
 *        that callers always pass a positive value.
 * @return the generated ranges, in the order they were produced
 */
public List<KeyRange> generateSplitRanges(int maxConcurrency) {
// Pre-size the list with the estimate computed earlier (stored in the estimateSplitNum field).
List<KeyRange> splits = Lists.newArrayListWithCapacity(estimateSplitNum);
// steps we need to increment when chunking multiple key range together:
// ceil(estimateSplitNum / maxConcurrency) positions per chunk, minus the one we start on.
int step = (int) Math.ceil(((double) estimateSplitNum) / maxConcurrency) - 1;
boolean terminated = false, lowerInclusive;
while (!terminated) {
// Do not need to care about the length since we set the key from the very beginning.
setStartKey();
// Capture inclusivity of the lower bound before the position is advanced below.
lowerInclusive = isKeyInclusive(Bound.LOWER);
// Advance to the position that closes this chunk; false means the position ran out.
terminated = !incrementKey(step, Bound.UPPER);
if (!terminated) {
setEndKey();
} else {
// We have wrapped around already, set the key to be the last key.
for (int i=0; i<slots.size(); i++) {
position[i] = slots.get(i).size() - 1;
}
setEndKey();
}
// lowerInclusive comes from isKeyInclusive(Bound.LOWER), which returns false as soon as
// any slot's current range is not inclusive on that bound; so the lower bound is marked
// inclusive only when every key part is inclusive.
// NOTE(review): the previous wording of this comment said the lower bound is exclusive
// only when all key parts are exclusive, which is not what isKeyInclusive computes -
// confirm which rule is intended.
// Since setKey for upper key always increment it by one byte, we will always mark the
// upper bound as not inclusive.
KeyRange range = KeyRange.getKeyRange(
Arrays.copyOf(startKey, startKeyLength), lowerInclusive,
Arrays.copyOf(endKey, endKeyLength), false,
false);
splits.add(range);
// Move one position forward to begin the next chunk; stop when that is not possible.
terminated = terminated || !incrementKey(1, Bound.LOWER);
}
return splits;
}

@Override
Expand Down Expand Up @@ -280,77 +234,10 @@ private void appendToStartKey(int slotIndex, int byteOffset) {
startKeyLength += setKey(Bound.LOWER, startKey, byteOffset, slotIndex);
}

/**
 * Rebuilds the end key from scratch: writes the upper-bound key into {@code endKey}
 * starting at byte offset 0 and slot 0, and records the resulting length.
 */
private void setEndKey() {
    this.endKeyLength = setKey(Bound.UPPER, this.endKey, 0, 0);
}

/**
 * Extends the end key: writes the upper-bound key into {@code endKey} beginning at the
 * given slot and byte offset, and adds the value returned by {@code setKey} to the
 * recorded end key length.
 *
 * @param slotIndex  slot from which to start writing
 * @param byteOffset offset in {@code endKey} at which writing starts
 */
private void appendToEndKey(int slotIndex, int byteOffset) {
    this.endKeyLength = this.endKeyLength + setKey(Bound.UPPER, this.endKey, byteOffset, slotIndex);
}

/**
 * Reports whether the key at the current position is inclusive on the given bound.
 * The key counts as inclusive only when the currently selected range of every slot is
 * inclusive on that bound; a single non-inclusive part makes the whole key exclusive.
 *
 * @param bound the bound (lower or upper) to inspect on each slot's current range
 * @return {@code true} if every slot's current range is inclusive on {@code bound}
 */
private boolean isKeyInclusive(Bound bound) {
    final int slotCount = slots.size();
    int slotIndex = 0;
    while (slotIndex < slotCount) {
        KeyRange current = slots.get(slotIndex).get(position[slotIndex]);
        if (!current.isInclusive(bound)) {
            // One exclusive part is enough to make the combined key exclusive.
            return false;
        }
        slotIndex++;
    }
    return true;
}

/**
 * Advances the slot position by the requested number of steps and then aligns the
 * trailing slot positions for the given bound.
 * <p>
 * If any single step fails, the method returns immediately without aligning the
 * trailing positions.
 *
 * @param steps number of {@code incrementKey()} calls to make; non-positive values make none
 * @param bound bound handed to {@code setBoundSlotPosition} once all steps succeed
 * @return {@code false} if a step failed, {@code true} otherwise
 */
private boolean incrementKey(int steps, Bound bound) {
    for (int remaining = steps; remaining > 0; remaining--) {
        boolean advanced = incrementKey();
        if (!advanced) {
            return false;
        }
    }
    setBoundSlotPosition(bound);
    return true;
}

/**
 * Advances the slot position by one.
 * <p>
 * The slot to advance is the first one whose current entry is not a single key, or the
 * last slot when every current entry is a single key. That slot's position is moved
 * forward by one; whenever a position wraps back to 0 the slot before it is advanced
 * in turn.
 *
 * @return {@code true} if some slot advanced without wrapping; {@code false} if the
 *         wrap carried past the first slot (or there are no slots)
 */
private boolean incrementKey() {
    // Locate the first slot whose current entry is a range rather than a single key.
    int cursor = 0;
    while (cursor < slots.size() && slots.get(cursor).get(position[cursor]).isSingleKey()) {
        cursor++;
    }
    // Every current entry is a single key: start from the last slot instead.
    if (cursor == slots.size()) {
        cursor = slots.size() - 1;
    }
    // Advance the slot; on wrap-around to 0, carry into the slot on its left.
    while (cursor >= 0) {
        position[cursor] = (position[cursor] + 1) % slots.get(cursor).size();
        if (position[cursor] != 0) {
            break;
        }
        cursor--;
    }
    return cursor >= 0;
}

/**
 * Aligns the positions of all slots that follow the first range slot.
 * <p>
 * The first range slot is the first slot whose current entry is not a single key. Every
 * slot after it is reset: to position 0 when {@code bound} is {@code Bound.LOWER}, and to
 * that slot's last index for any other value of {@code bound}. When there is no slot
 * after the first range slot, nothing changes.
 *
 * @param bound selects the reset target: first entry for the lower bound, last entry otherwise
 */
private void setBoundSlotPosition(Bound bound) {
    // Find first index on the current position that is a range slot.
    int firstRangeSlot = 0;
    while (firstRangeSlot < slots.size()
            && slots.get(firstRangeSlot).get(position[firstRangeSlot]).isSingleKey()) {
        firstRangeSlot++;
    }
    // Reset every slot beyond it; the loop is empty when no such slot exists.
    final boolean forLowerBound = (bound == Bound.LOWER);
    for (int slot = firstRangeSlot + 1; slot < slots.size(); slot++) {
        position[slot] = forLowerBound ? 0 : slots.get(slot).size() - 1;
    }
}

private int getTerminatorCount(RowKeySchema schema) {
int nTerminators = 0;
for (int i = 0; i < schema.getFieldCount(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hbase.util.Bytes;

import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.*;
import com.salesforce.phoenix.compile.StatementContext;
import com.salesforce.phoenix.query.*;
Expand Down Expand Up @@ -77,7 +78,32 @@ protected DefaultParallelIteratorRegionSplitter(StatementContext context, TableR
/**
 * Looks up all regions of the table and keeps those that {@code filterRegions} selects
 * for the start and stop rows of the scan held by the statement context.
 *
 * @return the regions, with their servers, that overlap the scan's row range
 * @throws SQLException declared by this method; NOTE(review): presumably raised by the
 *         region lookup - confirm against the query services API
 */
protected List<Entry<HRegionInfo, ServerName>> getAllRegions() throws SQLException {
    Scan scan = context.getScan();
    NavigableMap<HRegionInfo, ServerName> allTableRegions = context.getConnection().getQueryServices().getAllTableRegions(table);
    // Single return: a second return statement after this one would be unreachable and
    // is rejected by the compiler. Uses the filterRegions helper declared in this class.
    return filterRegions(allTableRegions, scan.getStartRow(), scan.getStopRow());
}

/**
 * Returns the regions whose key range intersects the key range given by the
 * startKey and stopKey; regions that do not intersect it are dropped.
 * @param allTableRegions all region infos for a given table
 * @param startKey the lower bound of key range, inclusive
 * @param stopKey the upper bound of key range, exclusive (the range is built with the
 *        upper-inclusive flag set to false)
 * @return a new list of the regions that intersect the key range; every region when the
 *         key range is {@code KeyRange.EVERYTHING_RANGE}
 */
// exposed for tests
public static List<Map.Entry<HRegionInfo, ServerName>> filterRegions(NavigableMap<HRegionInfo, ServerName> allTableRegions, byte[] startKey, byte[] stopKey) {
    final KeyRange scanRange = KeyRange.getKeyRange(startKey, true, stopKey, false, false);
    // An unbounded scan touches every region, so no filtering is needed.
    if (scanRange == KeyRange.EVERYTHING_RANGE) {
        return Lists.newArrayList(allTableRegions.entrySet());
    }
    Predicate<Map.Entry<HRegionInfo, ServerName>> overlapsScan = new Predicate<Map.Entry<HRegionInfo, ServerName>>() {
        @Override
        public boolean apply(Map.Entry<HRegionInfo, ServerName> candidate) {
            KeyRange candidateRange = KeyRange.getKeyRange(candidate.getKey());
            // A region is kept unless the intersection is the shared empty range.
            return scanRange.intersect(candidateRange) != KeyRange.EMPTY_RANGE;
        }
    };
    return Lists.newArrayList(Iterables.filter(allTableRegions.entrySet(), overlapsScan));
}

protected List<KeyRange> genKeyRanges(List<Map.Entry<HRegionInfo, ServerName>> regions) {
Expand All @@ -95,7 +121,7 @@ protected List<KeyRange> genKeyRanges(List<Map.Entry<HRegionInfo, ServerName>> r
//
// if r >= t:
// scan using regional boundaries
// elif r/2 > t:
// elif r > t/2:
// split each region in s splits such that:
// s = max(x) where s * x < m
// else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@
import org.apache.hadoop.hbase.util.Pair;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.salesforce.phoenix.compile.StatementContext;
import com.salesforce.phoenix.execute.RowCounter;
import com.salesforce.phoenix.job.JobManager.JobCallable;
Expand Down Expand Up @@ -80,31 +77,6 @@ public ParallelIterators(StatementContext context, TableRef table, RowCounter ro
this.splits = getSplits(context, table);
}

/**
 * Returns the regions whose key range intersects the key range given by the
 * startKey and stopKey; regions that do not intersect it are dropped.
 * @param allTableRegions all region infos for a given table
 * @param startKey the lower bound of key range, inclusive
 * @param stopKey the upper bound of key range, exclusive (the range is built with the
 *        upper-inclusive flag set to false)
 * @return a new list of the regions that intersect the key range; every region when the
 *         key range is {@code KeyRange.EVERYTHING_RANGE}
 */
// exposed for tests
public static List<Map.Entry<HRegionInfo, ServerName>> filterRegions(NavigableMap<HRegionInfo, ServerName> allTableRegions, byte[] startKey, byte[] stopKey) {
    final KeyRange scanRange = KeyRange.getKeyRange(startKey, true, stopKey, false, false);
    // An unbounded scan touches every region, so no filtering is needed.
    if (scanRange == KeyRange.EVERYTHING_RANGE) {
        return Lists.newArrayList(allTableRegions.entrySet());
    }
    Predicate<Map.Entry<HRegionInfo, ServerName>> overlapsScan = new Predicate<Map.Entry<HRegionInfo, ServerName>>() {
        @Override
        public boolean apply(Map.Entry<HRegionInfo, ServerName> candidate) {
            KeyRange candidateRange = KeyRange.getKeyRange(candidate.getKey());
            // A region is kept unless the intersection is the shared empty range.
            return scanRange.intersect(candidateRange) != KeyRange.EMPTY_RANGE;
        }
    };
    return Lists.newArrayList(Iterables.filter(allTableRegions.entrySet(), overlapsScan));
}

/**
* Splits the given scan's key range so that each split can be queried in parallel
*
Expand Down
Loading

0 comments on commit 68304f2

Please sign in to comment.