From 4a3fb06ac2d11125feb08acbbd4df76d1e91b677 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 16 Oct 2012 01:10:01 -0700 Subject: Updated Kryo to 2.20. --- core/src/main/scala/spark/KryoSerializer.scala | 205 ++++++++----------------- project/SparkBuild.scala | 2 +- 2 files changed, 69 insertions(+), 138 deletions(-) diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala index 44b630e478..f24196ea49 100644 --- a/core/src/main/scala/spark/KryoSerializer.scala +++ b/core/src/main/scala/spark/KryoSerializer.scala @@ -9,153 +9,80 @@ import scala.collection.mutable import com.esotericsoftware.kryo._ import com.esotericsoftware.kryo.{Serializer => KSerializer} -import com.esotericsoftware.kryo.serialize.ClassSerializer -import com.esotericsoftware.kryo.serialize.SerializableSerializer +import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} +import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import de.javakaffee.kryoserializers.KryoReflectionFactorySupport import serializer.{SerializerInstance, DeserializationStream, SerializationStream} import spark.broadcast._ import spark.storage._ -/** - * Zig-zag encoder used to write object sizes to serialization streams. - * Based on Kryo's integer encoder. - */ -private[spark] object ZigZag { - def writeInt(n: Int, out: OutputStream) { - var value = n - if ((value & ~0x7F) == 0) { - out.write(value) - return - } - out.write(((value & 0x7F) | 0x80)) - value >>>= 7 - if ((value & ~0x7F) == 0) { - out.write(value) - return - } - out.write(((value & 0x7F) | 0x80)) - value >>>= 7 - if ((value & ~0x7F) == 0) { - out.write(value) - return - } - out.write(((value & 0x7F) | 0x80)) - value >>>= 7 - if ((value & ~0x7F) == 0) { - out.write(value) - return - } - out.write(((value & 0x7F) | 0x80)) - value >>>= 7 - out.write(value) - } +private[spark] +class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream { - def readInt(in: InputStream): Int = { - var offset = 0 - var result = 0 - while (offset < 32) { - val b = in.read() - if (b == -1) { - throw new EOFException("End of stream") - } - result |= ((b & 0x7F) << offset) - if ((b & 0x80) == 0) { - return result - } - offset += 7 - } - throw new SparkException("Malformed zigzag-encoded integer") - } -} - -private[spark] -class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream) -extends SerializationStream { - val channel = Channels.newChannel(out) + val output = new KryoOutput(outStream) def writeObject[T](t: T): SerializationStream = { - kryo.writeClassAndObject(threadBuffer, t) - ZigZag.writeInt(threadBuffer.position(), out) - threadBuffer.flip() - channel.write(threadBuffer) - threadBuffer.clear() + kryo.writeClassAndObject(output, t) this } - def flush() { out.flush() } - def close() { out.close() } + def flush() { output.flush() } + def close() { output.close() } } -private[spark] -class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream) -extends DeserializationStream { +private[spark] +class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream { + + val input = new KryoInput(inStream) + def readObject[T](): T = { - val len = ZigZag.readInt(in) - objectBuffer.readClassAndObject(in, len).asInstanceOf[T] + try { + kryo.readClassAndObject(input).asInstanceOf[T] + } catch { + // DeserializationStream uses the EOF exception to indicate stopping condition. 
+ case e: com.esotericsoftware.kryo.KryoException => throw new java.io.EOFException + } } - def close() { in.close() } + def close() { + input.close() + inStream.close() + } } private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance { - val kryo = ks.kryo - val threadBuffer = ks.threadBuffer.get() - val objectBuffer = ks.objectBuffer.get() + + val kryo = ks.kryo.get() + val output = ks.output.get() + val input = ks.input.get() def serialize[T](t: T): ByteBuffer = { - // Write it to our thread-local scratch buffer first to figure out the size, then return a new - // ByteBuffer of the appropriate size - threadBuffer.clear() - kryo.writeClassAndObject(threadBuffer, t) - val newBuf = ByteBuffer.allocate(threadBuffer.position) - threadBuffer.flip() - newBuf.put(threadBuffer) - newBuf.flip() - newBuf + output.clear() + kryo.writeClassAndObject(output, t) + ByteBuffer.wrap(output.toBytes) } def deserialize[T](bytes: ByteBuffer): T = { - kryo.readClassAndObject(bytes).asInstanceOf[T] + input.setBuffer(bytes.array) + kryo.readClassAndObject(input).asInstanceOf[T] } def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = { val oldClassLoader = kryo.getClassLoader kryo.setClassLoader(loader) - val obj = kryo.readClassAndObject(bytes).asInstanceOf[T] + input.setBuffer(bytes.array) + val obj = kryo.readClassAndObject(input).asInstanceOf[T] kryo.setClassLoader(oldClassLoader) obj } def serializeStream(s: OutputStream): SerializationStream = { - threadBuffer.clear() - new KryoSerializationStream(kryo, threadBuffer, s) + new KryoSerializationStream(kryo, s) } def deserializeStream(s: InputStream): DeserializationStream = { - new KryoDeserializationStream(objectBuffer, s) - } - - override def serializeMany[T](iterator: Iterator[T]): ByteBuffer = { - threadBuffer.clear() - while (iterator.hasNext) { - val element = iterator.next() - // TODO: Do we also want to write the object's size? Doesn't seem necessary. - kryo.writeClassAndObject(threadBuffer, element) - } - val newBuf = ByteBuffer.allocate(threadBuffer.position) - threadBuffer.flip() - newBuf.put(threadBuffer) - newBuf.flip() - newBuf - } - - override def deserializeMany(buffer: ByteBuffer): Iterator[Any] = { - buffer.rewind() - new Iterator[Any] { - override def hasNext: Boolean = buffer.remaining > 0 - override def next(): Any = kryo.readClassAndObject(buffer) - } + new KryoDeserializationStream(kryo, s) } } @@ -171,18 +98,19 @@ trait KryoRegistrator { * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]]. */ class KryoSerializer extends spark.serializer.Serializer with Logging { - // Make this lazy so that it only gets called once we receive our first task on each executor, - // so we can pull out any custom Kryo registrator from the user's JARs. 
- lazy val kryo = createKryo() - val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "32").toInt * 1024 * 1024 + val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024 - val objectBuffer = new ThreadLocal[ObjectBuffer] { - override def initialValue = new ObjectBuffer(kryo, bufferSize) + val kryo = new ThreadLocal[Kryo] { + override def initialValue = createKryo() } - val threadBuffer = new ThreadLocal[ByteBuffer] { - override def initialValue = ByteBuffer.allocate(bufferSize) + val output = new ThreadLocal[KryoOutput] { + override def initialValue = new KryoOutput(bufferSize) + } + + val input = new ThreadLocal[KryoInput] { + override def initialValue = new KryoInput(bufferSize) } def createKryo(): Kryo = { @@ -213,41 +141,44 @@ class KryoSerializer extends spark.serializer.Serializer with Logging { kryo.register(obj.getClass) } - // Register the following classes for passing closures. - kryo.register(classOf[Class[_]], new ClassSerializer(kryo)) - kryo.setRegistrationOptional(true) - // Allow sending SerializableWritable - kryo.register(classOf[SerializableWritable[_]], new SerializableSerializer()) - kryo.register(classOf[HttpBroadcast[_]], new SerializableSerializer()) + kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer()) + kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer()) // Register some commonly used Scala singleton objects. Because these // are singletons, we must return the exact same local object when we // deserialize rather than returning a clone as FieldSerializer would. - class SingletonSerializer(obj: AnyRef) extends KSerializer { - override def writeObjectData(buf: ByteBuffer, obj: AnyRef) {} - override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = obj.asInstanceOf[T] + class SingletonSerializer[T](obj: T) extends KSerializer[T] { + override def write(kryo: Kryo, output: KryoOutput, obj: T) {} + override def read(kryo: Kryo, input: KryoInput, cls: java.lang.Class[T]): T = obj } - kryo.register(None.getClass, new SingletonSerializer(None)) - kryo.register(Nil.getClass, new SingletonSerializer(Nil)) + kryo.register(None.getClass, new SingletonSerializer[AnyRef](None)) + kryo.register(Nil.getClass, new SingletonSerializer[AnyRef](Nil)) // Register maps with a special serializer since they have complex internal structure class ScalaMapSerializer(buildMap: Array[(Any, Any)] => scala.collection.Map[Any, Any]) - extends KSerializer { - override def writeObjectData(buf: ByteBuffer, obj: AnyRef) { + extends KSerializer[Array[(Any, Any)] => scala.collection.Map[Any, Any]] { + override def write( + kryo: Kryo, + output: KryoOutput, + obj: Array[(Any, Any)] => scala.collection.Map[Any, Any]) { val map = obj.asInstanceOf[scala.collection.Map[Any, Any]] - kryo.writeObject(buf, map.size.asInstanceOf[java.lang.Integer]) + kryo.writeObject(output, map.size.asInstanceOf[java.lang.Integer]) for ((k, v) <- map) { - kryo.writeClassAndObject(buf, k) - kryo.writeClassAndObject(buf, v) + kryo.writeClassAndObject(output, k) + kryo.writeClassAndObject(output, v) } } - override def readObjectData[T](buf: ByteBuffer, cls: Class[T]): T = { - val size = kryo.readObject(buf, classOf[java.lang.Integer]).intValue + override def read ( + kryo: Kryo, + input: KryoInput, + cls: Class[Array[(Any, Any)] => scala.collection.Map[Any, Any]]) + : Array[(Any, Any)] => scala.collection.Map[Any, Any] = { + val size = kryo.readObject(input, classOf[java.lang.Integer]).intValue val elems = new Array[(Any, 
Any)](size) for (i <- 0 until size) - elems(i) = (kryo.readClassAndObject(buf), kryo.readClassAndObject(buf)) - buildMap(elems).asInstanceOf[T] + elems(i) = (kryo.readClassAndObject(input), kryo.readClassAndObject(input)) + buildMap(elems).asInstanceOf[Array[(Any, Any)] => scala.collection.Map[Any, Any]] } } kryo.register(mutable.HashMap().getClass, new ScalaMapSerializer(mutable.HashMap() ++ _)) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index c9cf17d90a..1023019d24 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -120,7 +120,7 @@ object SparkBuild extends Build { "org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION, "asm" % "asm-all" % "3.3.1", "com.google.protobuf" % "protobuf-java" % "2.4.1", - "de.javakaffee" % "kryo-serializers" % "0.9", + "de.javakaffee" % "kryo-serializers" % "0.20", "com.typesafe.akka" % "akka-actor" % "2.0.3", "com.typesafe.akka" % "akka-remote" % "2.0.3", "com.typesafe.akka" % "akka-slf4j" % "2.0.3", -- cgit v1.2.3 From 60f7338092ad0c3a608c0e466f66047a508a35be Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 21 Dec 2012 15:49:33 -0800 Subject: Remove the call to close input stream in Kryo serializer. --- core/src/main/scala/spark/KryoSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala index f24196ea49..93d7327324 100644 --- a/core/src/main/scala/spark/KryoSerializer.scala +++ b/core/src/main/scala/spark/KryoSerializer.scala @@ -46,8 +46,8 @@ class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends Deser } def close() { + // Kryo's Input automatically closes the input stream it is using. input.close() - inStream.close() } } -- cgit v1.2.3 From c68a0760379ff8d8a1ae194934ae54d19f1eb213 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 21 Dec 2012 16:03:17 -0800 Subject: Updated Kryo documentation for Kryo version update. --- docs/tuning.md | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/docs/tuning.md b/docs/tuning.md index f18de8ff3a..9aaa53cd65 100644 --- a/docs/tuning.md +++ b/docs/tuning.md @@ -33,7 +33,7 @@ in your operations) and performance. It provides two serialization libraries: Java serialization is flexible but often quite slow, and leads to large serialized formats for many classes. * [Kryo serialization](http://code.google.com/p/kryo/wiki/V1Documentation): Spark can also use - the Kryo library (currently just version 1) to serialize objects more quickly. Kryo is significantly + the Kryo library (version 2) to serialize objects more quickly. Kryo is significantly faster and more compact than Java serialization (often as much as 10x), but does not support all `Serializable` types and requires you to *register* the classes you'll use in the program in advance for best performance. @@ -47,6 +47,8 @@ Finally, to register your classes with Kryo, create a public class that extends `spark.kryo.registrator` system property to point to it, as follows: {% highlight scala %} +import com.esotericsoftware.kryo.Kryo + class MyRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) { kryo.register(classOf[MyClass1]) @@ -60,7 +62,7 @@ System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator") val sc = new SparkContext(...) 
{% endhighlight %} -The [Kryo documentation](http://code.google.com/p/kryo/wiki/V1Documentation) describes more advanced +The [Kryo documentation](http://code.google.com/p/kryo/) describes more advanced registration options, such as adding custom serialization code. If your objects are large, you may also need to increase the `spark.kryoserializer.buffer.mb` @@ -147,7 +149,7 @@ the space allocated to the RDD cache to mitigate this. **Measuring the Impact of GC** -The first step in GC tuning is to collect statistics on how frequently garbage collection occurs and the amount of +The first step in GC tuning is to collect statistics on how frequently garbage collection occurs and the amount of time spent GC. This can be done by adding `-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps` to your `SPARK_JAVA_OPTS` environment variable. Next time your Spark job is run, you will see messages printed in the worker's logs each time a garbage collection occurs. Note these logs will be on your cluster's worker nodes (in the `stdout` files in @@ -155,15 +157,15 @@ their work directories), *not* on your driver program. **Cache Size Tuning** -One important configuration parameter for GC is the amount of memory that should be used for -caching RDDs. By default, Spark uses 66% of the configured memory (`SPARK_MEM`) to cache RDDs. This means that +One important configuration parameter for GC is the amount of memory that should be used for +caching RDDs. By default, Spark uses 66% of the configured memory (`SPARK_MEM`) to cache RDDs. This means that 33% of memory is available for any objects created during task execution. In case your tasks slow down and you find that your JVM is garbage-collecting frequently or running out of -memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call -`System.setProperty("spark.storage.memoryFraction", "0.5")`. Combined with the use of serialized caching, -using a smaller cache should be sufficient to mitigate most of the garbage collection problems. -In case you are interested in further tuning the Java GC, continue reading below. +memory, lowering this value will help reduce the memory consumption. To change this to say 50%, you can call +`System.setProperty("spark.storage.memoryFraction", "0.5")`. Combined with the use of serialized caching, +using a smaller cache should be sufficient to mitigate most of the garbage collection problems. +In case you are interested in further tuning the Java GC, continue reading below. **Advanced GC Tuning** @@ -172,9 +174,9 @@ To further tune garbage collection, we first need to understand some basic infor * Java Heap space is divided in to two regions Young and Old. The Young generation is meant to hold short-lived objects while the Old generation is intended for objects with longer lifetimes. -* The Young generation is further divided into three regions [Eden, Survivor1, Survivor2]. +* The Young generation is further divided into three regions [Eden, Survivor1, Survivor2]. -* A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects +* A simplified description of the garbage collection procedure: When Eden is full, a minor GC is run on Eden and objects that are alive from Eden and Survivor1 are copied to Survivor2. The Survivor regions are swapped. If an object is old enough or Survivor2 is full, it is moved to Old. Finally when Old is close to full, a full GC is invoked. 
@@ -186,7 +188,7 @@ temporary objects created during task execution. Some steps which may be useful before a task completes, it means that there isn't enough memory available for executing tasks. * In the GC stats that are printed, if the OldGen is close to being full, reduce the amount of memory used for caching. - This can be done using the `spark.storage.memoryFraction` property. It is better to cache fewer objects than to slow + This can be done using the `spark.storage.memoryFraction` property. It is better to cache fewer objects than to slow down task execution! * If there are too many minor collections but not many major GCs, allocating more memory for Eden would help. You @@ -195,8 +197,8 @@ temporary objects created during task execution. Some steps which may be useful up by 4/3 is to account for space used by survivor regions as well.) * As an example, if your task is reading data from HDFS, the amount of memory used by the task can be estimated using - the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the - size of the block. So if we wish to have 3 or 4 tasks worth of working space, and the HDFS block size is 64 MB, + the size of the data block read from HDFS. Note that the size of a decompressed block is often 2 or 3 times the + size of the block. So if we wish to have 3 or 4 tasks worth of working space, and the HDFS block size is 64 MB, we can estimate size of Eden to be `4*3*64MB`. * Monitor how the frequency and time taken by garbage collection changes with the new settings. -- cgit v1.2.3 From a6bb41c6d389f1b98d5542000a7a9705ba282273 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Fri, 21 Dec 2012 16:25:50 -0800 Subject: Updated Kryo version for Maven pom file. --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 52a4e9d932..b33cee26b8 100644 --- a/pom.xml +++ b/pom.xml @@ -185,7 +185,7 @@ de.javakaffee kryo-serializers - 0.9 + 0.20 com.typesafe.akka -- cgit v1.2.3 From 61be8566e24c664442780154debfea884d81f46b Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Mon, 24 Dec 2012 02:26:11 -0800 Subject: Allow distinct() to be called without parentheses when using the default number of splits. --- core/src/main/scala/spark/RDD.scala | 4 +++- core/src/test/scala/spark/RDDSuite.scala | 12 ++++++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index bb4c13c494..d15c6f7396 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -185,9 +185,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial /** * Return a new RDD containing the distinct elements in this RDD. */ - def distinct(numSplits: Int = splits.size): RDD[T] = + def distinct(numSplits: Int): RDD[T] = map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1) + def distinct(): RDD[T] = distinct(splits.size) + /** * Return a sampled subset of this RDD. 
*/ diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index b3c820ed94..08da9a1c4d 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -8,9 +8,9 @@ import spark.rdd.CoalescedRDD import SparkContext._ class RDDSuite extends FunSuite with BeforeAndAfter { - + var sc: SparkContext = _ - + after { if (sc != null) { sc.stop() @@ -19,11 +19,15 @@ class RDDSuite extends FunSuite with BeforeAndAfter { // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown System.clearProperty("spark.master.port") } - + test("basic operations") { sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(nums.collect().toList === List(1, 2, 3, 4)) + val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2) + assert(dups.distinct.count === 4) + assert(dups.distinct().collect === dups.distinct.collect) + assert(dups.distinct(2).collect === dups.distinct.collect) assert(nums.reduce(_ + _) === 10) assert(nums.fold(0)(_ + _) === 10) assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4")) @@ -121,7 +125,7 @@ class RDDSuite extends FunSuite with BeforeAndAfter { val zipped = nums.zip(nums.map(_ + 1.0)) assert(zipped.glom().map(_.toList).collect().toList === List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0)))) - + intercept[IllegalArgumentException] { nums.zip(sc.parallelize(1 to 4, 1)).collect() } -- cgit v1.2.3 From 903f3518dfcd686cda2256b07fbc1dde6aec0178 Mon Sep 17 00:00:00 2001 From: Mark Hamstra Date: Mon, 24 Dec 2012 13:18:45 -0800 Subject: fall back to filter-map-collect when calling lookup() on an RDD without a partitioner --- core/src/main/scala/spark/PairRDDFunctions.scala | 2 +- core/src/test/scala/spark/JavaAPISuite.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 08ae06e865..d3e206b353 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -438,7 +438,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( val res = self.context.runJob(self, process _, Array(index), false) res(0) case None => - throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner") + self.filter(_._1 == key).map(_._2).collect } } diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 46a0b68f89..33d5fc2d89 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -130,6 +130,17 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(2, foreachCalls); } + @Test + public void lookup() { + JavaPairRDD categories = sc.parallelizePairs(Arrays.asList( + new Tuple2("Apples", "Fruit"), + new Tuple2("Oranges", "Fruit"), + new Tuple2("Oranges", "Citrus") + )); + Assert.assertEquals(2, categories.lookup("Oranges").size()); + Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size()); + } + @Test public void groupBy() { JavaRDD rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); -- cgit v1.2.3
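
Editor's note on the last patch above: as a quick, standalone illustration of the fallback it introduces — when an RDD has no partitioner, `lookup()` now degrades to scanning every pair instead of throwing `UnsupportedOperationException` — here is a minimal Scala sketch of that same filter-map-collect logic in isolation. The `LookupFallbackSketch` object, the `pairs` parameter, and the sample data are illustrative only and are not part of the patch.

```scala
// Standalone sketch (not Spark code) of the lookup() fallback added above:
// with no partitioner to route the key to a single partition, simply filter
// every (key, value) pair on the key and collect the matching values.
object LookupFallbackSketch {
  def lookup[K, V](pairs: Seq[(K, V)], key: K): Seq[V] =
    pairs.filter(_._1 == key).map(_._2)

  def main(args: Array[String]) {
    val categories = Seq("Apples" -> "Fruit", "Oranges" -> "Fruit", "Oranges" -> "Citrus")
    println(lookup(categories, "Oranges"))   // List(Fruit, Citrus)
  }
}
```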
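
And for the Kryo 1.x → 2.x migration in the first patch of this series, the following is a hedged, self-contained round-trip sketch of the Kryo 2 `Output`/`Input` API that replaces the old `ObjectBuffer`/`ByteBuffer` code. It assumes a Kryo 2.x jar on the classpath; the registered class, buffer size, and object name are illustrative, not taken from the patch.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}

// Round-trip using the Kryo 2 API adopted by the new KryoSerializerInstance:
// write through an Output backed by a scratch buffer, turn it into bytes,
// then read the bytes back through an Input.
object Kryo2RoundTrip {
  def main(args: Array[String]) {
    val kryo = new Kryo()
    kryo.register(classOf[Array[Int]])   // optional, but yields smaller output

    val output = new Output(64 * 1024)   // fixed-size scratch buffer, as in the patch
    kryo.writeClassAndObject(output, Array(1, 2, 3))
    val bytes = output.toBytes           // same Output-to-bytes pattern as serialize()

    val input = new Input(bytes)         // same pattern as deserialize()'s input.setBuffer
    val restored = kryo.readClassAndObject(input).asInstanceOf[Array[Int]]
    println(restored.mkString(","))      // prints 1,2,3
  }
}
```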