Skip to content

Commit

Permalink
Merge pull request #259 from OpenKitten/feature/async-await
Browse files Browse the repository at this point in the history
Async/await helpers
  • Loading branch information
Joannis authored Oct 26, 2021
2 parents 22352b5 + 9171d24 commit 7a9d7b4
Show file tree
Hide file tree
Showing 12 changed files with 922 additions and 37 deletions.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ let package = Package(
dependencies: ["MongoClient"]),
.target(
name: "MongoKitten",
dependencies: ["MongoClient", "MongoKittenCore"]),
dependencies: ["MongoClient", "MongoKittenCore", "NIOCore"]),
.target(
name: "Meow",
dependencies: ["MongoKitten"]),
Expand Down
175 changes: 175 additions & 0 deletions Sources/Meow/AsyncAwait.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#if compiler(>=5.5) && canImport(_Concurrency)
import NIOCore
import NIO
import MongoClient
import MongoKitten

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension MeowDatabase {
public struct Async {
public let nio: MeowDatabase

public var raw: MongoDatabase.Async {
nio.raw.async
}

init(nio: MeowDatabase) {
self.nio = nio
}

public var name: String { nio.raw.name }

public func collection<M: BaseModel>(for model: M.Type) -> MeowCollection<M>.Async {
return MeowCollection<M>(database: nio, named: M.collectionName).async
}

public subscript<M: BaseModel>(type: M.Type) -> MeowCollection<M>.Async {
return collection(for: type)
}
}

public var `async`: Async {
Async(nio: self)
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension MeowCollection {
public struct Async {
public let nio: MeowCollection<M>

init(nio: MeowCollection<M>) {
self.nio = nio
}

public var raw: MongoCollection.Async {
nio.raw.async
}
}

public var `async`: Async {
Async(nio: self)
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension MeowCollection.Async where M: ReadableModel {
public func find(where filter: Document = [:]) -> MappedCursor<FindQueryBuilder, M> {
return nio.find(where: filter)
}

public func find<Q: MongoKittenQuery>(where filter: Q) -> MappedCursor<FindQueryBuilder, M> {
return self.find(where: filter.makeDocument())
}

public func findOne(where filter: Document) async throws -> M? {
return try await nio.findOne(where: filter).get()
}

public func findOne<Q: MongoKittenQuery>(where filter: Q) async throws -> M? {
return try await nio.findOne(where: filter).get()
}

public func count(where filter: Document) async throws -> Int {
return try await nio.count(where: filter).get()
}

public func count<Q: MongoKittenQuery>(where filter: Q) async throws -> Int {
return try await self.count(where: filter.makeDocument())
}

public func watch(options: ChangeStreamOptions = .init()) async throws -> ChangeStream<M> {
return try await nio.watch(options: options).get()
}

public func buildChangeStream(options: ChangeStreamOptions = .init(), @AggregateBuilder build: () -> AggregateBuilderStage) async throws -> ChangeStream<M> {
return try await nio.buildChangeStream(options: options, build: build).get()
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension MutableModel {
@discardableResult
public func save(in database: MeowDatabase.Async) async throws -> MeowOperationResult {
try await self.save(in: database.nio).get()
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension MeowCollection.Async where M: MutableModel {
@discardableResult
public func insert(_ instance: M) async throws -> InsertReply {
return try await nio.insert(instance).get()
}

@discardableResult
public func insertMany(_ instances: [M]) async throws -> InsertReply {
return try await nio.insertMany(instances).get()
}

@discardableResult
public func upsert(_ instance: M) async throws -> UpdateReply {
return try await nio.upsert(instance).get()
}

@discardableResult
public func deleteOne(where filter: Document) async throws -> DeleteReply {
return try await nio.deleteOne(where: filter).get()
}

@discardableResult
public func deleteOne<Q: MongoKittenQuery>(where filter: Q) async throws -> DeleteReply {
return try await nio.deleteOne(where: filter).get()
}

@discardableResult
public func deleteAll(where filter: Document) async throws -> DeleteReply {
return try await nio.deleteAll(where: filter).get()
}

@discardableResult
public func deleteAll<Q: MongoKittenQuery>(where filter: Q) async throws -> DeleteReply {
return try await nio.deleteAll(where: filter).get()
}

// public func saveChanges(_ changes: PartialChange<M>) -> EventLoopFuture<UpdateReply> {
// return raw.updateOne(where: "_id" == changes.entity, to: [
// "$set": changes.changedFields,
// "$unset": changes.removedFields
// ])
// }
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension Reference {
/// Resolves a reference
public func resolve(in db: MeowDatabase.Async, where query: Document = Document()) async throws -> M {
try await resolve(in: db.nio, where: query).get()
}

/// Resolves a reference, returning `nil` if the referenced object cannot be found
public func resolveIfPresent(in db: MeowDatabase.Async, where query: Document = Document()) async throws -> M? {
try await resolveIfPresent(in: db.nio, where: query).get()
}

public func exists(in db: MeowDatabase.Async) async throws -> Bool {
return try await exists(in: db.nio).get()
}

public func exists(in db: MeowDatabase.Async, where filter: Document) async throws -> Bool {
return try await exists(in: db.nio, where: filter).get()
}

public func exists<Query: MongoKittenQuery>(in db: MeowDatabase.Async, where filter: Query) async throws -> Bool {
return try await exists(in: db.nio, where: filter).get()
}
}

@available(macOS 12, iOS 15, watchOS 8, tvOS 15, *)
extension Reference where M: MutableModel {
@discardableResult
public func deleteTarget(in context: MeowDatabase) async throws -> MeowOperationResult {
try await deleteTarget(in: context).get()
}
}
#endif
5 changes: 4 additions & 1 deletion Sources/MongoClient/Cluster.swift
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,10 @@ public final class MongoCluster: MongoConnectionPool {
var handshakes = [EventLoopFuture<Void>]()

for pooledConnection in pool {
let handshake = pooledConnection.connection.doHandshake(clientDetails: nil, credentials: settings.authentication)
let handshake = pooledConnection.connection.doHandshake(
clientDetails: nil,
credentials: settings.authentication
)
handshake.whenFailure { _ in
self.discoveredHosts.remove(pooledConnection.host)
}
Expand Down
15 changes: 14 additions & 1 deletion Sources/MongoClient/Cursor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import MongoCore
public final class MongoCursor {
public private(set) var id: Int64
private var initialBatch: [Document]?
internal let closePromise: EventLoopPromise<Void>
public var closeFuture: EventLoopFuture<Void> { closePromise.futureResult }
public var isDrained: Bool {
return self.id == 0
}
Expand All @@ -30,6 +32,7 @@ public final class MongoCursor {
self.connection = connection
self.session = session
self.transaction = transaction
self.closePromise = connection.eventLoop.makePromise()
}

/// Performs a `GetMore` command on the database, requesting the next batch of items
Expand Down Expand Up @@ -67,13 +70,23 @@ public final class MongoCursor {
public func close() -> EventLoopFuture<Void> {
let command = KillCursorsCommand([self.id], inCollection: namespace.collectionName)
self.id = 0
return connection.executeCodable(
let closed = connection.executeCodable(
command,
namespace: namespace,
in: self.transaction,
sessionId: session?.sessionId
).flatMapThrowing { reply -> Void in
try reply.assertOK()
}

closed.whenComplete { [closePromise] _ in
closePromise.succeed(())
}

return closed
}

deinit {
_ = close()
}
}
8 changes: 4 additions & 4 deletions Sources/MongoKitten/AggregateBuilder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public struct AggregateBuilder {
}

public static func buildBlock(_ content: AggregateBuilderStage) -> AggregateBuilderStage {
return AggregateBuilderStage(documents: content.stages)
return content
}

public static func buildBlock(_ content: AggregateBuilderStage...) -> AggregateBuilderStage {
Expand All @@ -18,18 +18,18 @@ public struct AggregateBuilder {

public static func buildIf(_ content: AggregateBuilderStage?) -> AggregateBuilderStage {
if let content = content {
return AggregateBuilderStage(documents: content.stages)
return content
}

return AggregateBuilderStage(documents: [])
}

public static func buildEither(first: AggregateBuilderStage) -> AggregateBuilderStage {
return AggregateBuilderStage(documents: first.stages)
return first
}

public static func buildEither(second: AggregateBuilderStage) -> AggregateBuilderStage {
return AggregateBuilderStage(documents: second.stages)
return second
}
}
#elseif swift(>=5.1)
Expand Down
Loading

0 comments on commit 7a9d7b4

Please sign in to comment.