From 9cf290dfa601c658529fa9716a4349390cbb14f1 Mon Sep 17 00:00:00 2001 From: Jesse Yates Date: Wed, 12 Jun 2013 21:46:06 -0700 Subject: [PATCH] Adding hbase-index project --- contrib/hbase-index/README.md | 156 +++++++++ .../hbase-index-0.94.9-compat/pom.xml | 32 ++ .../regionserver/wal/IndexedWALEditCodec.java | 196 +++++++++++ .../TestEndtoEndIndexingWithCompression.java | 42 +++ .../wal/TestReadWriteKeyValuesWithCodec.java | 154 ++++++++ ...estWALReplayWithCompressedIndexWrites.java | 25 ++ ...WALReplayWithoutCompressedIndexWrites.java | 26 ++ contrib/hbase-index/index-core/pom.xml | 18 + .../index/CannotReachIndexException.java | 14 + .../hbase/index/IndexLogRollSynchronizer.java | 99 ++++++ .../com/salesforce/hbase/index/IndexUtil.java | 69 ++++ .../salesforce/hbase/index/IndexWriter.java | 150 ++++++++ .../com/salesforce/hbase/index/Indexer.java | 230 ++++++++++++ .../hbase/index/builder/BaseIndexBuilder.java | 38 ++ .../index/builder/ColumnFamilyIndexer.java | 172 +++++++++ .../hbase/index/builder/IndexBuilder.java | 51 +++ .../index/table/CoprocessorHTableFactory.java | 31 ++ .../hbase/index/table/HTableFactory.java | 10 + .../index/table/HTableInterfaceReference.java | 85 +++++ .../hbase/index/wal/IndexedKeyValue.java | 130 +++++++ .../hbase/index/wal/KeyValueCodec.java | 79 +++++ .../regionserver/wal/IndexedHLogReader.java | 138 ++++++++ .../regionserver/wal/IndexedWALEdit.java | 88 +++++ .../hbase/index/TestEndtoEndIndexing.java | 193 ++++++++++ .../TestFailForUnsupportedHBaseVersions.java | 135 +++++++ .../wal/TestWALReplayWithIndexWrites.java | 257 ++++++++++++++ .../src/test/resources/log4j.properties | 64 ++++ contrib/hbase-index/pom.xml | 330 ++++++++++++++++++ 28 files changed, 3012 insertions(+) create mode 100644 contrib/hbase-index/README.md create mode 100644 contrib/hbase-index/hbase-index-0.94.9-compat/pom.xml create mode 100644 contrib/hbase-index/hbase-index-0.94.9-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java create mode 100644 contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/com/salesforce/hbase/TestEndtoEndIndexingWithCompression.java create mode 100644 contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestReadWriteKeyValuesWithCodec.java create mode 100644 contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithCompressedIndexWrites.java create mode 100644 contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithoutCompressedIndexWrites.java create mode 100644 contrib/hbase-index/index-core/pom.xml create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/CannotReachIndexException.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexLogRollSynchronizer.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexUtil.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexWriter.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/Indexer.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/BaseIndexBuilder.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/ColumnFamilyIndexer.java create mode 100644 
contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/IndexBuilder.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/CoprocessorHTableFactory.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableFactory.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableInterfaceReference.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/IndexedKeyValue.java create mode 100644 contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/KeyValueCodec.java create mode 100644 contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java create mode 100644 contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java create mode 100644 contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestEndtoEndIndexing.java create mode 100644 contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestFailForUnsupportedHBaseVersions.java create mode 100644 contrib/hbase-index/index-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithIndexWrites.java create mode 100644 contrib/hbase-index/index-core/src/test/resources/log4j.properties create mode 100644 contrib/hbase-index/pom.xml diff --git a/contrib/hbase-index/README.md b/contrib/hbase-index/README.md new file mode 100644 index 00000000..af27b1a1 --- /dev/null +++ b/contrib/hbase-index/README.md @@ -0,0 +1,156 @@ +# hbase-index +============= + +A general table-level indexing framework for HBase. This is a set of pieces that when combined together enable you to do 'HBase consistent' secondary indexing. + +## How it works +=============== + +We attempt to be completely transparent to the client, providing all the indexing functionality on the server side via a combination of coprocessors and WALEdit manipulation. + +### Writing + +When you make an edit from the client, we catch it on the region via a coprocessor and check to see if it should be indexed. If so, we then write a custom WAL entry that captures both the primary edit as well as all the secondary index edits. From this point on, your edit and its index entries are considered durable - just like usual. + +Once we are durable, the edit goes through the regular processing on the primary table. At the same time, we also make the index update to the index table(s). Either this edit must succeed or the region to which we are attempting to write is unavailable. If we can't write to an index, we kill the server (System.exit) - this ensures that we always write the index entry and don't get too far behind on the primary table vs. the index(1). + +### Reading + +When reading from an index table, there is no explicit guarantee of consistent across servers (acID-like semantics), so the best thing to do its to read _as of a timestamp_, ensuring that you get close to when the edit in the primary table occurs. In the usual (non-failure) case, there is very little time difference between the primary and index tables; you get a couple milliseconds as we deal with network overhead, but there is very little else slowing things down in the usual operation flow between when the primary and index puts are visible. + +(1) We could use a 'invalid index' indicator, but then again has to live somewhere (another HBase table?) 
which has the same failure considerations, so it's not really worth the extra complexity for what is really a relatively small chance of increased availability.
+
+## HBase Consistent
+========
+
+HBase only guarantees consistency on a per-row, per-table basis. Therefore, it's up to you to maintain consistency if you want to write across two different tables.
+hbase-index provides this consistency guarantee by hacking the HBase Write-Ahead Log (WAL) to ensure that secondary index entries always get written if the primary
+table write succeeds.
+
+## Caveats
+==========
+
+There are no guarantees of:
+
+ - serializability
+   - two edits may occur on the primary table and their index entries may be written out of order.
+   - We resolve this within the HBase model by ensuring that an index entry's timestamp always matches the primary table edit's timestamp.
+
+## Usage
+=========
+
+For the general Put/Delete case (the only operations currently supported), you don't need to change anything in the usual update path. However, there are a couple of things that you will need to change when setting up your cluster and tables.
+
+### Jars
+
+You will need to put the class jar for your desired version of hbase-index on the HBase classpath. Internally, we employ a RegionObserver coprocessor as well as a custom HLog Reader, both of which need to be available to HBase on startup.
+
+### hbase-site.xml changes
+
+You will need to add the following to your hbase-site.xml:
+```
+<property>
+  <name>hbase.regionserver.hlog.reader.impl</name>
+  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader</value>
+</property>
+```
+
+* NOTE: The IndexedHLogReader does *NOT support compressed WAL Edits*, so you will need to ensure that "hbase.regionserver.wal.enablecompression" is set to false.
+
+#### Supporting Indexing with Compressed WAL
+
+HBase >= 0.94.9 added support for a pluggable WALEditCodec (mainly [HBASE-8636](https://issues.apache.org/jira/browse/HBASE-8636)), which we leverage to provide full indexing support with WAL Compression enabled.
+
+The only thing you need to add is the following property to hbase-site.xml:
+```
+<property>
+  <name>hbase.regionserver.wal.codec</name>
+  <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
+</property>
+```
+And also put the hbase-index-0.94.9-compat.jar on the HBase classpath on startup.
+
+This supports both compressed *and* uncompressed WAL. So feel free to toggle:
+```
+<property>
+  <name>hbase.regionserver.wal.enablecompression</name>
+  <value>true</value>
+</property>
+```
+
+### Note
+ Moving to 0.94.9 with WAL Compression requires a clean shutdown of the cluster - no WALs can be left over to replay
+ when the cluster comes back online. Our custom WALEditCodec - the IndexedWALEditCodec - is *not* backwards compatible
+ with the indexing _if compression is enabled_. If compression is not enabled, moving to the codec from the IndexedHLogReader
+ will be fine.
+
+ _This means that if you are using the IndexedWALEditCodec - HBase 0.94.9+ - you must do a clean restart (no remaining WALs)
+ of the cluster when switching between compressed and uncompressed WAL_.
+
+
+## Custom Indexing
+===================
+
+hbase-index has a built-in concept of an IndexBuilder that lets you create custom index entries based on the primary table edits. You only need to implement a com.salesforce.hbase.index.builder.IndexBuilder; actually, you should subclass BaseIndexBuilder for cross-version compatibility - not doing so voids your warranty with respect to upgrades.
+
+Then, you just need to set up the Indexer to use your custom builder by setting up the table via IndexUtil#enableIndexing(). The enableIndexing() method ensures that your custom IndexBuilder is used by the indexer for the table and that your custom options are available to your IndexBuilder on the server-side.
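+
+For illustration only, a minimal custom builder might look like the sketch below. The class name, index table name, and column family here are made up for this example (they are not part of hbase-index), and it only handles Puts - Deletes fall through to the BaseIndexBuilder default of "no index updates":
+```
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import com.salesforce.hbase.index.builder.BaseIndexBuilder;
+
+public class ExampleValueIndexBuilder extends BaseIndexBuilder {
+
+  // hypothetical index table and column family - substitute your own
+  private static final String INDEX_TABLE = "example_value_index";
+  private static final byte[] INDEX_FAMILY = Bytes.toBytes("INDEXED");
+
+  @Override
+  public Map<Mutation, String> getIndexUpdate(Put put) {
+    Map<Mutation, String> updates = new HashMap<Mutation, String>();
+    for (List<KeyValue> kvs : put.getFamilyMap().values()) {
+      for (KeyValue kv : kvs) {
+        // index row key = the cell value, so primary rows can be looked up by value
+        Put indexUpdate = new Put(kv.getValue());
+        indexUpdate.add(INDEX_FAMILY, kv.getRow(), kv.getRow());
+        updates.put(indexUpdate, INDEX_TABLE);
+      }
+    }
+    return updates;
+  }
+}
+```
+Enabling such a builder on a table (before creating it) would then be something along these lines:
+```
+HTableDescriptor desc = new HTableDescriptor("primary");
+desc.addFamily(new HColumnDescriptor("fam"));
+// custom options end up in the coprocessor configuration and are handed to your builder's setup()
+Map<String, String> opts = new HashMap<String, String>();
+IndexUtil.enableIndexing(desc, ExampleValueIndexBuilder.class, opts);
+admin.createTable(desc); // admin is an HBaseAdmin connected to your cluster
+```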
There is an example index builder, ColumnFamilyIndexer, that just indexes on column family. You can enable it on a table via ColumnFamilyIndexer#enableIndexing(), which internally will call IndexUtil#enableIndexing() and then set up the target index table(s) via ColumnFamilyIndexer#createIndexTable(). See TestEndtoEndIndexing for a thorough example.
+
+## Requirements
+===============
+
+* Java 1.6.0_34 or higher
+* HBase-0.94.[0..5, .9]
+  - 0.94.6 has a bug in the configuration creation that means default table references from coprocessors don't work [HBASE-8684](https://issues.apache.org/jira/browse/HBASE-8684)
+  - 0.94.7 breaks the RegionServerServices WAL accessing interface
+  - 0.94.9: has all the necessary bug fixes AND provides the interface to support indexing with a compressed WAL
+
+### If building from source
+* All of the above
+* Maven 3.X
+
+## Building from source
+=======================
+
+### Building the Jars
+For HBase < 0.94.9
+```
+ $ mvn clean install -DskipTests
+```
+For HBase >= 0.94.9
+```
+ $ mvn clean install -DskipTests -Dhbase=0.94.9
+```
+This will build the necessary jars in the index-core/target directory (and, if using -Dhbase=0.94.9, the hbase-index-0.94.9-compat/target directory).
+
+### Running the tests
+
+To just run the index-core tests, you can do:
+```
+ $ mvn clean test
+```
+This runs the tests against HBase 0.94.4 and does not support WAL Compression.
+
+To run the tests against 0.94.9, run:
+```
+ $ mvn clean install -DskipTests
+ $ mvn clean test -Dhbase=0.94.9
+```
+
+The first step ensures that the index-core jar is present in the local repository, as the hbase-0.94.9 compatibility module requires the index-core test-jar (and Maven isn't smart enough to realize that when doing compilation, so we have to go through this extra step).
+
+## Roadmap/TODOs
+=======
+ - Support alternative failure mechanisms.
+   - The 'abort the server' mechanism is a bit heavy-handed and decreases the robustness of the system in the face of transient errors. A possible mechanism would be an 'index invalidator' that marks an index as invalid after a certain number of failures.
+ - Investigate client-written index updates.
+   - By having the region manage all the updates, it adds a lot more CPU and bandwidth load on an already delicate resource. This mechanism would still serialize the index updates to the WAL, but let the client ensure that the index updates are written to the index table. Only when the client fails to make the index updates (either via timeout or explicitly saying so) do we go into the failure + replay situation. This gets particularly tricky when managing rolling the WAL - we cannot roll until the index updates have been marked complete, meaning we may need to block incoming requests as well as wait on all outstanding index updates. This adds a lot more complexity for what seems to be a potentially modest performance upgrade, but may be worth it in some situations.
+ - Support Append, Increment operations
+   - These follow a slightly different path through the HRegion that doesn't make them as amenable to WALEdit modification. This will likely require some changes to HBase, but will be technically very similar to the Put and Delete implementations.
+ - Cleaner HTableInterface reference management in Indexer + - right now, its a little heavy-handed, creating a new set of HTables for each index request (in fact, each Put/Delete). Ideally, we would want to use some sort of time-based, LRU(ish) cache to keep track of the HTables; you don't want to keep open connections around that aren't being regularly used, but you don't want to throw away regularly used tables (so a strict, single size LRU could easily start to thrash). +- (Possible) Look into supporting multiple WALs as there is now a per-region WAL in hbase-0.94.6 + - this is part of a bigger issue with supporting multiple releases of HBase with different internals diff --git a/contrib/hbase-index/hbase-index-0.94.9-compat/pom.xml b/contrib/hbase-index/hbase-index-0.94.9-compat/pom.xml new file mode 100644 index 00000000..3d9ae38b --- /dev/null +++ b/contrib/hbase-index/hbase-index-0.94.9-compat/pom.xml @@ -0,0 +1,32 @@ + + 4.0.0 + + com.salesforce.hbase + hbase-index + 0.0.1-SNAPSHOT + + hbase-index-0.94.9-compat + HBase-0.94.9 Compatibility + Provide support for WAL Compression in HBase 0.94.X, where X > 9. + + + 0.94.9-SNAPSHOT + + + + + + com.salesforce.hbase + index-core + ${project.version} + + + + com.salesforce.hbase + index-core + ${project.version} + test-jar + test + + + \ No newline at end of file diff --git a/contrib/hbase-index/hbase-index-0.94.9-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java b/contrib/hbase-index/hbase-index-0.94.9-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java new file mode 100644 index 00000000..c94843fd --- /dev/null +++ b/contrib/hbase-index/hbase-index-0.94.9-compat/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEditCodec.java @@ -0,0 +1,196 @@ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.codec.BaseDecoder; +import org.apache.hadoop.hbase.codec.BaseEncoder; +import org.apache.hadoop.hbase.codec.Decoder; +import org.apache.hadoop.hbase.codec.Encoder; +import org.apache.hadoop.hbase.regionserver.wal.WALEditCodec; + +import com.salesforce.hbase.index.wal.IndexedKeyValue; +import com.salesforce.hbase.index.wal.KeyValueCodec; + + +/** + * Support custom indexing {@link KeyValue}s when written to the WAL. + *

+ * Currently, we don't support reading older WAL files - only new WAL files. Therefore, this should + * not be installed on a running cluster, but rather one that has been cleanly shutdown and requires + * no WAL replay on startup. + */ +public class IndexedWALEditCodec extends WALEditCodec { + + // can't have negative values because reading off a stream returns a negative if its the end of + // the stream + private static final int REGULAR_KEY_VALUE_MARKER = 0; + private CompressionContext compression; + + /** Required nullary constructor */ + public IndexedWALEditCodec() { + } + + /** + * Override the parent implementation so we can get access to the current context too + * @param compression compression to support for the encoder/decoder + */ + @Override + public void setCompression(CompressionContext compression) { + super.setCompression(compression); + this.compression = compression; + } + + @Override + public Decoder getDecoder(InputStream is) { + // compression isn't enabled + if (this.compression == null) { + return new IndexKeyValueDecoder(is); + } + + // there is compression, so we get the standard decoder to handle reading those kvs + Decoder decoder = super.getDecoder(is); + // compression is on, reqturn our custom decoder + return new CompressedIndexKeyValueDecoder(is, decoder); + } + + @Override + public Encoder getEncoder(OutputStream os) { + // compression isn't on, do the default thing + if (this.compression == null) { + return new IndexKeyValueEncoder(os); + } + + // compression is on, return our one that will handle putting in the correct markers + Encoder encoder = super.getEncoder(os); + return new CompressedIndexKeyValueEncoder(os, encoder); + } + + /** + * Custom {@link Decoder} that can handle a stream of regular and indexed {@link KeyValue}s. + */ + public class IndexKeyValueDecoder extends BaseDecoder { + + /** + * Create a {@link Decoder} on the given input stream with the given {@link Decoder} to parse + * generic {@link KeyValue}s. + * @param is stream to read from + */ + public IndexKeyValueDecoder(InputStream is){ + super(is); + } + + @Override + protected KeyValue parseCell() throws IOException{ + return KeyValueCodec.readKeyValue((DataInput) this.in); + } + } + + public class CompressedIndexKeyValueDecoder extends BaseDecoder { + + private Decoder decoder; + + /** + * Create a {@link Decoder} on the given input stream with the given {@link Decoder} to parse + * generic {@link KeyValue}s. + * @param is stream to read from + * @param compressedDecoder decoder for generic {@link KeyValue}s. Should support the expected + * compression. + */ + public CompressedIndexKeyValueDecoder(InputStream is, Decoder compressedDecoder) { + super(is); + this.decoder = compressedDecoder; + } + + @Override + protected KeyValue parseCell() throws IOException { + // reader the marker + int marker = this.in.read(); + if (marker < 0) { + throw new EOFException( + "Unexepcted end of stream found while reading next (Indexed) KeyValue"); + } + + // do the normal thing, if its a regular kv + if (marker == REGULAR_KEY_VALUE_MARKER) { + if (!this.decoder.advance()) { + throw new IOException("Could not read next key-value from generic KeyValue Decoder!"); + } + return this.decoder.current(); + } + + // its an indexedKeyValue, so parse it out specially + return KeyValueCodec.readKeyValue((DataInput) this.in); + } + } + + /** + * Encode {@link IndexedKeyValue}s via the {@link KeyValueCodec}. Does not support + * compression. 
+ */ + private static class IndexKeyValueEncoder extends BaseEncoder { + public IndexKeyValueEncoder(OutputStream os) { + super(os); + } + + @Override + public void flush() throws IOException { + super.flush(); + } + + @Override + public void write(KeyValue cell) throws IOException { + // make sure we are open + checkFlushed(); + + // use the standard encoding mechanism + KeyValueCodec.write((DataOutput) this.out, cell); + } + } + + /** + * Write {@link IndexedKeyValue}s along side compressed {@link KeyValue}s. This Encoder is + * not compatible with the {@link IndexKeyValueDecoder} - one cannot intermingle compressed + * and uncompressed WALs that contain index entries. + */ + private static class CompressedIndexKeyValueEncoder extends BaseEncoder { + private Encoder compressedKvEncoder; + + public CompressedIndexKeyValueEncoder(OutputStream os, Encoder compressedKvEncoder) { + super(os); + this.compressedKvEncoder = compressedKvEncoder; + } + + @Override + public void flush() throws IOException { + this.compressedKvEncoder.flush(); + super.flush(); + } + + @Override + public void write(KeyValue cell) throws IOException { + //make sure we are open + checkFlushed(); + + //write the special marker so we can figure out which kind of kv is it + int marker = IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER; + if (cell instanceof IndexedKeyValue) { + marker = KeyValueCodec.INDEX_TYPE_LENGTH_MARKER; + } + out.write(marker); + + //then serialize based on the marker + if (marker == IndexedWALEditCodec.REGULAR_KEY_VALUE_MARKER) { + this.compressedKvEncoder.write(cell); + } + else{ + KeyValueCodec.write((DataOutput) out, cell); + } + } + } +} \ No newline at end of file diff --git a/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/com/salesforce/hbase/TestEndtoEndIndexingWithCompression.java b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/com/salesforce/hbase/TestEndtoEndIndexingWithCompression.java new file mode 100644 index 00000000..c4a0716c --- /dev/null +++ b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/com/salesforce/hbase/TestEndtoEndIndexingWithCompression.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.salesforce.hbase; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec; +import org.apache.hadoop.hbase.regionserver.wal.WALEditCodec; +import org.junit.BeforeClass; + +import com.salesforce.hbase.index.TestEndtoEndIndexing; + +/** + * Test secondary indexing from an end-to-end perspective (client to server to index table). 
+ */ +public class TestEndtoEndIndexingWithCompression extends TestEndtoEndIndexing{ + + @BeforeClass + public static void setupCluster() throws Exception { + //add our codec and enable WAL compression + UTIL.getConfiguration().set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, + IndexedWALEditCodec.class.getName()); + UTIL.getConfiguration().setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + + //start the mini-cluster + UTIL.startMiniCluster(); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestReadWriteKeyValuesWithCodec.java b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestReadWriteKeyValuesWithCodec.java new file mode 100644 index 00000000..66f91ebf --- /dev/null +++ b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestReadWriteKeyValuesWithCodec.java @@ -0,0 +1,154 @@ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.salesforce.hbase.index.table.HTableInterfaceReference; +import com.salesforce.hbase.index.wal.IndexedKeyValue; + +/** + * Simple test to read/write simple files via our custom {@link WALEditCodec} to ensure properly + * encoding/decoding without going through a cluster. + */ +public class TestReadWriteKeyValuesWithCodec { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final byte[] ROW = Bytes.toBytes("row"); + private static final byte[] FAMILY = Bytes.toBytes("family"); + + @BeforeClass + public static void setupCodec() { + Configuration conf = UTIL.getConfiguration(); + conf.set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName()); + } + + @Test + public void testWithoutCompression() throws Exception { + // get the FS ready to read/write the edits + Path testDir = UTIL.getDataTestDir("TestReadWriteCustomEdits_withoutCompression"); + Path testFile = new Path(testDir, "testfile"); + FileSystem fs = UTIL.getTestFileSystem(); + + List edits = getEdits(); + WALEditCodec codec = WALEditCodec.create(UTIL.getConfiguration(), null); + writeReadAndVerify(codec, fs, edits, testFile); + + } + + @Test + public void testWithCompression() throws Exception { + // get the FS ready to read/write the edit + Path testDir = UTIL.getDataTestDir("TestReadWriteCustomEdits_withCompression"); + Path testFile = new Path(testDir, "testfile"); + FileSystem fs = UTIL.getTestFileSystem(); + + List edits = getEdits(); + CompressionContext compression = new CompressionContext(LRUDictionary.class); + WALEditCodec codec = WALEditCodec.create(UTIL.getConfiguration(), compression); + writeReadAndVerify(codec, fs, edits, testFile); + } + + /** + * @return a bunch of {@link WALEdit}s that test a range of serialization possibilities. 
+ */ + private List getEdits() { + // Build up a couple of edits + List edits = new ArrayList(); + Put p = new Put(ROW); + p.add(FAMILY, null, Bytes.toBytes("v1")); + + WALEdit withPut = new WALEdit(); + addMutation(withPut, p, FAMILY); + edits.add(withPut); + + Delete d = new Delete(ROW); + d.deleteColumn(FAMILY, null); + WALEdit withDelete = new WALEdit(); + addMutation(withDelete, d, FAMILY); + edits.add(withDelete); + + WALEdit withPutsAndDeletes = new WALEdit(); + addMutation(withPutsAndDeletes, d, FAMILY); + addMutation(withPutsAndDeletes, p, FAMILY); + edits.add(withPutsAndDeletes); + + WALEdit justIndexUpdates = new WALEdit(); + HTableInterfaceReference target = new HTableInterfaceReference("targetTable"); + IndexedKeyValue ikv = new IndexedKeyValue(target, p); + justIndexUpdates.add(ikv); + edits.add(justIndexUpdates); + + WALEdit mixed = new WALEdit(); + addMutation(mixed, d, FAMILY); + mixed.add(ikv); + addMutation(mixed, p, FAMILY); + edits.add(mixed); + + return edits; + } + + /** + * Add all the {@link KeyValue}s in the {@link Mutation}, for the pass family, to the given + * {@link WALEdit}. + */ + private void addMutation(WALEdit edit, Mutation m, byte[] family) { + List kvs = m.getFamilyMap().get(FAMILY); + for (KeyValue kv : kvs) { + edit.add(kv); + } + } + + /** + * Write the edits to the specified path on the {@link FileSystem} using the given codec and then + * read them back in and ensure that we read the same thing we wrote. + */ + private void writeReadAndVerify(WALEditCodec codec, FileSystem fs, List edits, + Path testFile) throws IOException { + // write the edits out + FSDataOutputStream out = fs.create(testFile); + for (WALEdit edit : edits) { + edit.setCodec(codec); + edit.write(out); + } + out.close(); + + // read in the edits + FSDataInputStream in = fs.open(testFile); + List read = new ArrayList(); + for (int i = 0; i < edits.size(); i++) { + WALEdit edit = new WALEdit(); + edit.setCodec(codec); + edit.readFields(in); + read.add(edit); + } + in.close(); + + // make sure the read edits match the written + for(int i=0; i< edits.size(); i++){ + WALEdit expected = edits.get(i); + WALEdit found = read.get(i); + for(int j=0; j< expected.getKeyValues().size(); j++){ + KeyValue fkv = found.getKeyValues().get(j); + KeyValue ekv = expected.getKeyValues().get(j); + assertEquals("KV mismatch for edit! 
Expected: "+expected+", but found: "+found, ekv, fkv); + } + } + } +} \ No newline at end of file diff --git a/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithCompressedIndexWrites.java b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithCompressedIndexWrites.java new file mode 100644 index 00000000..06460651 --- /dev/null +++ b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithCompressedIndexWrites.java @@ -0,0 +1,25 @@ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.junit.BeforeClass; + +/** + * Do the WAL Replay test but with the WALEditCodec, rather than an {@link IndexedHLogReader}, but + * still without compression + */ +public class TestWALReplayWithCompressedIndexWrites extends TestWALReplayWithIndexWrites { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + configureCluster(); + // use our custom Codec to handle the custom WALEdits + Configuration conf = UTIL.getConfiguration(); + conf.set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName()); + + // enable WAL compression + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + + startCluster(); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithoutCompressedIndexWrites.java b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithoutCompressedIndexWrites.java new file mode 100644 index 00000000..20c32370 --- /dev/null +++ b/contrib/hbase-index/hbase-index-0.94.9-compat/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithoutCompressedIndexWrites.java @@ -0,0 +1,26 @@ +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.junit.BeforeClass; + +/** + * Do the WAL Replay test but with the our custom {@link WALEditCodec} - {@link IndexedWALEditCodec} + * - and enabling compression - the main use case for having a custom {@link WALEditCodec}. 
+ */ +public class TestWALReplayWithoutCompressedIndexWrites extends TestWALReplayWithIndexWrites { + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + configureCluster(); + + // use our custom Codec to handle the custom WALEdits + Configuration conf = UTIL.getConfiguration(); + conf.set(WALEditCodec.WAL_EDIT_CODEC_CLASS_KEY, IndexedWALEditCodec.class.getName()); + + // disable WAL compression + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + + startCluster(); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/pom.xml b/contrib/hbase-index/index-core/pom.xml new file mode 100644 index 00000000..abc45823 --- /dev/null +++ b/contrib/hbase-index/index-core/pom.xml @@ -0,0 +1,18 @@ + + 4.0.0 + + com.salesforce.hbase + hbase-index + 0.0.1-SNAPSHOT + + index-core + Indexing - Core + Core of the secondary indexing management services + + + + com.google.guava + guava + + + \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/CannotReachIndexException.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/CannotReachIndexException.java new file mode 100644 index 00000000..dd53b095 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/CannotReachIndexException.java @@ -0,0 +1,14 @@ +package com.salesforce.hbase.index; + +import org.apache.hadoop.hbase.client.Mutation; + +/** + * Exception thrown if we cannot successfully write to an index table. + */ +@SuppressWarnings("serial") +public class CannotReachIndexException extends Exception { + + public CannotReachIndexException(String targetTableName, Mutation m, Exception cause) { + super("Cannot reach index table " + targetTableName + " to update index for edit: " + m, cause); + } +} diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexLogRollSynchronizer.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexLogRollSynchronizer.java new file mode 100644 index 00000000..12b2639e --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexLogRollSynchronizer.java @@ -0,0 +1,99 @@ +package com.salesforce.hbase.index; + +import java.io.IOException; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +/** + * Ensure that the log isn't rolled while we are the in middle of doing a pending index write. + *

+ * The problem we are trying to solve is the following sequence:
+ * <ol>
+ *   <li>Write to the indexed table</li>
+ *   <li>Write the index-containing WALEdit</li>
+ *   <li>Start writing to the index tables in the postXXX hook</li>
+ *   <li>WAL gets rolled and archived</li>
+ *   <li>An index update fails, in which case we should kill ourselves to get WAL replay</li>
+ *   <li>Since the WAL got archived, we won't get the replay of the index writes</li>
+ * </ol>
+ * <p>
+ * The usual course of events should be:
+ * <ol>
+ *   <li>In a preXXX hook,
+ *     <ol>
+ *       <li>Build the {@link WALEdit} + index information</li>
+ *       <li>Lock the {@link IndexLogRollSynchronizer#INDEX_UPDATE_LOCK}
+ *         <ul>
+ *           <li>This is a reentrant readlock on the WAL archiving, so we can make multiple
+ *               WAL/index updates concurrently</li>
+ *         </ul>
+ *       </li>
+ *     </ol>
+ *   </li>
+ *   <li>Pass that {@link WALEdit} to the WAL, ensuring its durable and replayable</li>
+ *   <li>In the corresponding postXXX,
+ *     <ol>
+ *       <li>make the updates to the index tables</li>
+ *       <li>Unlock {@link IndexLogRollSynchronizer#INDEX_UPDATE_LOCK}</li>
+ *     </ol>
+ *   </li>
+ * </ol>
+ * <p>
+ * this should be added as a {@link WALActionsListener} by updating + */ +public class IndexLogRollSynchronizer implements WALActionsListener { + + private WriteLock logArchiveLock; + + public IndexLogRollSynchronizer(WriteLock logWriteLock){ + this.logArchiveLock = logWriteLock; + } + + + @Override + public void preLogArchive(Path oldPath, Path newPath) throws IOException { + //take a write lock on the index - any pending index updates will complete before we finish + logArchiveLock.lock(); + } + + @Override + public void postLogArchive(Path oldPath, Path newPath) throws IOException { + // done archiving the logs, any WAL updates will be replayed on failure + logArchiveLock.unlock(); + } + + @Override + public void logCloseRequested() { + // don't care- before this is called, all the HRegions are closed, so we can't get any new requests and all pending request can finish before the WAL closes. + } + + @Override + public void preLogRoll(Path oldPath, Path newPath) throws IOException { + // noop + } + + @Override + public void postLogRoll(Path oldPath, Path newPath) throws IOException { + // noop + } + + @Override + public void logRollRequested() { + // noop + } + + @Override + public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey, WALEdit logEdit) { + // noop + } + + @Override + public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey, WALEdit logEdit) { + // noop + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexUtil.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexUtil.java new file mode 100644 index 00000000..155b9681 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexUtil.java @@ -0,0 +1,69 @@ +package com.salesforce.hbase.index; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; + +import com.salesforce.hbase.index.builder.IndexBuilder; + +public final class IndexUtil { + + static final String INDEX_BUILDER_CONF_KEY = "index.builder"; + + private IndexUtil(){ + //private ctor for util classes + } + + + /** + * Enable indexing on the given table + * @param desc {@link HTableDescriptor} for the table on which indexing should be enabled + * @param builder class to use when building the index for this table + * @param properties map of custom configuration options to make available to your + * {@link IndexBuilder} on the server-side + * @throws IOException the Indexer coprocessor cannot be added + */ + public static void enableIndexing(HTableDescriptor desc, Class builder, + Map properties) throws IOException { + properties.put(INDEX_BUILDER_CONF_KEY, builder.getName()); + desc.addCoprocessor(Indexer.class.getName(), null, Coprocessor.PRIORITY_USER, properties); + } + + + /** + * Validate that the version and configuration parameters are supported + * @param hBaseVersion current version of HBase on which this coprocessor is installed + * @param conf configuration to check for allowed parameters (e.g. 
WAL Compression only if >= + * 0.94.9) + */ + static String validateVersion(String hBaseVersion, Configuration conf) { + String[] versions = hBaseVersion.split("[.]"); + if (versions.length < 3) { + return "HBase version could not be read, expected three parts, but found: " + + Arrays.toString(versions); + } + + if (versions[1].equals("94")) { + String pointVersion = versions[2]; + //remove -SNAPSHOT if applicable + int snapshot = pointVersion.indexOf("-SNAPSHOT"); + if(snapshot > 0){ + pointVersion = pointVersion.substring(0, snapshot); + } + // less than 0.94.9, so we need to check if WAL Compression is enabled + if (Integer.parseInt(pointVersion) < 9) { + if (conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false)) { + return + "Indexing not supported with WAL Compression for versions of HBase older than 0.94.9 - found version:" + + Arrays.toString(versions); + } + } + } + return null; + } +} diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexWriter.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexWriter.java new file mode 100644 index 00000000..e55c6d3d --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/IndexWriter.java @@ -0,0 +1,150 @@ +package com.salesforce.hbase.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; + +import com.salesforce.hbase.index.table.HTableFactory; +import com.salesforce.hbase.index.table.HTableInterfaceReference; + +/** + * Do the actual work of writing to the index tables. Ensures that if we do fail to write to the + * index table that we cleanly kill the region/server to ensure that the region's WAL gets replayed. + */ +public class IndexWriter { + + private static final Log LOG = LogFactory.getLog(IndexWriter.class); + + private final String sourceInfo; + private final Abortable abortable; + + public IndexWriter(String sourceInfo, Abortable abortable) { + this.sourceInfo = sourceInfo; + this.abortable = abortable; + } + + /** + * Just write the index update portions of of the edit, if it is an {@link IndexedWALEdit}. If it + * is not passed an {@link IndexedWALEdit}, any further actions are ignored. + *

+ * Internally, uses {@link #write(HRegionInfo, WALEdit)} to make the write and if is receives a + * {@link CannotReachIndexException}, it attempts to move ( + * {@link HBaseAdmin#unassign(byte[], boolean)}) the region and then failing that calls + * {@link System#exit(int)} to kill the server. + * @param factory Factory to use when resolving the {@link HTableInterfaceReference}. If + * null, its assumed that the {@link HTableInterfaceReference} already has its + * factory set (e.g. by {@link HTableInterfaceReference#setFactory(HTableFactory)} - if + * its not already set, a {@link NullPointerException} is thrown. + * @param source source region from which we are writing + * @param edit log edit to attempt to use to write to the idnex table + * @return true if we successfully wrote to the index table. Also, returns false + * if we are not passed an {@link IndexedWALEdit}. + */ + public void writeAndKillYourselfOnFailure(Map indexUpdates, + HTableFactory factory) { + try { + write(indexUpdates, factory); + } catch (Exception e) { + killYourself(e); + } + } + + /** + * Write the mutations to their respective table using the {@link HTableFactory} accompanying each + * reference. + * @param updates Updates to write + * @param factory Factory to use when resolving the {@link HTableInterfaceReference}. If + * null, its assumed that the {@link HTableInterfaceReference} already has its + * factory set (e.g. by {@link HTableInterfaceReference#setFactory(HTableFactory)} - if + * its not already set, a {@link NullPointerException} is thrown. + * @throws CannotReachIndexException if we cannot successfully write a single index entry. We stop + * immediately on the first failed index write, rather than attempting all writes. + */ + public void write(Map updates) + throws CannotReachIndexException { + this.write(updates, null); + } + + /** + * Write the mutations to their respective table using the provided factory. + *

+ * This method is not thread-safe and if accessed in a non-serial manner could leak HTables. + * @param updates Updates to write + * @param factory Factory to use when resolving the {@link HTableInterfaceReference}. If + * null, its assumed that the {@link HTableInterfaceReference} already has its + * factory set (e.g. by {@link HTableInterfaceReference#setFactory(HTableFactory)} - if + * its not already set, a {@link NullPointerException} is thrown. + * @throws CannotReachIndexException if we cannot successfully write a single index entry. We stop + * immediately on the first failed index write, rather than attempting all writes. + */ + private void write(Map updates, HTableFactory factory) + throws CannotReachIndexException { + List singleMutation = new ArrayList(1); + Set tables = new HashSet(); + for (Entry entry : updates.entrySet()) { + // do the put into the index table + singleMutation.add(entry.getKey()); + LOG.info("Writing index update:" + entry.getKey() + " to table: " + + entry.getValue().getTableName()); + try { + HTableInterface table; + + if (factory == null) { + table = entry.getValue().getTable(); + } else { + table = entry.getValue().getTable(factory); + } + // do the update + table.batch(singleMutation); + tables.add(table); + } catch (IOException e) { + throw new CannotReachIndexException(entry.getValue().getTableName(), entry.getKey(), e); + } catch (InterruptedException e) { + throw new CannotReachIndexException(entry.getValue().getTableName(), entry.getKey(), e); + } + singleMutation.clear(); + } + // go through each reference and close the connection + // we can't do this earlier as we may reuse table references between different index entries, + // which would prematurely close a table before we could write the later update + for (HTableInterface table : tables) { + try { + table.close(); + } catch (IOException e) { + LOG.error("Failed to close connection to table:" + Bytes.toString(table.getTableName()), e); + } + } + + LOG.info("Done writing all index updates"); + } + + /** + * @param logEdit edit for which we need to kill ourselves + * @param info region from which we are attempting to write the log + */ + private void killYourself(Throwable cause) { + String msg = "Could not update the index table, killing server region from: " + this.sourceInfo; + LOG.error(msg); + try { + this.abortable.abort(msg, cause); + } catch (Exception e) { + LOG.fatal("Couldn't abort this server to preserve index writes, attempting to hard kill the server from" + + this.sourceInfo); + System.exit(1); + } + } +} diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/Indexer.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/Indexer.java new file mode 100644 index 00000000..2443aacc --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/Indexer.java @@ -0,0 +1,230 @@ +package com.salesforce.hbase.index; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.KeyValue; +import 
org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.regionserver.wal.HLogKey; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +import com.salesforce.hbase.index.builder.IndexBuilder; +import com.salesforce.hbase.index.table.CoprocessorHTableFactory; +import com.salesforce.hbase.index.table.HTableFactory; +import com.salesforce.hbase.index.table.HTableInterfaceReference; +import com.salesforce.hbase.index.wal.IndexedKeyValue; + +/** + * Do all the work of managing index updates from a single coprocessor. All Puts/Delets are passed + * to an {@link IndexBuilder} to determine the actual updates to make. + *

+ * If the WAL is enabled, these updates are then added to the WALEdit and attempted to be written to + * the WAL after the WALEdit has been saved. If any of the index updates fail, this server is + * immediately terminated and we rely on WAL replay to attempt the index updates again (see + * {@link #preWALRestore(ObserverContext, HRegionInfo, HLogKey, WALEdit)}). + *

+ * If the WAL is disabled, the updates are attempted immediately. No consistency guarantees are made + * if the WAL is disabled - some or none of the index updates may be successful. + */ +public class Indexer extends BaseRegionObserver { + + private static final Log LOG = LogFactory.getLog(Indexer.class); + + /** WAL on this server */ + private HLog log; + private IndexWriter writer; + private HTableFactory factory; + + private IndexBuilder builder; + + // Setup out locking on the index edits/WAL so we can be sure that we don't lose a roll a WAL edit + // before an edit is applied to the index tables + private static final ReentrantReadWriteLock INDEX_READ_WRITE_LOCK = new ReentrantReadWriteLock( + true); + public static final ReadLock INDEX_UPDATE_LOCK = INDEX_READ_WRITE_LOCK.readLock(); + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + this.factory = new CoprocessorHTableFactory(e); + + final RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; + + // make sure the right version <-> combinations are allowed. + String errormsg = IndexUtil.validateVersion(env.getHBaseVersion(), env.getConfiguration()); + if (errormsg != null) { + IOException ioe = new IOException(errormsg); + env.getRegionServerServices().abort(errormsg, ioe); + throw ioe; + } + + // setup the index entry builder so we can build edits for the index tables + Configuration conf = e.getConfiguration(); + Class builderClass = conf.getClass(IndexUtil.INDEX_BUILDER_CONF_KEY, + null, IndexBuilder.class); + try { + this.builder = builderClass.newInstance(); + } catch (InstantiationException e1) { + throw new IOException("Couldn't instantiate index builder:" + builderClass + + ", disabling indexing on table " + env.getRegion().getTableDesc().getNameAsString()); + } catch (IllegalAccessException e1) { + throw new IOException("Couldn't instantiate index builder:" + builderClass + + ", disabling indexing on table " + env.getRegion().getTableDesc().getNameAsString()); + } + this.builder.setup(conf); + + // get a reference to the WAL + log = env.getRegionServerServices().getWAL(); + // add a synchronizer so we don't archive a WAL that we need + log.registerWALActionsListener(new IndexLogRollSynchronizer(INDEX_READ_WRITE_LOCK.writeLock())); + + // and setup the actual index writer + this.writer = new IndexWriter("Region: " + env.getRegion().getRegionNameAsString(), + env.getRegionServerServices()); + } + + @Override + public void prePut(final ObserverContext c, final Put put, + final WALEdit edit, final boolean writeToWAL) throws IOException { + // get the mapping for index column -> target index table + Map indexUpdates = this.builder.getIndexUpdate(put); + + doPre(indexUpdates, edit, writeToWAL); + } + + @Override + public void preDelete(ObserverContext e, Delete delete, + WALEdit edit, boolean writeToWAL) throws IOException { + // get the mapping for index column -> target index table + Map indexUpdates = this.builder.getIndexUpdate(delete); + + doPre(indexUpdates, edit, writeToWAL); + } + + private void doPre(Map indexUpdates, + final WALEdit edit, final boolean writeToWAL) throws IOException { + // no index updates, so we are done + if (indexUpdates == null || indexUpdates.size() == 0) { + return; + } + + // move the string table name to a full reference. 
Right now, this is pretty inefficient as each + // time through the index request we create a new connection to the HTable, only attempting to + // be a little smart by just reusing the references if two updates go to the same table in the + // same update. + Map updates = new HashMap( + indexUpdates.size()); + Map tables = new HashMap(updates.size()); + for (Entry entry : indexUpdates.entrySet()) { + String tableName = entry.getValue(); + HTableInterfaceReference table = tables.get(tableName); + if( table== null){ + // make sure we use the CP factory to reach the remote table - this is all kept in memory, + // so we can be sure its uses our factory when getting the table + table = new HTableInterfaceReference(entry.getValue(), factory); + tables.put(tableName, table); + } + updates.put(entry.getKey(), table); + } + + // if writing to wal is disabled, we never see the WALEdit updates down the way, so do the index + // update right away + if (!writeToWAL) { + try { + this.writer.write(updates); + return; + } catch (CannotReachIndexException e) { + LOG.error("Failed to update index with entries:" + indexUpdates, e); + throw new IOException(e); + } + } + + // we have all the WAL durability, so we just update the WAL entry and move on + for (Entry entry : updates.entrySet()) { + edit.add(new IndexedKeyValue(entry.getValue(), entry.getKey())); + } + + // lock the log, so we are sure that index write gets atomically committed + INDEX_UPDATE_LOCK.lock(); + } + + @Override + public void postPut(ObserverContext e, Put put, WALEdit edit, + boolean writeToWAL) throws IOException { + doPost(edit, writeToWAL); + } + + @Override + public void postDelete(ObserverContext e, Delete delete, + WALEdit edit, boolean writeToWAL) throws IOException { + doPost(edit, writeToWAL); + } + + /** + * @param edit + * @param writeToWAL + */ + private void doPost(WALEdit edit, boolean writeToWAL) { + if (!writeToWAL) { + // already did the index update in prePut, so we are done + return; + } + + // the WAL edit is kept in memory and we already specified the factory when we created the + // references originally - therefore, we just pass in a null factory here and use the ones + // already specified on each reference + writeToIndexFromWALEntry(edit, null); + + // release the lock on the index, we wrote everything properly + INDEX_UPDATE_LOCK.unlock(); + } + + /** + * Write the index update parts of the {@link WALEdit} to the index tables. + * @param edit edit to examine for index updates. No attempt to write will be made if no index + * updates are contained in the edit's {@link KeyValue}s. + * @param factory factory to use when accessing the {@link HTableInterface}s from the + * {@link HTableInterfaceReference}s. If null, its expected that the the + * {@link HTableInterfaceReference} already has a reference to a valid factory. 
+ */ + private void writeToIndexFromWALEntry(WALEdit edit, HTableFactory factory) { + // get the edits out that we need to write + Map indexUpdates = new HashMap(); + for (KeyValue kv : edit.getKeyValues()) { + if (kv instanceof IndexedKeyValue) { + IndexedKeyValue ikv = (IndexedKeyValue) kv; + indexUpdates.put(ikv.getMutation(), ikv.getIndexTable()); + } + } + + // no changes to the index, so we are done + if (indexUpdates.size() == 0) { + return; + } + + writer.writeAndKillYourselfOnFailure(indexUpdates, factory); + } + + @Override + public void preWALRestore(ObserverContext env, HRegionInfo info, + HLogKey logKey, WALEdit logEdit) throws IOException { + writeToIndexFromWALEntry(logEdit, factory); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/BaseIndexBuilder.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/BaseIndexBuilder.java new file mode 100644 index 00000000..c38c0dc9 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/BaseIndexBuilder.java @@ -0,0 +1,38 @@ +package com.salesforce.hbase.index.builder; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; + +/** + * Basic implementation of the {@link IndexBuilder} that doesn't do any actual work of indexing. + *

+ * You should extend this class, rather than implementing IndexBuilder directly to maintain + * compatability going forward. + */ +public class BaseIndexBuilder implements IndexBuilder { + + @Override + public void extendBaseIndexBuilderInstead() { } + + @Override + public void setup(Configuration conf) { + // noop + } + + @Override + public Map getIndexUpdate(Put put) { + return null; + } + + @Override + public Map getIndexUpdate(Delete delete) { + return null; + } + + + +} diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/ColumnFamilyIndexer.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/ColumnFamilyIndexer.java new file mode 100644 index 00000000..fbb5c4c6 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/ColumnFamilyIndexer.java @@ -0,0 +1,172 @@ +package com.salesforce.hbase.index.builder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.util.Bytes; + +import com.salesforce.hbase.index.IndexUtil; + +/** + * Simple indexer that just indexes rows based on their column families + */ +public class ColumnFamilyIndexer extends BaseIndexBuilder { + + private static final String INDEX_TO_TABLE_CONF_PREFX = "hbase.index.family."; + private static final String INDEX_TO_TABLE_COUNT_KEY = INDEX_TO_TABLE_CONF_PREFX + "families"; + private static final String SEPARATOR = ","; + + static final byte[] INDEX_ROW_COLUMN_FAMILY = Bytes.toBytes("ROW"); + static final byte[] INDEX_REMAINING_COLUMN_FAMILY = Bytes.toBytes("REMAINING"); + + public static void enableIndexing(HTableDescriptor desc, Map familyMap) + throws IOException { + // not indexing any families, so we shouldn't add the indexer + if (familyMap == null || familyMap.size() == 0) { + return; + } + Map opts = new HashMap(); + List families = new ArrayList(familyMap.size()); + + for (Entry family : familyMap.entrySet()) { + String fam = Bytes.toString(family.getKey()); + opts.put(INDEX_TO_TABLE_CONF_PREFX + fam, family.getValue()); + families.add(fam); + } + + // add the list of families so we can deserialize each + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < families.size(); i++) { + sb.append(families.get(i)); + if (i < families.size() - 1) { + sb.append(SEPARATOR); + } + } + opts.put(INDEX_TO_TABLE_COUNT_KEY, sb.toString()); + IndexUtil.enableIndexing(desc, ColumnFamilyIndexer.class, opts); + } + + private Map columnTargetMap; + + public void setup(Configuration conf) { + String[] families = conf.get(INDEX_TO_TABLE_COUNT_KEY).split(SEPARATOR); + + // build up our mapping of column - > index table + columnTargetMap = new HashMap(families.length); + for (int i = 0; i < families.length; i++) { + byte[] fam = Bytes.toBytes(families[i]); + String indexTable = conf.get(INDEX_TO_TABLE_CONF_PREFX + families[i]); + columnTargetMap.put(new 
ImmutableBytesWritable(fam), indexTable); + } + } + + @Override + public Map getIndexUpdate(Put p) { + // if not columns to index, we are done and don't do anything special + if (columnTargetMap == null || columnTargetMap.size() == 0) { + return null; + } + + Map updateMap = new HashMap(); + Set keys = p.getFamilyMap().keySet(); + for (Entry> entry : p.getFamilyMap().entrySet()) { + String ref = columnTargetMap + .get(new ImmutableBytesWritable(entry.getKey())); + // no reference for that column, skip it + if (ref == null) { + continue; + } + + // get the keys for this family + List kvs = entry.getValue(); + if (kvs == null || kvs.isEmpty()) { + // should never be the case, but just to be careful + continue; + } + + // swap the row key and the column family + Put put = new Put(kvs.get(0).getFamily()); + // got through each of the family's key-values and add it to the put + for (KeyValue kv : entry.getValue()) { + put.add(ColumnFamilyIndexer.INDEX_ROW_COLUMN_FAMILY, + ArrayUtils.addAll(kv.getRow(), kv.getQualifier()), kv.getValue()); + } + + // go through the rest of the families and add them to the put, under the special columnfamily + for (byte[] key : keys) { + if (!Bytes.equals(key, entry.getKey())) { + List otherFamilyKeys = p.getFamilyMap().get(key); + if (otherFamilyKeys == null || otherFamilyKeys.isEmpty()) { + continue; + } + for (KeyValue kv : otherFamilyKeys) { + put.add(ColumnFamilyIndexer.INDEX_REMAINING_COLUMN_FAMILY, + ArrayUtils.addAll(kv.getFamily(), kv.getQualifier()), kv.getValue()); + } + } + } + + // add the mapping + updateMap.put(put, ref); + } + return updateMap; + } + + @Override + public Map getIndexUpdate(Delete d) { + // if no columns to index, we are done and don't do anything special + if (columnTargetMap == null || columnTargetMap.size() == 0) { + return null; + } + + Map updateMap = new HashMap(); + for (Entry> entry : d.getFamilyMap().entrySet()) { + String ref = columnTargetMap + .get(new ImmutableBytesWritable(entry.getKey())); + // no reference for that column, skip it + if (ref == null) { + continue; + } + List kvs = entry.getValue(); + if (kvs == null || kvs.isEmpty()) { + continue; + } + + // swap the row key and the column family - we only need the row key since we index on the + // column family from the original update + Delete delete = new Delete(kvs.get(0).getFamily()); + // add the mapping + updateMap.put(delete, ref); + } + return updateMap; + } + + /** + * Create the specified index table with the necessary columns + * @param admin {@link HBaseAdmin} to use when creating the table + * @param indexTable name of the index table. 
Should be specified in + * {@link #enableIndexing(HTableDescriptor, Map)} as an index target + */ + public static void createIndexTable(HBaseAdmin admin, String indexTable) throws IOException { + HTableDescriptor index = new HTableDescriptor(indexTable); + index.addFamily(new HColumnDescriptor(INDEX_REMAINING_COLUMN_FAMILY)); + index.addFamily(new HColumnDescriptor(INDEX_ROW_COLUMN_FAMILY)); + + admin.createTable(index); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/IndexBuilder.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/IndexBuilder.java new file mode 100644 index 00000000..48324a7b --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/builder/IndexBuilder.java @@ -0,0 +1,51 @@ +package com.salesforce.hbase.index.builder; + +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; + +import com.salesforce.hbase.index.IndexUtil; +import com.salesforce.hbase.index.Indexer; + +/** + * Interface to build updates ({@link Mutation}s) to the index tables, based on the primary table + * updates. + *
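Before the interface itself, it may help to see how a builder gets attached to a table. The sketch below is illustrative, not shipped code: it reuses the hypothetical ValueIndexBuilder from the earlier sketch, assumes the options map is plain String key/value pairs (the way ColumnFamilyIndexer.enableIndexing populates it), and uses IndexUtil.enableIndexing(HTableDescriptor, Class, Map) as referenced in the setup() javadoc below.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

import com.salesforce.hbase.index.IndexUtil;

public class EnableCustomBuilder {
  public static void main(String[] args) throws Exception {
    HTableDescriptor primary = new HTableDescriptor("PRIMARY_TABLE");
    primary.addFamily(new HColumnDescriptor(Bytes.toBytes("FAMILY")));

    // arbitrary options; the builder reads them back in setup(Configuration)
    Map<String, String> opts = new HashMap<String, String>();
    opts.put("example.indexed.family", "FAMILY");

    // register the builder (and its options) on the descriptor so the
    // server-side Indexer coprocessor picks it up for this table
    IndexUtil.enableIndexing(primary, ValueIndexBuilder.class, opts);

    HBaseAdmin admin = new HBaseAdmin(HBaseConfiguration.create());
    admin.createTable(primary);
    admin.close();
  }
}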

+ * Either all the index updates will be applied to all tables or the primary table will kill itself + * and will attempt to replay the index edits through the WAL replay mechanism. + */ +public interface IndexBuilder { + + /** + * This is always called exactly once on install of {@link Indexer}, before any calls to + * {@link #getIndexUpdate}. + * @param conf {@link Configuration} containing any properties specified in + * {@link IndexUtil#enableIndexing(HTableDescriptor, Class, Map)} + */ + public void setup(Configuration conf); + + /** + * Your opportunity to update any/all index tables based on the {@link Put} to the primary table + * row. It's up to your implementation to ensure that timestamps match between the primary and index + * tables. + * @param put {@link Put} to the primary table that may be indexed + * @return a Map of the mutations to make -> target index table name + */ + public Map getIndexUpdate(Put put); + + /** + * The counterpart to {@link #getIndexUpdate(Put)} - your opportunity to update any/all index + * tables based on the delete of the primary table row. It's up to your implementation to ensure + * that timestamps match between the primary and index tables. + * @param delete {@link Delete} to the primary table that may be indexed + * @return a {@link Map} of the mutations to make -> target index table name + */ + public Map getIndexUpdate(Delete delete); + + /** Helper method signature to ensure people don't attempt to extend this class directly */ + public void extendBaseIndexBuilderInstead(); +} diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/CoprocessorHTableFactory.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/CoprocessorHTableFactory.java new file mode 100644 index 00000000..8ee049dc --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/CoprocessorHTableFactory.java @@ -0,0 +1,31 @@ +package com.salesforce.hbase.index.table; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HTableInterface; + +public class CoprocessorHTableFactory implements HTableFactory { + + private CoprocessorEnvironment e; + + public CoprocessorHTableFactory(CoprocessorEnvironment e) { + this.e = e; + } + + @Override + public HTableInterface getTable(byte[] tablename) throws IOException { + Configuration conf = e.getConfiguration(); + // make sure writers fail fast + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); + conf.setInt("zookeeper.recovery.retry", 3); + conf.setInt("zookeeper.recovery.retry.intervalmill", 100); + conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 30000); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); + + return this.e.getTable(tablename); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableFactory.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableFactory.java new file mode 100644 index 00000000..e00e5d30 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableFactory.java @@ -0,0 +1,10 @@ +package com.salesforce.hbase.index.table; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HTableInterface; + +public interface HTableFactory { + 
+ public HTableInterface getTable(byte [] tablename) throws IOException; +} diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableInterfaceReference.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableInterfaceReference.java new file mode 100644 index 00000000..42637df4 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/table/HTableInterfaceReference.java @@ -0,0 +1,85 @@ +package com.salesforce.hbase.index.table; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +/** + * Reference to an HTableInterface that only gets the underlying {@link HTableInterface} from the + * {@link CoprocessorEnvironment} on calls to {@link #getTable()}. Until calling {@link #getTable()} + * , this class just contains the name of the table and an optional {@link HTableFactory}. After + * calling {@link #getTable()}, this holds a reference to that {@link HTableInterface}, + * even if that table is closed. + *

+ * The first call to {@link #getTable()} resolves the table through the supplied {@link HTableFactory}; + * subsequent calls return the same cached {@link HTableInterface} rather than resolving it again. + *
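A hedged usage sketch follows; it is not part of the patch. The table, family, and row names are placeholders, and 'env' stands in for whatever CoprocessorEnvironment the caller already holds.

import java.io.IOException;

import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.salesforce.hbase.index.table.CoprocessorHTableFactory;
import com.salesforce.hbase.index.table.HTableFactory;
import com.salesforce.hbase.index.table.HTableInterfaceReference;

public class ReferenceExample {
  /** 'env' is the environment handed to a coprocessor; names are placeholders. */
  static void writeOneIndexRow(CoprocessorEnvironment env) throws IOException {
    HTableFactory factory = new CoprocessorHTableFactory(env);
    HTableInterfaceReference ref = new HTableInterfaceReference("MY_INDEX", factory);

    // nothing is opened until here; the resolved table is then cached on the reference
    HTableInterface index = ref.getTable();
    Put p = new Put(Bytes.toBytes("index-row"));
    p.add(Bytes.toBytes("ROW"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
    index.put(p);
  }
}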

+ * This class is not thread-safe when resolving the reference to the {@link HTableInterface} - + * multi-threaded usage must employ external locking to ensure that multiple {@link HTableInterface} + * s are not resolved. + */ +public class HTableInterfaceReference implements Writable { + + private String tableName; + private HTableInterface table; + private HTableFactory factory; + + /** + * For use with {@link #readFields(DataInput)}. A {@link HTableFactory} must be passed either to + * {@link #setFactory(HTableFactory)} before resolving an HTableInterface or + * {@link #getTable(HTableFactory)} when resolving an {@link HTableInterface} + */ + public HTableInterfaceReference() { + } + + public HTableInterfaceReference(String tablename) { + this.tableName = tablename; + } + + public HTableInterfaceReference(String tablename, HTableFactory factory) { + this.tableName = tablename; + this.factory = factory; + } + + public void setFactory(HTableFactory e) { + this.factory = e; + } + + public HTableInterface getTable(HTableFactory e) throws IOException { + if (this.table == null) { + this.table = e.getTable(Bytes.toBytes(tableName)); + } + return this.table; + } + + /** + * @return get the referenced table, if one has been stored + * @throws IOException if we are creating a new table (first instance of request) and it cannot be + * reached + */ + public HTableInterface getTable() throws IOException { + return this.getTable(this.factory); + } + + public String getTableName() { + return this.tableName; + } + + @Override + public void readFields(DataInput in) throws IOException { + this.tableName = in.readUTF(); + + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(this.tableName); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/IndexedKeyValue.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/IndexedKeyValue.java new file mode 100644 index 00000000..ac01d12f --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/IndexedKeyValue.java @@ -0,0 +1,130 @@ +package com.salesforce.hbase.index.wal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.hbase.util.Bytes; + +import com.salesforce.hbase.index.table.HTableInterfaceReference; + +public class IndexedKeyValue extends KeyValue { + + HTableInterfaceReference indexTable; + Mutation mutation; + + public IndexedKeyValue() { + } + + public IndexedKeyValue(HTableInterfaceReference target, Mutation mutation) { + this.indexTable = target; + this.mutation = mutation; + } + + public HTableInterfaceReference getIndexTable() { + return indexTable; + } + + public Mutation getMutation() { + return mutation; + } + + /** + * This is a KeyValue that shouldn't actually be replayed, so we always mark it as an + * {@link HLog#METAFAMILY} so it isn't replayed via the normal replay mechanism + */ + @Override + public boolean matchingFamily(final byte[] family) { + return Bytes.equals(family, HLog.METAFAMILY); + } + + @Override + public String toString() { + return "IndexWrite - table: " + indexTable.getTableName() + ", mutation:" + mutation; + } + + /** + * This is a very heavy-weight 
operation and should only be done when absolutely necessary - it + * does a full serialization of the underyling mutation to compare the underlying data. + */ + @Override + public boolean equals(Object o) { + if (o instanceof IndexedKeyValue) { + IndexedKeyValue other = (IndexedKeyValue) o; + if (other.indexTable.getTableName().equals(this.indexTable.getTableName())) { + try { + byte[] current = getBytes(this.mutation); + byte[] otherMutation = getBytes(other.mutation); + return Bytes.equals(current, otherMutation); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to correctly serialize a mutation!", e); + } + } + } + return false; + } + + private byte[] getBytes(Mutation m) throws IOException{ + ByteArrayOutputStream bos = null; + try{ + bos = new ByteArrayOutputStream(); + this.mutation.write(new DataOutputStream(bos)); + bos.flush(); + return bos.toByteArray(); + }finally{ + if(bos != null){ + bos.close(); + } + } + } + + @Override + public int hashCode() { + return this.indexTable.getTableName().hashCode() + this.mutation.hashCode(); + } + + @Override + public void write(DataOutput out) throws IOException{ + KeyValueCodec.write(out, this); + } + + /** + * Internal write the underlying data for the entry - this does not do any special prefixing. + * Writing should be done via {@link KeyValueCodec#write(DataOutput, KeyValue)} to ensure + * consistent reading/writing of {@link IndexedKeyValue}s. + * @param out to write data to. Does not close or flush the passed object. + * @throws IOException if there is a problem writing the underlying data + */ + void writeData(DataOutput out) throws IOException { + out.writeUTF(indexTable.getTableName()); + out.writeUTF(this.mutation.getClass().getName()); + this.mutation.write(out); + } + + /** + * This method shouldn't be used - you should use {@link KeyValueCodec#readKeyValue(DataInput)} + * instead. Its the complement to {@link #writeData(DataOutput)}. + */ + @SuppressWarnings("javadoc") + @Override + public void readFields(DataInput in) throws IOException { + this.indexTable = new HTableInterfaceReference(in.readUTF()); + Class clazz; + try { + clazz = Class.forName(in.readUTF()).asSubclass(Mutation.class); + this.mutation = clazz.newInstance(); + this.mutation.readFields(in); + } catch (ClassNotFoundException e) { + throw new IOException(e); + } catch (InstantiationException e) { + throw new IOException(e); + } catch (IllegalAccessException e) { + throw new IOException(e); + } + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/KeyValueCodec.java b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/KeyValueCodec.java new file mode 100644 index 00000000..d6d75bf4 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/com/salesforce/hbase/index/wal/KeyValueCodec.java @@ -0,0 +1,79 @@ +package com.salesforce.hbase.index.wal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; + +/** + * Codec to encode/decode {@link KeyValue}s and {@link IndexedKeyValue}s within a {@link WALEdit} + */ +public class KeyValueCodec { + + /** + * KeyValue length marker specifying that its actually an {@link IndexedKeyValue} rather than a + * regular {@link KeyValue}. 
+ */ + public static final int INDEX_TYPE_LENGTH_MARKER = -1; + + /** + * Read a {@link List} of {@link KeyValue} from the input stream - may contain regular + * {@link KeyValue}s or {@link IndexedKeyValue}s. + * @param in to read from + * @return the next {@link KeyValue}s + * @throws IOException if the next {@link KeyValue} cannot be read + */ + public static List readKeyValues(DataInput in) throws IOException { + int size = in.readInt(); + if (size == 0) { + return Collections.emptyList(); + } + List kvs = new ArrayList(size); + for (int i = 0; i < size; i++) { + kvs.add(readKeyValue(in)); + } + return kvs; + } + + /** + * Read a single {@link KeyValue} from the input stream - may either be a regular {@link KeyValue} + * or an {@link IndexedKeyValue}. + * @param in to read from + * @return the next {@link KeyValue}, if one is available + * @throws IOException if the next {@link KeyValue} cannot be read + */ + public static KeyValue readKeyValue(DataInput in) throws IOException { + int length = in.readInt(); + KeyValue kv; + // its a special IndexedKeyValue + if (length == INDEX_TYPE_LENGTH_MARKER) { + kv = new IndexedKeyValue(); + kv.readFields(in); + } else { + kv = new KeyValue(); + kv.readFields(length, in); + } + return kv; + } + + /** + * Write a {@link KeyValue} or an {@link IndexedKeyValue} to the output stream. These can be read + * back via {@link #readKeyValue(DataInput)} or {@link #readKeyValues(DataInput)}. + * @param out to write to + * @param kv {@link KeyValue} to which to write + * @throws IOException if there is an error writing + */ + public static void write(DataOutput out, KeyValue kv) throws IOException { + if (kv instanceof IndexedKeyValue) { + out.writeInt(INDEX_TYPE_LENGTH_MARKER); + ((IndexedKeyValue) kv).writeData(out); + } else { + kv.write(out); + } + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java b/contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java new file mode 100644 index 00000000..18528739 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedHLogReader.java @@ -0,0 +1,138 @@ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry; +import org.apache.hadoop.hbase.regionserver.wal.HLog.Reader; +import org.apache.hadoop.io.Writable; + + + +/** + * A WALReader that can also deserialize custom {@link WALEdit}s that contain index information. + *
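To make the WAL framing concrete before looking at the reader below, here is an illustrative round-trip through KeyValueCodec. It is a sketch, not test code from this patch: the table, row, and family names are invented, and it relies on the leading entry count expected by readKeyValues() plus the -1 length marker described above.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.List;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.salesforce.hbase.index.table.HTableInterfaceReference;
import com.salesforce.hbase.index.wal.IndexedKeyValue;
import com.salesforce.hbase.index.wal.KeyValueCodec;

public class CodecRoundTrip {
  public static void main(String[] args) throws Exception {
    // a regular KeyValue and an index-marker KeyValue, as they would sit in one WALEdit
    KeyValue plain = new KeyValue(Bytes.toBytes("row"), Bytes.toBytes("f"),
        Bytes.toBytes("q"), Bytes.toBytes("v"));
    Put indexPut = new Put(Bytes.toBytes("index-row"));
    indexPut.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    IndexedKeyValue indexed =
        new IndexedKeyValue(new HTableInterfaceReference("MY_INDEX"), indexPut);

    // write: an IndexedKeyValue is prefixed with the -1 marker, a plain KeyValue is not
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeInt(2); // readKeyValues() expects a leading count of entries
    KeyValueCodec.write(out, plain);
    KeyValueCodec.write(out, indexed);
    out.flush();

    // read: both kinds come back through the same entry point
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
    List<KeyValue> kvs = KeyValueCodec.readKeyValues(in);
    System.out.println(kvs.get(1) instanceof IndexedKeyValue); // prints true
  }
}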

+ * This is basically a wrapper around a {@link SequenceFileLogReader} that has a custom + * {@link #next(Entry)} method that only replaces the creation of the WALEdit with our own custom + * type. + *

+ * This is a little bit of a painful way of going about this, but saves the effort of hacking the + * HBase source (and deal with getting it reviewed and backported, etc.) and still works. + */ +public class IndexedHLogReader implements Reader { + + private SequenceFileLogReader delegate; + + + private static class IndexedWALReader extends SequenceFileLogReader.WALReader { + + /** + * @param fs + * @param p + * @param c + * @throws IOException + */ + IndexedWALReader(FileSystem fs, Path p, Configuration c) throws IOException { + super(fs, p, c); + } + + /** + * we basically have to reproduce what the SequenceFile.Reader is doing in next(), but without + * the check out the value class, since we have a special value class that doesn't directly + * match what was specified in the file header + */ + @Override + public synchronized boolean next(Writable key, Writable val) throws IOException { + boolean more = next(key); + + if (more) { + getCurrentValue(val); + } + + return more; + } + + } + + public IndexedHLogReader() { + this.delegate = new SequenceFileLogReader(); + } + + @Override + public void init(final FileSystem fs, final Path path, Configuration conf) throws IOException { + this.delegate.init(fs, path, conf); + // close the old reader and replace with our own, custom one + this.delegate.reader.close(); + this.delegate.reader = new IndexedWALReader(fs, path, conf); + } + + @Override + public void close() throws IOException { + this.delegate.close(); + } + + @Override + public Entry next() throws IOException { + return next(null); + } + + @Override + public Entry next(Entry reuse) throws IOException { + delegate.entryStart = delegate.reader.getPosition(); + HLog.Entry e = reuse; + if (e == null) { + HLogKey key; + if (delegate.keyClass == null) { + key = HLog.newKey(delegate.conf); + } else { + try { + key = delegate.keyClass.newInstance(); + } catch (InstantiationException ie) { + throw new IOException(ie); + } catch (IllegalAccessException iae) { + throw new IOException(iae); + } + } + WALEdit val = new WALEdit(); + e = new HLog.Entry(key, val); + } + + // now read in the HLog.Entry from the WAL + boolean nextPairValid = false; + try { + if (delegate.compressionContext != null) { + throw new UnsupportedOperationException( + "Reading compression isn't supported with the IndexedHLogReader! Compresed WALEdits " + + "are only support for HBase 0.94.9+ and with the IndexedWALEditCodec!"); + } + // this is the special bit - we use our custom entry to read in the key-values that have index + // information, but otherwise it looks just like a regular WALEdit + IndexedWALEdit edit = new IndexedWALEdit(e.getEdit()); + nextPairValid = delegate.reader.next(e.getKey(), edit); + } catch (IOException ioe) { + throw delegate.addFileInfoToException(ioe); + } + delegate.edit++; + if (delegate.compressionContext != null && delegate.emptyCompressionContext) { + delegate.emptyCompressionContext = false; + } + return nextPairValid ? 
e : null; + } + + @Override + public void seek(long pos) throws IOException { + this.delegate.seek(pos); + } + + @Override + public long getPosition() throws IOException { + return this.delegate.getPosition(); + } + + @Override + public void reset() throws IOException { + this.delegate.reset(); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java b/contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java new file mode 100644 index 00000000..573ea186 --- /dev/null +++ b/contrib/hbase-index/index-core/src/main/java/org/apache/hadoop/hbase/regionserver/wal/IndexedWALEdit.java @@ -0,0 +1,88 @@ +package org.apache.hadoop.hbase.regionserver.wal; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; + +import com.salesforce.hbase.index.wal.KeyValueCodec; + +/** + * Read in data for a delegate {@link WALEdit}. This should only be used in concert with an IndexedHLogReader + *
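For context, wiring the index-aware reader into a cluster is just a configuration change. The helper below is illustrative, but the property and value are the same ones the WAL-replay test later in this patch sets.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.regionserver.wal.IndexedHLogReader;

public class IndexedReaderConfig {
  /** Returns a configuration that makes log split/replay use the index-aware reader. */
  public static Configuration withIndexedReader() {
    Configuration conf = HBaseConfiguration.create();
    // point the region server's HLog reader at IndexedHLogReader so WALEdits that
    // carry IndexedKeyValues can be deserialized during split/replay
    conf.set("hbase.regionserver.hlog.reader.impl", IndexedHLogReader.class.getName());
    return conf;
  }
}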

+ * This class should only be used with HBase < 0.94.9. Newer installations of HBase should + * instead use the IndexedWALEditCodec along with the correct configuration options. + */ +public class IndexedWALEdit extends WALEdit { + //reproduced here so we don't need to modify the HBase source. + private static final int VERSION_2 = -1; + private WALEdit delegate; + + /** + * Copy-constructor. Only does a surface copy of the delegates fields - no actual data is copied, only referenced. + * @param delegate to copy + */ + public IndexedWALEdit(WALEdit delegate) { + this.delegate = delegate; + // reset the delegate's fields + this.delegate.getKeyValues().clear(); + if (this.delegate.getScopes() != null) { + this.delegate.getScopes().clear(); + } + } + + public IndexedWALEdit() { + + } + + public void setCompressionContext(CompressionContext context) { + throw new UnsupportedOperationException( + "Compression not supported for IndexedWALEdit! If you are using HBase 0.94.9+, use IndexedWALEditCodec instead."); + } + + @Override + public void readFields(DataInput in) throws IOException { + delegate.getKeyValues().clear(); + if (delegate.getScopes() != null) { + delegate.getScopes().clear(); + } + // ---------------------------------------------------------------------------------------- + // no compression, so we do pretty much what the usual WALEdit does, plus a little magic to + // capture the index updates + // ----------------------------------------------------------------------------------------- + int versionOrLength = in.readInt(); + if (versionOrLength != VERSION_2) { + throw new IOException("You must update your cluster to the lastest version of HBase and" + + " clean out all logs (cleanly start and then shutdown) before enabling indexing!"); + } + // this is new style HLog entry containing multiple KeyValues. + List kvs = KeyValueCodec.readKeyValues(in); + delegate.getKeyValues().addAll(kvs); + + // then read in the rest of the WALEdit + int numFamilies = in.readInt(); + NavigableMap scopes = delegate.getScopes(); + if (numFamilies > 0) { + if (scopes == null) { + scopes = new TreeMap(Bytes.BYTES_COMPARATOR); + } + for (int i = 0; i < numFamilies; i++) { + byte[] fam = Bytes.readByteArray(in); + int scope = in.readInt(); + scopes.put(fam, scope); + } + delegate.setScopes(scopes); + } + } + + @Override + public void write(DataOutput out) throws IOException { + throw new IOException( + "Indexed WALEdits aren't written directly out - use IndexedKeyValues instead"); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestEndtoEndIndexing.java b/contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestEndtoEndIndexing.java new file mode 100644 index 00000000..ad999db7 --- /dev/null +++ b/contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestEndtoEndIndexing.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.salesforce.hbase.index; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.salesforce.hbase.index.builder.ColumnFamilyIndexer; + +/** + * Test secondary indexing from an end-to-end perspective (client to server to index table) + */ +public class TestEndtoEndIndexing { + + protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final byte[] FAM = Bytes.toBytes("FAMILY"); + private static final byte[] FAM2 = Bytes.toBytes("FAMILY2"); + private static final String INDEXED_TABLE = "INDEXED_TABLE"; + private static final String INDEX_TABLE = "INDEX_TABLE"; + + @BeforeClass + public static void setupCluster() throws Exception { + UTIL.startMiniCluster(); + } + + @AfterClass + public static void teardownCluster() throws Exception { + UTIL.shutdownMiniCluster(); + } + + /** + * Ensure that even if we don't write to the WAL in the Put we at least attempt to index + * the values in the Put + * @throws Exception on failure + */ + @Test + public void testPutWithoutWALGetsIndexed() throws Exception { + byte[] k = new byte[] { 'a', 'a', 'a' }; + Put put = new Put(k); + put.add(FAM, null, k); + put.add(FAM2, null, k); + put.setWriteToWAL(false); + doPrimaryTablePutWithExpectedIndex(put, 2); + } + + /** + * Test that a simple put into the primary table gets a corresponding put in the index table, in + * non-failure situations. 
+ * @throws Exception on failure + */ + @Test + public void testSimplePrimaryAndIndexTables() throws Exception { + byte[] k = new byte[] { 'a', 'a', 'a' }; + Put put = new Put(k); + put.add(FAM, null, k); + put.add(FAM2, null, k); + doPrimaryTablePutWithExpectedIndex(put, 2); + } + + /** + * Test that we delete change also propagates from the primary table to the index table + * @throws Exception on failure + */ + @Test + public void testPutAndDeleteIsIndexed() throws Exception { + byte[] k = new byte[] { 'a', 'a', 'a' }; + // start with a put, so we know we have some data + Put put = new Put(k); + put.add(FAM, null, k); + put.add(FAM2, null, k); + + // then do a delete of that same row, ending up with no edits in the index table + Delete d = new Delete(k); + // we need to do a full specification here so we in the indexer what to delete on the index + // table + d.deleteColumn(FAM, null); + d.deleteColumn(FAM2, null); + doPrimaryTableUpdatesWithExpectedIndex(Arrays.asList(put, d), 0); + } + + private void doPrimaryTablePutWithExpectedIndex(Put m, int indexSize) throws Exception { + doPrimaryTableUpdatesWithExpectedIndex(Collections.singletonList((Mutation) m), indexSize); + } + + /** + * Create a new primary and index table, write the put to the primary table and then scan the + * index table to ensure that the {@link Put} made it. + * @param put put to write to the primary table + * @param indexSize expected size of the index after the operation + * @throws Exception on failure + */ + private void doPrimaryTableUpdatesWithExpectedIndex(List mutations, int indexSize) + throws Exception { + HTableDescriptor primary = new HTableDescriptor(INDEXED_TABLE); + primary.addFamily(new HColumnDescriptor(FAM)); + primary.addFamily(new HColumnDescriptor(FAM2)); + // setup indexing on one table and one of its columns + Map indexMapping = new HashMap(); + indexMapping.put(FAM, INDEX_TABLE); + ColumnFamilyIndexer.enableIndexing(primary, indexMapping); + + // setup the stats table + HBaseAdmin admin = UTIL.getHBaseAdmin(); + // create the primary table + admin.createTable(primary); + + // create the index table + ColumnFamilyIndexer.createIndexTable(admin, INDEX_TABLE); + + assertTrue("Target index table (" + INDEX_TABLE + ") didn't get created!", + admin.tableExists(INDEX_TABLE)); + + // load some data into our primary table + HTable primaryTable = new HTable(UTIL.getConfiguration(), INDEXED_TABLE); + primaryTable.setAutoFlush(false); + primaryTable.batch(mutations); + primaryTable.flushCommits(); + primaryTable.close(); + + // and now scan the index table + HTable index = new HTable(UTIL.getConfiguration(), INDEX_TABLE); + int count = getKeyValueCount(index); + + // we should have 1 index values - one for each key in the FAM column family + // but none in the FAM2 column family + assertEquals("Got an unexpected amount of index entries!", indexSize, count); + + // then delete the table and make sure we don't have any more stats in our table + admin.disableTable(primary.getName()); + admin.deleteTable(primary.getName()); + admin.disableTable(INDEX_TABLE); + admin.deleteTable(INDEX_TABLE); + } + + /** + * Count the number of keyvalue in the table. 
Scans all possible versions + * @param table table to scan + * @return number of keyvalues over all rows in the table + * @throws IOException + */ + private int getKeyValueCount(HTable table) throws IOException { + Scan scan = new Scan(); + scan.setMaxVersions(Integer.MAX_VALUE - 1); + + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + count += res.list().size(); + System.out.println(count + ") " + res); + } + results.close(); + + return count; + } +} diff --git a/contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestFailForUnsupportedHBaseVersions.java b/contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestFailForUnsupportedHBaseVersions.java new file mode 100644 index 00000000..2c7c6cd6 --- /dev/null +++ b/contrib/hbase-index/index-core/src/test/java/com/salesforce/hbase/index/TestFailForUnsupportedHBaseVersions.java @@ -0,0 +1,135 @@ +package com.salesforce.hbase.index; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.VersionInfo; +import org.junit.Test; + +import com.salesforce.hbase.index.builder.ColumnFamilyIndexer; + +/** + * Test that we correctly fail for versions of HBase that don't support current properties + */ +public class TestFailForUnsupportedHBaseVersions { + private static final Log LOG = LogFactory.getLog(TestFailForUnsupportedHBaseVersions.class); + + /** + * We don't support WAL Compression for HBase < 0.94.9, so we shouldn't even allow the server + * to start if both indexing and WAL Compression are enabled for the wrong versions. + */ + @Test + public void testDoesNotSupportCompressedWAL() { + Configuration conf = HBaseConfiguration.create(); + // get the current version + String version = VersionInfo.getVersion(); + + // ensure WAL Compression not enabled + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + + //we support all versions without WAL Compression + String supported = IndexUtil.validateVersion(version, conf); + assertNull( + "WAL Compression wasn't enabled, but version "+version+" of HBase wasn't supported! All versions should" + + " support writing without a compressed WAL. Message: "+supported, supported); + + // enable WAL Compression + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + + // set the version to something we know isn't supported + version = "0.94.4"; + supported = IndexUtil.validateVersion(version, conf); + assertNotNull("WAL Compression was enabled, but incorrectly marked version as supported", + supported); + + //make sure the first version of 0.94 that supports Indexing + WAL Compression works + version = "0.94.9"; + supported = IndexUtil.validateVersion(version, conf); + assertNull( + "WAL Compression wasn't enabled, but version "+version+" of HBase wasn't supported! 
Message: "+supported, supported); + + //make sure we support snapshot builds too + version = "0.94.9-SNAPSHOT"; + supported = IndexUtil.validateVersion(version, conf); + assertNull( + "WAL Compression wasn't enabled, but version "+version+" of HBase wasn't supported! Message: "+supported, supported); + } + + /** + * Test that we correctly abort a RegionServer when we run tests with an unsupported HBase + * version. The 'completeness' of this test requires that we run the test with both a version of + * HBase that wouldn't be supported with WAL Compression. Currently, this is the default version + * (0.94.4) so just running 'mvn test' will run the full test. However, this test will not fail + * when running against a version of HBase with WALCompression enabled. Therefore, to fully test + * this functionality, we need to run the test against both a supported and an unsupported version + * of HBase (as long as we want to support an version of HBase that doesn't support custom WAL + * Codecs). + * @throws Exception on failure + */ + @Test(timeout = 300000 /* 5 mins */) + public void testDoesNotStartRegionServerForUnsupportedCompressionAndVersion() throws Exception { + Configuration conf = HBaseConfiguration.create(); + // enable WAL Compression + conf.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true); + + // check the version to see if it isn't supported + String version = VersionInfo.getVersion(); + boolean supported = false; + if (IndexUtil.validateVersion(version, conf) == null) { + supported = true; + } + + // start the minicluster + HBaseTestingUtility util = new HBaseTestingUtility(conf); + util.startMiniCluster(); + + // setup the primary table + HTableDescriptor desc = new HTableDescriptor( + "testDoesNotStartRegionServerForUnsupportedCompressionAndVersion"); + String family = "f"; + desc.addFamily(new HColumnDescriptor(Bytes.toBytes(family))); + + // enable indexing to a non-existant index table + Map familyMap = new HashMap(); + familyMap.put(Bytes.toBytes(family), "INDEX_TABLE"); + ColumnFamilyIndexer.enableIndexing(desc, familyMap); + + // get a reference to the regionserver, so we can ensure it aborts + HRegionServer server = util.getMiniHBaseCluster().getRegionServer(0); + + // create the primary table + HBaseAdmin admin = util.getHBaseAdmin(); + if (supported) { + admin.createTable(desc); + assertFalse("Hosting regeion server failed, even the HBase version (" + version + + ") supports WAL Compression.", server.isAborted()); + } else { + admin.createTableAsync(desc, null); + + // wait for the regionserver to abort - if this doesn't occur in the timeout, assume its + // broken. 
+ while (!server.isAborted()) { + LOG.debug("Waiting on regionserver to abort.."); + } + } + + // cleanup + util.shutdownMiniCluster(); + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithIndexWrites.java b/contrib/hbase-index/index-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithIndexWrites.java new file mode 100644 index 00000000..2f40ac6b --- /dev/null +++ b/contrib/hbase-index/index-core/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayWithIndexWrites.java @@ -0,0 +1,257 @@ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Coprocessor; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerAccounting; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; + +import com.salesforce.hbase.index.Indexer; +import com.salesforce.hbase.index.builder.ColumnFamilyIndexer; +import com.salesforce.hbase.index.table.HTableFactory; + +/** + * most of the underlying work (creating/splitting the WAL, etc) is from + * org.apache.hadoop.hhbase.regionserver.wal.TestWALReplay, copied here for completeness and ease of + * use + */ +public class TestWALReplayWithIndexWrites { + + public static final Log LOG = LogFactory.getLog(TestWALReplay.class); + static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + private static final String INDEX_TABLE_NAME = "IndexTable"; + private Path hbaseRootDir = null; + private Path oldLogDir; + private Path logDir; + private FileSystem fs; + private Configuration conf; + + protected static void configureCluster() throws Exception { + Configuration conf = UTIL.getConfiguration(); + // make sure writers fail quickly + conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 1000); + conf.setInt("zookeeper.recovery.retry", 3); + conf.setInt("zookeeper.recovery.retry.intervalmill", 100); + conf.setInt(HConstants.ZK_SESSION_TIMEOUT, 30000); + conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 5000); + + // enable appends + 
conf.setBoolean("dfs.support.append", true); + } + + protected static void startCluster() throws Exception { + UTIL.startMiniDFSCluster(3); + UTIL.startMiniZKCluster(); + UTIL.startMiniHBaseCluster(1, 1); + + Path hbaseRootDir = UTIL.getDFSCluster().getFileSystem().makeQualified(new Path("/hbase")); + LOG.info("hbase.rootdir=" + hbaseRootDir); + UTIL.getConfiguration().set(HConstants.HBASE_DIR, hbaseRootDir.toString()); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + configureCluster(); + // use our custom WAL Reader + UTIL.getConfiguration().set("hbase.regionserver.hlog.reader.impl", IndexedHLogReader.class.getName()); + startCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniHBaseCluster(); + UTIL.shutdownMiniDFSCluster(); + UTIL.shutdownMiniZKCluster(); + } + + @Before + public void setUp() throws Exception { + this.conf = HBaseConfiguration.create(UTIL.getConfiguration()); + this.fs = UTIL.getDFSCluster().getFileSystem(); + this.hbaseRootDir = new Path(this.conf.get(HConstants.HBASE_DIR)); + this.oldLogDir = new Path(this.hbaseRootDir, HConstants.HREGION_OLDLOGDIR_NAME); + this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME); + } + + @After + public void tearDown() throws Exception { + } + + private void deleteDir(final Path p) throws IOException { + if (this.fs.exists(p)) { + if (!this.fs.delete(p, true)) { + throw new IOException("Failed remove of " + p); + } + } + } + + /** + * Test writing edits into an HRegion, closing it, splitting logs, opening Region again. Verify + * seqids. + * @throws Exception on failure + */ + @Test + public void testReplayEditsWrittenViaHRegion() throws Exception { + final String tableNameStr = "testReplayEditsWrittenViaHRegion"; + final HRegionInfo hri = new HRegionInfo(Bytes.toBytes(tableNameStr), null, null, false); + final Path basedir = new Path(this.hbaseRootDir, tableNameStr); + deleteDir(basedir); + final HTableDescriptor htd = createBasic3FamilyHTD(tableNameStr); + + //setup basic indexing for the table + Map familyMap = new HashMap(); + byte[] indexedFamily = new byte[] {'a'}; + familyMap.put(indexedFamily, INDEX_TABLE_NAME); + ColumnFamilyIndexer.enableIndexing(htd, familyMap); + + // create the region + its WAL + HRegion region0 = HRegion.createHRegion(hri, hbaseRootDir, this.conf, htd); + region0.close(); + region0.getLog().closeAndDelete(); + HLog wal = createWAL(this.conf); + RegionServerServices mockRS = Mockito.mock(RegionServerServices.class); + // mock out some of the internals of the RSS, so we can run CPs + Mockito.when(mockRS.getWAL()).thenReturn(wal); + RegionServerAccounting rsa = Mockito.mock(RegionServerAccounting.class); + Mockito.when(mockRS.getRegionServerAccounting()).thenReturn(rsa); + HRegion region = new HRegion(basedir, wal, this.fs, this.conf, hri, htd, mockRS); + long seqid = region.initialize(); + // HRegionServer usually does this. It knows the largest seqid across all regions. 
+ wal.setSequenceNumber(seqid); + + //make an attempted write to the primary that should also be indexed + byte[] rowkey = Bytes.toBytes("indexed_row_key"); + Put p = new Put(rowkey); + p.add(indexedFamily, Bytes.toBytes("qual"), Bytes.toBytes("value")); + region.put(p); + + // we should then see the server go down + Mockito.verify(mockRS, Mockito.times(1)).abort(Mockito.anyString(), + Mockito.any(Exception.class)); + region.close(true); + wal.close(); + + // then create the index table so we are successful on WAL replay + ColumnFamilyIndexer.createIndexTable(UTIL.getHBaseAdmin(), INDEX_TABLE_NAME); + + // run the WAL split and setup the region + runWALSplit(this.conf); + HLog wal2 = createWAL(this.conf); + HRegion region1 = new HRegion(basedir, wal2, this.fs, this.conf, hri, htd, mockRS); + + // initialize the region - this should replay the WALEdits from the WAL + region1.initialize(); + + // now check to ensure that we wrote to the index table + HTable index = new HTable(UTIL.getConfiguration(), INDEX_TABLE_NAME); + int indexSize = getKeyValueCount(index); + assertEquals("Index wasn't propertly updated from WAL replay!", 1, indexSize); + Get g = new Get(rowkey); + final Result result = region1.get(g); + assertEquals("Primary region wasn't updated from WAL replay!", 1, result.size()); + + // cleanup the index table + HBaseAdmin admin = UTIL.getHBaseAdmin(); + admin.disableTable(INDEX_TABLE_NAME); + admin.deleteTable(INDEX_TABLE_NAME); + admin.close(); + } + + /** + * Create simple HTD with three families: 'a', 'b', and 'c' + * @param tableName name of the table descriptor + * @return + */ + private HTableDescriptor createBasic3FamilyHTD(final String tableName) { + HTableDescriptor htd = new HTableDescriptor(tableName); + HColumnDescriptor a = new HColumnDescriptor(Bytes.toBytes("a")); + htd.addFamily(a); + HColumnDescriptor b = new HColumnDescriptor(Bytes.toBytes("b")); + htd.addFamily(b); + HColumnDescriptor c = new HColumnDescriptor(Bytes.toBytes("c")); + htd.addFamily(c); + return htd; + } + + /* + * @param c + * @return WAL with retries set down from 5 to 1 only. + * @throws IOException + */ + private HLog createWAL(final Configuration c) throws IOException { + HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c); + // Set down maximum recovery so we dfsclient doesn't linger retrying something + // long gone. + HBaseTestingUtility.setMaxRecoveryErrorCount(wal.getOutputStream(), 1); + return wal; + } + + /* + * Run the split. Verify only single split file made. 
+ * @param c + * @return The single split file made + * @throws IOException + */ + private Path runWALSplit(final Configuration c) throws IOException { + FileSystem fs = FileSystem.get(c); + HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c, this.hbaseRootDir, this.logDir, + this.oldLogDir, fs); + List splits = logSplitter.splitLog(); + // Split should generate only 1 file since there's only 1 region + assertEquals("splits=" + splits, 1, splits.size()); + // Make sure the file exists + assertTrue(fs.exists(splits.get(0))); + LOG.info("Split file=" + splits.get(0)); + return splits.get(0); + } + + private int getKeyValueCount(HTable table) throws IOException { + Scan scan = new Scan(); + scan.setMaxVersions(Integer.MAX_VALUE - 1); + + ResultScanner results = table.getScanner(scan); + int count = 0; + for (Result res : results) { + count += res.list().size(); + System.out.println(count + ") " + res); + } + results.close(); + + return count; + } +} \ No newline at end of file diff --git a/contrib/hbase-index/index-core/src/test/resources/log4j.properties b/contrib/hbase-index/index-core/src/test/resources/log4j.properties new file mode 100644 index 00000000..cb27ddfc --- /dev/null +++ b/contrib/hbase-index/index-core/src/test/resources/log4j.properties @@ -0,0 +1,64 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Define some default values that can be overridden by system properties +hbase.root.logger=INFO,console +hbase.log.dir=. +hbase.log.file=hbase.log + +# Define the root logger to the system property "hbase.root.logger". 
+log4j.rootLogger=${hbase.root.logger} + +# Logging Threshold +log4j.threshhold=ALL + +# +# Daily Rolling File Appender +# +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hbase.log.dir}/${hbase.log.file} + +# Rollver at midnight +log4j.appender.DRFA.DatePattern=.yyyy-MM-dd + +# 30-day backup +#log4j.appender.DRFA.MaxBackupIndex=30 +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# Debugging Pattern format +log4j.appender.DRFA.layout.ConversionPattern=%d %-5p [%t] %C{2}(%L): %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d %-5p [%t] %C{2}(%L): %m%n + +# Custom Logging levels + +#log4j.logger.org.apache.hadoop.fs.FSNamesystem=DEBUG + +log4j.logger.org.apache.hadoop=WARN +log4j.logger.org.apache.zookeeper=ERROR +log4j.logger.org.apache.hadoop.hbase=DEBUG +log4j.logger.com.salesforce=DEBUG diff --git a/contrib/hbase-index/pom.xml b/contrib/hbase-index/pom.xml new file mode 100644 index 00000000..2eb15ba0 --- /dev/null +++ b/contrib/hbase-index/pom.xml @@ -0,0 +1,330 @@ + + 4.0.0 + com.salesforce.hbase + hbase-index + 0.0.1-SNAPSHOT + HBase Index + Simple Secondary Indexing for HBase + pom + + + index-core + + + + + + + + + repo + https://git.soma.salesforce.com/hbase/hbase-index/tree/maven-artifacts/raw/master/releases + + + snapshot-repo + https://git.soma.salesforce.com/hbase/hbase-index/tree/maven-artifacts/raw/master/snapshots + + + + + + apache release + https://repository.apache.org/content/repositories/releases/ + + + apache non-releases + Apache non-releases + http://people.apache.org/~stack/m2/repository + + false + + + true + + + + + codehaus + Codehaus Public + http://repository.codehaus.org/ + + false + + + true + + + + + + + 0.94.4 + 1.0.4 + 1.8.8 + 12.0.1 + + 4.10 + 1.8.5 + 900 + true + + 2.14 + + + + + + com.google.guava + guava + ${guava.version} + + + org.apache.hadoop + hadoop-core + ${hadoop.version} + true + + + hsqldb + hsqldb + + + net.sf.kosmosfs + kfs + + + org.eclipse.jdt + core + + + net.java.dev.jets3t + jets3t + + + oro + oro + + + + + org.apache.hbase + hbase + ${hbase.version} + + + + org.codehaus.jackson + jackson-core-asl + ${jackson.version} + + + org.codehaus.jackson + jackson-mapper-asl + ${jackson.version} + + + org.codehaus.jackson + jackson-jaxrs + ${jackson.version} + + + org.codehaus.jackson + jackson-xc + ${jackson.version} + + + + + org.apache.hbase + hbase + ${hbase.version} + test-jar + test + + + org.apache.hadoop + hadoop-test + ${hadoop.version} + true + test + + + junit + junit + ${junit.version} + test + + + org.mockito + mockito-all + ${mockito-all.version} + test + + + + + + + org.apache.hbase + hbase + + + org.apache.hadoop + hadoop-core + + + + org.codehaus.jackson + jackson-core-asl + + + org.codehaus.jackson + jackson-mapper-asl + + + org.codehaus.jackson + jackson-jaxrs + + + org.codehaus.jackson + jackson-xc + + + + org.apache.hbase + hbase + test-jar + test + + + org.apache.hadoop + hadoop-test + test + + + junit + junit + + + org.mockito + mockito-all + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.0 + + 1.6 + 1.6 + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + 2.4 + + + 
prepare-package + + + test-jar + + + + + + + org/apache/jute/** + org/apache/zookeeper/** + **/*.jsp + log4j.properties + + + + + + org.apache.maven.plugins + maven-source-plugin + 2.2.1 + + + attach-sources + prepare-package + + jar-no-fork + + + + + + + maven-surefire-plugin + ${surefire.version} + + ${test.timeout} + -enableassertions -Xmx1900m + -Djava.security.egd=file:/dev/./urandom + ${test.output.tofile} + + 1 + false + none + 1 + + + + + + + + + hbase-0.94.9 + + + hbase + 0.94.9 + + + + + 0.94.9-SNAPSHOT + + + + + hbase-index-0.94.9-compat + + + +
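Finally, to tie the pieces of this patch together, here is a hedged end-to-end sketch modeled on TestEndtoEndIndexing above. Table, family, row, and value names are placeholders, and the family-to-index-table map is populated the same way the tests populate it; treat this as an illustration of the intended flow rather than shipped example code.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import com.salesforce.hbase.index.builder.ColumnFamilyIndexer;

public class ColumnFamilyIndexingExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);

    // primary table with one family that should be mirrored into an index table
    byte[] fam = Bytes.toBytes("FAMILY");
    HTableDescriptor primary = new HTableDescriptor("PRIMARY_TABLE");
    primary.addFamily(new HColumnDescriptor(fam));

    // family -> index table mapping; this registers the indexer on the descriptor
    Map<byte[], String> indexMapping = new HashMap<byte[], String>();
    indexMapping.put(fam, "INDEX_TABLE");
    ColumnFamilyIndexer.enableIndexing(primary, indexMapping);

    // create the primary table and the index table (with its ROW/REMAINING families)
    admin.createTable(primary);
    ColumnFamilyIndexer.createIndexTable(admin, "INDEX_TABLE");
    admin.close();

    // writes to the primary table are now indexed on the server side
    HTable table = new HTable(conf, "PRIMARY_TABLE");
    Put p = new Put(Bytes.toBytes("row1"));
    p.add(fam, Bytes.toBytes("qual"), Bytes.toBytes("value"));
    table.put(p);
    table.close();
  }
}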