summaryrefslogblamecommitdiff
path: root/src/library/scala/collection/parallel/package.scala
blob: 8f19d0ecdb06267bde0e361a1446f161376eb11e (plain) (tree)
1
2
3
4
5
6
7
8
9
10







                                                                          

                        

                                              
                                                 
                                              

                                             


                                            
                 
                        
                      
                          
                                                                            

                 








                                                                                   

                                                                             

                                                                                                 

                                                                                                 
                                                     

                                               
                                                                                                                     

                                    
 
                                  
 

                            
                                                                                                               

                                                                    
                                                                                       


                                                                                    
                                                                                                




                                                      
                                                                








                                                                                        
                                                                  



                                                                                                                             


                                                       

 
 



























                                                                                
 
               












                                                                           
                                                                                           

                                      
                      


                                                                                                        
 



                                                                
                                           
                                                                                                                                                 
                               
                            






                                 

                                                                      

                                     

                                                                         

                    










                                                       





                                                                              

                                                                               

                                                                             


                                                                                  













                                                                                                                                  
                                 
                              
                                                 
                                                                                                                         
                             
    
                 
    
                   
                                                             

            
    
                                                                           
    
                                                                          
    














                                                                                          
           






                                                
   
 
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2003-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */

package scala.collection

import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
import scala.collection.mutable.UnrolledBuffer
import annotation.unchecked.uncheckedVariance

/** Package object for parallel collections.
 */
