Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 68
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala          | 19
2 files changed, 75 insertions(+), 12 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index db2239d26a..82c7b1a3c6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,7 +22,6 @@ import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,6 +85,7 @@ case class InsertIntoHiveTable(
val hadoopConf = sessionState.newHadoopConf()
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+ val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
private def executionId: String = {
val rand: Random = new Random
@@ -93,7 +93,7 @@ case class InsertIntoHiveTable(
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}
- private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+ private def getStagingDir(inputPath: Path): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,69 @@ case class InsertIntoHiveTable(
return dir
}
- private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): Path = {
- getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), hadoopConf)
+ private def getExternalScratchDir(extURI: URI): Path = {
+ getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
}
- def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+ def getExternalTmpPath(path: Path): Path = {
+ import org.apache.spark.sql.hive.client.hive._
+
+ val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+ // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+ // a common scratch directory. After the writing is finished, Hive will simply empty the table
+ // directory and move the staging directory to it.
+ // Starting from Hive 1.1, Hive will create the staging directory under the table directory, and
+ // when moving the staging directory to the table directory, Hive will still empty the table
+ // directory, but will exclude the staging directory there.
+ // We have to follow the Hive behavior here to avoid trouble. For example, if we create the
+ // staging directory under the table directory for Hive versions prior to 1.1, the staging
+ // directory will be removed by Hive when Hive tries to empty the table directory.
+ if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
+ oldVersionExternalTempPath(path)
+ } else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
+ newVersionExternalTempPath(path)
+ } else {
+ throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+ }
+ }
+
+ // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+ def oldVersionExternalTempPath(path: Path): Path = {
+ val extURI: URI = path.toUri
+ val scratchPath = new Path(scratchDir, executionId)
+ var dirPath = new Path(
+ extURI.getScheme,
+ extURI.getAuthority,
+ scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+ try {
+ val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
+ dirPath = new Path(fs.makeQualified(dirPath).toString())
+
+ if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
+ }
+ fs.deleteOnExit(dirPath)
+ } catch {
+ case e: IOException =>
+ throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
+
+ }
+ dirPath
+ }
+
+ // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+ def newVersionExternalTempPath(path: Path): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path.getParent, hadoopConf)
+ getExtTmpPathRelTo(path.getParent)
} else {
- new Path(getExternalScratchDir(extURI, hadoopConf), "-ext-10000")
+ new Path(getExternalScratchDir(extURI), "-ext-10000")
}
}
- def getExtTmpPathRelTo(path: Path, hadoopConf: Configuration): Path = {
- new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000
+ def getExtTmpPathRelTo(path: Path): Path = {
+ new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
}
private def saveAsHiveFile(
@@ -172,7 +220,7 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
+ val tmpLocation = getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
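
The version dispatch added above is easier to see in isolation. The following is a minimal, self-contained sketch of the idea, not the actual Spark code: the HiveVersion values, the path layout, and the helper names are simplified stand-ins. For Hive 0.12 through 1.0 the temporary directory lives under the shared scratch dir (hive.exec.scratchdir); for Hive 1.1 and 1.2 it is a hidden staging directory (hive.exec.stagingdir) under the table directory, which Hive excludes when emptying that directory.

// Simplified sketch only; not the Spark implementation.
object TmpPathDispatchSketch {
  sealed trait HiveVersion
  case object V0_12 extends HiveVersion
  case object V0_13 extends HiveVersion
  case object V0_14 extends HiveVersion
  case object V1_0  extends HiveVersion
  case object V1_1  extends HiveVersion
  case object V1_2  extends HiveVersion

  // Pre-1.1 layout: a per-execution directory under the shared scratch dir,
  // away from the table directory that Hive will empty on commit.
  private def oldVersionTmpPath(scratchDir: String, executionId: String): String =
    s"$scratchDir/$executionId"

  // 1.1+ layout: a hidden staging directory under the table directory itself;
  // Hive knows to skip it when emptying the table directory.
  private def newVersionTmpPath(tableDir: String, stagingDir: String, executionId: String): String =
    s"$tableDir/$stagingDir/$executionId/-ext-10000"

  def externalTmpPath(
      version: HiveVersion,
      tableDir: String,
      scratchDir: String = "/tmp/hive",
      stagingDir: String = ".hive-staging",
      executionId: String = "hive_20170101000000_0000"): String = version match {
    case V0_12 | V0_13 | V0_14 | V1_0 => oldVersionTmpPath(scratchDir, executionId)
    case V1_1 | V1_2                  => newVersionTmpPath(tableDir, stagingDir, executionId)
  }

  def main(args: Array[String]): Unit = {
    println(externalTmpPath(V0_13, "/user/hive/warehouse/tbl"))  // under the scratch dir
    println(externalTmpPath(V1_2,  "/user/hive/warehouse/tbl"))  // under the table dir
  }
}
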
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index a001048a9e..9b26383a16 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -26,13 +26,15 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StructType
import org.apache.spark.tags.ExtendedHiveTest
@@ -45,7 +47,7 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}
* is not fully tested.
*/
@ExtendedHiveTest
-class VersionsSuite extends SparkFunSuite with Logging {
+class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSingleton with Logging {
private val clientBuilder = new HiveClientBuilder
import clientBuilder.buildClient
@@ -532,5 +534,18 @@ class VersionsSuite extends SparkFunSuite with Logging {
client.reset()
assert(client.listTables("default").isEmpty)
}
+
+ ///////////////////////////////////////////////////////////////////////////
+ // End-To-End tests
+ ///////////////////////////////////////////////////////////////////////////
+
+ test(s"$version: CREATE TABLE AS SELECT") {
+ withTable("tbl") {
+ spark.sql("CREATE TABLE tbl AS SELECT 1 AS a")
+ assert(spark.table("tbl").collect().toSeq == Seq(Row(1)))
+ }
+ }
+
+ // TODO: add more tests.
}
}
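
The new end-to-end test leans on SQLTestUtils.withTable to drop the table even when the assertion inside fails. Below is a minimal sketch of that try/finally cleanup pattern; the signature and the drop callback are illustrative stand-ins, not Spark's actual SQLTestUtils API.

// Illustrative sketch of the cleanup pattern behind withTable.
object WithTableSketch {
  def withTable(tableNames: String*)(drop: String => Unit)(body: => Unit): Unit = {
    try body
    finally tableNames.foreach(drop)  // tables are dropped whether the body passes or fails
  }

  def main(args: Array[String]): Unit = {
    val dropped = scala.collection.mutable.Buffer.empty[String]
    try {
      withTable("tbl")(dropped += _) { sys.error("simulated test failure") }
    } catch {
      case _: RuntimeException => ()  // the failure still propagates to the test framework
    }
    assert(dropped.toSeq == Seq("tbl"))  // cleanup ran despite the failure
  }
}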