aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2015-11-08 20:57:09 -0800
committerReynold Xin <rxin@databricks.com>2015-11-08 20:57:09 -0800
commit97b7080cf2d2846c7257f8926f775f27d457fe7d (patch)
tree28efd3ca15c2e96c0d4f0b5d08cabb9e602ef12e /sql/catalyst
parentb2d195e137fad88d567974659fa7023ff4da96cd (diff)
downloadspark-97b7080cf2d2846c7257f8926f775f27d457fe7d.tar.gz
spark-97b7080cf2d2846c7257f8926f775f27d457fe7d.tar.bz2
spark-97b7080cf2d2846c7257f8926f775f27d457fe7d.zip
[SPARK-11564][SQL] Dataset Java API audit
A few changes: 1. Removed fold, since it can be confusing for distributed collections. 2. Created specific interfaces for each Dataset function (e.g. MapFunction, ReduceFunction, MapPartitionsFunction) 3. Added more documentation and test cases. The other thing I'm considering doing is to have a "collector" interface for FlatMapFunction and MapPartitionsFunction, similar to MapReduce's map function. Author: Reynold Xin <rxin@databricks.com> Closes #9531 from rxin/SPARK-11564.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala38
1 files changed, 32 insertions, 6 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
index f05e18288d..6569b900fe 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/Encoder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.encoders
import scala.reflect.ClassTag
import org.apache.spark.util.Utils
-import org.apache.spark.sql.types.{DataType, ObjectType, StructField, StructType}
+import org.apache.spark.sql.types.{ObjectType, StructField, StructType}
import org.apache.spark.sql.catalyst.expressions._
/**
@@ -100,7 +100,7 @@ object Encoder {
expr.transformUp {
case BoundReference(0, t: ObjectType, _) =>
Invoke(
- BoundReference(0, ObjectType(cls), true),
+ BoundReference(0, ObjectType(cls), nullable = true),
s"_${index + 1}",
t)
}
@@ -114,13 +114,13 @@ object Encoder {
} else {
enc.constructExpression.transformUp {
case BoundReference(ordinal, dt, _) =>
- GetInternalRowField(BoundReference(index, enc.schema, true), ordinal, dt)
+ GetInternalRowField(BoundReference(index, enc.schema, nullable = true), ordinal, dt)
}
}
}
val constructExpression =
- NewInstance(cls, constructExpressions, false, ObjectType(cls))
+ NewInstance(cls, constructExpressions, propagateNull = false, ObjectType(cls))
new ExpressionEncoder[Any](
schema,
@@ -130,7 +130,6 @@ object Encoder {
ClassTag.apply(cls))
}
-
def typeTagOfTuple2[T1 : TypeTag, T2 : TypeTag]: TypeTag[(T1, T2)] = typeTag[(T1, T2)]
private def getTypeTag[T](c: Class[T]): TypeTag[T] = {
@@ -148,9 +147,36 @@ object Encoder {
})
}
- def forTuple2[T1, T2](c1: Class[T1], c2: Class[T2]): Encoder[(T1, T2)] = {
+ def forTuple[T1, T2](c1: Class[T1], c2: Class[T2]): Encoder[(T1, T2)] = {
implicit val typeTag1 = getTypeTag(c1)
implicit val typeTag2 = getTypeTag(c2)
ExpressionEncoder[(T1, T2)]()
}
+
+ def forTuple[T1, T2, T3](c1: Class[T1], c2: Class[T2], c3: Class[T3]): Encoder[(T1, T2, T3)] = {
+ implicit val typeTag1 = getTypeTag(c1)
+ implicit val typeTag2 = getTypeTag(c2)
+ implicit val typeTag3 = getTypeTag(c3)
+ ExpressionEncoder[(T1, T2, T3)]()
+ }
+
+ def forTuple[T1, T2, T3, T4](
+ c1: Class[T1], c2: Class[T2], c3: Class[T3], c4: Class[T4]): Encoder[(T1, T2, T3, T4)] = {
+ implicit val typeTag1 = getTypeTag(c1)
+ implicit val typeTag2 = getTypeTag(c2)
+ implicit val typeTag3 = getTypeTag(c3)
+ implicit val typeTag4 = getTypeTag(c4)
+ ExpressionEncoder[(T1, T2, T3, T4)]()
+ }
+
+ def forTuple[T1, T2, T3, T4, T5](
+ c1: Class[T1], c2: Class[T2], c3: Class[T3], c4: Class[T4], c5: Class[T5])
+ : Encoder[(T1, T2, T3, T4, T5)] = {
+ implicit val typeTag1 = getTypeTag(c1)
+ implicit val typeTag2 = getTypeTag(c2)
+ implicit val typeTag3 = getTypeTag(c3)
+ implicit val typeTag4 = getTypeTag(c4)
+ implicit val typeTag5 = getTypeTag(c5)
+ ExpressionEncoder[(T1, T2, T3, T4, T5)]()
+ }
}