diff options
author | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-07-23 16:39:51 +0000 |
---|---|---|
committer | Aleksandar Pokopec <aleksandar.prokopec@epfl.ch> | 2010-07-23 16:39:51 +0000 |
commit | da234921b783d8ab31ddeb54a9c912f80715846d (patch) | |
tree | fd32fc40df0973011bc0460b7d49121f8aa1a709 /src/library/scala/collection/parallel/mutable/ParArray.scala | |
parent | 97b7cc4ddb806641ce5d1584ae913312755c012d (diff) | |
download | scala-da234921b783d8ab31ddeb54a9c912f80715846d.tar.gz scala-da234921b783d8ab31ddeb54a9c912f80715846d.tar.bz2 scala-da234921b783d8ab31ddeb54a9c912f80715846d.zip |
Parallel collection library renamings. No review
Diffstat (limited to 'src/library/scala/collection/parallel/mutable/ParArray.scala')
-rw-r--r-- | src/library/scala/collection/parallel/mutable/ParArray.scala | 605 |
1 files changed, 605 insertions, 0 deletions
diff --git a/src/library/scala/collection/parallel/mutable/ParArray.scala b/src/library/scala/collection/parallel/mutable/ParArray.scala new file mode 100644 index 0000000000..2443888465 --- /dev/null +++ b/src/library/scala/collection/parallel/mutable/ParArray.scala @@ -0,0 +1,605 @@ +package scala.collection.parallel.mutable + + + +import scala.collection.generic.GenericParTemplate +import scala.collection.generic.GenericCompanion +import scala.collection.generic.GenericParCompanion +import scala.collection.generic.CanCombineFrom +import scala.collection.generic.CanBuildFrom +import scala.collection.generic.ParFactory +import scala.collection.generic.Sizing +import scala.collection.parallel.Combiner +import scala.collection.parallel.ParSeqLike +import scala.collection.parallel.CHECK_RATE +import scala.collection.mutable.ArraySeq +import scala.collection.mutable.Builder +import scala.collection.Sequentializable + + + + +/** Parallel sequence holding elements in a linear array. + * + * `ParArray` is a parallel sequence with a predefined size. The size of the array + * cannot be changed after it's been created. + * + * `ParArray` internally keeps an array containing the elements. This means that + * bulk operations based on traversal are fast, but those returning a parallel array as a result + * are slightly slower. The reason for this is that `ParArray` uses lazy builders that + * create the internal data array only after the size of the array is known. The fragments + * are then copied into the resulting data array in parallel using fast array copy operations. + * Operations for which the resulting array size is known in advance are optimised to use this + * information. + * + * @tparam T type of the elements in the array + * + * @define Coll ParArray + * @define coll parallel array + */ +class ParArray[T] private[mutable] (val arrayseq: ArraySeq[T]) +extends ParSeq[T] + with GenericParTemplate[T, ParArray] + with ParSeqLike[T, ParArray[T], ArraySeq[T]] +{ + self => + + private val array: Array[Any] = arrayseq.array.asInstanceOf[Array[Any]] + + override def companion: GenericCompanion[ParArray] with GenericParCompanion[ParArray] = ParArray + + def this(sz: Int) = this { + require(sz >= 0) + new ArraySeq[T](sz) + } + + def apply(i: Int) = array(i).asInstanceOf[T] + + def update(i: Int, elem: T) = array(i) = elem + + def length = arrayseq.length + + def seq = arrayseq + + type SCPI = SignalContextPassingIterator[ParArrayIterator] + + def parallelIterator: ParArrayIterator = { + val pit = new ParArrayIterator with SCPI + pit + } + + class ParArrayIterator(var i: Int = 0, val until: Int = length, val arr: Array[Any] = array) + extends super.ParIterator { + me: SignalContextPassingIterator[ParArrayIterator] => + + def hasNext = i < until + + def next = { + val elem = arr(i) + i += 1 + elem.asInstanceOf[T] + } + + def remaining = until - i + + def psplit(sizesIncomplete: Int*): Seq[ParIterator] = { + var traversed = i + val total = sizesIncomplete.reduceLeft(_ + _) + val left = remaining + val sizes = if (total >= left) sizesIncomplete else sizesIncomplete :+ (left - total) + for (sz <- sizes) yield if (traversed < until) { + val start = traversed + val end = (traversed + sz) min until + traversed = end + new ParArrayIterator(start, end, arr) with SCPI + } else { + new ParArrayIterator(traversed, traversed, arr) with SCPI + } + } + + override def split: Seq[ParIterator] = { + val left = remaining + if (left >= 2) { + val splitpoint = left / 2 + Seq(new ParArrayIterator(i, i + splitpoint, arr) with SCPI, + new ParArrayIterator(i + splitpoint, until, arr) with SCPI) + } else { + Seq(this) + } + } + + override def toString = "ParArrayIterator(" + i + ", " + until + ")" + + /* overrides for efficiency */ + + /* accessors */ + + override def foreach[U](f: T => U) = { + foreach_quick(f, arr, until, i) + i = until + } + + private def foreach_quick[U](f: T => U, a: Array[Any], ntil: Int, from: Int) = { + var j = from + while (j < ntil) { + f(a(j).asInstanceOf[T]) + j += 1 + } + } + + override def count(p: T => Boolean) = { + val c = count_quick(p, arr, until, i) + i = until + c + } + + private def count_quick(p: T => Boolean, a: Array[Any], ntil: Int, from: Int) = { + var cnt = 0 + var j = from + while (j < ntil) { + if (p(a(j).asInstanceOf[T])) cnt += 1 + j += 1 + } + cnt + } + + override def foldLeft[S](z: S)(op: (S, T) => S): S = { + val r = foldLeft_quick(arr, until, op, z) + i = until + r + } + + private def foldLeft_quick[S](a: Array[Any], ntil: Int, op: (S, T) => S, z: S): S = { + var j = i + var sum = z + while (j < ntil) { + sum = op(sum, a(j).asInstanceOf[T]) + j += 1 + } + sum + } + + def aggregate[S](z: S)(seqop: (S, T) => S, combop: (S, S) => S): S = foldLeft[S](z)(seqop) + + override def sum[U >: T](implicit num: Numeric[U]): U = { + var s = sum_quick(num, arr, until, i, num.zero) + i = until + s + } + + private def sum_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, zero: U): U = { + var j = from + var sum = zero + while (j < ntil) { + sum = num.plus(sum, a(j).asInstanceOf[T]) + j += 1 + } + sum + } + + override def product[U >: T](implicit num: Numeric[U]): U = { + var p = product_quick(num, arr, until, i, num.one) + i = until + p + } + + private def product_quick[U >: T](num: Numeric[U], a: Array[Any], ntil: Int, from: Int, one: U): U = { + var j = from + var prod = one + while (j < ntil) { + prod = num.times(prod, a(j).asInstanceOf[T]) + j += 1 + } + prod + } + + override def forall(p: T => Boolean): Boolean = { + if (isAborted) return false + + var all = true + while (i < until) { + val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE + + all = forall_quick(p, array, nextuntil, i) + if (all) i = nextuntil + else { + i = until + abort + } + + if (isAborted) return false + } + all + } + + // it's faster to use a separate small method + private def forall_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = { + var j = start + while (j < nextuntil) { + if (p(a(j).asInstanceOf[T])) j += 1 + else return false + } + return true + } + + override def exists(p: T => Boolean): Boolean = { + if (isAborted) return true + + var some = false + while (i < until) { + val nextuntil = if (i + CHECK_RATE > until) until else i + CHECK_RATE + + some = exists_quick(p, array, nextuntil, i) + if (some) { + i = until + abort + } else i = nextuntil + + if (isAborted) return true + } + some + } + + // faster to use separate small method + private def exists_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Boolean = { + var j = start + while (j < nextuntil) { + if (p(a(j).asInstanceOf[T])) return true + else j += 1 + } + return false + } + + override def find(p: T => Boolean): Option[T] = { + if (isAborted) return None + + var r: Option[T] = None + while (i < until) { + val nextuntil = if ((i + CHECK_RATE) < until) (i + CHECK_RATE) else until + + r = find_quick(p, array, nextuntil, i) + + if (r != None) { + i = until + abort + } else i = nextuntil + + if (isAborted) return r + } + r + } + + private def find_quick(p: T => Boolean, a: Array[Any], nextuntil: Int, start: Int): Option[T] = { + var j = start + while (j < nextuntil) { + val elem = a(j).asInstanceOf[T] + if (p(elem)) return Some(elem) + else j += 1 + } + return None + } + + override def drop(n: Int): ParArrayIterator = { + i += n + this + } + + override def copyToArray[U >: T](array: Array[U], from: Int, len: Int) { + val totallen = (self.length - i) min len min (array.length - from) + Array.copy(arr, i, array, from, totallen) + i += totallen + } + + override def prefixLength(pred: T => Boolean): Int = { + val r = prefixLength_quick(pred, arr, until, i) + i += r + 1 + r + } + + private def prefixLength_quick(pred: T => Boolean, a: Array[Any], ntil: Int, startpos: Int): Int = { + var j = startpos + var endpos = ntil + while (j < endpos) { + if (pred(a(j).asInstanceOf[T])) j += 1 + else endpos = j + } + endpos - startpos + } + + override def indexWhere(pred: T => Boolean): Int = { + val r = indexWhere_quick(pred, arr, until, i) + val ret = if (r != -1) r - i else r + i = until + ret + } + + private def indexWhere_quick(pred: T => Boolean, a: Array[Any], ntil: Int, from: Int): Int = { + var j = from + var pos = -1 + while (j < ntil) { + if (pred(a(j).asInstanceOf[T])) { + pos = j + j = ntil + } else j += 1 + } + pos + } + + override def lastIndexWhere(pred: T => Boolean): Int = { + val r = lastIndexWhere_quick(pred, arr, i, until) + val ret = if (r != -1) r - i else r + i = until + ret + } + + private def lastIndexWhere_quick(pred: T => Boolean, a: Array[Any], from: Int, ntil: Int): Int = { + var pos = -1 + var j = ntil - 1 + while (j >= from) { + if (pred(a(j).asInstanceOf[T])) { + pos = j + j = -1 + } else j -= 1 + } + pos + } + + override def sameElements(that: Iterator[_]): Boolean = { + var same = true + while (i < until && that.hasNext) { + if (arr(i) != that.next) { + i = until + same = false + } + i += 1 + } + same + } + + /* transformers */ + + override def map2combiner[S, That](f: T => S, cb: Combiner[S, That]): Combiner[S, That] = { + //val cb = cbf(self.repr) + cb.sizeHint(remaining) + map2combiner_quick(f, arr, cb, until, i) + i = until + cb + } + + private def map2combiner_quick[S, That](f: T => S, a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + cb += f(a(j).asInstanceOf[T]) + j += 1 + } + } + + override def collect2combiner[S, That](pf: PartialFunction[T, S], pbf: CanCombineFrom[ParArray[T], S, That]): Combiner[S, That] = { + val cb = pbf(self.repr) + collect2combiner_quick(pf, arr, cb, until, i) + i = until + cb + } + + private def collect2combiner_quick[S, That](pf: PartialFunction[T, S], a: Array[Any], cb: Builder[S, That], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + val curr = a(j).asInstanceOf[T] + if (pf.isDefinedAt(curr)) cb += pf(curr) + j += 1 + } + } + + override def flatmap2combiner[S, That](f: T => Traversable[S], pbf: CanCombineFrom[ParArray[T], S, That]): Combiner[S, That] = { + val cb = pbf(self.repr) + while (i < until) { + val traversable = f(arr(i).asInstanceOf[T]) + if (traversable.isInstanceOf[Iterable[_]]) cb ++= traversable.asInstanceOf[Iterable[S]].iterator + else cb ++= traversable + i += 1 + } + cb + } + + override def filter2combiner[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = { + filter2combiner_quick(pred, cb, arr, until, i) + i = until + cb + } + + private def filter2combiner_quick[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { + var j = i + while(j < ntil) { + var curr = a(j).asInstanceOf[T] + if (pred(curr)) cb += curr + j += 1 + } + } + + override def filterNot2combiner[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Combiner[U, This]) = { + filterNot2combiner_quick(pred, cb, arr, until, i) + i = until + cb + } + + private def filterNot2combiner_quick[U >: T, This >: ParArray[T]](pred: T => Boolean, cb: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { + var j = i + while(j < ntil) { + var curr = a(j).asInstanceOf[T] + if (!pred(curr)) cb += curr + j += 1 + } + } + + override def copy2builder[U >: T, Coll, Bld <: Builder[U, Coll]](cb: Bld): Bld = { + cb.sizeHint(remaining) + cb.ifIs[ParArrayCombiner[T]] { pac => + val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] + Array.copy(arr, i, targetarr, pac.lastbuff.size, until - i) + pac.lastbuff.setInternalSize(remaining) + } otherwise { + copy2builder_quick(cb, arr, until, i) + i = until + } + cb + } + + private def copy2builder_quick[U >: T, Coll](b: Builder[U, Coll], a: Array[Any], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + b += a(j).asInstanceOf[T] + j += 1 + } + } + + override def partition2combiners[U >: T, This >: ParArray[T]](pred: T => Boolean, btrue: Combiner[U, This], bfalse: Combiner[U, This]) = { + partition2combiners_quick(pred, btrue, bfalse, arr, until, i) + i = until + (btrue, bfalse) + } + + private def partition2combiners_quick[U >: T, This >: ParArray[T]](p: T => Boolean, btrue: Builder[U, This], bfalse: Builder[U, This], a: Array[Any], ntil: Int, from: Int) { + var j = from + while (j < ntil) { + val curr = a(j).asInstanceOf[T] + if (p(curr)) btrue += curr else bfalse += curr + j += 1 + } + } + + override def take2combiner[U >: T, This >: ParArray[T]](n: Int, cb: Combiner[U, This]) = { + cb.sizeHint(n) + val ntil = i + n + val a = arr + while (i < ntil) { + cb += a(i).asInstanceOf[T] + i += 1 + } + cb + } + + override def drop2combiner[U >: T, This >: ParArray[T]](n: Int, cb: Combiner[U, This]) = { + drop(n) + cb.sizeHint(remaining) + while (i < until) { + cb += arr(i).asInstanceOf[T] + i += 1 + } + cb + } + + override def reverse2combiner[U >: T, This >: ParArray[T]](cb: Combiner[U, This]): Combiner[U, This] = { + cb.ifIs[ParArrayCombiner[T]] { pac => + val sz = remaining + pac.sizeHint(sz) + val targetarr: Array[Any] = pac.lastbuff.internalArray.asInstanceOf[Array[Any]] + reverse2combiner_quick(targetarr, arr, i, until) + pac.lastbuff.setInternalSize(sz) + pac + } otherwise super.reverse2combiner(cb) + cb + } + + private def reverse2combiner_quick(targ: Array[Any], a: Array[Any], from: Int, ntil: Int) { + var j = from + var k = ntil - from - 1 + while (j < ntil) { + targ(k) = a(j) + j += 1 + k -= 1 + } + } + + } + + /* operations */ + + private def buildsArray[S, That](c: Builder[S, That]) = c.isInstanceOf[ParArrayCombiner[_]] + + override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[ParArray[T], S, That]) = if (buildsArray(bf(repr))) { + // reserve array + val targetarr = new Array[Any](length) + + // fill it in parallel + executeAndWait(new Map[S](f, targetarr, 0, length)) + + // wrap it into a parallel array + (new ParArray[S](new ExposedArraySeq[S](targetarr.asInstanceOf[Array[AnyRef]], length))).asInstanceOf[That] + } else super.map(f)(bf) + + /* tasks */ + + class Map[S](f: T => S, targetarr: Array[Any], offset: Int, howmany: Int) extends super.Task[Unit, Map[S]] { + var result = (); + def leaf(prev: Option[Unit]) = { + val tarr = targetarr + val sarr = array + var i = offset + val until = offset + howmany + while (i < until) { + tarr(i) = f(sarr(i).asInstanceOf[T]) + i += 1 + } + } + def split = { + val fp = howmany / 2 + List(new Map(f, targetarr, offset, fp), new Map(f, targetarr, offset + fp, howmany - fp)) + } + def shouldSplitFurther = howmany > collection.parallel.thresholdFromSize(length, parallelismLevel) + } + +} + + + + + +object ParArray extends ParFactory[ParArray] { + implicit def canBuildFrom[T]: CanCombineFrom[Coll, T, ParArray[T]] = new GenericCanCombineFrom[T] + def newBuilder[T]: Combiner[T, ParArray[T]] = newCombiner + def newCombiner[T]: Combiner[T, ParArray[T]] = ParArrayCombiner[T] + + /** Creates a new parallel array by wrapping the specified array. + */ + def handoff[T](arr: Array[T]): ParArray[T] = wrapOrRebuild(arr, arr.length) + + /** Creates a new parallel array by wrapping a part of the specified array. + */ + def handoff[T](arr: Array[T], sz: Int): ParArray[T] = wrapOrRebuild(arr, sz) + + private def wrapOrRebuild[T](arr: AnyRef, sz: Int) = arr match { + case arr: Array[AnyRef] => new ParArray[T](new ExposedArraySeq[T](arr, sz)) + case _ => new ParArray[T](new ExposedArraySeq[T](runtime.ScalaRunTime.toObjectArray(arr), sz)) + } + + def createFromCopy[T <: AnyRef : ClassManifest](arr: Array[T]): ParArray[T] = { + val newarr = new Array[T](arr.length) + Array.copy(arr, 0, newarr, 0, arr.length) + handoff(newarr) + } + +} + + + + + + + + + + + + + + + + + + + + + + + + |