path: root/src/library/scala/collection/parallel/mutable/ParArray.scala
diff options
authorAleksandar Pokopec <>2010-07-23 16:39:51 +0000
committerAleksandar Pokopec <>2010-07-23 16:39:51 +0000
commitda234921b783d8ab31ddeb54a9c912f80715846d (patch)
treefd32fc40df0973011bc0460b7d49121f8aa1a709 /src/library/scala/collection/parallel/mutable/ParArray.scala
parent97b7cc4ddb806641ce5d1584ae913312755c012d (diff)
Parallel collection library renamings. No review
Diffstat (limited to 'src/library/scala/collection/parallel/mutable/ParArray.scala')
1 files changed, 605 insertions, 0 deletions
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala
new file mode 100644
index 0000000000..2443888465
--- /dev/null
+++ b/src/library/scala/collection/parallel/mutable/ParArray.scala
@@ -0,0 +1,605 @@
+package scala.collection.parallel.mutable
+import scala.collection.generic.GenericParTemplate
+import scala.collection.generic.GenericCompanion
+import scala.collection.generic.GenericParCompanion
+import scala.collection.generic.CanCombineFrom
+import scala.collection.generic.CanBuildFrom
+import scala.collection.generic.ParFactory
+import scala.collection.generic.Sizing
+import scala.collection.parallel.Combiner
+import scala.collection.parallel.ParSeqLike
+import scala.collection.parallel.CHECK_RATE
+import scala.collection.mutable.ArraySeq
+import scala.collection.mutable.Builder
+import scala.collection.Sequentializable
+/** Parallel sequence holding elements in a linear array.
+ *
+ * `ParArray` is a parallel sequence with a predefined size. The size of the array
+ * cannot be changed after it's been created.
+ *
+ * `ParArray` internally keeps an array containing the elements. This means that
+ * bulk operations based on traversal are fast, but those returning a parallel array as a result
+ * are slightly slower. The reason for this is that `ParArray` uses lazy builders that
+ * create the internal data array only after the size of the array is known. The fragments
+ * are then copied into the resulting data array in parallel using fast array copy operations.
+ * Operations for which the resulting array size is known in advance are optimised to use this
+ * information.
+ *
+ * @tparam T type of the elements in the array
+ *
+ * @define Coll ParArray
+ * @define coll parallel array
+ */
+class ParArray[T] private[mutable] (val arrayseq: ArraySeq[T])
+extends ParSeq[T]
+ with GenericParTemplate[T, ParArray]
+ with ParSeqLike[T, ParArray[T], ArraySeq[T]]
+ self =>
+ private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]]
+ override def companion: GenericCompanion[ParArray] with GenericParCompanion[ParArray] = ParArray
+ def this(sz: Int) = this {
+ require(sz >= 0)
+ new ArraySeq[T](sz)
+ }
+ def apply(i: Int) = array(i).asInstanceOf[T]
+ def update(i: Int, elem: T) = array(i) = elem
+ def length = arrayseq.length
+ def seq = arrayseq
+ type SCPI = SignalContextPassingIterator[ParArrayIterator]
+ def parallelIterator: ParArrayIterator = {
+ val pit = new ParArrayIterator with SCPI
+ pit
+ }
+ class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array)
+ extends super.ParIterator {
+ me: SignalContextPassingIterator[ParArrayIterator] =>
+ def hasNext = i < until
+ def next = {
+ val elem = arr(i)
+ i += 1
+ elem.asInstanceOf[T]
+ }
+ def remaining = until - i
+ def psplit(sizesIncomplete: Int*): Seq[ParIterator] = {
+ var traversed = i
+ val total = sizesIncomplete.reduceLeft(_ + _)
+ val left = remaining
+ val sizes = if (total >= left) sizesIncomplete else sizesIncomplete :+ (left - total)
+ for (sz <- sizes) yield if (traversed < until) {
+ val start = traversed
+ val end = (traversed + sz) min until
+ traversed = end
+ new ParArrayIterator(start, end, arr) with SCPI
+ } else {
+ new ParArrayIterator(traversed, traversed, arr) with SCPI
+ }
+ }
+ override def split: Seq[ParIterator] = {
+ val left = remaining
+ if (left >= 2) {
+ val splitpoint = left / 2
+ Seq(new ParArrayIterator(i, i + splitpoint, arr) with SCPI,
+ new ParArrayIterator(i + splitpoint, until, arr) with SCPI)
+ } else {
+ Seq(this)
+ }
+ }
+ override def toString = "ParArrayIterator(" + i + ", " + until + ")"
+ /* overrides for efficiency */
+ /* accessors */
+ override def foreach[U](f: T => U) = {
+ foreach_quick(f, arr, until, i)
+ i = until
+ }
+ private def foreach_quick[U](f: T => U, a: Array[Any], ntil: Int, from: Int) = {
+ var j = from
+ while (j < ntil) {
+ f(a(j).asInstanceOf[T])
+ j += 1
+ }
+ }
+ override def count(p: T => Boolean) = {
+ val c = count_quick(p, arr, until, i)
+ i = until
+ c
+ }
+ private def count_quick(p: T => Boolean, a: Array[Any], ntil: Int, from: Int) = {
+ var cnt = 0
+ var j = from
+ while (j < ntil) {
+ if (p(a(j).asInstanceOf[T])) cnt += 1
+ j += 1
+ }
+ cnt
+ }
+ override def foldLeft[S](z: S)(op: (S, T) => S): S = {
+ val r = foldLeft_quick(arr, until, op, z)
+ i = until
+ r
+ }
+ private def foldLeft_quick[S](a: Array[Any], ntil: Int, op: (S, T) => S, z: S): S = {
+ var j = i
+ var sum = z
+ while (j < ntil) {
+ sum = op(sum, a(j).asInstanceOf[T])
+ j += 1
+ }
+ sum
+ }
+ def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = foldLeft[S](z)(seqop)
+ override def sum[U >: T](implicit num: Numeric[U]): U = {
+ var s = sum_quick(num, arr, until, i,
+ i = until
+ s
+ }
+ private def sum_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, zero: U): U = {
+ var j = from
+ var sum = zero
+ while (j < ntil) {
+ sum =, a(j).asInstanceOf[T])
+ j += 1
+ }
+ sum
+ }
+ override def product[U >: T](implicit num: Numeric[U]): U = {
+ var p = product_quick(num, arr, until, i,
+ i = until
+ p
+ }
+ private def product_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, one: U): U = {
+ var j = from
+ var prod = one
+ while (j < ntil) {
+ prod = num.times(prod, a(j).asInstanceOf[T])
+ j += 1
+ }
+ prod
+ }
+ override def forall(p: T => Boolean): Boolean = {
+ if (isAborted) return false
+ var all = true
+ while (i < until) {
+ val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE
+ all = forall_quick(p, array, nextuntil, i)
+ if (all) i = nextuntil
+ else {
+ i = until
+ abort
+ }
+ if (isAborted) return false
+ }
+ all
+ }
+ // it's faster to use a separate small method
+ private def forall_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = {
+ var j = start
+ while (j < nextuntil) {
+ if (p(a(j).asInstanceOf[T])) j += 1
+ else return false
+ }
+ return true
+ }
+ override def exists(p: T => Boolean): Boolean = {
+ if (isAborted) return true
+ var some = false
+ while (i < until) {
+ val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE
+ some = exists_quick(p, array, nextuntil, i)
+ if (some) {
+ i = until
+ abort
+ } else i = nextuntil
+ if (isAborted) return true
+ }
+ some
+ }
+ // faster to use separate small method
+ private def exists_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = {
+ var j = start
+ while (j < nextuntil) {
+ if (p(a(j).asInstanceOf[T])) return true
+ else j += 1
+ }
+ return false
+ }
+ override def find(p: T => Boolean): Option[T] = {
+ if (isAborted) return None
+ var r: Option[T] = None
+ while (i < until) {
+ val nextuntil = if ((i + CHECK_RATE) < until) (i + CHECK_RATE) else until
+ r = find_quick(p, array, nextuntil, i)
+ if (r != None) {
+ i = until
+ abort
+ } else i = nextuntil
+ if (isAborted) return r
+ }
+ r
+ }
+ private def find_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Option[T] = {
+ var j = start
+ while (j < nextuntil) {
+ val elem = a(j).asInstanceOf[T]
+ if (p(elem)) return Some(elem)
+ else j += 1
+ }
+ return None
+ }
+ override def drop(n: Int): ParArrayIterator = {
+ i += n
+ this
+ }
+ override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) {
+ val totallen = (self.length - i) min len min (array.length - from)
+ Array.copy(arr, i, array, from, totallen)
+ i += totallen
+ }
+ override def prefixLength(pred: T => Boolean): Int = {
+ val r = prefixLength_quick(pred, arr, until, i)
+ i += r + 1
+ r
+ }
+ private def prefixLength_quick(pred: T => Boolean, a: Array[Any], ntil: Int, startpos: Int): Int = {
+ var j = startpos
+ var endpos = ntil
+ while (j < endpos) {
+ if (pred(a(j).asInstanceOf[T])) j += 1
+ else endpos = j
+ }
+ endpos - startpos
+ }
+ override def indexWhere(pred: T => Boolean): Int = {
+ val r = indexWhere_quick(pred, arr, until, i)
+ val ret = if (r != -1) r - i else r
+ i = until
+ ret
+ }
+ private def indexWhere_quick(pred: T => Boolean, a: Array[Any], ntil: Int, from: Int): Int = {
+ var j = from
+ var pos = -1
+ while (j < ntil) {
+ if (pred(a(j).asInstanceOf[T])) {
+ pos = j
+ j = ntil
+ } else j += 1
+ }
+ pos
+ }
+ override def lastIndexWhere(pred: T => Boolean): Int = {
+ val r = lastIndexWhere_quick(pred, arr, i, until)
+ val ret = if (r != -1) r - i else r
+ i = until
+ ret
+ }
+ private def lastIndexWhere_quick(pred: T => Boolean, a: Array[Any], from: Int, ntil: Int): Int = {
+ var pos = -1
+ var j = ntil - 1
+ while (j >= from) {
+ if (pred(a(j).asInstanceOf[T])) {
+ pos = j
+ j = -1
+ } else j -= 1
+ }
+ pos
+ }
+ override def sameElements(that: Iterator[_]): Boolean = {
+ var same = true
+ while (i < until && that.hasNext) {
+ if (arr(i) != {
+ i = until
+ same = false
+ }
+ i += 1
+ }
+ same
+ }
+ /* transformers */
+ override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = {
+ //val cb = cbf(self.repr)
+ cb.sizeHint(remaining)
+ map2combiner_quick(f, arr, cb, until, i)
+ i = until
+ cb
+ }
+ private def map2combiner_quick[S, That](f: T => S, a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ cb += f(a(j).asInstanceOf[T])
+ j += 1
+ }
+ }
+ override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParArray[T], S, That]): Combiner[S, That] = {
+ val cb = pbf(self.repr)
+ collect2combiner_quick(pf, arr, cb, until, i)
+ i = until
+ cb
+ }
+ private def collect2combiner_quick[S, That](pf: PartialFunction[T, S], a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ val curr = a(j).asInstanceOf[T]
+ if (pf.isDefinedAt(curr)) cb += pf(curr)
+ j += 1
+ }
+ }
+ override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParArray[T], S, That]): Combiner[S, That] = {
+ val cb = pbf(self.repr)
+ while (i < until) {
+ val traversable = f(arr(i).asInstanceOf[T])
+ if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator
+ else cb ++= traversable
+ i += 1
+ }
+ cb
+ }
+ override def filter2combiner[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = {
+ filter2combiner_quick(pred, cb, arr, until, i)
+ i = until
+ cb
+ }
+ private def filter2combiner_quick[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
+ var j = i
+ while(j < ntil) {
+ var curr = a(j).asInstanceOf[T]
+ if (pred(curr)) cb += curr
+ j += 1
+ }
+ }
+ override def filterNot2combiner[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = {
+ filterNot2combiner_quick(pred, cb, arr, until, i)
+ i = until
+ cb
+ }
+ private def filterNot2combiner_quick[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
+ var j = i
+ while(j < ntil) {
+ var curr = a(j).asInstanceOf[T]
+ if (!pred(curr)) cb += curr
+ j += 1
+ }
+ }
+ override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = {
+ cb.sizeHint(remaining)
+ cb.ifIs[ParArrayCombiner[T]] { pac =>
+ val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
+ Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i)
+ pac.lastbuff.setInternalSize(remaining)
+ } otherwise {
+ copy2builder_quick(cb, arr, until, i)
+ i = until
+ }
+ cb
+ }
+ private def copy2builder_quick[U >: T, Coll](b: Builder[U, Coll], a: Array[Any], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ b += a(j).asInstanceOf[T]
+ j += 1
+ }
+ }
+ override def partition2combiners[U >: T, This >: ParArray[T]](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = {
+ partition2combiners_quick(pred, btrue, bfalse, arr, until, i)
+ i = until
+ (btrue, bfalse)
+ }
+ private def partition2combiners_quick[U >: T, This >: ParArray[T]](p: T => Boolean, btrue: Builder[U, This], bfalse: Builder[U, This], a: Array[Any], ntil: Int, from: Int) {
+ var j = from
+ while (j < ntil) {
+ val curr = a(j).asInstanceOf[T]
+ if (p(curr)) btrue += curr else bfalse += curr
+ j += 1
+ }
+ }
+ override def take2combiner[U >: T, This >: ParArray[T]](n: Int, cb: Combiner[U, This]) = {
+ cb.sizeHint(n)
+ val ntil = i + n
+ val a = arr
+ while (i < ntil) {
+ cb += a(i).asInstanceOf[T]
+ i += 1
+ }
+ cb
+ }
+ override def drop2combiner[U >: T, This >: ParArray[T]](n: Int, cb: Combiner[U, This]) = {
+ drop(n)
+ cb.sizeHint(remaining)
+ while (i < until) {
+ cb += arr(i).asInstanceOf[T]
+ i += 1
+ }
+ cb
+ }
+ override def reverse2combiner[U >: T, This >: ParArray[T]](cb: Combiner[U, This]): Combiner[U, This] = {
+ cb.ifIs[ParArrayCombiner[T]] { pac =>
+ val sz = remaining
+ pac.sizeHint(sz)
+ val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]]
+ reverse2combiner_quick(targetarr, arr, i, until)
+ pac.lastbuff.setInternalSize(sz)
+ pac
+ } otherwise super.reverse2combiner(cb)
+ cb
+ }
+ private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) {
+ var j = from
+ var k = ntil - from - 1
+ while (j < ntil) {
+ targ(k) = a(j)
+ j += 1
+ k -= 1
+ }
+ }
+ }
+ /* operations */
+ private def buildsArray[S, That](c: Builder[S, That]) = c.isInstanceOf[ParArrayCombiner[_]]
+ override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[ParArray[T], S, That]) = if (buildsArray(bf(repr))) {
+ // reserve array
+ val targetarr = new Array[Any](length)
+ // fill it in parallel
+ executeAndWait(new Map[S](f, targetarr, 0, length))
+ // wrap it into a parallel array
+ (new ParArray[S](new ExposedArraySeq[S](targetarr.asInstanceOf[Array[AnyRef]], length))).asInstanceOf[That]
+ } else
+ /* tasks */
+ class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, Map[S]] {
+ var result = ();
+ def leaf(prev: Option[Unit]) = {
+ val tarr = targetarr
+ val sarr = array
+ var i = offset
+ val until = offset + howmany
+ while (i < until) {
+ tarr(i) = f(sarr(i).asInstanceOf[T])
+ i += 1
+ }
+ }
+ def split = {
+ val fp = howmany / 2
+ List(new Map(f, targetarr, offset, fp), new Map(f, targetarr, offset + fp, howmany - fp))
+ }
+ def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(length, parallelismLevel)
+ }
+object ParArray extends ParFactory[ParArray] {
+ implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParArray[T]] = new GenericCanCombineFrom[T]
+ def newBuilder[T]: Combiner[T, ParArray[T]] = newCombiner
+ def newCombiner[T]: Combiner[T, ParArray[T]] = ParArrayCombiner[T]
+ /** Creates a new parallel array by wrapping the specified array.
+ */
+ def handoff[T](arr: Array[T]): ParArray[T] = wrapOrRebuild(arr, arr.length)
+ /** Creates a new parallel array by wrapping a part of the specified array.
+ */
+ def handoff[T](arr: Array[T], sz: Int): ParArray[T] = wrapOrRebuild(arr, sz)
+ private def wrapOrRebuild[T](arr: AnyRef, sz: Int) = arr match {
+ case arr: Array[AnyRef] => new ParArray[T](new ExposedArraySeq[T](arr, sz))
+ case _ => new ParArray[T](new ExposedArraySeq[T](runtime.ScalaRunTime.toObjectArray(arr), sz))
+ }
+ def createFromCopy[T <: AnyRef : ClassManifest](arr: Array[T]): ParArray[T] = {
+ val newarr = new Array[T](arr.length)
+ Array.copy(arr, 0, newarr, 0, arr.length)
+ handoff(newarr)
+ }