From a698033aace8abbad9f49257aea0340b48df7af4 Mon Sep 17 00:00:00 2001 From: James Taylor Date: Tue, 14 May 2013 17:00:26 -0700 Subject: [PATCH] Fix PhoenixTestDriver so it can used from SQuirrel, fix various salting issues --- docs/phoenix.csv | 137 ++++--- pom.xml | 2 +- .../phoenix/compile/UpsertCompiler.java | 333 ++++++++++-------- .../phoenix/compile/WhereOptimizer.java | 24 +- .../phoenix/jdbc/PhoenixEmbeddedDriver.java | 2 +- .../salesforce/phoenix/schema/PTableImpl.java | 13 +- .../phoenix/schema/SaltingUtil.java | 16 +- .../salesforce/phoenix/util/ResultUtil.java | 3 +- .../com/salesforce/phoenix/util/ScanUtil.java | 7 +- .../phoenix/jdbc/PhoenixTestDriver.java | 30 +- .../salesforce/phoenix/query/BaseTest.java | 10 +- 11 files changed, 343 insertions(+), 234 deletions(-) diff --git a/docs/phoenix.csv b/docs/phoenix.csv index 0416ab79..b0b48465 100644 --- a/docs/phoenix.csv +++ b/docs/phoenix.csv @@ -1,22 +1,27 @@ "SECTION","TOPIC","SYNTAX","TEXT","EXAMPLE" "Commands","SELECT"," -SELECT [/*+ hint */] selectExpression [,...] FROM tableExpression [ WHERE expression ] +SELECT [/*+ hint */] [DISTINCT | ALL] selectExpression [,...] +FROM tableExpression [( column [,...] )] [ WHERE expression ] [ GROUP BY expression [,...] ] [ HAVING expression ] -[ ORDER BY order [,...] ] -[ LIMIT {bindParameter | number} ] +[ ORDER BY order [,...] ] [ LIMIT {bindParameter | number} ] "," Selects data from a table. +DISTINCT filters out duplicate results while ALL, the default, includes all results. +FROM identifies the table being queried (single table only currently - no joins or derived tables yet). +Dynamic columns not declared at create time may be defined in parenthesis after the table name and then +used in the query. GROUP BY groups the the result by the given expression(s). HAVING filter rows after grouping. ORDER BY sorts the result by the given column(s) or expression(s) and is only allowed for aggregate queries or queries with a LIMIT clause. LIMIT limits the number of rows returned by the query with no limit applied if specified as null or less than zero. The LIMIT clause is executed after the ORDER BY clause to support TopN type queries. -Only single tables are currently supported - joins are currently not supported. An optional hint overrides the default query plan. "," SELECT * FROM TEST; +SELECT a.* FROM TEST; +SELECT DISTINCT NAME FROM TEST; SELECT ID, COUNT(1) FROM TEST GROUP BY ID; SELECT NAME, SUM(VAL) FROM TEST GROUP BY NAME HAVING COUNT(1) > 2; SELECT 'ID' COL, MAX(ID) AS MAX FROM TEST; @@ -52,8 +57,7 @@ UPSERT INTO foo SELECT * FROM bar; " "Commands","DELETE"," DELETE [/*+ hint */] FROM tableName [ WHERE expression ] -[ ORDER BY order [,...] ] -[ LIMIT {bindParameter | number} ] +[ ORDER BY order [,...] ] [ LIMIT {bindParameter | number} ] "," Deletes the rows selected by the where clause. If auto commit is on, the deletion is performed completely server-side. @@ -82,9 +86,13 @@ be passed through as key/value pairs to setup the HBase table as needed. 
"," CREATE TABLE my_table ( id BIGINT not null primary key, date DATE not null) -CREATE TABLE my_table ( id INTEGER not null primary key, date DATE not null, m.db_utilization DECIMAL, i.db_utilization) m.DATA_BLOCK_ENCODING='DIFF' -CREATE TABLE prod_metrics ( host char(50) not null, created_date date not null, txn_count bigint CONSTRAINT pk PRIMARY KEY (host, created_date) ) -CREATE TABLE IF NOT EXISTS my_table ( id char(10) not null primary key, value integer) DATA_BLOCK_ENCODING='NONE',VERSIONS=?,MAX_FILESIZE=2000000 split on (?, ?, ?) +CREATE TABLE my_table ( id INTEGER not null primary key desc, date DATE not null, + m.db_utilization DECIMAL, i.db_utilization) + m.DATA_BLOCK_ENCODING='DIFF' +CREATE TABLE prod_metrics ( host char(50) not null, created_date date not null, + txn_count bigint CONSTRAINT pk PRIMARY KEY (host, created_date) ) +CREATE TABLE IF NOT EXISTS my_table ( id char(10) not null primary key, value integer) + DATA_BLOCK_ENCODING='NONE',VERSIONS=?,MAX_FILESIZE=2000000 split on (?, ?, ?) " "Commands","DROP"," @@ -128,12 +136,14 @@ EXPLAIN SELECT entity_id FROM CORE.CUSTOM_ENTITY_DATA WHERE organization_id='00D " "Other Grammar","Constraint"," -CONSTRAINT constraintName PRIMARY KEY (columnName [,...]) +CONSTRAINT constraintName PRIMARY KEY (columnName [ASC | DESC] [,...]) "," -Defines a multi-part primary key constraint. +Defines a multi-part primary key constraint. Each column may be declared to be +sorted in ascending or descending ordering. The default is ascending. "," CONSTRAINT my_pk PRIMARY KEY (host,created_date) +CONSTRAINT my_pk PRIMARY KEY (host ASC,created_date DESC) " "Other Grammar","Table Options"," @@ -144,13 +154,14 @@ The option applies to the named family or if omitted to all families if the name references an HColumnDescriptor property. Otherwise, the option applies to the HTableDescriptor. -One built-in option is SALT_BUCKETS. This option causes an extra byte to transparently -be prepended to every row key to ensure an even distribution of write load across all -your region servers. The byte is determined by hashing the row key and modding it with the -SALT_BUCKETS value. The value may be from 1 to 256. The value to pick is a trade-off: bigger -values will distribute your write load more evenly while making your range queries execute -more slowly. This is because every region must be queried to piece back together the range -of rows. For an excellent write-up of this technique, see http://blog.sematext.com/2012/04/09/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/ +One built-in option is SALT_BUCKETS. This option causes an extra byte to be transparently +prepended to every row key to ensure an even distribution of write load across all +your region servers. This is useful when your row key is always monotonically increasing +causing hot spotting on a single region server. The byte is determined by hashing the row +key and modding it with the SALT_BUCKETS value. The value may be from 1 to 256. If not +split points are defined for the table, it will automatically be pre-split at each possible +salt bucket value. For an excellent write-up of this technique, see +http://blog.sematext.com/2012/04/09/hbasewd-avoid-regionserver-hotspotting-despite-writing-records-with-sequential-keys/ "," SALT_BUCKETS=10 @@ -172,10 +183,16 @@ MAX_FILESIZE=2000000000,MEMSTORE_FLUSHSIZE=80000000 name [,...] "," Advanced features that overrides default query processing behavior. 
The -three supported hints are 1) NO_INTRA_REGION_PARALLELIZATION to prevent the -spawning of multiple threads to process data within a single region, -2) SKIP_SCAN to force a skip scan to be performed on the query, and -3) RANGE_SCAN to force a range scan to be performed on the query. +three supported hints are 1) SKIP_SCAN to force a skip scan to be performed on the query when +it otherwise would not. This option may improve performance if a query does +not include the leading primary key column, but does include other, very +selective primary key columns. 2) RANGE_SCAN to force a range scan to be +performed on the query. This option may improve performance if a query +filters on a range for non selective leading primary key column along +with other primary key columns 3) NO_INTRA_REGION_PARALLELIZATION to prevent the +spawning of multiple threads to process data within a single region. This +option is useful when the overall data set being queries is known to be +small. "," /*+ SKIP_SCAN */ @@ -184,10 +201,11 @@ spawning of multiple threads to process data within a single region, " "Other Grammar","Column"," -[familyName .] columnName dataType [[NOT] NULL] [PRIMARY KEY] +[familyName .] columnName dataType [[NOT] NULL] [PRIMARY KEY [ASC | DESC] ] "," Define a new primary key column. The column name is case insensitive by default and -case sensitive if double quoted. +case sensitive if double quoted. The sort order of a primary key may be ascending (ASC) +or descending. The default is ascending. "," id char(15) not null primary key @@ -196,11 +214,15 @@ m.response_time bigint " "Other Grammar","Select Expression"," -term [ [ AS ] columnAlias ] +* | ( familyName . *) | term [ [ AS ] columnAlias ] "," -An expression in a SELECT statement. +An expression in a SELECT statement. All columns in a table may be selected using +*, and all columns in a column family may be selected using .*. "," +* +cf.* ID AS VALUE +VALUE + 1 VALUE_PLUS_ONE " "Other Grammar","Split Point"," @@ -384,7 +406,7 @@ NULL " "Other Grammar","Data Type"," -charType | varcharType | integerType | bigintType | decimalType | timestampType | dateType | timeType | unsignedLongType | unsignedIntType | binaryType +charType | varcharType | integerType | bigintType | decimalType | timestampType | dateType | timeType | unsignedLongType | unsignedIntType | binaryType | varBinaryType "," A type name. "," @@ -392,6 +414,7 @@ CHAR(15) VARCHAR VARCHAR(1000) INTEGER +BINARY(200) " "Other Grammar","String"," @@ -584,15 +607,25 @@ CHAR(10) " "Data Types","BINARY Type"," -BINARY +BINARY ( precisionInt ) "," -Raw byte array. +Raw fixed length byte array. Mapped to ""byte[]"". "," BINARY " +"Data Types","VARBINARY Type"," +VARBINARY +"," +Raw variable length byte array. + +Mapped to ""byte[]"". +"," +VARBINARY +" + "Functions (Aggregate)","AVG"," AVG ( { numericTerm } ) "," @@ -745,28 +778,42 @@ This method returns a decimal number. TO_NUMBER('$123.33', '\u00A4###.##') " -"Functions (Time and Date)","TO_CHAR"," -TO_CHAR( timestampTerm [, formatString] ) +"Functions (String)","UPPER"," +UPPER( stringTerm ) "," -Formats a date, time or timestamp as a string. -The most important format characters are: -y year, M month, d day, H hour, m minute, s second. -The default format string is ""yyyy-MM-dd HH:mm:ss"". -For details of the format, see ""java.text.SimpleDateFormat"". -This method returns a string. +Returns upper case string of the string argument. 
+"," +UPPER('Hello') +" + +"Functions (String)","LOWER"," +LOWER( stringTerm ) +"," +Returns lower case string of the string argument. +"," +LOWER('HELLO') +" + +"Functions (String)","REVERSE"," +REVERSE( stringTerm ) +"," +Returns reversed string of the string argument. "," -TO_CHAR(TIMESTAMP, '2001-02-03 04:05:06') +REVERSE('Hello') " -"Functions (Number)","TO_CHAR"," -TO_CHAR( number [, formatString] ) +"Functions (String)","TO_CHAR"," +TO_CHAR( timestampTerm | numberTerm [, formatString] ) "," -Formats a decimal or integer as a string, optionally accepting a format string. -The default format string is ""#,##0.###"". -For details of the format, see ""java.text.DecimalFormat"". -This method returns a string. +Formats a date, time, timestamp, or number as a string. +The default date format is ""yyyy-MM-dd HH:mm:ss"" and +the default number format is ""#,##0.###"". +For details, see ""java.text.SimpleDateFormat"" +for date/time values and ""java.text.DecimalFormat"" for +numbers. This method returns a string. "," -TO_CHAR(DECIMAL, '#,##0.###') +TO_CHAR(myDate, '2001-02-03 04:05:06') +TO_CHAR(myDecimal, '#,##0.###') " "Functions (Time and Date)","TO_DATE"," diff --git a/pom.xml b/pom.xml index e66d4a31..fa23b882 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ target/generated-sources/antlr3 - 0.94.5 + 0.94.7 1.2 1.0.4 4.11 diff --git a/src/main/java/com/salesforce/phoenix/compile/UpsertCompiler.java b/src/main/java/com/salesforce/phoenix/compile/UpsertCompiler.java index 1ca9d05d..0d4b0953 100644 --- a/src/main/java/com/salesforce/phoenix/compile/UpsertCompiler.java +++ b/src/main/java/com/salesforce/phoenix/compile/UpsertCompiler.java @@ -149,7 +149,7 @@ public MutationPlan compile(UpsertStatement upsert, List binds) throws S QueryPlan plan = null; RowProjector projector = null; int nValuesToSet; - boolean mayAutoCommit = false; + boolean runOnServer = false; if (valueNodes == null) { SelectStatement select = upsert.getSelect(); assert(select != null); @@ -160,8 +160,9 @@ public MutationPlan compile(UpsertStatement upsert, List binds) throws S plan = compiler.compile(select, binds); projector = plan.getProjector(); nValuesToSet = projector.getColumnCount(); - // Cannot auto commit if doing aggregation or topN - mayAutoCommit = !plan.isAggregate() && sameTable && select.getOrderBy().isEmpty(); + // Cannot auto commit if doing aggregation or topN or salted + // Salted causes problems because the row may end up living on a different region + runOnServer = !plan.isAggregate() && sameTable && select.getOrderBy().isEmpty() && table.getBucketNum() != null; } else { nValuesToSet = valueNodes.size(); } @@ -180,7 +181,12 @@ public MutationPlan compile(UpsertStatement upsert, List binds) throws S final int[] columnIndexes = columnIndexesToBe; final int[] pkSlotIndexes = pkSlotIndexesToBe; - if (valueNodes == null) { // UPSERT SELECT + + // TODO: break this up into multiple functions + //////////////////////////////////////////////////////////////////// + // UPSERT SELECT + ///////////////////////////////////////////////////////////////////// + if (valueNodes == null) { /* We can run the upsert in a coprocessor if: * 1) the into table matches from table * 2) the select query isn't doing aggregation @@ -190,8 +196,13 @@ public MutationPlan compile(UpsertStatement upsert, List binds) throws S * and populate the MutationState (upto a limit). 
*/ final boolean isAutoCommit = connection.getAutoCommit(); - if (isAutoCommit && mayAutoCommit) { // UPSERT SELECT run server-side - // At most this array will grow bigger my the number of PK columns + runOnServer |= isAutoCommit; + + //////////////////////////////////////////////////////////////////// + // UPSERT SELECT run server-side (maybe) + ///////////////////////////////////////////////////////////////////// + if (runOnServer) { + // At most this array will grow bigger by the number of PK columns int[] allColumnsIndexes = Arrays.copyOf(columnIndexes, columnIndexes.length + nValuesToSet); int[] reverseColumnIndexes = new int[table.getColumns().size()]; List projectedExpressions = Lists.newArrayListWithExpectedSize(reverseColumnIndexes.length); @@ -229,157 +240,182 @@ public MutationPlan compile(UpsertStatement upsert, List binds) throws S reverseColumnIndexes[tempPos] = reverseColumnIndexes[i]; reverseColumnIndexes[i] = i; } - // Iterate through columns being projected - List projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size()); - for (int i = 0; i < projectedExpressions.size(); i++) { - // Must make new column if position has changed - PColumn column = allColumns.get(allColumnsIndexes[i]); - projectedColumns.add(column.getPosition() == i ? column : new PColumnImpl(column, i)); + // If any pk slots are changing, be conservative and don't run this server side. + // If the row ends up living in a different region, we'll get an error otherwise. + for (int i = 0; i < table.getPKColumns().size(); i++) { + PColumn column = table.getPKColumns().get(i); + Expression source = projectedExpressions.get(i); + if (source == null || !source.equals(new ColumnRef(tableRef, column.getPosition()).newColumnExpression())) { + // TODO: we could check the region boundaries to see if the pk will still be in it. + runOnServer = false; // bail on running server side, since PK may be changing + break; + } } - // Build table from projectedColumns - PTable projectedTable = new PTableImpl(table.getName(), table.getType(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), projectedColumns); - List select = Collections.singletonList( - NODE_FACTORY.aliasedNode(null, - NODE_FACTORY.function(CountAggregateFunction.NORMALIZED_NAME, LiteralParseNode.STAR))); - // Ignore order by - it has no impact - final RowProjector aggProjector = ProjectionCompiler.getRowProjector(context, select, false, GroupBy.EMPTY_GROUP_BY, OrderBy.EMPTY_ORDER_BY, null); - /* - * Transfer over PTable representing subset of columns selected, but all PK columns. - * Move columns setting PK first in pkSlot order, adding LiteralExpression of null for any missing ones. - * Transfer over List for projection. 
- * In region scan, evaluate expressions in order, collecting first n columns for PK and collection non PK in mutation Map - * Create the PRow and get the mutations, adding them to the batch - */ - scan.setAttribute(UngroupedAggregateRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable)); - scan.setAttribute(UngroupedAggregateRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); - final QueryPlan aggPlan = new AggregatePlan(context, tableRef, projector, null, GroupBy.EMPTY_GROUP_BY, false, null, OrderBy.EMPTY_ORDER_BY); - return new MutationPlan() { - - @Override - public PhoenixConnection getConnection() { - return connection; + //////////////////////////////////////////////////////////////////// + // UPSERT SELECT run server-side + ///////////////////////////////////////////////////////////////////// + if (runOnServer) { + // Iterate through columns being projected + List projectedColumns = Lists.newArrayListWithExpectedSize(projectedExpressions.size()); + for (int i = 0; i < projectedExpressions.size(); i++) { + // Must make new column if position has changed + PColumn column = allColumns.get(allColumnsIndexes[i]); + projectedColumns.add(column.getPosition() == i ? column : new PColumnImpl(column, i)); } - - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } - - @Override - public MutationState execute() throws SQLException { - Scanner scanner = aggPlan.getScanner(); - ResultIterator iterator = scanner.iterator(); - try { - Tuple row = iterator.next(); - ImmutableBytesWritable ptr = context.getTempPtr(); - final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr); - return new MutationState(maxSize, connection) { - @Override - public long getUpdateCount() { - return mutationCount; - } - }; - } finally { - iterator.close(); + // Build table from projectedColumns + PTable projectedTable = new PTableImpl(table.getName(), table.getType(), table.getTimeStamp(), table.getSequenceNumber(), table.getPKName(), table.getBucketNum(), projectedColumns); + + List select = Collections.singletonList( + NODE_FACTORY.aliasedNode(null, + NODE_FACTORY.function(CountAggregateFunction.NORMALIZED_NAME, LiteralParseNode.STAR))); + // Ignore order by - it has no impact + final RowProjector aggProjector = ProjectionCompiler.getRowProjector(context, select, false, GroupBy.EMPTY_GROUP_BY, OrderBy.EMPTY_ORDER_BY, null); + /* + * Transfer over PTable representing subset of columns selected, but all PK columns. + * Move columns setting PK first in pkSlot order, adding LiteralExpression of null for any missing ones. + * Transfer over List for projection. 
+ * In region scan, evaluate expressions in order, collecting first n columns for PK and collection non PK in mutation Map + * Create the PRow and get the mutations, adding them to the batch + */ + scan.setAttribute(UngroupedAggregateRegionObserver.UPSERT_SELECT_TABLE, UngroupedAggregateRegionObserver.serialize(projectedTable)); + scan.setAttribute(UngroupedAggregateRegionObserver.UPSERT_SELECT_EXPRS, UngroupedAggregateRegionObserver.serialize(projectedExpressions)); + final QueryPlan aggPlan = new AggregatePlan(context, tableRef, projector, null, GroupBy.EMPTY_GROUP_BY, false, null, OrderBy.EMPTY_ORDER_BY); + return new MutationPlan() { + + @Override + public PhoenixConnection getConnection() { + return connection; } - } + + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } + + @Override + public MutationState execute() throws SQLException { + Scanner scanner = aggPlan.getScanner(); + ResultIterator iterator = scanner.iterator(); + try { + Tuple row = iterator.next(); + ImmutableBytesWritable ptr = context.getTempPtr(); + final long mutationCount = (Long)aggProjector.getColumnProjector(0).getValue(row, PDataType.LONG, ptr); + return new MutationState(maxSize, connection) { + @Override + public long getUpdateCount() { + return mutationCount; + } + }; + } finally { + iterator.close(); + } + } + + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); + List planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("UPSERT ROWS"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); + } + }; + } + } - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List queryPlanSteps = aggPlan.getExplainPlan().getPlanSteps(); - List planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("UPSERT ROWS"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - }; - } else { // UPSERT SELECT run client-side - final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); - return new MutationPlan() { + //////////////////////////////////////////////////////////////////// + // UPSERT SELECT run client-side + ///////////////////////////////////////////////////////////////////// + final int batchSize = Math.min(connection.getMutateBatchSize(), maxSize); + return new MutationPlan() { - @Override - public PhoenixConnection getConnection() { - return connection; - } - - @Override - public ParameterMetaData getParameterMetaData() { - return context.getBindManager().getParameterMetaData(); - } + @Override + public PhoenixConnection getConnection() { + return connection; + } + + @Override + public ParameterMetaData getParameterMetaData() { + return context.getBindManager().getParameterMetaData(); + } - @Override - public MutationState execute() throws SQLException { - byte[][] values = new byte[columnIndexes.length][]; - Scanner scanner = queryPlan.getScanner(); - int estSize = scanner.getEstimatedSize(); - int rowCount = 0; - Map> mutation = Maps.newHashMapWithExpectedSize(estSize); - ResultSet rs = new PhoenixResultSet(scanner, statement); - PTable table = tableRef.getTable(); - PColumn column; - while (rs.next()) { - for (int i = 0; i < values.length; i++) { - column = table.getColumns().get(columnIndexes[i]); - // We are guaranteed that the two column will have the same type. 
- if (!column.getDataType().isSizeCompatible(column.getDataType(), - null, rs.getBytes(i+1), - null, column.getMaxLength(), - null, column.getScale())) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE) - .setColumnName(column.getName().getString()).build().buildException(); - } - values[i] = column.getDataType().coerceBytes(rs.getBytes(i+1), null, column.getDataType(), - null, null, column.getMaxLength(), column.getScale()); - } - setValues(values, pkSlotIndexes, columnIndexes, table, mutation); - rowCount++; - // Commit a batch if auto commit is true and we're at our batch size - if (isAutoCommit && rowCount % batchSize == 0) { - MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection); - connection.getMutationState().join(state); - connection.commit(); - mutation.clear(); + @Override + public MutationState execute() throws SQLException { + byte[][] values = new byte[columnIndexes.length][]; + Scanner scanner = queryPlan.getScanner(); + int estSize = scanner.getEstimatedSize(); + int rowCount = 0; + Map> mutation = Maps.newHashMapWithExpectedSize(estSize); + ResultSet rs = new PhoenixResultSet(scanner, statement); + PTable table = tableRef.getTable(); + PColumn column; + while (rs.next()) { + for (int i = 0; i < values.length; i++) { + column = table.getColumns().get(columnIndexes[i]); + // We are guaranteed that the two column will have the same type. + if (!column.getDataType().isSizeCompatible(column.getDataType(), + null, rs.getBytes(i+1), + null, column.getMaxLength(), + null, column.getScale())) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE) + .setColumnName(column.getName().getString()).build().buildException(); } + values[i] = column.getDataType().coerceBytes(rs.getBytes(i+1), null, column.getDataType(), + null, null, column.getMaxLength(), column.getScale()); + } + setValues(values, pkSlotIndexes, columnIndexes, table, mutation); + rowCount++; + // Commit a batch if auto commit is true and we're at our batch size + if (isAutoCommit && rowCount % batchSize == 0) { + MutationState state = new MutationState(tableRef, mutation, 0, maxSize, connection); + connection.getMutationState().join(state); + connection.commit(); + mutation.clear(); } - // If auto commit is true, this last batch will be committed upon return - return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection); } + // If auto commit is true, this last batch will be committed upon return + return new MutationState(tableRef, mutation, rowCount / batchSize * batchSize, maxSize, connection); + } - @Override - public ExplainPlan getExplainPlan() throws SQLException { - List queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); - List planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); - planSteps.add("UPSERT SELECT"); - planSteps.addAll(queryPlanSteps); - return new ExplainPlan(planSteps); - } - - }; - } - } else { // UPSERT VALUES - int nodeIndex = 0; - // Allocate array based on size of all columns in table, - // since some values may not be set (if they're nullable). 
- UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context); - final byte[][] values = new byte[nValuesToSet][]; - for (ParseNode valueNode : valueNodes) { - if (!valueNode.isConstant()) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build().buildException(); + @Override + public ExplainPlan getExplainPlan() throws SQLException { + List queryPlanSteps = queryPlan.getExplainPlan().getPlanSteps(); + List planSteps = Lists.newArrayListWithExpectedSize(queryPlanSteps.size()+1); + planSteps.add("UPSERT SELECT"); + planSteps.addAll(queryPlanSteps); + return new ExplainPlan(planSteps); } - PColumn column = allColumns.get(columnIndexes[nodeIndex]); - expressionBuilder.setColumn(column); - LiteralExpression literalExpression = (LiteralExpression)valueNode.accept(expressionBuilder); - if (literalExpression.getDataType() != null) { - if (!literalExpression.getDataType().isCoercibleTo(column.getDataType(), literalExpression.getValue())) { - throw new TypeMismatchException(literalExpression.getDataType(), column.getDataType(), "expression: " + literalExpression.toString() + " in column " + column); - } - if (!column.getDataType().isSizeCompatible(literalExpression.getDataType(), - literalExpression.getValue(), literalExpression.getBytes(), - literalExpression.getMaxLength(), column.getMaxLength(), - literalExpression.getScale(), column.getScale())) { - throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE) - .setColumnName(column.getName().getString()).setMessage("value=" + literalExpression.toString()).build().buildException(); + + }; + } + + + //////////////////////////////////////////////////////////////////// + // UPSERT VALUES + ///////////////////////////////////////////////////////////////////// + int nodeIndex = 0; + // Allocate array based on size of all columns in table, + // since some values may not be set (if they're nullable). 
+ UpsertValuesCompiler expressionBuilder = new UpsertValuesCompiler(context); + final byte[][] values = new byte[nValuesToSet][]; + for (ParseNode valueNode : valueNodes) { + if (!valueNode.isConstant()) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.VALUE_IN_UPSERT_NOT_CONSTANT).build().buildException(); + } + PColumn column = allColumns.get(columnIndexes[nodeIndex]); + expressionBuilder.setColumn(column); + LiteralExpression literalExpression = (LiteralExpression)valueNode.accept(expressionBuilder); + if (literalExpression.getDataType() != null) { + if (!literalExpression.getDataType().isCoercibleTo(column.getDataType(), literalExpression.getValue())) { + throw new TypeMismatchException(literalExpression.getDataType(), column.getDataType(), "expression: " + literalExpression.toString() + " in column " + column); + } + if (!column.getDataType().isSizeCompatible(literalExpression.getDataType(), + literalExpression.getValue(), literalExpression.getBytes(), + literalExpression.getMaxLength(), column.getMaxLength(), + literalExpression.getScale(), column.getScale())) { + throw new SQLExceptionInfo.Builder(SQLExceptionCode.DATA_INCOMPATIBLE_WITH_TYPE) + .setColumnName(column.getName().getString()).setMessage("value=" + literalExpression.toString()).build().buildException(); } } byte[] byteValue = column.getDataType().coerceBytes(literalExpression.getBytes(), literalExpression.getValue(), literalExpression.getDataType(), @@ -409,10 +445,9 @@ public MutationState execute() { @Override public ExplainPlan getExplainPlan() throws SQLException { return new ExplainPlan(Collections.singletonList("PUT SINGLE ROW")); - } - - }; - } + } + + }; } private static final class ColumnUpsertCompiler extends ExpressionCompiler { diff --git a/src/main/java/com/salesforce/phoenix/compile/WhereOptimizer.java b/src/main/java/com/salesforce/phoenix/compile/WhereOptimizer.java index b8ca71c9..6855d126 100644 --- a/src/main/java/com/salesforce/phoenix/compile/WhereOptimizer.java +++ b/src/main/java/com/salesforce/phoenix/compile/WhereOptimizer.java @@ -39,6 +39,7 @@ import com.salesforce.phoenix.expression.visitor.TraverseNoExpressionVisitor; import com.salesforce.phoenix.parse.HintNode.Hint; import com.salesforce.phoenix.query.KeyRange; +import com.salesforce.phoenix.query.QueryConstants; import com.salesforce.phoenix.schema.*; import com.salesforce.phoenix.util.*; @@ -52,7 +53,7 @@ * @since 0.1 */ public class WhereOptimizer { - + private static final List SALT_PLACEHOLDER = Collections.singletonList(PDataType.CHAR.getKeyRange(QueryConstants.SEPARATOR_BYTE_ARRAY)); private WhereOptimizer() { } @@ -144,22 +145,21 @@ public static Expression pushKeyExpressionsToScan(StatementContext context, Expr break; } } - ScanRanges range; + RowKeySchema schema = table.getRowKeySchema(); + List> ranges = cnf; if (table.getBucketNum() != null) { - if (ScanUtil.isAllSingleRowScan(cnf, table.getRowKeySchema(), false)) { - List> expandedRanges = SaltingUtil.expandScanRangesToSaltedKeyRange( - cnf, table.getRowKeySchema(), table.getBucketNum()); - range = ScanRanges.create(expandedRanges, SaltingUtil.BINARY_SCHEMA); - } else { - if (!cnf.isEmpty()) { + if (!cnf.isEmpty()) { + // If we have all single keys, we can optimize by adding the salt byte up front + if (ScanUtil.isAllSingleRowScan(cnf, table.getRowKeySchema())) { + cnf.addFirst(SALT_PLACEHOLDER); + ranges = SaltingUtil.flattenRanges(cnf, table.getRowKeySchema(), table.getBucketNum()); + schema = SaltingUtil.BINARY_SCHEMA; + } else { 
cnf.addFirst(SaltingUtil.generateAllSaltingRanges(table.getBucketNum())); } - range = ScanRanges.create(cnf, table.getRowKeySchema()); } - } else { - range = ScanRanges.create(cnf, table.getRowKeySchema()); } - context.setScanRanges(range); + context.setScanRanges(ScanRanges.create(ranges, schema)); return whereClause.accept(new RemoveExtractedNodesVisitor(extractNodes)); } diff --git a/src/main/java/com/salesforce/phoenix/jdbc/PhoenixEmbeddedDriver.java b/src/main/java/com/salesforce/phoenix/jdbc/PhoenixEmbeddedDriver.java index 5dbeeaaf..fa4291af 100644 --- a/src/main/java/com/salesforce/phoenix/jdbc/PhoenixEmbeddedDriver.java +++ b/src/main/java/com/salesforce/phoenix/jdbc/PhoenixEmbeddedDriver.java @@ -78,7 +78,7 @@ public QueryServices getQueryServices() { } protected static ConnectionInfo getConnectionInfo(String url) throws SQLException { - StringTokenizer tokenizer = new StringTokenizer(url.substring(PhoenixRuntime.JDBC_PROTOCOL.length()),DELIMITERS, true); + StringTokenizer tokenizer = new StringTokenizer(url == null ? "" : url.substring(PhoenixRuntime.JDBC_PROTOCOL.length()),DELIMITERS, true); int i = 0; boolean isMalformedUrl = false; String[] tokens = new String[3]; diff --git a/src/main/java/com/salesforce/phoenix/schema/PTableImpl.java b/src/main/java/com/salesforce/phoenix/schema/PTableImpl.java index 112eceab..69c26e9c 100644 --- a/src/main/java/com/salesforce/phoenix/schema/PTableImpl.java +++ b/src/main/java/com/salesforce/phoenix/schema/PTableImpl.java @@ -193,6 +193,12 @@ public int newKey(ImmutableBytesWritable key, byte[][] values) { int i = 0; TrustedByteArrayOutputStream os = new TrustedByteArrayOutputStream(SchemaUtil.estimateKeyLength(this)); try { + Integer bucketNum = this.getBucketNum(); + if (bucketNum != null) { + // Write place holder for salt byte + i++; + os.write(QueryConstants.SEPARATOR_BYTE_ARRAY); + } List columns = getPKColumns(); int nColumns = columns.size(); PColumn lastPKColumn = columns.get(nColumns - 1); @@ -239,7 +245,12 @@ public int newKey(ImmutableBytesWritable key, byte[][] values) { os.write(SEPARATOR_BYTE); } } - key.set(os.getBuffer(),0,os.size()); + byte[] buf = os.getBuffer(); + int size = os.size(); + if (bucketNum != null) { + buf[0] = SaltingUtil.getSaltingByte(buf, 1, size, bucketNum); + } + key.set(buf,0,size); return i; } finally { try { diff --git a/src/main/java/com/salesforce/phoenix/schema/SaltingUtil.java b/src/main/java/com/salesforce/phoenix/schema/SaltingUtil.java index 7bce2df0..82f50fc1 100644 --- a/src/main/java/com/salesforce/phoenix/schema/SaltingUtil.java +++ b/src/main/java/com/salesforce/phoenix/schema/SaltingUtil.java @@ -128,27 +128,27 @@ private static int hashCode(byte a[], int offset, int length) { return result; } - public static List> expandScanRangesToSaltedKeyRange(List> ranges, RowKeySchema schema, int bucketNum) { + public static List> flattenRanges(List> ranges, RowKeySchema schema, int bucketNum) { if (ranges == null || ranges.isEmpty()) { return ScanRanges.NOTHING.getRanges(); } int count = 1; - for (List orRanges: ranges) { - count *= orRanges.size(); + // Skip salt byte range in the first position + for (int i = 1; i < ranges.size(); i++) { + count *= ranges.get(i).size(); } KeyRange[] expandedRanges = new KeyRange[count]; int[] position = new int[ranges.size()]; int estimatedKeyLength = ScanUtil.estimateMaximumKeyLength(schema, 1, ranges); int idx = 0, length; byte saltByte; - byte[] key = new byte[estimatedKeyLength + 1]; + byte[] key = new byte[estimatedKeyLength]; do { - length = 
ScanUtil.setKey(schema, ranges, position, Bound.LOWER, key, 1, 0, ranges.size(), 1); + length = ScanUtil.setKey(schema, ranges, position, Bound.LOWER, key, 1, 1, ranges.size(), 1); saltByte = SaltingUtil.getSaltingByte(key, 1, length, bucketNum); key[0] = saltByte; - KeyRange range = KeyRange.getKeyRange( - Arrays.copyOf(key, length + 1), true, - Arrays.copyOf(key, length + 1), true); + byte[] saltedKey = Arrays.copyOf(key, length + 1); + KeyRange range = PDataType.VARBINARY.getKeyRange(saltedKey, true, saltedKey, true); expandedRanges[idx++] = range; } while (incrementKey(ranges, position)); // The comparator is imperfect, but sufficient for all single keys. diff --git a/src/main/java/com/salesforce/phoenix/util/ResultUtil.java b/src/main/java/com/salesforce/phoenix/util/ResultUtil.java index 1b8e8415..6e14d1e2 100644 --- a/src/main/java/com/salesforce/phoenix/util/ResultUtil.java +++ b/src/main/java/com/salesforce/phoenix/util/ResultUtil.java @@ -55,7 +55,8 @@ public static ImmutableBytesWritable getKey(Result r) { } public static void getKey(Result r, ImmutableBytesWritable key) { - key.set(getRawBytes(r), getKeyOffset(r), getKeyLength(r)); + key.set(r.getRow()); + //key.set(getRawBytes(r), getKeyOffset(r), getKeyLength(r)); } public static void getKey(KeyValue value, ImmutableBytesWritable key) { diff --git a/src/main/java/com/salesforce/phoenix/util/ScanUtil.java b/src/main/java/com/salesforce/phoenix/util/ScanUtil.java index 18b6f1f8..e28e5684 100644 --- a/src/main/java/com/salesforce/phoenix/util/ScanUtil.java +++ b/src/main/java/com/salesforce/phoenix/util/ScanUtil.java @@ -303,11 +303,12 @@ public static int setKey(RowKeySchema schema, List> slots, int[] return offset - byteOffset; } - public static boolean isAllSingleRowScan(List> ranges, RowKeySchema schema, boolean rangesWithSaltByte) { - if (schema == null || ranges.size() < (rangesWithSaltByte ? 
schema.getMaxFields() : schema.getMaxFields() - 1)) { + public static boolean isAllSingleRowScan(List> ranges, RowKeySchema schema) { + if (ranges.size() < schema.getMaxFields()) { return false; } - for (List orRanges : ranges) { + for (int i = 0; i < ranges.size(); i++) { + List orRanges = ranges.get(i); for (KeyRange range: orRanges) { if (!range.isSingleKey()) { return false; diff --git a/src/test/java/com/salesforce/phoenix/jdbc/PhoenixTestDriver.java b/src/test/java/com/salesforce/phoenix/jdbc/PhoenixTestDriver.java index 02834b87..f4cf8f17 100644 --- a/src/test/java/com/salesforce/phoenix/jdbc/PhoenixTestDriver.java +++ b/src/test/java/com/salesforce/phoenix/jdbc/PhoenixTestDriver.java @@ -27,9 +27,13 @@ ******************************************************************************/ package com.salesforce.phoenix.jdbc; +import static com.salesforce.phoenix.query.QueryServicesOptions.withDefaults; + import java.sql.SQLException; import java.util.Properties; +import org.apache.hadoop.hbase.HBaseConfiguration; + import com.salesforce.phoenix.end2end.ConnectionQueryServicesTestImpl; import com.salesforce.phoenix.query.*; import com.salesforce.phoenix.util.PhoenixRuntime; @@ -45,17 +49,14 @@ * @since 0.1 */ public class PhoenixTestDriver extends PhoenixEmbeddedDriver { - private final ConnectionQueryServices queryServices; + private ConnectionQueryServices queryServices; - public PhoenixTestDriver(QueryServices services, String url, Properties props) throws SQLException { + public PhoenixTestDriver() { + this(new QueryServicesTestImpl(withDefaults(HBaseConfiguration.create()))); + } + + public PhoenixTestDriver(QueryServices services) { super(services); - ConnectionInfo connInfo = getConnectionInfo(url); - if (PhoenixRuntime.CONNECTIONLESS.equals(connInfo.getZookeeperQuorum())) { - queryServices = new ConnectionlessQueryServicesImpl(services); - } else { - queryServices = new ConnectionQueryServicesTestImpl(services, services.getConfig()); - } - queryServices.init(url, props); } @Override @@ -66,6 +67,17 @@ public boolean acceptsURL(String url) throws SQLException { @Override // public for testing public ConnectionQueryServices getConnectionQueryServices(String url, Properties info) throws SQLException { + if (queryServices != null) { + return queryServices; + } + QueryServices services = getQueryServices(); + ConnectionInfo connInfo = getConnectionInfo(url); + if (PhoenixRuntime.CONNECTIONLESS.equals(connInfo.getZookeeperQuorum())) { + queryServices = new ConnectionlessQueryServicesImpl(services); + } else { + queryServices = new ConnectionQueryServicesTestImpl(services, services.getConfig()); + } + queryServices.init(url, info); return queryServices; } diff --git a/src/test/java/com/salesforce/phoenix/query/BaseTest.java b/src/test/java/com/salesforce/phoenix/query/BaseTest.java index e72d33e3..add4f190 100644 --- a/src/test/java/com/salesforce/phoenix/query/BaseTest.java +++ b/src/test/java/com/salesforce/phoenix/query/BaseTest.java @@ -234,15 +234,15 @@ public static long nextTimestamp() { protected static PhoenixTestDriver driver; private static int driverRefCount = 0; - protected static synchronized void initDriver(QueryServices services, String url) throws Exception { + protected static synchronized PhoenixTestDriver initDriver(QueryServices services) throws Exception { if (driver == null) { if (driverRefCount == 0) { - BaseTest.driver = new PhoenixTestDriver(services, url, TEST_PROPERTIES); + BaseTest.driver = new PhoenixTestDriver(services); DriverManager.registerDriver(driver); 
-                assertTrue(DriverManager.getDriver(url) == driver);
                 driverRefCount++;
             }
         }
+        return BaseTest.driver;
     }
 
     // We need to deregister an already existing driver in order
@@ -271,7 +271,9 @@ protected static synchronized void destroyDriver() {
     }
 
     protected static void startServer(String url) throws Exception {
-        initDriver(new QueryServicesTestImpl(), url);
+        PhoenixTestDriver driver = initDriver(new QueryServicesTestImpl());
+        assertTrue(DriverManager.getDriver(url) == driver);
+        driver.connect(url, TEST_PROPERTIES);
     }
 
     protected static void stopServer() throws Exception {
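
The SALT_BUCKETS table option documented above works by transparently prepending one byte to every row key; the byte is computed by hashing the row key and taking the result modulo SALT_BUCKETS (1 to 256). Below is a minimal, self-contained sketch of that idea, assuming a simple polynomial hash; the class name, hash function, and sample key are illustrative only and are not claimed to match the internals of SaltingUtil.getSaltingByte().

import java.nio.charset.StandardCharsets;

// Sketch only: demonstrates the hash-and-mod scheme described in the
// SALT_BUCKETS documentation, not the actual Phoenix implementation.
public final class SaltByteSketch {
    private SaltByteSketch() {}

    /** Returns a salt byte for a row key, given the table's SALT_BUCKETS value (1-256). */
    public static byte saltByte(byte[] rowKey, int offset, int length, int saltBuckets) {
        int hash = 1;
        for (int i = offset; i < offset + length; i++) {
            hash = 31 * hash + rowKey[i];            // polynomial hash over the key bytes (assumed)
        }
        return (byte) ((hash & Integer.MAX_VALUE) % saltBuckets); // mod into [0, saltBuckets)
    }

    public static void main(String[] args) {
        // A monotonically increasing key (e.g. time-prefixed) that would otherwise
        // hot-spot one region; with SALT_BUCKETS=10 it is spread over 10 key prefixes.
        byte[] key = "2013-05-14T17:00:26|host42".getBytes(StandardCharsets.UTF_8);
        System.out.println(saltByte(key, 0, key.length, 10));
    }
}

When no explicit split points are given, a salted table is pre-split at each possible salt byte value, so writes with sequential keys are spread across region servers; the trade-off, as the earlier wording of the docs notes, is that range scans must touch every bucket to reassemble a contiguous range of rows.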