aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/main/scala/org
diff options
context:
space:
mode:
authorpj.fanning <pj.fanning@workday.com>2017-02-22 18:03:25 -0800
committerWenchen Fan <wenchen@databricks.com>2017-02-22 18:03:25 -0800
commitd3147502e7837d81e27193164b3513bb69fa3797 (patch)
tree511ef8f2e5c45228deece764e357edd20c02f40f /sql/core/src/main/scala/org
parentdc005ed53c87216efff50268009217ba26e34a10 (diff)
downloadspark-d3147502e7837d81e27193164b3513bb69fa3797.tar.gz
spark-d3147502e7837d81e27193164b3513bb69fa3797.tar.bz2
spark-d3147502e7837d81e27193164b3513bb69fa3797.zip
[SPARK-15615][SQL] Add an API to load DataFrame from Dataset[String] storing JSON
## What changes were proposed in this pull request? SPARK-15615 proposes replacing the sqlContext.read.json(rdd) with a dataset equivalent. SPARK-15463 adds a CSV API for reading from Dataset[String] so this keeps the API consistent. I am deprecating the existing RDD based APIs. ## How was this patch tested? There are existing tests. I left most tests to use the existing APIs as they delegate to the new json API. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: pj.fanning <pj.fanning@workday.com> Author: PJ Fanning <pjfanning@users.noreply.github.com> Closes #16895 from pjfanning/SPARK-15615.
Diffstat (limited to 'sql/core/src/main/scala/org')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala20
1 files changed, 18 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index cb9493a575..4c1341ed5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -323,6 +323,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @param jsonRDD input RDD with one JSON object per record
* @since 1.4.0
*/
+ @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
/**
@@ -335,7 +336,22 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @param jsonRDD input RDD with one JSON object per record
* @since 1.4.0
*/
+ @deprecated("Use json(Dataset[String]) instead.", "2.2.0")
def json(jsonRDD: RDD[String]): DataFrame = {
+ json(sparkSession.createDataset(jsonRDD)(Encoders.STRING))
+ }
+
+ /**
+ * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
+ * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
+ *
+ * Unless the schema is specified using `schema` function, this function goes through the
+ * input once to determine the input schema.
+ *
+ * @param jsonDataset input Dataset with one JSON object per record
+ * @since 2.2.0
+ */
+ def json(jsonDataset: Dataset[String]): DataFrame = {
val parsedOptions = new JSONOptions(
extraOptions.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone,
@@ -344,12 +360,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
val schema = userSpecifiedSchema.getOrElse {
JsonInferSchema.infer(
- jsonRDD,
+ jsonDataset.rdd,
parsedOptions,
createParser)
}
- val parsed = jsonRDD.mapPartitions { iter =>
+ val parsed = jsonDataset.rdd.mapPartitions { iter =>
val parser = new JacksonParser(schema, parsedOptions)
iter.flatMap(parser.parse(_, createParser, UTF8String.fromString))
}