summaryrefslogblamecommitdiff
path: root/src/library/scala/collection/parallel/package.scala
blob: 6efff70606b2b631d18a0d1beedd4d8ff480bd1b (plain) (tree)
1
2
3
4
5
6
7
8
9








                                                                          






                                              
                                                 
                                              

                                             



                                            

                 
                        
                      
                          
                                                                            

                 








                                                                                   

                                                                             

                                                                                                 

                                                                                                 
                                                     




                                                                                       
 
                                  
 

                            










                                                                                                               

                                                                    
                                                                                       



                                                                                    














                                                                                  




                                                      
                                                                









                                                                                        




                                                                  



                                                                                                                             



                                                       
               
 

                                                                                           
                                                                                                                                                   
 













                                                                                                                                                              
                                                                      






                                                                         










                                                       





                                                                              

                                                                               

                                                                             


                                                                                  
















                                                                                                                                  
                                                                                                                         




                             
                                                             












                                                                                                             

                                        
                  
                                                                          







                           
                                                   


               

 
















 
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.collection


import java.lang.Thread._

import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
import scala.collection.mutable.UnrolledBuffer
import annotation.unchecked.uncheckedVariance


/** Package object for parallel collections.
 */
package object parallel {

  /* constants */
  /** Tuning constant; its consumers are outside this file
   *  (the name suggests a minimum size for copy-based operations -- TODO confirm against callers).
   */
  val MIN_FOR_COPY: Int = 512

  /** Tuning constant; its consumers are outside this file
   *  (the name suggests how often some periodic check is performed -- TODO confirm against callers).
   */
  val CHECK_RATE: Int = 512

  /** The square root of two, computed once when the package object is initialized. */
  val SQRT2: Double = math.sqrt(2.0)

  /** Processor count reported by the JVM runtime, sampled once when the package object is initialized. */
  val availableProcessors: Int = java.lang.Runtime.getRuntime().availableProcessors()

  /* functions */

  /** Derives a threshold from a collection size and a parallelism level.
   *
   *  With a parallelism level of at most 1 the threshold is the size itself.
   *  Otherwise it is `1 + sz / (8 * parallelismLevel)`, using integer division.
   *
   *  @param sz                the size of the collection
   *  @param parallelismLevel  the parallelism level
   *  @return                  the computed threshold
   */
  def thresholdFromSize(sz: Int, parallelismLevel: Int) =
    if (parallelismLevel <= 1) sz
    else 1 + sz / (8 * parallelismLevel)

  /** Always throws an `UnsupportedOperationException` without a message. */
  private[parallel] def unsupported: Nothing =
    throw new UnsupportedOperationException()

  /** Always throws an `UnsupportedOperationException` carrying `msg`. */
  private[parallel] def unsupportedop(msg: String): Nothing =
    throw new UnsupportedOperationException(msg)

  /** Always throws an `IndexOutOfBoundsException` whose message is the decimal form of `idx`. */
  private[parallel] def outofbounds(idx: Int): Nothing =
    throw new IndexOutOfBoundsException(String.valueOf(idx))

  /** Chooses the task-scheduling backend for parallel collections.
   *
   *  A `ForkJoinTaskSupport` is created only when the running Java version is at least 1.6
   *  and the VM vendor string contains one of "Oracle", "Sun" or "Apple". In every other
   *  case a `ThreadPoolTaskSupport` is created.
   *
   *  "Oracle" is part of the list because JDK 7 and later builds report `java.vm.vendor`
   *  as "Oracle Corporation"; without it those VMs would fall through to the thread-pool
   *  backend.
   *
   *  @return a freshly created task support object
   */
  private[parallel] def getTaskSupport: TaskSupport = {
    // A `def`, so the vendor property is only read once the version check has passed.
    def vendorAllowsForkJoin = {
      val vendor = util.Properties.javaVmVendor
      (vendor contains "Oracle") || (vendor contains "Sun") || (vendor contains "Apple")
    }

    if (util.Properties.isJavaAtLeast("1.6") && vendorAllowsForkJoin) new ForkJoinTaskSupport
    else new ThreadPoolTaskSupport
  }

  /** The task support instance obtained from `getTaskSupport` when this package object is initialized. */
  val tasksupport = getTaskSupport

  /* implicit conversions */

  /** Extra operations made available on a `CanBuildFrom` through the implicit
   *  conversion `factory2ops` defined in this package object.
   *
   *  @tparam From  the `From` type parameter of the wrapped builder factory
   *  @tparam Elem  the `Elem` type parameter of the wrapped builder factory
   *  @tparam To    the `To` type parameter of the wrapped builder factory
   */
  trait FactoryOps[From, Elem, To] {
    /** The second half of an `ifParallel { ... } otherwise { ... }` expression. */
    trait Otherwise[R] {
      /** Supplies the fallback branch and produces the result of the whole expression.
       *  `notbody` is passed by name, so it is evaluated only if the implementation uses it.
       */
      def otherwise(notbody: => R): R
    }

    /** Tells whether the wrapped factory is a parallel one. */
    def isParallel: Boolean

    /** Views the wrapped factory as a `CanCombineFrom`. */
    def asParallel: CanCombineFrom[From, Elem, To]

    /** Starts a two-branch dispatch: `isbody` is the branch for the parallel case,
     *  the fallback is supplied through `otherwise` on the returned object.
     */
    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R): Otherwise[R]
  }

  /** Implicitly enriches a builder factory with the `FactoryOps` queries.
   *
   *  - `isParallel` holds when `bf` is an instance of `Parallel`.
   *  - `asParallel` is an unchecked cast of `bf` to `CanCombineFrom`.
   *  - `ifParallel(isbody).otherwise(notbody)` applies `isbody` to the cast factory when
   *    `isParallel` holds and evaluates `notbody` otherwise.
   */
  implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new FactoryOps[From, Elem, To] {
    def isParallel = bf match {
      case _: Parallel => true
      case _ => false
    }

    def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]

    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new Otherwise[R] {
      def otherwise(notbody: => R) =
        if (!isParallel) notbody
        else isbody(asParallel)
    }
  }

  /** Extra operations made available on a `TraversableOnce` through the implicit
   *  conversion `traversable2ops` defined in this package object.
   *
   *  @tparam T  the element type of the wrapped collection
   */
  trait TraversableOps[T] {
    /** The second half of an `ifParSeq { ... } otherwise { ... }` expression. */
    trait Otherwise[R] {
      /** Supplies the fallback branch and produces the result of the whole expression.
       *  `notbody` is passed by name, so it is evaluated only if the implementation uses it.
       */
      def otherwise(notbody: => R): R
    }

    /** Tells whether the wrapped collection is a parallel one. */
    def isParallel: Boolean

    /** Tells whether the wrapped collection is a `ParIterable`. */
    def isParIterable: Boolean

    /** Views the wrapped collection as a `ParIterable`. */
    def asParIterable: ParIterable[T]

    /** Tells whether the wrapped collection is a `ParSeq`. */
    def isParSeq: Boolean

    /** Views the wrapped collection as a `ParSeq`. */
    def asParSeq: ParSeq[T]

    /** Starts a two-branch dispatch: `isbody` is the branch for the parallel-sequence case,
     *  the fallback is supplied through `otherwise` on the returned object.
     */
    def ifParSeq[R](isbody: ParSeq[T] => R): Otherwise[R]

    /** Produces a `ParArray` holding the elements of the wrapped collection. */
    def toParArray: ParArray[T]
  }

  /** Implicitly enriches a collection with the `TraversableOps` queries and conversions.
   *
   *  The `is*` members are runtime type tests on `t`; the `as*` members are unchecked casts
   *  and fail with a `ClassCastException` when the matching `is*` test does not hold.
   *
   *  `ifParSeq(isbody).otherwise(notbody)` applies `isbody` only when `t` really is a `ParSeq`.
   *  A parallel collection that is not a sequence takes the `notbody` branch, because the
   *  `asParSeq` cast would fail for it.
   *
   *  `toParArray` returns `t` itself when it already is a `ParArray`; otherwise it drains
   *  `t.toIterator` into a `ParArrayCombiner` and returns the combiner's result.
   */
  implicit def traversable2ops[T](t: TraversableOnce[T]) = new TraversableOps[T] {
    def isParallel = t.isInstanceOf[Parallel]
    def isParIterable = t.isInstanceOf[ParIterable[_]]
    def asParIterable = t.asInstanceOf[ParIterable[T]]
    def isParSeq = t.isInstanceOf[ParSeq[_]]
    def asParSeq = t.asInstanceOf[ParSeq[T]]
    def ifParSeq[R](isbody: ParSeq[T] => R) = new Otherwise[R] {
      // Guard with the same test the cast relies on: `isParallel` alone also admits
      // parallel collections that are not sequences.
      def otherwise(notbody: => R) = if (isParSeq) isbody(asParSeq) else notbody
    }
    def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else {
      val it = t.toIterator
      val cb = mutable.ParArrayCombiner[T]()
      while (it.hasNext) cb += it.next
      cb.result
    }
  }

  /** Extra operation made available on a `Throwable` through the implicit
   *  conversion `throwable2ops` defined in this package object.
   */
  trait ThrowableOps {
    /** Combines the receiver and `that` into a single throwable. */
    def alongWith(that: Throwable): Throwable
  }

  /** Implicitly enriches a throwable with `alongWith`, which merges two throwables
   *  into one `CompositeThrowable`.
   *
   *  Composite operands are flattened: their member sets are merged instead of
   *  nesting one composite inside another. Non-composite operands are added as they are.
   */
  implicit def throwable2ops(self: Throwable) = new ThrowableOps {
    def alongWith(that: Throwable) = {
      val merged: Set[Throwable] = (self, that) match {
        case (left: CompositeThrowable, right: CompositeThrowable) => left.throwables ++ right.throwables
        case (left: CompositeThrowable, _) => left.throwables + that
        case (_, right: CompositeThrowable) => right.throwables + self
        case _ => Set(self, that)
      }
      new CompositeThrowable(merged)
    }
  }

  /* classes */

  /** Composite throwable - thrown when multiple exceptions are thrown at the same time.
   *
   *  The message is a fixed prefix followed by one `(throwable, stack trace list)` pair
   *  per member, separated by `", "`.
   *
   *  @param throwables  the set of throwables bundled together
   */
  final class CompositeThrowable(val throwables: Set[Throwable])
  extends Throwable(
    "Multiple exceptions thrown during a parallel computation: " +
      throwables.map { t =>
        val trace = t.getStackTrace.toList
        (t, trace)
      }.mkString(", ")
  )


  /** A helper iterator for iterating very small array buffers.
   *  Automatically forwards the signal delegate when splitting.
   *
   *  The iterator walks `buffer` over the half-open index range `[index, until)`.
   *
   *  @param buffer          the buffer being traversed
   *  @param index           the position of the next element to return
   *  @param until           the exclusive end position
   *  @param signalDelegate  the signalling object handed on to duplicates and splits
   */
  private[parallel] class BufferIterator[T]
    (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling)
  extends ParIterableIterator[T] {
    def hasNext = index < until

    /** Returns the element at the current position and then advances by one. */
    def next = {
      val elem = buffer(index)
      index += 1
      elem
    }

    /** The number of positions left between the current index and `until`. */
    def remaining = until - index

    /** A fresh iterator over the same buffer and the same remaining range. */
    def dup = new BufferIterator(buffer, index, until, signalDelegate)

    /** Halves the remaining range into two iterators; with at most one position left
     *  the iterator is not divided and is returned as the only element.
     */
    def split: Seq[ParIterableIterator[T]] =
      if (remaining <= 1) Seq(this)
      else {
        val mid = index + (until - index) / 2
        Seq(
          new BufferIterator(buffer, index, mid, signalDelegate),
          new BufferIterator(buffer, mid, until, signalDelegate)
        )
      }

    private[parallel] override def debugInformation = buildString { append =>
      append("---------------")
      append("Buffer iterator")
      append("buffer: " + buffer)
      append("index: " + index)
      append("until: " + until)
      append("---------------")
    }
  }

  /** A helper combiner that holds an array of buckets, each bucket being an
   *  unrolled linked list. It serves parallel collections that are constructed by
   *  sorting their result set according to some criteria.
   *
   *  The bucket array is referenced by `buckets`. The total size of all buckets
   *  is tracked in `sz` and is updated whenever two bucket combiners are combined.
   *
   *  Clients decide how to maintain these by implementing `+=` and `result`.
   *  Filling and reading the buckets is the client's job, and so is keeping `sz`
   *  up to date while filling them. A bucket starts out as `null` to save space -
   *  the client has to initialize it. In general, the bucket element type `Buck`
   *  does not have to correspond to the combiner element type `Elem`.
   *
   *  What this class contributes is an efficient `combine`: it chains the buckets
   *  together. Since the `combine` contract states that the receiver (`this`)
   *  becomes invalidated, `combine` reuses the receiver and returns it.
   *
   *  The hooks `beforeCombine` and `afterCombine` run before and after the buckets
   *  are chained, respectively, provided that the argument to `combine` is not `this`
   *  (as required by the `combine` contract). They can be overridden in subclasses
   *  to provide custom behaviour by modifying the receiver (which will be the return value).
   */
  private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
    (private val bucketnumber: Int)
  extends Combiner[Elem, To] {
  self: EnvironmentPassingCombiner[Elem, To] =>
    protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
    protected var sz: Int = 0

    def size = sz

    /** Drops all buckets by installing a fresh, all-`null` bucket array and resets the size to zero. */
    def clear = {
      sz = 0
      buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
    }

    /** Hook invoked before the buckets are chained; does nothing by default. */
    def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Unit = {}

    /** Hook invoked after the buckets are chained; does nothing by default. */
    def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Unit = {}

    /** Chains the buckets of `other` onto the receiver's buckets and returns the receiver.
     *
     *  Combining with `this` is a no-op. Any argument that is not a `BucketCombiner`
     *  is rejected through `sys.error`.
     */
    def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] =
      if (this eq other) this
      else if (!other.isInstanceOf[BucketCombiner[_, _, _, _]]) sys.error("Unexpected combiner type.")
      else {
        beforeCombine(other)

        val donor = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
        var slot = 0
        while (slot < bucketnumber) {
          val mine = buckets(slot)
          val theirs = donor.buckets(slot)
          // An empty slot adopts the donor's bucket as-is (possibly `null`);
          // an occupied slot only has a non-null donor bucket appended to it.
          if (mine eq null) buckets(slot) = theirs
          else if (theirs ne null) mine concat theirs
          slot += 1
        }
        sz += donor.size

        afterCombine(other)

        this
      }

  }


}