
Commit 69b8457
Renamed PhoenixRuntime method to get uncommitted data as getUncommittedData
and return a list of sorted key values instead of mutations.
Added default Eclipse preferences and import order for development.

jtaylor-sfdc committed Feb 14, 2013
1 parent 579a8a1 commit 69b8457
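In caller terms, the rename reads like this (a before/after sketch against this
commit; conn is assumed to be an open Phoenix JDBC connection, with the List,
Mutation, and KeyValue imports in scope):

    // Before this commit:
    List<Mutation> mutations = PhoenixRuntime.getUncommittedMutations(conn);

    // After: one flat list of KeyValues, already sorted by KeyValue.COMPARATOR,
    // so it can be handed straight to an HFile writer.
    List<KeyValue> keyValues = PhoenixRuntime.getUncommittedData(conn);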
Showing 7 changed files with 999 additions and 54 deletions.
945 changes: 945 additions & 0 deletions dev/eclipse_prefs_phoenix.epf

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions dev/phoenix.importorder
@@ -0,0 +1,6 @@
+#Organize Import Order
+4=system
+3=com
+2=org
+1=javax
+0=java

@@ -119,13 +119,18 @@ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocesso
         boolean hasMore;
         boolean hasAny = false;
         MultiKeyValueTuple result = new MultiKeyValueTuple();
+        if (logger.isInfoEnabled()) {
+            logger.info("Starting ungrouped coprocessor scan " + scan);
+        }
+        long rowCount = 0;
         do {
             List<KeyValue> results = new ArrayList<KeyValue>();
             // Results are potentially returned even when the return value of s.next is false
             // since this is an indication of whether or not there are more values after the
             // ones returned
             hasMore = s.next(results) && !s.isFilterDone();
             if (!results.isEmpty()) {
+                rowCount++;
                 result.setKeyValues(results);
                 try {
                     if (isDelete) {
@@ -186,6 +191,10 @@ protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocesso
             }
         } while (hasMore);
 
+        if (logger.isInfoEnabled()) {
+            logger.info("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan);
+        }
+
         if (!mutations.isEmpty()) {
             @SuppressWarnings("unchecked")
             Pair<Mutation,Integer>[] mutationArray = new Pair[mutations.size()];
28 changes: 4 additions & 24 deletions src/main/java/com/salesforce/phoenix/execute/MutationState.java
@@ -33,15 +33,12 @@
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.util.Bytes;
 
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.salesforce.phoenix.exception.*;
+import com.salesforce.phoenix.exception.PhoenixIOException;
 import com.salesforce.phoenix.jdbc.PhoenixConnection;
 import com.salesforce.phoenix.schema.*;
 import com.salesforce.phoenix.util.ImmutableBytesPtr;
@@ -143,9 +140,8 @@ private static void addRowMutations(PTable table, Iterator<Entry<ImmutableBytesP
     }
 
     /**
-     * Get the list of HBase mutations for the tables with uncommitted data. For each HBase table,
-     * the rows are sorted in ascending row key order.
-     * @return list of row key ordered HBase mutations
+     * Get the unsorted list of HBase mutations for the tables with uncommitted data.
+     * @return list of HBase mutations for uncommitted data.
      */
     public List<Mutation> toMutations() {
         Long scn = connection.getSCN();
@@ -156,27 +152,11 @@ public List<Mutation> toMutations() {
             Map.Entry<TableRef, Map<ImmutableBytesPtr,Map<PColumn,byte[]>>> entry = iterator.next();
             PTable table = entry.getKey().getTable();
             List<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>> rowMutations = new ArrayList<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>>(entry.getValue().entrySet());
-            // TODO: Measure using TreeSet versus copy and sort over HashMap
-            Collections.sort(rowMutations, new Comparator<Map.Entry<ImmutableBytesPtr,Map<PColumn,byte[]>>>() {
-                @Override
-                public int compare(Entry<ImmutableBytesPtr, Map<PColumn, byte[]>> o1,
-                        Entry<ImmutableBytesPtr, Map<PColumn, byte[]>> o2) {
-                    ImmutableBytesPtr ptr1 = o1.getKey();
-                    ImmutableBytesPtr ptr2 = o2.getKey();
-                    return Bytes.compareTo(ptr1.get(), ptr1.getOffset(), ptr1.getLength(), ptr2.get(), ptr2.getOffset(), ptr2.getLength());
-                }
-            });
             addRowMutations(table, rowMutations.iterator(), timestamp, mutations);
         }
-        // The KVs must be in the expected order for the HFile to be happy
-        for (Mutation m : mutations) {
-            for (List<KeyValue> kvs : m.getFamilyMap().values()) {
-                Collections.sort(kvs, KeyValue.COMPARATOR);
-            }
-        }
         return mutations;
     }
 
     /**
      * Validates that the meta data is still valid based on the current server time
      * and returns the server time to use for the upsert for each table.
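With the comparator and the KeyValue sort removed here, toMutations() now
returns rows in whatever order the backing maps yield them. Any consumer of
toMutations() that still needs HFile ordering has to sort for itself; a minimal
sketch of that, assuming state is a MutationState in scope:

    List<Mutation> mutations = state.toMutations();
    List<KeyValue> kvs = new ArrayList<KeyValue>();
    for (Mutation m : mutations) {
        // getFamilyMap() groups KeyValues by column family; flatten them out
        for (List<KeyValue> familyKvs : m.getFamilyMap().values()) {
            kvs.addAll(familyKvs);
        }
    }
    // KeyValue.COMPARATOR gives the total order an HFile writer expects
    Collections.sort(kvs, KeyValue.COMPARATOR);

This is exactly the work that the new PhoenixRuntime.getUncommittedData()
below now performs on the caller's behalf.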

@@ -36,7 +36,6 @@
 import java.util.concurrent.*;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -376,16 +375,15 @@ private HTableDescriptor generateTableDescriptor(byte[] tableName, boolean readO
             }
         }
         // The phoenix jar must be available on HBase classpath
-        Path phoenixJarPath = new Path(QueryConstants.DEFAULT_COPROCESS_PATH);
         try {
-            descriptor.addCoprocessor(ScanRegionObserver.class.getName(), phoenixJarPath, 1, null);
-            descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), phoenixJarPath, 1, null);
-            descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), phoenixJarPath, 1, null);
-            descriptor.addCoprocessor(HashJoiningRegionObserver.class.getName(), phoenixJarPath, 1, null);
+            descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, 1, null);
+            descriptor.addCoprocessor(UngroupedAggregateRegionObserver.class.getName(), null, 1, null);
+            descriptor.addCoprocessor(GroupedAggregateRegionObserver.class.getName(), null, 1, null);
+            descriptor.addCoprocessor(HashJoiningRegionObserver.class.getName(), null, 1, null);
             // Setup split policy on Phoenix metadata table to ensure that the key values of a Phoenix table
             // stay on the same region.
             if (SchemaUtil.isMetaTable(tableName)) {
-                descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), phoenixJarPath, 1, null);
+                descriptor.addCoprocessor(MetaDataEndpointImpl.class.getName(), null, 1, null);
             }
         } catch (IOException e) {
             throw new PhoenixIOException(e);
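A null jar path in addCoprocessor() tells HBase to load the coprocessor class
from the region server's own classpath rather than from a jar in HDFS
(previously QueryConstants.DEFAULT_COPROCESS_PATH), which is why the comment
above still requires the Phoenix jar on the HBase classpath. Annotated for
reference (same call as in the diff, not new behavior):

    // arguments: class name, jar path (null => region server classpath),
    //            priority, per-coprocessor configuration
    descriptor.addCoprocessor(ScanRegionObserver.class.getName(), null, 1, null);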
17 changes: 14 additions & 3 deletions src/main/java/com/salesforce/phoenix/util/PhoenixRuntime.java
@@ -31,6 +31,7 @@
 import java.sql.*;
 import java.util.*;
 
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Mutation;
 
 
@@ -204,13 +205,23 @@ public static int executeStatements(Connection conn, Reader reader, List<Object>
     }
 
     /**
-     * Get the list of uncommitted HBase mutations for the connection. Currently used to write an
+     * Get the list of uncommitted KeyValues for the connection. Currently used to write an
      * Phoenix-compliant HFile from a map/reduce job.
      * @param conn an open JDBC connection
     * @return the list of HBase mutations for uncommitted data
     * @throws SQLException
     */
-    public static List<Mutation> getUncommittedMutations(Connection conn) throws SQLException {
-        return conn.unwrap(PhoenixConnection.class).getMutationState().toMutations();
+    public static List<KeyValue> getUncommittedData(Connection conn) throws SQLException {
+        List<Mutation> mutations = conn.unwrap(PhoenixConnection.class).getMutationState().toMutations();
+        List<KeyValue> keyValues = Lists.newArrayListWithExpectedSize(mutations.size() * 5); // Guess-timate 5 key values per row
+        for (Mutation mutation : mutations) {
+            for (List<KeyValue> keyValueList : mutation.getFamilyMap().values()) {
+                for (KeyValue keyValue : keyValueList) {
+                    keyValues.add(keyValue);
+                }
+            }
+        }
+        Collections.sort(keyValues, KeyValue.COMPARATOR);
+        return keyValues;
+    }
 }
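A minimal end-to-end sketch of the map/reduce use case the javadoc mentions
(hypothetical table and assumed connectionless URL; the actual KeyValue-to-HFile
writing is elided, and the java.sql.*, List, KeyValue, and PhoenixRuntime
imports are assumed):

    Connection conn = DriverManager.getConnection("jdbc:phoenix:none"); // assumed connectionless URL
    try {
        PreparedStatement stmt = conn.prepareStatement("UPSERT INTO t VALUES(?, ?)"); // hypothetical table
        stmt.setString(1, "a");
        stmt.setInt(2, 1);
        stmt.execute();
        // Already sorted by KeyValue.COMPARATOR, the order an HFile writer requires
        List<KeyValue> data = PhoenixRuntime.getUncommittedData(conn);
        for (KeyValue kv : data) {
            // e.g. hand each kv to an HFileOutputFormat record writer
        }
        conn.rollback(); // discard the uncommitted state without writing to HBase
    } finally {
        conn.close();
    }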

@@ -27,15 +27,13 @@
  ******************************************************************************/
 package com.salesforce.phoenix.query;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 import java.sql.*;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Properties;
 
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.client.Mutation;
 import org.junit.*;
 
 import com.salesforce.phoenix.jdbc.PhoenixDriver;
@@ -75,7 +73,6 @@ public void testConnectionlessUpsert() throws Exception {
             " created_date date\n" +
             " CONSTRAINT pk PRIMARY KEY (organization_id, key_prefix, entity_history_id)\n" +
             ")";
-        System.out.println(dmlStmt);
         Properties props = new Properties();
         Connection conn = DriverManager.getConnection(getUrl(), props);
         PreparedStatement statement = conn.prepareStatement(dmlStmt);
@@ -98,21 +95,20 @@
         statement.setDate(5,now);
         statement.execute();
 
-        int count = 0;
-        List<Mutation> mutations = PhoenixRuntime.getUncommittedMutations(conn);
-        for (Mutation m : mutations) {
-            for (List<KeyValue> kvs : m.getFamilyMap().values()) {
-                if (count == 0) {
-                    assertEquals("Eli", PDataType.VARCHAR.toObject(kvs.get(0).getValue()));
-                    assertEquals(now, PDataType.DATE.toObject(kvs.get(1).getValue()));
-                } else if (count == 1) {
-                    assertEquals("Simon", PDataType.VARCHAR.toObject(kvs.get(0).getValue()));
-                    assertEquals(now, PDataType.DATE.toObject(kvs.get(1).getValue()));
-                }
-                count++;
-            }
-        }
-        assertEquals(2,count);
+        Iterator<KeyValue> iterator = PhoenixRuntime.getUncommittedData(conn).iterator();
+        assertTrue(iterator.hasNext());
+        assertEquals("Eli", PDataType.VARCHAR.toObject(iterator.next().getValue()));
+        assertTrue(iterator.hasNext());
+        assertEquals(now, PDataType.DATE.toObject(iterator.next().getValue()));
+        assertTrue(iterator.hasNext());
+        assertNull(PDataType.VARCHAR.toObject(iterator.next().getValue()));
+        assertTrue(iterator.hasNext());
+        assertEquals("Simon", PDataType.VARCHAR.toObject(iterator.next().getValue()));
+        assertTrue(iterator.hasNext());
+        assertEquals(now, PDataType.DATE.toObject(iterator.next().getValue()));
+        assertTrue(iterator.hasNext());
+        assertNull(PDataType.VARCHAR.toObject(iterator.next().getValue()));
+        assertFalse(iterator.hasNext());
         conn.rollback(); // to clear the list of mutations for the next
}

Expand Down
