author     Cheng Lian <lian@databricks.com>      2016-03-31 23:46:08 -0700
committer  Xiangrui Meng <meng@databricks.com>   2016-03-31 23:46:08 -0700
commit     1b070637fa03ab4966f76427b15e433050eaa956 (patch)
tree       9c9fcf5c50e2306d36c829c9e35f13d5ae533b35 /sql
parent     96941b12f8b465df21423275f3cd3ade579b4fa1 (diff)
[SPARK-14295][SPARK-14274][SQL] Implements buildReader() for LibSVM
## What changes were proposed in this pull request?

This PR implements `FileFormat.buildReader()` for the LibSVM data source. Besides that, a new interface method `prepareRead()` is added to `FileFormat`:

```scala
def prepareRead(
    sqlContext: SQLContext,
    options: Map[String, String],
    files: Seq[FileStatus]): Map[String, String] = options
```

After migrating from `buildInternalScan()` to `buildReader()`, we lost the opportunity to collect necessary global information, since `buildReader()` works in a per-partition manner. For example, LibSVM needs to infer the total number of features when the `numFeatures` data source option is not set. Any global information collected this way should be returned through the data source options map. By default, this method simply returns the original options untouched.

An alternative approach is to absorb `inferSchema()` into `prepareRead()`, since schema inference is also a form of global information gathering. However, this approach wasn't chosen because schema inference is optional, while `prepareRead()` must be called whenever a `HadoopFsRelation`-based data source relation is instantiated.

One unaddressed problem is that, when `numFeatures` is absent, the input data is now scanned twice. The `buildInternalScan()` code path doesn't need the second scan because it caches the raw parsed RDD in memory before computing the total number of features. With `FileScanRDD`, however, the raw parsed RDD is created differently (e.g. with different partitioning) from the final RDD.

## How was this patch tested?

Tested using existing test suites.

Author: Cheng Lian <lian@databricks.com>

Closes #12088 from liancheng/spark-14295-libsvm-build-reader.
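For illustration, here is a minimal sketch of the kind of `prepareRead()` override a LibSVM-style source might provide. This is not the code from this PR: the object name is made up, the parsing is deliberately simplified, and only the `numFeatures` option key is assumed from the LibSVM data source. It shows the intended division of labor: one global pass over the input (which cannot be done inside the per-partition `buildReader()`), with the result handed back through the options map.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SQLContext

// Sketch only; not the implementation from this PR.
object LibSVMPrepareReadSketch {
  def prepareRead(
      sqlContext: SQLContext,
      options: Map[String, String],
      files: Seq[FileStatus]): Map[String, String] = {
    if (options.contains("numFeatures")) {
      // The caller already supplied the global information; nothing to collect.
      options
    } else {
      // Scan every input file once to find the largest 1-based feature index.
      // Parsing is simplified and assumes well-formed "label index:value ..." lines.
      val paths = files.map(_.getPath.toString).mkString(",")
      val maxIndex = sqlContext.sparkContext
        .textFile(paths)
        .map(_.trim)
        .filter(line => line.nonEmpty && !line.startsWith("#"))
        .map { line =>
          val indices = line.split("\\s+").drop(1).map(_.takeWhile(_ != ':').toInt)
          if (indices.isEmpty) 0 else indices.max
        }
        .fold(0)((a, b) => math.max(a, b))
      // Return the collected global information through the options map.
      options + ("numFeatures" -> maxIndex.toString)
    }
  }
}
```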
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala          5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala  1
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                        9
3 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index c66921f485..1850810270 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -299,6 +299,9 @@ case class DataSource(
"It must be specified manually")
}
+ val enrichedOptions =
+ format.prepareRead(sqlContext, caseInsensitiveOptions, fileCatalog.allFiles())
+
HadoopFsRelation(
sqlContext,
fileCatalog,
@@ -306,7 +309,7 @@ case class DataSource(
dataSchema = dataSchema.asNullable,
bucketSpec = bucketSpec,
format,
- options)
+ enrichedOptions)
case _ =>
throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 554298772a..a143ac6aec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -59,6 +59,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
if (files.fileFormat.toString == "TestFileFormat" ||
files.fileFormat.isInstanceOf[parquet.DefaultSource] ||
files.fileFormat.toString == "ORC" ||
+ files.fileFormat.toString == "LibSVM" ||
files.fileFormat.isInstanceOf[csv.DefaultSource] ||
files.fileFormat.isInstanceOf[text.DefaultSource] ||
files.fileFormat.isInstanceOf[json.DefaultSource]) &&
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 6b95a3d25b..e8834d052c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -439,6 +439,15 @@ trait FileFormat {
       files: Seq[FileStatus]): Option[StructType]
 
   /**
+   * Prepares a read job and returns a potentially updated data source option [[Map]]. This method
+   * can be useful for collecting necessary global information for scanning input data.
+   */
+  def prepareRead(
+      sqlContext: SQLContext,
+      options: Map[String, String],
+      files: Seq[FileStatus]): Map[String, String] = options
+
+  /**
    * Prepares a write job and returns an [[OutputWriterFactory]]. Client side job preparation can
    * be put here. For example, user defined output committer can be configured here
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
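For reference, a usage sketch of how the LibSVM source is exercised through the public reader API. The application name, input path, and feature count below are placeholders; the `libsvm` format name and the `numFeatures` option are the documented data source interface.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object LibSVMReadExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("LibSVMReadExample"))
    val sqlContext = new SQLContext(sc)

    // With numFeatures set, prepareRead() can pass the options through unchanged
    // and no extra scan of the input is needed.
    val withHint = sqlContext.read
      .format("libsvm")
      .option("numFeatures", "780")
      .load("data/mllib/sample_libsvm_data.txt")

    // Without the option, the source must infer the feature count itself, which
    // (as noted in the commit message) currently costs an additional pass over the data.
    val inferred = sqlContext.read
      .format("libsvm")
      .load("data/mllib/sample_libsvm_data.txt")

    withHint.printSchema()
    inferred.printSchema()
    sc.stop()
  }
}
```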