diff options
author | Calvin Jia <jia.calvin@gmail.com> | 2015-03-22 11:11:29 -0700 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2015-03-22 11:11:29 -0700 |
commit | a41b9c6004cfee84bd56dfa1faf5a0cf084551ae (patch) | |
tree | c6c132e68c41e4936946df44fca68d85f18a4576 /core | |
parent | 6ef48632fbf3e6659ceacaab1dbb8be8238d4d33 (diff) | |
download | spark-a41b9c6004cfee84bd56dfa1faf5a0cf084551ae.tar.gz spark-a41b9c6004cfee84bd56dfa1faf5a0cf084551ae.tar.bz2 spark-a41b9c6004cfee84bd56dfa1faf5a0cf084551ae.zip |
[SPARK-6122][Core] Upgrade Tachyon client version to 0.6.1.
Changes the Tachyon client version from 0.5 to 0.6 in spark core and distribution script.
New dependencies in Tachyon 0.6.0 include
commons-codec:commons-codec:jar:1.5:compile
io.netty:netty-all:jar:4.0.23.Final:compile
These are already in spark core.
Author: Calvin Jia <jia.calvin@gmail.com>
Closes #4867 from calvinjia/upgrade_tachyon_0.6.0 and squashes the following commits:
eed9230 [Calvin Jia] Update tachyon version to 0.6.1.
11907b3 [Calvin Jia] Use TachyonURI for tachyon paths instead of strings.
71bf441 [Calvin Jia] Upgrade Tachyon client version to 0.6.0.
Diffstat (limited to 'core')
-rw-r--r-- | core/pom.xml | 2 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala | 27 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/Utils.scala | 4 |
3 files changed, 17 insertions, 16 deletions
diff --git a/core/pom.xml b/core/pom.xml index 6cd1965ec3..868834dd50 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -275,7 +275,7 @@ <dependency> <groupId>org.tachyonproject</groupId> <artifactId>tachyon-client</artifactId> - <version>0.5.0</version> + <version>0.6.1</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala index af87303421..2ab6a8f3ec 100644 --- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala @@ -20,8 +20,8 @@ package org.apache.spark.storage import java.text.SimpleDateFormat import java.util.{Date, Random} -import tachyon.client.TachyonFS -import tachyon.client.TachyonFile +import tachyon.TachyonURI +import tachyon.client.{TachyonFile, TachyonFS} import org.apache.spark.Logging import org.apache.spark.executor.ExecutorExitCode @@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager( val master: String) extends Logging { - val client = if (master != null && master != "") TachyonFS.get(master) else null + val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null if (client == null) { logError("Failed to connect to the Tachyon as the master address is not configured") @@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager( addShutdownHook() def removeFile(file: TachyonFile): Boolean = { - client.delete(file.getPath(), false) + client.delete(new TachyonURI(file.getPath()), false) } def fileExists(file: TachyonFile): Boolean = { - client.exist(file.getPath()) + client.exist(new TachyonURI(file.getPath())) } def getFile(filename: String): TachyonFile = { @@ -81,7 +81,7 @@ private[spark] class TachyonBlockManager( if (old != null) { old } else { - val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId) + val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}") client.mkdir(path) val newDir = client.getFile(path) subDirs(dirId)(subDirId) = newDir @@ -89,7 +89,7 @@ private[spark] class TachyonBlockManager( } } } - val filePath = subDir + "/" + filename + val filePath = new TachyonURI(s"$subDir/$filename") if(!client.exist(filePath)) { client.createFile(filePath) } @@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager( // TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore. private def createTachyonDirs(): Array[TachyonFile] = { - logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'") + logDebug(s"Creating tachyon directories at root dirs '$rootDirs'") val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss") rootDirs.split(",").map { rootDir => var foundLocalDir = false @@ -113,22 +113,21 @@ private[spark] class TachyonBlockManager( tries += 1 try { tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536)) - val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId + val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId") if (!client.exist(path)) { foundLocalDir = client.mkdir(path) tachyonDir = client.getFile(path) } } catch { case e: Exception => - logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e) + logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e) } } if (!foundLocalDir) { - logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " + - rootDir) + logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir") System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR) } - logInfo("Created tachyon directory at " + tachyonDir) + logInfo(s"Created tachyon directory at $tachyonDir") tachyonDir } } @@ -145,7 +144,7 @@ private[spark] class TachyonBlockManager( } } catch { case e: Exception => - logError("Exception while deleting tachyon spark dir: " + tachyonDir, e) + logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e) } } client.close() diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index fa56bb09e4..91d833295e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -42,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.log4j.PropertyConfigurator import org.eclipse.jetty.util.MultiException import org.json4s._ + +import tachyon.TachyonURI import tachyon.client.{TachyonFS, TachyonFile} import org.apache.spark._ @@ -970,7 +972,7 @@ private[spark] object Utils extends Logging { * Delete a file or directory and its contents recursively. */ def deleteRecursively(dir: TachyonFile, client: TachyonFS) { - if (!client.delete(dir.getPath(), true)) { + if (!client.delete(new TachyonURI(dir.getPath()), true)) { throw new IOException("Failed to delete the tachyon dir: " + dir) } } |