author     hyukjinkwon <gurwls223@gmail.com>      2016-04-14 15:43:44 +0800
committer  Wenchen Fan <wenchen@databricks.com>   2016-04-14 15:43:44 +0800
commit     b4819404a65f9b97c1f8deb1fcb8419969831574 (patch)
tree       3e8fa19af63386bf700a3890f924600ee2b3e9a1 /sql
parent     62b7f306fbf77de7f6cbb36181ebebdb4a55acc5 (diff)
[SPARK-14596][SQL] Remove unused SqlNewHadoopRDD and some more unused imports
## What changes were proposed in this pull request?

The old `HadoopFsRelation` API included `buildInternalScan()`, which used `SqlNewHadoopRDD` in `ParquetRelation`. Now that the old API has been removed, `SqlNewHadoopRDD` is no longer used, so this PR removes `SqlNewHadoopRDD` and several unused imports. This was discussed in https://github.com/apache/spark/pull/12326.

## How was this patch tested?

Several related existing unit tests and `sbt scalastyle`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #12354 from HyukjinKwon/SPARK-14596.
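For context on why the thread-local file-name state survives even though the RDD itself goes away: the `input_file_name()` SQL function reads it. Below is a minimal sketch against a recent Spark release; the session setup and the JSON path are illustrative, not part of this commit.

    // A minimal sketch, assuming a recent Spark release; the path is hypothetical.
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.input_file_name

    object InputFileNameDemo {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("input-file-name-demo")
          .master("local[*]")
          .getOrCreate()

        // FileScanRDD sets InputFileNameHolder before handing each file to the
        // read function, so input_file_name() resolves to the file being scanned.
        spark.read
          .json("/tmp/events/*.json") // hypothetical input
          .select(input_file_name().as("source_file"))
          .show(truncate = false)

        spark.stop()
      }
    }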
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala        |   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala             |  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala         | 282
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                              |   1
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala |   5
5 files changed, 10 insertions, 297 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
index dbd0acf06c..2ed6fc0d38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/InputFileName.scala
@@ -17,14 +17,14 @@
package org.apache.spark.sql.catalyst.expressions
-import org.apache.spark.rdd.SqlNewHadoopRDDState
+import org.apache.spark.rdd.InputFileNameHolder
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String
/**
- * Expression that returns the name of the current file being read in using [[SqlNewHadoopRDD]]
+ * Expression that returns the name of the current file being read.
*/
@ExpressionDescription(
usage = "_FUNC_() - Returns the name of the current file being read if available",
@@ -40,12 +40,12 @@ case class InputFileName() extends LeafExpression with Nondeterministic {
override protected def initInternal(): Unit = {}
override protected def evalInternal(input: InternalRow): UTF8String = {
- SqlNewHadoopRDDState.getInputFileName()
+ InputFileNameHolder.getInputFileName()
}
override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
ev.isNull = "false"
s"final ${ctx.javaType(dataType)} ${ev.value} = " +
- "org.apache.spark.rdd.SqlNewHadoopRDDState.getInputFileName();"
+ "org.apache.spark.rdd.InputFileNameHolder.getInputFileName();"
}
}
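The renamed `InputFileNameHolder` lives in core and is not shown in this diff. The sketch below is a hedged approximation of such a thread-local holder, reconstructed only from the `getInputFileName` / `setInputFileName` / `unsetInputFileName` calls visible above; its internals are assumptions, and the real class may differ in detail.

    // Hedged sketch of a thread-local file-name holder mirroring the three
    // calls used in this diff; not the actual InputFileNameHolder source.
    import org.apache.spark.unsafe.types.UTF8String

    object InputFileNameHolderSketch {
      // InheritableThreadLocal so threads spawned inside a task see the current value.
      private val inputFileName: InheritableThreadLocal[UTF8String] =
        new InheritableThreadLocal[UTF8String] {
          override protected def initialValue(): UTF8String = UTF8String.fromString("")
        }

      def getInputFileName(): UTF8String = inputFileName.get()

      def setInputFileName(file: String): Unit =
        inputFileName.set(UTF8String.fromString(file))

      def unsetInputFileName(): Unit =
        inputFileName.set(UTF8String.fromString(""))
    }

Both the interpreted path (`evalInternal`) and the generated code above read from this single holder, which is what `FileScanRDD` populates in the next file of the diff.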
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 988c785dbe..468e101fed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.datasources
import org.apache.spark.{Partition, TaskContext}
-import org.apache.spark.rdd.{RDD, SqlNewHadoopRDDState}
+import org.apache.spark.rdd.{InputFileNameHolder, RDD}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
@@ -37,7 +37,6 @@ case class PartitionedFile(
}
}
-
/**
* A collection of files that should be read as a single task possibly from multiple partitioned
* directories.
@@ -50,7 +49,7 @@ class FileScanRDD(
@transient val sqlContext: SQLContext,
readFunction: (PartitionedFile) => Iterator[InternalRow],
@transient val filePartitions: Seq[FilePartition])
- extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
+ extends RDD[InternalRow](sqlContext.sparkContext, Nil) {
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val iterator = new Iterator[Object] with AutoCloseable {
@@ -65,17 +64,17 @@ class FileScanRDD(
if (files.hasNext) {
val nextFile = files.next()
logInfo(s"Reading File $nextFile")
- SqlNewHadoopRDDState.setInputFileName(nextFile.filePath)
+ InputFileNameHolder.setInputFileName(nextFile.filePath)
currentIterator = readFunction(nextFile)
hasNext
} else {
- SqlNewHadoopRDDState.unsetInputFileName()
+ InputFileNameHolder.unsetInputFileName()
false
}
}
override def close() = {
- SqlNewHadoopRDDState.unsetInputFileName()
+ InputFileNameHolder.unsetInputFileName()
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
deleted file mode 100644
index 4d6864d8ba..0000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import java.text.SimpleDateFormat
-import java.util.Date
-
-import scala.reflect.ClassTag
-
-import org.apache.hadoop.conf.{Configurable, Configuration}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.lib.input.{CombineFileSplit, FileSplit}
-import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl}
-
-import org.apache.spark.{Partition => SparkPartition, _}
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager}
-
-private[spark] class SqlNewHadoopPartition(
- rddId: Int,
- val index: Int,
- rawSplit: InputSplit with Writable)
- extends SparkPartition {
-
- val serializableHadoopSplit = new SerializableWritable(rawSplit)
-
- override def hashCode(): Int = 41 * (41 + rddId) + index
-}
-
-/**
- * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS,
- * sources in HBase, or S3), using the new MapReduce API (`org.apache.hadoop.mapreduce`).
- * It is based on [[org.apache.spark.rdd.NewHadoopRDD]]. It has three additions.
- * 1. A shared broadcast Hadoop Configuration.
- * 2. An optional closure `initDriverSideJobFuncOpt` that set configurations at the driver side
- * to the shared Hadoop Configuration.
- * 3. An optional closure `initLocalJobFuncOpt` that set configurations at both the driver side
- * and the executor side to the shared Hadoop Configuration.
- *
- * Note: This is RDD is basically a cloned version of [[org.apache.spark.rdd.NewHadoopRDD]] with
- * changes based on [[org.apache.spark.rdd.HadoopRDD]].
- */
-private[spark] class SqlNewHadoopRDD[V: ClassTag](
- sqlContext: SQLContext,
- broadcastedConf: Broadcast[SerializableConfiguration],
- @transient private val initDriverSideJobFuncOpt: Option[Job => Unit],
- initLocalJobFuncOpt: Option[Job => Unit],
- inputFormatClass: Class[_ <: InputFormat[Void, V]],
- valueClass: Class[V])
- extends RDD[V](sqlContext.sparkContext, Nil) with Logging {
-
- protected def getJob(): Job = {
- val conf = broadcastedConf.value.value
- // "new Job" will make a copy of the conf. Then, it is
- // safe to mutate conf properties with initLocalJobFuncOpt
- // and initDriverSideJobFuncOpt.
- val newJob = Job.getInstance(conf)
- initLocalJobFuncOpt.map(f => f(newJob))
- newJob
- }
-
- def getConf(isDriverSide: Boolean): Configuration = {
- val job = getJob()
- if (isDriverSide) {
- initDriverSideJobFuncOpt.map(f => f(job))
- }
- job.getConfiguration
- }
-
- private val jobTrackerId: String = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- formatter.format(new Date())
- }
-
- @transient protected val jobId = new JobID(jobTrackerId, id)
-
- override def getPartitions: Array[SparkPartition] = {
- val conf = getConf(isDriverSide = true)
- val inputFormat = inputFormatClass.newInstance
- inputFormat match {
- case configurable: Configurable =>
- configurable.setConf(conf)
- case _ =>
- }
- val jobContext = new JobContextImpl(conf, jobId)
- val rawSplits = inputFormat.getSplits(jobContext).toArray
- val result = new Array[SparkPartition](rawSplits.size)
- for (i <- 0 until rawSplits.size) {
- result(i) =
- new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable])
- }
- result
- }
-
- override def compute(
- theSplit: SparkPartition,
- context: TaskContext): Iterator[V] = {
- val iter = new Iterator[V] {
- val split = theSplit.asInstanceOf[SqlNewHadoopPartition]
- logInfo("Input split: " + split.serializableHadoopSplit)
- val conf = getConf(isDriverSide = false)
-
- val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop)
- val existingBytesRead = inputMetrics.bytesRead
-
- // Sets the thread local variable for the file's name
- split.serializableHadoopSplit.value match {
- case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString)
- case _ => SqlNewHadoopRDDState.unsetInputFileName()
- }
-
- // Find a function that will return the FileSystem bytes read by this thread. Do this before
- // creating RecordReader, because RecordReader's constructor might read some bytes
- val getBytesReadCallback: Option[() => Long] = split.serializableHadoopSplit.value match {
- case _: FileSplit | _: CombineFileSplit =>
- SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
- case _ => None
- }
-
- // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
- // If we do a coalesce, however, we are likely to compute multiple partitions in the same
- // task and in the same thread, in which case we need to avoid override values written by
- // previous partitions (SPARK-13071).
- def updateBytesRead(): Unit = {
- getBytesReadCallback.foreach { getBytesRead =>
- inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
- }
- }
-
- val format = inputFormatClass.newInstance
- format match {
- case configurable: Configurable =>
- configurable.setConf(conf)
- case _ =>
- }
- val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
- val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
- private[this] var reader: RecordReader[Void, V] = format.createRecordReader(
- split.serializableHadoopSplit.value, hadoopAttemptContext)
- reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
-
- // Register an on-task-completion callback to close the input stream.
- context.addTaskCompletionListener(context => close())
-
- private[this] var havePair = false
- private[this] var finished = false
-
- override def hasNext: Boolean = {
- if (context.isInterrupted()) {
- throw new TaskKilledException
- }
- if (!finished && !havePair) {
- finished = !reader.nextKeyValue
- if (finished) {
- // Close and release the reader here; close() will also be called when the task
- // completes, but for tasks that read from many files, it helps to release the
- // resources early.
- close()
- }
- havePair = !finished
- }
- !finished
- }
-
- override def next(): V = {
- if (!hasNext) {
- throw new java.util.NoSuchElementException("End of stream")
- }
- havePair = false
- if (!finished) {
- inputMetrics.incRecordsReadInternal(1)
- }
- if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
- updateBytesRead()
- }
- reader.getCurrentValue
- }
-
- private def close() {
- if (reader != null) {
- SqlNewHadoopRDDState.unsetInputFileName()
- // Close the reader and release it. Note: it's very important that we don't close the
- // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
- // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
- // corruption issues when reading compressed input.
- try {
- reader.close()
- } catch {
- case e: Exception =>
- if (!ShutdownHookManager.inShutdown()) {
- logWarning("Exception in RecordReader.close()", e)
- }
- } finally {
- reader = null
- }
- if (getBytesReadCallback.isDefined) {
- updateBytesRead()
- } else if (split.serializableHadoopSplit.value.isInstanceOf[FileSplit] ||
- split.serializableHadoopSplit.value.isInstanceOf[CombineFileSplit]) {
- // If we can't get the bytes read from the FS stats, fall back to the split size,
- // which may be inaccurate.
- try {
- inputMetrics.incBytesReadInternal(split.serializableHadoopSplit.value.getLength)
- } catch {
- case e: java.io.IOException =>
- logWarning("Unable to get input size to set InputMetrics for task", e)
- }
- }
- }
- }
- }
- iter
- }
-
- override def getPreferredLocations(hsplit: SparkPartition): Seq[String] = {
- val split = hsplit.asInstanceOf[SqlNewHadoopPartition].serializableHadoopSplit.value
- val locs = HadoopRDD.SPLIT_INFO_REFLECTIONS match {
- case Some(c) =>
- try {
- val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
- Some(HadoopRDD.convertSplitLocationInfo(infos))
- } catch {
- case e : Exception =>
- logDebug("Failed to use InputSplit#getLocationInfo.", e)
- None
- }
- case None => None
- }
- locs.getOrElse(split.getLocations.filter(_ != "localhost"))
- }
-
- override def persist(storageLevel: StorageLevel): this.type = {
- if (storageLevel.deserialized) {
- logWarning("Caching NewHadoopRDDs as deserialized objects usually leads to undesired" +
- " behavior because Hadoop's RecordReader reuses the same Writable object for all records." +
- " Use a map transformation to make copies of the records.")
- }
- super.persist(storageLevel)
- }
-
- /**
- * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
- * the given function rather than the index of the partition.
- */
- private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
- prev: RDD[T],
- f: (InputSplit, Iterator[T]) => Iterator[U],
- preservesPartitioning: Boolean = false)
- extends RDD[U](prev) {
-
- override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
-
- override def getPartitions: Array[SparkPartition] = firstParent[T].partitions
-
- override def compute(split: SparkPartition, context: TaskContext): Iterator[U] = {
- val partition = split.asInstanceOf[SqlNewHadoopPartition]
- val inputSplit = partition.serializableHadoopSplit.value
- f(inputSplit, firstParent[T].iterator(split, context))
- }
- }
-}
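The doc comment in the removed file describes three additions over `NewHadoopRDD`: a shared broadcast Hadoop Configuration plus driver-side and local job-init closures. As a hedged sketch of that mechanism (not code from this commit), the closures looked roughly like the following; the input paths and config keys are illustrative assumptions.

    // Hedged sketch of the driver-side and local job-init closures the removed
    // RDD accepted (items 2 and 3 of its doc comment); paths/keys are hypothetical.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

    object SqlNewHadoopInitClosures {
      // Driver-side closure: applied once during planning, e.g. to set input paths.
      val initDriverSideJobFunc: Job => Unit = { job =>
        FileInputFormat.setInputPaths(job, new Path("/tmp/parquet-input")) // hypothetical
      }

      // Local closure: applied on the driver and on each executor to a per-task
      // copy of the shared Configuration; Job.getInstance copies the conf, so the
      // broadcast value itself is never mutated (see getJob() in the removed code).
      val initLocalJobFunc: Job => Unit = { job =>
        job.getConfiguration.setBoolean("parquet.enable.summary-metadata", false) // hypothetical key
      }

      def perTaskJob(sharedConf: Configuration): Job = {
        val job = Job.getInstance(sharedConf)
        initLocalJobFunc(job)
        job
      }
    }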
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index e74fb00cb2..2f9d63c2e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -28,7 +28,6 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.CatalystConf
-import org.apache.spark.util.Utils
////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 0b74f07540..dac56d3936 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -22,8 +22,6 @@ import java.io.File
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -34,8 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.Utils
class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
import testImplicits._