From e7b4b047a629cc2a5b6fe6eff42a20290ae33414 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 7 Mar 2011 18:41:53 -0800 Subject: Added pluggable serializers and Kryo serialization --- core/lib/asm-3.2/.DS_Store | Bin 6148 -> 0 bytes core/lib/asm-3.2/lib/all/README.txt | 3 - core/lib/asm-3.2/lib/all/asm-all-3.2.jar | Bin 207939 -> 0 bytes core/lib/asm-3.2/lib/all/asm-all-3.2.pom | 15 --- core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar | Bin 305420 -> 0 bytes core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom | 15 --- core/lib/asm-3.2/lib/asm-3.2.jar | Bin 43401 -> 0 bytes core/lib/asm-3.2/lib/asm-3.2.pom | 14 --- core/lib/asm-3.2/lib/asm-analysis-3.2.jar | Bin 17988 -> 0 bytes core/lib/asm-3.2/lib/asm-analysis-3.2.pom | 21 ---- core/lib/asm-3.2/lib/asm-commons-3.2.jar | Bin 37619 -> 0 bytes core/lib/asm-3.2/lib/asm-commons-3.2.pom | 21 ---- core/lib/asm-3.2/lib/asm-parent-3.2.pom | 136 -------------------- core/lib/asm-3.2/lib/asm-tree-3.2.jar | Bin 21881 -> 0 bytes core/lib/asm-3.2/lib/asm-tree-3.2.pom | 21 ---- core/lib/asm-3.2/lib/asm-util-3.2.jar | Bin 36552 -> 0 bytes core/lib/asm-3.2/lib/asm-util-3.2.pom | 21 ---- core/lib/asm-3.2/lib/asm-xml-3.2.jar | Bin 51856 -> 0 bytes core/lib/asm-3.2/lib/asm-xml-3.2.pom | 21 ---- core/lib/asm-all-3.3.1.jar | Bin 0 -> 207006 bytes core/lib/kryo-1.04-mod/kryo-1.04-mod.jar | Bin 0 -> 86177 bytes core/lib/kryo-1.04-mod/minlog-1.2.jar | Bin 0 -> 2595 bytes core/lib/kryo-1.04-mod/reflectasm-1.01.jar | Bin 0 -> 8135 bytes core/src/main/scala/spark/Executor.scala | 1 + core/src/main/scala/spark/JavaSerializer.scala | 48 +++++++ core/src/main/scala/spark/KryoSerialization.scala | 146 ++++++++++++++++++++++ core/src/main/scala/spark/LocalFileShuffle.scala | 6 +- core/src/main/scala/spark/Serializer.scala | 40 ++++++ core/src/main/scala/spark/SparkContext.scala | 1 + run | 5 +- 30 files changed, 244 insertions(+), 291 deletions(-) delete mode 100644 core/lib/asm-3.2/.DS_Store delete mode 100644 core/lib/asm-3.2/lib/all/README.txt delete mode 100644 core/lib/asm-3.2/lib/all/asm-all-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/all/asm-all-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/asm-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/asm-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/asm-analysis-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/asm-analysis-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/asm-commons-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/asm-commons-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/asm-parent-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/asm-tree-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/asm-tree-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/asm-util-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/asm-util-3.2.pom delete mode 100644 core/lib/asm-3.2/lib/asm-xml-3.2.jar delete mode 100644 core/lib/asm-3.2/lib/asm-xml-3.2.pom create mode 100644 core/lib/asm-all-3.3.1.jar create mode 100644 core/lib/kryo-1.04-mod/kryo-1.04-mod.jar create mode 100644 core/lib/kryo-1.04-mod/minlog-1.2.jar create mode 100644 core/lib/kryo-1.04-mod/reflectasm-1.01.jar create mode 100644 core/src/main/scala/spark/JavaSerializer.scala create mode 100644 core/src/main/scala/spark/KryoSerialization.scala create mode 100644 core/src/main/scala/spark/Serializer.scala diff --git a/core/lib/asm-3.2/.DS_Store b/core/lib/asm-3.2/.DS_Store deleted file mode 100644 index 52b0f12a32..0000000000 Binary files a/core/lib/asm-3.2/.DS_Store and /dev/null differ diff --git a/core/lib/asm-3.2/lib/all/README.txt b/core/lib/asm-3.2/lib/all/README.txt deleted file mode 100644 index d7c96a5edb..0000000000 --- a/core/lib/asm-3.2/lib/all/README.txt +++ /dev/null @@ -1,3 +0,0 @@ -It is highly recommended to use only the necessary ASM jars for your -application instead of using the asm-all jar, unless you really need -all ASM packages. \ No newline at end of file diff --git a/core/lib/asm-3.2/lib/all/asm-all-3.2.jar b/core/lib/asm-3.2/lib/all/asm-all-3.2.jar deleted file mode 100644 index d0ad60ed0a..0000000000 Binary files a/core/lib/asm-3.2/lib/all/asm-all-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/all/asm-all-3.2.pom b/core/lib/asm-3.2/lib/all/asm-all-3.2.pom deleted file mode 100644 index 9899a54c3b..0000000000 --- a/core/lib/asm-3.2/lib/all/asm-all-3.2.pom +++ /dev/null @@ -1,15 +0,0 @@ - - 4.0.0 - - - asm - asm-parent - 3.2 - - - ASM All - asm - asm-all - jar - - diff --git a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar b/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar deleted file mode 100644 index 94b8549142..0000000000 Binary files a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom b/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom deleted file mode 100644 index 9899a54c3b..0000000000 --- a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom +++ /dev/null @@ -1,15 +0,0 @@ - - 4.0.0 - - - asm - asm-parent - 3.2 - - - ASM All - asm - asm-all - jar - - diff --git a/core/lib/asm-3.2/lib/asm-3.2.jar b/core/lib/asm-3.2/lib/asm-3.2.jar deleted file mode 100644 index 334e7fdc7f..0000000000 Binary files a/core/lib/asm-3.2/lib/asm-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/asm-3.2.pom b/core/lib/asm-3.2/lib/asm-3.2.pom deleted file mode 100644 index c714db09b2..0000000000 --- a/core/lib/asm-3.2/lib/asm-3.2.pom +++ /dev/null @@ -1,14 +0,0 @@ - - 4.0.0 - - - asm-parent - asm - 3.2 - - - ASM Core - asm - jar - - diff --git a/core/lib/asm-3.2/lib/asm-analysis-3.2.jar b/core/lib/asm-3.2/lib/asm-analysis-3.2.jar deleted file mode 100644 index 40ee3151cb..0000000000 Binary files a/core/lib/asm-3.2/lib/asm-analysis-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/asm-analysis-3.2.pom b/core/lib/asm-3.2/lib/asm-analysis-3.2.pom deleted file mode 100644 index b3933387af..0000000000 --- a/core/lib/asm-3.2/lib/asm-analysis-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ - - 4.0.0 - - - asm-parent - asm - 3.2 - - - ASM Analysis - asm-analysis - jar - - - - asm-tree - asm - - - - diff --git a/core/lib/asm-3.2/lib/asm-commons-3.2.jar b/core/lib/asm-3.2/lib/asm-commons-3.2.jar deleted file mode 100644 index 8dfed0a9b7..0000000000 Binary files a/core/lib/asm-3.2/lib/asm-commons-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/asm-commons-3.2.pom b/core/lib/asm-3.2/lib/asm-commons-3.2.pom deleted file mode 100644 index 8517715b4a..0000000000 --- a/core/lib/asm-3.2/lib/asm-commons-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ - - 4.0.0 - - - asm-parent - asm - 3.2 - - - ASM Commons - asm-commons - jar - - - - asm-tree - asm - - - - diff --git a/core/lib/asm-3.2/lib/asm-parent-3.2.pom b/core/lib/asm-3.2/lib/asm-parent-3.2.pom deleted file mode 100644 index c220347f6a..0000000000 --- a/core/lib/asm-3.2/lib/asm-parent-3.2.pom +++ /dev/null @@ -1,136 +0,0 @@ - - 4.0.0 - - asm-parent - asm - 3.2 - pom - - ASM - A very small and fast Java bytecode manipulation framework - http://asm.objectweb.org/ - - - ObjectWeb - http://www.objectweb.org/ - - 2000 - - - - BSD - http://asm.objectweb.org/license.html - - - - - - Eric Bruneton - ebruneton - Eric.Bruneton@rd.francetelecom.com - - Creator - Java Developer - - - - Eugene Kuleshov - eu - eu@javatx.org - - Java Developer - - - - - - scm:cvs:pserver:anonymous:@cvs.forge.objectweb.org:/cvsroot/asm:asm - scm:cvs:ext:${maven.username}@cvs.forge.objectweb.org:/cvsroot/asm:asm - http://cvs.forge.objectweb.org/cgi-bin/viewcvs.cgi/asm/asm/ - - - - http://forge.objectweb.org/tracker/?group_id=23 - - - - - - - asm - ${project.groupId} - ${project.version} - - - - asm-tree - ${project.groupId} - ${project.version} - - - - asm-analysis - ${project.groupId} - ${project.version} - - - - asm-commons - ${project.groupId} - ${project.version} - - - - asm-util - ${project.groupId} - ${project.version} - - - - asm-xml - ${project.groupId} - ${project.version} - - - - - - - - ASM Users List - sympa@ow2.org?subject=subscribe%20asm - sympa@ow2.org?subject=unsubscribe%20asm - asm@ow2.org - http://www.ow2.org/wws/arc/asm - - - ASM Team List - sympa@ow2.org?subject=subscribe%20asm-team - sympa@ow2.org?subject=unsubscribe%20asm-team - asm-team@ow2.org - http://www.ow2.org/wws/arc/asm-team - - - - - http://mojo.codehaus.org/my-project - - objectweb - false - ObjectWeb Maven 2.0 Repository - dav:https://maven.forge.objectweb.org:8002/maven2/ - default - - - objectweb.snapshots - false - ObjectWeb Maven 2.0 Snapshot Repository - dav:https://maven.forge.objectweb.org:8002/maven2-snapshot/ - default - - - - diff --git a/core/lib/asm-3.2/lib/asm-tree-3.2.jar b/core/lib/asm-3.2/lib/asm-tree-3.2.jar deleted file mode 100644 index b21fb86a92..0000000000 Binary files a/core/lib/asm-3.2/lib/asm-tree-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/asm-tree-3.2.pom b/core/lib/asm-3.2/lib/asm-tree-3.2.pom deleted file mode 100644 index 9f454528f4..0000000000 --- a/core/lib/asm-3.2/lib/asm-tree-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ - - 4.0.0 - - - asm-parent - asm - 3.2 - - - ASM Tree - asm-tree - jar - - - - asm - asm - - - - diff --git a/core/lib/asm-3.2/lib/asm-util-3.2.jar b/core/lib/asm-3.2/lib/asm-util-3.2.jar deleted file mode 100644 index 499d229034..0000000000 Binary files a/core/lib/asm-3.2/lib/asm-util-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/asm-util-3.2.pom b/core/lib/asm-3.2/lib/asm-util-3.2.pom deleted file mode 100644 index e302b0f356..0000000000 --- a/core/lib/asm-3.2/lib/asm-util-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ - - 4.0.0 - - - asm-parent - asm - 3.2 - - - ASM Util - asm-util - jar - - - - asm-tree - asm - - - - diff --git a/core/lib/asm-3.2/lib/asm-xml-3.2.jar b/core/lib/asm-3.2/lib/asm-xml-3.2.jar deleted file mode 100644 index 31b31b56fe..0000000000 Binary files a/core/lib/asm-3.2/lib/asm-xml-3.2.jar and /dev/null differ diff --git a/core/lib/asm-3.2/lib/asm-xml-3.2.pom b/core/lib/asm-3.2/lib/asm-xml-3.2.pom deleted file mode 100644 index 0f3de1f2ab..0000000000 --- a/core/lib/asm-3.2/lib/asm-xml-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ - - 4.0.0 - - - asm-parent - asm - 3.2 - - - ASM XML - asm-xml - jar - - - - asm-util - asm - - - - diff --git a/core/lib/asm-all-3.3.1.jar b/core/lib/asm-all-3.3.1.jar new file mode 100644 index 0000000000..df03b32661 Binary files /dev/null and b/core/lib/asm-all-3.3.1.jar differ diff --git a/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar b/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar new file mode 100644 index 0000000000..7c4a8d3af8 Binary files /dev/null and b/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar differ diff --git a/core/lib/kryo-1.04-mod/minlog-1.2.jar b/core/lib/kryo-1.04-mod/minlog-1.2.jar new file mode 100644 index 0000000000..2fcada1b7e Binary files /dev/null and b/core/lib/kryo-1.04-mod/minlog-1.2.jar differ diff --git a/core/lib/kryo-1.04-mod/reflectasm-1.01.jar b/core/lib/kryo-1.04-mod/reflectasm-1.01.jar new file mode 100644 index 0000000000..09179ca473 Binary files /dev/null and b/core/lib/kryo-1.04-mod/reflectasm-1.01.jar differ diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index b4d023b428..3d994001f1 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -24,6 +24,7 @@ class Executor extends mesos.Executor with Logging { // Initialize cache and broadcast system (uses some properties read above) Cache.initialize() + Serializer.initialize() Broadcast.initialize(false) // Create our ClassLoader (using spark properties) and set it on this thread diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala new file mode 100644 index 0000000000..8ee3044058 --- /dev/null +++ b/core/src/main/scala/spark/JavaSerializer.scala @@ -0,0 +1,48 @@ +package spark + +import java.io._ + +class JavaSerializationStream(out: OutputStream) extends SerializationStream { + val objOut = new ObjectOutputStream(out) + def writeObject[T](t: T) { objOut.writeObject(t) } + def flush() { objOut.flush() } + def close() { objOut.close() } +} + +class JavaDeserializationStream(in: InputStream) extends DeserializationStream { + val objIn = new ObjectInputStream(in) { + override def resolveClass(desc: ObjectStreamClass) = + Class.forName(desc.getName, false, currentThread.getContextClassLoader) + } + + def readObject[T](): T = objIn.readObject().asInstanceOf[T] + def close() { objIn.close() } +} + +class JavaSerializer extends Serializer { + def serialize[T](t: T): Array[Byte] = { + val bos = new ByteArrayOutputStream() + val out = outputStream(bos) + out.writeObject(t) + out.close() + bos.toByteArray + } + + def deserialize[T](bytes: Array[Byte]): T = { + val bis = new ByteArrayInputStream(bytes) + val in = inputStream(bis) + in.readObject().asInstanceOf[T] + } + + def outputStream(s: OutputStream): SerializationStream = { + new JavaSerializationStream(s) + } + + def inputStream(s: InputStream): DeserializationStream = { + new JavaDeserializationStream(s) + } +} + +class JavaSerialization extends SerializationStrategy { + def newSerializer(): Serializer = new JavaSerializer +} diff --git a/core/src/main/scala/spark/KryoSerialization.scala b/core/src/main/scala/spark/KryoSerialization.scala new file mode 100644 index 0000000000..cbbeade0df --- /dev/null +++ b/core/src/main/scala/spark/KryoSerialization.scala @@ -0,0 +1,146 @@ +package spark + +import java.io._ +import java.nio.ByteBuffer +import java.nio.channels.Channels + +import scala.collection.immutable +import scala.collection.mutable + +import com.esotericsoftware.kryo._ + +object ZigZag { + def writeInt(n: Int, out: OutputStream) { + var value = n + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + out.write(value) + } + + def readInt(in: InputStream): Int = { + var offset = 0 + var result = 0 + while (offset < 32) { + val b = in.read() + if (b == -1) { + throw new EOFException("End of stream") + } + result |= ((b & 0x7F) << offset) + if ((b & 0x80) == 0) { + return result + } + offset += 7 + } + throw new SparkException("Malformed zigzag-encoded integer") + } +} + +class KryoSerializationStream(kryo: Kryo, out: OutputStream) +extends SerializationStream { + val buf = ByteBuffer.allocateDirect(1024*1024) + + def writeObject[T](t: T) { + kryo.writeClassAndObject(buf, t) + ZigZag.writeInt(buf.position(), out) + buf.flip() + Channels.newChannel(out).write(buf) + buf.clear() + } + + def flush() { out.flush() } + def close() { out.close() } +} + +class KryoDeserializationStream(kryo: Kryo, in: InputStream) +extends DeserializationStream { + val buf = new ObjectBuffer(kryo, 1024*1024) + + def readObject[T](): T = { + val len = ZigZag.readInt(in) + buf.readClassAndObject(in, len).asInstanceOf[T] + } + + def close() { in.close() } +} + +class KryoSerializer(kryo: Kryo) extends Serializer { + val buf = new ObjectBuffer(kryo, 1024*1024) + + def serialize[T](t: T): Array[Byte] = { + buf.writeClassAndObject(t) + } + + def deserialize[T](bytes: Array[Byte]): T = { + buf.readClassAndObject(bytes).asInstanceOf[T] + } + + def outputStream(s: OutputStream): SerializationStream = { + new KryoSerializationStream(kryo, s) + } + + def inputStream(s: InputStream): DeserializationStream = { + new KryoDeserializationStream(kryo, s) + } +} + +// Used by clients to register their own classes +trait KryoRegistrator { + def registerClasses(kryo: Kryo): Unit +} + +class KryoSerialization extends SerializationStrategy with Logging { + val kryo = createKryo() + + def createKryo(): Kryo = { + val kryo = new Kryo() + val toRegister: Seq[AnyRef] = Seq( + // Arrays + Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), + // Specialized Tuple2s + ("", ""), (1, 1), (1.0, 1.0), (1L, 1L), + (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1), + // Scala collections + Nil, List(1), immutable.Map(1 -> 1), immutable.HashMap(1 -> 1), + mutable.Map(1 -> 1), mutable.HashMap(1 -> 1), mutable.ArrayBuffer(1), + // Options and Either + Some(1), None, Left(1), Right(1), + // Higher-dimensional tuples + (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1) + ) + for (obj <- toRegister) { + logInfo("Registering class " + obj.getClass.getName) + kryo.register(obj.getClass) + } + val regCls = System.getProperty("spark.kryo.registrator") + if (regCls != null) { + logInfo("Running user registrator: " + regCls) + val reg = Class.forName(regCls).newInstance().asInstanceOf[KryoRegistrator] + reg.registerClasses(kryo) + } + kryo + } + + def newSerializer(): Serializer = new KryoSerializer(kryo) +} diff --git a/core/src/main/scala/spark/LocalFileShuffle.scala b/core/src/main/scala/spark/LocalFileShuffle.scala index 367599cfb4..b797e03037 100644 --- a/core/src/main/scala/spark/LocalFileShuffle.scala +++ b/core/src/main/scala/spark/LocalFileShuffle.scala @@ -46,9 +46,10 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { case None => createCombiner(v) } } + val ser = Serializer.newInstance() for (i <- 0 until numOutputSplits) { val file = LocalFileShuffle.getOutputFile(shuffleId, myIndex, i) - val out = new ObjectOutputStream(new FileOutputStream(file)) + val out = ser.outputStream(new FileOutputStream(file)) buckets(i).foreach(pair => out.writeObject(pair)) out.close() } @@ -68,10 +69,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits) return indexes.flatMap((myId: Int) => { val combiners = new HashMap[K, C] + val ser = Serializer.newInstance() for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) { for (i <- inputIds) { val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, myId) - val inputStream = new ObjectInputStream(new URL(url).openStream()) + val inputStream = ser.inputStream(new URL(url).openStream()) try { while (true) { val (k, c) = inputStream.readObject().asInstanceOf[(K, C)] diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala new file mode 100644 index 0000000000..a182f6bddc --- /dev/null +++ b/core/src/main/scala/spark/Serializer.scala @@ -0,0 +1,40 @@ +package spark + +import java.io.{InputStream, OutputStream} + +trait SerializationStream { + def writeObject[T](t: T): Unit + def flush(): Unit + def close(): Unit +} + +trait DeserializationStream { + def readObject[T](): T + def close(): Unit +} + +trait Serializer { + def serialize[T](t: T): Array[Byte] + def deserialize[T](bytes: Array[Byte]): T + def outputStream(s: OutputStream): SerializationStream + def inputStream(s: InputStream): DeserializationStream +} + +trait SerializationStrategy { + def newSerializer(): Serializer +} + +object Serializer { + var strat: SerializationStrategy = null + + def initialize() { + val cls = System.getProperty("spark.serialization", + "spark.JavaSerialization") + strat = Class.forName(cls).newInstance().asInstanceOf[SerializationStrategy] + } + + // Return a serializer ** for use by a single thread ** + def newInstance(): Serializer = { + strat.newSerializer() + } +} diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index bf70b5fcb1..086ee2eddd 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -33,6 +33,7 @@ extends Logging { // Start the scheduler, the cache and the broadcast system scheduler.start() Cache.initialize() + Serializer.initialize() Broadcast.initialize(true) // Methods for creating RDDs diff --git a/run b/run index 9fb815987d..10d2845c97 100755 --- a/run +++ b/run @@ -40,7 +40,7 @@ EXAMPLES_DIR=$FWDIR/examples CLASSPATH="$SPARK_CLASSPATH:$CORE_DIR/target/scala_2.8.1/classes:$MESOS_CLASSPATH" CLASSPATH+=:$FWDIR/conf CLASSPATH+=:$CORE_DIR/lib/mesos.jar -CLASSPATH+=:$CORE_DIR/lib/asm-3.2/lib/all/asm-all-3.2.jar +CLASSPATH+=:$CORE_DIR/lib/asm-all-3.3.1.jar CLASSPATH+=:$CORE_DIR/lib/colt.jar CLASSPATH+=:$CORE_DIR/lib/guava-r07/guava-r07.jar CLASSPATH+=:$CORE_DIR/lib/hadoop-0.20.2/hadoop-0.20.2-core.jar @@ -48,6 +48,9 @@ CLASSPATH+=:$CORE_DIR/lib/scalatest-1.2/scalatest-1.2.jar CLASSPATH+=:$CORE_DIR/lib/scalacheck_2.8.0-1.7.jar CLASSPATH+=:$CORE_DIR/lib/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar CLASSPATH+=:$CORE_DIR/lib/jetty-7.1.6.v20100715/servlet-api-2.5.jar +CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/kryo-1.04-mod.jar +CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/minlog-1.2.jar +CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/reflectasm-1.01.jar CLASSPATH+=:$CORE_DIR/lib/apache-log4j-1.2.16/log4j-1.2.16.jar CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-api-1.6.1.jar CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar -- cgit v1.2.3