From 7eefc9d2b3f6ebc0ecb5562da7323f1e06afbb35 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sat, 10 May 2014 12:10:24 -0700 Subject: SPARK-1708. Add a ClassTag on Serializer and things that depend on it This pull request contains a rebased patch from @heathermiller (https://github.com/heathermiller/spark/pull/1) to add ClassTags on Serializer and types that depend on it (Broadcast and AccumulableCollection). Putting these in the public API signatures now will allow us to use Scala Pickling for serialization down the line without breaking binary compatibility. One question remaining is whether we also want them on Accumulator -- Accumulator is passed as part of a bigger Task or TaskResult object via the closure serializer so it doesn't seem super useful to add the ClassTag there. Broadcast and AccumulableCollection in contrast were being serialized directly. CC @rxin, @pwendell, @heathermiller Author: Matei Zaharia Closes #700 from mateiz/spark-1708 and squashes the following commits: 1a3d8b0 [Matei Zaharia] Use fake ClassTag in Java 3b449ed [Matei Zaharia] test fix 2209a27 [Matei Zaharia] Code style fixes 9d48830 [Matei Zaharia] Add a ClassTag on Serializer and things that depend on it --- .../scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala index 5067c14ddf..1c6e29b3cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution import java.nio.ByteBuffer +import scala.reflect.ClassTag + import com.esotericsoftware.kryo.io.{Input, Output} import com.esotericsoftware.kryo.{Serializer, Kryo} @@ -59,11 +61,11 @@ private[sql] object SparkSqlSerializer { new KryoSerializer(sparkConf) } - def serialize[T](o: T): Array[Byte] = { + def serialize[T: ClassTag](o: T): Array[Byte] = { ser.newInstance().serialize(o).array() } - def deserialize[T](bytes: Array[Byte]): T = { + def deserialize[T: ClassTag](bytes: Array[Byte]): T = { ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) } } -- cgit v1.2.3