aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Michael Armbrust <michael@databricks.com> 2016-01-04 23:23:41 -0800
committer Michael Armbrust <michael@databricks.com> 2016-01-04 23:23:41 -0800
commit 53beddc5bf04a35ab73de99158919c2fdd5d4508 (patch)
tree 76728d539391ed8d34cb971e71a77bcda8b8a158
parent 7058dc115047258197f6c09eee404f1ccf41038d (diff)
download spark-53beddc5bf04a35ab73de99158919c2fdd5d4508.tar.gz
spark-53beddc5bf04a35ab73de99158919c2fdd5d4508.tar.bz2
spark-53beddc5bf04a35ab73de99158919c2fdd5d4508.zip
[SPARK-12568][SQL] Add BINARY to Encoders
Author: Michael Armbrust <michael@databricks.com>
Closes #10516 from marmbrus/datasetCleanup.
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala | 6
-rw-r--r-- sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala | 9
-rw-r--r-- sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala | 6
3 files changed, 18 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
index bb0fdc4c3d..22b7e1ea0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Encoder.scala
@@ -158,6 +158,12 @@ object Encoders {
def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()
/**
+ * An encoder for arrays of bytes.
+ * @since 1.6.1
+ */
+ def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()
+
+ /**
* Creates an encoder for Java Bean of type T.
*
* T must be publicly accessible.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index ad4beda9c4..6c058463b9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -199,6 +199,15 @@ case class ExpressionEncoder[T](
private lazy val constructProjection = GenerateSafeProjection.generate(fromRowExpression :: Nil)
/**
+ * Returns this encoder where it has been bound to its own output (i.e. no remapping of columns
+ * is performed).
+ */
+ def defaultBinding: ExpressionEncoder[T] = {
+ val attrs = schema.toAttributes
+ resolve(attrs, OuterScopes.outerScopes).bind(attrs)
+ }
+
+ /**
* Returns an encoded version of `t` as a Spark SQL row. Note that multiple calls to
* toRow are allowed to return the same actual [[InternalRow]] object. Thus, the caller should
* copy the result before making another call if required.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index 666699e18d..3740dea8aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -77,6 +77,8 @@ class JavaSerializable(val value: Int) extends Serializable {
}
class ExpressionEncoderSuite extends SparkFunSuite {
+ OuterScopes.outerScopes.put(getClass.getName, this)
+
implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()
// test flat encoders
@@ -278,8 +280,6 @@ class ExpressionEncoderSuite extends SparkFunSuite {
}
}
- private val outers: ConcurrentMap[String, AnyRef] = new MapMaker().weakValues().makeMap()
- outers.put(getClass.getName, this)
private def encodeDecodeTest[T : ExpressionEncoder](
input: T,
testName: String): Unit = {
@@ -287,7 +287,7 @@ class ExpressionEncoderSuite extends SparkFunSuite {
val encoder = implicitly[ExpressionEncoder[T]]
val row = encoder.toRow(input)
val schema = encoder.schema.toAttributes
- val boundEncoder = encoder.resolve(schema, outers).bind(schema)
+ val boundEncoder = encoder.defaultBinding
val convertedBack = try boundEncoder.fromRow(row) catch {
case e: Exception =>
fail(