diff options
author | Sean Owen <sowen@cloudera.com> | 2015-12-23 13:24:06 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-12-23 13:24:06 -0800 |
commit | ae1f54aa0ed69f9daa1f32766ca234bda9320452 (patch) | |
tree | ab42a85d477d289391bc5a8e7ad21d3cd94c78cd /core/src | |
parent | 43b2a6390087b7ce262a54dc8ab8dd825db62e21 (diff) | |
download | spark-ae1f54aa0ed69f9daa1f32766ca234bda9320452.tar.gz spark-ae1f54aa0ed69f9daa1f32766ca234bda9320452.tar.bz2 spark-ae1f54aa0ed69f9daa1f32766ca234bda9320452.zip |
[SPARK-12500][CORE] Fix Tachyon deprecations; pull Tachyon dependency into one class
Fix Tachyon deprecations; pull Tachyon dependency into `TachyonBlockManager` only
CC calvinjia as I probably need a double-check that the usage of the new API is correct.
Author: Sean Owen <sowen@cloudera.com>
Closes #10449 from srowen/SPARK-12500.
Diffstat (limited to 'core/src')
3 files changed, 104 insertions, 84 deletions
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index d14fe46135..7f88f2fe6d 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -26,13 +26,17 @@ import scala.util.control.NonFatal import com.google.common.io.ByteStreams -import tachyon.client.{ReadType, WriteType, TachyonFS, TachyonFile} +import tachyon.{Constants, TachyonURI} +import tachyon.client.ClientContext +import tachyon.client.file.{TachyonFile, TachyonFileSystem} +import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory +import tachyon.client.file.options.DeleteOptions import tachyon.conf.TachyonConf -import tachyon.TachyonURI +import tachyon.exception.{FileAlreadyExistsException, FileDoesNotExistException} import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode -import org.apache.spark.util.{ShutdownHookManager, Utils} +import org.apache.spark.util.Utils /** @@ -44,15 +48,15 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log var rootDirs: String = _ var master: String = _ - var client: tachyon.client.TachyonFS = _ + var client: TachyonFileSystem = _ private var subDirsPerTachyonDir: Int = _ // Create one Tachyon directory for each path mentioned in spark.tachyonStore.folderName; // then, inside this directory, create multiple subdirectories that we will hash files into, // in order to avoid having really large inodes at the top level in Tachyon. private var tachyonDirs: Array[TachyonFile] = _ - private var subDirs: Array[Array[tachyon.client.TachyonFile]] = _ - + private var subDirs: Array[Array[TachyonFile]] = _ + private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() override def init(blockManager: BlockManager, executorId: String): Unit = { super.init(blockManager, executorId) @@ -62,7 +66,10 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log rootDirs = s"$storeDir/$appFolderName/$executorId" master = blockManager.conf.get(ExternalBlockStore.MASTER_URL, "tachyon://localhost:19998") client = if (master != null && master != "") { - TachyonFS.get(new TachyonURI(master), new TachyonConf()) + val tachyonConf = new TachyonConf() + tachyonConf.set(Constants.MASTER_ADDRESS, master) + ClientContext.reset(tachyonConf) + TachyonFileSystemFactory.get } else { null } @@ -80,7 +87,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log // in order to avoid having really large inodes at the top level in Tachyon. tachyonDirs = createTachyonDirs() subDirs = Array.fill(tachyonDirs.length)(new Array[TachyonFile](subDirsPerTachyonDir)) - tachyonDirs.foreach(tachyonDir => ShutdownHookManager.registerShutdownDeleteDir(tachyonDir)) + tachyonDirs.foreach(registerShutdownDeleteDir) } override def toString: String = {"ExternalBlockStore-Tachyon"} @@ -89,6 +96,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log val file = getFile(blockId) if (fileExists(file)) { removeFile(file) + true } else { false } @@ -101,7 +109,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log override def putBytes(blockId: BlockId, bytes: ByteBuffer): Unit = { val file = getFile(blockId) - val os = file.getOutStream(WriteType.TRY_CACHE) + val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath)) try { Utils.writeByteBuffer(bytes, os) } catch { @@ -115,7 +123,7 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log override def putValues(blockId: BlockId, values: Iterator[_]): Unit = { val file = getFile(blockId) - val os = file.getOutStream(WriteType.TRY_CACHE) + val os = client.getOutStream(new TachyonURI(client.getInfo(file).getPath)) try { blockManager.dataSerializeStream(blockId, os, values) } catch { @@ -129,12 +137,17 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log override def getBytes(blockId: BlockId): Option[ByteBuffer] = { val file = getFile(blockId) - if (file == null || file.getLocationHosts.size == 0) { + if (file == null) { return None } - val is = file.getInStream(ReadType.CACHE) + val is = try { + client.getInStream(file) + } catch { + case _: FileDoesNotExistException => + return None + } try { - val size = file.length + val size = client.getInfo(file).length val bs = new Array[Byte](size.asInstanceOf[Int]) ByteStreams.readFully(is, bs) Some(ByteBuffer.wrap(bs)) @@ -149,25 +162,37 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log override def getValues(blockId: BlockId): Option[Iterator[_]] = { val file = getFile(blockId) - if (file == null || file.getLocationHosts().size() == 0) { + if (file == null) { return None } - val is = file.getInStream(ReadType.CACHE) - Option(is).map { is => - blockManager.dataDeserializeStream(blockId, is) + val is = try { + client.getInStream(file) + } catch { + case _: FileDoesNotExistException => + return None + } + try { + Some(blockManager.dataDeserializeStream(blockId, is)) + } finally { + is.close() } } override def getSize(blockId: BlockId): Long = { - getFile(blockId.name).length + client.getInfo(getFile(blockId.name)).length } - def removeFile(file: TachyonFile): Boolean = { - client.delete(new TachyonURI(file.getPath()), false) + def removeFile(file: TachyonFile): Unit = { + client.delete(file) } def fileExists(file: TachyonFile): Boolean = { - client.exist(new TachyonURI(file.getPath())) + try { + client.getInfo(file) + true + } catch { + case _: FileDoesNotExistException => false + } } def getFile(filename: String): TachyonFile = { @@ -186,18 +211,18 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log } else { val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}") client.mkdir(path) - val newDir = client.getFile(path) + val newDir = client.loadMetadata(path) subDirs(dirId)(subDirId) = newDir newDir } } } val filePath = new TachyonURI(s"$subDir/$filename") - if(!client.exist(filePath)) { - client.createFile(filePath) + try { + client.create(filePath) + } catch { + case _: FileAlreadyExistsException => client.loadMetadata(filePath) } - val file = client.getFile(filePath) - file } def getFile(blockId: BlockId): TachyonFile = getFile(blockId.name) @@ -217,9 +242,11 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log try { tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId") - if (!client.exist(path)) { + try { foundLocalDir = client.mkdir(path) - tachyonDir = client.getFile(path) + tachyonDir = client.loadMetadata(path) + } catch { + case _: FileAlreadyExistsException => // continue } } catch { case NonFatal(e) => @@ -240,14 +267,60 @@ private[spark] class TachyonBlockManager() extends ExternalBlockManager with Log logDebug("Shutdown hook called") tachyonDirs.foreach { tachyonDir => try { - if (!ShutdownHookManager.hasRootAsShutdownDeleteDir(tachyonDir)) { - Utils.deleteRecursively(tachyonDir, client) + if (!hasRootAsShutdownDeleteDir(tachyonDir)) { + deleteRecursively(tachyonDir, client) } } catch { case NonFatal(e) => logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) } } - client.close() } + + /** + * Delete a file or directory and its contents recursively. + */ + private def deleteRecursively(dir: TachyonFile, client: TachyonFileSystem) { + client.delete(dir, new DeleteOptions.Builder(ClientContext.getConf).setRecursive(true).build()) + } + + // Register the tachyon path to be deleted via shutdown hook + private def registerShutdownDeleteDir(file: TachyonFile) { + val absolutePath = client.getInfo(file).getPath + shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths += absolutePath + } + } + + // Remove the tachyon path to be deleted via shutdown hook + private def removeShutdownDeleteDir(file: TachyonFile) { + val absolutePath = client.getInfo(file).getPath + shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths -= absolutePath + } + } + + // Is the path already registered to be deleted via a shutdown hook ? + private def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { + val absolutePath = client.getInfo(file).getPath + shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths.contains(absolutePath) + } + } + + // Note: if file is child of some registered path, while not equal to it, then return true; + // else false. This is to ensure that two shutdown hooks do not try to delete each others + // paths - resulting in Exception and incomplete cleanup. + private def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = { + val absolutePath = client.getInfo(file).getPath + val hasRoot = shutdownDeleteTachyonPaths.synchronized { + shutdownDeleteTachyonPaths.exists( + path => !absolutePath.equals(path) && absolutePath.startsWith(path)) + } + if (hasRoot) { + logInfo(s"path = $absolutePath, already present as root for deletion.") + } + hasRoot + } + } diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala index 1a0f3b477b..0065b1fc66 100644 --- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala +++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala @@ -21,7 +21,6 @@ import java.io.File import java.util.PriorityQueue import scala.util.{Failure, Success, Try} -import tachyon.client.TachyonFile import org.apache.hadoop.fs.FileSystem import org.apache.spark.Logging @@ -52,7 +51,6 @@ private[spark] object ShutdownHookManager extends Logging { } private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]() - private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]() // Add a shutdown hook to delete the temp dirs when the JVM exits addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () => @@ -77,14 +75,6 @@ private[spark] object ShutdownHookManager extends Logging { } } - // Register the tachyon path to be deleted via shutdown hook - def registerShutdownDeleteDir(tachyonfile: TachyonFile) { - val absolutePath = tachyonfile.getPath() - shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths += absolutePath - } - } - // Remove the path to be deleted via shutdown hook def removeShutdownDeleteDir(file: File) { val absolutePath = file.getAbsolutePath() @@ -93,14 +83,6 @@ private[spark] object ShutdownHookManager extends Logging { } } - // Remove the tachyon path to be deleted via shutdown hook - def removeShutdownDeleteDir(tachyonfile: TachyonFile) { - val absolutePath = tachyonfile.getPath() - shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths.remove(absolutePath) - } - } - // Is the path already registered to be deleted via a shutdown hook ? def hasShutdownDeleteDir(file: File): Boolean = { val absolutePath = file.getAbsolutePath() @@ -109,14 +91,6 @@ private[spark] object ShutdownHookManager extends Logging { } } - // Is the path already registered to be deleted via a shutdown hook ? - def hasShutdownDeleteTachyonDir(file: TachyonFile): Boolean = { - val absolutePath = file.getPath() - shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths.contains(absolutePath) - } - } - // Note: if file is child of some registered path, while not equal to it, then return true; // else false. This is to ensure that two shutdown hooks do not try to delete each others // paths - resulting in IOException and incomplete cleanup. @@ -133,22 +107,6 @@ private[spark] object ShutdownHookManager extends Logging { retval } - // Note: if file is child of some registered path, while not equal to it, then return true; - // else false. This is to ensure that two shutdown hooks do not try to delete each others - // paths - resulting in Exception and incomplete cleanup. - def hasRootAsShutdownDeleteDir(file: TachyonFile): Boolean = { - val absolutePath = file.getPath() - val retval = shutdownDeleteTachyonPaths.synchronized { - shutdownDeleteTachyonPaths.exists { path => - !absolutePath.equals(path) && absolutePath.startsWith(path) - } - } - if (retval) { - logInfo("path = " + file + ", already present as root for deletion.") - } - retval - } - /** * Detect whether this thread might be executing a shutdown hook. Will always return true if * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g. diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 1a07f7ca7e..b8ca6b07e4 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -44,8 +44,6 @@ import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ import org.slf4j.Logger -import tachyon.TachyonURI -import tachyon.client.{TachyonFS, TachyonFile} import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil @@ -947,15 +945,6 @@ private[spark] object Utils extends Logging { } /** - * Delete a file or directory and its contents recursively. - */ - def deleteRecursively(dir: TachyonFile, client: TachyonFS) { - if (!client.delete(new TachyonURI(dir.getPath()), true)) { - throw new IOException("Failed to delete the tachyon dir: " + dir) - } - } - - /** * Check to see if file is a symbolic link. */ def isSymlink(file: File): Boolean = { |