diff options
Diffstat (limited to 'src/library/scala/collection/immutable/Stream.scala')
-rw-r--r-- | src/library/scala/collection/immutable/Stream.scala | 86 |
1 files changed, 60 insertions, 26 deletions
diff --git a/src/library/scala/collection/immutable/Stream.scala b/src/library/scala/collection/immutable/Stream.scala index 5bb4ef5f21..60de147477 100644 --- a/src/library/scala/collection/immutable/Stream.scala +++ b/src/library/scala/collection/immutable/Stream.scala @@ -6,7 +6,8 @@ ** |/ ** \* */ -package scala.collection +package scala +package collection package immutable import generic._ @@ -181,11 +182,14 @@ import scala.language.implicitConversions * @define coll stream * @define orderDependent * @define orderDependentFold + * @define willTerminateInf Note: lazily evaluated; will terminate for infinite-sized collections. */ +@deprecatedInheritance("This class will be sealed.", "2.11.0") abstract class Stream[+A] extends AbstractSeq[A] with LinearSeq[A] with GenericTraversableTemplate[A, Stream] - with LinearSeqOptimized[A, Stream[A]] { + with LinearSeqOptimized[A, Stream[A]] + with Serializable { self => override def companion: GenericCompanion[Stream] = Stream @@ -286,9 +290,8 @@ self => len } - /** It's an imperfect world, but at least we can bottle up the - * imperfection in a capsule. - */ + // It's an imperfect world, but at least we can bottle up the + // imperfection in a capsule. @inline private def asThat[That](x: AnyRef): That = x.asInstanceOf[That] @inline private def asStream[B](x: AnyRef): Stream[B] = x.asInstanceOf[Stream[B]] @inline private def isStreamBuilder[B, That](bf: CanBuildFrom[Stream[A], B, That]) = @@ -385,12 +388,17 @@ self => // 1) stackoverflows (could be achieved with tailrec, too) // 2) out of memory errors for big streams (`this` reference can be eliminated from the stack) var rest: Stream[A] = this - while (rest.nonEmpty && !pf.isDefinedAt(rest.head)) rest = rest.tail + + // Avoids calling both `pf.isDefined` and `pf.apply`. + var newHead: B = null.asInstanceOf[B] + val runWith = pf.runWith((b: B) => newHead = b) + + while (rest.nonEmpty && !runWith(rest.head)) rest = rest.tail // without the call to the companion object, a thunk is created for the tail of the new stream, // and the closure of the thunk will reference `this` if (rest.isEmpty) Stream.Empty.asInstanceOf[That] - else Stream.collectedTail(rest, pf, bf).asInstanceOf[That] + else Stream.collectedTail(newHead, rest, pf, bf).asInstanceOf[That] } } @@ -725,10 +733,15 @@ self => * // produces: "5, 6, 7, 8, 9" * }}} */ - override def take(n: Int): Stream[A] = + override def take(n: Int): Stream[A] = ( + // Note that the n == 1 condition appears redundant but is not. + // It prevents "tail" from being referenced (and its head being evaluated) + // when obtaining the last element of the result. Such are the challenges + // of working with a lazy-but-not-really sequence. if (n <= 0 || isEmpty) Stream.empty else if (n == 1) cons(head, Stream.empty) else cons(head, tail take n-1) + ) @tailrec final override def drop(n: Int): Stream[A] = if (n <= 0 || isEmpty) this @@ -784,8 +797,23 @@ self => these } - // there's nothing we can do about dropRight, so we just keep the definition - // in LinearSeq + /** + * @inheritdoc + * $willTerminateInf + */ + override def dropRight(n: Int): Stream[A] = { + // We make dropRight work for possibly infinite streams by carrying + // a buffer of the dropped size. As long as the buffer is full and the + // rest is non-empty, we can feed elements off the buffer head. When + // the rest becomes empty, the full buffer is the dropped elements. + def advance(stub0: List[A], stub1: List[A], rest: Stream[A]): Stream[A] = { + if (rest.isEmpty) Stream.empty + else if (stub0.isEmpty) advance(stub1.reverse, Nil, rest) + else cons(stub0.head, advance(stub0.tail, rest.head :: stub1, rest.tail)) + } + if (n <= 0) this + else advance((this take n).toList, Nil, this drop n) + } /** Returns the longest prefix of this `Stream` whose elements satisfy the * predicate `p`. @@ -927,19 +955,21 @@ self => * `Stream`. * @example {{{ * val sov: Stream[Vector[Int]] = Vector(0) #:: Vector(0, 0) #:: sov.zip(sov.tail).map { n => n._1 ++ n._2 } - * sov flatten take 10 mkString ", " + * sov.flatten take 10 mkString ", " * // produces: "0, 0, 0, 0, 0, 0, 0, 0, 0, 0" * }}} */ override def flatten[B](implicit asTraversable: A => /*<:<!!!*/ GenTraversableOnce[B]): Stream[B] = { - def flatten1(t: Traversable[B]): Stream[B] = - if (!t.isEmpty) - cons(t.head, flatten1(t.tail)) - else - tail.flatten - - if (isEmpty) Stream.empty - else flatten1(asTraversable(head).seq.toTraversable) + var st: Stream[A] = this + while (st.nonEmpty) { + val h = asTraversable(st.head) + if (h.isEmpty) { + st = st.tail + } else { + return h.toStream #::: st.tail.flatten + } + } + Stream.empty } override def view = new StreamView[A, Stream[A]] { @@ -973,7 +1003,7 @@ final class StreamIterator[+A] private() extends AbstractIterator[A] with Iterat def hasNext: Boolean = these.v.nonEmpty def next(): A = - if (isEmpty) Iterator.empty.next + if (isEmpty) Iterator.empty.next() else { val cur = these.v val result = cur.head @@ -1023,7 +1053,7 @@ object Stream extends SeqFactory[Stream] { def result: Stream[A] = parts.toStream flatMap (_.toStream) } - object Empty extends Stream[Nothing] with Serializable { + object Empty extends Stream[Nothing] { override def isEmpty = true override def head = throw new NoSuchElementException("head of empty stream") override def tail = throw new UnsupportedOperationException("tail of empty stream") @@ -1074,15 +1104,19 @@ object Stream extends SeqFactory[Stream] { /** A lazy cons cell, from which streams are built. */ @SerialVersionUID(-602202424901551803L) - final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A] with Serializable { + final class Cons[+A](hd: A, tl: => Stream[A]) extends Stream[A] { override def isEmpty = false override def head = hd @volatile private[this] var tlVal: Stream[A] = _ - def tailDefined: Boolean = tlVal ne null + @volatile private[this] var tlGen = tl _ + def tailDefined: Boolean = tlGen eq null override def tail: Stream[A] = { if (!tailDefined) synchronized { - if (!tailDefined) tlVal = tl + if (!tailDefined) { + tlVal = tlGen() + tlGen = null + } } tlVal @@ -1149,8 +1183,8 @@ object Stream extends SeqFactory[Stream] { cons(stream.head, stream.tail filter p) } - private[immutable] def collectedTail[A, B, That](stream: Stream[A], pf: PartialFunction[A, B], bf: CanBuildFrom[Stream[A], B, That]) = { - cons(pf(stream.head), stream.tail.collect(pf)(bf).asInstanceOf[Stream[B]]) + private[immutable] def collectedTail[A, B, That](head: B, stream: Stream[A], pf: PartialFunction[A, B], bf: CanBuildFrom[Stream[A], B, That]) = { + cons(head, stream.tail.collect(pf)(bf).asInstanceOf[Stream[B]]) } } |