aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author: Calvin Jia <jia.calvin@gmail.com> 2015-03-22 11:11:29 -0700
committer: Aaron Davidson <aaron@databricks.com> 2015-03-22 11:11:29 -0700
commit a41b9c6004cfee84bd56dfa1faf5a0cf084551ae (patch)
tree c6c132e68c41e4936946df44fca68d85f18a4576 /core
parent 6ef48632fbf3e6659ceacaab1dbb8be8238d4d33 (diff)
downloadspark-a41b9c6004cfee84bd56dfa1faf5a0cf084551ae.tar.gz
spark-a41b9c6004cfee84bd56dfa1faf5a0cf084551ae.tar.bz2
spark-a41b9c6004cfee84bd56dfa1faf5a0cf084551ae.zip
[SPARK-6122][Core] Upgrade Tachyon client version to 0.6.1.
Changes the Tachyon client version from 0.5 to 0.6 in spark core and distribution script.

New dependencies in Tachyon 0.6.0 include:
commons-codec:commons-codec:jar:1.5:compile
io.netty:netty-all:jar:4.0.23.Final:compile

These are already in spark core.

Author: Calvin Jia <jia.calvin@gmail.com>

Closes #4867 from calvinjia/upgrade_tachyon_0.6.0 and squashes the following commits:

eed9230 [Calvin Jia] Update tachyon version to 0.6.1.
11907b3 [Calvin Jia] Use TachyonURI for tachyon paths instead of strings.
71bf441 [Calvin Jia] Upgrade Tachyon client version to 0.6.0.
Diffstat (limited to 'core')
-rw-r--r-- core/pom.xml | 2
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala | 27
-rw-r--r-- core/src/main/scala/org/apache/spark/util/Utils.scala | 4
3 files changed, 17 insertions, 16 deletions
diff --git a/core/pom.xml b/core/pom.xml
index 6cd1965ec3..868834dd50 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -275,7 +275,7 @@
<dependency>
<groupId>org.tachyonproject</groupId>
<artifactId>tachyon-client</artifactId>
- <version>0.5.0</version>
+ <version>0.6.1</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index af87303421..2ab6a8f3ec 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -20,8 +20,8 @@ package org.apache.spark.storage
import java.text.SimpleDateFormat
import java.util.{Date, Random}
-import tachyon.client.TachyonFS
-import tachyon.client.TachyonFile
+import tachyon.TachyonURI
+import tachyon.client.{TachyonFile, TachyonFS}
import org.apache.spark.Logging
import org.apache.spark.executor.ExecutorExitCode
@@ -40,7 +40,7 @@ private[spark] class TachyonBlockManager(
val master: String)
extends Logging {
- val client = if (master != null && master != "") TachyonFS.get(master) else null
+ val client = if (master != null && master != "") TachyonFS.get(new TachyonURI(master)) else null
if (client == null) {
logError("Failed to connect to the Tachyon as the master address is not configured")
@@ -60,11 +60,11 @@ private[spark] class TachyonBlockManager(
addShutdownHook()
def removeFile(file: TachyonFile): Boolean = {
- client.delete(file.getPath(), false)
+ client.delete(new TachyonURI(file.getPath()), false)
}
def fileExists(file: TachyonFile): Boolean = {
- client.exist(file.getPath())
+ client.exist(new TachyonURI(file.getPath()))
}
def getFile(filename: String): TachyonFile = {
@@ -81,7 +81,7 @@ private[spark] class TachyonBlockManager(
if (old != null) {
old
} else {
- val path = tachyonDirs(dirId) + "/" + "%02x".format(subDirId)
+ val path = new TachyonURI(s"${tachyonDirs(dirId)}/${"%02x".format(subDirId)}")
client.mkdir(path)
val newDir = client.getFile(path)
subDirs(dirId)(subDirId) = newDir
@@ -89,7 +89,7 @@ private[spark] class TachyonBlockManager(
}
}
}
- val filePath = subDir + "/" + filename
+ val filePath = new TachyonURI(s"$subDir/$filename")
if(!client.exist(filePath)) {
client.createFile(filePath)
}
@@ -101,7 +101,7 @@ private[spark] class TachyonBlockManager(
// TODO: Some of the logic here could be consolidated/de-duplicated with that in the DiskStore.
private def createTachyonDirs(): Array[TachyonFile] = {
- logDebug("Creating tachyon directories at root dirs '" + rootDirs + "'")
+ logDebug(s"Creating tachyon directories at root dirs '$rootDirs'")
val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
rootDirs.split(",").map { rootDir =>
var foundLocalDir = false
@@ -113,22 +113,21 @@ private[spark] class TachyonBlockManager(
tries += 1
try {
tachyonDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
- val path = rootDir + "/" + "spark-tachyon-" + tachyonDirId
+ val path = new TachyonURI(s"$rootDir/spark-tachyon-$tachyonDirId")
if (!client.exist(path)) {
foundLocalDir = client.mkdir(path)
tachyonDir = client.getFile(path)
}
} catch {
case e: Exception =>
- logWarning("Attempt " + tries + " to create tachyon dir " + tachyonDir + " failed", e)
+ logWarning(s"Attempt $tries to create tachyon dir $tachyonDir failed", e)
}
}
if (!foundLocalDir) {
- logError("Failed " + MAX_DIR_CREATION_ATTEMPTS + " attempts to create tachyon dir in " +
- rootDir)
+ logError(s"Failed $MAX_DIR_CREATION_ATTEMPTS attempts to create tachyon dir in $rootDir")
System.exit(ExecutorExitCode.TACHYON_STORE_FAILED_TO_CREATE_DIR)
}
- logInfo("Created tachyon directory at " + tachyonDir)
+ logInfo(s"Created tachyon directory at $tachyonDir")
tachyonDir
}
}
@@ -145,7 +144,7 @@ private[spark] class TachyonBlockManager(
}
} catch {
case e: Exception =>
- logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+ logError(s"Exception while deleting tachyon spark dir: $tachyonDir", e)
}
}
client.close()
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fa56bb09e4..91d833295e 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -42,6 +42,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.log4j.PropertyConfigurator
import org.eclipse.jetty.util.MultiException
import org.json4s._
+
+import tachyon.TachyonURI
import tachyon.client.{TachyonFS, TachyonFile}
import org.apache.spark._
@@ -970,7 +972,7 @@ private[spark] object Utils extends Logging {
* Delete a file or directory and its contents recursively.
*/
def deleteRecursively(dir: TachyonFile, client: TachyonFS) {
- if (!client.delete(dir.getPath(), true)) {
+ if (!client.delete(new TachyonURI(dir.getPath()), true)) {
throw new IOException("Failed to delete the tachyon dir: " + dir)
}
}