summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:16 +0000
committerAleksandar Prokopec <aleksandar.prokopec@epfl.ch>2010-12-09 10:08:16 +0000
commit492b22576f2ad46b300ce8dc31c5b672aaf517e4 (patch)
tree25c5188138feedf55706042494fdc7227960fbc0
parenta730fb5cc6cea39a29e9ff4cd666fa8498f6adec (diff)
downloadscala-492b22576f2ad46b300ce8dc31c5b672aaf517e4.tar.gz
scala-492b22576f2ad46b300ce8dc31c5b672aaf517e4.tar.bz2
scala-492b22576f2ad46b300ce8dc31c5b672aaf517e4.zip
Fixed parallel ranges to use the same range log...
Fixed parallel ranges to use the same range logic under the hood, and not introduce code duplication. Slight fix in Tasks. No review.
-rw-r--r--src/library/scala/collection/immutable/Range.scala4
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala2
-rw-r--r--src/library/scala/collection/parallel/ParIterableViewLike.scala2
-rw-r--r--src/library/scala/collection/parallel/ParSeqViewLike.scala2
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala2
-rw-r--r--src/library/scala/collection/parallel/UnrolledBuffer.scala28
-rw-r--r--src/library/scala/collection/parallel/immutable/ParRange.scala97
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala2
-rw-r--r--test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala7
-rw-r--r--test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala2
10 files changed, 97 insertions, 51 deletions
diff --git a/src/library/scala/collection/immutable/Range.scala b/src/library/scala/collection/immutable/Range.scala
index 1ebf6e283c..0446eeb198 100644
--- a/src/library/scala/collection/immutable/Range.scala
+++ b/src/library/scala/collection/immutable/Range.scala
@@ -44,7 +44,7 @@ extends IndexedSeq[Int]
with collection.Parallelizable[ParRange]
with Serializable
{
- def par = ParRange(start, end, step, false)
+ def par = new ParRange(this)
// Note that this value is calculated eagerly intentionally: it also
// serves to enforce conditions (step != 0) && (length <= Int.MaxValue)
@@ -249,7 +249,7 @@ object Range {
NumericRange.count[Long](start, end, step, isInclusive)
class Inclusive(start: Int, end: Int, step: Int) extends Range(start, end, step) {
- override def par = ParRange(start, end, step, true)
+ override def par = new ParRange(this)
override def isInclusive = true
override protected def copy(start: Int, end: Int, step: Int): Range = new Inclusive(start, end, step)
}
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 83e5c6cb59..ed757655f5 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -644,7 +644,7 @@ self =>
executeAndWaitResult(new Zip(pbf, parallelIterator, thatseq.parallelIterator) mapResult { _.result });
} else super.zip(that)(bf)
- override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip new immutable.ParRange(0, size, 1, false)
+ override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[Repr, (U, Int), That]): That = this zip immutable.ParRange(0, size, 1, false)
override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[Repr, (U, S), That]): That = if (bf.isParallel && that.isParSeq) {
val pbf = bf.asParallel
diff --git a/src/library/scala/collection/parallel/ParIterableViewLike.scala b/src/library/scala/collection/parallel/ParIterableViewLike.scala
index 570abdcea6..6fb924e57e 100644
--- a/src/library/scala/collection/parallel/ParIterableViewLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableViewLike.scala
@@ -117,7 +117,7 @@ self =>
override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
- newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
+ newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
override def zipAll[S, U >: T, That](that: Iterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[This, (U, S), That]): That =
newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That]
diff --git a/src/library/scala/collection/parallel/ParSeqViewLike.scala b/src/library/scala/collection/parallel/ParSeqViewLike.scala
index 2c323ecade..1b5ae06c42 100644
--- a/src/library/scala/collection/parallel/ParSeqViewLike.scala
+++ b/src/library/scala/collection/parallel/ParSeqViewLike.scala
@@ -140,7 +140,7 @@ self =>
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That]
override def zip[U >: T, S, That](that: Iterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
- newZipped(new ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
+ newZipped(ParRange(0, parallelIterator.remaining, 1, false)).asInstanceOf[That]
override def reverse: This = newReversed.asInstanceOf[This]
override def reverseMap[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = reverse.map(f)
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index ec38513d9b..b111ecb87c 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -282,6 +282,7 @@ trait ThreadPoolTasks extends Tasks {
() => {
t.sync
+ t.body.forwardThrowable
t.body.result
}
}
@@ -293,6 +294,7 @@ trait ThreadPoolTasks extends Tasks {
t.start
t.sync
+ t.body.forwardThrowable
t.body.result
}
diff --git a/src/library/scala/collection/parallel/UnrolledBuffer.scala b/src/library/scala/collection/parallel/UnrolledBuffer.scala
index 2c12069e1c..d29c24822c 100644
--- a/src/library/scala/collection/parallel/UnrolledBuffer.scala
+++ b/src/library/scala/collection/parallel/UnrolledBuffer.scala
@@ -10,6 +10,34 @@ import annotation.tailrec
+
+/** A buffer that stores elements in an unrolled linked list.
+ *
+ * Unrolled linked lists store elements in linked fixed size
+ * arrays.
+ *
+ * Unrolled buffers retain locality and low memory overhead
+ * properties of array buffers, but offer much more efficient
+ * element addition, since they never reallocate and copy the
+ * internal array.
+ *
+ * However, they provide `O(n/m)` complexity random access,
+ * where `n` is the number of elements, and `m` the size of
+ * internal array chunks.
+ *
+ * Ideal to use when:
+ * - elements are added to the buffer and then all of the
+ * elements are traversed sequentially
+ * - two unrolled buffers need to be concatenated (see `concat`)
+ *
+ * Better than singly linked lists for random access, but
+ * should still be avoided for such a purpose.
+ *
+ * @author Aleksandar Prokopec
+ *
+ * @coll unrolled buffer
+ * @Coll UnrolledBuffer
+ */
class UnrolledBuffer[T](implicit val manifest: ClassManifest[T])
extends collection.mutable.Buffer[T]
with collection.mutable.BufferLike[T, UnrolledBuffer[T]]
diff --git a/src/library/scala/collection/parallel/immutable/ParRange.scala b/src/library/scala/collection/parallel/immutable/ParRange.scala
index 53f6bd33e6..dbd5449362 100644
--- a/src/library/scala/collection/parallel/immutable/ParRange.scala
+++ b/src/library/scala/collection/parallel/immutable/ParRange.scala
@@ -7,71 +7,82 @@ import scala.collection.immutable.RangeUtils
import scala.collection.parallel.ParSeq
import scala.collection.parallel.Combiner
import scala.collection.generic.CanCombineFrom
+import scala.collection.parallel.ParIterableIterator
-class ParRange(val start: Int, val end: Int, val step: Int, val inclusive: Boolean)
-extends ParSeq[Int]
- with RangeUtils[ParRange] {
- self =>
+class ParRange(range: Range)
+extends ParSeq[Int] {
+self =>
- def seq = new Range(start, end, step)
+ def seq = range
- def length = _length
+ @inline
+ final def length = range.length
- def apply(idx: Int) = _apply(idx)
-
- def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = new ParRange(_start, _end, _step, _inclusive)
+ @inline
+ final def apply(idx: Int) = range.apply(idx)
def parallelIterator = new ParRangeIterator with SCPI
- override def toString = seq.toString // TODO
-
type SCPI = SignalContextPassingIterator[ParRangeIterator]
- class ParRangeIterator
- (var start: Int = self.start, val end: Int = self.end, val step: Int = self.step, val inclusive: Boolean = self.inclusive)
- extends ParIterator with RangeUtils[ParRangeIterator] {
- me: SignalContextPassingIterator[ParRangeIterator] =>
- def remaining = _length
- def next = { val r = start; start += step; r }
- def hasNext = remaining > 0
- def split: Seq[ParIterator] = psplit(remaining / 2, remaining - remaining / 2)
- def psplit(sizes: Int*): Seq[ParIterator] = {
- val incr = sizes.scanLeft(0)(_ + _)
- for ((from, until) <- incr.init zip incr.tail) yield _slice(from, until)
- }
- def create(_start: Int, _end: Int, _step: Int, _inclusive: Boolean) = {
- new ParRangeIterator(_start, _end, _step, _inclusive) with SCPI
+ class ParRangeIterator(range: Range = self.range)
+ extends ParIterator {
+ me: SignalContextPassingIterator[ParRangeIterator] =>
+ override def toString = "ParRangeIterator(over: " + range + ")"
+ private var ind = 0
+ private val len = range.length
+
+ final def remaining = len - ind
+
+ final def hasNext = ind < len
+
+ final def next = if (hasNext) {
+ val r = range.apply(ind)
+ ind += 1
+ r
+ } else Iterator.empty.next
+
+ private def rangeleft = range.drop(ind)
+
+ def split = {
+ val rleft = rangeleft
+ val elemleft = rleft.length
+ if (elemleft < 2) Seq(new ParRangeIterator(rleft) with SCPI)
+ else Seq(
+ new ParRangeIterator(rleft.take(elemleft / 2)) with SCPI,
+ new ParRangeIterator(rleft.drop(elemleft / 2)) with SCPI
+ )
}
- override def toString = "ParRangeIterator(" + start + ", " + end + ", " + step + ", incl: " + inclusive + ")"
+ def psplit(sizes: Int*) = {
+ var rleft = rangeleft
+ for (sz <- sizes) yield {
+ val fronttaken = rleft.take(sz)
+ rleft = rleft.drop(sz)
+ new ParRangeIterator(fronttaken) with SCPI
+ }
+ }
/* accessors */
override def foreach[U](f: Int => U): Unit = {
- _foreach(f)
- start = end + step
+ rangeleft.foreach(f)
+ ind = len
}
override def reduce[U >: Int](op: (U, U) => U): U = {
- var sum = next
- for (elem <- this) sum += elem
- sum
+ val r = rangeleft.reduceLeft(op)
+ ind = len
+ r
}
/* transformers */
override def map2combiner[S, That](f: Int => S, cb: Combiner[S, That]): Combiner[S, That] = {
- //val cb = pbf(self.repr)
- val sz = remaining
- cb.sizeHint(sz)
- if (sz > 0) {
- val last = _last
- while (start != last) {
- f(start)
- start += step
- }
+ while (hasNext) {
+ cb += f(next)
}
cb
}
@@ -82,8 +93,10 @@ extends ParSeq[Int]
object ParRange {
- def apply(start: Int, end: Int, step: Int, inclusive: Boolean) =
- new ParRange(start, end, step, inclusive)
+ def apply(start: Int, end: Int, step: Int, inclusive: Boolean) = new ParRange(
+ if (inclusive) new Range.Inclusive(start, end, step)
+ else new Range(start, end, step)
+ )
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
index e4eb51d83b..3c1cc47088 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_array/MatrixMultiplication.scala
@@ -38,7 +38,7 @@ extends Resettable(sz, p, what, new Cont(_), new Array[Any](_), classOf[Cont]) {
}
def assignProduct(a: Matrix[T], b: Matrix[T]) = {
- val range = new ParRange(0, n * n, 1, false)
+ val range = ParRange(0, n * n, 1, false)
range.tasksupport.environment = forkjoinpool
for (i <- range) this(i / n, i % n) = calcProduct(a, b, i / n, i % n);
}
diff --git a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
index 6cd1d74c5e..4f32d366a4 100644
--- a/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
+++ b/test/benchmarks/src/scala/collection/parallel/benchmarks/parallel_range/RangeBenches.scala
@@ -20,7 +20,7 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] {
val forkJoinPool = new scala.concurrent.forkjoin.ForkJoinPool
def createSequential(sz: Int, p: Int) = new collection.immutable.Range(0, sz, 1)
def createParallel(sz: Int, p: Int) = {
- val pr = new collection.parallel.immutable.ParRange(0, sz, 1, false)
+ val pr = collection.parallel.immutable.ParRange(0, sz, 1, false)
forkJoinPool.setParallelism(p)
pr.tasksupport.environment = forkJoinPool
pr
@@ -79,7 +79,10 @@ object RangeBenches extends StandardParIterableBenches[Int, ParRange] {
def comparisonMap = collection.Map()
def runseq = for (n <- this.seqcoll) modify(n)
- def runpar = for (n <- this.parcoll) modify(n)
+ def runpar = for (n <- this.parcoll.asInstanceOf[ParRange]) {
+ modify(n)
+ ()
+ }
def companion = ForeachModify
}
diff --git a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
index c34fb872aa..372d6b9fbd 100644
--- a/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
+++ b/test/files/scalacheck/parallel-collections/ParallelRangeCheck.scala
@@ -38,7 +38,7 @@ object ParallelRangeCheck extends ParallelSeqCheck[Int]("ParallelRange[Int]") wi
}
def fromSeq(a: Seq[Int]) = a match {
- case r: Range => new ParRange(r.start, r.end, r.step, false)
+ case r: Range => ParRange(r.start, r.end, r.step, false)
case _ =>
val pa = new parallel.mutable.ParArray[Int](a.length)
for (i <- 0 until a.length) pa(i) = a(i)