diff --git a/src/main/java/com/salesforce/phoenix/compile/ScanRanges.java b/src/main/java/com/salesforce/phoenix/compile/ScanRanges.java index 5d8120a1..f687c4c7 100644 --- a/src/main/java/com/salesforce/phoenix/compile/ScanRanges.java +++ b/src/main/java/com/salesforce/phoenix/compile/ScanRanges.java @@ -32,12 +32,10 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; -import org.apache.hadoop.hbase.util.Bytes; import com.salesforce.phoenix.query.KeyRange; import com.salesforce.phoenix.schema.RowKeySchema; import com.salesforce.phoenix.schema.ValueBitSet; -import com.salesforce.phoenix.util.ByteUtil; import com.salesforce.phoenix.util.ScanUtil; @@ -141,21 +139,25 @@ public void setScanStartStopRow(Scan scan) { private static final ImmutableBytesWritable UNBOUND_LOWER = new ImmutableBytesWritable(KeyRange.UNBOUND_LOWER); private static final ImmutableBytesWritable UNBOUND_UPPER = new ImmutableBytesWritable(KeyRange.UNBOUND_UPPER); - public boolean intersect(KeyRange keyRange) { + /** + * Return true if the range formed by the lowerInclusiveKey and upperExclusiveKey + * intersects with any of the scan ranges and false otherwise. We cannot pass in + * a KeyRange here, because the underlying compare functions expect lower inclusive + * and upper exclusive keys. We cannot get their next key because the key must + * conform to the row key schema and if a null byte is added to a lower inclusive + * key, it's no longer a valid, real key. + * @param lowerInclusiveKey lower inclusive key + * @param upperExclusiveKey upper exclusive key + * @return true if the scan range intersects with the specified lower/upper key + * range + */ + public boolean intersect(byte[] lowerInclusiveKey, byte[] upperExclusiveKey) { if (isEverything()) { return true; } if (isDegenerate()) { return false; } - byte[] lowerInclusiveKey = keyRange.getLowerRange(); - if (!keyRange.isLowerInclusive() && !Bytes.equals(lowerInclusiveKey, KeyRange.UNBOUND_LOWER)) { - lowerInclusiveKey = ByteUtil.nextKey(lowerInclusiveKey); - } - byte[] upperExclusiveKey = keyRange.getUpperRange(); - if (keyRange.isUpperInclusive()) { - upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey); - } int i = 0; int[] position = new int[ranges.size()]; @@ -165,9 +167,13 @@ public boolean intersect(KeyRange keyRange) { int nSlots = ranges.size(); lowerPtr.set(lowerInclusiveKey, 0, lowerInclusiveKey.length); - schema.first(lowerPtr, i, ValueBitSet.EMPTY_VALUE_BITSET); + if (schema.first(lowerPtr, i, ValueBitSet.EMPTY_VALUE_BITSET) == null) { + lower = UNBOUND_LOWER; + } upperPtr.set(upperExclusiveKey, 0, upperExclusiveKey.length); - schema.first(upperPtr, i, ValueBitSet.EMPTY_VALUE_BITSET); + if (schema.first(upperPtr, i, ValueBitSet.EMPTY_VALUE_BITSET) == null) { + upper = UNBOUND_UPPER; + } int cmpLower=0,cmpUpper=0; @@ -191,19 +197,23 @@ public boolean intersect(KeyRange keyRange) { } // Move to the next part of the key - if (schema.next(lowerPtr, i, lowerInclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) { - // If no more lower key parts, then we have no constraint for that part of the key, - // so we use unbound lower from here on out. - lower = UNBOUND_LOWER; - } else { - lower = lowerPtr; + if (lower != UNBOUND_LOWER) { + if (schema.next(lowerPtr, i, lowerInclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) { + // If no more lower key parts, then we have no constraint for that part of the key, + // so we use unbound lower from here on out. + lower = UNBOUND_LOWER; + } else { + lower = lowerPtr; + } } - if (schema.next(upperPtr, i, upperExclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) { - // If no more upper key parts, then we have no constraint for that part of the key, - // so we use unbound upper from here on out. - upper = UNBOUND_UPPER; - } else { - upper = upperPtr; + if (upper != UNBOUND_UPPER) { + if (schema.next(upperPtr, i, upperExclusiveKey.length, ValueBitSet.EMPTY_VALUE_BITSET) == null) { + // If no more upper key parts, then we have no constraint for that part of the key, + // so we use unbound upper from here on out. + upper = UNBOUND_UPPER; + } else { + upper = upperPtr; + } } } } diff --git a/src/main/java/com/salesforce/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java b/src/main/java/com/salesforce/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java index 4566ccec..4591d74f 100644 --- a/src/main/java/com/salesforce/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java +++ b/src/main/java/com/salesforce/phoenix/iterate/SkipRangeParallelIteratorRegionSplitter.java @@ -34,10 +34,10 @@ import org.apache.hadoop.hbase.ServerName; import com.google.common.base.Predicate; -import com.google.common.collect.*; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import com.salesforce.phoenix.compile.ScanRanges; import com.salesforce.phoenix.compile.StatementContext; -import com.salesforce.phoenix.query.KeyRange; import com.salesforce.phoenix.schema.TableRef; @@ -71,8 +71,7 @@ public static List> filterRegions(NavigableMa new Predicate>() { @Override public boolean apply(Map.Entry region) { - KeyRange regionKeyRange = KeyRange.getKeyRange(region.getKey()); - return ranges.intersect(regionKeyRange); + return ranges.intersect(region.getKey().getStartKey(), region.getKey().getEndKey()); } }); } diff --git a/src/main/java/com/salesforce/phoenix/query/KeyRange.java b/src/main/java/com/salesforce/phoenix/query/KeyRange.java index 462a87a2..f05511cb 100644 --- a/src/main/java/com/salesforce/phoenix/query/KeyRange.java +++ b/src/main/java/com/salesforce/phoenix/query/KeyRange.java @@ -187,7 +187,7 @@ public int compareLowerToUpperBound( byte[] b, int o, int l) { * and 0 if they are equal. */ public int compareLowerToUpperBound( byte[] b, int o, int l, boolean isInclusive) { - if (lowerUnbound() || (Bytes.equals(b, KeyRange.UNBOUND_UPPER))) { + if (lowerUnbound() || b == KeyRange.UNBOUND_UPPER) { return -1; } int cmp = Bytes.compareTo(lowerRange, 0, lowerRange.length, b, o, l); @@ -208,7 +208,7 @@ public int compareUpperToLowerBound(byte[] b, int o, int l) { } public int compareUpperToLowerBound(byte[] b, int o, int l, boolean isInclusive) { - if (upperUnbound() || Bytes.equals(b, KeyRange.UNBOUND_LOWER)) { + if (upperUnbound() || b == KeyRange.UNBOUND_LOWER) { return 1; } int cmp = Bytes.compareTo(upperRange, 0, upperRange.length, b, o, l); diff --git a/src/main/java/com/salesforce/phoenix/schema/RowKeySchema.java b/src/main/java/com/salesforce/phoenix/schema/RowKeySchema.java index e6893b97..dd3529cd 100644 --- a/src/main/java/com/salesforce/phoenix/schema/RowKeySchema.java +++ b/src/main/java/com/salesforce/phoenix/schema/RowKeySchema.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import com.salesforce.phoenix.query.QueryConstants; +import com.salesforce.phoenix.util.ByteUtil; /** @@ -80,6 +81,9 @@ public RowKeySchema build() { @Override public Boolean next(ImmutableBytesWritable ptr, int position, int maxOffset, ValueBitSet bitSet) { + if (maxOffset == 0) { + return null; + } // If positioned at SEPARATOR_BYTE, skip it. if (position > 0 && !getField(position-1).getType().isFixedWidth() && position-1 < getMaxFields() && ptr.get()[ptr.getOffset()+ptr.getLength()] == QueryConstants.SEPARATOR_BYTE) { ptr.set(ptr.get(), ptr.getOffset()+ptr.getLength()+1, 0); @@ -121,4 +125,36 @@ protected int writeVarLengthField(ImmutableBytesWritable ptr, byte[] b, int offs public int getMaxFields() { return this.getMinNullable(); } + + /** + * Given potentially a partial key, but one that is valid against + * this row key schema, increment it to the next key in the row + * key schema key space. + * @param ptr pointer to the key to be incremented + * @return a new byte array with the incremented key + */ + public byte[] nextKey(ImmutableBytesWritable ptr) { + byte[] buf = ptr.get(); + int offset = ptr.getOffset(); + int length = ptr.getLength(); + byte[] key; + if (!this.getField(this.getMaxFields()-1).getType().isFixedWidth()) { + // Add a SEPARATOR byte at the end if we have a complete key with a variable + // length at the end + if (this.setAccessor(ptr, this.getMaxFields()-1, ValueBitSet.EMPTY_VALUE_BITSET)) { + key = new byte[length+1]; + System.arraycopy(buf, offset, key, 0, length); + key[length] = QueryConstants.SEPARATOR_BYTE; + ByteUtil.nextKey(key, key.length); + return key; + } + } + // No separator needed because we either have a fixed width value at the end + // or we have a partial key which would be terminated with a separator byte. + key = new byte[length]; + System.arraycopy(buf, offset, key, 0, length); + ByteUtil.nextKey(key, key.length); + return key; + } + } diff --git a/src/main/java/com/salesforce/phoenix/schema/ValueSchema.java b/src/main/java/com/salesforce/phoenix/schema/ValueSchema.java index a5047d3a..13bbc0a8 100644 --- a/src/main/java/com/salesforce/phoenix/schema/ValueSchema.java +++ b/src/main/java/com/salesforce/phoenix/schema/ValueSchema.java @@ -273,8 +273,10 @@ protected Boolean positionPtr(ImmutableBytesWritable ptr, int position, int maxO // that supports a reset method, so we don't need to instantiate a new one // on each iteration. public Boolean first(ImmutableBytesWritable ptr, int position, ValueBitSet bitSet) { - int maxOffset = ptr.getOffset() + ptr.getLength(); // TODO: reliable? - assert(maxOffset > 0); + if (ptr.getLength() == 0) { + return null; + } + int maxOffset = ptr.getOffset() + ptr.getLength(); ptr.set(ptr.get(), ptr.getOffset(), 0); return positionPtr(ptr, position, maxOffset, bitSet); } @@ -290,6 +292,9 @@ public Boolean first(ImmutableBytesWritable ptr, int position, ValueBitSet bitSe * @return true if there is a field after position and false otherwise. */ public Boolean next(ImmutableBytesWritable ptr, int position, int maxOffset, ValueBitSet bitSet) { + if (maxOffset == 0) { + return null; + } return positionPtr(ptr, position, maxOffset, bitSet); } diff --git a/src/test/java/com/salesforce/phoenix/compile/ScanRangesTest.java b/src/test/java/com/salesforce/phoenix/compile/ScanRangesTest.java index e937ad6e..e0291e5d 100644 --- a/src/test/java/com/salesforce/phoenix/compile/ScanRangesTest.java +++ b/src/test/java/com/salesforce/phoenix/compile/ScanRangesTest.java @@ -43,6 +43,7 @@ import com.salesforce.phoenix.query.KeyRange; import com.salesforce.phoenix.schema.*; import com.salesforce.phoenix.schema.RowKeySchema.RowKeySchemaBuilder; +import com.salesforce.phoenix.util.ByteUtil; /** @@ -64,7 +65,21 @@ public ScanRangesTest(ScanRanges scanRanges, int[] widths, @Test public void test() { - assertEquals(expectedResult, scanRanges.intersect(keyRange)); + byte[] lowerInclusiveKey = keyRange.getLowerRange(); + if (!keyRange.isLowerInclusive() && !Bytes.equals(lowerInclusiveKey, KeyRange.UNBOUND_LOWER)) { + // This assumes the last key is fixed length, otherwise the results may be incorrect + // since there's no terminating 0 byte for a variable length key and thus we may be + // incrementing the key too much. + lowerInclusiveKey = ByteUtil.nextKey(lowerInclusiveKey); + } + byte[] upperExclusiveKey = keyRange.getUpperRange(); + if (keyRange.isUpperInclusive()) { + // This assumes the last key is fixed length, otherwise the results may be incorrect + // since there's no terminating 0 byte for a variable length key and thus we may be + // incrementing the key too much. + upperExclusiveKey = ByteUtil.nextKey(upperExclusiveKey); + } + assertEquals(expectedResult, scanRanges.intersect(lowerInclusiveKey,upperExclusiveKey)); } private static KeyRange getKeyRange(byte[] lowerRange, boolean lowerInclusive, byte[] upperRange, boolean upperInclusive) { diff --git a/src/test/java/com/salesforce/phoenix/memory/MemoryManagerTest.java b/src/test/java/com/salesforce/phoenix/memory/MemoryManagerTest.java index dee34674..f257e693 100644 --- a/src/test/java/com/salesforce/phoenix/memory/MemoryManagerTest.java +++ b/src/test/java/com/salesforce/phoenix/memory/MemoryManagerTest.java @@ -38,6 +38,7 @@ /** * * Tests for GlobalMemoryManager and ChildMemoryManager + * TODO: use our own time keeper so these tests don't flap * * @author jtaylor * @since 0.1 @@ -86,16 +87,16 @@ public void testWaitForMemoryAvailable() { public void run() { MemoryChunk c1 = rmm1.allocate(50); MemoryChunk c2 = rmm1.allocate(50); - sleepFor(2000); + sleepFor(4000); c1.close(); - sleepFor(1000); + sleepFor(2000); c2.close(); } }; Thread t2 = new Thread() { @Override public void run() { - sleepFor(1000); + sleepFor(2000); // Will require waiting for a bit of time before t1 frees the requested memory long startTime = System.currentTimeMillis(); MemoryChunk c3 = rmm2.allocate(50); @@ -105,7 +106,7 @@ public void run() { }; t1.start(); t2.start(); - sleepFor(500); + sleepFor(1000); // Main thread competes with others to get all memory, but should wait // until both threads are complete (since that's when the memory will // again be all available. @@ -127,27 +128,27 @@ public void testResizeWaitForMemoryAvailable() { public void run() { MemoryChunk c1 = rmm1.allocate(50); MemoryChunk c2 = rmm1.allocate(40); - sleepFor(2000); + sleepFor(4000); c1.close(); - sleepFor(1000); + sleepFor(2000); c2.close(); } }; Thread t2 = new Thread() { @Override public void run() { - sleepFor(1000); + sleepFor(2000); MemoryChunk c3 = rmm2.allocate(10); // Will require waiting for a bit of time before t1 frees the requested memory long startTime = System.currentTimeMillis(); c3.resize(50); - assertTrue(System.currentTimeMillis() - startTime >= 1000); + assertTrue(System.currentTimeMillis() - startTime >= 2000); c3.close(); } }; t1.start(); t2.start(); - sleepFor(500); + sleepFor(1000); // Main thread competes with others to get all memory, but should wait // until both threads are complete (since that's when the memory will // again be all available. @@ -168,9 +169,9 @@ public void testWaitUntilResize() { @Override public void run() { MemoryChunk c2 = rmm1.allocate(20); - sleepFor(2000); + sleepFor(4000); c1.resize(20); // resize down to test that other thread is notified - sleepFor(1000); + sleepFor(2000); c2.close(); c1.close(); assertTrue(rmm1.getAvailableMemory() == rmm1.getMaxMemory()); @@ -179,20 +180,20 @@ public void run() { Thread t2 = new Thread() { @Override public void run() { - sleepFor(1000); + sleepFor(2000); ChildMemoryManager rmm2 = new ChildMemoryManager(gmm,100); MemoryChunk c3 = rmm2.allocate(10); long startTime = System.currentTimeMillis(); c3.resize(60); // Test that resize waits if memory not available assertTrue(c1.getSize() == 20); // c1 was resized not closed - assertTrue(System.currentTimeMillis() - startTime >= 1000); // we waited some time before the allocate happened + assertTrue(System.currentTimeMillis() - startTime >= 2000); // we waited some time before the allocate happened c3.close(); assertTrue(rmm2.getAvailableMemory() == rmm2.getMaxMemory()); } }; t1.start(); t2.start(); - sleepFor(500); + sleepFor(1000); // Main thread competes with others to get all memory, but should wait // until both threads are complete (since that's when the memory will // again be all available.