about summary refs log tree commit diff
path: root/sql
diff options
context:
space:
mode:
authorIan O Connell <ioconnell@twitter.com>2014-07-23 16:30:06 -0700
committerMichael Armbrust <michael@databricks.com>2014-07-23 16:30:11 -0700
commitefdaeb111917dd0314f1d00ee8524bed1e2e21ca (patch)
tree6f7b521030d1e3eec44b0b64964e88093c0281e1 /sql
parent1871574a240e6f28adeb6bc8accc98c851cafae5 (diff)
downloadspark-efdaeb111917dd0314f1d00ee8524bed1e2e21ca.tar.gz
spark-efdaeb111917dd0314f1d00ee8524bed1e2e21ca.tar.bz2
spark-efdaeb111917dd0314f1d00ee8524bed1e2e21ca.zip
[SPARK-2102][SQL][CORE] Add option for kryo registration required and use a resource pool in Spark SQL for Kryo instances.
Author: Ian O Connell <ioconnell@twitter.com> Closes #1377 from ianoc/feature/SPARK-2102 and squashes the following commits: 5498566 [Ian O Connell] Docs update suggested by Patrick 20e8555 [Ian O Connell] Slight style change f92c294 [Ian O Connell] Add docs for new KryoSerializer option f3735c8 [Ian O Connell] Add using a kryo resource pool for the SqlSerializer 4e5c342 [Ian O Connell] Register the SparkConf for kryo, it gets swept into serialization 665805a [Ian O Connell] Add a spark.kryo.registrationRequired option for configuring the Kryo Serializer
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala43
1 file changed, 31 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
index 34b355e906..34654447a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -24,10 +24,10 @@ import scala.reflect.ClassTag
import com.clearspring.analytics.stream.cardinality.HyperLogLog
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Serializer, Kryo}
-import com.twitter.chill.AllScalaRegistrar
+import com.twitter.chill.{AllScalaRegistrar, ResourcePool}
import org.apache.spark.{SparkEnv, SparkConf}
-import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.serializer.{SerializerInstance, KryoSerializer}
import org.apache.spark.util.MutablePair
import org.apache.spark.util.Utils
@@ -48,22 +48,41 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co
}
}
-private[sql] object SparkSqlSerializer {
- // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
- // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
- // related error.
- @transient lazy val ser: KryoSerializer = {
+private[execution] class KryoResourcePool(size: Int)
+ extends ResourcePool[SerializerInstance](size) {
+
+ val ser: KryoSerializer = {
val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
+ // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
+ // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
+ // related error.
new KryoSerializer(sparkConf)
}
- def serialize[T: ClassTag](o: T): Array[Byte] = {
- ser.newInstance().serialize(o).array()
- }
+ def newInstance() = ser.newInstance()
+}
- def deserialize[T: ClassTag](bytes: Array[Byte]): T = {
- ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
+private[sql] object SparkSqlSerializer {
+ @transient lazy val resourcePool = new KryoResourcePool(30)
+
+ private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
+ val kryo = resourcePool.borrow
+ try {
+ fn(kryo)
+ } finally {
+ resourcePool.release(kryo)
+ }
}
+
+ def serialize[T: ClassTag](o: T): Array[Byte] =
+ acquireRelease { k =>
+ k.serialize(o).array()
+ }
+
+ def deserialize[T: ClassTag](bytes: Array[Byte]): T =
+ acquireRelease { k =>
+ k.deserialize[T](ByteBuffer.wrap(bytes))
+ }
}
private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] {