From 492b22576f2ad46b300ce8dc31c5b672aaf517e4 Mon Sep 17 00:00:00 2001 From: Aleksandar Pokopec Date: Thu, 9 Dec 2010 10:08:16 +0000 Subject: Fixed parallel ranges to use the same range log... Fixed parallel ranges to use the same range logic under the hood, and not introduce code duplication. Slight fix in Tasks. No review. --- src/library/scala/collection/immutable/Range.scala | 4 +- .../collection/parallel/ParIterableLike.scala | 2 +- .../collection/parallel/ParIterableViewLike.scala | 2 +- .../scala/collection/parallel/ParSeqViewLike.scala | 2 +- src/library/scala/collection/parallel/Tasks.scala | 2 + .../scala/collection/parallel/UnrolledBuffer.scala | 28 +++++++ .../collection/parallel/immutable/ParRange.scala | 97 ++++++++++++---------- .../parallel_array/MatrixMultiplication.scala | 2 +- .../benchmarks/parallel_range/RangeBenches.scala | 7 +- .../parallel-collections/ParallelRangeCheck.scala | 2 +- 10 files changed, 97 insertions(+), 51 deletions(-) diff --git a/src/library/scala/collection/immutable/Range.scala b/src/library/scala/collection/immutable/Range.scala index 1ebf6e283c..0446eeb198 100644 --- a/src/library/scala/collection/immutable/Range.scala +++ b/src/library/scala/collection/immutable/Range.scala @@ -44,7 +44,7 @@ extends IndexedSeq[Int] with collection.Parallelizable[ParRange] with Serializable { - def par = ParRange(start, end, step, false) + def par = new ParRange(this) // Note that this value is calculated eagerly intentionally: it also // serves to enforce conditions (step != 0) && (length <= Int.MaxValue) @@ -249,7 +249,7 @@ object Range { NumericRange.count[Long](start, end, step, isInclusive) class Inclusive(start: Int, end: Int, step: Int) extends Range(start, end, step) { - override def par = ParRange(start, end, step, true) + override def par = new ParRange(this) override def isInclusive = true override protected def copy(start: Int, end: Int, step: Int): Range = new Inclusive(start, end, step) } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 83e5c6cb59..ed757655f5 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -644,7 +644,7 @@ self => executeAndWaitResult(new Zip(pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result }); } else super.zip(that)(bf) - override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip new immutable.ParRange(0, size, 1, false) + override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false) override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { val pbf = bf.asParallel diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala index 570abdcea6..6fb924e57e 100644 --- a/src/library/scala/collection/parallel/ParIterableViewLike.scala +++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala @@ -117,7 +117,7 @@ self => override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That] override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That = - newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] + newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That] diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala index 2c323ecade..1b5ae06c42 100644 --- a/src/library/scala/collection/parallel/ParSeqViewLike.scala +++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala @@ -140,7 +140,7 @@ self => override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That] override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That] override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That = - newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] + newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] override def reverse: This = newReversed.asInstanceOf[This] override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f) diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index ec38513d9b..b111ecb87c 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -282,6 +282,7 @@ trait ThreadPoolTasks extends Tasks { () => { t.sync + t.body.forwardThrowable t.body.result } } @@ -293,6 +294,7 @@ trait ThreadPoolTasks extends Tasks { t.start t.sync + t.body.forwardThrowable t.body.result } diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala index 2c12069e1c..d29c24822c 100644 --- a/src/library/scala/collection/parallel/UnrolledBuffer.scala +++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala @@ -10,6 +10,34 @@ import annotation.tailrec + +/** A buffer that stores elements in an unrolled linked list. + * + * Unrolled linked lists store elements in linked fixed size + * arrays. + * + * Unrolled buffers retain locality and low memory overhead + * properties of array buffers, but offer much more efficient + * element addition, since they never reallocate and copy the + * internal array. + * + * However, they provide `O(n/m)` complexity random access, + * where `n` is the number of elements, and `m` the size of + * internal array chunks. + * + * Ideal to use when: + * - elements are added to the buffer and then all of the + * elements are traversed sequentially + * - two unrolled buffers need to be concatenated (see `concat`) + * + * Better than singly linked lists for random access, but + * should still be avoided for such a purpose. + * + * @author Aleksandar Prokopec + * + * @coll unrolled buffer + * @Coll UnrolledBuffer + */ class UnrolledBuffer[T](implicit val manifest: ClassManifest[T]) extends collection.mutable.Buffer[T] with collection.mutable.BufferLike[T, UnrolledBuffer[T]] diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala index 53f6bd33e6..dbd5449362 100644 --- a/src/library/scala/collection/parallel/immutable/ParRange.scala +++ b/src/library/scala/collection/parallel/immutable/ParRange.scala @@ -7,71 +7,82 @@ import scala.collection.immutable.RangeUtils import scala.collection.parallel.ParSeq import scala.collection.parallel.Combiner import scala.collection.generic.CanCombineFrom +import scala.collection.parallel.ParIterableIterator -class ParRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean) -extends ParSeq[Int] - with RangeUtils[ParRange] { - self => +class ParRange(range: Range) +extends ParSeq[Int] { +self => - def seq = new Range(start, end, step) + def seq = range - def length = _length + @inline + final def length = range.length - def apply(idx: Int) = _apply(idx) - - def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParRange(_start, _end, _step, _inclusive) + @inline + final def apply(idx: Int) = range.apply(idx) def parallelIterator = new ParRangeIterator with SCPI - override def toString = seq.toString // TODO - type SCPI = SignalContextPassingIterator[ParRangeIterator] - class ParRangeIterator - (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive) - extends ParIterator with RangeUtils[ParRangeIterator] { - me: SignalContextPassingIterator[ParRangeIterator] => - def remaining = _length - def next = { val r = start; start += step; r } - def hasNext = remaining > 0 - def split: Seq[ParIterator] = psplit(remaining / 2, remaining - remaining / 2) - def psplit(sizes: Int*): Seq[ParIterator] = { - val incr = sizes.scanLeft(0)(_ + _) - for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until) - } - def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = { - new ParRangeIterator(_start, _end, _step, _inclusive) with SCPI + class ParRangeIterator(range: Range = self.range) + extends ParIterator { + me: SignalContextPassingIterator[ParRangeIterator] => + override def toString = "ParRangeIterator(over: " + range + ")" + private var ind = 0 + private val len = range.length + + final def remaining = len - ind + + final def hasNext = ind < len + + final def next = if (hasNext) { + val r = range.apply(ind) + ind += 1 + r + } else Iterator.empty.next + + private def rangeleft = range.drop(ind) + + def split = { + val rleft = rangeleft + val elemleft = rleft.length + if (elemleft < 2) Seq(new ParRangeIterator(rleft) with SCPI) + else Seq( + new ParRangeIterator(rleft.take(elemleft / 2)) with SCPI, + new ParRangeIterator(rleft.drop(elemleft / 2)) with SCPI + ) } - override def toString = "ParRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")" + def psplit(sizes: Int*) = { + var rleft = rangeleft + for (sz <- sizes) yield { + val fronttaken = rleft.take(sz) + rleft = rleft.drop(sz) + new ParRangeIterator(fronttaken) with SCPI + } + } /* accessors */ override def foreach[U](f: Int => U): Unit = { - _foreach(f) - start = end + step + rangeleft.foreach(f) + ind = len } override def reduce[U >: Int](op: (U, U) => U): U = { - var sum = next - for (elem <- this) sum += elem - sum + val r = rangeleft.reduceLeft(op) + ind = len + r } /* transformers */ override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = { - //val cb = pbf(self.repr) - val sz = remaining - cb.sizeHint(sz) - if (sz > 0) { - val last = _last - while (start != last) { - f(start) - start += step - } + while (hasNext) { + cb += f(next) } cb } @@ -82,8 +93,10 @@ extends ParSeq[Int] object ParRange { - def apply(start: Int, end: Int, step: Int, inclusive: Boolean) = - new ParRange(start, end, step, inclusive) + def apply(start: Int, end: Int, step: Int, inclusive: Boolean) = new ParRange( + if (inclusive) new Range.Inclusive(start, end, step) + else new Range(start, end, step) + ) } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala index e4eb51d83b..3c1cc47088 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala @@ -38,7 +38,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) { } def assignProduct(a: Matrix[T], b: Matrix[T]) = { - val range = new ParRange(0, n * n, 1, false) + val range = ParRange(0, n * n, 1, false) range.tasksupport.environment = forkjoinpool for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n); } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala index 6cd1d74c5e..4f32d366a4 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala @@ -20,7 +20,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] { val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool def createSequential(sz: Int, p: Int) = new collection.immutable.Range(0, sz, 1) def createParallel(sz: Int, p: Int) = { - val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false) + val pr = collection.parallel.immutable.ParRange(0, sz, 1, false) forkJoinPool.setParallelism(p) pr.tasksupport.environment = forkJoinPool pr @@ -79,7 +79,10 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] { def comparisonMap = collection.Map() def runseq = for (n <- this.seqcoll) modify(n) - def runpar = for (n <- this.parcoll) modify(n) + def runpar = for (n <- this.parcoll.asInstanceOf[ParRange]) { + modify(n) + () + } def companion = ForeachModify } diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala index c34fb872aa..372d6b9fbd 100644 --- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala @@ -38,7 +38,7 @@ object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") wi } def fromSeq(a: Seq[Int]) = a match { - case r: Range => new ParRange(r.start, r.end, r.step, false) + case r: Range => ParRange(r.start, r.end, r.step, false) case _ => val pa = new parallel.mutable.ParArray[Int](a.length) for (i <- 0 until a.length) pa(i) = a(i) -- cgit v1.2.3