aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-02-24 13:39:29 -0800
committerMichael Armbrust <michael@databricks.com>2015-02-24 13:39:42 -0800
commit641423dbffd9333ea8d989d0afa7b78426bd3979 (patch)
treed94b73c595a41b0aa99b08fd1a303bc6a707ba26
parenta4ff445a9f2f34697d99a607b05cbc7322beec18 (diff)
downloadspark-641423dbffd9333ea8d989d0afa7b78426bd3979.tar.gz
spark-641423dbffd9333ea8d989d0afa7b78426bd3979.tar.bz2
spark-641423dbffd9333ea8d989d0afa7b78426bd3979.zip
[SPARK-5952][SQL] Lock when using hive metastore client
Author: Michael Armbrust <michael@databricks.com> Closes #4746 from marmbrus/hiveLock and squashes the following commits: 8b871cf [Michael Armbrust] [SPARK-5952][SQL] Lock when using hive metastore client (cherry picked from commit a2b9137923e0ba328da8fff2fbbfcf2abf50b033) Signed-off-by: Michael Armbrust <michael@databricks.com>
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala18
1 files changed, 12 insertions, 6 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f7ad2efc95..2cc8d65d3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)
+ /** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
// TODO: Use this everywhere instead of tuples or databaseName, tableName,.
@@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
- val table = client.getTable(in.database, in.name)
+ val table = synchronized {
+ client.getTable(in.database, in.name)
+ }
val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
if (schemaString == null) {
@@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
- def hiveDefaultTableFilePath(tableName: String): String = {
+ def hiveDefaultTableFilePath(tableName: String): String = synchronized {
val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
+
hiveWarehouse.getTablePath(currentDatabase, tableName).toString
}
- def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
- hive.sessionState.getCurrentDatabase)
+ val databaseName =
+ tableIdent
+ .lift(tableIdent.size - 2)
+ .getOrElse(hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
client.getTable(databaseName, tblName, false) != null
}
@@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
- override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+ override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
val dbName = if (!caseSensitive) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
} else {