diff options
author | Haoyuan Li <haoyuan@cs.berkeley.edu> | 2014-04-04 20:36:24 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-04 20:38:20 -0700 |
commit | b50ddfde0342990979979e58348f54c10b500c90 (patch) | |
tree | cc7fa4d089375cded5056d9a93079e0b23a32ae7 /examples/src | |
parent | 1347ebd4b52ffb9197fc4137a55dff6badb149ba (diff) | |
download | spark-b50ddfde0342990979979e58348f54c10b500c90.tar.gz spark-b50ddfde0342990979979e58348f54c10b500c90.tar.bz2 spark-b50ddfde0342990979979e58348f54c10b500c90.zip |
SPARK-1305: Support persisting RDD's directly to Tachyon
Move the PR#468 of apache-incubator-spark to the apache-spark
"Adding an option to persist Spark RDD blocks into Tachyon."
Author: Haoyuan Li <haoyuan@cs.berkeley.edu>
Author: RongGu <gurongwalker@gmail.com>
Closes #158 from RongGu/master and squashes the following commits:
72b7768 [Haoyuan Li] merge master
9f7fa1b [Haoyuan Li] fix code style
ae7834b [Haoyuan Li] minor cleanup
a8b3ec6 [Haoyuan Li] merge master branch
e0f4891 [Haoyuan Li] better check offheap.
55b5918 [RongGu] address matei's comment on the replication of offHeap storagelevel
7cd4600 [RongGu] remove some logic code for tachyonstore's replication
51149e7 [RongGu] address aaron's comment on returning value of the remove() function in tachyonstore
8adfcfa [RongGu] address arron's comment on inTachyonSize
120e48a [RongGu] changed the root-level dir name in Tachyon
5cc041c [Haoyuan Li] address aaron's comments
9b97935 [Haoyuan Li] address aaron's comments
d9a6438 [Haoyuan Li] fix for pspark
77d2703 [Haoyuan Li] change python api.git status
3dcace4 [Haoyuan Li] address matei's comments
91fa09d [Haoyuan Li] address patrick's comments
589eafe [Haoyuan Li] use TRY_CACHE instead of MUST_CACHE
64348b2 [Haoyuan Li] update conf docs.
ed73e19 [Haoyuan Li] Merge branch 'master' of github.com:RongGu/spark-1
619a9a8 [RongGu] set number of directories in TachyonStore back to 64; added a TODO tag for duplicated code from the DiskStore
be79d77 [RongGu] find a way to clean up some unnecessay metods and classed to make the code simpler
49cc724 [Haoyuan Li] update docs with off_headp option
4572f9f [RongGu] reserving the old apply function API of StorageLevel
04301d3 [RongGu] rename StorageLevel.TACHYON to Storage.OFF_HEAP
c9aeabf [RongGu] rename the StorgeLevel.TACHYON as StorageLevel.OFF_HEAP
76805aa [RongGu] unifies the config properties name prefix; add the configs into docs/configuration.md
e700d9c [RongGu] add the SparkTachyonHdfsLR example and some comments
fd84156 [RongGu] use randomUUID to generate sparkapp directory name on tachyon;minor code style fix
939e467 [Haoyuan Li] 0.4.1-thrift from maven central
86a2eab [Haoyuan Li] tachyon 0.4.1-thrift is in the staging repo. but jenkins failed to download it. temporarily revert it back to 0.4.1
16c5798 [RongGu] make the dependency on tachyon as tachyon-0.4.1-thrift
eacb2e8 [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
bbeb4de [RongGu] fix the JsonProtocolSuite test failure problem
6adb58f [RongGu] Merge branch 'master' of https://github.com/RongGu/spark-1
d827250 [RongGu] fix JsonProtocolSuie test failure
716e93b [Haoyuan Li] revert the version
ca14469 [Haoyuan Li] bump tachyon version to 0.4.1-thrift
2825a13 [RongGu] up-merging to the current master branch of the apache spark
6a22c1a [Haoyuan Li] fix scalastyle
8968b67 [Haoyuan Li] exclude more libraries from tachyon dependency to be the same as referencing tachyon-client.
77be7e8 [RongGu] address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice.
1dcadf9 [Haoyuan Li] typo
bf278fa [Haoyuan Li] fix python tests
e82909c [Haoyuan Li] minor cleanup
776a56c [Haoyuan Li] address patrick's and ali's comments from the previous PR
8859371 [Haoyuan Li] various minor fixes and clean up
e3ddbba [Haoyuan Li] add doc to use Tachyon cache mode.
fcaeab2 [Haoyuan Li] address Aaron's comment
e554b1e [Haoyuan Li] add python code
47304b3 [Haoyuan Li] make tachyonStore in BlockMananger lazy val; add more comments StorageLevels.
dc8ef24 [Haoyuan Li] add old storelevel constructor
e01a271 [Haoyuan Li] update tachyon 0.4.1
8011a96 [RongGu] fix a brought-in mistake in StorageLevel
70ca182 [RongGu] a bit change in comment
556978b [RongGu] fix the scalastyle errors
791189b [RongGu] "Adding an option to persist Spark RDD blocks into Tachyon." move the PR#468 of apache-incubator-spark to the apache-spark
Diffstat (limited to 'examples/src')
3 files changed, 133 insertions, 1 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala index e5a09ecec0..d3babc3ed1 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala @@ -18,8 +18,8 @@ package org.apache.spark.examples import scala.math.random + import org.apache.spark._ -import SparkContext._ /** Computes an approximation to pi */ object SparkPi { diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala new file mode 100644 index 0000000000..53b303d658 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonHdfsLR.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import java.util.Random +import scala.math.exp +import org.apache.spark.util.Vector +import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.InputFormatInfo +import org.apache.spark.storage.StorageLevel + +/** + * Logistic regression based classification. + * This example uses Tachyon to persist rdds during computation. + */ +object SparkTachyonHdfsLR { + val D = 10 // Numer of dimensions + val rand = new Random(42) + + case class DataPoint(x: Vector, y: Double) + + def parsePoint(line: String): DataPoint = { + val tok = new java.util.StringTokenizer(line, " ") + var y = tok.nextToken.toDouble + var x = new Array[Double](D) + var i = 0 + while (i < D) { + x(i) = tok.nextToken.toDouble; i += 1 + } + DataPoint(new Vector(x), y) + } + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: SparkTachyonHdfsLR <master> <file> <iters>") + System.exit(1) + } + val inputPath = args(1) + val conf = SparkHadoopUtil.get.newConfiguration() + val sc = new SparkContext(args(0), "SparkTachyonHdfsLR", + System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass), Map(), + InputFormatInfo.computePreferredLocations( + Seq(new InputFormatInfo(conf, classOf[org.apache.hadoop.mapred.TextInputFormat], inputPath)) + )) + val lines = sc.textFile(inputPath) + val points = lines.map(parsePoint _).persist(StorageLevel.OFF_HEAP) + val ITERATIONS = args(2).toInt + + // Initialize w to a random value + var w = Vector(D, _ => 2 * rand.nextDouble - 1) + println("Initial w: " + w) + + for (i <- 1 to ITERATIONS) { + println("On iteration " + i) + val gradient = points.map { p => + (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x + }.reduce(_ + _) + w -= gradient + } + + println("Final w: " + w) + System.exit(0) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala new file mode 100644 index 0000000000..ce78f0876e --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/SparkTachyonPi.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples + +import scala.math.random + +import org.apache.spark._ +import org.apache.spark.storage.StorageLevel + +/** + * Computes an approximation to pi + * This example uses Tachyon to persist rdds during computation. + */ +object SparkTachyonPi { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkTachyonPi <master> [<slices>]") + System.exit(1) + } + val spark = new SparkContext(args(0), "SparkTachyonPi", + System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass)) + + val slices = if (args.length > 1) args(1).toInt else 2 + val n = 100000 * slices + + val rdd = spark.parallelize(1 to n, slices) + rdd.persist(StorageLevel.OFF_HEAP) + val count = rdd.map { i => + val x = random * 2 - 1 + val y = random * 2 - 1 + if (x * x + y * y < 1) 1 else 0 + }.reduce(_ + _) + println("Pi is roughly " + 4.0 * count / n) + + spark.stop() + } +} |