diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-02-21 19:11:03 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-02-21 19:11:03 -0800 |
commit | 819b0ea029d6ef51e661a59e7672480f57322cbb (patch) | |
tree | c454653494fd37f89cbc840250ee1039a55ba25e | |
parent | 55d6fdf22d1d6379180ac09f364c38982897d9ff (diff) | |
download | spark-819b0ea029d6ef51e661a59e7672480f57322cbb.tar.gz spark-819b0ea029d6ef51e661a59e7672480f57322cbb.tar.bz2 spark-819b0ea029d6ef51e661a59e7672480f57322cbb.zip |
[SPARK-13381][SQL] Support for loading CSV with a single function call
https://issues.apache.org/jira/browse/SPARK-13381
This PR adds the support to load CSV data directly by a single call with given paths.
Also, I corrected this to refer to all paths rather than only the first path during schema inference, as the JSON datasource does.
Several unit tests were added for each piece of functionality.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #11262 from HyukjinKwon/SPARK-13381.
3 files changed, 23 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 962fdadf14..20c861de23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -345,6 +345,17 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { } /** + * Loads a CSV file and returns the result as a [[DataFrame]]. + * + * This function goes through the input once to determine the input schema. To avoid going + * through the entire data once, specify the schema explicitly using [[schema]]. + * + * @since 2.0.0 + */ + @scala.annotation.varargs + def csv(paths: String*): DataFrame = format("csv").load(paths : _*) + + /** * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty * [[DataFrame]] if no paths are passed in. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala index 471ed0d560..da945c44cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala @@ -37,9 +37,9 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ -private[csv] class CSVRelation( +private[sql] class CSVRelation( private val inputRDD: Option[RDD[String]], - override val paths: Array[String], + override val paths: Array[String] = Array.empty[String], private val maybeDataSchema: Option[StructType], override val userDefinedPartitionColumns: Option[StructType], private val parameters: Map[String, String]) @@ -127,7 +127,7 @@ private[csv] class CSVRelation( } private def inferSchema(paths: Array[String]): StructType = { - val rdd = 
baseRdd(Array(paths.head)) + val rdd = baseRdd(paths) val firstLine = findFirstLine(rdd) val firstRow = new LineCsvReader(params).parseLine(firstLine) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 9d1f4569ad..7671bc1066 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -91,6 +91,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils { verifyCars(cars, withHeader = false, checkTypes = false) } + test("simple csv test with calling another function to load") { + val cars = sqlContext + .read + .option("header", "false") + .csv(testFile(carsFile)) + + verifyCars(cars, withHeader = false, checkTypes = false) + } + test("simple csv test with type inference") { val cars = sqlContext .read |