aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Matei Zaharia <matei@eecs.berkeley.edu> 2011-03-07 18:41:53 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu> 2011-03-07 18:41:53 -0800
commit e7b4b047a629cc2a5b6fe6eff42a20290ae33414 (patch)
tree 48e53abeb2c5068749bd78848edb433f4e1921ab
parent 370b95816f0adbb1e47508b38240eef8f36367bb (diff)
download spark-e7b4b047a629cc2a5b6fe6eff42a20290ae33414.tar.gz
spark-e7b4b047a629cc2a5b6fe6eff42a20290ae33414.tar.bz2
spark-e7b4b047a629cc2a5b6fe6eff42a20290ae33414.zip
Added pluggable serializers and Kryo serialization
-rw-r--r--core/lib/asm-3.2/.DS_Storebin6148 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/all/README.txt3
-rw-r--r--core/lib/asm-3.2/lib/all/asm-all-3.2.jarbin207939 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/all/asm-all-3.2.pom15
-rw-r--r--core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jarbin305420 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom15
-rw-r--r--core/lib/asm-3.2/lib/asm-3.2.jarbin43401 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/asm-3.2.pom14
-rw-r--r--core/lib/asm-3.2/lib/asm-analysis-3.2.jarbin17988 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/asm-analysis-3.2.pom21
-rw-r--r--core/lib/asm-3.2/lib/asm-commons-3.2.jarbin37619 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/asm-commons-3.2.pom21
-rw-r--r--core/lib/asm-3.2/lib/asm-parent-3.2.pom136
-rw-r--r--core/lib/asm-3.2/lib/asm-tree-3.2.jarbin21881 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/asm-tree-3.2.pom21
-rw-r--r--core/lib/asm-3.2/lib/asm-util-3.2.jarbin36552 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/asm-util-3.2.pom21
-rw-r--r--core/lib/asm-3.2/lib/asm-xml-3.2.jarbin51856 -> 0 bytes
-rw-r--r--core/lib/asm-3.2/lib/asm-xml-3.2.pom21
-rw-r--r--core/lib/asm-all-3.3.1.jarbin0 -> 207006 bytes
-rw-r--r--core/lib/kryo-1.04-mod/kryo-1.04-mod.jarbin0 -> 86177 bytes
-rw-r--r--core/lib/kryo-1.04-mod/minlog-1.2.jarbin0 -> 2595 bytes
-rw-r--r--core/lib/kryo-1.04-mod/reflectasm-1.01.jarbin0 -> 8135 bytes
-rw-r--r--core/src/main/scala/spark/Executor.scala1
-rw-r--r--core/src/main/scala/spark/JavaSerializer.scala48
-rw-r--r--core/src/main/scala/spark/KryoSerialization.scala146
-rw-r--r--core/src/main/scala/spark/LocalFileShuffle.scala6
-rw-r--r--core/src/main/scala/spark/Serializer.scala40
-rw-r--r--core/src/main/scala/spark/SparkContext.scala1
-rwxr-xr-xrun5
30 files changed, 244 insertions, 291 deletions
diff --git a/core/lib/asm-3.2/.DS_Store b/core/lib/asm-3.2/.DS_Store
deleted file mode 100644
index 52b0f12a32..0000000000
--- a/core/lib/asm-3.2/.DS_Store
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/all/README.txt b/core/lib/asm-3.2/lib/all/README.txt
deleted file mode 100644
index d7c96a5edb..0000000000
--- a/core/lib/asm-3.2/lib/all/README.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-It is highly recommended to use only the necessary ASM jars for your
-application instead of using the asm-all jar, unless you really need
-all ASM packages. \ No newline at end of file
diff --git a/core/lib/asm-3.2/lib/all/asm-all-3.2.jar b/core/lib/asm-3.2/lib/all/asm-all-3.2.jar
deleted file mode 100644
index d0ad60ed0a..0000000000
--- a/core/lib/asm-3.2/lib/all/asm-all-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/all/asm-all-3.2.pom b/core/lib/asm-3.2/lib/all/asm-all-3.2.pom
deleted file mode 100644
index 9899a54c3b..0000000000
--- a/core/lib/asm-3.2/lib/all/asm-all-3.2.pom
+++ /dev/null
@@ -1,15 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>asm</groupId>
- <artifactId>asm-parent</artifactId>
- <version>3.2</version>
- </parent>
-
- <name>ASM All</name>
- <groupId>asm</groupId>
- <artifactId>asm-all</artifactId>
- <packaging>jar</packaging>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar b/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar
deleted file mode 100644
index 94b8549142..0000000000
--- a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom b/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom
deleted file mode 100644
index 9899a54c3b..0000000000
--- a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom
+++ /dev/null
@@ -1,15 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>asm</groupId>
- <artifactId>asm-parent</artifactId>
- <version>3.2</version>
- </parent>
-
- <name>ASM All</name>
- <groupId>asm</groupId>
- <artifactId>asm-all</artifactId>
- <packaging>jar</packaging>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/asm-3.2.jar b/core/lib/asm-3.2/lib/asm-3.2.jar
deleted file mode 100644
index 334e7fdc7f..0000000000
--- a/core/lib/asm-3.2/lib/asm-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/asm-3.2.pom b/core/lib/asm-3.2/lib/asm-3.2.pom
deleted file mode 100644
index c714db09b2..0000000000
--- a/core/lib/asm-3.2/lib/asm-3.2.pom
+++ /dev/null
@@ -1,14 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>asm-parent</artifactId>
- <groupId>asm</groupId>
- <version>3.2</version>
- </parent>
-
- <name>ASM Core</name>
- <artifactId>asm</artifactId>
- <packaging>jar</packaging>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/asm-analysis-3.2.jar b/core/lib/asm-3.2/lib/asm-analysis-3.2.jar
deleted file mode 100644
index 40ee3151cb..0000000000
--- a/core/lib/asm-3.2/lib/asm-analysis-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/asm-analysis-3.2.pom b/core/lib/asm-3.2/lib/asm-analysis-3.2.pom
deleted file mode 100644
index b3933387af..0000000000
--- a/core/lib/asm-3.2/lib/asm-analysis-3.2.pom
+++ /dev/null
@@ -1,21 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>asm-parent</artifactId>
- <groupId>asm</groupId>
- <version>3.2</version>
- </parent>
-
- <name>ASM Analysis</name>
- <artifactId>asm-analysis</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <artifactId>asm-tree</artifactId>
- <groupId>asm</groupId>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/asm-commons-3.2.jar b/core/lib/asm-3.2/lib/asm-commons-3.2.jar
deleted file mode 100644
index 8dfed0a9b7..0000000000
--- a/core/lib/asm-3.2/lib/asm-commons-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/asm-commons-3.2.pom b/core/lib/asm-3.2/lib/asm-commons-3.2.pom
deleted file mode 100644
index 8517715b4a..0000000000
--- a/core/lib/asm-3.2/lib/asm-commons-3.2.pom
+++ /dev/null
@@ -1,21 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>asm-parent</artifactId>
- <groupId>asm</groupId>
- <version>3.2</version>
- </parent>
-
- <name>ASM Commons</name>
- <artifactId>asm-commons</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <artifactId>asm-tree</artifactId>
- <groupId>asm</groupId>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/asm-parent-3.2.pom b/core/lib/asm-3.2/lib/asm-parent-3.2.pom
deleted file mode 100644
index c220347f6a..0000000000
--- a/core/lib/asm-3.2/lib/asm-parent-3.2.pom
+++ /dev/null
@@ -1,136 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
- http://maven.apache.org/maven-v4_0_0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>asm-parent</artifactId>
- <groupId>asm</groupId>
- <version>3.2</version>
- <packaging>pom</packaging>
-
- <name>ASM</name>
- <description>A very small and fast Java bytecode manipulation framework</description>
- <url>http://asm.objectweb.org/</url>
-
- <organization>
- <name>ObjectWeb</name>
- <url>http://www.objectweb.org/</url>
- </organization>
- <inceptionYear>2000</inceptionYear>
-
- <licenses>
- <license>
- <name>BSD</name>
- <url>http://asm.objectweb.org/license.html</url>
- </license>
- </licenses>
-
- <developers>
- <developer>
- <name>Eric Bruneton</name>
- <id>ebruneton</id>
- <email>Eric.Bruneton@rd.francetelecom.com</email>
- <roles>
- <role>Creator</role>
- <role>Java Developer</role>
- </roles>
- </developer>
- <developer>
- <name>Eugene Kuleshov</name>
- <id>eu</id>
- <email>eu@javatx.org</email>
- <roles>
- <role>Java Developer</role>
- </roles>
- </developer>
- </developers>
-
- <scm>
- <connection>scm:cvs:pserver:anonymous:@cvs.forge.objectweb.org:/cvsroot/asm:asm</connection>
- <developerConnection>scm:cvs:ext:${maven.username}@cvs.forge.objectweb.org:/cvsroot/asm:asm</developerConnection>
- <url>http://cvs.forge.objectweb.org/cgi-bin/viewcvs.cgi/asm/asm/</url>
- </scm>
-
- <issueManagement>
- <url>http://forge.objectweb.org/tracker/?group_id=23</url>
- </issueManagement>
-
- <dependencyManagement>
- <dependencies>
-
- <dependency>
- <artifactId>asm</artifactId>
- <groupId>${project.groupId}</groupId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <artifactId>asm-tree</artifactId>
- <groupId>${project.groupId}</groupId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <artifactId>asm-analysis</artifactId>
- <groupId>${project.groupId}</groupId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <artifactId>asm-commons</artifactId>
- <groupId>${project.groupId}</groupId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <artifactId>asm-util</artifactId>
- <groupId>${project.groupId}</groupId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <artifactId>asm-xml</artifactId>
- <groupId>${project.groupId}</groupId>
- <version>${project.version}</version>
- </dependency>
-
- </dependencies>
- </dependencyManagement>
-
- <mailingLists>
- <mailingList>
- <name>ASM Users List</name>
- <subscribe>sympa@ow2.org?subject=subscribe%20asm</subscribe>
- <unsubscribe>sympa@ow2.org?subject=unsubscribe%20asm</unsubscribe>
- <post>asm@ow2.org</post>
- <archive>http://www.ow2.org/wws/arc/asm</archive>
- </mailingList>
- <mailingList>
- <name>ASM Team List</name>
- <subscribe>sympa@ow2.org?subject=subscribe%20asm-team</subscribe>
- <unsubscribe>sympa@ow2.org?subject=unsubscribe%20asm-team</unsubscribe>
- <post>asm-team@ow2.org</post>
- <archive>http://www.ow2.org/wws/arc/asm-team</archive>
- </mailingList>
- </mailingLists>
-
- <distributionManagement>
- <downloadUrl>http://mojo.codehaus.org/my-project</downloadUrl>
- <repository>
- <id>objectweb</id>
- <uniqueVersion>false</uniqueVersion>
- <name>ObjectWeb Maven 2.0 Repository</name>
- <url>dav:https://maven.forge.objectweb.org:8002/maven2/</url>
- <layout>default</layout>
- </repository>
- <snapshotRepository>
- <id>objectweb.snapshots</id>
- <uniqueVersion>false</uniqueVersion>
- <name>ObjectWeb Maven 2.0 Snapshot Repository</name>
- <url>dav:https://maven.forge.objectweb.org:8002/maven2-snapshot/</url>
- <layout>default</layout>
- </snapshotRepository>
- </distributionManagement>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/asm-tree-3.2.jar b/core/lib/asm-3.2/lib/asm-tree-3.2.jar
deleted file mode 100644
index b21fb86a92..0000000000
--- a/core/lib/asm-3.2/lib/asm-tree-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/asm-tree-3.2.pom b/core/lib/asm-3.2/lib/asm-tree-3.2.pom
deleted file mode 100644
index 9f454528f4..0000000000
--- a/core/lib/asm-3.2/lib/asm-tree-3.2.pom
+++ /dev/null
@@ -1,21 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>asm-parent</artifactId>
- <groupId>asm</groupId>
- <version>3.2</version>
- </parent>
-
- <name>ASM Tree</name>
- <artifactId>asm-tree</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <artifactId>asm</artifactId>
- <groupId>asm</groupId>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/asm-util-3.2.jar b/core/lib/asm-3.2/lib/asm-util-3.2.jar
deleted file mode 100644
index 499d229034..0000000000
--- a/core/lib/asm-3.2/lib/asm-util-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/asm-util-3.2.pom b/core/lib/asm-3.2/lib/asm-util-3.2.pom
deleted file mode 100644
index e302b0f356..0000000000
--- a/core/lib/asm-3.2/lib/asm-util-3.2.pom
+++ /dev/null
@@ -1,21 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>asm-parent</artifactId>
- <groupId>asm</groupId>
- <version>3.2</version>
- </parent>
-
- <name>ASM Util</name>
- <artifactId>asm-util</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <artifactId>asm-tree</artifactId>
- <groupId>asm</groupId>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/core/lib/asm-3.2/lib/asm-xml-3.2.jar b/core/lib/asm-3.2/lib/asm-xml-3.2.jar
deleted file mode 100644
index 31b31b56fe..0000000000
--- a/core/lib/asm-3.2/lib/asm-xml-3.2.jar
+++ /dev/null
Binary files differ
diff --git a/core/lib/asm-3.2/lib/asm-xml-3.2.pom b/core/lib/asm-3.2/lib/asm-xml-3.2.pom
deleted file mode 100644
index 0f3de1f2ab..0000000000
--- a/core/lib/asm-3.2/lib/asm-xml-3.2.pom
+++ /dev/null
@@ -1,21 +0,0 @@
-<project>
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <artifactId>asm-parent</artifactId>
- <groupId>asm</groupId>
- <version>3.2</version>
- </parent>
-
- <name>ASM XML</name>
- <artifactId>asm-xml</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <artifactId>asm-util</artifactId>
- <groupId>asm</groupId>
- </dependency>
- </dependencies>
-
-</project>
diff --git a/core/lib/asm-all-3.3.1.jar b/core/lib/asm-all-3.3.1.jar
new file mode 100644
index 0000000000..df03b32661
--- /dev/null
+++ b/core/lib/asm-all-3.3.1.jar
Binary files differ
diff --git a/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar b/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar
new file mode 100644
index 0000000000..7c4a8d3af8
--- /dev/null
+++ b/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar
Binary files differ
diff --git a/core/lib/kryo-1.04-mod/minlog-1.2.jar b/core/lib/kryo-1.04-mod/minlog-1.2.jar
new file mode 100644
index 0000000000..2fcada1b7e
--- /dev/null
+++ b/core/lib/kryo-1.04-mod/minlog-1.2.jar
Binary files differ
diff --git a/core/lib/kryo-1.04-mod/reflectasm-1.01.jar b/core/lib/kryo-1.04-mod/reflectasm-1.01.jar
new file mode 100644
index 0000000000..09179ca473
--- /dev/null
+++ b/core/lib/kryo-1.04-mod/reflectasm-1.01.jar
Binary files differ
diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala
index b4d023b428..3d994001f1 100644
--- a/core/src/main/scala/spark/Executor.scala
+++ b/core/src/main/scala/spark/Executor.scala
@@ -24,6 +24,7 @@ class Executor extends mesos.Executor with Logging {
// Initialize cache and broadcast system (uses some properties read above)
Cache.initialize()
+ Serializer.initialize()
Broadcast.initialize(false)
// Create our ClassLoader (using spark properties) and set it on this thread
diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala
new file mode 100644
index 0000000000..8ee3044058
--- /dev/null
+++ b/core/src/main/scala/spark/JavaSerializer.scala
@@ -0,0 +1,48 @@
+package spark
+
+import java.io._
+
+/**
+ * SerializationStream that forwards every operation to a
+ * java.io.ObjectOutputStream wrapped around `out`.
+ */
+class JavaSerializationStream(out: OutputStream) extends SerializationStream {
+  // Created together with this instance and used by all three operations.
+  val objOut = new ObjectOutputStream(out)
+
+  def writeObject[T](t: T): Unit = {
+    objOut.writeObject(t)
+  }
+
+  def flush(): Unit = {
+    objOut.flush()
+  }
+
+  def close(): Unit = {
+    objOut.close()
+  }
+}
+
+/**
+ * DeserializationStream that reads objects from `in` through a
+ * java.io.ObjectInputStream whose classes are resolved against the calling
+ * thread's context class loader.
+ */
+class JavaDeserializationStream(in: InputStream) extends DeserializationStream {
+  val objIn = new ObjectInputStream(in) {
+    // Try the context class loader first so that classes it defines can be
+    // found. Class.forName cannot resolve primitive type names such as "int",
+    // so when the lookup fails defer to the default ObjectInputStream
+    // resolution, which knows about them (and rethrows if it fails too).
+    override def resolveClass(desc: ObjectStreamClass): Class[_] =
+      try {
+        Class.forName(desc.getName, false, Thread.currentThread().getContextClassLoader)
+      } catch {
+        case _: ClassNotFoundException => super.resolveClass(desc)
+      }
+  }
+
+  // Reads the next object and casts it to the type the caller asked for; the
+  // cast is unchecked, so a wrong T only surfaces when the value is used.
+  def readObject[T](): T = objIn.readObject().asInstanceOf[T]
+
+  def close() { objIn.close() }
+}
+
+/**
+ * Serializer implemented on top of JavaSerializationStream and
+ * JavaDeserializationStream.
+ */
+class JavaSerializer extends Serializer {
+  // Serializes `t` into a fresh in-memory buffer and returns its contents.
+  def serialize[T](t: T): Array[Byte] = {
+    val buffer = new ByteArrayOutputStream()
+    val stream = outputStream(buffer)
+    stream.writeObject(t)
+    // Close before reading the buffer so everything written reaches it.
+    stream.close()
+    buffer.toByteArray
+  }
+
+  // Reads a single object back from `bytes` (unchecked cast to T).
+  def deserialize[T](bytes: Array[Byte]): T = {
+    val stream = inputStream(new ByteArrayInputStream(bytes))
+    stream.readObject().asInstanceOf[T]
+  }
+
+  def outputStream(s: OutputStream): SerializationStream =
+    new JavaSerializationStream(s)
+
+  def inputStream(s: InputStream): DeserializationStream =
+    new JavaDeserializationStream(s)
+}
+
+/** SerializationStrategy whose serializers are new JavaSerializer instances. */
+class JavaSerialization extends SerializationStrategy {
+  def newSerializer(): Serializer = {
+    new JavaSerializer()
+  }
+}
diff --git a/core/src/main/scala/spark/KryoSerialization.scala b/core/src/main/scala/spark/KryoSerialization.scala
new file mode 100644
index 0000000000..cbbeade0df
--- /dev/null
+++ b/core/src/main/scala/spark/KryoSerialization.scala
@@ -0,0 +1,146 @@
+package spark
+
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.channels.Channels
+
+import scala.collection.immutable
+import scala.collection.mutable
+
+import com.esotericsoftware.kryo._
+
+/**
+ * Variable-length encoding of 32-bit integers: seven payload bits per byte,
+ * least significant group first, with the high bit of a byte set when more
+ * bytes follow.
+ */
+object ZigZag {
+  /** Writes `n` to `out` using between one and five bytes. */
+  def writeInt(n: Int, out: OutputStream) {
+    var remaining = n
+    // Emit 7-bit groups flagged with the continuation bit while higher bits
+    // are still set. The shift is unsigned, so after four groups at most four
+    // bits are left and the loop ends with at most four flagged bytes.
+    while ((remaining & ~0x7F) != 0) {
+      out.write((remaining & 0x7F) | 0x80)
+      remaining >>>= 7
+    }
+    // Final byte: high bit clear.
+    out.write(remaining)
+  }
+
+  /**
+   * Reads an integer in the format produced by writeInt.
+   *
+   * Throws EOFException if `in` ends before a byte without the continuation
+   * bit is seen, and SparkException if five bytes are read and all of them
+   * have the continuation bit set.
+   */
+  def readInt(in: InputStream): Int = {
+    var shift = 0
+    var decoded = 0
+    while (shift < 32) {
+      val next = in.read()
+      if (next == -1) {
+        throw new EOFException("End of stream")
+      }
+      decoded |= ((next & 0x7F) << shift)
+      if ((next & 0x80) == 0) {
+        return decoded
+      }
+      shift += 7
+    }
+    throw new SparkException("Malformed zigzag-encoded integer")
+  }
+}
+
+/**
+ * SerializationStream that writes each object to `out` as a record: the
+ * length of the Kryo encoding (written with ZigZag.writeInt) followed by the
+ * encoded bytes.
+ */
+class KryoSerializationStream(kryo: Kryo, out: OutputStream)
+extends SerializationStream {
+  // Scratch buffer reused for every record.
+  val buf = ByteBuffer.allocateDirect(1024*1024)
+
+  // Channel view of `out`, created once instead of once per record.
+  private val channel = Channels.newChannel(out)
+
+  def writeObject[T](t: T) {
+    try {
+      kryo.writeClassAndObject(buf, t)
+      // position() is the number of bytes Kryo just wrote into the buffer.
+      ZigZag.writeInt(buf.position(), out)
+      buf.flip()
+      // A single write() is not required to drain the buffer, so keep going
+      // until nothing is left.
+      while (buf.hasRemaining) {
+        channel.write(buf)
+      }
+    } finally {
+      // Always reset the scratch buffer, even when encoding or writing fails,
+      // so leftover bytes are never emitted as part of the next record.
+      buf.clear()
+    }
+  }
+
+  def flush() { out.flush() }
+  def close() { out.close() }
+}
+
+/**
+ * DeserializationStream that reads records from `in`: a length read with
+ * ZigZag.readInt, which is then passed to ObjectBuffer.readClassAndObject
+ * together with the stream.
+ */
+class KryoDeserializationStream(kryo: Kryo, in: InputStream)
+extends DeserializationStream {
+  val buf = new ObjectBuffer(kryo, 1024*1024)
+
+  def readObject[T](): T = {
+    val recordLength = ZigZag.readInt(in)
+    val decoded = buf.readClassAndObject(in, recordLength)
+    decoded.asInstanceOf[T]
+  }
+
+  def close(): Unit = {
+    in.close()
+  }
+}
+
+/**
+ * Serializer backed by the given Kryo instance. Byte-array conversions go
+ * through one ObjectBuffer owned by this serializer; stream conversions
+ * create a new Kryo stream wrapper per call.
+ */
+class KryoSerializer(kryo: Kryo) extends Serializer {
+  val buf = new ObjectBuffer(kryo, 1024*1024)
+
+  def serialize[T](t: T): Array[Byte] =
+    buf.writeClassAndObject(t)
+
+  def deserialize[T](bytes: Array[Byte]): T = {
+    val decoded = buf.readClassAndObject(bytes)
+    decoded.asInstanceOf[T]
+  }
+
+  def outputStream(s: OutputStream): SerializationStream =
+    new KryoSerializationStream(kryo, s)
+
+  def inputStream(s: InputStream): DeserializationStream =
+    new KryoDeserializationStream(kryo, s)
+}
+
+// Used by clients to register their own classes.
+// KryoSerialization.createKryo instantiates the class named by the
+// "spark.kryo.registrator" system property through its no-argument
+// constructor and calls registerClasses on it with the Kryo being set up.
+trait KryoRegistrator {
+ // Register the client's classes on the supplied Kryo instance.
+ def registerClasses(kryo: Kryo): Unit
+}
+
+/**
+ * SerializationStrategy backed by one Kryo instance that is built when the
+ * strategy is constructed and handed to every serializer it creates.
+ */
+class KryoSerialization extends SerializationStrategy with Logging {
+  val kryo = createKryo()
+
+  /**
+   * Builds a Kryo, registers the runtime classes of a fixed list of sample
+   * values, and then runs the user registrator named by the
+   * "spark.kryo.registrator" system property, if that property is set.
+   */
+  def createKryo(): Kryo = {
+    val instance = new Kryo()
+    // Only the runtime class of each sample value is used.
+    // NOTE(review): registration order presumably has to be identical in
+    // every JVM that exchanges data - confirm against the Kryo docs before
+    // reordering or inserting entries.
+    val samples: Seq[AnyRef] = Seq(
+      // Arrays
+      Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""),
+      // Specialized Tuple2s
+      ("", ""), (1, 1), (1.0, 1.0), (1L, 1L),
+      (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1),
+      // Scala collections
+      Nil, List(1), immutable.Map(1 -> 1), immutable.HashMap(1 -> 1),
+      mutable.Map(1 -> 1), mutable.HashMap(1 -> 1), mutable.ArrayBuffer(1),
+      // Options and Either
+      Some(1), None, Left(1), Right(1),
+      // Higher-dimensional tuples
+      (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1)
+    )
+    samples.foreach { sample =>
+      logInfo("Registering class " + sample.getClass.getName)
+      instance.register(sample.getClass)
+    }
+    // Optional hook that lets clients register their own classes.
+    val registratorName = System.getProperty("spark.kryo.registrator")
+    if (registratorName != null) {
+      logInfo("Running user registrator: " + registratorName)
+      val registrator =
+        Class.forName(registratorName).newInstance().asInstanceOf[KryoRegistrator]
+      registrator.registerClasses(instance)
+    }
+    instance
+  }
+
+  // Every serializer shares the Kryo built above.
+  def newSerializer(): Serializer = new KryoSerializer(kryo)
+}
diff --git a/core/src/main/scala/spark/LocalFileShuffle.scala b/core/src/main/scala/spark/LocalFileShuffle.scala
index 367599cfb4..b797e03037 100644
--- a/core/src/main/scala/spark/LocalFileShuffle.scala
+++ b/core/src/main/scala/spark/LocalFileShuffle.scala
@@ -46,9 +46,10 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
case None => createCombiner(v)
}
}
+ val ser = Serializer.newInstance()
for (i <- 0 until numOutputSplits) {
val file = LocalFileShuffle.getOutputFile(shuffleId, myIndex, i)
- val out = new ObjectOutputStream(new FileOutputStream(file))
+ val out = ser.outputStream(new FileOutputStream(file))
buckets(i).foreach(pair => out.writeObject(pair))
out.close()
}
@@ -68,10 +69,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits)
return indexes.flatMap((myId: Int) => {
val combiners = new HashMap[K, C]
+ val ser = Serializer.newInstance()
for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) {
for (i <- inputIds) {
val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, myId)
- val inputStream = new ObjectInputStream(new URL(url).openStream())
+ val inputStream = ser.inputStream(new URL(url).openStream())
try {
while (true) {
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala
new file mode 100644
index 0000000000..a182f6bddc
--- /dev/null
+++ b/core/src/main/scala/spark/Serializer.scala
@@ -0,0 +1,40 @@
+package spark
+
+import java.io.{InputStream, OutputStream}
+
+// Write side of a serialized object stream. Implementations in this change
+// (JavaSerializationStream, KryoSerializationStream) wrap an OutputStream.
+trait SerializationStream {
+ // Write one object to the stream.
+ def writeObject[T](t: T): Unit
+ // Flush the stream.
+ def flush(): Unit
+ // Close the stream.
+ def close(): Unit
+}
+
+// Read side of a serialized object stream. Implementations in this change
+// (JavaDeserializationStream, KryoDeserializationStream) wrap an InputStream.
+trait DeserializationStream {
+ // Read the next object, typed as T. Both implementations in this change
+ // obtain T through an unchecked asInstanceOf cast.
+ def readObject[T](): T
+ // Close the stream.
+ def close(): Unit
+}
+
+// Converts objects to and from bytes, either as whole byte arrays or through
+// stream wrappers. Instances are obtained from a SerializationStrategy; the
+// Serializer object documents them as being for use by a single thread.
+trait Serializer {
+ // Serialize a single object to a byte array.
+ def serialize[T](t: T): Array[Byte]
+ // Deserialize a single object from a byte array.
+ def deserialize[T](bytes: Array[Byte]): T
+ // Wrap an OutputStream so objects can be written to it.
+ def outputStream(s: OutputStream): SerializationStream
+ // Wrap an InputStream so objects can be read from it.
+ def inputStream(s: InputStream): DeserializationStream
+}
+
+// Factory for Serializer instances. Serializer.initialize() instantiates the
+// implementation named by the "spark.serialization" system property through
+// its no-argument constructor.
+trait SerializationStrategy {
+ // Create a serializer.
+ def newSerializer(): Serializer
+}
+
+/**
+ * Holds the SerializationStrategy selected for this JVM and hands out
+ * serializers created by it.
+ */
+object Serializer {
+  // Strategy chosen by initialize(); null until initialize() has run.
+  var strat: SerializationStrategy = null
+
+  // Instantiate the strategy class named by the "spark.serialization" system
+  // property (default: spark.JavaSerialization) via its no-argument
+  // constructor.
+  def initialize() {
+    val cls = System.getProperty("spark.serialization",
+      "spark.JavaSerialization")
+    strat = Class.forName(cls).newInstance().asInstanceOf[SerializationStrategy]
+  }
+
+  // Return a serializer ** for use by a single thread **
+  // Throws IllegalStateException if initialize() has not been called yet,
+  // instead of failing with a bare NullPointerException.
+  def newInstance(): Serializer = {
+    if (strat == null) {
+      throw new IllegalStateException(
+        "Serializer.initialize() must be called before Serializer.newInstance()")
+    }
+    strat.newSerializer()
+  }
+}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index bf70b5fcb1..086ee2eddd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -33,6 +33,7 @@ extends Logging {
// Start the scheduler, the cache and the broadcast system
scheduler.start()
Cache.initialize()
+ Serializer.initialize()
Broadcast.initialize(true)
// Methods for creating RDDs
diff --git a/run b/run
index 9fb815987d..10d2845c97 100755
--- a/run
+++ b/run
@@ -40,7 +40,7 @@ EXAMPLES_DIR=$FWDIR/examples
CLASSPATH="$SPARK_CLASSPATH:$CORE_DIR/target/scala_2.8.1/classes:$MESOS_CLASSPATH"
CLASSPATH+=:$FWDIR/conf
CLASSPATH+=:$CORE_DIR/lib/mesos.jar
-CLASSPATH+=:$CORE_DIR/lib/asm-3.2/lib/all/asm-all-3.2.jar
+CLASSPATH+=:$CORE_DIR/lib/asm-all-3.3.1.jar
CLASSPATH+=:$CORE_DIR/lib/colt.jar
CLASSPATH+=:$CORE_DIR/lib/guava-r07/guava-r07.jar
CLASSPATH+=:$CORE_DIR/lib/hadoop-0.20.2/hadoop-0.20.2-core.jar
@@ -48,6 +48,9 @@ CLASSPATH+=:$CORE_DIR/lib/scalatest-1.2/scalatest-1.2.jar
CLASSPATH+=:$CORE_DIR/lib/scalacheck_2.8.0-1.7.jar
CLASSPATH+=:$CORE_DIR/lib/jetty-7.1.6.v20100715/jetty-server-7.1.6.v20100715.jar
CLASSPATH+=:$CORE_DIR/lib/jetty-7.1.6.v20100715/servlet-api-2.5.jar
+CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/kryo-1.04-mod.jar
+CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/minlog-1.2.jar
+CLASSPATH+=:$CORE_DIR/lib/kryo-1.04-mod/reflectasm-1.01.jar
CLASSPATH+=:$CORE_DIR/lib/apache-log4j-1.2.16/log4j-1.2.16.jar
CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-api-1.6.1.jar
CLASSPATH+=:$CORE_DIR/lib/slf4j-1.6.1/slf4j-log4j12-1.6.1.jar