summaryrefslogblamecommitdiff
path: root/src/actors/scala/actors/Future.scala
blob: ff938ab1aa03f36a42853e2bc4da721ba6759100 (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                                          
                                                                          




                                                                          
       



                    
      



                                                                   




                                                          
  
                         
                  
   
                                                                                                     
                                         
                    


   
                                                                            
  
                 



                         
                                                             
 
                                          
                    





                                                

       
             


                                                    
                               



                          

                                                                

                                 






                                






                                                                           
     
                                                                       



                                                  
 



                                                                                                               
                                
 
                                                                    



                                                                
                                                                
                                   
                                                              




                                           
                                                                        
                                                                                 

                                               
                                                       
         
                                               

                                       
                                                         



                                                 
                                                        





                                                
                             





                                        
                             




                                                   
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2009, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

// $Id$

package scala.actors

/**
 * <p>
 *   A <code>Future[T]</code> is a function of arity 0 that
 *   returns a value of type <code>T</code>.
 *   Applying a future blocks the current actor (<code>self</code>)
 *   until the future's value is available.
 * </p>
 * <p>
 *   A future can be queried to find out whether its value
 *   is already available.
 * </p>
 *
 * @author Philipp Haller
 * @version 0.9.16
 */
abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T] with Function0[T] {
  protected var value: Option[Any] = None
  def isSet: Boolean
}

/**
 * The <code>Futures</code> object contains methods that operate on Futures.
 *
 * @version 0.9.8
 * @author Philipp Haller
 */
object Futures {

  private lazy val sched = new DefaultExecutorScheduler(true)

  def future[T](body: => T): Future[T] = {
    case object Eval
    val a = new Actor {
      override def scheduler: IScheduler = sched
      def act() {
        Actor.react {
          case Eval => Actor.reply(body)
        }
      }
    }
    a.start()
    a !! (Eval, { case any => any.asInstanceOf[T] })
  }

  def alarm(t: Long) = future {
    Actor.reactWithin(t) {
      case TIMEOUT => {}
    }
  }

  def awaitEither[a, b](ft1: Future[a], ft2: Future[b]): Any = {
    val FutCh1 = ft1.inputChannel
    val FutCh2 = ft2.inputChannel
    Actor.receive {
      case FutCh1 ! arg1 => arg1
      case FutCh2 ! arg2 => arg2
    }
  }

  /**
   * <p>
   *   Awaits all futures returning an option containing a list of replies,
   *   or timeouts returning <code>None</code>.
   * </p>
   * <p>
   *   Note that some of the futures might already have been awaited.
   * </p>
   */
  def awaitAll(timeout: Long, fts: Future[Any]*): List[Option[Any]] = {
    val thisActor = Actor.self
    Actor.timer.schedule(new java.util.TimerTask {
      def run() { thisActor ! TIMEOUT }
    }, timeout)

    var resultsMap: collection.mutable.Map[Int, Option[Any]] = new collection.mutable.HashMap[Int, Option[Any]]

    var cnt = 0
    val mappedFts = fts.map(ft =>
      Pair({cnt+=1; cnt-1}, ft))

    val unsetFts = mappedFts.filter((p: Pair[Int, Future[Any]]) => {
      if (p._2.isSet) { resultsMap(p._1) = Some(p._2()); false }
      else { resultsMap(p._1) = None; true }
    })

    val partFuns = unsetFts.map((p: Pair[Int, Future[Any]]) => {
      val FutCh = p._2.inputChannel
      val singleCase: PartialFunction[Any, Pair[Int, Any]] = {
        case FutCh ! any => Pair(p._1, any)
      }
      singleCase
    })

    def awaitWith(partFuns: Seq[PartialFunction[Any, Pair[Int, Any]]]) {
      val reaction: PartialFunction[Any, Unit] = new PartialFunction[Any, Unit] {
        def isDefinedAt(msg: Any) = msg match {
          case TIMEOUT => true
          case _ => partFuns exists (_ isDefinedAt msg)
        }
        def apply(msg: Any): Unit = msg match {
          case TIMEOUT => // do nothing
          case _ => {
            val pfOpt = partFuns find (_ isDefinedAt msg)
            val pf = pfOpt.get // succeeds always
            val Pair(idx, subres) = pf(msg)
            resultsMap(idx) = Some(subres)

            val partFunsRest = partFuns filter (_ != pf)
            // wait on rest of partial functions
            if (partFunsRest.length > 0)
              awaitWith(partFunsRest)
          }
        }
      }
      Actor.receive(reaction)
    }

    awaitWith(partFuns)

    var results: List[Option[Any]] = Nil
    val size = resultsMap.size
    for (i <- 0 until size) {
      results = resultsMap(size - i - 1) :: results
    }
    results
  }
}