diff options
10 files changed, 97 insertions, 51 deletions
diff --git a/src/library/scala/collection/immutable/Range.scala b/src/library/scala/collection/immutable/Range.scala index 1ebf6e283c..0446eeb198 100644 --- a/src/library/scala/collection/immutable/Range.scala +++ b/src/library/scala/collection/immutable/Range.scala @@ -44,7 +44,7 @@ extends IndexedSeq[Int] with collection.Parallelizable[ParRange] with Serializable { - def par = ParRange(start, end, step, false) + def par = new ParRange(this) // Note that this value is calculated eagerly intentionally: it also // serves to enforce conditions (step != 0) && (length <= Int.MaxValue) @@ -249,7 +249,7 @@ object Range { NumericRange.count[Long](start, end, step, isInclusive) class Inclusive(start: Int, end: Int, step: Int) extends Range(start, end, step) { - override def par = ParRange(start, end, step, true) + override def par = new ParRange(this) override def isInclusive = true override protected def copy(start: Int, end: Int, step: Int): Range = new Inclusive(start, end, step) } diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 83e5c6cb59..ed757655f5 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -644,7 +644,7 @@ self => executeAndWaitResult(new Zip(pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result }); } else super.zip(that)(bf) - override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip new immutable.ParRange(0, size, 1, false) + override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false) override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) { val pbf = bf.asParallel diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala index 570abdcea6..6fb924e57e 100644 --- a/src/library/scala/collection/parallel/ParIterableViewLike.scala +++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala @@ -117,7 +117,7 @@ self => override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That] override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That = - newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] + newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That] diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala index 2c323ecade..1b5ae06c42 100644 --- a/src/library/scala/collection/parallel/ParSeqViewLike.scala +++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala @@ -140,7 +140,7 @@ self => override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That] override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That] override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That = - newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] + newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That] override def reverse: This = newReversed.asInstanceOf[This] override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f) diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index ec38513d9b..b111ecb87c 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -282,6 +282,7 @@ trait ThreadPoolTasks extends Tasks { () => { t.sync + t.body.forwardThrowable t.body.result } } @@ -293,6 +294,7 @@ trait ThreadPoolTasks extends Tasks { t.start t.sync + t.body.forwardThrowable t.body.result } diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala index 2c12069e1c..d29c24822c 100644 --- a/src/library/scala/collection/parallel/UnrolledBuffer.scala +++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala @@ -10,6 +10,34 @@ import annotation.tailrec + +/** A buffer that stores elements in an unrolled linked list. + * + * Unrolled linked lists store elements in linked fixed size + * arrays. + * + * Unrolled buffers retain locality and low memory overhead + * properties of array buffers, but offer much more efficient + * element addition, since they never reallocate and copy the + * internal array. + * + * However, they provide `O(n/m)` complexity random access, + * where `n` is the number of elements, and `m` the size of + * internal array chunks. + * + * Ideal to use when: + * - elements are added to the buffer and then all of the + * elements are traversed sequentially + * - two unrolled buffers need to be concatenated (see `concat`) + * + * Better than singly linked lists for random access, but + * should still be avoided for such a purpose. + * + * @author Aleksandar Prokopec + * + * @coll unrolled buffer + * @Coll UnrolledBuffer + */ class UnrolledBuffer[T](implicit val manifest: ClassManifest[T]) extends collection.mutable.Buffer[T] with collection.mutable.BufferLike[T, UnrolledBuffer[T]] diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala index 53f6bd33e6..dbd5449362 100644 --- a/src/library/scala/collection/parallel/immutable/ParRange.scala +++ b/src/library/scala/collection/parallel/immutable/ParRange.scala @@ -7,71 +7,82 @@ import scala.collection.immutable.RangeUtils import scala.collection.parallel.ParSeq import scala.collection.parallel.Combiner import scala.collection.generic.CanCombineFrom +import scala.collection.parallel.ParIterableIterator -class ParRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean) -extends ParSeq[Int] - with RangeUtils[ParRange] { - self => +class ParRange(range: Range) +extends ParSeq[Int] { +self => - def seq = new Range(start, end, step) + def seq = range - def length = _length + @inline + final def length = range.length - def apply(idx: Int) = _apply(idx) - - def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParRange(_start, _end, _step, _inclusive) + @inline + final def apply(idx: Int) = range.apply(idx) def parallelIterator = new ParRangeIterator with SCPI - override def toString = seq.toString // TODO - type SCPI = SignalContextPassingIterator[ParRangeIterator] - class ParRangeIterator - (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive) - extends ParIterator with RangeUtils[ParRangeIterator] { - me: SignalContextPassingIterator[ParRangeIterator] => - def remaining = _length - def next = { val r = start; start += step; r } - def hasNext = remaining > 0 - def split: Seq[ParIterator] = psplit(remaining / 2, remaining - remaining / 2) - def psplit(sizes: Int*): Seq[ParIterator] = { - val incr = sizes.scanLeft(0)(_ + _) - for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until) - } - def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = { - new ParRangeIterator(_start, _end, _step, _inclusive) with SCPI + class ParRangeIterator(range: Range = self.range) + extends ParIterator { + me: SignalContextPassingIterator[ParRangeIterator] => + override def toString = "ParRangeIterator(over: " + range + ")" + private var ind = 0 + private val len = range.length + + final def remaining = len - ind + + final def hasNext = ind < len + + final def next = if (hasNext) { + val r = range.apply(ind) + ind += 1 + r + } else Iterator.empty.next + + private def rangeleft = range.drop(ind) + + def split = { + val rleft = rangeleft + val elemleft = rleft.length + if (elemleft < 2) Seq(new ParRangeIterator(rleft) with SCPI) + else Seq( + new ParRangeIterator(rleft.take(elemleft / 2)) with SCPI, + new ParRangeIterator(rleft.drop(elemleft / 2)) with SCPI + ) } - override def toString = "ParRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")" + def psplit(sizes: Int*) = { + var rleft = rangeleft + for (sz <- sizes) yield { + val fronttaken = rleft.take(sz) + rleft = rleft.drop(sz) + new ParRangeIterator(fronttaken) with SCPI + } + } /* accessors */ override def foreach[U](f: Int => U): Unit = { - _foreach(f) - start = end + step + rangeleft.foreach(f) + ind = len } override def reduce[U >: Int](op: (U, U) => U): U = { - var sum = next - for (elem <- this) sum += elem - sum + val r = rangeleft.reduceLeft(op) + ind = len + r } /* transformers */ override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = { - //val cb = pbf(self.repr) - val sz = remaining - cb.sizeHint(sz) - if (sz > 0) { - val last = _last - while (start != last) { - f(start) - start += step - } + while (hasNext) { + cb += f(next) } cb } @@ -82,8 +93,10 @@ extends ParSeq[Int] object ParRange { - def apply(start: Int, end: Int, step: Int, inclusive: Boolean) = - new ParRange(start, end, step, inclusive) + def apply(start: Int, end: Int, step: Int, inclusive: Boolean) = new ParRange( + if (inclusive) new Range.Inclusive(start, end, step) + else new Range(start, end, step) + ) } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala index e4eb51d83b..3c1cc47088 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala @@ -38,7 +38,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) { } def assignProduct(a: Matrix[T], b: Matrix[T]) = { - val range = new ParRange(0, n * n, 1, false) + val range = ParRange(0, n * n, 1, false) range.tasksupport.environment = forkjoinpool for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n); } diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala index 6cd1d74c5e..4f32d366a4 100644 --- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala +++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala @@ -20,7 +20,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] { val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool def createSequential(sz: Int, p: Int) = new collection.immutable.Range(0, sz, 1) def createParallel(sz: Int, p: Int) = { - val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false) + val pr = collection.parallel.immutable.ParRange(0, sz, 1, false) forkJoinPool.setParallelism(p) pr.tasksupport.environment = forkJoinPool pr @@ -79,7 +79,10 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] { def comparisonMap = collection.Map() def runseq = for (n <- this.seqcoll) modify(n) - def runpar = for (n <- this.parcoll) modify(n) + def runpar = for (n <- this.parcoll.asInstanceOf[ParRange]) { + modify(n) + () + } def companion = ForeachModify } diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala index c34fb872aa..372d6b9fbd 100644 --- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala +++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala @@ -38,7 +38,7 @@ object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") wi } def fromSeq(a: Seq[Int]) = a match { - case r: Range => new ParRange(r.start, r.end, r.step, false) + case r: Range => ParRange(r.start, r.end, r.step, false) case _ => val pa = new parallel.mutable.ParArray[Int](a.length) for (i <- 0 until a.length) pa(i) = a(i) |