author     Reynold Xin <rxin@databricks.com>    2015-02-04 19:53:57 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-02-04 19:53:57 -0800
commit     84acd08e0886aa23195f35837c15c09aa7804aff (patch)
tree       85ccf925a6a6123463afdfb15e9913c953704ea5 /sql/catalyst
parent     206f9bc3622348926d73e43c8010519f7df9b34f (diff)
[SPARK-5602][SQL] Better support for creating DataFrame from local data collection
1. Added methods to create DataFrames from Seq[Product]
2. Added executeTake to avoid running a Spark job on LocalRelations.

Author: Reynold Xin <rxin@databricks.com>

Closes #4372 from rxin/localDataFrame and squashes the following commits:

f696858 [Reynold Xin] style checker.
839ef7f [Reynold Xin] [SPARK-5602][SQL] Better support for creating DataFrame from local data collection.
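To illustrate how the catalyst-side pieces touched by this patch fit together, here is a minimal sketch; the Person case class and sample values are hypothetical, and the sketch only uses names that appear in the diff below (ScalaReflection.attributesFor and the new LocalRelation.fromProduct):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object LocalRelationSketch {
  // Hypothetical case class standing in for any local data collection.
  case class Person(name: String, age: Int)

  // Derive the Catalyst attributes (schema) for the case class via reflection.
  val output = ScalaReflection.attributesFor[Person]

  // fromProduct, added by this patch, converts each Product into a Catalyst Row,
  // so the resulting relation stores Seq[Row] rather than the raw case classes.
  val relation = LocalRelation.fromProduct(
    output, Seq(Person("alice", 30), Person("bob", 25)))
}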
Diffstat (limited to 'sql/catalyst')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala    2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala (renamed from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala)    23
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala    16
3 files changed, 30 insertions, 11 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 8e79e532ca..0445f3aa07 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -211,7 +211,7 @@ trait ScalaReflection {
*/
def asRelation: LocalRelation = {
val output = attributesFor[A]
- LocalRelation(output, data)
+ LocalRelation.fromProduct(output, data)
}
}
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index d90af45b37..92bd057c6f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/TestRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -17,31 +17,34 @@
package org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.types.{StructType, StructField}
+import org.apache.spark.sql.types.{DataTypeConversions, StructType, StructField}
object LocalRelation {
def apply(output: Attribute*): LocalRelation = new LocalRelation(output)
- def apply(output1: StructField, output: StructField*): LocalRelation = new LocalRelation(
- StructType(output1 +: output).toAttributes
- )
+ def apply(output1: StructField, output: StructField*): LocalRelation = {
+ new LocalRelation(StructType(output1 +: output).toAttributes)
+ }
+
+ def fromProduct(output: Seq[Attribute], data: Seq[Product]): LocalRelation = {
+ val schema = StructType.fromAttributes(output)
+ LocalRelation(output, data.map(row => DataTypeConversions.productToRow(row, schema)))
+ }
}
-case class LocalRelation(output: Seq[Attribute], data: Seq[Product] = Nil)
+case class LocalRelation(output: Seq[Attribute], data: Seq[Row] = Nil)
extends LeafNode with analysis.MultiInstanceRelation {
- // TODO: Validate schema compliance.
- def loadData(newData: Seq[Product]) = new LocalRelation(output, data ++ newData)
-
/**
* Returns an identical copy of this relation with new exprIds for all attributes. Different
* attributes are required when a relation is going to be included multiple times in the same
* query.
*/
- override final def newInstance: this.type = {
- LocalRelation(output.map(_.newInstance), data).asInstanceOf[this.type]
+ override final def newInstance(): this.type = {
+ LocalRelation(output.map(_.newInstance()), data).asInstanceOf[this.type]
}
override protected def stringArgs = Iterator(output)
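The switch of LocalRelation.data from Seq[Product] to Seq[Row] is presumably what enables the executeTake change mentioned in the commit message: the relation already holds converted Catalyst rows, so taking a prefix is an in-memory collection operation rather than a Spark job. A hedged sketch of that idea (takeLocally is a hypothetical name, not the patch's executeTake):

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object TakeSketch {
  // Because data is already Seq[Row], a local take is just a slice of the collection.
  def takeLocally(relation: LocalRelation, n: Int): Seq[Row] = relation.data.take(n)
}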
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
index 21f478c80c..c243be07a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
@@ -19,11 +19,27 @@ package org.apache.spark.sql.types
import java.text.SimpleDateFormat
+import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
protected[sql] object DataTypeConversions {
+ def productToRow(product: Product, schema: StructType): Row = {
+ val mutableRow = new GenericMutableRow(product.productArity)
+ val schemaFields = schema.fields.toArray
+
+ var i = 0
+ while (i < mutableRow.length) {
+ mutableRow(i) =
+ ScalaReflection.convertToCatalyst(product.productElement(i), schemaFields(i).dataType)
+ i += 1
+ }
+
+ mutableRow
+ }
+
def stringToTime(s: String): java.util.Date = {
if (!s.contains('T')) {
// JDBC escape string
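For reference, a hedged usage sketch of the new productToRow helper; the Person case class and the schema are illustrative, and because DataTypeConversions is protected[sql], the sketch assumes it is compiled inside the org.apache.spark.sql package:

package org.apache.spark.sql

import org.apache.spark.sql.types.{DataTypeConversions, IntegerType, StringType, StructField, StructType}

object ProductToRowSketch {
  // Hypothetical case class whose fields line up with the schema below.
  case class Person(name: String, age: Int)

  val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = false)))

  // Each productElement(i) is converted with ScalaReflection.convertToCatalyst
  // against the matching field's dataType and written into a GenericMutableRow.
  val row = DataTypeConversions.productToRow(Person("alice", 30), schema)
}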