diff options
author | Wenchen Fan <wenchen@databricks.com> | 2016-04-11 22:59:42 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-04-11 22:59:42 -0700 |
commit | 678b96e77bf77a64b8df14b19db5a3bb18febfe3 (patch) | |
tree | 1052c9b1428b03620e088b0c0524df6191b94605 /mllib/src/main | |
parent | 52a801124f429ab133f9a3867c1da6ebd8fa7d4e (diff) | |
download | spark-678b96e77bf77a64b8df14b19db5a3bb18febfe3.tar.gz spark-678b96e77bf77a64b8df14b19db5a3bb18febfe3.tar.bz2 spark-678b96e77bf77a64b8df14b19db5a3bb18febfe3.zip |
[SPARK-14535][SQL] Remove buildInternalScan from FileFormat
## What changes were proposed in this pull request?
Now that `HadoopFsRelation` with all kinds of file formats can be handled in `FileSourceStrategy`, we can remove the branches for `HadoopFsRelation` in `FileSourceStrategy` and the `buildInternalScan` API from `FileFormat`.
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #12300 from cloud-fan/remove.
Diffstat (limited to 'mllib/src/main')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala | 34 |
1 file changed, 1 insertion(+), 33 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala index 2e9b6be9a2..4737b6fe52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala @@ -178,39 +178,6 @@ class DefaultSource extends FileFormat with DataSourceRegister { } } - override def buildInternalScan( - sqlContext: SQLContext, - dataSchema: StructType, - requiredColumns: Array[String], - filters: Array[Filter], - bucketSet: Option[BitSet], - inputFiles: Seq[FileStatus], - broadcastedConf: Broadcast[SerializableConfiguration], - options: Map[String, String]): RDD[InternalRow] = { - // TODO: This does not handle cases where column pruning has been performed. - - verifySchema(dataSchema) - val dataFiles = inputFiles.filterNot(_.getPath.getName startsWith "_") - - val path = if (dataFiles.length == 1) dataFiles.head.getPath.toUri.toString - else if (dataFiles.isEmpty) throw new IOException("No input path specified for libsvm data") - else throw new IOException("Multiple input paths are not supported for libsvm data.") - - val numFeatures = options.getOrElse("numFeatures", "-1").toInt - val vectorType = options.getOrElse("vectorType", "sparse") - - val sc = sqlContext.sparkContext - val baseRdd = MLUtils.loadLibSVMFile(sc, path, numFeatures) - val sparse = vectorType == "sparse" - baseRdd.map { pt => - val features = if (sparse) pt.features.toSparse else pt.features.toDense - Row(pt.label, features) - }.mapPartitions { externalRows => - val converter = RowEncoder(dataSchema) - externalRows.map(converter.toRow) - } - } - override def buildReader( sqlContext: SQLContext, dataSchema: StructType, @@ -218,6 +185,7 @@ class DefaultSource extends FileFormat with DataSourceRegister { requiredSchema: StructType, filters: Seq[Filter], options: Map[String, String]): (PartitionedFile) => 
Iterator[InternalRow] = { + verifySchema(dataSchema) val numFeatures = options("numFeatures").toInt assert(numFeatures > 0) |