summaryrefslogtreecommitdiff
path: root/src/library/scala/collection/immutable/Stream.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/library/scala/collection/immutable/Stream.scala')
-rw-r--r--src/library/scala/collection/immutable/Stream.scala86
1 files changed, 60 insertions, 26 deletions
diff --git a/src/library/scala/collection/immutable/Stream.scala b/src/library/scala/collection/immutable/Stream.scala
index 5bb4ef5f21..60de147477 100644
--- a/src/library/scala/collection/immutable/Stream.scala
+++ b/src/library/scala/collection/immutable/Stream.scala
@@ -6,7 +6,8 @@
** |/ **
\* */
-package scala.collection
+package scala
+package collection
package immutable
import generic._
@@ -181,11 +182,14 @@ import scala.language.implicitConversions
* @define coll stream
* @define orderDependent
* @define orderDependentFold
+ * @define willTerminateInf Note: lazily evaluated; will terminate for infinite-sized collections.
*/
+@deprecatedInheritance("This class will be sealed.", "2.11.0")
abstract class Stream[+A] extends AbstractSeq[A]
with LinearSeq[A]
with GenericTraversableTemplate[A, Stream]
- with LinearSeqOptimized[A, Stream[A]] {
+ with LinearSeqOptimized[A, Stream[A]]
+ with Serializable {
self =>
override def companion: GenericCompanion[Stream] = Stream
@@ -286,9 +290,8 @@ self =>
len
}
- /** It's an imperfect world, but at least we can bottle up the
- * imperfection in a capsule.
- */
+ // It's an imperfect world, but at least we can bottle up the
+ // imperfection in a capsule.
@inline private def asThat[That](x: AnyRef): That = x.asInstanceOf[That]
@inline private def asStream[B](x: AnyRef): Stream[B] = x.asInstanceOf[Stream[B]]
@inline private def isStreamBuilder[B, That](bf: CanBuildFrom[Stream[A], B, That]) =
@@ -385,12 +388,17 @@ self =>
// 1) stackoverflows (could be achieved with tailrec, too)
// 2) out of memory errors for big streams (`this` reference can be eliminated from the stack)
var rest: Stream[A] = this
- while (rest.nonEmpty && !pf.isDefinedAt(rest.head)) rest = rest.tail
+
+ // Avoids calling both `pf.isDefined` and `pf.apply`.
+ var newHead: B = null.asInstanceOf[B]
+ val runWith = pf.runWith((b: B) => newHead = b)
+
+ while (rest.nonEmpty && !runWith(rest.head)) rest = rest.tail
// without the call to the companion object, a thunk is created for the tail of the new stream,
// and the closure of the thunk will reference `this`
if (rest.isEmpty) Stream.Empty.asInstanceOf[That]
- else Stream.collectedTail(rest, pf, bf).asInstanceOf[That]
+ else Stream.collectedTail(newHead, rest, pf, bf).asInstanceOf[That]
}
}
@@ -725,10 +733,15 @@ self =>
* // produces: "5, 6, 7, 8, 9"
* }}}
*/
- override def take(n: Int): Stream[A] =
+ override def take(n: Int): Stream[A] = (
+ // Note that the n == 1 condition appears redundant but is not.
+ // It prevents "tail" from being referenced (and its head being evaluated)
+ // when obtaining the last element of the result. Such are the challenges
+ // of working with a lazy-but-not-really sequence.
if (n <= 0 || isEmpty) Stream.empty
else if (n == 1) cons(head, Stream.empty)
else cons(head, tail take n-1)
+ )
@tailrec final override def drop(n: Int): Stream[A] =
if (n <= 0 || isEmpty) this
@@ -784,8 +797,23 @@ self =>
these
}
- // there's nothing we can do about dropRight, so we just keep the definition
- // in LinearSeq
+ /**
+ * @inheritdoc
+ * $willTerminateInf
+ */
+ override def dropRight(n: Int): Stream[A] = {
+ // We make dropRight work for possibly infinite streams by carrying
+ // a buffer of the dropped size. As long as the buffer is full and the
+ // rest is non-empty, we can feed elements off the buffer head. When
+ // the rest becomes empty, the full buffer is the dropped elements.
+ def advance(stub0: List[A], stub1: List[A], rest: Stream[A]): Stream[A] = {
+ if (rest.isEmpty) Stream.empty
+ else if (stub0.isEmpty) advance(stub1.reverse, Nil, rest)
+ else cons(stub0.head, advance(stub0.tail, rest.head :: stub1, rest.tail))
+ }
+ if (n <= 0) this
+ else advance((this take n).toList, Nil, this drop n)
+ }
/** Returns the longest prefix of this `Stream` whose elements satisfy the
* predicate `p`.
@@ -927,19 +955,21 @@ self =>
* `Stream`.
* @example {{{
* val sov: Stream[Vector[Int]] = Vector(0) #:: Vector(0, 0) #:: sov.zip(sov.tail).map { n => n._1 ++ n._2 }
- * sov flatten take 10 mkString ", "
+ * sov.flatten take 10 mkString ", "
* // produces: "0, 0, 0, 0, 0, 0, 0, 0, 0, 0"
* }}}
*/
override def flatten[B](implicit asTraversable: A => /*<:<!!!*/ GenTraversableOnce[B]): Stream[B] = {
- def flatten1(t: Traversable[B]): Stream[B] =
- if (!t.isEmpty)
- cons(t.head, flatten1(t.tail))
- else
- tail.flatten
-
- if (isEmpty) Stream.empty
- else flatten1(asTraversable(head).seq.toTraversable)
+ var st: Stream[A] = this
+ while (st.nonEmpty) {
+ val h = asTraversable(st.head)
+ if (h.isEmpty) {
+ st = st.tail
+ } else {
+ return h.toStream #::: st.tail.flatten
+ }
+ }
+ Stream.empty
}
override def view = new StreamView[A, Stream[A]] {
@@ -973,7 +1003,7 @@ final class StreamIterator[+A] private() extends AbstractIterator[A] with Iterat
def hasNext: Boolean = these.v.nonEmpty
def next(): A =
- if (isEmpty) Iterator.empty.next
+ if (isEmpty) Iterator.empty.next()
else {
val cur = these.v
val result = cur.head
@@ -1023,7 +1053,7 @@ object Stream extends SeqFactory[Stream] {
def result: Stream[A] = parts.toStream flatMap (_.toStream)
}
- object Empty extends Stream[Nothing] with Serializable {
+ object Empty extends Stream[Nothing] {
override def isEmpty = true
override def head = throw new NoSuchElementException("head of empty stream")
override def tail = throw new UnsupportedOperationException("tail of empty stream")
@@ -1074,15 +1104,19 @@ object Stream extends SeqFactory[Stream] {
/** A lazy cons cell, from which streams are built. */
@SerialVersionUID(-602202424901551803L)
- final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A] with Serializable {
+ final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A] {
override def isEmpty = false
override def head = hd
@volatile private[this] var tlVal: Stream[A] = _
- def tailDefined: Boolean = tlVal ne null
+ @volatile private[this] var tlGen = tl _
+ def tailDefined: Boolean = tlGen eq null
override def tail: Stream[A] = {
if (!tailDefined)
synchronized {
- if (!tailDefined) tlVal = tl
+ if (!tailDefined) {
+ tlVal = tlGen()
+ tlGen = null
+ }
}
tlVal
@@ -1149,8 +1183,8 @@ object Stream extends SeqFactory[Stream] {
cons(stream.head, stream.tail filter p)
}
- private[immutable] def collectedTail[A, B, That](stream: Stream[A], pf: PartialFunction[A, B], bf: CanBuildFrom[Stream[A], B, That]) = {
- cons(pf(stream.head), stream.tail.collect(pf)(bf).asInstanceOf[Stream[B]])
+ private[immutable] def collectedTail[A, B, That](head: B, stream: Stream[A], pf: PartialFunction[A, B], bf: CanBuildFrom[Stream[A], B, That]) = {
+ cons(head, stream.tail.collect(pf)(bf).asInstanceOf[Stream[B]])
}
}