Diffstat (limited to 'core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 39
1 file changed, 20 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4c71b69069..19f4c95fca 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -54,9 +54,11 @@ private[spark] class PythonRDD(
val bufferSize = conf.getInt("spark.buffer.size", 65536)
val reuse_worker = conf.getBoolean("spark.python.worker.reuse", true)
- override def getPartitions = firstParent.partitions
+ override def getPartitions: Array[Partition] = firstParent.partitions
- override val partitioner = if (preservePartitoning) firstParent.partitioner else None
+ override val partitioner: Option[Partitioner] = {
+ if (preservePartitoning) firstParent.partitioner else None
+ }
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
@@ -92,7 +94,7 @@ private[spark] class PythonRDD(
// Return an iterator that read lines from the process's stdout
val stream = new DataInputStream(new BufferedInputStream(worker.getInputStream, bufferSize))
val stdoutIterator = new Iterator[Array[Byte]] {
- def next(): Array[Byte] = {
+ override def next(): Array[Byte] = {
val obj = _nextObj
if (hasNext) {
_nextObj = read()
@@ -175,7 +177,7 @@ private[spark] class PythonRDD(
var _nextObj = read()
- def hasNext = _nextObj != null
+ override def hasNext: Boolean = _nextObj != null
}
new InterruptibleIterator(context, stdoutIterator)
}
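
Not part of the patch: a minimal, self-contained sketch of the pattern the hunks above apply to the worker's stdout iterator, i.e. marking next()/hasNext with `override` and spelling out their result types on an anonymous Iterator. The object and method names below are invented for illustration.

object IteratorSketch {
  // Hypothetical helper, shape only: an anonymous Iterator[Array[Byte]] whose
  // overridden members carry explicit result types, as in the hunks above.
  def bytesIterator(chunks: List[Array[Byte]]): Iterator[Array[Byte]] = {
    var remaining = chunks
    new Iterator[Array[Byte]] {
      override def hasNext: Boolean = remaining.nonEmpty
      override def next(): Array[Byte] = {
        val head = remaining.head       // NoSuchElementException if called when empty
        remaining = remaining.tail
        head
      }
    }
  }
}
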
@@ -303,11 +305,10 @@ private class PythonException(msg: String, cause: Exception) extends RuntimeExce
* Form an RDD[(Array[Byte], Array[Byte])] from key-value pairs returned from Python.
* This is used by PySpark's shuffle operations.
*/
-private class PairwiseRDD(prev: RDD[Array[Byte]]) extends
- RDD[(Long, Array[Byte])](prev) {
- override def getPartitions = prev.partitions
- override val partitioner = prev.partitioner
- override def compute(split: Partition, context: TaskContext) =
+private class PairwiseRDD(prev: RDD[Array[Byte]]) extends RDD[(Long, Array[Byte])](prev) {
+ override def getPartitions: Array[Partition] = prev.partitions
+ override val partitioner: Option[Partitioner] = prev.partitioner
+ override def compute(split: Partition, context: TaskContext): Iterator[(Long, Array[Byte])] =
prev.iterator(split, context).grouped(2).map {
case Seq(a, b) => (Utils.deserializeLongValue(a), b)
case x => throw new SparkException("PairwiseRDD: unexpected value: " + x)
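
Not part of the patch: a compilable sketch of the same convention applied to a standalone RDD subclass that mirrors PairwiseRDD's structure. The class name IdentityBytesRDD is hypothetical and the body is a plain pass-through.

import org.apache.spark.{Partition, Partitioner, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical pass-through RDD: every overridden member declares its type
// explicitly (Array[Partition], Option[Partitioner], Iterator[...]), matching
// the style this change enforces on PythonRDD and PairwiseRDD.
private class IdentityBytesRDD(prev: RDD[Array[Byte]])
  extends RDD[Array[Byte]](prev) {

  override def getPartitions: Array[Partition] = prev.partitions

  override val partitioner: Option[Partitioner] = prev.partitioner

  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] =
    prev.iterator(split, context)
}
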
@@ -435,7 +436,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
minSplits: Int,
- batchSize: Int) = {
+ batchSize: Int): JavaRDD[Array[Byte]] = {
val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
@@ -462,7 +463,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String],
- batchSize: Int) = {
+ batchSize: Int): JavaRDD[Array[Byte]] = {
val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
@@ -488,7 +489,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String],
- batchSize: Int) = {
+ batchSize: Int): JavaRDD[Array[Byte]] = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
@@ -505,7 +506,7 @@ private[spark] object PythonRDD extends Logging {
inputFormatClass: String,
keyClass: String,
valueClass: String,
- conf: Configuration) = {
+ conf: Configuration): RDD[(K, V)] = {
val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
@@ -531,7 +532,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String],
- batchSize: Int) = {
+ batchSize: Int): JavaRDD[Array[Byte]] = {
val mergedConf = getMergedConf(confAsMap, sc.hadoopConfiguration())
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
@@ -557,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String],
- batchSize: Int) = {
+ batchSize: Int): JavaRDD[Array[Byte]] = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
@@ -686,7 +687,7 @@ private[spark] object PythonRDD extends Logging {
pyRDD: JavaRDD[Array[Byte]],
batchSerialized: Boolean,
path: String,
- compressionCodecClass: String) = {
+ compressionCodecClass: String): Unit = {
saveAsHadoopFile(
pyRDD, batchSerialized, path, "org.apache.hadoop.mapred.SequenceFileOutputFormat",
null, null, null, null, new java.util.HashMap(), compressionCodecClass)
@@ -711,7 +712,7 @@ private[spark] object PythonRDD extends Logging {
keyConverterClass: String,
valueConverterClass: String,
confAsMap: java.util.HashMap[String, String],
- compressionCodecClass: String) = {
+ compressionCodecClass: String): Unit = {
val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
@@ -741,7 +742,7 @@ private[spark] object PythonRDD extends Logging {
valueClass: String,
keyConverterClass: String,
valueConverterClass: String,
- confAsMap: java.util.HashMap[String, String]) = {
+ confAsMap: java.util.HashMap[String, String]): Unit = {
val rdd = SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized)
val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
@@ -766,7 +767,7 @@ private[spark] object PythonRDD extends Logging {
confAsMap: java.util.HashMap[String, String],
keyConverterClass: String,
valueConverterClass: String,
- useNewAPI: Boolean) = {
+ useNewAPI: Boolean): Unit = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val converted = convertRDD(SerDeUtil.pythonToPairRDD(pyRDD, batchSerialized),
keyConverterClass, valueConverterClass, new JavaToWritableConverter)
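
Not part of the patch: a small sketch of why the side-effecting save helpers above now declare `: Unit` explicitly. Without the annotation the result type is inferred from the method body, so a later edit to the last expression can silently change the JVM signature that Java and Py4J callers bind against. The object and method names below are invented.

object SaveSketch {
  // Hypothetical side-effecting helper. The explicit `: Unit` pins the JVM
  // signature to `void`, independent of whatever the body's last expression is.
  def saveMarker(path: String): Unit = {
    val marker = s"saved: $path"
    println(marker)
  }

  // Without the annotation, the inferred result type here is String, not Unit,
  // which is a different (and easily accidental) public signature.
  def saveMarkerInferred(path: String) = s"saved: $path"
}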