Skip to content

Commit

Permalink
Merge pull request #1859 from lift/failure-collection
Browse files Browse the repository at this point in the history
Fix LAFuture.collect/collectAll when sub-futures fail.
  • Loading branch information
farmdawgnation authored Jun 15, 2017
2 parents e9c3c80 + 7e4d8dc commit 907807d
Show file tree
Hide file tree
Showing 2 changed files with 179 additions and 80 deletions.
166 changes: 107 additions & 59 deletions core/actor/src/main/scala/net/liftweb/actor/LAFuture.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package net.liftweb
package actor

import scala.collection.mutable.ArrayBuffer

import common._


Expand Down Expand Up @@ -152,15 +154,36 @@ class LAFuture[T](val scheduler: LAScheduler = LAScheduler, context: Box[LAFutur
}
}

/**
* Java-friendly alias for satisfied_?.
*/
def isSatisfied: Boolean = satisfied_?

/**
* Has the future been satisfied
*/
def isSatisfied: Boolean = synchronized {satisfied}
def satisfied_? = synchronized {satisfied}
/**
* Java-friendly alias for aborted_?.
*/
def isAborted: Boolean = aborted_?

/**
* Has the future been aborted
*/
def isAborted: Boolean = synchronized {aborted}
def aborted_? = synchronized {satisfied}

/**
* Java-friendly alias for completed_?.
*/
def isCompleted: Boolean = completed_?
/**
* Has the future completed?
*/
def completed_? : Boolean = synchronized(satisfied || aborted)

@deprecated("Please use completed_? instead.", "3.1.0")
def complete_? : Boolean = completed_?

/**
* Abort the future. It can never be satified
Expand Down Expand Up @@ -240,11 +263,6 @@ class LAFuture[T](val scheduler: LAScheduler = LAScheduler, context: Box[LAFutur
}
}
}

/**
* Has the future completed?
*/
def complete_? : Boolean = synchronized(satisfied || aborted)
}

