From b4819404a65f9b97c1f8deb1fcb8419969831574 Mon Sep 17 00:00:00 2001 From: hyukjinkwon Date: Thu, 14 Apr 2016 15:43:44 +0800 Subject: [SPARK-14596][SQL] Remove not used SqlNewHadoopRDD and some more unused imports ## What changes were proposed in this pull request? Old `HadoopFsRelation` API includes `buildInternalScan()` which uses `SqlNewHadoopRDD` in `ParquetRelation`. Because now the old API is removed, `SqlNewHadoopRDD` is not used anymore. So, this PR removes `SqlNewHadoopRDD` and several unused imports. This was discussed in https://github.com/apache/spark/pull/12326. ## How was this patch tested? Several related existing unit tests and `sbt scalastyle`. Author: hyukjinkwon Closes #12354 from HyukjinKwon/SPARK-14596. --- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 8 ++--- .../org/apache/spark/rdd/InputFileNameHolder.scala | 41 ++++++++++++++++++++++ .../apache/spark/rdd/SqlNewHadoopRDDState.scala | 41 ---------------------- 3 files changed, 44 insertions(+), 46 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala delete mode 100644 core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 08db96edd6..ac5ba9e79f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -213,15 +213,13 @@ class HadoopRDD[K, V]( logInfo("Input split: " + split.inputSplit) val jobConf = getJobConf() - // TODO: there is a lot of duplicate code between this and NewHadoopRDD and SqlNewHadoopRDD - val inputMetrics = context.taskMetrics().registerInputMetrics(DataReadMethod.Hadoop) val existingBytesRead = inputMetrics.bytesRead // Sets the thread local variable for the file's name split.inputSplit.value match { - case fs: FileSplit => SqlNewHadoopRDDState.setInputFileName(fs.getPath.toString) - case _ => SqlNewHadoopRDDState.unsetInputFileName() + case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString) + case _ => InputFileNameHolder.unsetInputFileName() } // Find a function that will return the FileSystem bytes read by this thread. Do this before @@ -271,7 +269,7 @@ class HadoopRDD[K, V]( override def close() { if (reader != null) { - SqlNewHadoopRDDState.unsetInputFileName() + InputFileNameHolder.unsetInputFileName() // Close the reader and release it. Note: it's very important that we don't close the // reader more than once, since that exposes us to MAPREDUCE-5918 when running against // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic diff --git a/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala new file mode 100644 index 0000000000..108e9d2558 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/InputFileNameHolder.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.rdd + +import org.apache.spark.unsafe.types.UTF8String + +/** + * This holds file names of the current Spark task. This is used in HadoopRDD, + * FileScanRDD and InputFileName function in Spark SQL. + */ +private[spark] object InputFileNameHolder { + /** + * The thread variable for the name of the current file being read. This is used by + * the InputFileName function in Spark SQL. + */ + private[this] val inputFileName: ThreadLocal[UTF8String] = new ThreadLocal[UTF8String] { + override protected def initialValue(): UTF8String = UTF8String.fromString("") + } + + def getInputFileName(): UTF8String = inputFileName.get() + + private[spark] def setInputFileName(file: String) = inputFileName.set(UTF8String.fromString(file)) + + private[spark] def unsetInputFileName(): Unit = inputFileName.remove() + +} diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala deleted file mode 100644 index 3f15fff793..0000000000 --- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDDState.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.rdd - -import org.apache.spark.unsafe.types.UTF8String - -/** - * State for SqlNewHadoopRDD objects. This is split this way because of the package splits. - * TODO: Move/Combine this with org.apache.spark.sql.datasources.SqlNewHadoopRDD - */ -private[spark] object SqlNewHadoopRDDState { - /** - * The thread variable for the name of the current file being read. This is used by - * the InputFileName function in Spark SQL. - */ - private[this] val inputFileName: ThreadLocal[UTF8String] = new ThreadLocal[UTF8String] { - override protected def initialValue(): UTF8String = UTF8String.fromString("") - } - - def getInputFileName(): UTF8String = inputFileName.get() - - private[spark] def setInputFileName(file: String) = inputFileName.set(UTF8String.fromString(file)) - - private[spark] def unsetInputFileName(): Unit = inputFileName.remove() - -} -- cgit v1.2.3