package scala.collection
import java.lang.Thread._
import scala.collection.generic.CanBuildFrom
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.mutable.ParArray
import annotation.unchecked.uncheckedVariance
/** Package object for parallel collections.
*/
package object parallel {
/* constants */
/** The value `-1`.
 *  NOTE(review): no use is visible in this file; presumably a sentinel for a
 *  minimum-size-for-copying setting - confirm against callers.
 */
val MIN_FOR_COPY: Int = -1
/** The value `512`.
 *  NOTE(review): no use is visible in this file; presumably the interval at which
 *  some periodic check is performed - confirm against callers.
 */
val CHECK_RATE: Int = 512
/** The square root of two. */
val SQRT2: Double = math.sqrt(2.0)
/** Processor count reported by the Java runtime at the time this value is initialized. */
val availableProcessors: Int = java.lang.Runtime.getRuntime.availableProcessors
/* functions */
/** Derives a threshold from the size of a collection and the parallelism level.
 *
 *  @param sz               size of the collection
 *  @param parallelismLevel the parallelism level
 *  @return `1 + sz / (8 * parallelismLevel)` when the parallelism level is greater
 *          than one; otherwise `sz` unchanged
 */
def thresholdFromSize(sz: Int, parallelismLevel: Int): Int =
  parallelismLevel match {
    case level if level > 1 => 1 + sz / (8 * level)
    case _                  => sz
  }
/** Always throws an `UnsupportedOperationException` that carries no message. */
private[parallel] def unsupported: Nothing =
  throw new UnsupportedOperationException
/** Always throws an `UnsupportedOperationException` carrying `msg`. */
private[parallel] def unsupportedop(msg: String): Nothing =
  throw new UnsupportedOperationException(msg)
/** Always throws an `IndexOutOfBoundsException` whose message is the decimal form of `idx`. */
private[parallel] def outofbounds(idx: Int): Nothing =
  throw new IndexOutOfBoundsException(idx.toString)
/** Chooses the task support implementation for the running JVM.
 *
 *  A `ForkJoinTaskSupport` is created when the Java version is at least 1.6 and the
 *  VM vendor string contains "Sun" or "Apple"; in every other case a
 *  `ThreadPoolTaskSupport` is created.
 */
private[parallel] def getTaskSupport: TaskSupport = {
  // Evaluated lazily: the vendor string is only read once the version check has passed.
  def knownVendor = {
    val vmVendor = util.Properties.javaVmVendor
    vmVendor.contains("Sun") || vmVendor.contains("Apple")
  }
  if (util.Properties.isJavaAtLeast("1.6") && knownVendor) new ForkJoinTaskSupport
  else new ThreadPoolTaskSupport
}
/** The task support obtained from `getTaskSupport` when this value is initialized. */
val tasksupport = getTaskSupport
/* implicit conversions */
/** Implicitly equips an array with a `par` method that yields a parallel array.
 *
 *  The parallel array is produced by `ParArray.handoff` applied to the given array.
 *
 *  @tparam T      type of the elements in the array, which is a subtype of AnyRef
 *  @param  array  the array to be parallelized
 *  @return        a `Parallelizable` object with a `par` method
 */
implicit def array2ParArray[T <: AnyRef](array: Array[T]) =
  new Parallelizable[ParArray[T]] {
    def par = ParArray.handoff[T](array)
  }
/** Queries about whether a builder factory is a parallel one.
 *
 *  The type parameters are the ones handed on to `CanCombineFrom`.
 */
trait FactoryOps[From, Elem, To] {
  /** Whether the factory is parallel. */
  def isParallel: Boolean

  /** The factory viewed as a `CanCombineFrom`. */
  def asParallel: CanCombineFrom[From, Elem, To]

  /** Starts a two-branch computation: `isbody` is the branch that receives the
   *  factory as a `CanCombineFrom`; the fallback branch is supplied through the
   *  returned `Otherwise`.
   */
  def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R): Otherwise[R]

  /** Second half of `ifParallel`: accepts the fallback branch and yields the result. */
  trait Otherwise[R] {
    def otherwise(notbody: => R): R
  }
}
/** Implicitly adds the `FactoryOps` queries to a `CanBuildFrom`.
 *
 *  The factory counts as parallel when it is an instance of `Parallel`.
 */
