Skip to content

Commit

Permalink
Merge pull request #294 from orlandos-nl/transaction-crash
Browse files Browse the repository at this point in the history
Adds test for transaction.
  • Loading branch information
Andrewangeta authored Aug 26, 2022
2 parents c82d7be + b23e01e commit 055a858
Show file tree
Hide file tree
Showing 9 changed files with 201 additions and 37 deletions.
49 changes: 32 additions & 17 deletions .github/workflows/swift.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,42 @@ on:
- master/7.0

jobs:
test-linux:
linux:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
dbimage:
- mongo
runner:
- swift:5.6-focal
- swiftlang/swift:nightly-main-focal
container: ${{ matrix.runner }}
runs-on: ubuntu-latest
services:
mongo-a:
image: ${{ matrix.dbimage }}
mongo-b:
image: ${{ matrix.dbimage }}
swiftVersion: [5.6]
mongodb-version: ['4.2', '4.4', '5.0']
steps:
- name: Check out
uses: actions/checkout@v3

- name: Install Swift
uses: slashmo/install-swift@v0.1.0
with:
version: ${{ matrix.swiftVersion }}

- name: Start MongoDB
uses: supercharge/mongodb-github-action@1.7.0
with:
mongodb-version: ${{ matrix.mongodb-version }}
mongodb-replica-set: test-rs

- name: Run tests
run: swift test

macos:
runs-on: macos-12
steps:
- name: Check out
- name: Install docker
uses: docker-practice/actions-setup-docker@master

- name: Check out
uses: actions/checkout@v3

- name: Start Mongo
run: docker-compose up -d

