diff options
author | Sela <ansela@paypal.com> | 2016-06-10 14:36:51 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2016-06-10 14:36:51 -0700 |
commit | 127a6678d7af6b5164a115be7c64525bb80001fe (patch) | |
tree | f0c5fe53afa3d8f7388ec225da3a1f843be299e3 /sql/catalyst | |
parent | aec502d9114ad8e18bfbbd63f38780e076d326d1 (diff) | |
download | spark-127a6678d7af6b5164a115be7c64525bb80001fe.tar.gz spark-127a6678d7af6b5164a115be7c64525bb80001fe.tar.bz2 spark-127a6678d7af6b5164a115be7c64525bb80001fe.zip |
[SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings
## What changes were proposed in this pull request?
Serializer instantiation will consider existing SparkConf
## How was this patch tested?
manual test with `ImmutableList` (Guava) and `kryo-serializers`'s `Immutable*Serializer` implementations.
Added Test Suite.
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)
Author: Sela <ansela@paypal.com>
Closes #13424 from amitsela/SPARK-15489.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala | 30 |
1 files changed, 21 insertions, 9 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala index 87c8a2e54a..c597a2a709 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala @@ -22,7 +22,7 @@ import java.lang.reflect.Modifier import scala.language.existentials import scala.reflect.ClassTag -import org.apache.spark.SparkConf +import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.serializer._ import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow @@ -547,11 +547,17 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean) (classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName) } } + // try conf from env, otherwise create a new one + val env = s"${classOf[SparkEnv].getName}.get()" val sparkConf = s"new ${classOf[SparkConf].getName}()" - ctx.addMutableState( - serializerInstanceClass, - serializer, - s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();") + val serializerInit = s""" + if ($env == null) { + $serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance(); + } else { + $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance(); + } + """ + ctx.addMutableState(serializerInstanceClass, serializer, serializerInit) // Code to serialize. val input = child.genCode(ctx) @@ -587,11 +593,17 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B (classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName) } } + // try conf from env, otherwise create a new one + val env = s"${classOf[SparkEnv].getName}.get()" val sparkConf = s"new ${classOf[SparkConf].getName}()" - ctx.addMutableState( - serializerInstanceClass, - serializer, - s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();") + val serializerInit = s""" + if ($env == null) { + $serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance(); + } else { + $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance(); + } + """ + ctx.addMutableState(serializerInstanceClass, serializer, serializerInit) // Code to deserialize. val input = child.genCode(ctx) |