Skip to content

Commit

Permalink
Pig StoreFunc for writing data to HBase.
Browse files Browse the repository at this point in the history
  • Loading branch information
prashantkommireddi committed Apr 13, 2013
1 parent 8f1b3d9 commit 800c568
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 147 deletions.
3 changes: 1 addition & 2 deletions examples/pig/test.pig
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
A = load 'examples/pig/testdata' as (a:chararray, b:chararray, c:int, d:chararray, e: datetime) ;
STORE A into 'hbase://TESTPHX2' using com.salesforce.phoenix.pig.PhoenixHBaseStorage('localhost','-batchSize 10');
--dump A;
STORE A into 'hbase://TESTPHX' using com.salesforce.phoenix.pig.PhoenixHBaseStorage('localhost','-batchSize 1000');
108 changes: 0 additions & 108 deletions examples/pig/testdata
Original file line number Diff line number Diff line change
Expand Up @@ -16,111 +16,3 @@
00D300000000XHP 124 123456 weq 2012-12-12
00D300000000XHP 111 123456 nab 2012-01-21
00D300000000UIH 101 123456 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHf 124 123433389012 weq 2012-12-12
00D300000000XH6 111 123422289012 nab 2012-01-21
00D300000000UI1 101 123411189012 ben 2014-01-01
00D300000000XH4 124 123433389012 weq 2012-12-12
00D300000000XH1 111 123422289012 nab 2012-01-21
00D300000000UIq 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XH1 124 123433389012 weq 2012-12-12
00D300000000XH3 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D300000000XHP 111 123422289012 nab 2012-01-21
00D300000000UIH 101 123411189012 ben 2014-01-01
00D300000000XHP 124 123433389012 weq 2012-12-12
00D30000000002P 111 123422289012 nab 2012-01-21
00D30000000093H 101 123411189012 ben 2014-01-01
00D30000000082P 124 123433389012 weq 2012-12-12
00D30000000073P 111 123422289012 nab 2012-01-21
00D30000000064H 101 123411189012 ben 2014-01-01
00D30000000055P 124 123433389012 weq 2012-12-12
00D30000000046P 111 123422289012 nab 2012-01-21
00D30000000037H 101 123411189012 ben 2014-01-01
00D30000000028P 124 123433389012 weq 2012-12-12
00D30000000019P 111 123422289012 nab 2012-01-21
00D300000000U0H 101 123411189012 ben 2014-01-01
00D300000000X2P 124 123433389012 weq 2012-12-12
00D300000000X3P 111 123422289012 nab 2012-01-21
00D300000000U4H 101 123411189012 ben 2014-01-01
00D300000000X3f 124 123433389012 weq 2012-12-12
00D300000000X36 111 123422289012 nab 2012-01-21
00D300000000U31 101 123411189012 ben 2014-01-01
00D300000000X34 124 123433389012 weq 2012-12-12
00D300000000X21 111 123422289012 nab 2012-01-21
00D300000000U2q 101 123411189012 uvw 2014-01-01
00D300000000X5P 124 123433389012 pqr 2012-12-12
00D308900000X6P 111 123422289012 mno 2012-01-21
00D307800000U7H 101 123411189012 jkl 2014-01-01
00D305600000X81 124 123433389012 ghi 2012-12-12
00D303400000X93 111 123422289012 def 2012-01-21
00D301200000U0H 101 123411189012 abc 2014-01-01
57 changes: 45 additions & 12 deletions src/main/java/com/salesforce/phoenix/pig/PhoenixHBaseStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.pig.ResourceSchema;
import org.apache.pig.ResourceSchema.ResourceFieldSchema;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.ObjectSerializer;
import org.apache.pig.impl.util.UDFContext;

