diff --git a/core/actor/src/main/scala/net/liftweb/actor/LAFuture.scala b/core/actor/src/main/scala/net/liftweb/actor/LAFuture.scala index a176345406..4ade14aa13 100644 --- a/core/actor/src/main/scala/net/liftweb/actor/LAFuture.scala +++ b/core/actor/src/main/scala/net/liftweb/actor/LAFuture.scala @@ -17,6 +17,8 @@ package net.liftweb package actor +import scala.collection.mutable.ArrayBuffer + import common._ @@ -152,15 +154,36 @@ class LAFuture[T](val scheduler: LAScheduler = LAScheduler, context: Box[LAFutur } } + /** + * Java-friendly alias for satisfied_?. + */ + def isSatisfied: Boolean = satisfied_? + /** * Has the future been satisfied */ - def isSatisfied: Boolean = synchronized {satisfied} + def satisfied_? = synchronized {satisfied} + /** + * Java-friendly alias for aborted_?. + */ + def isAborted: Boolean = aborted_? /** * Has the future been aborted */ - def isAborted: Boolean = synchronized {aborted} + def aborted_? = synchronized {satisfied} + + /** + * Java-friendly alias for completed_?. + */ + def isCompleted: Boolean = completed_? + /** + * Has the future completed? + */ + def completed_? : Boolean = synchronized(satisfied || aborted) + + @deprecated("Please use completed_? instead.", "3.1.0") + def complete_? : Boolean = completed_? /** * Abort the future. It can never be satified @@ -240,11 +263,6 @@ class LAFuture[T](val scheduler: LAScheduler = LAScheduler, context: Box[LAFutur } } } - - /** - * Has the future completed? - */ - def complete_? : Boolean = synchronized(satisfied || aborted) } /** @@ -333,32 +351,64 @@ object LAFuture { } } - /** - * Collect all the future values into the aggregate future - * The returned future will be satisfied when all the - * collected futures are satisfied + * Given handlers for a value's success and failure and a set of futures, runs + * the futures simultaneously and invokes either success or failure callbacks + * as each future completes. When all futures are complete, if the handlers + * have not either satisfied or failed the overall result, `onAllFuturesCompleted` + * is called to complete it. If it *still* isn't complete, the overall result + * is failed with an error. + * + * Note that the success and failure functions are guaranteed to be run in a + * thread-safe manner. Each is passed the value, the result future, the + * accumulating `ArrayBuffer`, and the index of the future that has been + * completed. For the failure handler, the value is the `Box` of the failure. */ - def collect[T](future: LAFuture[T]*): LAFuture[List[T]] = { - val result = new LAFuture[List[T]] - if (future.isEmpty) { - result.satisfy(Nil) + def collect[T, A]( + onFutureSucceeded: (T, LAFuture[A], ArrayBuffer[Box[T]], Int)=>Unit, + onFutureFailed: (Box[Nothing], LAFuture[A], ArrayBuffer[Box[T]], Int)=>Unit, + onAllFuturesCompleted: (LAFuture[A], ArrayBuffer[Box[T]])=>Unit, + futures: LAFuture[T]* + ): LAFuture[A] = { + val result = new LAFuture[A] + + if (futures.isEmpty) { + onAllFuturesCompleted(result, new ArrayBuffer[Box[T]](0)) } else { val sync = new Object - val len = future.length - val vals = new collection.mutable.ArrayBuffer[Box[T]](len) + val len = futures.length + val accumulator = new ArrayBuffer[Box[T]](len) // pad array so inserts at random places are possible - for (i <- 0 to len) { vals.insert(i, Empty) } + for (i <- 0 to len) { accumulator.insert(i, Empty) } var gotCnt = 0 - future.toList.zipWithIndex.foreach { - case (f, idx) => - f.foreach { - v => sync.synchronized { - vals.insert(idx, Full(v)) + futures.toList.zipWithIndex.foreach { + case (future, index) => + future.onSuccess { + value => sync.synchronized { gotCnt += 1 - if (gotCnt >= len) { - result.satisfy(vals.toList.flatten) + onFutureSucceeded(value, result, accumulator, index) + + if (gotCnt >= len && ! result.completed_?) { + onAllFuturesCompleted(result, accumulator) + + if (! result.completed_?) { + result.fail(Failure("collect invoker did not complete result")) + } + } + } + } + future.onFail { + failureBox => sync.synchronized { + gotCnt += 1 + onFutureFailed(failureBox, result, accumulator, index) + + if (gotCnt >= len && ! result.completed_?) { + onAllFuturesCompleted(result, accumulator) + + if (! result.completed_?) { + result.fail(Failure("collect invoker did not complete result")) + } } } } @@ -368,6 +418,23 @@ object LAFuture { result } + + /** + * Collect all the future values into the aggregate future + * The returned future will be satisfied when all the + * collected futures are satisfied + */ + def collect[T](future: LAFuture[T]*): LAFuture[List[T]] = { + collect[T, List[T]]( + onFutureSucceeded = { (value, result, values, index) => + values.insert(index, Full(value)) + }, + onFutureFailed = { (valueBox, result, values, index) => result.fail(valueBox) }, + onAllFuturesCompleted = { (result, values) => result.satisfy(values.toList.flatten) }, + future: _* + ) + } + /** * Collect all the future values into the aggregate future * The returned future will be satisfied when all the @@ -376,40 +443,21 @@ object LAFuture { * returned future with an Empty */ def collectAll[T](future: LAFuture[Box[T]]*): LAFuture[Box[List[T]]] = { - val result = new LAFuture[Box[List[T]]] - if (future.isEmpty) { - result.satisfy(Full(Nil)) - } else { - val sync = new Object - val len = future.length - val vals = new collection.mutable.ArrayBuffer[Box[T]](len) - // pad array so inserts at random places are possible - for (i <- 0 to len) { vals.insert(i, Empty) } - var gotCnt = 0 - - future.toList.zipWithIndex.foreach { - case (f, idx) => - f.foreach { - vb => sync.synchronized { - vb match { - case Full(v) => { - vals.insert(idx, Full(v)) - gotCnt += 1 - if (gotCnt >= len) { - result.satisfy(Full(vals.toList.flatten)) - } - } - - case eb: EmptyBox => { - result.satisfy(eb) - } - } - } - } - } - } - - result + collect[Box[T], Box[List[T]]]( + onFutureSucceeded = { (value, result, values, index) => + value match { + case Full(realValue) => + values.insert(index, Full(Full(realValue))) + case other: EmptyBox => + result.satisfy(other) + } + }, + onFutureFailed = { (valueBox, result, values, index) => result.fail(valueBox) }, + onAllFuturesCompleted = { (result: LAFuture[Box[List[T]]], values: ArrayBuffer[Box[Box[T]]]) => + result.satisfy(Full(values.toList.flatten.flatten)) + }, + future: _* + ) } private def inContext[T](f: () => T, context: Box[LAFuture.Context]): () => T = { diff --git a/core/actor/src/test/scala/net/liftweb/actor/LAFutureSpec.scala b/core/actor/src/test/scala/net/liftweb/actor/LAFutureSpec.scala index 2a6178c8e0..f2afc7b927 100644 --- a/core/actor/src/test/scala/net/liftweb/actor/LAFutureSpec.scala +++ b/core/actor/src/test/scala/net/liftweb/actor/LAFutureSpec.scala @@ -1,6 +1,6 @@ package net.liftweb.actor -import net.liftweb.common.{Failure, Box} +import net.liftweb.common.{Box, Failure, Full} import org.specs2.mutable.Specification import java.util.concurrent.atomic.AtomicBoolean @@ -61,30 +61,81 @@ class LAFutureSpec extends Specification { result shouldEqual givenFailure } - "collect one future result" in { - val givenOneResult = 123 - val one = LAFuture(() => givenOneResult) - LAFuture.collect(one).get(timeout) shouldEqual List(givenOneResult) - } + "when collecting results with LAFuture.collect" in { + "collect one future result" in { + val givenOneResult = 123 + val one = LAFuture(() => givenOneResult) + LAFuture.collect(one).get(timeout) shouldEqual List(givenOneResult) + } - "collect more future results in correct order" in { - val givenOneResult = 123 - val givenTwoResult = 234 - val one = LAFuture(() => givenOneResult) - val two = LAFuture(() => givenTwoResult) - LAFuture.collect(one, two).get(timeout) shouldEqual List(givenOneResult, givenTwoResult) - } + "collect more future results in correct order" in { + val givenOneResult = 123 + val givenTwoResult = 234 + val one = LAFuture(() => givenOneResult) + val two = LAFuture(() => givenTwoResult) + LAFuture.collect(one, two).get(timeout) shouldEqual List(givenOneResult, givenTwoResult) + } + + "collect empty list immediately" in { + val collectResult = LAFuture.collect(Nil: _*) + collectResult.isSatisfied shouldEqual true + collectResult.get(timeout) shouldEqual Nil + } - "collect empty list immediately" in { - val collectResult = LAFuture.collect(Nil: _*) - collectResult.isSatisfied shouldEqual true - collectResult.get(timeout) shouldEqual Nil + "report a failed LAFuture as a failure for the overall future" in { + val one: LAFuture[Int] = new LAFuture + val two: LAFuture[Int] = LAFuture(() => 5) + + one.fail(Failure("boom boom boom!")) + + val collectResult = LAFuture.collect(one, two) + collectResult.get(timeout) shouldEqual Failure("boom boom boom!") + } } - "collectAll empty list immediately" in { - val collectResult = LAFuture.collectAll(Nil : _*) - collectResult.isSatisfied shouldEqual true - collectResult.get(timeout) shouldEqual Nil + "when collecting Boxed results with collectAll" in { + "collectAll collects an EmptyBox immediately" in { + val one: LAFuture[Box[Int]] = LAFuture(() => { Failure("whoops"): Box[Int] }) + val two: LAFuture[Box[Int]] = LAFuture(() => { Thread.sleep(10000); Full(1) }) + + val collectResult = LAFuture.collectAll(one, two) + collectResult.get(5000) shouldEqual Failure("whoops") + } + + "collectAll collects a set of Fulls" in { + val one: LAFuture[Box[Int]] = LAFuture(() => Full(1): Box[Int]) + val two: LAFuture[Box[Int]] = LAFuture(() => Full(2): Box[Int]) + val three: LAFuture[Box[Int]] = LAFuture(() => Full(3): Box[Int]) + + val collectResult = LAFuture.collectAll(one, two, three) + collectResult.get(timeout) shouldEqual Full(List(1, 2, 3)) + } + + "collectAll reports a failed LAFuture as a failure for the overall future" in { + val one: LAFuture[Box[Int]] = new LAFuture + val two: LAFuture[Box[Int]] = LAFuture(() => Full(5): Box[Int]) + + one.fail(Failure("boom boom boom!")) + + val collectResult = LAFuture.collectAll(one, two) + collectResult.get(timeout) shouldEqual Failure("boom boom boom!") + } + + "collectAll empty list immediately" in { + val collectResult = LAFuture.collectAll(Nil : _*) + collectResult.isSatisfied shouldEqual true + collectResult.get(timeout) shouldEqual Nil + } + + "report a failed LAFuture as a failure for the overall future" in { + val one: LAFuture[Box[Int]] = new LAFuture + val two: LAFuture[Box[Int]] = LAFuture(() => Full(5): Box[Int]) + + one.fail(Failure("boom boom boom!")) + + val collectResult = LAFuture.collectAll(one, two) + collectResult.get(timeout) shouldEqual Failure("boom boom boom!") + } } }