diff options
author | Michael Armbrust <michael@databricks.com> | 2015-02-24 13:39:29 -0800 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2015-02-24 13:39:42 -0800 |
commit | 641423dbffd9333ea8d989d0afa7b78426bd3979 (patch) | |
tree | d94b73c595a41b0aa99b08fd1a303bc6a707ba26 | |
parent | a4ff445a9f2f34697d99a607b05cbc7322beec18 (diff) | |
download | spark-641423dbffd9333ea8d989d0afa7b78426bd3979.tar.gz spark-641423dbffd9333ea8d989d0afa7b78426bd3979.tar.bz2 spark-641423dbffd9333ea8d989d0afa7b78426bd3979.zip |
[SPARK-5952][SQL] Lock when using hive metastore client
Author: Michael Armbrust <michael@databricks.com>
Closes #4746 from marmbrus/hiveLock and squashes the following commits:
8b871cf [Michael Armbrust] [SPARK-5952][SQL] Lock when using hive metastore client
(cherry picked from commit a2b9137923e0ba328da8fff2fbbfcf2abf50b033)
Signed-off-by: Michael Armbrust <michael@databricks.com>
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 18 |
1 file changed, 12 insertions(+), 6 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index f7ad2efc95..2cc8d65d3c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -52,6 +52,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Connection to hive metastore. Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)

+  /** Usages should lock on `this`. */
   protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)

   // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
@@ -65,7 +66,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
     override def load(in: QualifiedTableName): LogicalPlan = {
       logDebug(s"Creating new cached data source for $in")
-      val table = client.getTable(in.database, in.name)
+      val table = synchronized {
+        client.getTable(in.database, in.name)
+      }
       val schemaString = table.getProperty("spark.sql.sources.schema")
       val userSpecifiedSchema = if (schemaString == null) {
@@ -134,15 +137,18 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
   }

-  def hiveDefaultTableFilePath(tableName: String): String = {
+  def hiveDefaultTableFilePath(tableName: String): String = synchronized {
     val currentDatabase = client.getDatabase(hive.sessionState.getCurrentDatabase)
+
     hiveWarehouse.getTablePath(currentDatabase, tableName).toString
   }

-  def tableExists(tableIdentifier: Seq[String]): Boolean = {
+  def tableExists(tableIdentifier: Seq[String]): Boolean = synchronized {
     val tableIdent = processTableIdentifier(tableIdentifier)
-    val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
-      hive.sessionState.getCurrentDatabase)
+    val databaseName =
+      tableIdent
+        .lift(tableIdent.size - 2)
+        .getOrElse(hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
     client.getTable(databaseName, tblName, false) != null
   }
@@ -219,7 +225,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
   }

-  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
+  override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = synchronized {
     val dbName = if (!caseSensitive) {
       if (databaseName.isDefined) Some(databaseName.get.toLowerCase) else None
     } else {