- name: Run tests
run: swift test
env:
MONGO_HOSTNAME_A: mongo-a
MONGO_HOSTNAME_B: mongo-b
4 changes: 4 additions & 0 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ let package = Package(
// 📈
.package(url: "https://github.com/apple/swift-metrics.git", "1.0.0" ..< "3.0.0"),

// ✅
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.0"),

// 💾
.package(url: "https://github.com/orlandos-nl/BSON.git", from: "8.0.0"),
// .package(name: "BSON", path: "../BSON"),
Expand All @@ -57,6 +60,7 @@ let package = Package(
.product(name: "NIOFoundationCompat", package: "swift-nio"),
.product(name: "Logging", package: "swift-log"),
.product(name: "Metrics", package: "swift-metrics"),
.product(name: "Atomics", package: "swift-atomics"),
]),
.target(
name: "MongoKittenCore",
Expand Down
4 changes: 2 additions & 2 deletions Sources/MongoClient/Cluster.swift
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public final class MongoCluster: MongoConnectionPool, @unchecked Sendable {
public var slaveOk = false {
didSet {
for connection in pool {
connection.connection.slaveOk.store(self.slaveOk)
connection.connection.slaveOk.store(self.slaveOk, ordering: .relaxed)
}
}
}
Expand Down Expand Up @@ -294,7 +294,7 @@ public final class MongoCluster: MongoConnectionPool, @unchecked Sendable {
resolver: self.dns,
sessionManager: sessionManager
)
connection.slaveOk.store(slaveOk)
connection.slaveOk.store(slaveOk, ordering: .relaxed)

/// Ensures we default to the cluster's lowest version
if let connectionHandshake = await connection.serverHandshake {
Expand Down
7 changes: 4 additions & 3 deletions Sources/MongoClient/Connection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import BSON
import Foundation
import MongoCore
import NIO
import Atomics
import Logging
import Metrics
import NIOConcurrencyHelpers
Expand Down Expand Up @@ -56,7 +57,7 @@ public final actor MongoConnection: @unchecked Sendable {
}

/// The current request ID, used to generate unique identifiers for MongoDB commands
private var currentRequestId: NIOAtomic<Int32> = .makeAtomic(value: 0)
private var currentRequestId = ManagedAtomic<Int32>(0)
internal let context: MongoClientContext
public var serverHandshake: ServerHandshake? {
get async { await context.serverHandshake }
Expand All @@ -69,10 +70,10 @@ public final actor MongoConnection: @unchecked Sendable {
public nonisolated var eventLoop: EventLoop { return channel.eventLoop }
public var allocator: ByteBufferAllocator { return channel.allocator }

public let slaveOk = NIOAtomic<Bool>.makeAtomic(value: false)
public let slaveOk = ManagedAtomic(false)

internal func nextRequestId() -> Int32 {
return currentRequestId.add(1)
return currentRequestId.loadThenWrappingIncrement(ordering: .relaxed)
}

/// Creates a connection that can communicate with MongoDB over a channel
Expand Down
22 changes: 10 additions & 12 deletions Sources/MongoCore/Sessions.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import Foundation
import Atomics
import NIOConcurrencyHelpers
import BSON
import NIO
Expand Down Expand Up @@ -74,25 +75,17 @@ public final class MongoClientSession: @unchecked Sendable {
// }

deinit {
Task { [sessionManager] in
await sessionManager?.releaseSession(serverSession)
}
sessionManager?.releaseSession(serverSession)
}
}

internal final class MongoServerSession: @unchecked Sendable {
internal let sessionId: SessionIdentifier
internal let lastUse: Date
private let transaction: NIOAtomic<Int> = .makeAtomic(value: 1)
private let transaction = ManagedAtomic<Int>(1)

func nextTransactionNumber() -> Int {
defer {
// Overflow to negative will break new transactions
// MongoDB has no solution other than using a different ServerSession
transaction.add(1)
}

return transaction.load()
transaction.loadThenWrappingIncrement(ordering: .relaxed)
}

init(for sessionId: SessionIdentifier) {
Expand All @@ -107,7 +100,8 @@ internal final class MongoServerSession: @unchecked Sendable {
}

/// A LIFO (Last In, First Out) pool of sessions with a MongoDB "cluster" of 1 or more hosts
public final actor MongoSessionManager {
public final class MongoSessionManager: @unchecked Sendable {
private let lock = NSLock()
private var availableSessions = [MongoServerSession]()
private let implicitSession: MongoServerSession
public nonisolated let implicitClientSession: MongoClientSession
Expand All @@ -122,12 +116,16 @@ public final actor MongoSessionManager {
}

internal func releaseSession(_ session: MongoServerSession) {
lock.lock()
defer { lock.unlock() }
self.availableSessions.append(session)
}

/// Retains an existing or generates a new session to MongoDB.
/// The session is returned to the pool when the ClientSession's `deinit` triggers
public func retainSession(with options: MongoSessionOptions) -> MongoClientSession {
lock.lock()
defer { lock.unlock() }
let serverSession: MongoServerSession

if availableSessions.count > 0 {
Expand Down
7 changes: 4 additions & 3 deletions Sources/MongoKitten/MongoDatabase.swift
Original file line number Diff line number Diff line change
Expand Up @@ -125,15 +125,16 @@ public class MongoDatabase {
with options: MongoSessionOptions = .init(),
transactionOptions: MongoTransactionOptions? = nil
) async throws -> MongoTransactionDatabase {
guard await pool.wireVersion?.supportsReplicaTransactions == true else {
let connection = try await pool.next(for: .writable)
guard await connection.wireVersion?.supportsReplicaTransactions == true else {
pool.logger.error("MongoDB transaction not supported by the server")
throw MongoKittenError(.unsupportedFeatureByServer, reason: nil)
}

let newSession = await self.pool.sessionManager.retainSession(with: options)
let newSession = self.pool.sessionManager.retainSession(with: options)
let transaction = newSession.startTransaction(autocommit: autoCommit)

let db = MongoTransactionDatabase(named: name, pool: pool)
let db = MongoTransactionDatabase(named: name, pool: connection)
db.transaction = transaction
db.session = newSession
return db
Expand Down
68 changes: 68 additions & 0 deletions Tests/MongoKittenTests/TransactionTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import NIO
import MongoKitten
import XCTest
import MongoCore

class TransactionTests: XCTestCase {
struct ModelA: Codable {
static let collection = "model_a"
let _id: ObjectId
let value: String
}

struct ModelB: Codable {
static let collection = "model_b"
let _id: ObjectId
let value: String
}

var mongo: MongoDatabase!

override func setUp() async throws {
try await super.setUp()
let mongoSettings = try ConnectionSettings("mongodb://\(ProcessInfo.processInfo.environment["MONGO_HOSTNAME_A"] ?? "localhost")/mongokitten-test")
mongo = try await MongoDatabase.connect(to: mongoSettings)
}

override func tearDown() async throws {
try await super.tearDown()
try await mongo.drop()
}

func test_transaction() async throws {
try await mongo.transaction { db in
try await db[ModelA.collection].insertEncoded(ModelA(_id: .init(), value: UUID().uuidString))
try await db[ModelB.collection].insertEncoded(ModelB(_id: .init(), value: UUID().uuidString))
}
}

func test_backToBackTransaction() async throws {
for _ in 0..<100 {
try await mongo.transaction { db in

try await db[ModelA.collection].insertEncoded(ModelA(_id: .init(), value: UUID().uuidString))
try await db[ModelB.collection].insertEncoded(ModelB(_id: .init(), value: UUID().uuidString))
}
}
}
}


extension MongoDatabase {
func transaction<T>(_ closure: @escaping (MongoDatabase) async throws -> T) async throws -> T {
guard !self.isInTransaction else {
return try await closure(self)
}
let transactionDatabase = try await self.startTransaction(autoCommitChanges: false)

do {
let value = try await closure(transactionDatabase)

try await transactionDatabase.commit()
return value
} catch {
try await transactionDatabase.abort()
throw error
}
}
}
48 changes: 48 additions & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
version: "3"

services:
mongo-1:
image: mongo:5.0
ports:
- "27017:27017"
container_name: mongo-1
hostname: mongo-1
networks:
- mongo_cluster
command: mongod --replSet rs0

mongo-2:
image: mongo:5.0
ports:
- "27018:27017"
container_name: mongo-2
hostname: mongo-2
networks:
- mongo_cluster
command: mongod --replSet rs0
depends_on:
- mongo-1

mongo-3:
image: mongo:5.0
ports:
- "27019:27017"
container_name: mongo-3
hostname: mongo-3
networks:
- mongo_cluster
command: mongod --replSet rs0
depends_on:
- mongo-2

mongosetup:
image: mongo:5.0
networks:
- mongo_cluster
volumes:
- ./scripts:/scripts
command: bash -c "chmod +x /scripts/setup.sh && /scripts/setup.sh"

networks:
mongo_cluster:
driver: bridge
29 changes: 29 additions & 0 deletions scripts/setup.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#!/bin/sh
sleep 5
mongo --host mongo-1:27017 <<EOF
var cfg = {
"_id": "rs0",
"version": 1,
"members": [
{
"_id": 0,
"host": "mongo-1:27017",
"priority": 2
},
{
"_id": 1,
"host": "mongo-2:27017",
"priority": 1
},
{
"_id": 2,
"host": "mongo-3:27017",
"priority": 0,
"arbiterOnly": true
}
]
};
rs.initiate(cfg, { force: true });
rs.reconfig(cfg, { force: true });
rs.status();
EOF

0 comments on commit 055a858

Please sign in to comment.