1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
|
/* __ *\
** ________ ___ / / ___ Scala API **
** / __/ __// _ | / / / _ | (c) 2003-2013, LAMP/EPFL **
** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
** /____/\___/_/ |_/____/_/ | | **
** |/ **
\* */
package scala
package collection.parallel
import scala.collection.Parallel
import scala.collection.{ IterableView, IterableViewLike }
import scala.collection.{ GenIterableView, GenIterableViewLike }
import scala.collection.GenTraversableOnce
import scala.collection.GenTraversable
import scala.collection.GenIterable
import scala.collection.GenSeq
import scala.collection.generic.{ CanBuildFrom, SliceInterval }
import scala.collection.generic.CanCombineFrom
import scala.collection.parallel.immutable.ParRange
import scala.language.implicitConversions
/** A template view of a non-strict view of parallel iterable collection.
*
* '''Note:''' Regular view traits have type parameters used to carry information
* about the type of the elements, type of the collection they are derived from and
* their own actual representation type. Parallel views have an additional parameter
* which carries information about the type of the sequential version of the view.
*
* @tparam T the type of the elements this view can traverse
* @tparam Coll the type of the parallel collection this view is derived from
* @tparam CollSeq the type of the sequential collection corresponding to the underlying parallel collection
* @tparam This the actual representation type of this view
* @tparam ThisSeq the type of the sequential representation of this view
*
* @since 2.9
*/
trait ParIterableViewLike[+T,
+Coll <: Parallel,
+CollSeq,
+This <: ParIterableView[T, Coll, CollSeq] with ParIterableViewLike[T, Coll, CollSeq, This, ThisSeq],
+ThisSeq <: IterableView[T, CollSeq] with IterableViewLike[T, CollSeq, ThisSeq]]
extends GenIterableView[T, Coll]
with GenIterableViewLike[T, Coll, This]
with ParIterable[T]
with ParIterableLike[T, This, ThisSeq]
{
self =>
override def foreach[U](f: T => U): Unit = super[ParIterableLike].foreach(f)
override protected[this] def newCombiner: Combiner[T, This] = throw new UnsupportedOperationException(this + ".newCombiner")
protected[this] def viewIdentifier: String
protected[this] def viewIdString: String
protected def underlying: Coll
/* wrappers */
trait Transformed[+S] extends ParIterableView[S, Coll, CollSeq] with super.Transformed[S] {
override def splitter: IterableSplitter[S]
override def iterator = splitter
def size = splitter.remaining
}
trait Sliced extends super.Sliced with Transformed[T] {
// override def slice(from1: Int, until1: Int): This = newSliced(from1 max 0, until1 max 0).asInstanceOf[This]
def splitter: IterableSplitter[T] = self.splitter.slice(from, until)
override def seq = self.seq.slice(from, until)
}
trait Mapped[S] extends super.Mapped[S] with Transformed[S]{
def splitter: IterableSplitter[S] = self.splitter.map(mapping)
override def seq = self.seq.map(mapping).asInstanceOf[IterableView[S, CollSeq]]
}
// only use if other is a ParIterable, otherwise force
trait Appended[U >: T] extends super.Appended[U] with Transformed[U] {
def restPar: ParIterable[U] = rest.asParIterable
def splitter: IterableSplitter[U] = self.splitter.appendParIterable[U, IterableSplitter[U]](restPar.splitter)
override def seq = self.seq.++(rest).asInstanceOf[IterableView[U, CollSeq]]
}
trait Forced[S] extends super.Forced[S] with Transformed[S] {
def forcedPar: ParIterable[S] = forced.asParIterable
def splitter: IterableSplitter[S] = forcedPar.splitter
override def seq = forcedPar.seq.view.asInstanceOf[IterableView[S, CollSeq]]
}
// only use if other is a ParSeq, otherwise force
trait Zipped[S] extends super.Zipped[S] with Transformed[(T, S)] {
def otherPar: ParSeq[S] = other.asParSeq
def splitter: IterableSplitter[(T, S)] = self.splitter zipParSeq otherPar.splitter
override def seq = (self.seq zip other).asInstanceOf[IterableView[(T, S), CollSeq]]
}
// only use if other is a ParSeq, otherwise force
trait ZippedAll[U >: T, S] extends super.ZippedAll[U, S] with Transformed[(U, S)] {
def otherPar: ParSeq[S] = other.asParSeq
def splitter: IterableSplitter[(U, S)] = self.splitter.zipAllParSeq(otherPar.splitter, thisElem, thatElem)
override def seq = (self.seq.zipAll(other, thisElem, thatElem)).asInstanceOf[IterableView[(U, S), CollSeq]]
}
protected[this] def thisParSeq: ParSeq[T] = mutable.ParArray.fromTraversables(this.iterator)
private[this] implicit def asThis(xs: Transformed[T]): This = xs.asInstanceOf[This]
/* operation overrides */
override def take(n: Int): This = newSliced(SliceInterval(0, n))
override def drop(n: Int): This = newSliced(SliceInterval(n, splitter.remaining))
override def splitAt(n: Int): (This, This) = (take(n), drop(n))
override def slice(from: Int, until: Int): This = newSliced(SliceInterval(from, until))
override def map[S, That](f: T => S)(implicit bf: CanBuildFrom[This, S, That]): That = newMapped(f).asInstanceOf[That]
override def ++[U >: T, That](xs: GenTraversableOnce[U])(implicit bf: CanBuildFrom[This, U, That]): That = newAppendedTryParIterable(xs.toTraversable).asInstanceOf[That]
override def filter(p: T => Boolean): This = newForced(thisParSeq.filter(p)).asInstanceOf[This]
override def filterNot(p: T => Boolean): This = newForced(thisParSeq.filterNot(p)).asInstanceOf[This]
override def partition(p: T => Boolean): (This, This) = {
val (t, f) = thisParSeq.partition(p)
(newForced(t).asInstanceOf[This], newForced(f).asInstanceOf[This])
}
override def takeWhile(p: T => Boolean): This = newForced(thisParSeq.takeWhile(p)).asInstanceOf[This]
override def dropWhile(p: T => Boolean): This = newForced(thisParSeq.dropWhile(p)).asInstanceOf[This]
override def span(p: T => Boolean): (This, This) = {
val (pref, suff) = thisParSeq.span(p)
(newForced(pref).asInstanceOf[This], newForced(suff).asInstanceOf[This])
}
override def flatMap[S, That](f: T => GenTraversableOnce[S])(implicit bf: CanBuildFrom[This, S, That]): That = newForced(thisParSeq.flatMap(f)).asInstanceOf[That]
override def zip[U >: T, S, That](that: GenIterable[S])(implicit bf: CanBuildFrom[This, (U, S), That]): That = newZippedTryParSeq(that).asInstanceOf[That]
override def zipWithIndex[U >: T, That](implicit bf: CanBuildFrom[This, (U, Int), That]): That =
newZipped(ParRange(0, splitter.remaining, 1, inclusive = false)).asInstanceOf[That]
override def zipAll[S, U >: T, That](that: GenIterable[S], thisElem: U, thatElem: S)(implicit bf: CanBuildFrom[This, (U, S), That]): That =
newZippedAllTryParSeq(that, thisElem, thatElem).asInstanceOf[That]
override def force[U >: T, That](implicit bf: CanBuildFrom[Coll, U, That]) = bf ifParallel { pbf =>
tasksupport.executeAndWaitResult(new Force(pbf, splitter).mapResult(_.result).asInstanceOf[Task[That, ResultMapping[_, Force[U, That], That]]])
} otherwise {
val b = bf(underlying)
b ++= this.iterator
b.result()
}
/* wrapper virtual ctors */
protected def newSliced(_endpoints: SliceInterval): Transformed[T] = new { val endpoints = _endpoints } with Sliced
protected def newMapped[S](f: T => S): Transformed[S] = new Mapped[S] { val mapping = f }
protected def newForced[S](xs: => GenSeq[S]): Transformed[S] = new Forced[S] { val forced = xs }
protected def newAppended[U >: T](that: GenTraversable[U]): Transformed[U] = new Appended[U] { val rest = that }
protected def newDroppedWhile(p: T => Boolean) = unsupported
protected def newTakenWhile(p: T => Boolean) = unsupported
protected def newFlatMapped[S](f: T => GenTraversableOnce[S]) = unsupported
protected def newFiltered(p: T => Boolean) = unsupported
protected def newZipped[S](that: GenIterable[S]): Transformed[(T, S)] = new Zipped[S] { val other = that }
protected def newZippedAll[U >: T, S](that: GenIterable[S], _thisElem: U, _thatElem: S): Transformed[(U, S)] = new ZippedAll[U, S] {
val other = that
val thisElem = _thisElem
val thatElem = _thatElem
}
/* argument sequence dependent ctors */
protected def newForcedTryParIterable[S](xs: => GenSeq[S]): Transformed[S] = {
if (xs.isParIterable) newForced[S](xs)
else newForced(mutable.ParArray.fromTraversables(xs))
}
protected def newAppendedTryParIterable[U >: T](that: GenTraversable[U]): Transformed[U] = {
// we only append if `that` is a parallel iterable, i.e. it has a splitter
if (that.isParIterable) newAppended(that)
else newAppended(mutable.ParArray.fromTraversables(that))
}
protected def newZippedTryParSeq[S](that: GenIterable[S]): Transformed[(T, S)] = {
if (that.isParSeq) newZipped[S](that)
else newZipped[S](mutable.ParArray.fromTraversables(that))
}
protected def newZippedAllTryParSeq[S, U >: T](that: GenIterable[S], thisElem: U, thatElem: S): Transformed[(U, S)] = {
if (that.isParSeq) newZippedAll(that, thisElem, thatElem)
else newZippedAll(mutable.ParArray.fromTraversables(that), thisElem, thatElem)
}
/* tasks */
protected[this] class Force[U >: T, That](cbf: CanCombineFrom[Coll, U, That], protected[this] val pit: IterableSplitter[T])
extends Transformer[Combiner[U, That], Force[U, That]] {
var result: Combiner[U, That] = null
def leaf(prev: Option[Combiner[U, That]]) = result = pit.copy2builder[U, That, Combiner[U, That]](reuse(prev, cbf(self.underlying)))
protected[this] def newSubtask(p: IterableSplitter[T]) = new Force(cbf, p)
override def merge(that: Force[U, That]) = result = result combine that.result
}
}
|