import com.salesforce.phoenix.jdbc.PhoenixConnection;
import com.salesforce.phoenix.jdbc.PhoenixDriver;
Expand All @@ -65,11 +69,15 @@
*
* Example usage: A = load 'testdata' as (a:chararray, b:chararray, c:chararray,
* d:chararray, e: datetime); STORE A into 'hbase://CORE.ENTITY_HISTORY' using
* com.salesforce.bdaas.PhoenixHBaseStorage('localhost','-batchSize 5');
* com.salesforce.bdaas.PhoenixHBaseStorage('localhost','-batchSize 5000');
*
* The above reads a file 'testdata' and writes the elements to HBase. First
* argument to this StoreFunc is the server, the 2nd column is the number of
* columns being upserted via Phoenix.
* argument to this StoreFunc is the server, the 2nd argument is the batch size
* for upserts via Phoenix.
*
* Note that Pig types must be in sync with the target Phoenix data types. This
* StoreFunc tries best to cast based on input Pig types and target Phoenix data
* types, but it is recommended to supply appropriate schema.
*
* This is only a STORE implementation. LoadFunc coming soon.
*
Expand All @@ -95,6 +103,10 @@ public class PhoenixHBaseStorage implements StoreFuncInterface {
private final static Options validOptions = new Options();
private final CommandLine configuredOptions;
private final static CommandLineParser parser = new GnuParser();

private String contextSignature = null;
private ResourceSchema schema;
private static final String SCHEMA = "_schema";

public PhoenixHBaseStorage(String server) throws ParseException {
this(server, null);
Expand Down Expand Up @@ -122,6 +134,15 @@ private static void populateValidOptions() {
validOptions.addOption("batchSize", true, "Specify upsert batch size");
}

/**
* Returns UDFProperties based on <code>contextSignature</code>.
*/
private Properties getUDFProperties() {
return UDFContext.getUDFContext()
.getUDFProperties(this.getClass(), new String[] {contextSignature});
}


/**
* Parse the HBase table name and configure job
*/
Expand All @@ -133,6 +154,12 @@ public void setStoreLocation(String location, Job job) throws IOException {

Configuration conf = job.getConfiguration();
PhoenixPigConfiguration.configure(conf);

String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA);
if (serializedSchema!= null) {
schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema);
}

}

/**
Expand All @@ -155,7 +182,7 @@ public void prepareToWrite(RecordWriter writer) throws IOException {

// Generating UPSERT statement without column name information.
String upsertStmt = QueryUtil.constructUpsertStatement(null, tableName, columnMetadataList.size());
LOG.info("Upsert Statement: " + upsertStmt);
LOG.info("Phoenix Upsert Statement: " + upsertStmt);
statement = conn.prepareStatement(upsertStmt);

} catch (SQLException e) {
Expand All @@ -170,12 +197,14 @@ public void prepareToWrite(RecordWriter writer) throws IOException {
@Override
public void putNext(Tuple t) throws IOException {
Object upsertValue = null;

ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields();


try {
for (int i = 0; i < columnMetadataList.size(); i++) {
Object o = t.get(i);

upsertValue = convertTypeSpecificValue(o, columnMetadataList
byte type = (fieldSchemas == null) ? DataType.findType(t.get(i)) : fieldSchemas[i].getType();
upsertValue = convertTypeSpecificValue(o, type, columnMetadataList
.get(i).getSqlType());

if (upsertValue != null) {
Expand All @@ -199,14 +228,15 @@ public void putNext(Tuple t) throws IOException {

}

private Object convertTypeSpecificValue(Object o, Integer sqlType) {
private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) {
PDataType pDataType = PDataType.fromSqlType(sqlType);

return TypeUtil.castPigTypeToPhoenix(o, pDataType);
return TypeUtil.castPigTypeToPhoenix(o, type, pDataType);
}

@Override
public void setStoreFuncUDFContextSignature(String signature) {
this.contextSignature = signature;
}

@Override
Expand Down Expand Up @@ -257,9 +287,12 @@ public OutputFormat getOutputFormat() throws IOException {
return new NullOutputFormat();
}

@Override
public void checkSchema(ResourceSchema s) throws IOException {
}
@Override
public void checkSchema(ResourceSchema s) throws IOException {
schema = s;
getUDFProperties().setProperty(contextSignature + SCHEMA,
ObjectSerializer.serialize(schema));
}

private String[] getTableMetadata(String table) {
String[] schemaAndTable = table.split("\\.");
Expand Down
Loading

0 comments on commit 800c568

Please sign in to comment.