author    Shixiong Zhu <shixiong@databricks.com>  2017-02-15 16:21:43 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2017-02-15 16:21:43 -0800
commit    21b4ba2d6f21a9759af879471715c123073bd67a (patch)
tree      a6da8f62bf906e64df1fccbe81458c706254fdda /sql/core/src/main/scala/org
parent    f6c3bba22501ee7753d85c6e51ffe851d43869c1 (diff)
[SPARK-19599][SS] Clean up HDFSMetadataLog
## What changes were proposed in this pull request?

SPARK-19464 removed support for Hadoop 2.5 and earlier, so we can do some cleanup for HDFSMetadataLog. This PR includes the following changes:

- ~~Remove the workaround codes for HADOOP-10622.~~ Unfortunately, there is another issue, [HADOOP-14084](https://issues.apache.org/jira/browse/HADOOP-14084), that prevents us from removing the workaround codes.
- Remove the unnecessary `writer: (T, OutputStream) => Unit` parameter and call `serialize` directly.
- Remove the catch for `FileNotFoundException`.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16932 from zsxwing/metadata-cleanup.
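Since the HADOOP-14084 workaround has to stay, the local-filesystem write path must still run on an `UninterruptibleThread`. Below is a rough, self-contained sketch of the idea behind that class, not Spark's actual `org.apache.spark.util.UninterruptibleThread` (which also handles nested sections and races at the section boundaries): while a section is active, an incoming interrupt is only recorded, and it is re-delivered when the section exits.

```scala
// Simplified sketch of an "uninterruptible section". While runUninterruptibly
// is active, interrupt() only sets a flag; the interrupt is re-delivered when
// the section ends, so code inside the section cannot swallow it.
class UninterruptibleSketchThread(name: String)(body: UninterruptibleSketchThread => Unit)
    extends Thread(name) {
  private var uninterruptible = false
  private var pendingInterrupt = false

  override def run(): Unit = body(this)

  def runUninterruptibly[T](f: => T): T = {
    synchronized { uninterruptible = true }
    try f
    finally synchronized {
      uninterruptible = false
      // Deliver any interrupt that arrived while the section was active.
      if (pendingInterrupt) { pendingInterrupt = false; super.interrupt() }
    }
  }

  // Defer the interrupt if we are inside a critical section.
  override def interrupt(): Unit = synchronized {
    if (uninterruptible) pendingInterrupt = true else super.interrupt()
  }
}
```

A caller wraps the critical write as `thread.runUninterruptibly { writeBatch(batchId, metadata) }`, which mirrors the `ut.runUninterruptibly` call in the first hunk below.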
Diffstat (limited to 'sql/core/src/main/scala/org')

-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala  | 39
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala  |  4

2 files changed, 19 insertions, 24 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index bfdc2cb0ac..3155ce04a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
case ut: UninterruptibleThread =>
// When using a local file system, "writeBatch" must be called on a
// [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
- // while writing the batch file. This is because there is a potential dead-lock in
- // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
- // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
- // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
- // the file permission if using the local file system, and can get deadlocked if the
- // stream execution thread is stopped by interrupt. Hence, we make sure that
- // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
- // interrupts here. Also see SPARK-14131.
- ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+ // while writing the batch file.
+ //
+ // This is because Hadoop "Shell.runCommand" swallows InterruptedException
+ // (HADOOP-14084). If the user tries to stop a query and the thread running
+ // "Shell.runCommand" is interrupted, the InterruptedException will be dropped and the
+ // query will still be running. (Note: `writeBatch` creates a file using HDFS APIs and
+ // will call "Shell.runCommand" to set the file permission if using the local file
+ // system.)
+ //
+ // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
+ // allows us to disable interrupts here, in order to propagate the interrupt state
+ // correctly. Also see SPARK-19599.
+ ut.runUninterruptibly { writeBatch(batchId, metadata) }
case _ =>
throw new IllegalStateException(
"HDFSMetadataLog.add() on a local file system must be executed on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// For a distributed file system, such as HDFS or S3, if the network is broken, write
// operations may just hang until timeout. We should enable interrupts to allow stopping
// the query fast.
- writeBatch(batchId, metadata, serialize)
+ writeBatch(batchId, metadata)
}
true
}
}
- def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
- var nextId = 0
+ def writeTempBatch(metadata: T): Option[Path] = {
while (true) {
val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
try {
val output = fileManager.create(tempPath)
try {
- writer(metadata, output)
+ serialize(metadata, output)
return Some(tempPath)
} finally {
IOUtils.closeQuietly(output)
@@ -164,7 +166,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// big problem because it requires that the attacker have permission to write to the
// metadata path. In addition, the old Streaming also has this issue; people can create
// malicious checkpoint files to crash a Streaming application too.
- nextId += 1
}
}
None
@@ -176,8 +177,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
* There may be multiple [[HDFSMetadataLog]] instances using the same metadata path. Although
* this is not valid behavior, we still need to prevent it from destroying the files.
*/
- private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
- val tempPath = writeTempBatch(metadata, writer).getOrElse(
+ private def writeBatch(batchId: Long, metadata: T): Unit = {
+ val tempPath = writeTempBatch(metadata).getOrElse(
throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
try {
// Try to commit the batch
@@ -195,12 +196,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
// So throw an exception to tell the user this is not a valid behavior.
throw new ConcurrentModificationException(
s"Multiple HDFSMetadataLog are using $path", e)
- case e: FileNotFoundException =>
- // Sometimes, "create" will succeed when multiple writers are calling it at the same
- // time. However, only one writer can call "rename" successfully, others will get
- // FileNotFoundException because the first writer has removed it.
- throw new ConcurrentModificationException(
- s"Multiple HDFSMetadataLog are using $path", e)
} finally {
fileManager.delete(tempPath)
}
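With the `writer` parameter gone and the `FileNotFoundException` catch removed, the commit path reduces to: serialize into a hidden temp file, rename it to the final batch file, and treat a failed rename as a concurrent writer. Here is a self-contained sketch of that pattern against a local filesystem via `java.nio`; all names are illustrative, and the real code goes through HDFSMetadataLog's file manager and the target filesystem's rename semantics instead.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path}
import java.util.ConcurrentModificationException

// Sketch of the temp-file-then-rename commit used by writeBatch/writeTempBatch.
object CommitSketch {
  def commitBatch(dir: Path, batchId: Long, payload: Array[Byte]): Unit = {
    // Write the serialized batch to a hidden temp file first.
    val tempPath = Files.createTempFile(dir, s".$batchId-", ".tmp")
    try {
      Files.write(tempPath, payload)
      // Publish by renaming; without REPLACE_EXISTING, Files.move throws
      // FileAlreadyExistsException if another writer committed this batch first.
      Files.move(tempPath, dir.resolve(batchId.toString))
    } catch {
      case e: FileAlreadyExistsException =>
        throw new ConcurrentModificationException(
          s"Batch $batchId was committed by a concurrent writer: $e")
    } finally {
      // Always clean up the temp file, whether or not the rename succeeded.
      Files.deleteIfExists(tempPath)
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("metadata-log")
    commitBatch(dir, 0L, "offsets-v1".getBytes("UTF-8"))
    println(Files.readAllLines(dir.resolve("0"))) // prints [offsets-v1]
  }
}
```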
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3149ef04f7..239d49b08a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -179,8 +179,8 @@ class StreamExecution(
/**
* The thread that runs the micro-batches of this stream. Note that this thread must be
- * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
- * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
+ * [[org.apache.spark.util.UninterruptibleThread]] to avoid swallowing `InterruptedException`
+ * when using [[HDFSMetadataLog]]. See SPARK-19599 for more details.
*/
val microBatchThread =
new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
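The updated StreamExecution comment is the other half of the story: on a plain `Thread`, a library call that swallows `InterruptedException` silently drops a stop request. Below is a minimal, self-contained demo of that failure mode, with a swallowing `catch` standing in for Hadoop's `Shell.runCommand`.

```scala
// Demo of the failure mode behind HADOOP-14084: a library call that swallows
// InterruptedException loses the caller's stop request, and the loop keeps going.
object SwallowedInterruptDemo {
  def main(args: Array[String]): Unit = {
    val worker = new Thread(() => {
      var batches = 0
      while (batches < 5) {
        try Thread.sleep(100)
        catch { case _: InterruptedException => () } // swallowed, like Shell.runCommand
        batches += 1
      }
      println(s"finished $batches batches despite being interrupted")
    })
    worker.start()
    Thread.sleep(150)
    worker.interrupt() // the stop request is silently dropped inside the catch
    worker.join()
  }
}
```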