diff options
author | Ian O Connell <ioconnell@twitter.com> | 2014-07-23 16:30:06 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-07-23 16:30:11 -0700 |
commit | efdaeb111917dd0314f1d00ee8524bed1e2e21ca (patch) | |
tree | 6f7b521030d1e3eec44b0b64964e88093c0281e1 /sql/core | |
parent | 1871574a240e6f28adeb6bc8accc98c851cafae5 (diff) | |
download | spark-efdaeb111917dd0314f1d00ee8524bed1e2e21ca.tar.gz spark-efdaeb111917dd0314f1d00ee8524bed1e2e21ca.tar.bz2 spark-efdaeb111917dd0314f1d00ee8524bed1e2e21ca.zip |
[SPARK-2102][SQL][CORE] Add option for kryo registration required and use a resource pool in Spark SQL for Kryo instances.
Author: Ian O Connell <ioconnell@twitter.com>
Closes #1377 from ianoc/feature/SPARK-2102 and squashes the following commits:
5498566 [Ian O Connell] Docs update suggested by Patrick
20e8555 [Ian O Connell] Slight style change
f92c294 [Ian O Connell] Add docs for new KryoSerializer option
f3735c8 [Ian O Connell] Add using a kryo resource pool for the SqlSerializer
4e5c342 [Ian O Connell] Register the SparkConf for kryo, it gets swept into serialization
665805a [Ian O Connell] Add a spark.kryo.registrationRequired option for configuring the Kryo Serializer
Diffstat (limited to 'sql/core')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 43 |
1 files changed, 31 insertions, 12 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 34b355e906..34654447a5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -24,10 +24,10 @@ import scala.reflect.ClassTag import com.clearspring.analytics.stream.cardinality.HyperLogLog import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.{Serializer, Kryo} -import com.twitter.chill.AllScalaRegistrar +import com.twitter.chill.{AllScalaRegistrar, ResourcePool} import org.apache.spark.{SparkEnv, SparkConf} -import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.serializer.{SerializerInstance, KryoSerializer} import org.apache.spark.util.MutablePair import org.apache.spark.util.Utils @@ -48,22 +48,41 @@ private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(co } } -private[sql] object SparkSqlSerializer { - // TODO (lian) Using KryoSerializer here is workaround, needs further investigation - // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization - // related error. - @transient lazy val ser: KryoSerializer = { +private[execution] class KryoResourcePool(size: Int) + extends ResourcePool[SerializerInstance](size) { + + val ser: KryoSerializer = { val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf()) + // TODO (lian) Using KryoSerializer here is workaround, needs further investigation + // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization + // related error. new KryoSerializer(sparkConf) } - def serialize[T: ClassTag](o: T): Array[Byte] = { - ser.newInstance().serialize(o).array() - } + def newInstance() = ser.newInstance() +} - def deserialize[T: ClassTag](bytes: Array[Byte]): T = { - ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) +private[sql] object SparkSqlSerializer { + @transient lazy val resourcePool = new KryoResourcePool(30) + + private[this] def acquireRelease[O](fn: SerializerInstance => O): O = { + val kryo = resourcePool.borrow + try { + fn(kryo) + } finally { + resourcePool.release(kryo) + } } + + def serialize[T: ClassTag](o: T): Array[Byte] = + acquireRelease { k => + k.serialize(o).array() + } + + def deserialize[T: ClassTag](bytes: Array[Byte]): T = + acquireRelease { k => + k.deserialize[T](ByteBuffer.wrap(bytes)) + } } private[sql] class BigDecimalSerializer extends Serializer[BigDecimal] { |