Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
122071: workload/tpcc: annotate statement errors r=nvanbenschoten a=nvanbenschoten

Informs cockroachdb#119511.

This commit updates the tpcc workload to wrap all statement execution errors in context about the statement that was run. This make debugging errors that are always returned by the same statement easier, as we can now tell where the errors are thrown.

Release note: None

122080: teamcity: fix DN validation acceptance test for arm64 CI pipeline r=souravcrl a=souravcrl

Release note: None
Epic: None
fixes: cockroachdb#122036 
fixes: cockroachdb#122031 
Currently TC builds are failing for arm64 acceptance due to cockroach license not being available for one of the
tests(TestDockerCLI_test_distinguished_name_validation) for both main and 24.1 branches.
Release justification: Non-production code changes

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Sourav Sarangi <sourav.sarangi@cockroachlabs.com>
  • Loading branch information
3 people committed Apr 10, 2024
3 parents 4686438 + ecc79f9 + dac6378 commit c453bd1
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ $BAZCI --artifacts_dir=$PWD/artifacts -- \
"--test_tmpdir=$ARTIFACTSDIR" \
--test_arg=-l="$ARTIFACTSDIR" \
--test_arg=-b=$PWD/artifacts/cockroach \
--test_env=COCKROACH_DEV_LICENSE \
--test_env=TZ=America/New_York \
--profile=$PWD/artifacts/profile.gz \
--test_timeout=1800 || status=$?
Expand Down
65 changes: 36 additions & 29 deletions pkg/workload/tpcc/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
// If no matching order is found, the delivery of this order is skipped.
if !errors.Is(err, gosql.ErrNoRows) {
del.config.auditor.skippedDelivieries.Add(1)
return err
return errors.Wrap(err, "select new_order failed")
}
continue
}
Expand All @@ -108,38 +108,41 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
if err := del.sumAmount.QueryRowTx(
ctx, tx, wID, dID, oID,
).Scan(&olTotal); err != nil {
return err
return errors.Wrap(err, "select order_line failed")
}
dIDolTotalPairs[dID] = olTotal
}
dIDoIDPairsStr := makeInTuples(dIDoIDPairs)

