aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorcafreeman <cfreeman@alteryx.com>2015-04-17 13:42:19 -0700
committerShivaram Venkataraman <shivaram@cs.berkeley.edu>2015-04-17 13:42:19 -0700
commit59e206deb7346148412bbf5ba4ab626718fadf18 (patch)
treecf4435a81197e76957c4afdcc48686a6e46dc5dc /sql
parenta83571acc938582865efb41645aa1e414f339e46 (diff)
downloadspark-59e206deb7346148412bbf5ba4ab626718fadf18.tar.gz
spark-59e206deb7346148412bbf5ba4ab626718fadf18.tar.bz2
spark-59e206deb7346148412bbf5ba4ab626718fadf18.zip
[SPARK-6807] [SparkR] Merge recent SparkR-pkg changes
This PR pulls in recent changes in SparkR-pkg, including cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField. Author: cafreeman <cfreeman@alteryx.com> Author: Davies Liu <davies@databricks.com> Author: Zongheng Yang <zongheng.y@gmail.com> Author: Shivaram Venkataraman <shivaram.venkataraman@gmail.com> Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu> Author: Sun Rui <rui.sun@intel.com> Closes #5436 from davies/R3 and squashes the following commits: c2b09be [Davies Liu] SQLTypes -> schema a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3 168b7fe [Davies Liu] sort generics b1fe460 [Davies Liu] fix conflict in README.md e74c04e [Davies Liu] fix schema.R 4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5 41f8184 [Davies Liu] rm man ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3 1bdcb63 [Zongheng Yang] Updates to README.md. 5a553e7 [cafreeman] Use object attribute instead of argument 71372d9 [cafreeman] Update docs and examples 8526d2e71 [cafreeman] Remove `tojson` functions 6ef5f2d [cafreeman] Fix spacing 7741d66 [cafreeman] Rename the SQL DataType function 141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream 9387402 [Davies Liu] fix style 40199eb [Shivaram Venkataraman] Move except into sorted position 07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD. 7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey ed66c81 [cafreeman] Update `subtract` to work with `generics.R` f3ba785 [cafreeman] Fixed duplicate export 275deb4 [cafreeman] Update `NAMESPACE` and tests 1a3b63d [cafreeman] new version of `CreateDF` 836c4bf [cafreeman] Update `createDataFrame` and `toDF` be5d5c1 [cafreeman] refactor schema functions 40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5 20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4 c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master 136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats cd66603 [cafreeman] new line at EOF 8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep 7dd81b7 [cafreeman] Documentation 0e2a94f [cafreeman] Define functions for schema and fields
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala32
1 files changed, 29 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index d1ea7cc3e9..ae77f72998 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -23,7 +23,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.r.SerDe
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types._
import org.apache.spark.sql.{Column, DataFrame, GroupedData, Row, SQLContext, SaveMode}
private[r] object SQLUtils {
@@ -39,8 +39,34 @@ private[r] object SQLUtils {
arr.toSeq
}
- def createDF(rdd: RDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = {
- val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
+ def createStructType(fields : Seq[StructField]): StructType = {
+ StructType(fields)
+ }
+
+ def getSQLDataType(dataType: String): DataType = {
+ dataType match {
+ case "byte" => org.apache.spark.sql.types.ByteType
+ case "integer" => org.apache.spark.sql.types.IntegerType
+ case "double" => org.apache.spark.sql.types.DoubleType
+ case "numeric" => org.apache.spark.sql.types.DoubleType
+ case "character" => org.apache.spark.sql.types.StringType
+ case "string" => org.apache.spark.sql.types.StringType
+ case "binary" => org.apache.spark.sql.types.BinaryType
+ case "raw" => org.apache.spark.sql.types.BinaryType
+ case "logical" => org.apache.spark.sql.types.BooleanType
+ case "boolean" => org.apache.spark.sql.types.BooleanType
+ case "timestamp" => org.apache.spark.sql.types.TimestampType
+ case "date" => org.apache.spark.sql.types.DateType
+ case _ => throw new IllegalArgumentException(s"Invaid type $dataType")
+ }
+ }
+
+ def createStructField(name: String, dataType: String, nullable: Boolean): StructField = {
+ val dtObj = getSQLDataType(dataType)
+ StructField(name, dtObj, nullable)
+ }
+
+ def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = {
val num = schema.fields.size
val rowRDD = rdd.map(bytesToRow)
sqlContext.createDataFrame(rowRDD, schema)