implicit def factory2ops[From, Elem, To](bf: CanBuildFrom[From, Elem, To]) =
  new FactoryOps[From, Elem, To] {
    def asParallel = bf.asInstanceOf[CanCombineFrom[From, Elem, To]]
    def isParallel = bf.isInstanceOf[Parallel]
    def ifParallel[R](isbody: CanCombineFrom[From, Elem, To] => R) =
      new Otherwise[R] {
        // The fallback is by-name, so it is only evaluated for a non-parallel factory.
        def otherwise(notbody: => R) =
          if (!isParallel) notbody
          else isbody(asParallel)
      }
  }
/** Queries and conversions relating a collection of `T` to the parallel collection types. */
trait TraversableOps[T] {
  /* type tests */
  def isParallel: Boolean
  def isParIterable: Boolean
  def isParSeq: Boolean

  /* views of the receiver as a parallel collection type */
  def asParIterable: ParIterable[T]
  def asParSeq: ParSeq[T]

  /** Starts a two-branch computation: `isbody` is the branch that receives the
   *  collection as a `ParSeq`; the fallback branch is supplied through the
   *  returned `Otherwise`.
   */
  def ifParSeq[R](isbody: ParSeq[T] => R): Otherwise[R]

  /** Converts the collection to a `ParArray`. */
  def toParArray: ParArray[T]

  /** Second half of `ifParSeq`: accepts the fallback branch and yields the result. */
  trait Otherwise[R] {
    def otherwise(notbody: => R): R
  }
}
/** Implicitly adds the `TraversableOps` queries and conversions to a `TraversableOnce`.
 *
 *  The `is*` members are plain runtime type tests on `t`, and the `as*` members are
 *  unchecked casts of `t`; callers are expected to test before casting.
 *
 *  @tparam T  element type of the collection
 *  @param  t  the collection being inspected
 */
implicit def traversable2ops[T](t: TraversableOnce[T]) = new TraversableOps[T] {
  def isParallel = t.isInstanceOf[Parallel]
  def isParIterable = t.isInstanceOf[ParIterable[_]]
  def asParIterable = t.asInstanceOf[ParIterable[T]]
  def isParSeq = t.isInstanceOf[ParSeq[_]]
  def asParSeq = t.asInstanceOf[ParSeq[T]]
  def ifParSeq[R](isbody: ParSeq[T] => R) = new Otherwise[R] {
    // Guard on `isParSeq` rather than `isParallel`: a parallel collection that is
    // not a `ParSeq` must take the fallback branch instead of failing the
    // `asParSeq` cast.
    def otherwise(notbody: => R) = if (isParSeq) isbody(asParSeq) else notbody
  }
  /** Returns `t` itself when it already is a `ParArray`; otherwise feeds every
   *  element of `t` into a `ParArrayCombiner` and returns the combiner's result.
   */
  def toParArray = if (t.isInstanceOf[ParArray[_]]) t.asInstanceOf[ParArray[T]] else {
    val it = t.toIterator
    val cb = mutable.ParArrayCombiner[T]()
    while (it.hasNext) cb += it.next
    cb.result
  }
}
/** Extra operation for merging throwables. */
trait ThrowableOps {
  /** Combines the receiver with `that` into one throwable. */
  def alongWith(that: Throwable): Throwable
}
/** Implicitly adds `alongWith` to any throwable.
 *
 *  The result is always a new `CompositeThrowable`. Operands that are already
 *  composite contribute their contained throwables; any other operand is added
 *  as a single element.
 */
implicit def throwable2ops(self: Throwable) = new ThrowableOps {
  def alongWith(that: Throwable) = {
    val merged: Set[Throwable] = self match {
      case left: CompositeThrowable =>
        that match {
          case right: CompositeThrowable => left.throwables ++ right.throwables
          case _ => left.throwables + that
        }
      case _ =>
        that match {
          case right: CompositeThrowable => right.throwables + self
          case _ => Set(self, that)
        }
    }
    new CompositeThrowable(merged)
  }
}
/* classes */
/** Composite throwable - thrown when multiple exceptions are thrown at the same time.
 *
 *  The message pairs every contained throwable with its stack trace.
 *
 *  @param throwables the throwables grouped by this instance
 */
final class CompositeThrowable(val throwables: Set[Throwable])
  extends Throwable(
    "Multiple exceptions thrown during a parallel computation: " +
      (for (t <- throwables) yield (t, t.getStackTrace.toList)).mkString(", ")
  )
/** A helper iterator over the index range `[index, until)` of an array buffer,
 *  intended for very small buffers.
 *
 *  Iterators created by `dup` and `split` share the same buffer and are given
 *  this iterator's signal delegate.
 */
