diff --git a/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java b/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java new file mode 100644 index 0000000..b15c0d4 --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlBankTransfers.java @@ -0,0 +1,232 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.atomic.AtomicLong; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Money transfers across bank accounts is a common usecase for a OLTP + * database. Transfers are a commonly used example for discussing + * transactions in databases because of its strong requirements on + * consistency guarantees. + * + * We want to simulate money transfers. The most important constraint here + * is that the total amount of money across all accounts should remain + * invariant. However, aggregating money across all accounts involves + * a full table scan and this exposes the query to read restarts. + * + * This app helps understand whether the new clockbound clock + * helps improve the performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * + * Setup: + * 1. Create a bank_accounts TABLE with columns (account_id INT, balance INT). + * 2. Insert 1000 accounts with account_id 0 to 999 initialized to 1000. + * + * Workload: + * There are two main operations in this workload: + * a. Transfer: Transfers a random amount money from one account to another. + * The amount must be <= the balance of the source account. + * b. Verify: Verifies that the total amount of money across all accounts + * is 1000 * 1000. + * + * Transfer Operation: + * 1. Pick a sender and a receiver pair at random (they must be different). + * 2. Start a repeatable read transaction. + * 3. Query the account balance of the sender. + * 4. If the balance is zero, abort the transaction. + * 5. Pick a random amount [1, balance]. + * 6. Decrement the balance of the sender by the amount. + * 7. Increment the balance of the receiver by the amount. + * 8. Commit the transaction. + * + * Verify Operation: + * 1. Sum the balances of all accounts. + * 2. Verify that the sum is 1000 * 1000. + */ +public class SqlBankTransfers extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlBankTransfers.class); + + // Static initialization of this app's config. + static { + // Only use 10 writer threads to avoid overloading the system. + // In real life, there are many more threads but there are other + // things to do too. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 1; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "bank_accounts"; + + // The number of accounts in the bank. + private static final int NUM_ACCOUNTS = 1000; + + // Initial balance of each account. + private static final int INIT_BALANCE = 1000; + + // Shared counter to store the number of inconsistent reads. + private static AtomicLong numInconsistentReads = new AtomicLong(0); + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + try (Connection connection = getPostgresConnection()) { + // (Re)Create the table + // Every run should start cleanly with an empty table. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (account_id INT, balance INT)", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), %d", + getTableName(), NUM_ACCOUNTS, INIT_BALANCE)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + // Executes the Verify operation. + @Override + public long doRead() { + try (Connection connection = getPostgresConnection(); + Statement statement = connection.createStatement()) { + try { + ResultSet resultSet = statement.executeQuery(String.format( + "SELECT SUM(balance) FROM %s", getTableName())); + if (!resultSet.next()) { + throw new SQLException("No rows returned from sum query"); + } + int totalBalance = resultSet.getInt(1); + + // verify total balance. + if (totalBalance != NUM_ACCOUNTS * INIT_BALANCE) { + LOG.error(String.format("Total balance is %d", totalBalance)); + numInconsistentReads.incrementAndGet(); + } + } catch (Exception e) { + LOG.error("Error verifying balances ", e); + } + } catch (Exception e) { + LOG.error("Error creating a connection ", e); + } + return 1; + } + + // Executes the Transfer operation. + @Override + public long doWrite(int threadIdx) { + // Pick two random distinct accounts. + int sender = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS); + int receiver; + do { + receiver = ThreadLocalRandom.current().nextInt(NUM_ACCOUNTS); + } while (receiver == sender); + + try (Connection connection = getPostgresConnection(); + Statement statement = connection.createStatement()) { + // Start a repeatable read transaction. + connection.setAutoCommit(false); + connection.setTransactionIsolation( + Connection.TRANSACTION_REPEATABLE_READ); + try { + // Retrieve the balance of the sender. + ResultSet rs = statement.executeQuery(String.format( + "SELECT balance FROM %s WHERE account_id = %d", + getTableName(), sender)); + if (!rs.next()) { + throw new SQLException("No row found for account " + sender); + } + int senderBalance = rs.getInt("balance"); + + // If the sender has no money, abort the transaction. + if (senderBalance <= 0) { + if (senderBalance < 0) { + LOG.error(String.format( + "Sender %d has negative balance %d", sender, senderBalance)); + numInconsistentReads.incrementAndGet(); + } + throw new SQLException("Sender has no money"); + } + + // Pick a random amount to transfer [1, sendBalance]. + int amount = ThreadLocalRandom.current().nextInt(1, senderBalance + 1); + + // Decrement the sender's balance. + statement.executeUpdate(String.format( + "UPDATE %s SET balance = balance - %d WHERE account_id = %d", + getTableName(), amount, sender)); + + // Increment the receiver's balance. + statement.executeUpdate(String.format( + "UPDATE %s SET balance = balance + %d WHERE account_id = %d", + getTableName(), amount, receiver)); + + // Commit the transaction. + connection.commit(); + + // Transfer successful. + return 1; + } catch (Exception e) { + LOG.error("Error transferring money ", e); + connection.rollback(); + return 0; + } + } catch (Exception e) { + LOG.error("Error creating a connection ", e); + return 0; + } + } + + /* + * Appends the number of inconsistent reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Inconsistent reads: ").append( + numInconsistentReads.get()).append(" total ops | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java b/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java new file mode 100644 index 0000000..267e85e --- /dev/null +++ b/src/main/java/com/yugabyte/sample/apps/SqlEventCounter.java @@ -0,0 +1,202 @@ +// Copyright (c) YugabyteDB, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +// in compliance with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software distributed under the License +// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +// or implied. See the License for the specific language governing permissions and limitations +// under the License. +// + +package com.yugabyte.sample.apps; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.concurrent.atomic.AtomicIntegerArray; +import java.util.concurrent.atomic.AtomicLong; + +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.log4j.Logger; + +/* + * Sometimes, applications want to track the number of times a particular event + * has occurred. Examples include user actions like clicks, purchases or + * page views. + * + * Consider a scenario where the application does NOT want an index on event + * type. In such cases, incrementing the counter requires a table scan. + * Even when the table is small, concurrent updates to unrelated event + * counters can still cause read restarts. + * + * This app helps understand whether the new clockbound clock + * helps improve the performance of this workload. + * + * Database Configuration: + * configure with wallclock and compare the metrics with + * a clockbound clock configuration. + * Not much variance is expected in the metrics. + * + * Setup: + * 1. Create a counters TABLE with columns (event INT, counter INT). + * 2. Insert 1000 counters with event 0 to 999 initialized to zero. + * + * Worklaod: + * We only run write threads, no read threads. + * Each write thread, + * 1. Starts a repeatable read transaction. + * 2. Reads the counter of a random event. + * 3. Verifies that the counter is not stale. + * 4. Increments the counter for the picked event. + * 5. Commits the transaction. + * 6. Updates the latest counter value in the cache. + */ +public class SqlEventCounter extends AppBase { + private static final Logger LOG = Logger.getLogger(SqlEventCounter.class); + + // Static initialization of this app's config. + static { + // Only use 10 writer threads to avoid overloading the system. + // In real life, there are many more threads but there are other + // things to do too. + appConfig.readIOPSPercentage = -1; + appConfig.numReaderThreads = 0; + appConfig.numWriterThreads = 10; + // Disable number of keys. + appConfig.numKeysToRead = -1; + appConfig.numKeysToWrite = -1; + // Run the app for 1 minute. + appConfig.runTimeSeconds = 60; + // Report restart read requests metric by default. + appConfig.restartReadsReported = true; + // Avoid load balancing errors. + appConfig.loadBalance = false; + appConfig.disableYBLoadBalancingPolicy = true; + } + + // The default table name to create and use for ops. + private static final String DEFAULT_TABLE_NAME = "event_counters"; + + // The number of unique events to track. + private static final int NUM_EVENTS = 1000; + + // Contains the latest updated counter indexed by event. + private AtomicIntegerArray counters = new AtomicIntegerArray(NUM_EVENTS); + + // Shared counter to store the number of stale reads. + private static AtomicLong numStaleReads = new AtomicLong(0); + + @Override + public void createTablesIfNeeded(TableOp tableOp) throws Exception { + try (Connection connection = getPostgresConnection()) { + // (Re)Create the table + // Every run should start cleanly with an empty table. + connection.createStatement().execute( + String.format("DROP TABLE IF EXISTS %s", getTableName())); + LOG.info("Dropping any table(s) left from previous runs if any"); + connection.createStatement().execute(String.format( + "CREATE TABLE %s (event INT, counter INT)", + getTableName())); + LOG.info(String.format("Created table: %s", getTableName())); + int numRows = connection.createStatement().executeUpdate(String.format( + "INSERT INTO %s SELECT GENERATE_SERIES(0, %d-1), 0", + getTableName(), NUM_EVENTS)); + LOG.info(String.format( + "Inserted %d rows into %s", numRows, getTableName())); + } + } + + @Override + public String getTableName() { + String tableName = appConfig.tableName != null ? + appConfig.tableName : DEFAULT_TABLE_NAME; + return tableName.toLowerCase(); + } + + @Override + public long doWrite(int threadIdx) { + // Choose a random event to increment. + int event = ThreadLocalRandom.current().nextInt(NUM_EVENTS); + + try (Connection connection = getPostgresConnection(); + Statement statement = connection.createStatement()) { + try { + // Start a repeatable read transaction. + connection.setAutoCommit(false); + connection.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); + + // Retrieve the latest counter from the cache. + int cachedCounter = counters.get(event); + + // Fetch the current counter value for the event. + ResultSet rs = statement.executeQuery(String.format( + "SELECT counter FROM %s WHERE event = %d", + getTableName(), event)); + if (!rs.next()) { + throw new SQLException("No row found for event " + event); + } + int counter = rs.getInt("counter"); + + // Increment the counter for the event. + counter += 1; + statement.executeUpdate(String.format( + "UPDATE %s SET counter = %d WHERE event = %d", + getTableName(), counter, event)); + + // Commit the transaction. + connection.commit(); + + // Detect a stale read. + // Fetched counter after increment must be greater + // than the cached counter. Otherwise, the read is stale. + if (!(counter > cachedCounter)) { + numStaleReads.incrementAndGet(); + } + + // Update the counter cache as well. + // + // counters tracks the maximum observed counter for each event. + // This helps detect stale reads. + // The new counter may be the new maximum. + // In this case, update the cache. + // + // If the cached counter is higher than or equal to the + // new counter, the new counter is no longer the maximum. Skip. + // + // If the cached counter is lower than the new counter, + // we update the cache to the new counter. Do this + // only if the cache is still at the old value. Otherwise, + // fetch the new cached value and try again. + // This avoids overwriting a higher cached value with counter. + while (cachedCounter < counter && !counters.compareAndSet( + event, cachedCounter, counter)) { + cachedCounter = counters.get(event); + } + + // Counter incremented successfully. + return 1; + } catch (Exception e) { + LOG.error("Failed to increment the counter for event " + event, e); + connection.rollback(); + return 0; + } + } catch (Exception e) { + LOG.error("Failed to create a connection ", e); + return 0; + } + } + + /* + * Appends the number of stale reads to the metrics output. + */ + @Override + public void appendMessage(StringBuilder sb) { + sb.append("Stale reads: ").append(numStaleReads.get()).append(" total ops | "); + super.appendMessage(sb); + } +} diff --git a/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java index 9960a83..c90d033 100644 --- a/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java +++ b/src/main/java/com/yugabyte/sample/common/metrics/PromMetrics.java @@ -34,7 +34,7 @@ public PromMetrics(List nodes) throws IOException { promContactPoints = new ArrayList<>(); for (InetSocketAddress node : nodes) { promContactPoints.add(String.format( - "https://%s:9000/prometheus-metrics", node.getHostString())); + "http://%s:9000/prometheus-metrics", node.getHostString())); } disableSSLVerification(); }