rows, err := tx.Query(
ctx,
fmt.Sprintf(`
UPDATE "order"
SET o_carrier_id = %d
WHERE o_w_id = %d AND (o_d_id, o_id) IN (%s)
RETURNING o_d_id, o_c_id`,
oCarrierID, wID, dIDoIDPairsStr,
),
)
if err != nil {
return err
}
dIDcIDPairs := make(map[int]int)
for rows.Next() {
var dID, oCID int
if err := rows.Scan(&dID, &oCID); err != nil {
rows.Close()
err := func() error {
rows, err := tx.Query(
ctx,
fmt.Sprintf(`
UPDATE "order"
SET o_carrier_id = %d
WHERE o_w_id = %d AND (o_d_id, o_id) IN (%s)
RETURNING o_d_id, o_c_id`,
oCarrierID, wID, dIDoIDPairsStr,
),
)
if err != nil {
return err
}
dIDcIDPairs[dID] = oCID
}
if err := rows.Err(); err != nil {
return err
defer rows.Close()

for rows.Next() {
var dID, oCID int
if err := rows.Scan(&dID, &oCID); err != nil {
return err
}
dIDcIDPairs[dID] = oCID
}
return rows.Err()
}()
if err != nil {
return errors.Wrap(err, "update order failed")
}
rows.Close()

if err := checkSameKeys(dIDoIDPairs, dIDcIDPairs); err != nil {
return err
Expand All @@ -157,8 +160,9 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
dIDToOlTotalStr, wID, dIDcIDPairsStr,
),
); err != nil {
return err
return errors.Wrap(err, "update customer failed")
}

if _, err := tx.Exec(
ctx,
fmt.Sprintf(`
Expand All @@ -167,19 +171,22 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
wID, dIDoIDPairsStr,
),
); err != nil {
return err
return errors.Wrap(err, "delete new_order failed")
}

_, err = tx.Exec(
if _, err := tx.Exec(
ctx,
fmt.Sprintf(`
UPDATE order_line
SET ol_delivery_d = '%s'
WHERE ol_w_id = %d AND (ol_d_id, ol_o_id) IN (%s)`,
olDeliveryD.Format("2006-01-02 15:04:05"), wID, dIDoIDPairsStr,
),
)
return err
); err != nil {
return errors.Wrap(err, "update order_line failed")
}

return nil
})
return nil, err
}
Expand Down
200 changes: 102 additions & 98 deletions pkg/workload/tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,22 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
if err := n.updateDistrict.QueryRowTx(
ctx, tx, d.wID, d.dID,
).Scan(&d.dTax, &dNextOID); err != nil {
return err
return errors.Wrap(err, "update district failed")
}
d.oID = dNextOID - 1

// Select the warehouse tax rate.
if err := n.selectWarehouseTax.QueryRowTx(
ctx, tx, wID,
).Scan(&d.wTax); err != nil {
return err
return errors.Wrap(err, "select warehouse failed")
}

// Select the customer's discount, last name and credit.
if err := n.selectCustomerInfo.QueryRowTx(
ctx, tx, d.wID, d.dID, d.cID,
).Scan(&d.cDiscount, &d.cLast, &d.cCredit); err != nil {
return err
return errors.Wrap(err, "select customer failed")
}

// 2.4.2.2: For each o_ol_cnt item in the order, query the relevant item
Expand All @@ -243,135 +243,139 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
for i, item := range d.items {
itemIDs[i] = fmt.Sprint(item.olIID)
}
rows, err := tx.Query(
ctx,
fmt.Sprintf(`
SELECT i_price, i_name, i_data
FROM item
WHERE i_id IN (%[1]s)
ORDER BY i_id`,
strings.Join(itemIDs, ", "),
),
)
if err != nil {
return err
}
iDatas := make([]string, d.oOlCnt)
for i := range d.items {
item := &d.items[i]
iData := &iDatas[i]
iIDs := strings.Join(itemIDs, ", ")
err := func() error {
rows, err := tx.Query(
ctx,
fmt.Sprintf(`
SELECT i_price, i_name, i_data
FROM item
WHERE i_id IN (%[1]s)
ORDER BY i_id`,
iIDs,
),
)
if err != nil {
return err
}
defer rows.Close()

for i := range d.items {
item := &d.items[i]
iData := &iDatas[i]

if !rows.Next() {
if err := rows.Err(); err != nil {
return err
}
if rollback {
// 2.4.2.3: roll back when we're expecting a rollback due to
// simulated user error (invalid item id) and we actually
// can't find the item. The spec requires us to actually go
// to the database for this, even though we know earlier
// that the item has an invalid number.
n.config.auditor.newOrderRollbacks.Add(1)
return errSimulated
}
return errors.New("missing item row")
}

if !rows.Next() {
if err := rows.Err(); err != nil {
if err := rows.Scan(&item.iPrice, &item.iName, iData); err != nil {
return err
}
if rollback {
// 2.4.2.3: roll back when we're expecting a rollback due to
// simulated user error (invalid item id) and we actually
// can't find the item. The spec requires us to actually go
// to the database for this, even though we know earlier
// that the item has an invalid number.
n.config.auditor.newOrderRollbacks.Add(1)
return errSimulated
}
return errors.New("missing item row")
}

err = rows.Scan(&item.iPrice, &item.iName, iData)
if err != nil {
rows.Close()
return err
if rows.Next() {
return errors.New("extra item row")
}
return rows.Err()
}()
if err != nil {
return errors.Wrap(err, "select item failed")
}
if rows.Next() {
return errors.New("extra item row")
}
if err := rows.Err(); err != nil {
return err
}
rows.Close()

stockIDs := make([]string, d.oOlCnt)
for i, item := range d.items {
stockIDs[i] = fmt.Sprintf("(%d, %d)", item.olIID, item.olSupplyWID)
}
rows, err = tx.Query(
ctx,
fmt.Sprintf(`
SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_%02[1]d
FROM stock
WHERE (s_i_id, s_w_id) IN (%[2]s)
ORDER BY s_i_id
FOR UPDATE`,
d.dID, strings.Join(stockIDs, ", "),
),
)
if err != nil {
return err
}
distInfos := make([]string, d.oOlCnt)
sQuantityUpdateCases := make([]string, d.oOlCnt)
sYtdUpdateCases := make([]string, d.oOlCnt)
sOrderCntUpdateCases := make([]string, d.oOlCnt)
sRemoteCntUpdateCases := make([]string, d.oOlCnt)
for i := range d.items {
item := &d.items[i]
if err := func() error {
rows, err := tx.Query(
ctx,
fmt.Sprintf(`
SELECT s_quantity, s_ytd, s_order_cnt, s_remote_cnt, s_data, s_dist_%02[1]d
FROM stock
WHERE (s_i_id, s_w_id) IN (%[2]s)
ORDER BY s_i_id
FOR UPDATE`,
d.dID, strings.Join(stockIDs, ", "),
),
)
if err != nil {
return err
}
defer rows.Close()

for i := range d.items {
item := &d.items[i]

if !rows.Next() {
if err := rows.Err(); err != nil {
return err
}
return errors.New("missing stock row")
}

if !rows.Next() {
if err := rows.Err(); err != nil {
var sQuantity, sYtd, sOrderCnt, sRemoteCnt int
var sData string
if err := rows.Scan(&sQuantity, &sYtd, &sOrderCnt, &sRemoteCnt, &sData, &distInfos[i]); err != nil {
return err
}
return errors.New("missing stock row")
}

var sQuantity, sYtd, sOrderCnt, sRemoteCnt int
var sData string
err = rows.Scan(&sQuantity, &sYtd, &sOrderCnt, &sRemoteCnt, &sData, &distInfos[i])
if err != nil {
rows.Close()
return err
}
if strings.Contains(sData, originalString) && strings.Contains(iDatas[i], originalString) {
item.brandGeneric = "B"
} else {
item.brandGeneric = "G"
}

if strings.Contains(sData, originalString) && strings.Contains(iDatas[i], originalString) {
item.brandGeneric = "B"
} else {
item.brandGeneric = "G"
}
newSQuantity := sQuantity - item.olQuantity
if sQuantity < item.olQuantity+10 {
newSQuantity += 91
}

newSQuantity := sQuantity - item.olQuantity
if sQuantity < item.olQuantity+10 {
newSQuantity += 91
}
newSRemoteCnt := sRemoteCnt
if item.remoteWarehouse {
newSRemoteCnt++
}

newSRemoteCnt := sRemoteCnt
if item.remoteWarehouse {
newSRemoteCnt++
sQuantityUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], newSQuantity)
sYtdUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], sYtd+item.olQuantity)
sOrderCntUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], sOrderCnt+1)
sRemoteCntUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], newSRemoteCnt)
}

sQuantityUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], newSQuantity)
sYtdUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], sYtd+item.olQuantity)
sOrderCntUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], sOrderCnt+1)
sRemoteCntUpdateCases[i] = fmt.Sprintf("WHEN %s THEN %d", stockIDs[i], newSRemoteCnt)
}
if rows.Next() {
return errors.New("extra stock row")
}
if err := rows.Err(); err != nil {
return err
if rows.Next() {
return errors.New("extra stock row")
}
return rows.Err()
}(); err != nil {
return errors.Wrap(err, "select stock failed")
}
rows.Close()

// Insert row into the orders and new orders table.
if _, err := n.insertOrder.ExecTx(
ctx, tx,
d.oID, d.dID, d.wID, d.cID, d.oEntryD.Format("2006-01-02 15:04:05"), d.oOlCnt, allLocal,
); err != nil {
return err
return errors.Wrap(err, "insert order failed")
}
if _, err := n.insertNewOrder.ExecTx(
ctx, tx, d.oID, d.dID, d.wID,
); err != nil {
return err
return errors.Wrap(err, "insert new_order failed")
}

// Update the stock table for each item.
Expand All @@ -392,7 +396,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
strings.Join(stockIDs, ", "),
),
); err != nil {
return err
return errors.Wrap(err, "update stock failed")
}

// Insert a new order line for each item in the order.
Expand Down Expand Up @@ -422,7 +426,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
strings.Join(olValsStrings, ", "),
),
); err != nil {
return err
return errors.Wrap(err, "insert order_line failed")
}

// 2.4.2.2: total_amount = sum(OL_AMOUNT) * (1 - C_DISCOUNT) * (1 + W_TAX + D_TAX)
Expand Down
Loading

0 comments on commit c453bd1

Please sign in to comment.