From 800c56840f06d42051678ababf68500db4467a32 Mon Sep 17 00:00:00 2001 From: Prashant Kommireddi Date: Sat, 13 Apr 2013 12:05:21 -0700 Subject: [PATCH] Pig StoreFunc for writing data to HBase. --- examples/pig/test.pig | 3 +- examples/pig/testdata | 108 ------------------ .../phoenix/pig/PhoenixHBaseStorage.java | 57 +++++++-- .../com/salesforce/phoenix/pig/TypeUtil.java | 97 ++++++++++++---- 4 files changed, 118 insertions(+), 147 deletions(-) diff --git a/examples/pig/test.pig b/examples/pig/test.pig index 5a2be223..670c3e3c 100644 --- a/examples/pig/test.pig +++ b/examples/pig/test.pig @@ -1,3 +1,2 @@ A = load 'examples/pig/testdata' as (a:chararray, b:chararray, c:int, d:chararray, e: datetime) ; -STORE A into 'hbase://TESTPHX2' using com.salesforce.phoenix.pig.PhoenixHBaseStorage('localhost','-batchSize 10'); ---dump A; \ No newline at end of file +STORE A into 'hbase://TESTPHX' using com.salesforce.phoenix.pig.PhoenixHBaseStorage('localhost','-batchSize 1000'); diff --git a/examples/pig/testdata b/examples/pig/testdata index 5a6d2d00..15f3f0b6 100644 --- a/examples/pig/testdata +++ b/examples/pig/testdata @@ -16,111 +16,3 @@ 00D300000000XHP 124 123456 weq 2012-12-12 00D300000000XHP 111 123456 nab 2012-01-21 00D300000000UIH 101 123456 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 
2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHf 124 123433389012 weq 2012-12-12 -00D300000000XH6 111 123422289012 nab 2012-01-21 -00D300000000UI1 101 123411189012 ben 2014-01-01 -00D300000000XH4 124 123433389012 weq 2012-12-12 -00D300000000XH1 111 123422289012 nab 2012-01-21 -00D300000000UIq 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XH1 124 123433389012 weq 2012-12-12 -00D300000000XH3 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 
123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D300000000XHP 111 123422289012 nab 2012-01-21 -00D300000000UIH 101 123411189012 ben 2014-01-01 -00D300000000XHP 124 123433389012 weq 2012-12-12 -00D30000000002P 111 123422289012 nab 2012-01-21 -00D30000000093H 101 123411189012 ben 2014-01-01 -00D30000000082P 124 123433389012 weq 2012-12-12 -00D30000000073P 111 123422289012 nab 2012-01-21 -00D30000000064H 101 123411189012 ben 2014-01-01 -00D30000000055P 124 123433389012 weq 2012-12-12 -00D30000000046P 111 123422289012 nab 2012-01-21 -00D30000000037H 101 123411189012 ben 2014-01-01 -00D30000000028P 124 123433389012 weq 2012-12-12 -00D30000000019P 111 123422289012 nab 2012-01-21 -00D300000000U0H 101 123411189012 ben 2014-01-01 -00D300000000X2P 124 123433389012 weq 2012-12-12 -00D300000000X3P 111 123422289012 nab 2012-01-21 -00D300000000U4H 101 123411189012 ben 2014-01-01 
-00D300000000X3f 124 123433389012 weq 2012-12-12 -00D300000000X36 111 123422289012 nab 2012-01-21 -00D300000000U31 101 123411189012 ben 2014-01-01 -00D300000000X34 124 123433389012 weq 2012-12-12 -00D300000000X21 111 123422289012 nab 2012-01-21 -00D300000000U2q 101 123411189012 uvw 2014-01-01 -00D300000000X5P 124 123433389012 pqr 2012-12-12 -00D308900000X6P 111 123422289012 mno 2012-01-21 -00D307800000U7H 101 123411189012 jkl 2014-01-01 -00D305600000X81 124 123433389012 ghi 2012-12-12 -00D303400000X93 111 123422289012 def 2012-01-21 -00D301200000U0H 101 123411189012 abc 2014-01-01 diff --git a/src/main/java/com/salesforce/phoenix/pig/PhoenixHBaseStorage.java b/src/main/java/com/salesforce/phoenix/pig/PhoenixHBaseStorage.java index bb3fb406..e39fdd88 100644 --- a/src/main/java/com/salesforce/phoenix/pig/PhoenixHBaseStorage.java +++ b/src/main/java/com/salesforce/phoenix/pig/PhoenixHBaseStorage.java @@ -51,8 +51,12 @@ import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.pig.ResourceSchema; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.StoreFuncInterface; +import org.apache.pig.data.DataType; import org.apache.pig.data.Tuple; +import org.apache.pig.impl.util.ObjectSerializer; +import org.apache.pig.impl.util.UDFContext; import com.salesforce.phoenix.jdbc.PhoenixConnection; import com.salesforce.phoenix.jdbc.PhoenixDriver; @@ -65,11 +69,15 @@ * * Example usage: A = load 'testdata' as (a:chararray, b:chararray, c:chararray, * d:chararray, e: datetime); STORE A into 'hbase://CORE.ENTITY_HISTORY' using - * com.salesforce.bdaas.PhoenixHBaseStorage('localhost','-batchSize 5'); + * com.salesforce.bdaas.PhoenixHBaseStorage('localhost','-batchSize 5000'); * * The above reads a file 'testdata' and writes the elements to HBase. First - * argument to this StoreFunc is the server, the 2nd column is the number of - * columns being upserted via Phoenix. 
+ * argument to this StoreFunc is the server, the 2nd argument is the batch size + * for upserts via Phoenix. + * + * Note that Pig types must be in sync with the target Phoenix data types. This + * StoreFunc tries best to cast based on input Pig types and target Phoenix data + * types, but it is recommended to supply appropriate schema. * * This is only a STORE implementation. LoadFunc coming soon. * @@ -95,6 +103,10 @@ public class PhoenixHBaseStorage implements StoreFuncInterface { private final static Options validOptions = new Options(); private final CommandLine configuredOptions; private final static CommandLineParser parser = new GnuParser(); + + private String contextSignature = null; + private ResourceSchema schema; + private static final String SCHEMA = "_schema"; public PhoenixHBaseStorage(String server) throws ParseException { this(server, null); @@ -122,6 +134,15 @@ private static void populateValidOptions() { validOptions.addOption("batchSize", true, "Specify upsert batch size"); } + /** + * Returns UDFProperties based on contextSignature. + */ + private Properties getUDFProperties() { + return UDFContext.getUDFContext() + .getUDFProperties(this.getClass(), new String[] {contextSignature}); + } + + /** * Parse the HBase table name and configure job */ @@ -133,6 +154,12 @@ public void setStoreLocation(String location, Job job) throws IOException { Configuration conf = job.getConfiguration(); PhoenixPigConfiguration.configure(conf); + + String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA); + if (serializedSchema!= null) { + schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema); + } + } /** @@ -155,7 +182,7 @@ public void prepareToWrite(RecordWriter writer) throws IOException { // Generating UPSERT statement without column name information. 
String upsertStmt = QueryUtil.constructUpsertStatement(null, tableName, columnMetadataList.size()); - LOG.info("Upsert Statement: " + upsertStmt); + LOG.info("Phoenix Upsert Statement: " + upsertStmt); statement = conn.prepareStatement(upsertStmt); } catch (SQLException e) { @@ -170,12 +197,14 @@ public void prepareToWrite(RecordWriter writer) throws IOException { @Override public void putNext(Tuple t) throws IOException { Object upsertValue = null; - + ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields(); + + try { for (int i = 0; i < columnMetadataList.size(); i++) { Object o = t.get(i); - - upsertValue = convertTypeSpecificValue(o, columnMetadataList + byte type = (fieldSchemas == null) ? DataType.findType(t.get(i)) : fieldSchemas[i].getType(); + upsertValue = convertTypeSpecificValue(o, type, columnMetadataList .get(i).getSqlType()); if (upsertValue != null) { @@ -199,14 +228,15 @@ public void putNext(Tuple t) throws IOException { } - private Object convertTypeSpecificValue(Object o, Integer sqlType) { + private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) { PDataType pDataType = PDataType.fromSqlType(sqlType); - return TypeUtil.castPigTypeToPhoenix(o, pDataType); + return TypeUtil.castPigTypeToPhoenix(o, type, pDataType); } @Override public void setStoreFuncUDFContextSignature(String signature) { + this.contextSignature = signature; } @Override @@ -257,9 +287,12 @@ public OutputFormat getOutputFormat() throws IOException { return new NullOutputFormat(); } - @Override - public void checkSchema(ResourceSchema s) throws IOException { - } + @Override + public void checkSchema(ResourceSchema s) throws IOException { + schema = s; + getUDFProperties().setProperty(contextSignature + SCHEMA, + ObjectSerializer.serialize(schema)); + } private String[] getTableMetadata(String table) { String[] schemaAndTable = table.split("\\."); diff --git a/src/main/java/com/salesforce/phoenix/pig/TypeUtil.java 
b/src/main/java/com/salesforce/phoenix/pig/TypeUtil.java index daea37ff..b3d48a76 100644 --- a/src/main/java/com/salesforce/phoenix/pig/TypeUtil.java +++ b/src/main/java/com/salesforce/phoenix/pig/TypeUtil.java @@ -28,33 +28,39 @@ package com.salesforce.phoenix.pig; +import java.io.IOException; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.sql.Types; +import org.apache.pig.builtin.Utf8StorageConverter; +import org.apache.pig.data.DataByteArray; import org.apache.pig.data.DataType; import org.joda.time.DateTime; import com.salesforce.phoenix.schema.PDataType; public class TypeUtil { - + + private static final Utf8StorageConverter utf8Converter = new Utf8StorageConverter(); + /** - * This method infers incoming Pig data type and returns the most - * appropriate PDataType associated with it. Note for Pig DataType DATETIME, - * this method returns DATE as inferredSqlType. This is later used to make a - * cast to targetPhoenixType accordingly. See - * {@link #castPigTypeToPhoenix(Object, PDataType)} + * This method returns the most appropriate PDataType associated with + * the incoming Pig type. Note for Pig DataType DATETIME, returns DATE as + * inferredSqlType. + * + * This is later used to make a cast to targetPhoenixType accordingly. 
See + * {@link #castPigTypeToPhoenix(Object, byte, PDataType)} * * @param obj * @return */ - public static PDataType getType(Object obj) { + public static PDataType getType(Object obj, byte type) { if (obj == null) { return null; } - - byte type = DataType.findType(obj); + PDataType sqlType; switch (type) { @@ -65,23 +71,17 @@ public static PDataType getType(Object obj) { sqlType = PDataType.VARCHAR; break; case DataType.DOUBLE: - sqlType = PDataType.DECIMAL; - break; case DataType.FLOAT: + case DataType.BIGDECIMAL: sqlType = PDataType.DECIMAL; break; case DataType.INTEGER: sqlType = PDataType.INTEGER; break; case DataType.LONG: - sqlType = PDataType.LONG; - break; case DataType.BIGINTEGER: sqlType = PDataType.LONG; break; - case DataType.BIGDECIMAL: - sqlType = PDataType.DECIMAL; - break; case DataType.BOOLEAN: sqlType = PDataType.BOOLEAN; break; @@ -98,31 +98,41 @@ public static PDataType getType(Object obj) { } /** - * This method encodes a value with Phoenix data type. + * This method encodes a value with Phoenix data type. 
It begins + * with checking whether an object is BINARY and makes a call to + * {@link #castBytes(Object, PDataType)} to convert bytes to + * targetPhoenixType * * @param o * @param targetPhoenixType * @return */ - public static Object castPigTypeToPhoenix(Object o, PDataType targetPhoenixType) { - PDataType inferredPType = getType(o); + public static Object castPigTypeToPhoenix(Object o, byte objectType, PDataType targetPhoenixType) { + PDataType inferredPType = getType(o, objectType); if(inferredPType == null) { return null; } + + if(inferredPType == PDataType.BINARY && targetPhoenixType != PDataType.BINARY) { + try { + o = castBytes(o, targetPhoenixType); + inferredPType = getType(o, DataType.findType(o)); + } catch (IOException e) { + throw new RuntimeException("Error while casting bytes for object " + o, e); + } + } if(inferredPType == PDataType.DATE) { int inferredSqlType = targetPhoenixType.getSqlType(); - // if sqlType is DATE - if(inferredSqlType == 91) { + + if(inferredSqlType == Types.DATE) { return new Date(((DateTime)o).getMillis()); } - // if sqlType is Time - if(inferredSqlType == 92) { + if(inferredSqlType == Types.TIME) { return new Time(((DateTime)o).getMillis()); } - // if sqlType is Timestamp - if(inferredSqlType == 93) { + if(inferredSqlType == Types.TIMESTAMP) { return new Timestamp(((DateTime)o).getMillis()); } } @@ -134,4 +144,41 @@ public static Object castPigTypeToPhoenix(Object o, PDataType targetPhoenixType) throw new RuntimeException(o.getClass().getName() + " cannot be coerced to "+targetPhoenixType.toString()); } + + /** + * This method converts bytes to the target type required + * for Phoenix. It uses {@link Utf8StorageConverter} for + * the conversion. 
+ * + * @param o + * @param targetPhoenixType + * @return + * @throws IOException + */ + public static Object castBytes(Object o, PDataType targetPhoenixType) throws IOException { + byte[] bytes = ((DataByteArray)o).get(); + + switch(targetPhoenixType) { + case CHAR: + case VARCHAR: + return utf8Converter.bytesToCharArray(bytes); + case UNSIGNED_INT: + case INTEGER: + return utf8Converter.bytesToInteger(bytes); + case BOOLEAN: + return utf8Converter.bytesToBoolean(bytes); + case DECIMAL: + return utf8Converter.bytesToBigDecimal(bytes); + case UNSIGNED_LONG: + case LONG: + return utf8Converter.bytesToLong(bytes); + case TIME: + case TIMESTAMP: + case DATE: + return utf8Converter.bytesToDateTime(bytes); + default: + return o; + } + } + }