summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/collection/immutable/List.scala48
-rw-r--r--src/library/scala/collection/mutable/ListBuffer.scala4
-rw-r--r--src/library/scala/collection/parallel/ParIterableLike.scala3
-rw-r--r--src/library/scala/collection/parallel/Tasks.scala57
-rw-r--r--src/library/scala/collection/parallel/package.scala2
-rw-r--r--test/files/run/si5374.check5
-rw-r--r--test/files/run/si5374.scala52
-rw-r--r--test/files/run/si5375.check1
-rw-r--r--test/files/run/si5375.scala19
9 files changed, 150 insertions, 41 deletions
diff --git a/src/library/scala/collection/immutable/List.scala b/src/library/scala/collection/immutable/List.scala
index e9ecc75e0f..f789de9fac 100644
--- a/src/library/scala/collection/immutable/List.scala
+++ b/src/library/scala/collection/immutable/List.scala
@@ -14,6 +14,7 @@ package immutable
import generic._
import mutable.{Builder, ListBuffer}
import annotation.tailrec
+import java.io._
/** A class for immutable linked lists representing ordered collections
* of elements of type.
@@ -315,8 +316,46 @@ final case class ::[B](private var hd: B, private[scala] var tl: List[B]) extend
override def head : B = hd
override def tail : List[B] = tl
override def isEmpty: Boolean = false
-
-
+
+ private def writeObject(out: ObjectOutputStream) {
+ out.writeObject(ListSerializeStart) // needed to differentiate with the legacy `::` serialization
+ out.writeObject(this.hd)
+ out.writeObject(this.tl)
+ }
+
+ private def readObject(in: ObjectInputStream) {
+ val obj = in.readObject()
+ if (obj == ListSerializeStart) {
+ this.hd = in.readObject().asInstanceOf[B]
+ this.tl = in.readObject().asInstanceOf[List[B]]
+ } else oldReadObject(in, obj)
+ }
+
+ /* The oldReadObject method exists here for compatibility reasons.
+ * :: objects used to be serialized by serializing all the elements to
+ * the output stream directly, but this was broken (see SI-5374).
+ */
+ private def oldReadObject(in: ObjectInputStream, firstObject: AnyRef) {
+ hd = firstObject.asInstanceOf[B]
+ assert(hd != ListSerializeEnd)
+ var current: ::[B] = this
+ while (true) in.readObject match {
+ case ListSerializeEnd =>
+ current.tl = Nil
+ return
+ case a : Any =>
+ val list : ::[B] = new ::(a.asInstanceOf[B], Nil)
+ current.tl = list
+ current = list
+ }
+ }
+
+ private def oldWriteObject(out: ObjectOutputStream) {
+ var xs: List[B] = this
+ while (!xs.isEmpty) { out.writeObject(xs.head); xs = xs.tail }
+ out.writeObject(ListSerializeEnd)
+ }
+
}
/** $factoryInfo
@@ -582,4 +621,9 @@ object List extends SeqFactory[List] {
/** Only used for list serialization */
@SerialVersionUID(0L - 8476791151975527571L)
+private[scala] case object ListSerializeStart
+
+/** Only used for list serialization */
+@SerialVersionUID(0L - 8476791151975527571L)
private[scala] case object ListSerializeEnd
+
diff --git a/src/library/scala/collection/mutable/ListBuffer.scala b/src/library/scala/collection/mutable/ListBuffer.scala
index eb871135df..53c876ec08 100644
--- a/src/library/scala/collection/mutable/ListBuffer.scala
+++ b/src/library/scala/collection/mutable/ListBuffer.scala
@@ -41,7 +41,7 @@ import java.io._
* @define mayNotTerminateInf
* @define willNotTerminateInf
*/
-@SerialVersionUID(3419063961353022661L)
+@SerialVersionUID(3419063961353022662L)
final class ListBuffer[A]
extends AbstractBuffer[A]
with Buffer[A]
@@ -399,7 +399,7 @@ final class ListBuffer[A]
private def copy() {
var cursor = start
val limit = last0.tail
- clear
+ clear()
while (cursor ne limit) {
this += cursor.head
cursor = cursor.tail
diff --git a/src/library/scala/collection/parallel/ParIterableLike.scala b/src/library/scala/collection/parallel/ParIterableLike.scala
index 90b64c17f9..390bd72ab5 100644
--- a/src/library/scala/collection/parallel/ParIterableLike.scala
+++ b/src/library/scala/collection/parallel/ParIterableLike.scala
@@ -895,7 +895,8 @@ self: ParIterableLike[T, Repr, Sequential] =>
@volatile var result: R1 = null.asInstanceOf[R1]
def map(r: R): R1
def leaf(prevr: Option[R1]) = {
- result = map(executeAndWaitResult(inner))
+ val initialResult = executeAndWaitResult(inner)
+ result = map(initialResult)
}
private[parallel] override def signalAbort() {
inner.signalAbort
diff --git a/src/library/scala/collection/parallel/Tasks.scala b/src/library/scala/collection/parallel/Tasks.scala
index 873291fb2d..b705909cad 100644
--- a/src/library/scala/collection/parallel/Tasks.scala
+++ b/src/library/scala/collection/parallel/Tasks.scala
@@ -88,7 +88,7 @@ trait Tasks {
if (this.throwable == null && that.throwable == null && (this.result == null || that.result == null)) {
println("This: " + this + ", thr=" + this.throwable + "; merged with " + that + ", thr=" + that.throwable)
} else if (this.throwable != null || that.throwable != null) {
- println("merging this thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
+ println("merging this: " + this + " with thr: " + this.throwable + " with " + that + ", thr=" + that.throwable)
}
}
@@ -118,7 +118,7 @@ trait Tasks {
/** Try to cancel the task.
* @return `true` if cancellation is successful.
*/
- def tryCancel: Boolean
+ def tryCancel(): Boolean
/** If the task has been cancelled successfully, those syncing on it may
* automatically be notified, depending on the implementation. If they
* aren't, this release method should be called after processing the
@@ -161,32 +161,39 @@ trait AdaptiveWorkStealingTasks extends Tasks {
def split: Seq[TaskImpl[R, Tp]]
- def compute() = if (body.shouldSplitFurther) internal else body.tryLeaf(None)
+ def compute() = if (body.shouldSplitFurther) {
+ internal()
+ release()
+ } else {
+ body.tryLeaf(None)
+ release()
+ }
def internal() = {
var last = spawnSubtasks()
-
+
last.body.tryLeaf(None)
+ last.release()
body.result = last.body.result
body.throwable = last.body.throwable
-
+
while (last.next != null) {
// val lastresult = Option(last.body.result)
val beforelast = last
last = last.next
- if (last.tryCancel) {
+ if (last.tryCancel()) {
// println("Done with " + beforelast.body + ", next direct is " + last.body)
last.body.tryLeaf(Some(body.result))
- last.release
+ last.release()
} else {
// println("Done with " + beforelast.body + ", next sync is " + last.body)
- last.sync
+ last.sync()
}
// println("Merging " + body + " with " + last.body)
body.tryMerge(last.body.repr)
}
}
-
+
def spawnSubtasks() = {
var last: TaskImpl[R, Tp] = null
var head: TaskImpl[R, Tp] = this
@@ -196,7 +203,7 @@ trait AdaptiveWorkStealingTasks extends Tasks {
for (t <- subtasks.tail.reverse) {
t.next = last
last = t
- t.start
+ t.start()
}
} while (head.body.shouldSplitFurther);
head.next = last
@@ -230,12 +237,12 @@ trait ThreadPoolTasks extends Tasks {
// utb: var future: Future[_] = null
@volatile var owned = false
@volatile var completed = false
-
+
def start() = synchronized {
// debuglog("Starting " + body)
// utb: future = executor.submit(this)
executor.synchronized {
- incrTasks
+ incrTasks()
executor.submit(this)
}
}
@@ -249,9 +256,9 @@ trait ThreadPoolTasks extends Tasks {
//assert(executor.getCorePoolSize == (coresize + 1))
}
}
- if (!completed) this.wait
+ while (!completed) this.wait
}
- def tryCancel = synchronized {
+ def tryCancel() = synchronized {
// utb: future.cancel(false)
if (!owned) {
// debuglog("Cancelling " + body)
@@ -259,7 +266,7 @@ trait ThreadPoolTasks extends Tasks {
true
} else false
}
- def run = {
+ def run() = {
// utb: compute
var isOkToRun = false
synchronized {
@@ -270,17 +277,17 @@ trait ThreadPoolTasks extends Tasks {
}
if (isOkToRun) {
// debuglog("Running body of " + body)
- compute
- release
+ compute()
} else {
// just skip
// debuglog("skipping body of " + body)
}
}
- override def release = synchronized {
+ override def release() = synchronized {
+ //println("releasing: " + this + ", body: " + this.body)
completed = true
executor.synchronized {
- decrTasks
+ decrTasks()
}
this.notifyAll
}
@@ -305,10 +312,10 @@ trait ThreadPoolTasks extends Tasks {
val t = newTaskImpl(task)
// debuglog("-----------> Executing without wait: " + task)
- t.start
+ t.start()
() => {
- t.sync
+ t.sync()
t.body.forwardThrowable
t.body.result
}
@@ -318,9 +325,9 @@ trait ThreadPoolTasks extends Tasks {
val t = newTaskImpl(task)
// debuglog("-----------> Executing with wait: " + task)
- t.start
-
- t.sync
+ t.start()
+
+ t.sync()
t.body.forwardThrowable
t.body.result
}
@@ -369,7 +376,7 @@ trait FutureThreadPoolTasks extends Tasks {
def sync() = future.get
def tryCancel = false
def run = {
- compute
+ compute()
}
}
diff --git a/src/library/scala/collection/parallel/package.scala b/src/library/scala/collection/parallel/package.scala
index addc366072..f152629c50 100644
--- a/src/library/scala/collection/parallel/package.scala
+++ b/src/library/scala/collection/parallel/package.scala
@@ -117,7 +117,7 @@ package parallel {
/** Composite throwable - thrown when multiple exceptions are thrown at the same time. */
final case class CompositeThrowable(
val throwables: Set[Throwable]
- ) extends Throwable(
+ ) extends Exception(
"Multiple exceptions thrown during a parallel computation: " +
throwables.map(t => t + "\n" + t.getStackTrace.take(10).++("...").mkString("\n")).mkString("\n\n")
)
diff --git a/test/files/run/si5374.check b/test/files/run/si5374.check
index cdf0bc7e5b..6be88d77ec 100644
--- a/test/files/run/si5374.check
+++ b/test/files/run/si5374.check
@@ -1,3 +1,6 @@
ListBuffer(1, 2, 3, 1)
ListBuffer(1, 2, 3, 1)
-ListBuffer() \ No newline at end of file
+ListBuffer()
+List(1, 2, 3, 4, 5)
+List(1, 2, 3)
+ok \ No newline at end of file
diff --git a/test/files/run/si5374.scala b/test/files/run/si5374.scala
index a5678c3a81..9b1671e795 100644
--- a/test/files/run/si5374.scala
+++ b/test/files/run/si5374.scala
@@ -11,15 +11,22 @@ object Test {
def main(args: Array[String]) {
ticketExample()
emptyListBuffer()
+ list()
+ legacyList()
+ objectWithMultipleLists()
}
- def ticketExample() {
+ def inAndOut[T <: AnyRef](obj: T): T = {
val baos = new ByteArrayOutputStream
val oos = new ObjectOutputStream(baos)
- oos.writeObject( ListBuffer(1,2,3) )
+ oos.writeObject( obj )
val bais = new ByteArrayInputStream( baos.toByteArray )
val ois = new ObjectInputStream(bais)
- val lb = ois.readObject.asInstanceOf[ListBuffer[Int]]
+ ois.readObject.asInstanceOf[T]
+ }
+
+ def ticketExample() {
+ val lb = inAndOut(ListBuffer(1, 2, 3))
val lb2 = ListBuffer[Int]() ++= lb
lb2 ++= List(1)
@@ -29,14 +36,41 @@ object Test {
}
def emptyListBuffer() {
- val baos = new ByteArrayOutputStream
- val oos = new ObjectOutputStream(baos)
- oos.writeObject( ListBuffer() )
- val bais = new ByteArrayInputStream( baos.toByteArray )
- val ois = new ObjectInputStream(bais)
- val lb = ois.readObject.asInstanceOf[ListBuffer[Int]]
+ val lb = inAndOut(ListBuffer[Int]())
println(lb)
}
+ def list() {
+ val l = inAndOut(List(1, 2, 3, 4, 5))
+
+ println(l)
+ }
+
+ // this byte array corresponds to what List(1, 2, 3) used to be serialized to prior to this fix
+ val listBytes = Array[Byte](-84, -19, 0, 5, 115, 114, 0, 39, 115, 99, 97, 108, 97, 46, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 46, 105, 109, 109, 117, 116, 97, 98, 108, 101, 46, 36, 99, 111, 108, 111, 110, 36, 99, 111, 108, 111, 110, -118, 92, 99, 91, -10, -40, -7, 109, 3, 0, 2, 76, 0, 43, 115, 99, 97, 108, 97, 36, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 36, 105, 109, 109, 117, 116, 97, 98, 108, 101, 36, 36, 99, 111, 108, 111, 110, 36, 99, 111, 108, 111, 110, 36, 36, 104, 100, 116, 0, 18, 76, 106, 97, 118, 97, 47, 108, 97, 110, 103, 47, 79, 98, 106, 101, 99, 116, 59, 76, 0, 2, 116, 108, 116, 0, 33, 76, 115, 99, 97, 108, 97, 47, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 47, 105, 109, 109, 117, 116, 97, 98, 108, 101, 47, 76, 105, 115, 116, 59, 120, 112, 115, 114, 0, 17, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 73, 110, 116, 101, 103, 101, 114, 18, -30, -96, -92, -9, -127, -121, 56, 2, 0, 1, 73, 0, 5, 118, 97, 108, 117, 101, 120, 114, 0, 16, 106, 97, 118, 97, 46, 108, 97, 110, 103, 46, 78, 117, 109, 98, 101, 114, -122, -84, -107, 29, 11, -108, -32, -117, 2, 0, 0, 120, 112, 0, 0, 0, 1, 115, 113, 0, 126, 0, 4, 0, 0, 0, 2, 115, 113, 0, 126, 0, 4, 0, 0, 0, 3, 115, 114, 0, 44, 115, 99, 97, 108, 97, 46, 99, 111, 108, 108, 101, 99, 116, 105, 111, 110, 46, 105, 109, 109, 117, 116, 97, 98, 108, 101, 46, 76, 105, 115, 116, 83, 101, 114, 105, 97, 108, 105, 122, 101, 69, 110, 100, 36, -118, 92, 99, 91, -9, 83, 11, 109, 2, 0, 0, 120, 112, 120)
+
+ def legacyList() {
+ val bais = new ByteArrayInputStream(listBytes)
+ val ois = new ObjectInputStream(bais)
+ val l = ois.readObject()
+
+ println(l)
+ }
+
+ class Foo extends Serializable {
+ val head = List(1, 2, 3)
+ val last = head.tail.tail
+ def structuralSharing: Boolean = head.tail.tail eq last
+
+ assert(structuralSharing)
+ }
+
+ def objectWithMultipleLists() {
+ val foo = inAndOut(new Foo)
+
+ if (foo.structuralSharing) println("ok")
+ else println("no structural sharing")
+ }
+
}
diff --git a/test/files/run/si5375.check b/test/files/run/si5375.check
new file mode 100644
index 0000000000..7d3002ffda
--- /dev/null
+++ b/test/files/run/si5375.check
@@ -0,0 +1 @@
+Composite throwable \ No newline at end of file
diff --git a/test/files/run/si5375.scala b/test/files/run/si5375.scala
new file mode 100644
index 0000000000..e4b329deae
--- /dev/null
+++ b/test/files/run/si5375.scala
@@ -0,0 +1,19 @@
+
+
+
+import collection.parallel.CompositeThrowable
+
+
+
+object Test {
+
+ def main(args: Array[String]) {
+ val foos = (1 to 1000) toSeq;
+ try {
+ foos.par.map(i => if (i % 37 == 0) sys.error("i div 37") else i)
+ } catch {
+ case CompositeThrowable(thr) => println("Composite throwable")
+ }
+ }
+
+}