package object parallel {
  /* constants */
  /** Fixed at 512. Presumably the smallest size for which a bulk copy is used — TODO confirm at use sites. */
  val MIN_FOR_COPY: Int = 512
  /** Fixed at 512. Presumably how often a periodic check is performed — TODO confirm at use sites. */
  val CHECK_RATE: Int = 512
  /** The square root of two, computed once when this object is initialised. */
  val SQRT2: Double = math.sqrt(2.0)
  /** Processor count reported by the JVM runtime when this object is initialised. */
  val availableProcessors: Int = java.lang.Runtime.getRuntime().availableProcessors()

  /* functions */

  /** Derives a threshold from a collection size and a parallelism level.
   *
   *  @param sz               the size of the collection
   *  @param parallelismLevel the parallelism level
   *  @return `sz` unchanged when the parallelism level is 1 or less,
   *          otherwise `1 + sz / (8 * parallelismLevel)`
   */
  def thresholdFromSize(sz: Int, parallelismLevel: Int) =
    if (parallelismLevel <= 1) sz
    else 1 + sz / (8 * parallelismLevel)

  /** Throws an `UnsupportedOperationException` that carries no message. */
  private[parallel] def unsupported: Nothing = throw new UnsupportedOperationException()

  /** Throws an `UnsupportedOperationException` whose message is `msg`. */
  private[parallel] def unsupportedop(msg: String): Nothing = throw new UnsupportedOperationException(msg)

  /** Throws an `IndexOutOfBoundsException` whose message is the decimal rendering of `idx`. */
  private[parallel] def outofbounds(idx: Int): Nothing = throw new IndexOutOfBoundsException(idx.toString)

  /** Chooses the task support implementation for the running JVM.
   *
   *  A `ForkJoinTaskSupport` is created when the JVM is at least version 1.6 and the
   *  VM vendor string contains "Oracle", "Sun" or "Apple"; in every other case a
   *  `ThreadPoolTaskSupport` is created.
   */
  private[parallel] def getTaskSupport: TaskSupport = {
    // Only evaluated after the version check has passed, because `&&` short-circuits.
    def vendorQualifies = {
      val vmVendor = util.Properties.javaVmVendor
      (vmVendor contains "Oracle") || (vmVendor contains "Sun") || (vmVendor contains "Apple")
    }

    if (util.Properties.isJavaAtLeast("1.6") && vendorQualifies) new ForkJoinTaskSupport
    else new ThreadPoolTaskSupport
  }

  /** The task support obtained from `getTaskSupport` when this package object is initialised. */
  val tasksupport = getTaskSupport

  /* implicit conversions */

  /** Wraps a `CanBuildFrom` so it can be asked whether it is a parallel builder factory. */
  implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) = new FactoryOps[From, Elem, To] {
    /** True when `bf` is an instance of `Parallel`. */
    def isParallel = bf.isInstanceOf[Parallel]

    /** Casts `bf` to `CanCombineFrom`; `isParallel` is not consulted first. */
    def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]

    /** Starts an `ifParallel(...) otherwise ...` expression; nothing runs until `otherwise` is called. */
    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) = new Otherwise[R] {
      def otherwise(notbody: => R) =
        if (!isParallel) notbody
        else isbody(asParallel)
    }
  }
  /** Wraps a `GenTraversableOnce` with type tests, casts and a conversion related to
   *  parallel collections.
   */
  implicit def traversable2ops[T](t: collection.GenTraversableOnce[T]) = new TraversableOps[T] {
    /** True when `t` is an instance of `Parallel`. */
    def isParallel = t.isInstanceOf[Parallel]
    /** True when `t` is a `ParIterable`. */
    def isParIterable = t.isInstanceOf[ParIterable[_]]
    /** Casts `t` to `ParIterable[T]`; `isParIterable` is not consulted first. */
    def asParIterable = t.asInstanceOf[ParIterable[T]]
    /** True when `t` is a `ParSeq`. */
    def isParSeq = t.isInstanceOf[ParSeq[_]]
    /** Casts `t` to `ParSeq[T]`; `isParSeq` is not consulted first. */
    def asParSeq = t.asInstanceOf[ParSeq[T]]
    /** Starts an `ifParSeq(...) otherwise ...` expression: `isbody` runs on `t` viewed as a
     *  `ParSeq` when `t` is one, and the by-name argument of `otherwise` runs in every other case.
     */
    def ifParSeq[R](isbody: ParSeq[T] => R) = new Otherwise[R] {
      // The guard must be `isParSeq`, not `isParallel`: a parallel collection that is not a
      // sequence would pass an `isParallel` test and then fail the cast in `asParSeq`.
      def otherwise(notbody: => R) = if (isParSeq) isbody(asParSeq) else notbody
    }
    /** Returns `t` itself when it already is a `ParArray`; otherwise feeds every element of
     *  `t.toIterator` into a `ParArrayCombiner` and returns the combiner's result.
     */
    def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else {
      val it = t.toIterator
      val cb = mutable.ParArrayCombiner[T]()
      while (it.hasNext) cb += it.next
      cb.result
    }
  }
  /** Wraps a `Throwable` so it can be merged with another one into a `CompositeThrowable`. */
  implicit def throwable2ops(self: Throwable) = new ThrowableOps {
    /** Merges `self` and `that`. A side that already is a `CompositeThrowable` contributes
     *  its contained throwables; a plain throwable contributes itself.
     */
    def alongWith(that: Throwable) = {
      val merged: Set[Throwable] = self match {
        case left: CompositeThrowable =>
          that match {
            case right: CompositeThrowable => left.throwables ++ right.throwables
            case _ => left.throwables + that
          }
        case _ =>
          that match {
            case right: CompositeThrowable => right.throwables + self
            case _ => Set(self, that)
          }
      }
      new CompositeThrowable(merged)
    }
  }
}


