author     Eric Liang <ekl@databricks.com>    2015-06-23 22:27:17 -0700
committer  Yin Huai <yhuai@databricks.com>    2015-06-23 22:27:17 -0700
commit     50c3a86f42d7dfd1acbda65c1e5afbd3db1406df (patch)
tree       c06a45dc489cb829daa123460b920a7fde71a75a
parent     a458efc66c31dc281af379b914bfa2b077ca6635 (diff)
[SPARK-6749] [SQL] Make metastore client robust to underlying socket connection loss
This works around a bug in the underlying RetryingMetaStoreClient (HIVE-10384) by refreshing the metastore client on thrift exceptions. We attempt to emulate the proper Hive behavior by retrying only as configured by hiveconf.

Author: Eric Liang <ekl@databricks.com>

Closes #6912 from ericl/spark-6749 and squashes the following commits:

2d54b55 [Eric Liang] use conf from state
0e3a74e [Eric Liang] use shim properly
980b3e5 [Eric Liang] Fix conf parsing for hive 0.14
92459b6 [Eric Liang] Work around RetryingMetaStoreClient bug
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala | 55
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala      | 19
2 files changed, 72 insertions(+), 2 deletions(-)
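
Before the diff itself, here is a minimal, self-contained sketch of the retry-with-deadline pattern the patch introduces. Everything in it (RetrySketch, isTransient, the two constants) is an illustrative stand-in rather than a Spark or Hive API, and the real code additionally refreshes the Hive client between attempts.

    // Self-contained sketch of the retry pattern; RetrySketch, isTransient,
    // and the two constants are hypothetical stand-ins, not Spark/Hive APIs.
    object RetrySketch {
      private val retryLimit = 3          // stands in for METASTORETHRIFTFAILURERETRIES
      private val retryDelayMillis = 100L // stands in for METASTORE_CLIENT_CONNECT_RETRY_DELAY

      // Walk the cause chain looking for thrift-flavored failures, as
      // causedByThrift below does with a regex over exception messages.
      private def isTransient(e: Throwable): Boolean = {
        var t: Throwable = e
        while (t != null) {
          val msg = t.getMessage
          if (msg != null && msg.contains("TTransportException")) return true
          t = t.getCause
        }
        false
      }

      // Retry `f` up to retryLimit times, but never past an overall deadline,
      // so retries performed internally by `f` cannot compound the total delay.
      def retryLocked[A](f: => A): A = synchronized {
        val deadline = System.nanoTime + retryLimit * retryDelayMillis * 1000000L
        var numTries = 0
        var caught: Exception = null
        do {
          numTries += 1
          try {
            return f
          } catch {
            case e: Exception if isTransient(e) =>
              caught = e
              Thread.sleep(retryDelayMillis) // back off; the patch also refreshes the client here
          }
        } while (numTries <= retryLimit && System.nanoTime < deadline)
        throw caught
      }
    }

In the patch itself, withHiveState delegates to retryLocked, so every metastore operation routed through withHiveState picks up this behavior.
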
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 42c2d4c98f..2f771d7679 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.client
import java.io.{BufferedReader, InputStreamReader, File, PrintStream}
import java.net.URI
import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
+import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConversions._
import scala.language.reflectiveCalls
@@ -136,12 +137,62 @@ private[hive] class ClientWrapper(
// TODO: should be a def?
// When we create this val client, the HiveConf of it (conf) is the one associated with state.
- private val client = Hive.get(conf)
+ @GuardedBy("this")
+ private var client = Hive.get(conf)
+
+ // We use hive's conf for compatibility.
+ private val retryLimit = conf.getIntVar(HiveConf.ConfVars.METASTORETHRIFTFAILURERETRIES)
+ private val retryDelayMillis = shim.getMetastoreClientConnectRetryDelayMillis(conf)
+
+ /**
+ * Runs `f` with multiple retries in case the hive metastore is temporarily unreachable.
+ */
+ private def retryLocked[A](f: => A): A = synchronized {
+ // Hive sometimes retries internally, so set a deadline to avoid compounding delays.
+ val deadline = System.nanoTime + (retryLimit * retryDelayMillis * 1e6).toLong
+ var numTries = 0
+ var caughtException: Exception = null
+ do {
+ numTries += 1
+ try {
+ return f
+ } catch {
+ case e: Exception if causedByThrift(e) =>
+ caughtException = e
+ logWarning(
+ "HiveClientWrapper got thrift exception, destroying client and retrying " +
+ s"(${retryLimit - numTries} tries remaining)", e)
+ Thread.sleep(retryDelayMillis)
+ try {
+ client = Hive.get(state.getConf, true)
+ } catch {
+ case e: Exception if causedByThrift(e) =>
+ logWarning("Failed to refresh hive client, will retry.", e)
+ }
+ }
+ } while (numTries <= retryLimit && System.nanoTime < deadline)
+ if (System.nanoTime > deadline) {
+ logWarning("Deadline exceeded while retrying hive metastore operation")
+ }
+ throw caughtException
+ }
+
+ private def causedByThrift(e: Throwable): Boolean = {
+ var target = e
+ while (target != null) {
+ val msg = target.getMessage()
+ if (msg != null && msg.matches("(?s).*(TApplication|TProtocol|TTransport)Exception.*")) {
+ return true
+ }
+ target = target.getCause()
+ }
+ false
+ }
/**
* Runs `f` with ThreadLocal session state and classloaders configured for this version of hive.
*/
- private def withHiveState[A](f: => A): A = synchronized {
+ private def withHiveState[A](f: => A): A = retryLocked {
val original = Thread.currentThread().getContextClassLoader
// Set the thread local metastore client to the client associated with this ClientWrapper.
Hive.set(client)
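
For context, methods throughout ClientWrapper wrap their metastore calls in withHiveState, so each of them now inherits the retry behavior. A hedged sketch of such a caller follows; the method body is illustrative rather than copied from this patch, though Hive.getTable(db, table, throwException) is a real Hive API.

    // Illustrative ClientWrapper-style caller (a sketch, not from this diff):
    // any thrift hiccup inside the block is now retried by retryLocked.
    def tableExists(db: String, table: String): Boolean = withHiveState {
      // Hive.getTable with throwException = false returns null when absent.
      Option(client.getTable(db, table, false)).isDefined
    }
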
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 5ae2dbb50d..e7c1779f80 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -21,6 +21,7 @@ import java.lang.{Boolean => JBoolean, Integer => JInteger}
import java.lang.reflect.{Method, Modifier}
import java.net.URI
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap, Set => JSet}
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
@@ -64,6 +65,8 @@ private[client] sealed abstract class Shim {
def getDriverResults(driver: Driver): Seq[String]
+ def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long
+
def loadPartition(
hive: Hive,
loadPath: Path,
@@ -192,6 +195,10 @@ private[client] class Shim_v0_12 extends Shim {
res.toSeq
}
+ override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
+ conf.getIntVar(HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY) * 1000
+ }
+
override def loadPartition(
hive: Hive,
loadPath: Path,
@@ -321,6 +328,12 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
JBoolean.TYPE,
JBoolean.TYPE,
JBoolean.TYPE)
+ private lazy val getTimeVarMethod =
+ findMethod(
+ classOf[HiveConf],
+ "getTimeVar",
+ classOf[HiveConf.ConfVars],
+ classOf[TimeUnit])
override def loadPartition(
hive: Hive,
@@ -359,4 +372,10 @@ private[client] class Shim_v0_14 extends Shim_v0_13 {
numDP: JInteger, holdDDLTime: JBoolean, listBucketingEnabled: JBoolean, JBoolean.FALSE)
}
+ override def getMetastoreClientConnectRetryDelayMillis(conf: HiveConf): Long = {
+ getTimeVarMethod.invoke(
+ conf,
+ HiveConf.ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY,
+ TimeUnit.MILLISECONDS).asInstanceOf[Long]
+ }
}
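
One note on the Shim_v0_14 hunk above: HiveConf.getTimeVar does not exist in older Hive releases, which is why the shim resolves it reflectively instead of calling it directly. Below is a minimal sketch of that lookup pattern; ReflectiveShimSketch, Legacy, and delayMillis are hypothetical names, not Hive or Spark APIs.

    import java.lang.reflect.Method

    object ReflectiveShimSketch {
      // Hypothetical stand-in for a class that only gained this method in a
      // newer release, as HiveConf gained getTimeVar in Hive 0.14.
      class Legacy {
        def delayMillis(seconds: Int): Long = seconds.toLong * 1000
      }

      // Resolve a method by name and parameter types, as findMethod does in
      // the shims; this fails at runtime only if the method is truly absent.
      def findMethod(klass: Class[_], name: String, args: Class[_]*): Method =
        klass.getMethod(name, args: _*)

      def main(args: Array[String]): Unit = {
        val m = findMethod(classOf[Legacy], "delayMillis", classOf[Int])
        // Reflection boxes primitive arguments and results; unbox explicitly.
        val millis = m.invoke(new Legacy, Int.box(30)).asInstanceOf[Long]
        println(millis) // prints 30000
      }
    }

This keeps a single compiled artifact compatible with multiple Hive versions: the method reference is only resolved against whichever HiveConf is actually on the classpath at runtime.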