diff options
author | Imran Rashid <irashid@cloudera.com> | 2015-03-03 15:33:19 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-03-03 15:33:26 -0800 |
commit | 9a0b75cdd85c1933c590480dd233b1803726ed71 (patch) | |
tree | caeca6b5a0a998b1f3e4624b5ebe1f346321e66c | |
parent | 8446ad0ebd2abb10ef405dc2ce53d2724604ce83 (diff) | |
download | spark-9a0b75cdd85c1933c590480dd233b1803726ed71.tar.gz spark-9a0b75cdd85c1933c590480dd233b1803726ed71.tar.bz2 spark-9a0b75cdd85c1933c590480dd233b1803726ed71.zip |
[SPARK-5949] HighlyCompressedMapStatus needs more classes registered w/ kryo
https://issues.apache.org/jira/browse/SPARK-5949
Author: Imran Rashid <irashid@cloudera.com>
Closes #4877 from squito/SPARK-5949_register_roaring_bitmap and squashes the following commits:
7e13316 [Imran Rashid] style style style
5f6bb6d [Imran Rashid] more style
709bfe0 [Imran Rashid] style
a5cb744 [Imran Rashid] update tests to cover both types of RoaringBitmapContainers
09610c6 [Imran Rashid] formatting
f9a0b7c [Imran Rashid] put primitive array registrations together
97beaf8 [Imran Rashid] SPARK-5949 HighlyCompressedMapStatus needs more classes registered w/ kryo
(cherry picked from commit 1f1fccc5ceb0c5b7656a0594be3a67bd3b432e85)
Signed-off-by: Reynold Xin <rxin@databricks.com>
-rw-r--r-- | core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala | 15 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 23 |
2 files changed, 33 insertions, 5 deletions
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 02158aa0f8..9ce64d41fb 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -20,22 +20,23 @@ package org.apache.spark.serializer import java.io.{EOFException, InputStream, OutputStream} import java.nio.ByteBuffer +import scala.reflect.ClassTag + import com.esotericsoftware.kryo.{Kryo, KryoException} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} +import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap} import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.HttpBroadcast -import org.apache.spark.network.nio.{PutBlock, GotBlock, GetBlock} +import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock} import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ import org.apache.spark.util.BoundedPriorityQueue import org.apache.spark.util.collection.CompactBuffer -import scala.reflect.ClassTag - /** * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. * @@ -202,9 +203,17 @@ private[serializer] object KryoSerializer { classOf[GetBlock], classOf[CompressedMapStatus], classOf[HighlyCompressedMapStatus], + classOf[RoaringBitmap], + classOf[RoaringArray], + classOf[RoaringArray.Element], + classOf[Array[RoaringArray.Element]], + classOf[ArrayContainer], + classOf[BitmapContainer], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Byte]], + classOf[Array[Short]], + classOf[Array[Long]], classOf[BoundedPriorityQueue[_]], classOf[SparkConf] ) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index a70f67af2e..523d898207 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -23,9 +23,10 @@ import scala.reflect.ClassTag import com.esotericsoftware.kryo.Kryo import org.scalatest.FunSuite -import org.apache.spark.{SparkConf, SharedSparkContext} +import org.apache.spark.{SharedSparkContext, SparkConf} +import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ - +import org.apache.spark.storage.BlockManagerId class KryoSerializerSuite extends FunSuite with SharedSparkContext { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") @@ -242,6 +243,24 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { ser.newInstance().deserialize[ClassLoaderTestingObject](bytes) } } + + test("registration of HighlyCompressedMapStatus") { + val conf = new SparkConf(false) + conf.set("spark.kryo.registrationRequired", "true") + + // these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16 + // values, and they use a bitmap (dense) if they have more than 4096 values, and an + // array (sparse) if they use less. So we just create two cases, one sparse and one dense. + // and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly + // empty blocks + + val ser = new KryoSerializer(conf).newInstance() + val denseBlockSizes = new Array[Long](5000) + val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) + Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes => + ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes)) + } + } } |