package parallel {
  /** Operations that the `factory2ops` implicit conversion makes available on a `CanBuildFrom`. */
  trait FactoryOps[From, Elem, To] {
    /** Second half of an `ifParallel(...) otherwise ...` expression. */
    trait Otherwise[R] {
      /** Supplies the by-name fallback `notbody` and yields the result of the whole expression. */
      def otherwise(notbody: => R): R
    }

    /** Whether the wrapped builder factory is a parallel one. */
    def isParallel: Boolean
    /** The wrapped builder factory viewed as a `CanCombineFrom`. */
    def asParallel: CanCombineFrom[From, Elem, To]
    /** Starts a two-branch expression: `isbody` is the branch that receives the factory as a
     *  `CanCombineFrom`; the alternative branch is passed to `otherwise` on the returned object.
     */
    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R): Otherwise[R]
  }

  /** Operations that the `traversable2ops` implicit conversion makes available on a
   *  `GenTraversableOnce`.
   */
  trait TraversableOps[T] {
    /** Second half of an `ifParSeq(...) otherwise ...` expression. */
    trait Otherwise[R] {
      /** Supplies the by-name fallback `notbody` and yields the result of the whole expression. */
      def otherwise(notbody: => R): R
    }

    /** Whether the wrapped collection is a parallel one. */
    def isParallel: Boolean
    /** Whether the wrapped collection is a `ParIterable`. */
    def isParIterable: Boolean
    /** The wrapped collection viewed as a `ParIterable`. */
    def asParIterable: ParIterable[T]
    /** Whether the wrapped collection is a `ParSeq`. */
    def isParSeq: Boolean
    /** The wrapped collection viewed as a `ParSeq`. */
    def asParSeq: ParSeq[T]
    /** Starts a two-branch expression: `isbody` is the branch that receives the collection as a
     *  `ParSeq`; the alternative branch is passed to `otherwise` on the returned object.
     */
    def ifParSeq[R](isbody: ParSeq[T] => R): Otherwise[R]
    /** The elements of the wrapped collection as a `ParArray`. */
    def toParArray: ParArray[T]
  }

  /** Operations that the `throwable2ops` implicit conversion makes available on a `Throwable`. */
  trait ThrowableOps {
    /** Combines the wrapped throwable with `that` into a single throwable. */
    def alongWith(that: Throwable): Throwable
  }

  /* classes */
  
  /** A source of combiners for building a collection of type `Repr` from elements of type `U`. */
  trait CombinerFactory[U, Repr] {
    /** Provides a combiner used to construct a collection. */
    def apply(): Combiner[U, Repr]
    /** Tells whether calls to `apply` hand out a shared combiner.
     *
     *  The call to the `apply` method can create a new combiner each time.
     *  If it does, this method returns `false`.
     *  Alternatively, the same combiner may be returned each time (typically, this is
     *  the case for concurrent collections, which are thread safe).
     *  If so, the method returns `true`.
     */
    def doesShareCombiners: Boolean
  }
  
  /** Composite throwable - thrown when multiple exceptions are thrown at the same time.
   *
   *  The message starts with a fixed prefix and then lists every contained throwable,
   *  each followed by at most the first ten frames of its stack trace and a closing
   *  `...` line. Entries are separated by a blank line.
   *
   *  @param throwables the throwables this exception aggregates
   */
  final case class CompositeThrowable(
    val throwables: Set[Throwable]
  ) extends Exception(
    "Multiple exceptions thrown during a parallel computation: " +
      throwables.map { t =>
        // Append the ellipsis as ONE element. The earlier `++("...")` treated the string as a
        // sequence of characters, so the message ended with three lines holding a single "." each.
        val shownFrames = t.getStackTrace.take(10).map(_.toString) :+ "..."
        t.toString + "\n" + shownFrames.mkString("\n")
      }.mkString("\n\n")
  )


  /** A splitter over the index range `[index, until)` of an array buffer, meant for
   *  very small buffers.
   *  Splitters created by `dup` and `split` are given this splitter's signal delegate.
   */
  private[parallel] class BufferSplitter[T]
  (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, _sigdel: collection.generic.Signalling)
  extends IterableSplitter[T] {
    signalDelegate = _sigdel

    /** True while the cursor has not reached `until`. */
    def hasNext = index < until

    /** Reads the element at the cursor, then advances the cursor by one. */
    def next = {
      val elem = buffer(index)
      index += 1
      elem
    }

    /** Number of elements between the cursor and `until`. */
    def remaining = until - index

    /** A new splitter over the same buffer with the same cursor position and bound. */
    def dup = new BufferSplitter(buffer, index, until, signalDelegate)

    /** Divides the remaining range at its midpoint into two splitters; when at most one
     *  element remains, returns this splitter alone.
     */
    def split: Seq[IterableSplitter[T]] =
      if (remaining <= 1) Seq(this)
      else {
        val mid = index + remaining / 2
        Seq(
          new BufferSplitter(buffer, index, mid, signalDelegate),
          new BufferSplitter(buffer, mid, until, signalDelegate)
        )
      }

    private[parallel] override def debugInformation = buildString { append =>
      append("---------------")
      append("Buffer iterator")
      append("buffer: " + buffer)
      append("index: " + index)
      append("until: " + until)
      append("---------------")
    }
  }

  /** A helper combiner which contains an array of buckets. Buckets themselves
   *  are unrolled linked lists. Some parallel collections are constructed by
   *  sorting their result set according to some criteria.
   *
   *  A reference `buckets` to buckets is maintained. Total size of all buckets
   *  is kept in `sz` and maintained whenever 2 bucket combiners are combined.
   *
   *  Clients decide how to maintain these by implementing `+=` and `result`.
   *  Populating and using the buckets is up to the client. While populating them,
   *  the client should update `sz` accordingly. Note that a bucket is by default
   *  set to `null` to save space - the client should initialize it.
   *  Note that in general the type of the elements contained in the buckets `Buck`
   *  doesn't have to correspond to combiner element type `Elem`.
   *
   *  This class simply gives an efficient `combine` for free - it chains
   *  the buckets together. Since the `combine` contract states that the receiver (`this`)
   *  becomes invalidated, `combine` reuses the receiver and returns it.
   *
   *  Methods `beforeCombine` and `afterCombine` are called before and after
   *  combining the buckets, respectively, given that the argument to `combine`
   *  is not `this` (as required by the `combine` contract).
   *  They can be overridden in subclasses to provide custom behaviour by modifying
   *  the receiver (which will be the return value).
   */
  private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
  (private val bucketnumber: Int)
  extends Combiner[Elem, To] {
  //self: EnvironmentPassingCombiner[Elem, To] =>
    /** One slot per bucket; a slot holds `null` until a bucket is stored in it. */
    protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
    /** Element count reported by `size`. */
    protected var sz: Int = 0

    def size = sz

    /** Replaces the bucket array with a fresh, all-`null` one and resets the count to zero. */
    def clear() = {
      buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
      sz = 0
    }

    /** Hook invoked by `combine` before the buckets are merged; does nothing here. */
    def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Unit = {}

    /** Hook invoked by `combine` after the buckets are merged; does nothing here. */
    def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Unit = {}

    /** Merges the buckets of `other` into this combiner slot by slot and returns this combiner.
     *
     *  Returns `this` untouched when `other` is the receiver itself, and raises an error
     *  via `sys.error` when `other` is not a `BucketCombiner`.
     */
    def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] =
      if (this eq other) this
      else other match {
        case _: BucketCombiner[_, _, _, _] =>
          beforeCombine(other)
          val incoming = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
          val theirs = incoming.buckets

          var slot = 0
          while (slot < bucketnumber) {
            val theirBucket = theirs(slot)
            // A `null` slot on the other side contributes nothing to this slot.
            if (theirBucket ne null) {
              val ownBucket = buckets(slot)
              if (ownBucket eq null) buckets(slot) = theirBucket // adopt the other bucket as-is
              else ownBucket.concat(theirBucket)                 // chain it onto ours
            }
            slot += 1
          }

          sz += incoming.size
          afterCombine(other)
          this
        case _ =>
          sys.error("Unexpected combiner type.")
      }
  }
}