aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
Diffstat (limited to 'sql')
-rw-r--r--sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala44
1 files changed, 22 insertions, 22 deletions
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index d331c210e8..dbc5e029e2 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -19,11 +19,15 @@ package org.apache.spark.sql.hive
import java.rmi.server.UID
import java.util.{Properties, ArrayList => JArrayList}
+import java.io.{OutputStream, InputStream}
import scala.collection.JavaConversions._
import scala.language.implicitConversions
+import scala.reflect.ClassTag
import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.Input
+import com.esotericsoftware.kryo.io.Output
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hive.common.StatsSetupConst
@@ -46,6 +50,7 @@ import org.apache.hadoop.{io => hadoopIo}
import org.apache.spark.Logging
import org.apache.spark.sql.types.{Decimal, DecimalType, UTF8String}
+import org.apache.spark.util.Utils._
/**
* This class provides the UDF creation and also the UDF instance serialization and
@@ -61,39 +66,34 @@ private[hive] case class HiveFunctionWrapper(var functionClassName: String)
// for Serialization
def this() = this(null)
- import org.apache.spark.util.Utils._
-
@transient
- private val methodDeSerialize = {
- val method = classOf[Utilities].getDeclaredMethod(
- "deserializeObjectByKryo",
- classOf[Kryo],
- classOf[java.io.InputStream],
- classOf[Class[_]])
- method.setAccessible(true)
-
- method
+ def deserializeObjectByKryo[T: ClassTag](
+ kryo: Kryo,
+ in: InputStream,
+ clazz: Class[_]): T = {
+ val inp = new Input(in)
+ val t: T = kryo.readObject(inp,clazz).asInstanceOf[T]
+ inp.close()
+ t
}
@transient
- private val methodSerialize = {
- val method = classOf[Utilities].getDeclaredMethod(
- "serializeObjectByKryo",
- classOf[Kryo],
- classOf[Object],
- classOf[java.io.OutputStream])
- method.setAccessible(true)
-
- method
+ def serializeObjectByKryo(
+ kryo: Kryo,
+ plan: Object,
+ out: OutputStream ) {
+ val output: Output = new Output(out)
+ kryo.writeObject(output, plan)
+ output.close()
}
def deserializePlan[UDFType](is: java.io.InputStream, clazz: Class[_]): UDFType = {
- methodDeSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), is, clazz)
+ deserializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), is, clazz)
.asInstanceOf[UDFType]
}
def serializePlan(function: AnyRef, out: java.io.OutputStream): Unit = {
- methodSerialize.invoke(null, Utilities.runtimeSerializationKryo.get(), function, out)
+ serializeObjectByKryo(Utilities.runtimeSerializationKryo.get(), function, out)
}
private var instance: AnyRef = null