private[parallel] class BufferIterator[T]
  (private val buffer: collection.mutable.ArrayBuffer[T], private var index: Int, private val until: Int, var signalDelegate: collection.generic.Signalling)
extends ParIterableIterator[T] {
  /** Number of elements not yet returned. */
  def remaining = until - index

  def hasNext = index < until

  /** Returns the element at the current index and advances by one. */
  def next = {
    val current = buffer(index)
    index += 1
    current
  }

  /** A new iterator over the same buffer positioned at the current index. */
  def dup = new BufferIterator(buffer, index, until, signalDelegate)

  /** Halves the remaining range into two new iterators; with at most one element
   *  remaining, returns this iterator alone.
   */
  def split: Seq[ParIterableIterator[T]] =
    if (remaining <= 1) Seq(this)
    else {
      val mid = index + (until - index) / 2
      Seq(
        new BufferIterator(buffer, index, mid, signalDelegate),
        new BufferIterator(buffer, mid, until, signalDelegate)
      )
    }

  private[parallel] override def debugInformation = buildString { append =>
    append("---------------")
    append("Buffer iterator")
    append("buffer: " + buffer)
    append("index: " + index)
    append("until: " + until)
    append("---------------")
  }
}
/** A helper combiner which contains an array of buckets. Buckets themselves
* are unrolled linked lists. Some parallel collections are constructed by
* sorting their result set according to some criteria.
*
* A reference `buckets` to buckets is maintained. Total size of all buckets
* is kept in `sz` and maintained whenever 2 bucket combiners are combined.
*
* Clients decide how to maintain these by implementing `+=` and `result`.
* Populating and using the buckets is up to the client. While populating them,
* the client should update `sz` accordingly. Note that a bucket is by default
* set to `null` to save space - the client should initialize it.
* Note that in general the type of the elements contained in the buckets `Buck`
* doesn't have to correspond to combiner element type `Elem`.
*
* This class simply gives an efficient `combine` for free - it chains
* the buckets together. Since the `combine` contract states that the receiver (`this`)
* becomes invalidated, `combine` reuses the receiver and returns it.
*
* Methods `beforeCombine` and `afterCombine` are called before and after
* combining the buckets, respectively, given that the argument to `combine`
* is not `this` (as required by the `combine` contract).
* They can be overridden in subclasses to provide custom behaviour by modifying
* the receiver (which will be the return value).
*/
private[parallel] abstract class BucketCombiner[-Elem, +To, Buck, +CombinerType <: BucketCombiner[Elem, To, Buck, CombinerType]]
  (private val bucketnumber: Int)
extends Combiner[Elem, To] {
self: EnvironmentPassingCombiner[Elem, To] =>
  // One slot per bucket; a slot stays `null` until something is stored in it.
  protected var buckets: Array[UnrolledBuffer[Buck]] @uncheckedVariance = new Array[UnrolledBuffer[Buck]](bucketnumber)
  // Running total reported by `size`.
  protected var sz: Int = 0

  def size = sz

  /** Replaces the bucket array with a fresh one of the same length and zeroes the size. */
  def clear = {
    buckets = new Array[UnrolledBuffer[Buck]](bucketnumber)
    sz = 0
  }

  /** Hook invoked before the buckets are merged; does nothing here. */
  def beforeCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}

  /** Hook invoked after the buckets are merged; does nothing here. */
  def afterCombine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]) {}

  /** Merges the buckets of `other` into this combiner, slot by slot, and returns
   *  this combiner. Combining with itself returns this combiner untouched; an
   *  argument that is not a `BucketCombiner` is reported as an error.
   */
  def combine[N <: Elem, NewTo >: To](other: Combiner[N, NewTo]): Combiner[N, NewTo] =
    if (this eq other) this
    else if (!other.isInstanceOf[BucketCombiner[_, _, _, _]]) system.error("Unexpected combiner type.")
    else {
      beforeCombine(other)
      val that = other.asInstanceOf[BucketCombiner[Elem, To, Buck, CombinerType]]
      var slot = 0
      while (slot < bucketnumber) {
        val incoming = that.buckets(slot)
        // An empty slot adopts the other bucket as is; otherwise the other bucket,
        // if present, is concatenated onto ours.
        if (buckets(slot) eq null) buckets(slot) = incoming
        else if (incoming ne null) buckets(slot) concat incoming
        slot += 1
      }
      sz += that.size
      afterCombine(other)
      this
    }
}
}