author     Kent Yao <yaooqinn@hotmail.com>      2015-11-17 19:44:29 -0800
committer  Davies Liu <davies.liu@gmail.com>    2015-11-17 19:44:29 -0800
commit     e33053ee0015025bbcfddb20cc9216c225bbe624 (patch)
tree       0d87a481ddd84558b0e70987f32a6d514e3c9feb
parent     bf25f9bdfc7bd8533890c7df1b35afa912dc6d3d (diff)
[SPARK-11583] [CORE] MapStatus Using RoaringBitmap More Properly
This PR upgrades RoaringBitmap to 0.5.11 and uses it more carefully in MapStatus: the empty-blocks bitmap is trimmed and run-optimized, so it becomes much smaller when most blocks are empty. This PR is based on #9661 (with conflicts resolved); see the full discussion at https://github.com/apache/spark/pull/9661 .

Author: Kent Yao <yaooqinn@hotmail.com>
Author: Davies Liu <davies@databricks.com>
Author: Charles Allen <charles@allen-net.com>

Closes #9746 from davies/roaring_mapstatus.
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala        |  5
-rw-r--r--  core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala  |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala   | 31
-rw-r--r--  pom.xml                                                               |  2
4 files changed, 38 insertions(+), 6 deletions(-)
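
The effect described in the commit message can be reproduced outside Spark. The following standalone sketch is not part of the patch and only assumes RoaringBitmap 0.5.x on the classpath; it marks most of 200000 block indices as empty and prints the bitmap's size before and after run-optimizing and trimming it:

import org.roaringbitmap.RoaringBitmap

object EmptyBlocksSizeDemo {
  def main(args: Array[String]): Unit = {
    // Simulate a shuffle where most blocks are empty: mark every block
    // index as empty except one in every 200.
    val emptyBlocks = new RoaringBitmap()
    (0 until 200000).foreach { i =>
      if (i % 200 != 0) {
        emptyBlocks.add(i)
      }
    }

    val before = emptyBlocks.getSizeInBytes
    // Convert dense runs of set bits to run-length-encoded containers,
    // then release any over-allocated internal arrays.
    emptyBlocks.runOptimize()
    emptyBlocks.trim()
    val after = emptyBlocks.getSizeInBytes

    println(s"before: $before bytes, after: $after bytes")
  }
}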
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 1efce124c0..b2e9a97129 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -122,8 +122,7 @@ private[spark] class CompressedMapStatus(
/**
* A [[MapStatus]] implementation that only stores the average size of non-empty blocks,
- * plus a bitmap for tracking which blocks are empty. During serialization, this bitmap
- * is compressed.
+ * plus a bitmap for tracking which blocks are empty.
*
* @param loc location where the task is being executed
* @param numNonEmptyBlocks the number of non-empty blocks
@@ -194,6 +193,8 @@ private[spark] object HighlyCompressedMapStatus {
} else {
0
}
+ emptyBlocks.trim()
+ emptyBlocks.runOptimize()
new HighlyCompressedMapStatus(loc, numNonEmptyBlocks, emptyBlocks, avgSize)
}
}
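
For context, the two added lines sit at the end of the logic that builds the empty-blocks bitmap in HighlyCompressedMapStatus.apply. The sketch below is a simplified stand-in for that construction, not the exact Spark code; the summarizeBlocks name and the blockSizes parameter are illustrative only:

import org.roaringbitmap.RoaringBitmap

object BlockSummarySketch {
  // Record the indices of empty blocks in a RoaringBitmap, compute the
  // average size of the non-empty ones, and compact the bitmap before
  // it is kept around.
  def summarizeBlocks(blockSizes: Array[Long]): (RoaringBitmap, Long) = {
    val emptyBlocks = new RoaringBitmap()
    var numNonEmpty = 0
    var totalSize = 0L
    var i = 0
    while (i < blockSizes.length) {
      if (blockSizes(i) > 0) {
        numNonEmpty += 1
        totalSize += blockSizes(i)
      } else {
        emptyBlocks.add(i)
      }
      i += 1
    }
    val avgSize = if (numNonEmpty > 0) totalSize / numNonEmpty else 0L
    // The lines added by this patch: shrink internal arrays and switch
    // eligible containers to run-length encoding before storing the bitmap.
    emptyBlocks.trim()
    emptyBlocks.runOptimize()
    (emptyBlocks, avgSize)
  }
}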
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 1bcb3175a3..d5ba690ed0 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -17,7 +17,7 @@
package org.apache.spark.serializer
-import java.io.{EOFException, IOException, InputStream, OutputStream, DataInput, DataOutput}
+import java.io.{DataInput, DataOutput, EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable
@@ -25,9 +25,9 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
-import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
+import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.roaringbitmap.RoaringBitmap
@@ -38,8 +38,8 @@ import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
import org.apache.spark.storage._
-import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf}
import org.apache.spark.util.collection.CompactBuffer
+import org.apache.spark.util.{BoundedPriorityQueue, SerializableConfiguration, SerializableJobConf, Utils}
/**
* A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]].
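
The hunks shown for KryoSerializer.scala only reorder imports, but the newly imported DataInput and DataOutput hint that RoaringBitmap values are written through the library's own DataOutput-based format. As a hedged sketch of one way to wire that into Kryo, and not the implementation in this patch, a custom serializer can wrap Kryo's streams:

import java.io.{DataInputStream, DataOutputStream}

import com.esotericsoftware.kryo.{Kryo, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.roaringbitmap.RoaringBitmap

class RoaringBitmapSerializer extends KryoClassSerializer[RoaringBitmap] {
  override def write(kryo: Kryo, output: KryoOutput, bitmap: RoaringBitmap): Unit = {
    // KryoOutput extends java.io.OutputStream, so it can back a DataOutputStream.
    bitmap.serialize(new DataOutputStream(output))
  }

  override def read(kryo: Kryo, input: KryoInput, cls: Class[RoaringBitmap]): RoaringBitmap = {
    // KryoInput extends java.io.InputStream, so it can back a DataInputStream.
    val bitmap = new RoaringBitmap()
    bitmap.deserialize(new DataInputStream(input))
    bitmap
  }
}

// Registration would then look like:
//   kryo.register(classOf[RoaringBitmap], new RoaringBitmapSerializer)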
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index b8e466fab4..15c8de61b8 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -21,6 +21,7 @@ import org.apache.spark.storage.BlockManagerId
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
+import org.roaringbitmap.RoaringBitmap
import scala.util.Random
@@ -97,4 +98,34 @@ class MapStatusSuite extends SparkFunSuite {
val buf = ser.newInstance().serialize(status)
ser.newInstance().deserialize[MapStatus](buf)
}
+
+ test("RoaringBitmap: runOptimize succeeded") {
+ val r = new RoaringBitmap
+ (1 to 200000).foreach(i =>
+ if (i % 200 != 0) {
+ r.add(i)
+ }
+ )
+ val size1 = r.getSizeInBytes
+ val success = r.runOptimize()
+ r.trim()
+ val size2 = r.getSizeInBytes
+ assert(size1 > size2)
+ assert(success)
+ }
+
+ test("RoaringBitmap: runOptimize failed") {
+ val r = new RoaringBitmap
+ (1 to 200000).foreach(i =>
+ if (i % 200 == 0) {
+ r.add(i)
+ }
+ )
+ val size1 = r.getSizeInBytes
+ val success = r.runOptimize()
+ r.trim()
+ val size2 = r.getSizeInBytes
+ assert(size1 === size2)
+ assert(!success)
+ }
}
diff --git a/pom.xml b/pom.xml
index 2a8a445057..940e2d8740 100644
--- a/pom.xml
+++ b/pom.xml
@@ -637,7 +637,7 @@
<dependency>
<groupId>org.roaringbitmap</groupId>
<artifactId>RoaringBitmap</artifactId>
- <version>0.4.5</version>
+ <version>0.5.11</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>