author    hyukjinkwon <gurwls223@gmail.com>    2016-02-21 19:11:03 -0800
committer Reynold Xin <rxin@databricks.com>    2016-02-21 19:11:03 -0800
commit    819b0ea029d6ef51e661a59e7672480f57322cbb (patch)
tree      c454653494fd37f89cbc840250ee1039a55ba25e /sql
parent    55d6fdf22d1d6379180ac09f364c38982897d9ff (diff)
[SPARK-13381][SQL] Support for loading CSV with a single function call
https://issues.apache.org/jira/browse/SPARK-13381

This PR adds support for loading CSV data directly with a single function call, given one or more paths. It also corrects schema inference to refer to all given paths rather than only the first one, matching the behavior of the JSON data source. Unit tests were added for each piece of functionality.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #11262 from HyukjinKwon/SPARK-13381.
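For illustration, a minimal usage sketch of the new single-call API (the file path is hypothetical; assumes a SQLContext named sqlContext is in scope):

    // Load a CSV file into a DataFrame in one call; the schema is
    // inferred by a pass over the input (hypothetical path).
    val cars = sqlContext.read
      .option("header", "true")
      .csv("/path/to/cars.csv")
    cars.printSchema()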
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala                        | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala  |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala     |  9
3 files changed, 23 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 962fdadf14..20c861de23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -345,6 +345,17 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
}
/**
+ * Loads a CSV file and returns the result as a [[DataFrame]].
+ *
+ * This function goes through the input once to determine the input schema. To avoid going
+ * through the entire data once, specify the schema explicitly using [[schema]].
+ *
+ * @since 2.0.0
+ */
+ @scala.annotation.varargs
+ def csv(paths: String*): DataFrame = format("csv").load(paths : _*)
+
+ /**
* Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
* [[DataFrame]] if no paths are passed in.
*
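The added scaladoc notes that schema inference costs an extra pass over the data. A minimal sketch of avoiding that pass by supplying the schema up front via schema(...) (the column names and types here are hypothetical):

    import org.apache.spark.sql.types._

    // Supplying a schema explicitly skips the inference pass
    // that csv(...) would otherwise make over the input.
    val carSchema = StructType(Seq(
      StructField("year", IntegerType),
      StructField("make", StringType),
      StructField("model", StringType)))

    val cars = sqlContext.read
      .schema(carSchema)
      .option("header", "true")
      .csv("/path/to/cars.csv")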
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
index 471ed0d560..da945c44cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVRelation.scala
@@ -37,9 +37,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
-private[csv] class CSVRelation(
+private[sql] class CSVRelation(
private val inputRDD: Option[RDD[String]],
- override val paths: Array[String],
+ override val paths: Array[String] = Array.empty[String],
private val maybeDataSchema: Option[StructType],
override val userDefinedPartitionColumns: Option[StructType],
private val parameters: Map[String, String])
@@ -127,7 +127,7 @@ private[csv] class CSVRelation(
}
private def inferSchema(paths: Array[String]): StructType = {
- val rdd = baseRdd(Array(paths.head))
+ val rdd = baseRdd(paths)
val firstLine = findFirstLine(rdd)
val firstRow = new LineCsvReader(params).parseLine(firstLine)
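For context, a sketch of the behavior this hunk fixes (file names hypothetical): before the change, only paths.head fed the base RDD used for inference, so with multiple inputs the schema could only come from the first file; after it, the RDD spans every given path:

    // Both files now contribute to the RDD scanned during inference,
    // matching the JSON data source's multi-path behavior.
    val df = sqlContext.read
      .option("header", "true")
      .csv("/data/part-00000.csv", "/data/part-00001.csv")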
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
index 9d1f4569ad..7671bc1066 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala
@@ -91,6 +91,15 @@ class CSVSuite extends QueryTest with SharedSQLContext with SQLTestUtils {
verifyCars(cars, withHeader = false, checkTypes = false)
}
+ test("simple csv test with calling another function to load") {
+ val cars = sqlContext
+ .read
+ .option("header", "false")
+ .csv(testFile(carsFile))
+
+ verifyCars(cars, withHeader = false, checkTypes = false)
+ }
+
test("simple csv test with type inference") {
val cars = sqlContext
.read