author    Sean Owen <sowen@cloudera.com>    2016-01-02 13:15:53 +0000
committer Sean Owen <sowen@cloudera.com>    2016-01-02 13:15:53 +0000
commit 15bd73627e04591fd13667b4838c9098342db965 (patch)
tree   5b23cdd4e75138f38ad51ebb17dcf45305c9e3c0 /sql/hive
parent 94f7a12b3c8e4a6ecd969893e562feb7ffba4c24 (diff)
[SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated APIs and reflection that supported 1.x
Remove use of deprecated Hadoop APIs now that 2.2+ is required.

Author: Sean Owen <sowen@cloudera.com>

Closes #10446 from srowen/SPARK-12481.
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala            |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala   | 70
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala   | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala    |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala        | 16
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala  |  5
6 files changed, 17 insertions, 91 deletions
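
For orientation before the per-file diffs, here is a minimal Scala sketch (not part of the commit; the object and method names are illustrative) of the Hadoop 2.2+ calls this patch standardizes on in place of the deprecated 1.x-era forms:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}

// Illustrative only: the Hadoop 2.x replacements used throughout this patch.
object Hadoop2ApiSketch {
  // FileStatus.isDirectory replaces the deprecated FileStatus.isDir
  def isDirectory(fs: FileSystem, path: Path): Boolean =
    fs.getFileStatus(path).isDirectory

  // Job.getInstance(conf) replaces the deprecated `new Job(conf)` constructor
  def newJob(conf: Configuration): Job = Job.getInstance(conf)

  // With Hadoop 1.x support dropped, the context accessors can be called directly
  // instead of going through SparkHadoopUtil's reflection-based helpers.
  def configurationOf(ctx: TaskAttemptContext): Configuration = ctx.getConfiguration
  def taskAttemptIdOf(ctx: TaskAttemptContext) = ctx.getTaskAttemptID
}
```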
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 384ea211df..5d00e73670 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -380,7 +380,7 @@ class HiveContext private[hive](
def calculateTableSize(fs: FileSystem, path: Path): Long = {
val fileStatus = fs.getFileStatus(path)
- val size = if (fileStatus.isDir) {
+ val size = if (fileStatus.isDirectory) {
fs.listStatus(path)
.map { status =>
if (!status.getPath().getName().startsWith(stagingDir)) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 598ccdeee4..d3da22aa0a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -31,9 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.processors._
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.ql.{Driver, metadata}
-import org.apache.hadoop.hive.shims.{HadoopShims, ShimLoader}
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.hadoop.util.VersionInfo
import org.apache.spark.{SparkConf, SparkException, Logging}
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -65,74 +63,6 @@ private[hive] class ClientWrapper(
extends ClientInterface
with Logging {
- overrideHadoopShims()
-
- // !! HACK ALERT !!
- //
- // Internally, Hive `ShimLoader` tries to load different versions of Hadoop shims by checking
- // major version number gathered from Hadoop jar files:
- //
- // - For major version number 1, load `Hadoop20SShims`, where "20S" stands for Hadoop 0.20 with
- // security.
- // - For major version number 2, load `Hadoop23Shims`, where "23" stands for Hadoop 0.23.
- //
- // However, APIs in Hadoop 2.0.x and 2.1.x versions were in flux due to historical reasons. It
- // turns out that Hadoop 2.0.x versions should also be used together with `Hadoop20SShims`, but
- // `Hadoop23Shims` is chosen because the major version number here is 2.
- //
- // To fix this issue, we try to inspect Hadoop version via `org.apache.hadoop.utils.VersionInfo`
- // and load `Hadoop20SShims` for Hadoop 1.x and 2.0.x versions. If Hadoop version information is
- // not available, we decide whether to override the shims or not by checking for existence of a
- // probe method which doesn't exist in Hadoop 1.x or 2.0.x versions.
- private def overrideHadoopShims(): Unit = {
- val hadoopVersion = VersionInfo.getVersion
- val VersionPattern = """(\d+)\.(\d+).*""".r
-
- hadoopVersion match {
- case null =>
- logError("Failed to inspect Hadoop version")
-
- // Using "Path.getPathWithoutSchemeAndAuthority" as the probe method.
- val probeMethod = "getPathWithoutSchemeAndAuthority"
- if (!classOf[Path].getDeclaredMethods.exists(_.getName == probeMethod)) {
- logInfo(
- s"Method ${classOf[Path].getCanonicalName}.$probeMethod not found, " +
- s"we are probably using Hadoop 1.x or 2.0.x")
- loadHadoop20SShims()
- }
-
- case VersionPattern(majorVersion, minorVersion) =>
- logInfo(s"Inspected Hadoop version: $hadoopVersion")
-
- // Loads Hadoop20SShims for 1.x and 2.0.x versions
- val (major, minor) = (majorVersion.toInt, minorVersion.toInt)
- if (major < 2 || (major == 2 && minor == 0)) {
- loadHadoop20SShims()
- }
- }
-
- // Logs the actual loaded Hadoop shims class
- val loadedShimsClassName = ShimLoader.getHadoopShims.getClass.getCanonicalName
- logInfo(s"Loaded $loadedShimsClassName for Hadoop version $hadoopVersion")
- }
-
- private def loadHadoop20SShims(): Unit = {
- val hadoop20SShimsClassName = "org.apache.hadoop.hive.shims.Hadoop20SShims"
- logInfo(s"Loading Hadoop shims $hadoop20SShimsClassName")
-
- try {
- val shimsField = classOf[ShimLoader].getDeclaredField("hadoopShims")
- // scalastyle:off classforname
- val shimsClass = Class.forName(hadoop20SShimsClassName)
- // scalastyle:on classforname
- val shims = classOf[HadoopShims].cast(shimsClass.newInstance())
- shimsField.setAccessible(true)
- shimsField.set(null, shims)
- } catch { case cause: Throwable =>
- throw new RuntimeException(s"Failed to load $hadoop20SShimsClassName", cause)
- }
- }
-
// Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
private val outputBuffer = new CircularBuffer()
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 93c016b6c6..777e7857d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -27,9 +27,10 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred._
-import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.mapreduce.TaskType
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
@@ -46,9 +47,7 @@ import org.apache.spark.util.SerializableJobConf
private[hive] class SparkHiveWriterContainer(
jobConf: JobConf,
fileSinkConf: FileSinkDesc)
- extends Logging
- with SparkHadoopMapRedUtil
- with Serializable {
+ extends Logging with Serializable {
private val now = new Date()
private val tableDesc: TableDesc = fileSinkConf.getTableInfo
@@ -68,8 +67,8 @@ private[hive] class SparkHiveWriterContainer(
@transient private var writer: FileSinkOperator.RecordWriter = null
@transient protected lazy val committer = conf.value.getOutputCommitter
- @transient protected lazy val jobContext = newJobContext(conf.value, jID.value)
- @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+ @transient protected lazy val jobContext = new JobContextImpl(conf.value, jID.value)
+ @transient private lazy val taskContext = new TaskAttemptContextImpl(conf.value, taID.value)
@transient private lazy val outputFormat =
conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
@@ -131,7 +130,7 @@ private[hive] class SparkHiveWriterContainer(
jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
}
private def setConfParams() {
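
The TaskID change above swaps the deprecated boolean `isMap` constructor argument for the Hadoop 2 `TaskType` enum. A minimal standalone sketch (assuming Hadoop 2.x on the classpath; the "jt" identifier and numeric IDs are placeholders, not values taken from Spark):

```scala
import org.apache.hadoop.mapred.{JobID, TaskAttemptID, TaskID}
import org.apache.hadoop.mapreduce.TaskType

object TaskAttemptIdSketch {
  val jobId = new JobID("jt", 0)
  // TaskType.MAP replaces the deprecated flag in `new TaskID(jobId, true, 1)`
  val taskId = new TaskID(jobId, TaskType.MAP, 1)
  val attemptId = new TaskAttemptID(taskId, 2)
}
```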
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
index 0f9a1a6ef3..b91a14bdbc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -95,7 +95,7 @@ private[orc] object OrcFileOperator extends Logging {
val fs = origPath.getFileSystem(conf)
val path = origPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath)
- .filterNot(_.isDir)
+ .filterNot(_.isDirectory)
.map(_.getPath)
.filterNot(_.getName.startsWith("_"))
.filterNot(_.getName.startsWith("."))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 1136670b7a..84ef12a68e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -33,8 +33,6 @@ import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
@@ -67,7 +65,7 @@ private[orc] class OrcOutputWriter(
path: String,
dataSchema: StructType,
context: TaskAttemptContext)
- extends OutputWriter with SparkHadoopMapRedUtil with HiveInspectors {
+ extends OutputWriter with HiveInspectors {
private val serializer = {
val table = new Properties()
@@ -77,7 +75,7 @@ private[orc] class OrcOutputWriter(
}.mkString(":"))
val serde = new OrcSerde
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
serde.initialize(configuration, table)
serde
}
@@ -99,9 +97,9 @@ private[orc] class OrcOutputWriter(
private lazy val recordWriter: RecordWriter[NullWritable, Writable] = {
recordWriterInstantiated = true
- val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val conf = context.getConfiguration
val uniqueWriteJobId = conf.get("spark.sql.sources.writeJobUUID")
- val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val taskAttemptId = context.getTaskAttemptID
val partition = taskAttemptId.getTaskID.getId
val filename = f"part-r-$partition%05d-$uniqueWriteJobId.orc"
@@ -208,7 +206,7 @@ private[sql] class OrcRelation(
}
override def prepareJobForWrite(job: Job): OutputWriterFactory = {
- SparkHadoopUtil.get.getConfigurationFromJobContext(job) match {
+ job.getConfiguration match {
case conf: JobConf =>
conf.setOutputFormat(classOf[OrcOutputFormat])
case conf =>
@@ -289,8 +287,8 @@ private[orc] case class OrcTableScan(
}
def execute(): RDD[InternalRow] = {
- val job = new Job(sqlContext.sparkContext.hadoopConfiguration)
- val conf = SparkHadoopUtil.get.getConfigurationFromJobContext(job)
+ val job = Job.getInstance(sqlContext.sparkContext.hadoopConfiguration)
+ val conf = job.getConfiguration
// Tries to push down filters if ORC filter push-down is enabled
if (sqlContext.conf.orcFilterPushDown) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index 01960fd290..e10d21d5e3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -25,7 +25,6 @@ import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
-import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, expressions}
@@ -53,9 +52,9 @@ class AppendingTextOutputFormat(outputFile: Path) extends TextOutputFormat[NullW
numberFormat.setGroupingUsed(false)
override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
- val configuration = SparkHadoopUtil.get.getConfigurationFromJobContext(context)
+ val configuration = context.getConfiguration
val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
- val taskAttemptId = SparkHadoopUtil.get.getTaskAttemptIDFromTaskAttemptContext(context)
+ val taskAttemptId = context.getTaskAttemptID
val split = taskAttemptId.getTaskID.getId
val name = FileOutputFormat.getOutputName(context)
new Path(outputFile, s"$name-${numberFormat.format(split)}-$uniqueWriteJobId")