author     Wenchen Fan <wenchen@databricks.com>  2016-04-11 22:59:42 -0700
committer  Yin Huai <yhuai@databricks.com>       2016-04-11 22:59:42 -0700
commit     678b96e77bf77a64b8df14b19db5a3bb18febfe3 (patch)
tree       1052c9b1428b03620e088b0c0524df6191b94605 /mllib/src/main
parent     52a801124f429ab133f9a3867c1da6ebd8fa7d4e (diff)
[SPARK-14535][SQL] Remove buildInternalScan from FileFormat
## What changes were proposed in this pull request?

Now that `HadoopFsRelation` with all kinds of file formats can be handled in `FileSourceStrategy`, we can remove the branches for `HadoopFsRelation` in `FileSourceStrategy` and the `buildInternalScan` API from `FileFormat`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12300 from cloud-fan/remove.
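For orientation, here is a minimal usage sketch of the `libsvm` data source whose scan path this commit simplifies. The master URL, feature count, and file path are placeholders, and the `SQLContext`-based reader API reflects Spark as of this commit:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Minimal sketch, assuming local mode and a placeholder libsvm file path.
// The "libsvm" short name is the one this source registers via DataSourceRegister.
// After this commit the scan runs through FileSourceStrategy and buildReader,
// which verifies the schema and expects a positive "numFeatures" option.
val conf = new SparkConf().setAppName("libsvm-read-sketch").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

val df = sqlContext.read
  .format("libsvm")
  .option("numFeatures", "780")   // placeholder feature count
  .load("path/to/sample.libsvm")  // placeholder path
df.printSchema()                  // label: double, features: vector
sc.stop()
```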
Diffstat (limited to 'mllib/src/main')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala  34
1 file changed, 1 insertion(+), 33 deletions(-)
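Before the diff itself, a simplified and hypothetical rendering of the read-side contract that remains on `FileFormat` once `buildInternalScan` is removed; the trait name is invented for illustration and the parameter list is trimmed relative to the real trait:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Hypothetical, trimmed view of the contract: a format no longer builds an
// RDD for the whole relation (buildInternalScan); it only supplies a
// per-file reader that FileSourceStrategy composes into the physical scan.
trait SimplifiedFileFormat {
  def buildReader(
      sqlContext: SQLContext,
      dataSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow]
}
```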
diff --git a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
index 2e9b6be9a2..4737b6fe52 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/source/libsvm/LibSVMRelation.scala
@@ -178,39 +178,6 @@ class DefaultSource extends FileFormat with DataSourceRegister {
}
}
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    // TODO: This does not handle cases where column pruning has been performed.
-
-    verifySchema(dataSchema)
-    val dataFiles = inputFiles.filterNot(_.getPath.getName startsWith "_")
-
-    val path = if (dataFiles.length == 1) dataFiles.head.getPath.toUri.toString
-    else if (dataFiles.isEmpty) throw new IOException("No input path specified for libsvm data")
-    else throw new IOException("Multiple input paths are not supported for libsvm data.")
-
-    val numFeatures = options.getOrElse("numFeatures", "-1").toInt
-    val vectorType = options.getOrElse("vectorType", "sparse")
-
-    val sc = sqlContext.sparkContext
-    val baseRdd = MLUtils.loadLibSVMFile(sc, path, numFeatures)
-    val sparse = vectorType == "sparse"
-    baseRdd.map { pt =>
-      val features = if (sparse) pt.features.toSparse else pt.features.toDense
-      Row(pt.label, features)
-    }.mapPartitions { externalRows =>
-      val converter = RowEncoder(dataSchema)
-      externalRows.map(converter.toRow)
-    }
-  }
-
  override def buildReader(
      sqlContext: SQLContext,
      dataSchema: StructType,
@@ -218,6 +185,7 @@ class DefaultSource extends FileFormat with DataSourceRegister {
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
+    verifySchema(dataSchema)
    val numFeatures = options("numFeatures").toInt
    assert(numFeatures > 0)