diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-03-07 19:20:28 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2011-03-07 19:20:28 -0800 |
commit | 8b6f3db415926a6a5723b23bc3e9f993bcbbfa74 (patch) | |
tree | d73deb0f34b1a7e79e5d0a55099659504b6683db /core | |
parent | 467f056e291964a8235d9aee8b82f08e5517b2b3 (diff) | |
parent | 38f6bce33d63f793d814385047b9bac34e5947f7 (diff) | |
download | spark-8b6f3db415926a6a5723b23bc3e9f993bcbbfa74.tar.gz spark-8b6f3db415926a6a5723b23bc3e9f993bcbbfa74.tar.bz2 spark-8b6f3db415926a6a5723b23bc3e9f993bcbbfa74.zip |
Merge remote branch 'origin/custom-serialization' into new-rdds
Diffstat (limited to 'core')
30 files changed, 265 insertions, 290 deletions
diff --git a/core/lib/asm-3.2/.DS_Store b/core/lib/asm-3.2/.DS_Store Binary files differdeleted file mode 100644 index 52b0f12a32..0000000000 --- a/core/lib/asm-3.2/.DS_Store +++ /dev/null diff --git a/core/lib/asm-3.2/lib/all/README.txt b/core/lib/asm-3.2/lib/all/README.txt deleted file mode 100644 index d7c96a5edb..0000000000 --- a/core/lib/asm-3.2/lib/all/README.txt +++ /dev/null @@ -1,3 +0,0 @@ -It is highly recommended to use only the necessary ASM jars for your -application instead of using the asm-all jar, unless you really need -all ASM packages.
\ No newline at end of file diff --git a/core/lib/asm-3.2/lib/all/asm-all-3.2.jar b/core/lib/asm-3.2/lib/all/asm-all-3.2.jar Binary files differdeleted file mode 100644 index d0ad60ed0a..0000000000 --- a/core/lib/asm-3.2/lib/all/asm-all-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/all/asm-all-3.2.pom b/core/lib/asm-3.2/lib/all/asm-all-3.2.pom deleted file mode 100644 index 9899a54c3b..0000000000 --- a/core/lib/asm-3.2/lib/all/asm-all-3.2.pom +++ /dev/null @@ -1,15 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>asm</groupId> - <artifactId>asm-parent</artifactId> - <version>3.2</version> - </parent> - - <name>ASM All</name> - <groupId>asm</groupId> - <artifactId>asm-all</artifactId> - <packaging>jar</packaging> - -</project> diff --git a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar b/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar Binary files differdeleted file mode 100644 index 94b8549142..0000000000 --- a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom b/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom deleted file mode 100644 index 9899a54c3b..0000000000 --- a/core/lib/asm-3.2/lib/all/asm-debug-all-3.2.pom +++ /dev/null @@ -1,15 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>asm</groupId> - <artifactId>asm-parent</artifactId> - <version>3.2</version> - </parent> - - <name>ASM All</name> - <groupId>asm</groupId> - <artifactId>asm-all</artifactId> - <packaging>jar</packaging> - -</project> diff --git a/core/lib/asm-3.2/lib/asm-3.2.jar b/core/lib/asm-3.2/lib/asm-3.2.jar Binary files differdeleted file mode 100644 index 334e7fdc7f..0000000000 --- a/core/lib/asm-3.2/lib/asm-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/asm-3.2.pom b/core/lib/asm-3.2/lib/asm-3.2.pom deleted file mode 100644 index c714db09b2..0000000000 --- a/core/lib/asm-3.2/lib/asm-3.2.pom +++ /dev/null @@ -1,14 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>asm-parent</artifactId> - <groupId>asm</groupId> - <version>3.2</version> - </parent> - - <name>ASM Core</name> - <artifactId>asm</artifactId> - <packaging>jar</packaging> - -</project> diff --git a/core/lib/asm-3.2/lib/asm-analysis-3.2.jar b/core/lib/asm-3.2/lib/asm-analysis-3.2.jar Binary files differdeleted file mode 100644 index 40ee3151cb..0000000000 --- a/core/lib/asm-3.2/lib/asm-analysis-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/asm-analysis-3.2.pom b/core/lib/asm-3.2/lib/asm-analysis-3.2.pom deleted file mode 100644 index b3933387af..0000000000 --- a/core/lib/asm-3.2/lib/asm-analysis-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>asm-parent</artifactId> - <groupId>asm</groupId> - <version>3.2</version> - </parent> - - <name>ASM Analysis</name> - <artifactId>asm-analysis</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <artifactId>asm-tree</artifactId> - <groupId>asm</groupId> - </dependency> - </dependencies> - -</project> diff --git a/core/lib/asm-3.2/lib/asm-commons-3.2.jar b/core/lib/asm-3.2/lib/asm-commons-3.2.jar Binary files differdeleted file mode 100644 index 8dfed0a9b7..0000000000 --- a/core/lib/asm-3.2/lib/asm-commons-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/asm-commons-3.2.pom b/core/lib/asm-3.2/lib/asm-commons-3.2.pom deleted file mode 100644 index 8517715b4a..0000000000 --- a/core/lib/asm-3.2/lib/asm-commons-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>asm-parent</artifactId> - <groupId>asm</groupId> - <version>3.2</version> - </parent> - - <name>ASM Commons</name> - <artifactId>asm-commons</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <artifactId>asm-tree</artifactId> - <groupId>asm</groupId> - </dependency> - </dependencies> - -</project> diff --git a/core/lib/asm-3.2/lib/asm-parent-3.2.pom b/core/lib/asm-3.2/lib/asm-parent-3.2.pom deleted file mode 100644 index c220347f6a..0000000000 --- a/core/lib/asm-3.2/lib/asm-parent-3.2.pom +++ /dev/null @@ -1,136 +0,0 @@ -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 - http://maven.apache.org/maven-v4_0_0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <artifactId>asm-parent</artifactId> - <groupId>asm</groupId> - <version>3.2</version> - <packaging>pom</packaging> - - <name>ASM</name> - <description>A very small and fast Java bytecode manipulation framework</description> - <url>http://asm.objectweb.org/</url> - - <organization> - <name>ObjectWeb</name> - <url>http://www.objectweb.org/</url> - </organization> - <inceptionYear>2000</inceptionYear> - - <licenses> - <license> - <name>BSD</name> - <url>http://asm.objectweb.org/license.html</url> - </license> - </licenses> - - <developers> - <developer> - <name>Eric Bruneton</name> - <id>ebruneton</id> - <email>Eric.Bruneton@rd.francetelecom.com</email> - <roles> - <role>Creator</role> - <role>Java Developer</role> - </roles> - </developer> - <developer> - <name>Eugene Kuleshov</name> - <id>eu</id> - <email>eu@javatx.org</email> - <roles> - <role>Java Developer</role> - </roles> - </developer> - </developers> - - <scm> - <connection>scm:cvs:pserver:anonymous:@cvs.forge.objectweb.org:/cvsroot/asm:asm</connection> - <developerConnection>scm:cvs:ext:${maven.username}@cvs.forge.objectweb.org:/cvsroot/asm:asm</developerConnection> - <url>http://cvs.forge.objectweb.org/cgi-bin/viewcvs.cgi/asm/asm/</url> - </scm> - - <issueManagement> - <url>http://forge.objectweb.org/tracker/?group_id=23</url> - </issueManagement> - - <dependencyManagement> - <dependencies> - - <dependency> - <artifactId>asm</artifactId> - <groupId>${project.groupId}</groupId> - <version>${project.version}</version> - </dependency> - - <dependency> - <artifactId>asm-tree</artifactId> - <groupId>${project.groupId}</groupId> - <version>${project.version}</version> - </dependency> - - <dependency> - <artifactId>asm-analysis</artifactId> - <groupId>${project.groupId}</groupId> - <version>${project.version}</version> - </dependency> - - <dependency> - <artifactId>asm-commons</artifactId> - <groupId>${project.groupId}</groupId> - <version>${project.version}</version> - </dependency> - - <dependency> - <artifactId>asm-util</artifactId> - <groupId>${project.groupId}</groupId> - <version>${project.version}</version> - </dependency> - - <dependency> - <artifactId>asm-xml</artifactId> - <groupId>${project.groupId}</groupId> - <version>${project.version}</version> - </dependency> - - </dependencies> - </dependencyManagement> - - <mailingLists> - <mailingList> - <name>ASM Users List</name> - <subscribe>sympa@ow2.org?subject=subscribe%20asm</subscribe> - <unsubscribe>sympa@ow2.org?subject=unsubscribe%20asm</unsubscribe> - <post>asm@ow2.org</post> - <archive>http://www.ow2.org/wws/arc/asm</archive> - </mailingList> - <mailingList> - <name>ASM Team List</name> - <subscribe>sympa@ow2.org?subject=subscribe%20asm-team</subscribe> - <unsubscribe>sympa@ow2.org?subject=unsubscribe%20asm-team</unsubscribe> - <post>asm-team@ow2.org</post> - <archive>http://www.ow2.org/wws/arc/asm-team</archive> - </mailingList> - </mailingLists> - - <distributionManagement> - <downloadUrl>http://mojo.codehaus.org/my-project</downloadUrl> - <repository> - <id>objectweb</id> - <uniqueVersion>false</uniqueVersion> - <name>ObjectWeb Maven 2.0 Repository</name> - <url>dav:https://maven.forge.objectweb.org:8002/maven2/</url> - <layout>default</layout> - </repository> - <snapshotRepository> - <id>objectweb.snapshots</id> - <uniqueVersion>false</uniqueVersion> - <name>ObjectWeb Maven 2.0 Snapshot Repository</name> - <url>dav:https://maven.forge.objectweb.org:8002/maven2-snapshot/</url> - <layout>default</layout> - </snapshotRepository> - </distributionManagement> - -</project> diff --git a/core/lib/asm-3.2/lib/asm-tree-3.2.jar b/core/lib/asm-3.2/lib/asm-tree-3.2.jar Binary files differdeleted file mode 100644 index b21fb86a92..0000000000 --- a/core/lib/asm-3.2/lib/asm-tree-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/asm-tree-3.2.pom b/core/lib/asm-3.2/lib/asm-tree-3.2.pom deleted file mode 100644 index 9f454528f4..0000000000 --- a/core/lib/asm-3.2/lib/asm-tree-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>asm-parent</artifactId> - <groupId>asm</groupId> - <version>3.2</version> - </parent> - - <name>ASM Tree</name> - <artifactId>asm-tree</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <artifactId>asm</artifactId> - <groupId>asm</groupId> - </dependency> - </dependencies> - -</project> diff --git a/core/lib/asm-3.2/lib/asm-util-3.2.jar b/core/lib/asm-3.2/lib/asm-util-3.2.jar Binary files differdeleted file mode 100644 index 499d229034..0000000000 --- a/core/lib/asm-3.2/lib/asm-util-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/asm-util-3.2.pom b/core/lib/asm-3.2/lib/asm-util-3.2.pom deleted file mode 100644 index e302b0f356..0000000000 --- a/core/lib/asm-3.2/lib/asm-util-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>asm-parent</artifactId> - <groupId>asm</groupId> - <version>3.2</version> - </parent> - - <name>ASM Util</name> - <artifactId>asm-util</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <artifactId>asm-tree</artifactId> - <groupId>asm</groupId> - </dependency> - </dependencies> - -</project> diff --git a/core/lib/asm-3.2/lib/asm-xml-3.2.jar b/core/lib/asm-3.2/lib/asm-xml-3.2.jar Binary files differdeleted file mode 100644 index 31b31b56fe..0000000000 --- a/core/lib/asm-3.2/lib/asm-xml-3.2.jar +++ /dev/null diff --git a/core/lib/asm-3.2/lib/asm-xml-3.2.pom b/core/lib/asm-3.2/lib/asm-xml-3.2.pom deleted file mode 100644 index 0f3de1f2ab..0000000000 --- a/core/lib/asm-3.2/lib/asm-xml-3.2.pom +++ /dev/null @@ -1,21 +0,0 @@ -<project> - <modelVersion>4.0.0</modelVersion> - - <parent> - <artifactId>asm-parent</artifactId> - <groupId>asm</groupId> - <version>3.2</version> - </parent> - - <name>ASM XML</name> - <artifactId>asm-xml</artifactId> - <packaging>jar</packaging> - - <dependencies> - <dependency> - <artifactId>asm-util</artifactId> - <groupId>asm</groupId> - </dependency> - </dependencies> - -</project> diff --git a/core/lib/asm-all-3.3.1.jar b/core/lib/asm-all-3.3.1.jar Binary files differnew file mode 100644 index 0000000000..df03b32661 --- /dev/null +++ b/core/lib/asm-all-3.3.1.jar diff --git a/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar b/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar Binary files differnew file mode 100644 index 0000000000..7c4a8d3af8 --- /dev/null +++ b/core/lib/kryo-1.04-mod/kryo-1.04-mod.jar diff --git a/core/lib/kryo-1.04-mod/minlog-1.2.jar b/core/lib/kryo-1.04-mod/minlog-1.2.jar Binary files differnew file mode 100644 index 0000000000..2fcada1b7e --- /dev/null +++ b/core/lib/kryo-1.04-mod/minlog-1.2.jar diff --git a/core/lib/kryo-1.04-mod/reflectasm-1.01.jar b/core/lib/kryo-1.04-mod/reflectasm-1.01.jar Binary files differnew file mode 100644 index 0000000000..09179ca473 --- /dev/null +++ b/core/lib/kryo-1.04-mod/reflectasm-1.01.jar diff --git a/core/src/main/scala/spark/Executor.scala b/core/src/main/scala/spark/Executor.scala index 35469aeb3f..1dd5e25033 100644 --- a/core/src/main/scala/spark/Executor.scala +++ b/core/src/main/scala/spark/Executor.scala @@ -24,6 +24,7 @@ class Executor extends mesos.Executor with Logging { // Initialize cache and broadcast system (uses some properties read above) Cache.initialize() + Serializer.initialize() Broadcast.initialize(false) MapOutputTracker.initialize(false) RDDCache.initialize(false) diff --git a/core/src/main/scala/spark/JavaSerializer.scala b/core/src/main/scala/spark/JavaSerializer.scala new file mode 100644 index 0000000000..8ee3044058 --- /dev/null +++ b/core/src/main/scala/spark/JavaSerializer.scala @@ -0,0 +1,48 @@ +package spark + +import java.io._ + +class JavaSerializationStream(out: OutputStream) extends SerializationStream { + val objOut = new ObjectOutputStream(out) + def writeObject[T](t: T) { objOut.writeObject(t) } + def flush() { objOut.flush() } + def close() { objOut.close() } +} + +class JavaDeserializationStream(in: InputStream) extends DeserializationStream { + val objIn = new ObjectInputStream(in) { + override def resolveClass(desc: ObjectStreamClass) = + Class.forName(desc.getName, false, currentThread.getContextClassLoader) + } + + def readObject[T](): T = objIn.readObject().asInstanceOf[T] + def close() { objIn.close() } +} + +class JavaSerializer extends Serializer { + def serialize[T](t: T): Array[Byte] = { + val bos = new ByteArrayOutputStream() + val out = outputStream(bos) + out.writeObject(t) + out.close() + bos.toByteArray + } + + def deserialize[T](bytes: Array[Byte]): T = { + val bis = new ByteArrayInputStream(bytes) + val in = inputStream(bis) + in.readObject().asInstanceOf[T] + } + + def outputStream(s: OutputStream): SerializationStream = { + new JavaSerializationStream(s) + } + + def inputStream(s: InputStream): DeserializationStream = { + new JavaDeserializationStream(s) + } +} + +class JavaSerialization extends SerializationStrategy { + def newSerializer(): Serializer = new JavaSerializer +} diff --git a/core/src/main/scala/spark/KryoSerialization.scala b/core/src/main/scala/spark/KryoSerialization.scala new file mode 100644 index 0000000000..f6e818adb0 --- /dev/null +++ b/core/src/main/scala/spark/KryoSerialization.scala @@ -0,0 +1,145 @@ +package spark + +import java.io._ +import java.nio.ByteBuffer +import java.nio.channels.Channels + +import scala.collection.immutable +import scala.collection.mutable + +import com.esotericsoftware.kryo._ + +object ZigZag { + def writeInt(n: Int, out: OutputStream) { + var value = n + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + if ((value & ~0x7F) == 0) { + out.write(value) + return + } + out.write(((value & 0x7F) | 0x80)) + value >>>= 7 + out.write(value) + } + + def readInt(in: InputStream): Int = { + var offset = 0 + var result = 0 + while (offset < 32) { + val b = in.read() + if (b == -1) { + throw new EOFException("End of stream") + } + result |= ((b & 0x7F) << offset) + if ((b & 0x80) == 0) { + return result + } + offset += 7 + } + throw new SparkException("Malformed zigzag-encoded integer") + } +} + +class KryoSerializationStream(kryo: Kryo, out: OutputStream) +extends SerializationStream { + val buf = ByteBuffer.allocateDirect(1024*1024) + + def writeObject[T](t: T) { + kryo.writeClassAndObject(buf, t) + ZigZag.writeInt(buf.position(), out) + buf.flip() + Channels.newChannel(out).write(buf) + buf.clear() + } + + def flush() { out.flush() } + def close() { out.close() } +} + +class KryoDeserializationStream(kryo: Kryo, in: InputStream) +extends DeserializationStream { + val buf = new ObjectBuffer(kryo, 1024*1024) + + def readObject[T](): T = { + val len = ZigZag.readInt(in) + buf.readClassAndObject(in, len).asInstanceOf[T] + } + + def close() { in.close() } +} + +class KryoSerializer(kryo: Kryo) extends Serializer { + val buf = new ObjectBuffer(kryo, 1024*1024) + + def serialize[T](t: T): Array[Byte] = { + buf.writeClassAndObject(t) + } + + def deserialize[T](bytes: Array[Byte]): T = { + buf.readClassAndObject(bytes).asInstanceOf[T] + } + + def outputStream(s: OutputStream): SerializationStream = { + new KryoSerializationStream(kryo, s) + } + + def inputStream(s: InputStream): DeserializationStream = { + new KryoDeserializationStream(kryo, s) + } +} + +// Used by clients to register their own classes +trait KryoRegistrator { + def registerClasses(kryo: Kryo): Unit +} + +class KryoSerialization extends SerializationStrategy with Logging { + val kryo = createKryo() + + def createKryo(): Kryo = { + val kryo = new Kryo() + val toRegister: Seq[AnyRef] = Seq( + // Arrays + Array(1), Array(1.0), Array(1.0f), Array(1L), Array(""), Array(("", "")), + // Specialized Tuple2s + ("", ""), (1, 1), (1.0, 1.0), (1L, 1L), + (1, 1.0), (1.0, 1), (1L, 1.0), (1.0, 1L), (1, 1L), (1L, 1), + // Scala collections + Nil, List(1), immutable.Map(1 -> 1), immutable.HashMap(1 -> 1), + mutable.Map(1 -> 1), mutable.HashMap(1 -> 1), mutable.ArrayBuffer(1), + // Options and Either + Some(1), None, Left(1), Right(1), + // Higher-dimensional tuples + (1, 1, 1), (1, 1, 1, 1), (1, 1, 1, 1, 1) + ) + for (obj <- toRegister) { + kryo.register(obj.getClass) + } + val regCls = System.getProperty("spark.kryo.registrator") + if (regCls != null) { + logInfo("Running user registrator: " + regCls) + val reg = Class.forName(regCls).newInstance().asInstanceOf[KryoRegistrator] + reg.registerClasses(kryo) + } + kryo + } + + def newSerializer(): Serializer = new KryoSerializer(kryo) +} diff --git a/core/src/main/scala/spark/LocalFileShuffle.scala b/core/src/main/scala/spark/LocalFileShuffle.scala index fd70c54c0c..057a7ff43d 100644 --- a/core/src/main/scala/spark/LocalFileShuffle.scala +++ b/core/src/main/scala/spark/LocalFileShuffle.scala @@ -47,9 +47,10 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { case None => createCombiner(v) } } + val ser = Serializer.newInstance() for (i <- 0 until numOutputSplits) { val file = LocalFileShuffle.getOutputFile(shuffleId, myIndex, i) - val out = new ObjectOutputStream(new FileOutputStream(file)) + val out = ser.outputStream(new FileOutputStream(file)) buckets(i).foreach(pair => out.writeObject(pair)) out.close() } @@ -69,10 +70,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { val indexes = sc.parallelize(0 until numOutputSplits, numOutputSplits) return indexes.flatMap((myId: Int) => { val combiners = new HashMap[K, C] + val ser = Serializer.newInstance() for ((serverUri, inputIds) <- Utils.shuffle(splitsByUri)) { for (i <- inputIds) { val url = "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, i, myId) - val inputStream = new ObjectInputStream(new URL(url).openStream()) + val inputStream = ser.inputStream(new URL(url).openStream()) try { while (true) { val (k, c) = inputStream.readObject().asInstanceOf[(K, C)] diff --git a/core/src/main/scala/spark/Serializer.scala b/core/src/main/scala/spark/Serializer.scala new file mode 100644 index 0000000000..a182f6bddc --- /dev/null +++ b/core/src/main/scala/spark/Serializer.scala @@ -0,0 +1,40 @@ +package spark + +import java.io.{InputStream, OutputStream} + +trait SerializationStream { + def writeObject[T](t: T): Unit + def flush(): Unit + def close(): Unit +} + +trait DeserializationStream { + def readObject[T](): T + def close(): Unit +} + +trait Serializer { + def serialize[T](t: T): Array[Byte] + def deserialize[T](bytes: Array[Byte]): T + def outputStream(s: OutputStream): SerializationStream + def inputStream(s: InputStream): DeserializationStream +} + +trait SerializationStrategy { + def newSerializer(): Serializer +} + +object Serializer { + var strat: SerializationStrategy = null + + def initialize() { + val cls = System.getProperty("spark.serialization", + "spark.JavaSerialization") + strat = Class.forName(cls).newInstance().asInstanceOf[SerializationStrategy] + } + + // Return a serializer ** for use by a single thread ** + def newInstance(): Serializer = { + strat.newSerializer() + } +} diff --git a/core/src/main/scala/spark/SerializingCache.scala b/core/src/main/scala/spark/SerializingCache.scala new file mode 100644 index 0000000000..cbc64736e6 --- /dev/null +++ b/core/src/main/scala/spark/SerializingCache.scala @@ -0,0 +1,26 @@ +package spark + +import java.io._ + +/** + * Wrapper around a BoundedMemoryCache that stores serialized objects as + * byte arrays in order to reduce storage cost and GC overhead + */ +class SerializingCache extends Cache with Logging { + val bmc = new BoundedMemoryCache + + override def put(key: Any, value: Any) { + val ser = Serializer.newInstance() + bmc.put(key, ser.serialize(value)) + } + + override def get(key: Any): Any = { + val bytes = bmc.get(key) + if (bytes != null) { + val ser = Serializer.newInstance() + return ser.deserialize(bytes.asInstanceOf[Array[Byte]]) + } else { + return null + } + } +} diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 34641918f0..9357db22c4 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -40,6 +40,7 @@ extends Logging { // Start the scheduler, the cache and the broadcast system scheduler.start() Cache.initialize() + Serializer.initialize() Broadcast.initialize(true) MapOutputTracker.initialize(true) RDDCache.initialize(true) |