about summary refs log tree commit diff
diff options
context:
space:
mode:
authorMichael Armbrust <michael@databricks.com>2015-02-24 13:39:29 -0800
committerMichael Armbrust <michael@databricks.com>2015-02-24 13:39:29 -0800
commita2b9137923e0ba328da8fff2fbbfcf2abf50b033 (patch)
tree4bc3bc4da68ece1b2273aea958a7f9a780685734
parentc5ba975ee85521f708ebeec81144347cf1b40fba (diff)
downloadspark-a2b9137923e0ba328da8fff2fbbfcf2abf50b033.tar.gz
spark-a2b9137923e0ba328da8fff2fbbfcf2abf50b033.tar.bz2
spark-a2b9137923e0ba328da8fff2fbbfcf2abf50b033.zip
[SPARK-5952][SQL] Lock when using hive metastore client
Author: Michael Armbrust <michael@databricks.com> Closes #4746 from marmbrus/hiveLock and squashes the following commits: 8b871cf [Michael Armbrust] [SPARK-5952][SQL] Lock when using hive metastore client
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 18
1 file changed, 12 insertions, 6 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f7ad2efc95..2cc8d65d3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
/** Connection to hive metastore. Usages should lock on `this`. */
protected[hive] val client = Hive.get(hive.hiveconf)
+ /** Usages should lock on `this`. */
protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
// TODO: Use this everywhere instead of tuples or databaseName, tableName,.
@@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
override def load(in: QualifiedTableName): LogicalPlan = {
logDebug(s"Creating new cached data source for $in")
- val table = client.getTable(in.database, in.name)
+ val table = synchronized {
+ client.getTable(in.database, in.name)
+ }
val schemaString = table.getProperty("spark.sql.sources.schema")
val userSpecifiedSchema =
if (schemaString == null) {
@@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
- def hiveDefaultTableFilePath(tableName: String): String = {
+ def hiveDefaultTableFilePath(tableName: String): String = synchronized {
val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
+
hiveWarehouse.getTablePath(currentDatabase, tableName).toString
}
- def tableExists(tableIdentifier: Seq[String]): Boolean = {
+ def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
val tableIdent = processTableIdentifier(tableIdentifier)
- val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
- hive.sessionState.getCurrentDatabase)
+ val databaseName =
+ tableIdent
+ .lift(tableIdent.size - 2)
+ .getOrElse(hive.sessionState.getCurrentDatabase)
val tblName = tableIdent.last
client.getTable(databaseName, tblName, false) != null
}
@@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
}
- override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+ override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
val dbName = if (!caseSensitive) {
if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
} else {