author     Sean Owen <sowen@cloudera.com>       2015-12-23 13:24:06 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-12-23 13:24:06 -0800
commit     ae1f54aa0ed69f9daa1f32766ca234bda9320452 (patch)
tree       ab42a85d477d289391bc5a8e7ad21d3cd94c78cd /core/src
parent     43b2a6390087b7ce262a54dc8ab8dd825db62e21 (diff)
[SPARK-12500][CORE] Fix Tachyon deprecations; pull Tachyon dependency into one class
Fix Tachyon deprecations; pull the Tachyon dependency into `TachyonBlockManager` only.

CC calvinjia as I probably need a double-check that the usage of the new API is correct.

Author: Sean Owen <sowen@cloudera.com>

Closes #10449 from srowen/SPARK-12500.
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala  | 135
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala      |  42
-rw-r--r--  core/src/main/scala/org/apache/spark/util/Utils.scala                    |  11
3 files changed, 104 insertions(+), 84 deletions(-)
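The core of the migration is visible in the init() hunk below: the deprecated TachyonFS entry point is replaced by the newer TachyonFileSystem client, configured through ClientContext. A minimal sketch of the before/after client construction, using only the calls shown in this patch (the surrounding init() logic and null-master handling are omitted):

    // Before (deprecated API): connect directly via TachyonFS
    val oldClient = TachyonFS.get(new TachyonURI(master), new TachyonConf())

    // After: point the shared ClientContext at the master address,
    // then obtain a TachyonFileSystem from its factory
    val tachyonConf = new TachyonConf()
    tachyonConf.set(Constants.MASTER_ADDRESS, master)
    ClientContext.reset(tachyonConf)
    val newClient: TachyonFileSystem = TachyonFileSystemFactory.get

Existence checks and deletes move accordingly: boolean-returning calls such as client.exist(uri) and client.delete(uri, recursive) give way to getInfo/delete calls that signal missing or pre-existing files via FileDoesNotExistException and FileAlreadyExistsException, as the hunks below show. The Tachyon-specific shutdown-delete bookkeeping also moves out of ShutdownHookManager and Utils into TachyonBlockManager itself, so Tachyon is now a dependency of that one class only.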
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index d14fe46135..7f88f2fe6d 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -26,13 +26,17 @@ import scala.util.control.NonFatal
import com.google.common.io.ByteStreams
-import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile}
+import tachyon.{Constants, TachyonURI}
+import tachyon.client.ClientContext
+import tachyon.client.file.{TachyonFile, TachyonFileSystem}
+import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory
+import tachyon.client.file.options.DeleteOptions
import tachyon.conf.TachyonConf
-import tachyon.TachyonURI
+import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException}
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.util.{ShutdownHookManager, Utils}
+import org.apache.spark.util.Utils
/**
@@ -44,15 +48,15 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
var rootDirs: String = _
var master: String = _
- var client: tachyon.client.TachyonFS = _
+ var client: TachyonFileSystem = _
private var subDirsPerTachyonDir: Int = _
// Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName;
// then, inside this directory, create multiple subdirectories that we will hash files into,
// in order to avoid having really large inodes at the top level in Tachyon.
private var tachyonDirs: Array[TachyonFile] = _
- private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _
-
+ private var subDirs: Array[Array[TachyonFile]] = _
+ private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
override def init(blockManager: BlockManager, executorId: String): Unit = {
super.init(blockManager, executorId)
@@ -62,7 +66,10 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
rootDirs = s"$storeDir/$appFolderName/$executorId"
master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998")
client = if (master != null && master != "") {
- TachyonFS.get(new TachyonURI(master), new TachyonConf())
+ val tachyonConf = new TachyonConf()
+ tachyonConf.set(Constants.MASTER_ADDRESS, master)
+ ClientContext.reset(tachyonConf)
+ TachyonFileSystemFactory.get
} else {
null
}
@@ -80,7 +87,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
// in order to avoid having really large inodes at the top level in Tachyon.
tachyonDirs = createTachyonDirs()
subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir))
- tachyonDirs.foreach(tachyonDir => ShutdownHookManager.registerShutdownDeleteDir(tachyonDir))
+ tachyonDirs.foreach(registerShutdownDeleteDir)
}
override def toString: String = {"ExternalBlockStore-Tachyon"}
@@ -89,6 +96,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
val file = getFile(blockId)
if (fileExists(file)) {
removeFile(file)
+ true
} else {
false
}
@@ -101,7 +109,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = {
val file = getFile(blockId)
- val os = file.getOutStream(WriteType.TRY_CACHE)
+ val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
try {
Utils.writeByteBuffer(bytes, os)
} catch {
@@ -115,7 +123,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def putValues(blockId: BlockId, values: Iterator[_]): Unit = {
val file = getFile(blockId)
- val os = file.getOutStream(WriteType.TRY_CACHE)
+ val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath))
try {
blockManager.dataSerializeStream(blockId, os, values)
} catch {
@@ -129,12 +137,17 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def getBytes(blockId: BlockId): Option[ByteBuffer] = {
val file = getFile(blockId)
- if (file == null || file.getLocationHosts.size == 0) {
+ if (file == null) {
return None
}
- val is = file.getInStream(ReadType.CACHE)
+ val is = try {
+ client.getInStream(file)
+ } catch {
+ case _: FileDoesNotExistException =>
+ return None
+ }
try {
- val size = file.length
+ val size = client.getInfo(file).length
val bs = new Array[Byte](size.asInstanceOf[Int])
ByteStreams.readFully(is, bs)
Some(ByteBuffer.wrap(bs))
@@ -149,25 +162,37 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
override def getValues(blockId: BlockId): Option[Iterator[_]] = {
val file = getFile(blockId)
- if (file == null || file.getLocationHosts().size() == 0) {
+ if (file == null) {
return None
}
- val is = file.getInStream(ReadType.CACHE)
- Option(is).map { is =>
- blockManager.dataDeserializeStream(blockId, is)
+ val is = try {
+ client.getInStream(file)
+ } catch {
+ case _: FileDoesNotExistException =>
+ return None
+ }
+ try {
+ Some(blockManager.dataDeserializeStream(blockId, is))
+ } finally {
+ is.close()
}
}
override def getSize(blockId: BlockId): Long = {
- getFile(blockId.name).length
+ client.getInfo(getFile(blockId.name)).length
}
- def removeFile(file: TachyonFile): Boolean = {
- client.delete(new TachyonURI(file.getPath()), false)
+ def removeFile(file: TachyonFile): Unit = {
+ client.delete(file)
}
def fileExists(file: TachyonFile): Boolean = {
- client.exist(new TachyonURI(file.getPath()))
+ try {
+ client.getInfo(file)
+ true
+ } catch {
+ case _: FileDoesNotExistException => false
+ }
}
def getFile(filename: String): TachyonFile = {
@@ -186,18 +211,18 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
} else {
val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
client.mkdir(path)
- val newDir = client.getFile(path)
+ val newDir = client.loadMetadata(path)
subDirs(dirId)(subDirId) = newDir
newDir
}
}
}
val filePath = new TachyonURI(s"$subDir/$filename")
- if(!client.exist(filePath)) {
- client.createFile(filePath)
+ try {
+ client.create(filePath)
+ } catch {
+ case _: FileAlreadyExistsException => client.loadMetadata(filePath)
}
- val file = client.getFile(filePath)
- file
}
def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name)
@@ -217,9 +242,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
- if (!client.exist(path)) {
+ try {
foundLocalDir = client.mkdir(path)
- tachyonDir = client.getFile(path)
+ tachyonDir = client.loadMetadata(path)
+ } catch {
+ case _: FileAlreadyExistsException => // continue
}
} catch {
case NonFatal(e) =>
@@ -240,14 +267,60 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
- if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) {
- Utils.deleteRecursively(tachyonDir, client)
+ if (!hasRootAsShutdownDeleteDir(tachyonDir)) {
+ deleteRecursively(tachyonDir, client)
}
} catch {
case NonFatal(e) =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
}
- client.close()
}
+
+ /**
+ * Delete a file or directory and its contents recursively.
+ */
+ private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) {
+ client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build())
+ }
+
+ // Register the tachyon path to be deleted via shutdown hook
+ private def registerShutdownDeleteDir(file: TachyonFile) {
+ val absolutePath = client.getInfo(file).getPath
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths += absolutePath
+ }
+ }
+
+ // Remove the tachyon path to be deleted via shutdown hook
+ private def removeShutdownDeleteDir(file: TachyonFile) {
+ val absolutePath = client.getInfo(file).getPath
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths -= absolutePath
+ }
+ }
+
+ // Is the path already registered to be deleted via a shutdown hook ?
+ private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
+ val absolutePath = client.getInfo(file).getPath
+ shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths.contains(absolutePath)
+ }
+ }
+
+ // Note: if file is child of some registered path, while not equal to it, then return true;
+ // else false. This is to ensure that two shutdown hooks do not try to delete each others
+ // paths - resulting in Exception and incomplete cleanup.
+ private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
+ val absolutePath = client.getInfo(file).getPath
+ val hasRoot = shutdownDeleteTachyonPaths.synchronized {
+ shutdownDeleteTachyonPaths.exists(
+ path => !absolutePath.equals(path) && absolutePath.startsWith(path))
+ }
+ if (hasRoot) {
+ logInfo(s"path = $absolutePath, already present as root for deletion.")
+ }
+ hasRoot
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 1a0f3b477b..0065b1fc66 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -21,7 +21,6 @@ import java.io.File
import java.util.PriorityQueue
import scala.util.{Failure, Success, Try}
-import tachyon.client.TachyonFile
import org.apache.hadoop.fs.FileSystem
import org.apache.spark.Logging
@@ -52,7 +51,6 @@ private[spark] object ShutdownHookManager extends Logging {
}
private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
- private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
// Add a shutdown hook to delete the temp dirs when the JVM exits
addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
@@ -77,14 +75,6 @@ private[spark] object ShutdownHookManager extends Logging {
}
}
- // Register the tachyon path to be deleted via shutdown hook
- def registerShutdownDeleteDir(tachyonfile: TachyonFile) {
- val absolutePath = tachyonfile.getPath()
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths += absolutePath
- }
- }
-
// Remove the path to be deleted via shutdown hook
def removeShutdownDeleteDir(file: File) {
val absolutePath = file.getAbsolutePath()
@@ -93,14 +83,6 @@ private[spark] object ShutdownHookManager extends Logging {
}
}
- // Remove the tachyon path to be deleted via shutdown hook
- def removeShutdownDeleteDir(tachyonfile: TachyonFile) {
- val absolutePath = tachyonfile.getPath()
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.remove(absolutePath)
- }
- }
-
// Is the path already registered to be deleted via a shutdown hook ?
def hasShutdownDeleteDir(file: File): Boolean = {
val absolutePath = file.getAbsolutePath()
@@ -109,14 +91,6 @@ private[spark] object ShutdownHookManager extends Logging {
}
}
- // Is the path already registered to be deleted via a shutdown hook ?
- def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = {
- val absolutePath = file.getPath()
- shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.contains(absolutePath)
- }
- }
-
// Note: if file is child of some registered path, while not equal to it, then return true;
// else false. This is to ensure that two shutdown hooks do not try to delete each others
// paths - resulting in IOException and incomplete cleanup.
@@ -133,22 +107,6 @@ private[spark] object ShutdownHookManager extends Logging {
retval
}
- // Note: if file is child of some registered path, while not equal to it, then return true;
- // else false. This is to ensure that two shutdown hooks do not try to delete each others
- // paths - resulting in Exception and incomplete cleanup.
- def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = {
- val absolutePath = file.getPath()
- val retval = shutdownDeleteTachyonPaths.synchronized {
- shutdownDeleteTachyonPaths.exists { path =>
- !absolutePath.equals(path) && absolutePath.startsWith(path)
- }
- }
- if (retval) {
- logInfo("path = " + file + ", already present as root for deletion.")
- }
- retval
- }
-
/**
* Detect whether this thread might be executing a shutdown hook. Will always return true if
* the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a07f7ca7e..b8ca6b07e4 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -44,8 +44,6 @@ import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
import org.slf4j.Logger
-import tachyon.TachyonURI
-import tachyon.client.{TachyonFS, TachyonFile}
import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
@@ -947,15 +945,6 @@ private[spark] object Utils extends Logging {
}
/**
- * Delete a file or directory and its contents recursively.
- */
- def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
- if (!client.delete(new TachyonURI(dir.getPath()), true)) {
- throw new IOException("Failed to delete the tachyon dir: " + dir)
- }
- }
-
- /**
* Check to see if file is a symbolic link.
*/
def isSymlink(file: File): Boolean = {