diff options
-rw-r--r-- | src/library/scala/collection/immutable/List.scala | 48 | ||||
-rw-r--r-- | src/library/scala/collection/mutable/ListBuffer.scala | 4 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/ParIterableLike.scala | 3 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/Tasks.scala | 57 | ||||
-rw-r--r-- | src/library/scala/collection/parallel/package.scala | 2 | ||||
-rw-r--r-- | test/files/run/si5374.check | 5 | ||||
-rw-r--r-- | test/files/run/si5374.scala | 52 | ||||
-rw-r--r-- | test/files/run/si5375.check | 1 | ||||
-rw-r--r-- | test/files/run/si5375.scala | 19 |
9 files changed, 150 insertions, 41 deletions
diff --git a/src/library/scala/collection/immutable/List.scala b/src/library/scala/collection/immutable/List.scala index e9ecc75e0f..f789de9fac 100644 --- a/src/library/scala/collection/immutable/List.scala +++ b/src/library/scala/collection/immutable/List.scala @@ -14,6 +14,7 @@ package immutable import generic._ import mutable.{Builder, ListBuffer} import annotation.tailrec +import java.io._ /** A class for immutable linked lists representing ordered collections * of elements of type. @@ -315,8 +316,46 @@ final case class ::[B](private var hd: B, private[scala] var tl: List[B]) extend override def head : B = hd override def tail : List[B] = tl override def isEmpty: Boolean = false - - + + private def writeObject(out: ObjectOutputStream) { + out.writeObject(ListSerializeStart) // needed to differentiate with the legacy `::` serialization + out.writeObject(this.hd) + out.writeObject(this.tl) + } + + private def readObject(in: ObjectInputStream) { + val obj = in.readObject() + if (obj == ListSerializeStart) { + this.hd = in.readObject().asInstanceOf[B] + this.tl = in.readObject().asInstanceOf[List[B]] + } else oldReadObject(in, obj) + } + + /* The oldReadObject method exists here for compatibility reasons. + * :: objects used to be serialized by serializing all the elements to + * the output stream directly, but this was broken (see SI-5374). + */ + private def oldReadObject(in: ObjectInputStream, firstObject: AnyRef) { + hd = firstObject.asInstanceOf[B] + assert(hd != ListSerializeEnd) + var current: ::[B] = this + while (true) in.readObject match { + case ListSerializeEnd => + current.tl = Nil + return + case a : Any => + val list : ::[B] = new ::(a.asInstanceOf[B], Nil) + current.tl = list + current = list + } + } + + private def oldWriteObject(out: ObjectOutputStream) { + var xs: List[B] = this + while (!xs.isEmpty) { out.writeObject(xs.head); xs = xs.tail } + out.writeObject(ListSerializeEnd) + } + } /** $factoryInfo @@ -582,4 +621,9 @@ object List extends SeqFactory[List] { /** Only used for list serialization */ @SerialVersionUID(0L - 8476791151975527571L) +private[scala] case object ListSerializeStart + +/** Only used for list serialization */ +@SerialVersionUID(0L - 8476791151975527571L) private[scala] case object ListSerializeEnd + diff --git a/src/library/scala/collection/mutable/ListBuffer.scala b/src/library/scala/collection/mutable/ListBuffer.scala index eb871135df..53c876ec08 100644 --- a/src/library/scala/collection/mutable/ListBuffer.scala +++ b/src/library/scala/collection/mutable/ListBuffer.scala @@ -41,7 +41,7 @@ import java.io._ * @define mayNotTerminateInf * @define willNotTerminateInf */ -@SerialVersionUID(3419063961353022661L) +@SerialVersionUID(3419063961353022662L) final class ListBuffer[A] extends AbstractBuffer[A] with Buffer[A] @@ -399,7 +399,7 @@ final class ListBuffer[A] private def copy() { var cursor = start val limit = last0.tail - clear + clear() while (cursor ne limit) { this += cursor.head cursor = cursor.tail diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala index 90b64c17f9..390bd72ab5 100644 --- a/src/library/scala/collection/parallel/ParIterableLike.scala +++ b/src/library/scala/collection/parallel/ParIterableLike.scala @@ -895,7 +895,8 @@ self: ParIterableLike[T, Repr, Sequential] => @volatile var result: R1 = null.asInstanceOf[R1] def map(r: R): R1 def leaf(prevr: Option[R1]) = { - result = map(executeAndWaitResult(inner)) + val initialResult = executeAndWaitResult(inner) + result = map(initialResult) } private[parallel] override def signalAbort() { inner.signalAbort diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala index 873291fb2d..b705909cad 100644 --- a/src/library/scala/collection/parallel/Tasks.scala +++ b/src/library/scala/collection/parallel/Tasks.scala @@ -88,7 +88,7 @@ trait Tasks { if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) { println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable) } else if (this.throwable != null || that.throwable != null) { - println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable) + println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable) } } @@ -118,7 +118,7 @@ trait Tasks { /** Try to cancel the task. * @return `true` if cancellation is successful. */ - def tryCancel: Boolean + def tryCancel(): Boolean /** If the task has been cancelled successfully, those syncing on it may * automatically be notified, depending on the implementation. If they * aren't, this release method should be called after processing the @@ -161,32 +161,39 @@ trait AdaptiveWorkStealingTasks extends Tasks { def split: Seq[TaskImpl[R, Tp]] - def compute() = if (body.shouldSplitFurther) internal else body.tryLeaf(None) + def compute() = if (body.shouldSplitFurther) { + internal() + release() + } else { + body.tryLeaf(None) + release() + } def internal() = { var last = spawnSubtasks() - + last.body.tryLeaf(None) + last.release() body.result = last.body.result body.throwable = last.body.throwable - + while (last.next != null) { // val lastresult = Option(last.body.result) val beforelast = last last = last.next - if (last.tryCancel) { + if (last.tryCancel()) { // println("Done with " + beforelast.body + ", next direct is " + last.body) last.body.tryLeaf(Some(body.result)) - last.release + last.release() } else { // println("Done with " + beforelast.body + ", next sync is " + last.body) - last.sync + last.sync() } // println("Merging " + body + " with " + last.body) body.tryMerge(last.body.repr) } } - + def spawnSubtasks() = { var last: TaskImpl[R, Tp] = null var head: TaskImpl[R, Tp] = this @@ -196,7 +203,7 @@ trait AdaptiveWorkStealingTasks extends Tasks { for (t <- subtasks.tail.reverse) { t.next = last last = t - t.start + t.start() } } while (head.body.shouldSplitFurther); head.next = last @@ -230,12 +237,12 @@ trait ThreadPoolTasks extends Tasks { // utb: var future: Future[_] = null @volatile var owned = false @volatile var completed = false - + def start() = synchronized { // debuglog("Starting " + body) // utb: future = executor.submit(this) executor.synchronized { - incrTasks + incrTasks() executor.submit(this) } } @@ -249,9 +256,9 @@ trait ThreadPoolTasks extends Tasks { //assert(executor.getCorePoolSize == (coresize + 1)) } } - if (!completed) this.wait + while (!completed) this.wait } - def tryCancel = synchronized { + def tryCancel() = synchronized { // utb: future.cancel(false) if (!owned) { // debuglog("Cancelling " + body) @@ -259,7 +266,7 @@ trait ThreadPoolTasks extends Tasks { true } else false } - def run = { + def run() = { // utb: compute var isOkToRun = false synchronized { @@ -270,17 +277,17 @@ trait ThreadPoolTasks extends Tasks { } if (isOkToRun) { // debuglog("Running body of " + body) - compute - release + compute() } else { // just skip // debuglog("skipping body of " + body) } } - override def release = synchronized { + override def release() = synchronized { + //println("releasing: " + this + ", body: " + this.body) completed = true executor.synchronized { - decrTasks + decrTasks() } this.notifyAll } @@ -305,10 +312,10 @@ trait ThreadPoolTasks extends Tasks { val t = newTaskImpl(task) // debuglog("-----------> Executing without wait: " + task) - t.start + t.start() () => { - t.sync + t.sync() t.body.forwardThrowable t.body.result } @@ -318,9 +325,9 @@ trait ThreadPoolTasks extends Tasks { val t = newTaskImpl(task) // debuglog("-----------> Executing with wait: " + task) - t.start - - t.sync + t.start() + + t.sync() t.body.forwardThrowable t.body.result } @@ -369,7 +376,7 @@ trait FutureThreadPoolTasks extends Tasks { def sync() = future.get def tryCancel = false def run = { - compute + compute() } } diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala index addc366072..f152629c50 100644 --- a/src/library/scala/collection/parallel/package.scala +++ b/src/library/scala/collection/parallel/package.scala @@ -117,7 +117,7 @@ package parallel { /** Composite throwable - thrown when multiple exceptions are thrown at the same time. */ final case class CompositeThrowable( val throwables: Set[Throwable] - ) extends Throwable( + ) extends Exception( "Multiple exceptions thrown during a parallel computation: " + throwables.map(t => t + "\n" + t.getStackTrace.take(10).++("...").mkString("\n")).mkString("\n\n") ) diff --git a/test/files/run/si5374.check b/test/files/run/si5374.check index cdf0bc7e5b..6be88d77ec 100644 --- a/test/files/run/si5374.check +++ b/test/files/run/si5374.check @@ -1,3 +1,6 @@ ListBuffer(1, 2, 3, 1) ListBuffer(1, 2, 3, 1) -ListBuffer()
\ No newline at end of file +ListBuffer() +List(1, 2, 3, 4, 5) +List(1, 2, 3) +ok
\ No newline at end of file diff --git a/test/files/run/si5374.scala b/test/files/run/si5374.scala index a5678c3a81..9b1671e795 100644 --- a/test/files/run/si5374.scala +++ b/test/files/run/si5374.scala @@ -11,15 +11,22 @@ object Test { def main(args: Array[String]) { ticketExample() emptyListBuffer() + list() + legacyList() + objectWithMultipleLists() } - def ticketExample() { + def inAndOut[T <: AnyRef](obj: T): T = { val baos = new ByteArrayOutputStream val oos = new ObjectOutputStream(baos) - oos.writeObject( ListBuffer(1,2,3) ) + oos.writeObject( obj ) val bais = new ByteArrayInputStream( baos.toByteArray ) val ois = new ObjectInputStream(bais) - val lb = ois.readObject.asInstanceOf[ListBuffer[Int]] + ois.readObject.asInstanceOf[T] + } + + def ticketExample() { + val lb = inAndOut(ListBuffer(1, 2, 3)) val lb2 = ListBuffer[Int]() ++= lb lb2 ++= List(1) @@ -29,14 +36,41 @@ object Test { } def emptyListBuffer() { - val baos = new ByteArrayOutputStream - val oos = new ObjectOutputStream(baos) - oos.writeObject( ListBuffer() ) - val bais = new ByteArrayInputStream( baos.toByteArray ) - val ois = new ObjectInputStream(bais) - val lb = ois.readObject.asInstanceOf[ListBuffer[Int]] + val lb = inAndOut(ListBuffer[Int]()) println(lb) } + def list() { + val l = inAndOut(List(1, 2, 3, 4, 5)) + + println(l) + } + + // this byte array corresponds to what List(1, 2, 3) used to be serialized to prior to this fix + val listBytes = Array[Byte](-84, -19, 0, 5, 115, 114, 0, 39, 115, 99, 97, 108, 97, 46, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 46, 105, 109, 109, 117, 116, 97, 98, 108, 101, 46, 36, 99, 111, 108, 111, 110, 36, 99, 111, 108, 111, 110, -118, 92, 99, 91, -10, -40, -7, 109, 3, 0, 2, 76, 0, 43, 115, 99, 97, 108, 97, 36, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 36, 105, 109, 109, 117, 116, 97, 98, 108, 101, 36, 36, 99, 111, 108, 111, 110, 36, 99, 111, 108, 111, 110, 36, 36, 104, 100, 116, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 79, 98, 106, 101, 99, 116, 59, 76, 0, 2, 116, 108, 116, 0, 33, 76, 115, 99, 97, 108, 97, 47, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 47, 105, 109, 109, 117, 116, 97, 98, 108, 101, 47, 76, 105, 115, 116, 59, 120, 112, 115, 114, 0, 17, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 18, -30, -96, -92, -9, -127, -121, 56, 2, 0, 1, 73, 0, 5, 118, 97, 108, 117, 101, 120, 114, 0, 16, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 78, 117, 109, 98, 101, 114, -122, -84, -107, 29, 11, -108, -32, -117, 2, 0, 0, 120, 112, 0, 0, 0, 1, 115, 113, 0, 126, 0, 4, 0, 0, 0, 2, 115, 113, 0, 126, 0, 4, 0, 0, 0, 3, 115, 114, 0, 44, 115, 99, 97, 108, 97, 46, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 46, 105, 109, 109, 117, 116, 97, 98, 108, 101, 46, 76, 105, 115, 116, 83, 101, 114, 105, 97, 108, 105, 122, 101, 69, 110, 100, 36, -118, 92, 99, 91, -9, 83, 11, 109, 2, 0, 0, 120, 112, 120) + + def legacyList() { + val bais = new ByteArrayInputStream(listBytes) + val ois = new ObjectInputStream(bais) + val l = ois.readObject() + + println(l) + } + + class Foo extends Serializable { + val head = List(1, 2, 3) + val last = head.tail.tail + def structuralSharing: Boolean = head.tail.tail eq last + + assert(structuralSharing) + } + + def objectWithMultipleLists() { + val foo = inAndOut(new Foo) + + if (foo.structuralSharing) println("ok") + else println("no structural sharing") + } + } diff --git a/test/files/run/si5375.check b/test/files/run/si5375.check new file mode 100644 index 0000000000..7d3002ffda --- /dev/null +++ b/test/files/run/si5375.check @@ -0,0 +1 @@ +Composite throwable
\ No newline at end of file diff --git a/test/files/run/si5375.scala b/test/files/run/si5375.scala new file mode 100644 index 0000000000..e4b329deae --- /dev/null +++ b/test/files/run/si5375.scala @@ -0,0 +1,19 @@ + + + +import collection.parallel.CompositeThrowable + + + +object Test { + + def main(args: Array[String]) { + val foos = (1 to 1000) toSeq; + try { + foos.par.map(i => if (i % 37 == 0) sys.error("i div 37") else i) + } catch { + case CompositeThrowable(thr) => println("Composite throwable") + } + } + +} |