aboutsummaryrefslogtreecommitdiff
path: root/sql/hive
diff options
context:
space:
mode:
authorgatorsmile <gatorsmile@gmail.com>2016-12-15 09:23:55 +0800
committerWenchen Fan <wenchen@databricks.com>2016-12-15 09:23:55 +0800
commit8db4d95c02080cd120740d106292f0281e8f9249 (patch)
tree0697a95a267eef43be61e5d7151e6ed3c2d6ac5e /sql/hive
parent324388531648de20ee61bd42518a068d4789925c (diff)
downloadspark-8db4d95c02080cd120740d106292f0281e8f9249.tar.gz
spark-8db4d95c02080cd120740d106292f0281e8f9249.tar.bz2
spark-8db4d95c02080cd120740d106292f0281e8f9249.zip
[SPARK-18703][SQL] Drop Staging Directories and Data Files After each Insertion/CTAS of Hive serde Tables
### What changes were proposed in this pull request? Below are the files/directories generated for three inserts againsts a Hive table: ``` /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/._SUCCESS.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/_SUCCESS /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-29_149_4298858301766472202-1/-ext-10000/part-00000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/._SUCCESS.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/_SUCCESS /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_454_6445008511655931341-1/-ext-10000/part-00000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/._SUCCESS.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/_SUCCESS /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.hive-staging_hive_2016-12-03_20-56-30_722_3388423608658711001-1/-ext-10000/part-00000 /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000 ``` The first 18 files are temporary. We do not drop it until the end of JVM termination. If JVM does not appropriately terminate, these temporary files/directories will not be dropped. Only the last two files are needed, as shown below. ``` /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/.part-00000.crc /private/var/folders/4b/sgmfldk15js406vk7lw5llzw0000gn/T/spark-41eaa5ce-0288-471e-bba1-09cc482813ff/part-00000 ``` The temporary files/directories could accumulate a lot when we issue many inserts, since each insert generats at least six files. This could eat a lot of spaces and slow down the JVM termination. When the JVM does not terminates approprately, the files might not be dropped. This PR is to drop the created staging files and temporary data files after each insert/CTAS. ### How was this patch tested? Added a test case Author: gatorsmile <gatorsmile@gmail.com> Closes #16134 from gatorsmile/deleteFiles.
Diffstat (limited to 'sql/hive')
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala16
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala24
2 files changed, 38 insertions, 2 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 82c7b1a3c6..aa858e808e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,6 +22,8 @@ import java.net.URI
import java.text.SimpleDateFormat
import java.util.{Date, Locale, Random}
+import scala.util.control.NonFatal
+
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -84,6 +86,7 @@ case class InsertIntoHiveTable(
def output: Seq[Attribute] = Seq.empty
val hadoopConf = sessionState.newHadoopConf()
+ var createdTempDir: Option[Path] = None
val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
@@ -111,12 +114,12 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
}
+ createdTempDir = Some(dir)
fs.deleteOnExit(dir)
} catch {
case e: IOException =>
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
-
}
return dir
}
@@ -163,11 +166,11 @@ case class InsertIntoHiveTable(
if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
}
+ createdTempDir = Some(dirPath)
fs.deleteOnExit(dirPath)
} catch {
case e: IOException =>
throw new RuntimeException("Cannot create staging directory: " + dirPath.toString, e)
-
}
dirPath
}
@@ -378,6 +381,15 @@ case class InsertIntoHiveTable(
isSrcLocal = false)
}
+ // Attempt to delete the staging directory and the inclusive files. If failed, the files are
+ // expected to be dropped at the normal termination of VM since deleteOnExit is used.
+ try {
+ createdTempDir.foreach { path => path.getFileSystem(hadoopConf).delete(path, true) }
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Unable to delete staging directory: $stagingDir.\n" + e)
+ }
+
// Invalidate the cache.
sqlContext.sharedState.cacheManager.invalidateCache(table)
sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 9b26383a16..8dd06998ba 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -546,6 +546,30 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
}
}
+ test(s"$version: Delete the temporary staging directory and files after each insert") {
+ withTempDir { tmpDir =>
+ withTable("tab") {
+ spark.sql(
+ s"""
+ |CREATE TABLE tab(c1 string)
+ |location '${tmpDir.toURI.toString}'
+ """.stripMargin)
+
+ (1 to 3).map { i =>
+ spark.sql(s"INSERT OVERWRITE TABLE tab SELECT '$i'")
+ }
+ def listFiles(path: File): List[String] = {
+ val dir = path.listFiles()
+ val folders = dir.filter(_.isDirectory).toList
+ val filePaths = dir.map(_.getName).toList
+ folders.flatMap(listFiles) ++: filePaths
+ }
+ val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
+ assert(listFiles(tmpDir).sorted == expectedFiles)
+ }
+ }
+ }
+
// TODO: add more tests.
}
}