
Pass in lowerInclusive and upperExclusive ranges to intersect
jtaylor-sfdc committed Apr 9, 2013
1 parent 68304f2 commit 5f23fbb
Showing 7 changed files with 114 additions and 48 deletions.
60 changes: 35 additions & 25 deletions src/main/java/com/salesforce/phoenix/compile/ScanRanges.java
@@ -32,12 +32,10 @@

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;

import com.salesforce.phoenix.query.KeyRange;
import com.salesforce.phoenix.schema.RowKeySchema;
import com.salesforce.phoenix.schema.ValueBitSet;
import com.salesforce.phoenix.util.ByteUtil;
import com.salesforce.phoenix.util.ScanUtil;


@@ -141,21 +139,25 @@ public void setScanStartStopRow(Scan scan) {
private static final ImmutableBytesWritable UNBOUND_LOWER = new ImmutableBytesWritable(KeyRange.UNBOUND_LOWER);
private static final ImmutableBytesWritable UNBOUND_UPPER = new ImmutableBytesWritable(KeyRange.UNBOUND_UPPER);

public boolean intersect(KeyRange keyRange) {
/**
* Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey
* intersects with any of the scan ranges and false otherwise. We cannot pass in
* a KeyRange here, because the underlying compare functions expect lower inclusive
* and upper exclusive keys. We cannot get their next key because the key must
* conform to the row key schema and if a null byte is added to a lower inclusive
* key, it's no longer a valid, real key.
* @param lowerInclusiveKey lower inclusive key
* @param upperExclusiveKey upper exclusive key
* @return true if the scan range intersects with the specified lower/upper key
* range
*/
public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) {
if (isEverything()) {
return true;
}
if (isDegenerate()) {
return false;
}
byte[] lowerInclusiveKey = keyRange.getLowerRange();
if (!keyRange.isLowerInclusive() && !Bytes.equals(lowerInclusiveKey, KeyRange.UNBOUND_LOWER)) {
lowerInclusiveKey = ByteUtil.nextKey(lowerInclusiveKey);
}
byte[] upperExclusiveKey = keyRange.getUpperRange();
if (keyRange.isUpperInclusive()) {
upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey);
}
int i = 0;
int[] position = new int[ranges.size()];

@@ -165,9 +167,13 @@ public boolean intersect(KeyRange keyRange) {
int nSlots = ranges.size();

lowerPtr.set(lowerInclusiveKey, 0, lowerInclusiveKey.length);
schema.first(lowerPtr, i, ValueBitSet.EMPTY_VALUE_BITSET);
if (schema.first(lowerPtr, i, ValueBitSet.EMPTY_VALUE_BITSET) == null) {
lower = UNBOUND_LOWER;
}
upperPtr.set(upperExclusiveKey, 0, upperExclusiveKey.length);
schema.first(upperPtr, i, ValueBitSet.EMPTY_VALUE_BITSET);
if (schema.first(upperPtr, i, ValueBitSet.EMPTY_VALUE_BITSET) == null) {
upper = UNBOUND_UPPER;
}

int cmpLower=0,cmpUpper=0;

@@ -191,19 +197,23 @@ }
}

// Move to the next part of the key
if (schema.next(lowerPtr, i, lowerInclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) {
// If no more lower key parts, then we have no constraint for that part of the key,
// so we use unbound lower from here on out.
lower = UNBOUND_LOWER;
} else {
lower = lowerPtr;
if (lower != UNBOUND_LOWER) {
if (schema.next(lowerPtr, i, lowerInclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) {
// If no more lower key parts, then we have no constraint for that part of the key,
// so we use unbound lower from here on out.
lower = UNBOUND_LOWER;
} else {
lower = lowerPtr;
}
}
if (schema.next(upperPtr, i, upperExclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) {
// If no more upper key parts, then we have no constraint for that part of the key,
// so we use unbound upper from here on out.
upper = UNBOUND_UPPER;
} else {
upper = upperPtr;
if (upper != UNBOUND_UPPER) {
if (schema.next(upperPtr, i, upperExclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) {
// If no more upper key parts, then we have no constraint for that part of the key,
// so we use unbound upper from here on out.
upper = UNBOUND_UPPER;
} else {
upper = upperPtr;
}
}
}
}
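A region's boundaries are already expressed as a lower-inclusive start key and an upper-exclusive end key, so the simplest caller of the new intersect() needs no key adjustment at all. A minimal sketch of such a caller (the helper class and method names are illustrative, not part of this commit):

import org.apache.hadoop.hbase.HRegionInfo;

import com.salesforce.phoenix.compile.ScanRanges;

final class RegionIntersectSketch {
    // An HBase region covers [startKey, endKey), which is exactly the
    // lower-inclusive / upper-exclusive shape the new intersect() expects.
    static boolean regionOverlapsScan(ScanRanges scanRanges, HRegionInfo region) {
        return scanRanges.intersect(region.getStartKey(), region.getEndKey());
    }
}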
@@ -34,10 +34,10 @@
import org.apache.hadoop.hbase.ServerName;

import com.google.common.base.Predicate;
import com.google.common.collect.*;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.salesforce.phoenix.compile.ScanRanges;
import com.salesforce.phoenix.compile.StatementContext;
import com.salesforce.phoenix.query.KeyRange;
import com.salesforce.phoenix.schema.TableRef;


@@ -71,8 +71,7 @@ public static List<Map.Entry<HRegionInfo, ServerName>> filterRegions(NavigableMa
new Predicate<Map.Entry<HRegionInfo, ServerName>>() {
@Override
public boolean apply(Map.Entry<HRegionInfo, ServerName> region) {
KeyRange regionKeyRange = KeyRange.getKeyRange(region.getKey());
return ranges.intersect(regionKeyRange);
return ranges.intersect(region.getKey().getStartKey(), region.getKey().getEndKey());
}
});
}
4 changes: 2 additions & 2 deletions src/main/java/com/salesforce/phoenix/query/KeyRange.java
@@ -187,7 +187,7 @@ public int compareLowerToUpperBound( byte[] b, int o, int l) {
* and 0 if they are equal.
*/
public int compareLowerToUpperBound( byte[] b, int o, int l, boolean isInclusive) {
if (lowerUnbound() || (Bytes.equals(b, KeyRange.UNBOUND_UPPER))) {
if (lowerUnbound() || b == KeyRange.UNBOUND_UPPER) {
return -1;
}
int cmp = Bytes.compareTo(lowerRange, 0, lowerRange.length, b, o, l);
@@ -208,7 +208,7 @@ public int compareUpperToLowerBound(byte[] b, int o, int l) {
}

public int compareUpperToLowerBound(byte[] b, int o, int l, boolean isInclusive) {
if (upperUnbound() || Bytes.equals(b, KeyRange.UNBOUND_LOWER)) {
if (upperUnbound() || b == KeyRange.UNBOUND_LOWER) {
return 1;
}
int cmp = Bytes.compareTo(upperRange, 0, upperRange.length, b, o, l);
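The two changes above swap content equality (Bytes.equals) for reference equality: only the exact UNBOUND_LOWER / UNBOUND_UPPER instances are treated as unbounded, and any other array, even one with identical contents, is compared as a real key. A hedged sketch of the calling convention this implies (the helper and variable names are illustrative):

// Illustrative only: to signal "no upper bound", pass the exact sentinel
// instance; a merely content-equal array is compared as an ordinary key.
static int compareLowerToExclusiveUpper(KeyRange range, byte[] upperOrNull) {
    byte[] upper = (upperOrNull == null) ? KeyRange.UNBOUND_UPPER : upperOrNull;
    return range.compareLowerToUpperBound(upper, 0, upper.length);
}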
36 changes: 36 additions & 0 deletions src/main/java/com/salesforce/phoenix/schema/RowKeySchema.java
@@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import com.salesforce.phoenix.query.QueryConstants;
import com.salesforce.phoenix.util.ByteUtil;


/**
@@ -80,6 +81,9 @@ public RowKeySchema build() {

@Override
public Boolean next(ImmutableBytesWritable ptr, int position, int maxOffset, ValueBitSet bitSet) {
if (maxOffset == 0) {
return null;
}
// If positioned at SEPARATOR_BYTE, skip it.
if (position > 0 && !getField(position-1).getType().isFixedWidth() && position-1 < getMaxFields() && ptr.get()[ptr.getOffset()+ptr.getLength()] == QueryConstants.SEPARATOR_BYTE) {
ptr.set(ptr.get(), ptr.getOffset()+ptr.getLength()+1, 0);
@@ -121,4 +125,36 @@ protected int writeVarLengthField(ImmutableBytesWritable ptr, byte[] b, int offs
public int getMaxFields() {
return this.getMinNullable();
}

/**
* Given potentially a partial key, but one that is valid against
* this row key schema, increment it to the next key in the row
* key schema key space.
* @param ptr pointer to the key to be incremented
* @return a new byte array with the incremented key
*/
public byte[] nextKey(ImmutableBytesWritable ptr) {
byte[] buf = ptr.get();
int offset = ptr.getOffset();
int length = ptr.getLength();
byte[] key;
if (!this.getField(this.getMaxFields()-1).getType().isFixedWidth()) {
// Add a SEPARATOR byte at the end if we have a complete key with a variable
// length at the end
if (this.setAccessor(ptr, this.getMaxFields()-1, ValueBitSet.EMPTY_VALUE_BITSET)) {
key = new byte[length+1];
System.arraycopy(buf, offset, key, 0, length);
key[length] = QueryConstants.SEPARATOR_BYTE;
ByteUtil.nextKey(key, key.length);
return key;
}
}
// No separator needed because we either have a fixed width value at the end
// or we have a partial key which would be terminated with a separator byte.
key = new byte[length];
System.arraycopy(buf, offset, key, 0, length);
ByteUtil.nextKey(key, key.length);
return key;
}

}
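A hedged sketch of how the new nextKey() might be used, for example to turn an inclusive, schema-valid (possibly partial) row key into an exclusive upper bound; the helper name is illustrative:

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

// Sketch: increment an inclusive, schema-valid key to the next key in the
// row key schema's key space, e.g. for use as an exclusive upper bound.
static byte[] toExclusiveUpperBound(RowKeySchema schema, byte[] inclusiveKey) {
    ImmutableBytesWritable ptr = new ImmutableBytesWritable(inclusiveKey);
    return schema.nextKey(ptr);
}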
9 changes: 7 additions & 2 deletions src/main/java/com/salesforce/phoenix/schema/ValueSchema.java
@@ -273,8 +273,10 @@ protected Boolean positionPtr(ImmutableBytesWritable ptr, int position, int maxO
// that supports a reset method, so we don't need to instantiate a new one
// on each iteration.
public Boolean first(ImmutableBytesWritable ptr, int position, ValueBitSet bitSet) {
int maxOffset = ptr.getOffset() + ptr.getLength(); // TODO: reliable?
assert(maxOffset > 0);
if (ptr.getLength() == 0) {
return null;
}
int maxOffset = ptr.getOffset() + ptr.getLength();
ptr.set(ptr.get(), ptr.getOffset(), 0);
return positionPtr(ptr, position, maxOffset, bitSet);
}
@@ -290,6 +292,9 @@ public Boolean first(ImmutableBytesWritable ptr, int position, ValueBitSet bitSe
* @return true if there is a field after position and false otherwise.
*/
public Boolean next(ImmutableBytesWritable ptr, int position, int maxOffset, ValueBitSet bitSet) {
if (maxOffset == 0) {
return null;
}
return positionPtr(ptr, position, maxOffset, bitSet);
}

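The new guards let callers treat an empty key or an exhausted key as "no constraint" rather than failing, which is what the ScanRanges.intersect() loop above relies on. A rough sketch of the resulting iteration idiom (the helper is illustrative and assumes the key starts at offset 0):

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

// Sketch: first() now returns null for a zero-length key, and next() returns
// null once there are no more key parts, so iteration simply stops.
static int countKeyParts(RowKeySchema schema, byte[] key) {
    ImmutableBytesWritable ptr = new ImmutableBytesWritable(key);
    if (schema.first(ptr, 0, ValueBitSet.EMPTY_VALUE_BITSET) == null) {
        return 0; // empty key: no constraint on any key part
    }
    int nParts = 1;
    while (schema.next(ptr, nParts, key.length, ValueBitSet.EMPTY_VALUE_BITSET) != null) {
        nParts++;
    }
    return nParts;
}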
17 changes: 16 additions & 1 deletion src/test/java/com/salesforce/phoenix/compile/ScanRangesTest.java
@@ -43,6 +43,7 @@
import com.salesforce.phoenix.query.KeyRange;
import com.salesforce.phoenix.schema.*;
import com.salesforce.phoenix.schema.RowKeySchema.RowKeySchemaBuilder;
import com.salesforce.phoenix.util.ByteUtil;


/**
@@ -64,7 +65,21 @@ public ScanRangesTest(ScanRanges scanRanges, int[] widths,

@Test
public void test() {
assertEquals(expectedResult, scanRanges.intersect(keyRange));
byte[] lowerInclusiveKey = keyRange.getLowerRange();
if (!keyRange.isLowerInclusive() && !Bytes.equals(lowerInclusiveKey, KeyRange.UNBOUND_LOWER)) {
// This assumes the last key is fixed length, otherwise the results may be incorrect
// since there's no terminating 0 byte for a variable length key and thus we may be
// incrementing the key too much.
lowerInclusiveKey = ByteUtil.nextKey(lowerInclusiveKey);
}
byte[] upperExclusiveKey = keyRange.getUpperRange();
if (keyRange.isUpperInclusive()) {
// This assumes the last key is fixed length, otherwise the results may be incorrect
// since there's no terminating 0 byte for a variable length key and thus we may be
// incrementing the key too much.
upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey);
}
assertEquals(expectedResult, scanRanges.intersect(lowerInclusiveKey,upperExclusiveKey));
}

private static KeyRange getKeyRange(byte[] lowerRange, boolean lowerInclusive, byte[] upperRange, boolean upperInclusive) {
29 changes: 15 additions & 14 deletions src/test/java/com/salesforce/phoenix/memory/MemoryManagerTest.java
@@ -38,6 +38,7 @@
/**
*
* Tests for GlobalMemoryManager and ChildMemoryManager
* TODO: use our own time keeper so these tests don't flap
*
* @author jtaylor
* @since 0.1
@@ -86,16 +87,16 @@ public void testWaitForMemoryAvailable() {
public void run() {
MemoryChunk c1 = rmm1.allocate(50);
MemoryChunk c2 = rmm1.allocate(50);
sleepFor(2000);
sleepFor(4000);
c1.close();
sleepFor(1000);
sleepFor(2000);
c2.close();
}
};
Thread t2 = new Thread() {
@Override
public void run() {
sleepFor(1000);
sleepFor(2000);
// Will require waiting for a bit of time before t1 frees the requested memory
long startTime = System.currentTimeMillis();
MemoryChunk c3 = rmm2.allocate(50);
@@ -105,7 +106,7 @@ public void run() {
};
t1.start();
t2.start();
sleepFor(500);
sleepFor(1000);
// Main thread competes with others to get all memory, but should wait
// until both threads are complete (since that's when the memory will
// again be all available.
@@ -127,27 +128,27 @@ public void testResizeWaitForMemoryAvailable() {
public void run() {
MemoryChunk c1 = rmm1.allocate(50);
MemoryChunk c2 = rmm1.allocate(40);
sleepFor(2000);
sleepFor(4000);
c1.close();
sleepFor(1000);
sleepFor(2000);
c2.close();
}
};
Thread t2 = new Thread() {
@Override
public void run() {
sleepFor(1000);
sleepFor(2000);
MemoryChunk c3 = rmm2.allocate(10);
// Will require waiting for a bit of time before t1 frees the requested memory
long startTime = System.currentTimeMillis();
c3.resize(50);
assertTrue(System.currentTimeMillis() - startTime >= 1000);
assertTrue(System.currentTimeMillis() - startTime >= 2000);
c3.close();
}
};
t1.start();
t2.start();
sleepFor(500);
sleepFor(1000);
// Main thread competes with others to get all memory, but should wait
// until both threads are complete (since that's when the memory will
// again be all available.
@@ -168,9 +169,9 @@ public void testWaitUntilResize() {
@Override
public void run() {
MemoryChunk c2 = rmm1.allocate(20);
sleepFor(2000);
sleepFor(4000);
c1.resize(20); // resize down to test that other thread is notified
sleepFor(1000);
sleepFor(2000);
c2.close();
c1.close();
assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory());
@@ -179,20 +180,20 @@ public void run() {
Thread t2 = new Thread() {
@Override
public void run() {
sleepFor(1000);
sleepFor(2000);
ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100);
MemoryChunk c3 = rmm2.allocate(10);
long startTime = System.currentTimeMillis();
c3.resize(60); // Test that resize waits if memory not available
assertTrue(c1.getSize() == 20); // c1 was resized not closed
assertTrue(System.currentTimeMillis() - startTime >= 1000); // we waited some time before the allocate happened
assertTrue(System.currentTimeMillis() - startTime >= 2000); // we waited some time before the allocate happened
c3.close();
assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory());
}
};
t1.start();
t2.start();
sleepFor(500);
sleepFor(1000);
// Main thread competes with others to get all memory, but should wait
// until both threads are complete (since that's when the memory will
// again be all available.
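The doubled sleeps above are a stopgap for timing-sensitive assertions; the TODO added at the top of the file points at the longer-term fix, an injectable time source. A rough sketch of what that might look like (all names are hypothetical, not part of this commit):

// Hypothetical: let GlobalMemoryManager take a time source instead of calling
// System.currentTimeMillis() directly, so tests can advance time deterministically.
interface TimeKeeper {
    long currentTimeMillis();
}

final class ManualTimeKeeper implements TimeKeeper {
    private long now;
    @Override
    public long currentTimeMillis() {
        return now;
    }
    void advance(long millis) {
        now += millis;
    }
}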

