diff options
author | Patrick Wendell <pwendell@gmail.com> | 2014-04-09 01:14:46 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-09 01:14:46 -0700 |
commit | 87bd1f9ef7d547ee54a8a83214b45462e0751efb (patch) | |
tree | f815ba0464d366300b9bf3833ad3f5fdea2ca9eb /tools | |
parent | 9689b663a2a4947ad60795321c770052f3c637f1 (diff) | |
download | spark-87bd1f9ef7d547ee54a8a83214b45462e0751efb.tar.gz spark-87bd1f9ef7d547ee54a8a83214b45462e0751efb.tar.bz2 spark-87bd1f9ef7d547ee54a8a83214b45462e0751efb.zip |
SPARK-1093: Annotate developer and experimental API's
This patch marks some existing classes as private[spark] and adds two types of API annotations:
- `EXPERIMENTAL API` = experimental user-facing module
- `DEVELOPER API - UNSTABLE` = developer-facing API that might change
There is some discussion of the different mechanisms for doing this here:
https://issues.apache.org/jira/browse/SPARK-1081
I was pretty aggressive with marking things private. Keep in mind that if we want to open something up in the future we can, but we can never reduce visibility.
A few notes here:
- In the past we've been inconsistent with the visiblity of the X-RDD classes. This patch marks them private whenever there is an existing function in RDD that can directly creat them (e.g. CoalescedRDD and rdd.coalesce()). One trade-off here is users can't subclass them.
- Noted that compression and serialization formats don't have to be wire compatible across versions.
- Compression codecs and serialization formats are semi-private as users typically don't instantiate them directly.
- Metrics sources are made private - user only interacts with them through Spark's reflection
Author: Patrick Wendell <pwendell@gmail.com>
Author: Andrew Or <andrewor14@gmail.com>
Closes #274 from pwendell/private-apis and squashes the following commits:
44179e4 [Patrick Wendell] Merge remote-tracking branch 'apache-github/master' into private-apis
042c803 [Patrick Wendell] spark.annotations -> spark.annotation
bfe7b52 [Patrick Wendell] Adding experimental for approximate counts
8d0c873 [Patrick Wendell] Warning in SparkEnv
99b223a [Patrick Wendell] Cleaning up annotations
e849f64 [Patrick Wendell] Merge pull request #2 from andrewor14/annotations
982a473 [Andrew Or] Generalize jQuery matching for non Spark-core API docs
a01c076 [Patrick Wendell] Merge pull request #1 from andrewor14/annotations
c1bcb41 [Andrew Or] DeveloperAPI -> DeveloperApi
0d48908 [Andrew Or] Comments and new lines (minor)
f3954e0 [Andrew Or] Add identifier tags in comments to work around scaladocs bug
99192ef [Andrew Or] Dynamically add badges based on annotations
824011b [Andrew Or] Add support for injecting arbitrary JavaScript to API docs
037755c [Patrick Wendell] Some changes after working with andrew or
f7d124f [Patrick Wendell] Small fixes
c318b24 [Patrick Wendell] Use CSS styles
e4c76b9 [Patrick Wendell] Logging
f390b13 [Patrick Wendell] Better visibility for workaround constructors
d6b0afd [Patrick Wendell] Small chang to existing constructor
403ba52 [Patrick Wendell] Style fix
870a7ba [Patrick Wendell] Work around for SI-8479
7fb13b2 [Patrick Wendell] Changes to UnionRDD and EmptyRDD
4a9e90c [Patrick Wendell] EXPERIMENTAL API --> EXPERIMENTAL
c581dce [Patrick Wendell] Changes after building against Shark.
8452309 [Patrick Wendell] Style fixes
1ed27d2 [Patrick Wendell] Formatting and coloring of badges
cd7a465 [Patrick Wendell] Code review feedback
2f706f1 [Patrick Wendell] Don't use floats
542a736 [Patrick Wendell] Small fixes
cf23ec6 [Patrick Wendell] Marking GraphX as alpha
d86818e [Patrick Wendell] Another naming change
5a76ed6 [Patrick Wendell] More visiblity clean-up
42c1f09 [Patrick Wendell] Using better labels
9d48cbf [Patrick Wendell] Initial pass
Diffstat (limited to 'tools')
-rw-r--r-- | tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala | 103 |
1 files changed, 103 insertions, 0 deletions
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala new file mode 100644 index 0000000000..8e8c35615a --- /dev/null +++ b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.tools + +import java.util.concurrent.{CountDownLatch, Executors} +import java.util.concurrent.atomic.AtomicLong + +import org.apache.spark.SparkContext +import org.apache.spark.serializer.KryoSerializer +import org.apache.spark.util.Utils + +/** + * Internal utility for micro-benchmarking shuffle write performance. + * + * Writes simulated shuffle output from several threads and records the observed throughput. + */ +object StoragePerfTester { + def main(args: Array[String]) = { + /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */ + val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g")) + + /** Number of map tasks. All tasks execute concurrently. */ + val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8) + + /** Number of reduce splits for each map task. */ + val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500) + + val recordLength = 1000 // ~1KB records + val totalRecords = dataSizeMb * 1000 + val recordsPerMap = totalRecords / numMaps + + val writeData = "1" * recordLength + val executor = Executors.newFixedThreadPool(numMaps) + + System.setProperty("spark.shuffle.compress", "false") + System.setProperty("spark.shuffle.sync", "true") + + // This is only used to instantiate a BlockManager. All thread scheduling is done manually. + val sc = new SparkContext("local[4]", "Write Tester") + val blockManager = sc.env.blockManager + + def writeOutputBytes(mapId: Int, total: AtomicLong) = { + val shuffle = blockManager.shuffleBlockManager.forMapTask(1, mapId, numOutputSplits, + new KryoSerializer(sc.conf)) + val writers = shuffle.writers + for (i <- 1 to recordsPerMap) { + writers(i % numOutputSplits).write(writeData) + } + writers.map {w => + w.commit() + total.addAndGet(w.fileSegment().length) + w.close() + } + + shuffle.releaseWriters(true) + } + + val start = System.currentTimeMillis() + val latch = new CountDownLatch(numMaps) + val totalBytes = new AtomicLong() + for (task <- 1 to numMaps) { + executor.submit(new Runnable() { + override def run() = { + try { + writeOutputBytes(task, totalBytes) + latch.countDown() + } catch { + case e: Exception => + println("Exception in child thread: " + e + " " + e.getMessage) + System.exit(1) + } + } + }) + } + latch.await() + val end = System.currentTimeMillis() + val time = (end - start) / 1000.0 + val bytesPerSecond = totalBytes.get() / time + val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong + + System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits)) + System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile))) + System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong))) + + executor.shutdown() + sc.stop() + } +} |