summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/collection/immutable/Range.scala4
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala2
-rw-r--r--src/library/scala/collection/parallel/ParIterableViewLike.scala2
-rw-r--r--src/library/scala/collection/parallel/ParSeqViewLike.scala2
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala2
-rw-r--r--src/library/scala/collection/parallel/UnrolledBuffer.scala28
-rw-r--r--src/library/scala/collection/parallel/immutable/ParRange.scala97
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala7
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala2
10 files changed, 97 insertions, 51 deletions
diff --git a/src/library/scala/collection/immutable/Range.scala b/src/library/scala/collection/immutable/Range.scala
index 1ebf6e283c..0446eeb198 100644
--- a/src/library/scala/collection/immutable/Range.scala
+++ b/src/library/scala/collection/immutable/Range.scala
@@ -44,7 +44,7 @@ extends IndexedSeq[Int]
with collection.Parallelizable[ParRange]
with Serializable
{
- def par = ParRange(start, end, step, false)
+ def par = new ParRange(this)
// Note that this value is calculated eagerly intentionally: it also
// serves to enforce conditions (step != 0) && (length <= Int.MaxValue)
@@ -249,7 +249,7 @@ object Range {
NumericRange.count[Long](start, end, step, isInclusive)
class Inclusive(start: Int, end: Int, step: Int) extends Range(start, end, step) {
- override def par = ParRange(start, end, step, true)
+ override def par = new ParRange(this)
override def isInclusive = true
override protected def copy(start: Int, end: Int, step: Int): Range = new Inclusive(start, end, step)
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 83e5c6cb59..ed757655f5 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -644,7 +644,7 @@ self =>
executeAndWaitResult(new Zip(pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result });
} else super.zip(that)(bf)
- override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip new immutable.ParRange(0, size, 1, false)
+ override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false)
override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
val pbf = bf.asParallel
diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala
index 570abdcea6..6fb924e57e 100644
--- a/src/library/scala/collection/parallel/ParIterableViewLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala
@@ -117,7 +117,7 @@ self =>
override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
- newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
+ newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[This, (U, S), That]): That =
newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That]
diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala
index 2c323ecade..1b5ae06c42 100644
--- a/src/library/scala/collection/parallel/ParSeqViewLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala
@@ -140,7 +140,7 @@ self =>
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That]
override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
- newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
+ newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
override def reverse: This = newReversed.asInstanceOf[This]
override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f)
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index ec38513d9b..b111ecb87c 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -282,6 +282,7 @@ trait ThreadPoolTasks extends Tasks {
() => {
t.sync
+ t.body.forwardThrowable
t.body.result
}
}
@@ -293,6 +294,7 @@ trait ThreadPoolTasks extends Tasks {
t.start
t.sync
+ t.body.forwardThrowable
t.body.result
}
diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala
index 2c12069e1c..d29c24822c 100644
--- a/src/library/scala/collection/parallel/UnrolledBuffer.scala
+++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala
@@ -10,6 +10,34 @@ import annotation.tailrec
+
+/** A buffer that stores elements in an unrolled linked list.
+ *
+ * Unrolled linked lists store elements in linked fixed size
+ * arrays.
+ *
+ * Unrolled buffers retain locality and low memory overhead
+ * properties of array buffers, but offer much more efficient
+ * element addition, since they never reallocate and copy the
+ * internal array.
+ *
+ * However, they provide `O(n/m)` complexity random access,
+ * where `n` is the number of elements, and `m` the size of
+ * internal array chunks.
+ *
+ * Ideal to use when:
+ * - elements are added to the buffer and then all of the
+ * elements are traversed sequentially
+ * - two unrolled buffers need to be concatenated (see `concat`)
+ *
+ * Better than singly linked lists for random access, but
+ * should still be avoided for such a purpose.
+ *
+ * @author Aleksandar Prokopec
+ *
+ * @coll unrolled buffer
+ * @Coll UnrolledBuffer
+ */
class UnrolledBuffer[T](implicit val manifest: ClassManifest[T])
extends collection.mutable.Buffer[T]
with collection.mutable.BufferLike[T, UnrolledBuffer[T]]
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index 53f6bd33e6..dbd5449362 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -7,71 +7,82 @@ import scala.collection.immutable.RangeUtils
import scala.collection.parallel.ParSeq
import scala.collection.parallel.Combiner
import scala.collection.generic.CanCombineFrom
+import scala.collection.parallel.ParIterableIterator
-class ParRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean)
-extends ParSeq[Int]
- with RangeUtils[ParRange] {
- self =>
+class ParRange(range: Range)
+extends ParSeq[Int] {
+self =>
- def seq = new Range(start, end, step)
+ def seq = range
- def length = _length
+ @inline
+ final def length = range.length
- def apply(idx: Int) = _apply(idx)
-
- def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParRange(_start, _end, _step, _inclusive)
+ @inline
+ final def apply(idx: Int) = range.apply(idx)
def parallelIterator = new ParRangeIterator with SCPI
- override def toString = seq.toString // TODO
-
type SCPI = SignalContextPassingIterator[ParRangeIterator]
- class ParRangeIterator
- (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive)
- extends ParIterator with RangeUtils[ParRangeIterator] {
- me: SignalContextPassingIterator[ParRangeIterator] =>
- def remaining = _length
- def next = { val r = start; start += step; r }
- def hasNext = remaining > 0
- def split: Seq[ParIterator] = psplit(remaining / 2, remaining - remaining / 2)
- def psplit(sizes: Int*): Seq[ParIterator] = {
- val incr = sizes.scanLeft(0)(_ + _)
- for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until)
- }
- def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = {
- new ParRangeIterator(_start, _end, _step, _inclusive) with SCPI
+ class ParRangeIterator(range: Range = self.range)
+ extends ParIterator {
+ me: SignalContextPassingIterator[ParRangeIterator] =>
+ override def toString = "ParRangeIterator(over: " + range + ")"
+ private var ind = 0
+ private val len = range.length
+
+ final def remaining = len - ind
+
+ final def hasNext = ind < len
+
+ final def next = if (hasNext) {
+ val r = range.apply(ind)
+ ind += 1
+ r
+ } else Iterator.empty.next
+
+ private def rangeleft = range.drop(ind)
+
+ def split = {
+ val rleft = rangeleft
+ val elemleft = rleft.length
+ if (elemleft < 2) Seq(new ParRangeIterator(rleft) with SCPI)
+ else Seq(
+ new ParRangeIterator(rleft.take(elemleft / 2)) with SCPI,
+ new ParRangeIterator(rleft.drop(elemleft / 2)) with SCPI
+ )
}
- override def toString = "ParRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")"
+ def psplit(sizes: Int*) = {
+ var rleft = rangeleft
+ for (sz <- sizes) yield {
+ val fronttaken = rleft.take(sz)
+ rleft = rleft.drop(sz)
+ new ParRangeIterator(fronttaken) with SCPI
+ }
+ }
/* accessors */
override def foreach[U](f: Int => U): Unit = {
- _foreach(f)
- start = end + step
+ rangeleft.foreach(f)
+ ind = len
}
override def reduce[U >: Int](op: (U, U) => U): U = {
- var sum = next
- for (elem <- this) sum += elem
- sum
+ val r = rangeleft.reduceLeft(op)
+ ind = len
+ r
}
/* transformers */
override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = {
- //val cb = pbf(self.repr)
- val sz = remaining
- cb.sizeHint(sz)
- if (sz > 0) {
- val last = _last
- while (start != last) {
- f(start)
- start += step
- }
+ while (hasNext) {
+ cb += f(next)
}
cb
}
@@ -82,8 +93,10 @@ extends ParSeq[Int]
object ParRange {
- def apply(start: Int, end: Int, step: Int, inclusive: Boolean) =
- new ParRange(start, end, step, inclusive)
+ def apply(start: Int, end: Int, step: Int, inclusive: Boolean) = new ParRange(
+ if (inclusive) new Range.Inclusive(start, end, step)
+ else new Range(start, end, step)
+ )
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
index e4eb51d83b..3c1cc47088 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
@@ -38,7 +38,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) {
}
def assignProduct(a: Matrix[T], b: Matrix[T]) = {
- val range = new ParRange(0, n * n, 1, false)
+ val range = ParRange(0, n * n, 1, false)
range.tasksupport.environment = forkjoinpool
for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n);
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
index 6cd1d74c5e..4f32d366a4 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
@@ -20,7 +20,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] {
val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool
def createSequential(sz: Int, p: Int) = new collection.immutable.Range(0, sz, 1)
def createParallel(sz: Int, p: Int) = {
- val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false)
+ val pr = collection.parallel.immutable.ParRange(0, sz, 1, false)
forkJoinPool.setParallelism(p)
pr.tasksupport.environment = forkJoinPool
pr
@@ -79,7 +79,10 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] {
def comparisonMap = collection.Map()
def runseq = for (n <- this.seqcoll) modify(n)
- def runpar = for (n <- this.parcoll) modify(n)
+ def runpar = for (n <- this.parcoll.asInstanceOf[ParRange]) {
+ modify(n)
+ ()
+ }
def companion = ForeachModify
}
diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
index c34fb872aa..372d6b9fbd 100644
--- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
@@ -38,7 +38,7 @@ object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") wi
}
def fromSeq(a: Seq[Int]) = a match {
- case r: Range => new ParRange(r.start, r.end, r.step, false)
+ case r: Range => ParRange(r.start, r.end, r.step, false)
case _ =>
val pa = new parallel.mutable.ParArray[Int](a.length)
for (i <- 0 until a.length) pa(i) = a(i)