diff options
author | cafreeman <cfreeman@alteryx.com> | 2015-04-17 13:42:19 -0700 |
---|---|---|
committer | Shivaram Venkataraman <shivaram@cs.berkeley.edu> | 2015-04-17 13:42:19 -0700 |
commit | 59e206deb7346148412bbf5ba4ab626718fadf18 (patch) | |
tree | cf4435a81197e76957c4afdcc48686a6e46dc5dc /sql | |
parent | a83571acc938582865efb41645aa1e414f339e46 (diff) | |
download | spark-59e206deb7346148412bbf5ba4ab626718fadf18.tar.gz spark-59e206deb7346148412bbf5ba4ab626718fadf18.tar.bz2 spark-59e206deb7346148412bbf5ba4ab626718fadf18.zip |
[SPARK-6807] [SparkR] Merge recent SparkR-pkg changes
This PR pulls in recent changes in SparkR-pkg, including
cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField.
Author: cafreeman <cfreeman@alteryx.com>
Author: Davies Liu <davies@databricks.com>
Author: Zongheng Yang <zongheng.y@gmail.com>
Author: Shivaram Venkataraman <shivaram.venkataraman@gmail.com>
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Author: Sun Rui <rui.sun@intel.com>
Closes #5436 from davies/R3 and squashes the following commits:
c2b09be [Davies Liu] SQLTypes -> schema
a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3
168b7fe [Davies Liu] sort generics
b1fe460 [Davies Liu] fix conflict in README.md
e74c04e [Davies Liu] fix schema.R
4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5
41f8184 [Davies Liu] rm man
ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3
1bdcb63 [Zongheng Yang] Updates to README.md.
5a553e7 [cafreeman] Use object attribute instead of argument
71372d9 [cafreeman] Update docs and examples
8526d2e71 [cafreeman] Remove `tojson` functions
6ef5f2d [cafreeman] Fix spacing
7741d66 [cafreeman] Rename the SQL DataType function
141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream
9387402 [Davies Liu] fix style
40199eb [Shivaram Venkataraman] Move except into sorted position
07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD.
7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey
ed66c81 [cafreeman] Update `subtract` to work with `generics.R`
f3ba785 [cafreeman] Fixed duplicate export
275deb4 [cafreeman] Update `NAMESPACE` and tests
1a3b63d [cafreeman] new version of `CreateDF`
836c4bf [cafreeman] Update `createDataFrame` and `toDF`
be5d5c1 [cafreeman] refactor schema functions
40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5
20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist
ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4
c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master
b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master
136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats
cd66603 [cafreeman] new line at EOF
8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep
7dd81b7 [cafreeman] Documentation
0e2a94f [cafreeman] Define functions for schema and fields
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 32 |
1 files changed, 29 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala index d1ea7cc3e9..ae77f72998 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala @@ -23,7 +23,7 @@ import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.api.r.SerDe import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression} -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.{Column, DataFrame, GroupedData, Row, SQLContext, SaveMode} private[r] object SQLUtils { @@ -39,8 +39,34 @@ private[r] object SQLUtils { arr.toSeq } - def createDF(rdd: RDD[Array[Byte]], schemaString: String, sqlContext: SQLContext): DataFrame = { - val schema = DataType.fromJson(schemaString).asInstanceOf[StructType] + def createStructType(fields : Seq[StructField]): StructType = { + StructType(fields) + } + + def getSQLDataType(dataType: String): DataType = { + dataType match { + case "byte" => org.apache.spark.sql.types.ByteType + case "integer" => org.apache.spark.sql.types.IntegerType + case "double" => org.apache.spark.sql.types.DoubleType + case "numeric" => org.apache.spark.sql.types.DoubleType + case "character" => org.apache.spark.sql.types.StringType + case "string" => org.apache.spark.sql.types.StringType + case "binary" => org.apache.spark.sql.types.BinaryType + case "raw" => org.apache.spark.sql.types.BinaryType + case "logical" => org.apache.spark.sql.types.BooleanType + case "boolean" => org.apache.spark.sql.types.BooleanType + case "timestamp" => org.apache.spark.sql.types.TimestampType + case "date" => org.apache.spark.sql.types.DateType + case _ => throw new IllegalArgumentException(s"Invaid type $dataType") + } + } + + def createStructField(name: String, dataType: String, nullable: Boolean): StructField = { + val dtObj = getSQLDataType(dataType) + StructField(name, dtObj, nullable) + } + + def createDF(rdd: RDD[Array[Byte]], schema: StructType, sqlContext: SQLContext): DataFrame = { val num = schema.fields.size val rowRDD = rdd.map(bytesToRow) sqlContext.createDataFrame(rowRDD, schema) |