author     Sela <ansela@paypal.com>  2016-06-10 14:36:51 -0700
committer  Michael Armbrust <michael@databricks.com>  2016-06-10 14:36:51 -0700
commit     127a6678d7af6b5164a115be7c64525bb80001fe (patch)
tree       f0c5fe53afa3d8f7388ec225da3a1f843be299e3 /sql/catalyst
parent     aec502d9114ad8e18bfbbd63f38780e076d326d1 (diff)
[SPARK-15489][SQL] Dataset kryo encoder won't load custom user settings
## What changes were proposed in this pull request?

Serializer instantiation in the generated encoder code now picks up the existing SparkConf from the running SparkEnv when one is available, instead of always creating a fresh default SparkConf, so user-supplied Kryo settings are honored.

## How was this patch tested?

Manually, with Guava's `ImmutableList` and the `Immutable*Serializer` implementations from the `kryo-serializers` library. A test suite was also added.

Author: Sela <ansela@paypal.com>

Closes #13424 from amitsela/SPARK-15489.
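For context, the failure mode this fixes can be reproduced along the following lines. This is a hedged sketch, not the PR's actual test: the names `GuavaKryoRegistrator` and `KryoEncoderConfExample` are illustrative, and it assumes Guava and the `kryo-serializers` library are on the classpath.

```scala
import com.esotericsoftware.kryo.Kryo
import com.google.common.collect.ImmutableList
import de.javakaffee.kryoserializers.guava.ImmutableListSerializer
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Registers kryo-serializers' handler for Guava's ImmutableList,
// which Kryo cannot (de)serialize out of the box.
class GuavaKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    ImmutableListSerializer.registerSerializers(kryo)
  }
}

object KryoEncoderConfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SPARK-15489")
      // Before this patch, the generated encoder code built a fresh SparkConf,
      // so this setting never reached the serializer it instantiated and the
      // registered ImmutableList serializer was silently ignored.
      .config("spark.kryo.registrator", classOf[GuavaKryoRegistrator].getName)
      .getOrCreate()

    implicit val listEncoder: Encoder[ImmutableList[Integer]] =
      Encoders.kryo[ImmutableList[Integer]]

    val ds = spark.createDataset(Seq(ImmutableList.of(Int.box(1), Int.box(2))))
    ds.collect().foreach(println)

    spark.stop()
  }
}
```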
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala | 30
1 file changed, 21 insertions(+), 9 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 87c8a2e54a..c597a2a709 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -22,7 +22,7 @@ import java.lang.reflect.Modifier
import scala.language.existentials
import scala.reflect.ClassTag
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
@@ -547,11 +547,17 @@ case class EncodeUsingSerializer(child: Expression, kryo: Boolean)
(classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName)
}
}
+ // try conf from env, otherwise create a new one
+ val env = s"${classOf[SparkEnv].getName}.get()"
val sparkConf = s"new ${classOf[SparkConf].getName}()"
- ctx.addMutableState(
- serializerInstanceClass,
- serializer,
- s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();")
+ val serializerInit = s"""
+ if ($env == null) {
+ $serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();
+ } else {
+ $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance();
+ }
+ """
+ ctx.addMutableState(serializerInstanceClass, serializer, serializerInit)
// Code to serialize.
val input = child.genCode(ctx)
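The key change above: the mutable serializer field's initializer now checks for a running `SparkEnv` and reuses its conf, which carries user settings such as `spark.kryo.registrator`. A minimal Scala sketch of the same fallback, written outside codegen for clarity (the method name is illustrative):

```scala
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}

// Mirrors the generated Java above: prefer the live SparkEnv's conf so that
// user Kryo settings are honored; fall back to a default SparkConf only when
// no SparkEnv exists (e.g., in tests that never start one).
def kryoSerializerInstance(): SerializerInstance = {
  val conf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
  new KryoSerializer(conf).newInstance()
}
```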
@@ -587,11 +593,17 @@ case class DecodeUsingSerializer[T](child: Expression, tag: ClassTag[T], kryo: B
(classOf[JavaSerializer].getName, classOf[JavaSerializerInstance].getName)
}
}
+ // try conf from env, otherwise create a new one
+ val env = s"${classOf[SparkEnv].getName}.get()"
val sparkConf = s"new ${classOf[SparkConf].getName}()"
- ctx.addMutableState(
- serializerInstanceClass,
- serializer,
- s"$serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();")
+ val serializerInit = s"""
+ if ($env == null) {
+ $serializer = ($serializerInstanceClass) new $serializerClass($sparkConf).newInstance();
+ } else {
+ $serializer = ($serializerInstanceClass) new $serializerClass($env.conf()).newInstance();
+ }
+ """
+ ctx.addMutableState(serializerInstanceClass, serializer, serializerInit)
// Code to deserialize.
val input = child.genCode(ctx)
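`DecodeUsingSerializer` receives the identical treatment, so both the serialize and deserialize paths of a kryo-encoded Dataset field resolve their SparkConf the same way; a registrator configured as in the example above is now applied on both sides.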