author	Reynold Xin <rxin@databricks.com>	2016-05-03 09:43:47 -0700
committer	Davies Liu <davies.liu@gmail.com>	2016-05-03 09:43:47 -0700
commit	5503e453ba00676925531f91f66c0108ac6b1fca (patch)
tree	01daa98ed4d5ee7155cb7daba5b14373e077d18b /sql
parent	8b6491fc0b49b4e363887ae4b452ba69fe0290d5 (diff)
[SPARK-15088] [SQL] Remove SparkSqlSerializer
## What changes were proposed in this pull request?

This patch removes SparkSqlSerializer. I believe this is now dead code.

## How was this patch tested?

Removed a test case related to it.

Author: Reynold Xin <rxin@databricks.com>

Closes #12864 from rxin/SPARK-15088.
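The removed class was a thin wrapper over Spark's general-purpose `KryoSerializer`, adding a fixed set of registrations and a small resource pool (see the deleted file below). As a sketch only, not part of this patch, the same kind of round-trip can still be expressed directly against the base serializer; the object name and payload here are purely illustrative:

```scala
import java.nio.ByteBuffer

import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer

// Sketch only: round-trip a value through the general-purpose KryoSerializer,
// the class that the removed SparkSqlSerializer extended.
object KryoRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    val instance = new KryoSerializer(conf).newInstance()

    val payload = Map(1 -> "one", 2 -> "two")             // illustrative value
    val bytes: ByteBuffer = instance.serialize(payload)   // serialize to a ByteBuffer
    val back = instance.deserialize[Map[Int, String]](bytes)
    assert(back == payload)
  }
}
```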
Diffstat (limited to 'sql')
-rw-r--r--	sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala	108
-rw-r--r--	sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala	10
2 files changed, 0 insertions, 118 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
deleted file mode 100644
index c590f7c6c3..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import java.nio.ByteBuffer
-import java.util.{HashMap => JavaHashMap}
-
-import scala.reflect.ClassTag
-
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Input, Output}
-import com.twitter.chill.ResourcePool
-
-import org.apache.spark.{SparkConf, SparkEnv}
-import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.serializer.{KryoSerializer, SerializerInstance}
-import org.apache.spark.sql.types.Decimal
-import org.apache.spark.util.MutablePair
-
-private[sql] class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
- override def newKryo(): Kryo = {
- val kryo = super.newKryo()
- kryo.setRegistrationRequired(false)
- kryo.register(classOf[MutablePair[_, _]])
- kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
- kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericInternalRow])
- kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
- kryo.register(classOf[java.math.BigDecimal], new JavaBigDecimalSerializer)
- kryo.register(classOf[BigDecimal], new ScalaBigDecimalSerializer)
-
- kryo.register(classOf[Decimal])
- kryo.register(classOf[JavaHashMap[_, _]])
-
- kryo.setReferences(false)
- kryo
- }
-}
-
-private[execution] class KryoResourcePool(size: Int)
- extends ResourcePool[SerializerInstance](size) {
-
- val ser: SparkSqlSerializer = {
- val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
- new SparkSqlSerializer(sparkConf)
- }
-
- def newInstance(): SerializerInstance = ser.newInstance()
-}
-
-private[sql] object SparkSqlSerializer {
- @transient lazy val resourcePool = new KryoResourcePool(30)
-
- private[this] def acquireRelease[O](fn: SerializerInstance => O): O = {
- val kryo = resourcePool.borrow
- try {
- fn(kryo)
- } finally {
- resourcePool.release(kryo)
- }
- }
-
- def serialize[T: ClassTag](o: T): Array[Byte] =
- acquireRelease { k =>
- JavaUtils.bufferToArray(k.serialize(o))
- }
-
- def deserialize[T: ClassTag](bytes: Array[Byte]): T =
- acquireRelease { k =>
- k.deserialize[T](ByteBuffer.wrap(bytes))
- }
-}
-
-private[sql] class JavaBigDecimalSerializer extends Serializer[java.math.BigDecimal] {
- def write(kryo: Kryo, output: Output, bd: java.math.BigDecimal) {
- // TODO: There are probably more efficient representations than strings...
- output.writeString(bd.toString)
- }
-
- def read(kryo: Kryo, input: Input, tpe: Class[java.math.BigDecimal]): java.math.BigDecimal = {
- new java.math.BigDecimal(input.readString())
- }
-}
-
-private[sql] class ScalaBigDecimalSerializer extends Serializer[BigDecimal] {
- def write(kryo: Kryo, output: Output, bd: BigDecimal) {
- // TODO: There are probably more efficient representations than strings...
- output.writeString(bd.toString)
- }
-
- def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
- new java.math.BigDecimal(input.readString())
- }
-}
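One detail worth noting in the deleted file: `ScalaBigDecimalSerializer.read` returns a `java.math.BigDecimal` where a Scala `BigDecimal` is declared, which appears to rely on Scala's implicit conversion between the two types. A hedged sketch of the same string-based serializer written without that reliance (the class name is invented for illustration):

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}

// Sketch only, not part of the patch: the same string-based approach as the
// deleted ScalaBigDecimalSerializer, constructing scala.math.BigDecimal directly.
class ScalaBigDecimalStringSerializer extends Serializer[BigDecimal] {
  override def write(kryo: Kryo, output: Output, bd: BigDecimal): Unit = {
    // The string form preserves precision and scale; compactness is not the goal.
    output.writeString(bd.toString)
  }

  override def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
    BigDecimal(input.readString())
  }
}
```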
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
index 4552eb6ce0..34936b38fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/RowSuite.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, SpecificMutableRow}
-import org.apache.spark.sql.execution.SparkSqlSerializer
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@@ -55,15 +54,6 @@ class RowSuite extends SparkFunSuite with SharedSQLContext {
assert(row.isNullAt(0))
}
- test("serialize w/ kryo") {
- val row = Seq((1, Seq(1), Map(1 -> 1), BigDecimal(1))).toDF().first()
- val serializer = new SparkSqlSerializer(sparkContext.getConf)
- val instance = serializer.newInstance()
- val ser = instance.serialize(row)
- val de = instance.deserialize(ser).asInstanceOf[Row]
- assert(de === row)
- }
-
test("get values by field name on Row created via .toDF") {
val row = Seq((1, Seq(1))).toDF("a", "b").first()
assert(row.getAs[Int]("a") === 1)