/**
Expand Down Expand Up @@ -333,32 +351,64 @@ object LAFuture {
}
}


/**
* Collect all the future values into the aggregate future
* The returned future will be satisfied when all the
* collected futures are satisfied
* Given handlers for a value's success and failure and a set of futures, runs
* the futures simultaneously and invokes either success or failure callbacks
* as each future completes. When all futures are complete, if the handlers
* have not either satisfied or failed the overall result, `onAllFuturesCompleted`
* is called to complete it. If it *still* isn't complete, the overall result
* is failed with an error.
*
* Note that the success and failure functions are guaranteed to be run in a
* thread-safe manner. Each is passed the value, the result future, the
* accumulating `ArrayBuffer`, and the index of the future that has been
* completed. For the failure handler, the value is the `Box` of the failure.
*/
def collect[T](future: LAFuture[T]*): LAFuture[List[T]] = {
val result = new LAFuture[List[T]]
if (future.isEmpty) {
result.satisfy(Nil)
def collect[T, A](
onFutureSucceeded: (T, LAFuture[A], ArrayBuffer[Box[T]], Int)=>Unit,
onFutureFailed: (Box[Nothing], LAFuture[A], ArrayBuffer[Box[T]], Int)=>Unit,
onAllFuturesCompleted: (LAFuture[A], ArrayBuffer[Box[T]])=>Unit,
futures: LAFuture[T]*
): LAFuture[A] = {
val result = new LAFuture[A]

if (futures.isEmpty) {
onAllFuturesCompleted(result, new ArrayBuffer[Box[T]](0))
} else {
val sync = new Object
val len = future.length
val vals = new collection.mutable.ArrayBuffer[Box[T]](len)
val len = futures.length
val accumulator = new ArrayBuffer[Box[T]](len)
// pad array so inserts at random places are possible
for (i <- 0 to len) { vals.insert(i, Empty) }
for (i <- 0 to len) { accumulator.insert(i, Empty) }
var gotCnt = 0

future.toList.zipWithIndex.foreach {
case (f, idx) =>
f.foreach {
v => sync.synchronized {
vals.insert(idx, Full(v))
futures.toList.zipWithIndex.foreach {
case (future, index) =>
future.onSuccess {
value => sync.synchronized {
gotCnt += 1
if (gotCnt >= len) {
result.satisfy(vals.toList.flatten)
onFutureSucceeded(value, result, accumulator, index)

if (gotCnt >= len && ! result.completed_?) {
onAllFuturesCompleted(result, accumulator)

if (! result.completed_?) {
result.fail(Failure("collect invoker did not complete result"))
}
}
}
}
future.onFail {
failureBox => sync.synchronized {
gotCnt += 1
onFutureFailed(failureBox, result, accumulator, index)

if (gotCnt >= len && ! result.completed_?) {
onAllFuturesCompleted(result, accumulator)

if (! result.completed_?) {
result.fail(Failure("collect invoker did not complete result"))
}
}
}
}
Expand All @@ -368,6 +418,23 @@ object LAFuture {
result
}


/**
* Collect all the future values into the aggregate future
* The returned future will be satisfied when all the
* collected futures are satisfied
*/
def collect[T](future: LAFuture[T]*): LAFuture[List[T]] = {
collect[T, List[T]](
onFutureSucceeded = { (value, result, values, index) =>
values.insert(index, Full(value))
},
onFutureFailed = { (valueBox, result, values, index) => result.fail(valueBox) },
onAllFuturesCompleted = { (result, values) => result.satisfy(values.toList.flatten) },
future: _*
)
}

/**
* Collect all the future values into the aggregate future
* The returned future will be satisfied when all the
Expand All @@ -376,40 +443,21 @@ object LAFuture {
* returned future with an Empty
*/
def collectAll[T](future: LAFuture[Box[T]]*): LAFuture[Box[List[T]]] = {
val result = new LAFuture[Box[List[T]]]
if (future.isEmpty) {
result.satisfy(Full(Nil))
} else {
val sync = new Object
val len = future.length
val vals = new collection.mutable.ArrayBuffer[Box[T]](len)
// pad array so inserts at random places are possible
for (i <- 0 to len) { vals.insert(i, Empty) }
var gotCnt = 0

future.toList.zipWithIndex.foreach {
case (f, idx) =>
f.foreach {
vb => sync.synchronized {
vb match {
case Full(v) => {
vals.insert(idx, Full(v))
gotCnt += 1
if (gotCnt >= len) {
result.satisfy(Full(vals.toList.flatten))
}
}

case eb: EmptyBox => {
result.satisfy(eb)
}
}
}
}
}
}

result
collect[Box[T], Box[List[T]]](
onFutureSucceeded = { (value, result, values, index) =>
value match {
case Full(realValue) =>
values.insert(index, Full(Full(realValue)))
case other: EmptyBox =>
result.satisfy(other)
}
},
onFutureFailed = { (valueBox, result, values, index) => result.fail(valueBox) },
onAllFuturesCompleted = { (result: LAFuture[Box[List[T]]], values: ArrayBuffer[Box[Box[T]]]) =>
result.satisfy(Full(values.toList.flatten.flatten))
},
future: _*
)
}

private def inContext[T](f: () => T, context: Box[LAFuture.Context]): () => T = {
Expand Down
93 changes: 72 additions & 21 deletions core/actor/src/test/scala/net/liftweb/actor/LAFutureSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package net.liftweb.actor

import net.liftweb.common.{Failure, Box}
import net.liftweb.common.{Box, Failure, Full}
import org.specs2.mutable.Specification
import java.util.concurrent.atomic.AtomicBoolean

Expand Down Expand Up @@ -61,30 +61,81 @@ class LAFutureSpec extends Specification {
result shouldEqual givenFailure
}

"collect one future result" in {
val givenOneResult = 123
val one = LAFuture(() => givenOneResult)
LAFuture.collect(one).get(timeout) shouldEqual List(givenOneResult)
}
"when collecting results with LAFuture.collect" in {
"collect one future result" in {
val givenOneResult = 123
val one = LAFuture(() => givenOneResult)
LAFuture.collect(one).get(timeout) shouldEqual List(givenOneResult)
}

"collect more future results in correct order" in {
val givenOneResult = 123
val givenTwoResult = 234
val one = LAFuture(() => givenOneResult)
val two = LAFuture(() => givenTwoResult)
LAFuture.collect(one, two).get(timeout) shouldEqual List(givenOneResult, givenTwoResult)
}
"collect more future results in correct order" in {
val givenOneResult = 123
val givenTwoResult = 234
val one = LAFuture(() => givenOneResult)
val two = LAFuture(() => givenTwoResult)
LAFuture.collect(one, two).get(timeout) shouldEqual List(givenOneResult, givenTwoResult)
}

"collect empty list immediately" in {
val collectResult = LAFuture.collect(Nil: _*)
collectResult.isSatisfied shouldEqual true
collectResult.get(timeout) shouldEqual Nil
}

"collect empty list immediately" in {
val collectResult = LAFuture.collect(Nil: _*)
collectResult.isSatisfied shouldEqual true
collectResult.get(timeout) shouldEqual Nil
"report a failed LAFuture as a failure for the overall future" in {
val one: LAFuture[Int] = new LAFuture
val two: LAFuture[Int] = LAFuture(() => 5)

one.fail(Failure("boom boom boom!"))

val collectResult = LAFuture.collect(one, two)
collectResult.get(timeout) shouldEqual Failure("boom boom boom!")
}
}

"collectAll empty list immediately" in {
val collectResult = LAFuture.collectAll(Nil : _*)
collectResult.isSatisfied shouldEqual true
collectResult.get(timeout) shouldEqual Nil
"when collecting Boxed results with collectAll" in {
"collectAll collects an EmptyBox immediately" in {
val one: LAFuture[Box[Int]] = LAFuture(() => { Failure("whoops"): Box[Int] })
val two: LAFuture[Box[Int]] = LAFuture(() => { Thread.sleep(10000); Full(1) })

val collectResult = LAFuture.collectAll(one, two)
collectResult.get(5000) shouldEqual Failure("whoops")
}

"collectAll collects a set of Fulls" in {
val one: LAFuture[Box[Int]] = LAFuture(() => Full(1): Box[Int])
val two: LAFuture[Box[Int]] = LAFuture(() => Full(2): Box[Int])
val three: LAFuture[Box[Int]] = LAFuture(() => Full(3): Box[Int])

val collectResult = LAFuture.collectAll(one, two, three)
collectResult.get(timeout) shouldEqual Full(List(1, 2, 3))
}

"collectAll reports a failed LAFuture as a failure for the overall future" in {
val one: LAFuture[Box[Int]] = new LAFuture
val two: LAFuture[Box[Int]] = LAFuture(() => Full(5): Box[Int])

one.fail(Failure("boom boom boom!"))

val collectResult = LAFuture.collectAll(one, two)
collectResult.get(timeout) shouldEqual Failure("boom boom boom!")
}

"collectAll empty list immediately" in {
val collectResult = LAFuture.collectAll(Nil : _*)
collectResult.isSatisfied shouldEqual true
collectResult.get(timeout) shouldEqual Nil
}

"report a failed LAFuture as a failure for the overall future" in {
val one: LAFuture[Box[Int]] = new LAFuture
val two: LAFuture[Box[Int]] = LAFuture(() => Full(5): Box[Int])

one.fail(Failure("boom boom boom!"))

val collectResult = LAFuture.collectAll(one, two)
collectResult.get(timeout) shouldEqual Failure("boom boom boom!")
}
}
}

Expand Down

0 comments on commit 907807d

Please sign in to comment.