author    Yin Huai <yhuai@databricks.com>            2015-04-02 16:46:50 -0700
committer Michael Armbrust <michael@databricks.com>  2015-04-02 16:46:50 -0700
commit    5db89127e72630aec7c5552f2c84018ae18d03fe (patch)
tree      33d4588ade4c2c883c80ef071f7899dd754ace69
parent    d3944b6f2aeb36629bf89207629cc5e55d327241 (diff)
[SPARK-6618][SPARK-6669][SQL] Lock Hive metastore client correctly.
Author: Yin Huai <yhuai@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #5333 from yhuai/lookupRelationLock and squashes the following commits:

59c884f [Michael Armbrust] [SQL] Lock metastore client in analyzeTable
7667030 [Yin Huai] Merge pull request #2 from marmbrus/pr/5333
e4a9b0b [Michael Armbrust] Correctly lock on MetastoreCatalog
d6fc32f [Yin Huai] Missing `)`.
1e241af [Yin Huai] Protect InsertIntoHive.
fee7e9c [Yin Huai] A test?
5416b0f [Yin Huai] Just protect client.
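Below is a minimal, self-contained Scala sketch of the locking pattern this patch applies (the MetastoreClient / Catalog / LockDemo names are hypothetical, not Spark's classes): every call into the shared, non-thread-safe metastore client is wrapped in a block synchronized on the catalog object, and code running inside an inner anonymous class has to name the outer instance explicitly (Catalog.this.synchronized), which is what the HiveMetastoreCatalog hunk below changes.

// A minimal sketch, assuming hypothetical MetastoreClient / Catalog classes rather than
// Spark's own: the metastore client is not thread-safe, so every call into it is guarded
// by one shared lock -- the catalog instance -- while everything else stays unlocked.
class MetastoreClient {
  def getTable(db: String, name: String): String = s"$db.$name" // stand-in for the Hive call
}

class Catalog(val client: MetastoreClient) {
  // Inside an inner (anonymous) class, a bare `synchronized` would lock that inner object
  // rather than the catalog, so the outer instance has to be named explicitly.
  val cacheLoader: Runnable = new Runnable {
    override def run(): Unit = {
      val table = Catalog.this.synchronized { client.getTable("default", "src") }
      println(s"loaded $table")
    }
  }

  def lookupRelation(db: String, name: String): String = {
    // Only the metastore call itself is guarded; plan construction happens outside the lock.
    val table = synchronized { client.getTable(db, name) }
    s"relation for $table"
  }
}

// Callers that hold a reference to the catalog (InsertIntoHiveTable in the real patch)
// take the same lock before touching the client.
object LockDemo extends App {
  val catalog = new Catalog(new MetastoreClient)
  catalog.synchronized { catalog.client.getTable("default", "src") }
  println(catalog.lookupRelation("default", "src"))
}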
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala                     4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala           14
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  51
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala        11
4 files changed, 53 insertions, 27 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 46991fbd68..7c6a7df2bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -181,7 +181,9 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
           val tableFullName =
             relation.hiveQlTable.getDbName + "." + relation.hiveQlTable.getTableName
 
-          catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+          catalog.synchronized {
+            catalog.client.alterTable(tableFullName, new Table(hiveTTable))
+          }
         }
       case otherRelation =>
         throw new UnsupportedOperationException(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 14cdb42073..bbd920a405 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -67,7 +67,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
       override def load(in: QualifiedTableName): LogicalPlan = {
         logDebug(s"Creating new cached data source for $in")
-        val table = synchronized {
+        val table = HiveMetastoreCatalog.this.synchronized {
           client.getTable(in.database, in.name)
         }
 
@@ -183,12 +183,16 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   def lookupRelation(
       tableIdentifier: Seq[String],
-      alias: Option[String]): LogicalPlan = synchronized {
+      alias: Option[String]): LogicalPlan = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(
       hive.sessionState.getCurrentDatabase)
     val tblName = tableIdent.last
-    val table = try client.getTable(databaseName, tblName) catch {
+    val table = try {
+      synchronized {
+        client.getTable(databaseName, tblName)
+      }
+    } catch {
       case te: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
         throw new NoSuchTableException
     }
@@ -210,7 +214,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     } else {
       val partitions: Seq[Partition] =
         if (table.isPartitioned) {
-          HiveShim.getAllPartitionsOf(client, table).toSeq
+          synchronized {
+            HiveShim.getAllPartitionsOf(client, table).toSeq
+          }
         } else {
           Nil
         }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index cdf012b511..6c96747439 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -50,7 +50,7 @@ case class InsertIntoHiveTable(
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
-  @transient private lazy val db = Hive.get(sc.hiveconf)
+  @transient private lazy val catalog = sc.catalog
 
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
@@ -199,38 +199,45 @@
         orderedPartitionSpec.put(entry.getName,partitionSpec.get(entry.getName).getOrElse(""))
       }
       val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
-      db.validatePartitionNameCharacters(partVals)
+      catalog.synchronized {
+        catalog.client.validatePartitionNameCharacters(partVals)
+      }
       // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
       // which is currently considered as a Hive native command.
       val inheritTableSpecs = true
       // TODO: Correctly set isSkewedStoreAsSubdir.
       val isSkewedStoreAsSubdir = false
       if (numDynamicPartitions > 0) {
-        db.loadDynamicPartitions(
-          outputPath,
-          qualifiedTableName,
-          orderedPartitionSpec,
-          overwrite,
-          numDynamicPartitions,
-          holdDDLTime,
-          isSkewedStoreAsSubdir
-        )
+        catalog.synchronized {
+          catalog.client.loadDynamicPartitions(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            numDynamicPartitions,
+            holdDDLTime,
+            isSkewedStoreAsSubdir)
+        }
       } else {
-        db.loadPartition(
+        catalog.synchronized {
+          catalog.client.loadPartition(
+            outputPath,
+            qualifiedTableName,
+            orderedPartitionSpec,
+            overwrite,
+            holdDDLTime,
+            inheritTableSpecs,
+            isSkewedStoreAsSubdir)
+        }
+      }
+    } else {
+      catalog.synchronized {
+        catalog.client.loadTable(
           outputPath,
           qualifiedTableName,
-          orderedPartitionSpec,
           overwrite,
-          holdDDLTime,
-          inheritTableSpecs,
-          isSkewedStoreAsSubdir)
+          holdDDLTime)
       }
-    } else {
-      db.loadTable(
-        outputPath,
-        qualifiedTableName,
-        overwrite,
-        holdDDLTime)
     }
 
     // Invalidate the cache.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 310c2bfdf1..2065f0d60d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -457,4 +457,15 @@ class SQLQuerySuite extends QueryTest {
     dropTempTable("data")
     setConf("spark.sql.hive.convertCTAS", originalConf)
   }
+
+  test("sanity test for SPARK-6618") {
+    (1 to 100).par.map { i =>
+      val tableName = s"SPARK_6618_table_$i"
+      sql(s"CREATE TABLE $tableName (col1 string)")
+      catalog.lookupRelation(Seq(tableName))
+      table(tableName)
+      tables()
+      sql(s"DROP TABLE $tableName")
+    }
+  }
 }