Diffstat (limited to 'tools')
-rw-r--r--  tools/pom.xml  15
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala  53
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala  367
-rw-r--r--  tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala  111
4 files changed, 20 insertions, 526 deletions
diff --git a/tools/pom.xml b/tools/pom.xml
index b3a5ae2771..9bb20e1381 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -35,16 +35,6 @@
<dependencies>
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-streaming_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-reflect</artifactId>
</dependency>
@@ -52,6 +42,11 @@
<groupId>org.scala-lang</groupId>
<artifactId>scala-compiler</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.clapper</groupId>
+ <artifactId>classutil_${scala.binary.version}</artifactId>
+ <version>1.0.6</version>
+ </dependency>
</dependencies>
<build>
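
The POM change above drops the spark-core and spark-streaming artifacts and adds org.clapper's classutil 1.0.6, which GenerateMIMAIgnore now uses to scan the classpath. For orientation only, the same dependency expressed as an sbt setting would look like the line below (an assumption for illustration, not part of this change; %% supplies the Scala binary-version suffix that the POM spells out via ${scala.binary.version}).

// Hypothetical sbt equivalent of the Maven dependency added above.
libraryDependencies += "org.clapper" %% "classutil" % "1.0.6"
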
diff --git a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
index a947fac1d7..738bd2150a 100644
--- a/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/GenerateMIMAIgnore.scala
@@ -18,15 +18,13 @@
// scalastyle:off classforname
package org.apache.spark.tools
-import java.io.File
-import java.util.jar.JarFile
-
import scala.collection.mutable
-import scala.collection.JavaConverters._
import scala.reflect.runtime.{universe => unv}
import scala.reflect.runtime.universe.runtimeMirror
import scala.util.Try
+import org.clapper.classutil.ClassFinder
+
/**
* A tool for generating classes to be excluded during binary checking with MIMA. It is expected
* that this tool is run with ./spark-class.
@@ -42,12 +40,13 @@ object GenerateMIMAIgnore {
private val classLoader = Thread.currentThread().getContextClassLoader
private val mirror = runtimeMirror(classLoader)
+ private def isDeveloperApi(sym: unv.Symbol) = sym.annotations.exists {
+ _.tpe =:= mirror.staticClass("org.apache.spark.annotation.DeveloperApi").toType
+ }
- private def isDeveloperApi(sym: unv.Symbol) =
- sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.DeveloperApi])
-
- private def isExperimental(sym: unv.Symbol) =
- sym.annotations.exists(_.tpe =:= unv.typeOf[org.apache.spark.annotation.Experimental])
+ private def isExperimental(sym: unv.Symbol) = sym.annotations.exists {
+ _.tpe =:= mirror.staticClass("org.apache.spark.annotation.Experimental").toType
+ }
private def isPackagePrivate(sym: unv.Symbol) =
@@ -160,35 +159,13 @@ object GenerateMIMAIgnore {
* and subpackages both from directories and jars present on the classpath.
*/
private def getClasses(packageName: String): Set[String] = {
- val path = packageName.replace('.', '/')
- val resources = classLoader.getResources(path)
-
- val jars = resources.asScala.filter(_.getProtocol == "jar")
- .map(_.getFile.split(":")(1).split("!")(0)).toSeq
-
- jars.flatMap(getClassesFromJar(_, path))
- .map(_.getName)
- .filterNot(shouldExclude).toSet
- }
-
- /**
- * Get all classes in a package from a jar file.
- */
- private def getClassesFromJar(jarPath: String, packageName: String) = {
- import scala.collection.mutable
- val jar = new JarFile(new File(jarPath))
- val enums = jar.entries().asScala.map(_.getName).filter(_.startsWith(packageName))
- val classes = mutable.HashSet[Class[_]]()
- for (entry <- enums if entry.endsWith(".class")) {
- try {
- classes += Class.forName(entry.replace('/', '.').stripSuffix(".class"), false, classLoader)
- } catch {
- // scalastyle:off println
- case _: Throwable => println("Unable to load:" + entry)
- // scalastyle:on println
- }
- }
- classes
+ val finder = ClassFinder()
+ finder
+ .getClasses
+ .map(_.name)
+ .filter(_.startsWith(packageName))
+ .filterNot(shouldExclude)
+ .toSet
}
}
// scalastyle:on classforname
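
The rewritten getClasses above replaces the manual JarFile walk with classutil's ClassFinder, which reads class metadata from the classpath without loading the classes; the annotation checks likewise switch to mirror.staticClass lookups by name, presumably so the tool no longer needs a compile-time dependency on the Spark artifacts removed from the POM above. Below is a minimal standalone sketch of the same scan, assuming classutil 1.0.6 as declared above and that ClassFinder() with no arguments walks the JVM classpath, as the new getClasses relies on; ListSparkClasses and the hard-coded package prefix are illustrative only.

import org.clapper.classutil.ClassFinder

// Scans the default classpath and prints the names of classes under the given
// package prefix, mirroring the new getClasses implementation above.
object ListSparkClasses {
  def main(args: Array[String]): Unit = {
    val finder = ClassFinder()
    finder.getClasses
      .map(_.name)
      .filter(_.startsWith("org.apache.spark"))
      .take(20)
      .foreach(println)
  }
}
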
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
deleted file mode 100644
index ccd8fd3969..0000000000
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.tools
-
-import java.lang.reflect.{Method, Type}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.language.existentials
-
-import org.apache.spark._
-import org.apache.spark.api.java._
-import org.apache.spark.rdd.{DoubleRDDFunctions, OrderedRDDFunctions, PairRDDFunctions, RDD}
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaPairDStream, JavaStreamingContext}
-import org.apache.spark.streaming.dstream.{DStream, PairDStreamFunctions}
-
-
-private[spark] abstract class SparkType(val name: String)
-
-private[spark] case class BaseType(override val name: String) extends SparkType(name) {
- override def toString: String = {
- name
- }
-}
-
-private[spark]
-case class ParameterizedType(override val name: String,
- parameters: Seq[SparkType],
- typebounds: String = "") extends SparkType(name) {
- override def toString: String = {
- if (typebounds != "") {
- typebounds + " " + name + "<" + parameters.mkString(", ") + ">"
- } else {
- name + "<" + parameters.mkString(", ") + ">"
- }
- }
-}
-
-private[spark]
-case class SparkMethod(name: String, returnType: SparkType, parameters: Seq[SparkType]) {
- override def toString: String = {
- returnType + " " + name + "(" + parameters.mkString(", ") + ")"
- }
-}
-
-/**
- * A tool for identifying methods that need to be ported from Scala to the Java API.
- *
- * It uses reflection to find methods in the Scala API and rewrites those methods' signatures
- * into appropriate Java equivalents. If those equivalent methods have not been implemented in
- * the Java API, they are printed.
- */
-object JavaAPICompletenessChecker {
-
- private def parseType(typeStr: String): SparkType = {
- if (!typeStr.contains("<")) {
- // Base types might begin with "class" or "interface", so we have to strip that off:
- BaseType(typeStr.trim.split(" ").last)
- } else if (typeStr.endsWith("[]")) {
- ParameterizedType("Array", Seq(parseType(typeStr.stripSuffix("[]"))))
- } else {
- val parts = typeStr.split("<", 2)
- val name = parts(0).trim
- assert (parts(1).last == '>')
- val parameters = parts(1).dropRight(1)
- ParameterizedType(name, parseTypeList(parameters))
- }
- }
-
- private def parseTypeList(typeStr: String): Seq[SparkType] = {
- val types: ArrayBuffer[SparkType] = new ArrayBuffer[SparkType]
- var stack = 0
- var token: StringBuffer = new StringBuffer()
- for (c <- typeStr.trim) {
- if (c == ',' && stack == 0) {
- types += parseType(token.toString)
- token = new StringBuffer()
- } else if (c == ' ' && stack != 0) {
- // continue
- } else {
- if (c == '<') {
- stack += 1
- } else if (c == '>') {
- stack -= 1
- }
- token.append(c)
- }
- }
- assert (stack == 0)
- if (token.toString != "") {
- types += parseType(token.toString)
- }
- types.toSeq
- }
-
- private def parseReturnType(typeStr: String): SparkType = {
- if (typeStr(0) == '<') {
- val parts = typeStr.drop(0).split(">", 2)
- val parsed = parseType(parts(1)).asInstanceOf[ParameterizedType]
- ParameterizedType(parsed.name, parsed.parameters, parts(0))
- } else {
- parseType(typeStr)
- }
- }
-
- private def toSparkMethod(method: Method): SparkMethod = {
- val returnType = parseReturnType(method.getGenericReturnType.toString)
- val name = method.getName
- val parameters = method.getGenericParameterTypes.map(t => parseType(t.toString))
- SparkMethod(name, returnType, parameters)
- }
-
- private def toJavaType(scalaType: SparkType, isReturnType: Boolean): SparkType = {
- val renameSubstitutions = Map(
- "scala.collection.Map" -> "java.util.Map",
- // TODO: the JavaStreamingContext API accepts Array arguments
- // instead of Lists, so this isn't a trivial translation / sub:
- "scala.collection.Seq" -> "java.util.List",
- "scala.Function2" -> "org.apache.spark.api.java.function.Function2",
- "scala.collection.Iterator" -> "java.util.Iterator",
- "scala.collection.mutable.Queue" -> "java.util.Queue",
- "double" -> "java.lang.Double"
- )
- // Keep applying the substitutions until we've reached a fixedpoint.
- def applySubs(scalaType: SparkType): SparkType = {
- scalaType match {
- case ParameterizedType(name, parameters, typebounds) =>
- name match {
- case "org.apache.spark.rdd.RDD" =>
- if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
- val tupleParams =
- parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
- ParameterizedType(classOf[JavaPairRDD[_, _]].getName, tupleParams)
- } else {
- ParameterizedType(classOf[JavaRDD[_]].getName, parameters.map(applySubs))
- }
- case "org.apache.spark.streaming.dstream.DStream" =>
- if (parameters(0).name == classOf[Tuple2[_, _]].getName) {
- val tupleParams =
- parameters(0).asInstanceOf[ParameterizedType].parameters.map(applySubs)
- ParameterizedType("org.apache.spark.streaming.api.java.JavaPairDStream",
- tupleParams)
- } else {
- ParameterizedType("org.apache.spark.streaming.api.java.JavaDStream",
- parameters.map(applySubs))
- }
- case "scala.Option" => {
- if (isReturnType) {
- ParameterizedType("org.apache.spark.api.java.Optional", parameters.map(applySubs))
- } else {
- applySubs(parameters(0))
- }
- }
- case "scala.Function1" =>
- val firstParamName = parameters.last.name
- if (firstParamName.startsWith("scala.collection.Traversable") ||
- firstParamName.startsWith("scala.collection.Iterator")) {
- ParameterizedType("org.apache.spark.api.java.function.FlatMapFunction",
- Seq(parameters(0),
- parameters.last.asInstanceOf[ParameterizedType].parameters(0)).map(applySubs))
- } else if (firstParamName == "scala.runtime.BoxedUnit") {
- ParameterizedType("org.apache.spark.api.java.function.VoidFunction",
- parameters.dropRight(1).map(applySubs))
- } else {
- ParameterizedType("org.apache.spark.api.java.function.Function",
- parameters.map(applySubs))
- }
- case _ =>
- ParameterizedType(renameSubstitutions.getOrElse(name, name),
- parameters.map(applySubs))
- }
- case BaseType(name) =>
- if (renameSubstitutions.contains(name)) {
- BaseType(renameSubstitutions(name))
- } else {
- scalaType
- }
- }
- }
- var oldType = scalaType
- var newType = applySubs(scalaType)
- while (oldType != newType) {
- oldType = newType
- newType = applySubs(scalaType)
- }
- newType
- }
-
- private def toJavaMethod(method: SparkMethod): SparkMethod = {
- val params = method.parameters
- .filterNot(_.name == "scala.reflect.ClassTag")
- .map(toJavaType(_, isReturnType = false))
- SparkMethod(method.name, toJavaType(method.returnType, isReturnType = true), params)
- }
-
- private def isExcludedByName(method: Method): Boolean = {
- val name = method.getDeclaringClass.getName + "." + method.getName
- // Scala methods that are declared as private[mypackage] become public in the resulting
- // Java bytecode. As a result, we need to manually exclude those methods here.
- // This list also includes a few methods that are only used by the web UI or other
- // internal Spark components.
- val excludedNames = Seq(
- "org.apache.spark.rdd.RDD.origin",
- "org.apache.spark.rdd.RDD.elementClassTag",
- "org.apache.spark.rdd.RDD.checkpointData",
- "org.apache.spark.rdd.RDD.partitioner",
- "org.apache.spark.rdd.RDD.partitions",
- "org.apache.spark.rdd.RDD.firstParent",
- "org.apache.spark.rdd.RDD.doCheckpoint",
- "org.apache.spark.rdd.RDD.markCheckpointed",
- "org.apache.spark.rdd.RDD.clearDependencies",
- "org.apache.spark.rdd.RDD.getDependencies",
- "org.apache.spark.rdd.RDD.getPartitions",
- "org.apache.spark.rdd.RDD.dependencies",
- "org.apache.spark.rdd.RDD.getPreferredLocations",
- "org.apache.spark.rdd.RDD.collectPartitions",
- "org.apache.spark.rdd.RDD.computeOrReadCheckpoint",
- "org.apache.spark.rdd.PairRDDFunctions.getKeyClass",
- "org.apache.spark.rdd.PairRDDFunctions.getValueClass",
- "org.apache.spark.SparkContext.stringToText",
- "org.apache.spark.SparkContext.makeRDD",
- "org.apache.spark.SparkContext.runJob",
- "org.apache.spark.SparkContext.runApproximateJob",
- "org.apache.spark.SparkContext.clean",
- "org.apache.spark.SparkContext.metadataCleaner",
- "org.apache.spark.SparkContext.ui",
- "org.apache.spark.SparkContext.newShuffleId",
- "org.apache.spark.SparkContext.newRddId",
- "org.apache.spark.SparkContext.cleanup",
- "org.apache.spark.SparkContext.receiverJobThread",
- "org.apache.spark.SparkContext.getRDDStorageInfo",
- "org.apache.spark.SparkContext.addedFiles",
- "org.apache.spark.SparkContext.addedJars",
- "org.apache.spark.SparkContext.persistentRdds",
- "org.apache.spark.SparkContext.executorEnvs",
- "org.apache.spark.SparkContext.checkpointDir",
- "org.apache.spark.SparkContext.getSparkHome",
- "org.apache.spark.SparkContext.executorMemoryRequested",
- "org.apache.spark.SparkContext.getExecutorStorageStatus",
- "org.apache.spark.streaming.dstream.DStream.generatedRDDs",
- "org.apache.spark.streaming.dstream.DStream.zeroTime",
- "org.apache.spark.streaming.dstream.DStream.rememberDuration",
- "org.apache.spark.streaming.dstream.DStream.storageLevel",
- "org.apache.spark.streaming.dstream.DStream.mustCheckpoint",
- "org.apache.spark.streaming.dstream.DStream.checkpointDuration",
- "org.apache.spark.streaming.dstream.DStream.checkpointData",
- "org.apache.spark.streaming.dstream.DStream.graph",
- "org.apache.spark.streaming.dstream.DStream.isInitialized",
- "org.apache.spark.streaming.dstream.DStream.parentRememberDuration",
- "org.apache.spark.streaming.dstream.DStream.initialize",
- "org.apache.spark.streaming.dstream.DStream.validate",
- "org.apache.spark.streaming.dstream.DStream.setContext",
- "org.apache.spark.streaming.dstream.DStream.setGraph",
- "org.apache.spark.streaming.dstream.DStream.remember",
- "org.apache.spark.streaming.dstream.DStream.getOrCompute",
- "org.apache.spark.streaming.dstream.DStream.generateJob",
- "org.apache.spark.streaming.dstream.DStream.clearOldMetadata",
- "org.apache.spark.streaming.dstream.DStream.addMetadata",
- "org.apache.spark.streaming.dstream.DStream.updateCheckpointData",
- "org.apache.spark.streaming.dstream.DStream.restoreCheckpointData",
- "org.apache.spark.streaming.dstream.DStream.isTimeValid",
- "org.apache.spark.streaming.StreamingContext.nextNetworkInputStreamId",
- "org.apache.spark.streaming.StreamingContext.checkpointDir",
- "org.apache.spark.streaming.StreamingContext.checkpointDuration",
- "org.apache.spark.streaming.StreamingContext.receiverJobThread",
- "org.apache.spark.streaming.StreamingContext.scheduler",
- "org.apache.spark.streaming.StreamingContext.initialCheckpoint",
- "org.apache.spark.streaming.StreamingContext.getNewNetworkStreamId",
- "org.apache.spark.streaming.StreamingContext.validate",
- "org.apache.spark.streaming.StreamingContext.createNewSparkContext",
- "org.apache.spark.streaming.StreamingContext.rddToFileName",
- "org.apache.spark.streaming.StreamingContext.getSparkCheckpointDir",
- "org.apache.spark.streaming.StreamingContext.env",
- "org.apache.spark.streaming.StreamingContext.graph",
- "org.apache.spark.streaming.StreamingContext.isCheckpointPresent"
- )
- val excludedPatterns = Seq(
- """^org\.apache\.spark\.SparkContext\..*To.*Functions""",
- """^org\.apache\.spark\.SparkContext\..*WritableConverter""",
- """^org\.apache\.spark\.SparkContext\..*To.*Writable"""
- ).map(_.r)
- lazy val excludedByPattern =
- !excludedPatterns.map(_.findFirstIn(name)).filter(_.isDefined).isEmpty
- name.contains("$") || excludedNames.contains(name) || excludedByPattern
- }
-
- private def isExcludedByInterface(method: Method): Boolean = {
- val excludedInterfaces =
- Set("org.apache.spark.Logging", "org.apache.hadoop.mapreduce.HadoopMapReduceUtil")
- def toComparisionKey(method: Method): (Class[_], String, Type) =
- (method.getReturnType, method.getName, method.getGenericReturnType)
- val interfaces = method.getDeclaringClass.getInterfaces.filter { i =>
- excludedInterfaces.contains(i.getName)
- }
- val excludedMethods = interfaces.flatMap(_.getMethods.map(toComparisionKey))
- excludedMethods.contains(toComparisionKey(method))
- }
-
- private def printMissingMethods(scalaClass: Class[_], javaClass: Class[_]) {
- val methods = scalaClass.getMethods
- .filterNot(_.isAccessible)
- .filterNot(isExcludedByName)
- .filterNot(isExcludedByInterface)
- val javaEquivalents = methods.map(m => toJavaMethod(toSparkMethod(m))).toSet
-
- val javaMethods = javaClass.getMethods.map(toSparkMethod).toSet
-
- val missingMethods = javaEquivalents -- javaMethods
-
- for (method <- missingMethods) {
- // scalastyle:off println
- println(method)
- // scalastyle:on println
- }
- }
-
- def main(args: Array[String]) {
- // scalastyle:off println
- println("Missing RDD methods")
- printMissingMethods(classOf[RDD[_]], classOf[JavaRDD[_]])
- println()
-
- println("Missing PairRDD methods")
- printMissingMethods(classOf[PairRDDFunctions[_, _]], classOf[JavaPairRDD[_, _]])
- println()
-
- println("Missing DoubleRDD methods")
- printMissingMethods(classOf[DoubleRDDFunctions], classOf[JavaDoubleRDD])
- println()
-
- println("Missing OrderedRDD methods")
- printMissingMethods(classOf[OrderedRDDFunctions[_, _, _]], classOf[JavaPairRDD[_, _]])
- println()
-
- println("Missing SparkContext methods")
- printMissingMethods(classOf[SparkContext], classOf[JavaSparkContext])
- println()
-
- println("Missing StreamingContext methods")
- printMissingMethods(classOf[StreamingContext], classOf[JavaStreamingContext])
- println()
-
- println("Missing DStream methods")
- printMissingMethods(classOf[DStream[_]], classOf[JavaDStream[_]])
- println()
-
- println("Missing PairDStream methods")
- printMissingMethods(classOf[PairDStreamFunctions[_, _]], classOf[JavaPairDStream[_, _]])
- println()
- // scalastyle:on println
- }
-}
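
The deleted JavaAPICompletenessChecker reflected over the public methods of each Scala API class, translated their signatures into Java API types, and printed any method with no Java counterpart. Below is a heavily reduced sketch of that core set-difference step, comparing method names only and using illustrative stand-in classes; the real tool parsed generic signatures and applied the type substitutions shown in the deleted code above.

// Prints the public method names present on the first class but not the second,
// skipping synthetic members -- a name-only analogue of printMissingMethods above.
object MethodDiffSketch {
  private def publicMethodNames(cls: Class[_]): Set[String] =
    cls.getMethods.map(_.getName).filterNot(_.contains("$")).toSet

  def main(args: Array[String]): Unit = {
    val missing = publicMethodNames(classOf[scala.collection.immutable.List[_]]) --
      publicMethodNames(classOf[java.util.ArrayList[_]])
    missing.toSeq.sorted.foreach(println)
  }
}
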
diff --git a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala b/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
deleted file mode 100644
index 8a5c7c0e73..0000000000
--- a/tools/src/main/scala/org/apache/spark/tools/StoragePerfTester.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.tools
-
-import java.util.concurrent.{CountDownLatch, Executors}
-import java.util.concurrent.atomic.AtomicLong
-
-import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.executor.ShuffleWriteMetrics
-import org.apache.spark.serializer.KryoSerializer
-import org.apache.spark.shuffle.hash.HashShuffleManager
-import org.apache.spark.util.Utils
-
-/**
- * Internal utility for micro-benchmarking shuffle write performance.
- *
- * Writes simulated shuffle output from several threads and records the observed throughput.
- */
-object StoragePerfTester {
- def main(args: Array[String]): Unit = {
- /** Total amount of data to generate. Distributed evenly amongst maps and reduce splits. */
- val dataSizeMb = Utils.memoryStringToMb(sys.env.getOrElse("OUTPUT_DATA", "1g"))
-
- /** Number of map tasks. All tasks execute concurrently. */
- val numMaps = sys.env.get("NUM_MAPS").map(_.toInt).getOrElse(8)
-
- /** Number of reduce splits for each map task. */
- val numOutputSplits = sys.env.get("NUM_REDUCERS").map(_.toInt).getOrElse(500)
-
- val recordLength = 1000 // ~1KB records
- val totalRecords = dataSizeMb * 1000
- val recordsPerMap = totalRecords / numMaps
-
- val writeKey = "1" * (recordLength / 2)
- val writeValue = "1" * (recordLength / 2)
- val executor = Executors.newFixedThreadPool(numMaps)
-
- val conf = new SparkConf()
- .set("spark.shuffle.compress", "false")
- .set("spark.shuffle.sync", "true")
- .set("spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager")
-
- // This is only used to instantiate a BlockManager. All thread scheduling is done manually.
- val sc = new SparkContext("local[4]", "Write Tester", conf)
- val hashShuffleManager = sc.env.shuffleManager.asInstanceOf[HashShuffleManager]
-
- def writeOutputBytes(mapId: Int, total: AtomicLong): Unit = {
- val shuffle = hashShuffleManager.shuffleBlockResolver.forMapTask(1, mapId, numOutputSplits,
- new KryoSerializer(sc.conf), new ShuffleWriteMetrics())
- val writers = shuffle.writers
- for (i <- 1 to recordsPerMap) {
- writers(i % numOutputSplits).write(writeKey, writeValue)
- }
- writers.map { w =>
- w.commitAndClose()
- total.addAndGet(w.fileSegment().length)
- }
-
- shuffle.releaseWriters(true)
- }
-
- val start = System.currentTimeMillis()
- val latch = new CountDownLatch(numMaps)
- val totalBytes = new AtomicLong()
- for (task <- 1 to numMaps) {
- executor.submit(new Runnable() {
- override def run(): Unit = {
- try {
- writeOutputBytes(task, totalBytes)
- latch.countDown()
- } catch {
- case e: Exception =>
- // scalastyle:off println
- println("Exception in child thread: " + e + " " + e.getMessage)
- // scalastyle:on println
- System.exit(1)
- }
- }
- })
- }
- latch.await()
- val end = System.currentTimeMillis()
- val time = (end - start) / 1000.0
- val bytesPerSecond = totalBytes.get() / time
- val bytesPerFile = (totalBytes.get() / (numOutputSplits * numMaps.toDouble)).toLong
-
- // scalastyle:off println
- System.err.println("files_total\t\t%s".format(numMaps * numOutputSplits))
- System.err.println("bytes_per_file\t\t%s".format(Utils.bytesToString(bytesPerFile)))
- System.err.println("agg_throughput\t\t%s/s".format(Utils.bytesToString(bytesPerSecond.toLong)))
- // scalastyle:on println
-
- executor.shutdown()
- sc.stop()
- }
-}
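
The deleted StoragePerfTester drove a fixed pool of simulated map-task threads, synchronized on a CountDownLatch, and divided the total bytes written by the elapsed wall-clock time. Below is a stripped-down sketch of that timing pattern with the shuffle writers replaced by an in-memory byte counter; the record count is illustrative rather than the benchmark's default.

import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong

// Launches numTasks writer threads, waits for all of them, and reports aggregate
// throughput the way the deleted benchmark did (bytes written / elapsed seconds).
object ThroughputSketch {
  def main(args: Array[String]): Unit = {
    val numTasks = 8
    val recordsPerTask = 100000
    val recordLength = 1000

    val executor = Executors.newFixedThreadPool(numTasks)
    val latch = new CountDownLatch(numTasks)
    val totalBytes = new AtomicLong()

    val start = System.currentTimeMillis()
    for (_ <- 1 to numTasks) {
      executor.submit(new Runnable {
        override def run(): Unit = {
          for (_ <- 1 to recordsPerTask) {
            totalBytes.addAndGet(recordLength.toLong) // stand-in for a real shuffle write
          }
          latch.countDown()
        }
      })
    }
    latch.await()
    val seconds = math.max((System.currentTimeMillis() - start) / 1000.0, 0.001)
    System.err.println("agg_throughput\t\t%.0f bytes/s".format(totalBytes.get() / seconds))
    executor.shutdown()
  }
}
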