author     cafreeman <cfreeman@alteryx.com>  2015-04-17 13:42:19 -0700
committer  Shivaram Venkataraman <shivaram@cs.berkeley.edu>  2015-04-17 13:42:19 -0700
commit     59e206deb7346148412bbf5ba4ab626718fadf18 (patch)
tree       cf4435a81197e76957c4afdcc48686a6e46dc5dc /core
parent     a83571acc938582865efb41645aa1e414f339e46 (diff)
[SPARK-6807] [SparkR] Merge recent SparkR-pkg changes
This PR pulls in recent changes in SparkR-pkg, including cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField.

Author: cafreeman <cfreeman@alteryx.com>
Author: Davies Liu <davies@databricks.com>
Author: Zongheng Yang <zongheng.y@gmail.com>
Author: Shivaram Venkataraman <shivaram.venkataraman@gmail.com>
Author: Shivaram Venkataraman <shivaram@cs.berkeley.edu>
Author: Sun Rui <rui.sun@intel.com>

Closes #5436 from davies/R3 and squashes the following commits:

c2b09be [Davies Liu] SQLTypes -> schema
a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3
168b7fe [Davies Liu] sort generics
b1fe460 [Davies Liu] fix conflict in README.md
e74c04e [Davies Liu] fix schema.R
4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5
41f8184 [Davies Liu] rm man
ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3
1bdcb63 [Zongheng Yang] Updates to README.md.
5a553e7 [cafreeman] Use object attribute instead of argument
71372d9 [cafreeman] Update docs and examples
8526d2e71 [cafreeman] Remove `tojson` functions
6ef5f2d [cafreeman] Fix spacing
7741d66 [cafreeman] Rename the SQL DataType function
141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream
9387402 [Davies Liu] fix style
40199eb [Shivaram Venkataraman] Move except into sorted position
07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD.
7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey
ed66c81 [cafreeman] Update `subtract` to work with `generics.R`
f3ba785 [cafreeman] Fixed duplicate export
275deb4 [cafreeman] Update `NAMESPACE` and tests
1a3b63d [cafreeman] new version of `CreateDF`
836c4bf [cafreeman] Update `createDataFrame` and `toDF`
be5d5c1 [cafreeman] refactor schema functions
40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5
20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist
ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4
c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master
b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master
136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats
cd66603 [cafreeman] new line at EOF
8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep
7dd81b7 [cafreeman] Documentation
0e2a94f [cafreeman] Define functions for schema and fields
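As a rough editorial illustration (not part of this commit): the new SparkR functions named above mirror operations that already exist on Spark's Scala RDD API. A minimal local sketch of those underlying operations, assuming a throwaway local SparkContext; all value names here are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object SparkRApiSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("sketch"))

    val xs = sc.parallelize(Seq(1, 2, 3, 4))
    val ys = sc.parallelize(Seq(3, 4, 5))
    xs.cartesian(ys)      // every (x, y) pair
    xs.intersection(ys)   // elements present in both RDDs
    xs.subtract(ys)       // elements of xs not in ys

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val other = sc.parallelize(Seq(("a", 9)))
    pairs.subtractByKey(other)   // drops every pair whose key appears in `other`
    pairs.sampleByKey(withReplacement = false,
      fractions = Map("a" -> 0.5, "b" -> 1.0))   // per-key stratified sample

    sc.stop()
  }
}

The SparkR counterparts merged in this PR expose the same semantics from R.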
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/RRDD.scala   131
-rw-r--r--  core/src/main/scala/org/apache/spark/api/r/SerDe.scala   14
2 files changed, 76 insertions, 69 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
index 5fa4d483b8..6fea5e1144 100644
--- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala
@@ -42,10 +42,15 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
rLibDir: String,
broadcastVars: Array[Broadcast[Object]])
extends RDD[U](parent) with Logging {
+ protected var dataStream: DataInputStream = _
+ private var bootTime: Double = _
override def getPartitions: Array[Partition] = parent.partitions
override def compute(partition: Partition, context: TaskContext): Iterator[U] = {
+ // Timing start
+ bootTime = System.currentTimeMillis / 1000.0
+
// The parent may also be an RRDD, so we should launch it first.
val parentIterator = firstParent[T].iterator(partition, context)
@@ -69,7 +74,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
// the socket used to receive the output of the task
val outSocket = serverSocket.accept()
val inputStream = new BufferedInputStream(outSocket.getInputStream)
- val dataStream = openDataStream(inputStream)
+ dataStream = new DataInputStream(inputStream)
serverSocket.close()
try {
@@ -155,6 +160,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
} else if (deserializer == SerializationFormats.ROW) {
dataOut.write(elem.asInstanceOf[Array[Byte]])
} else if (deserializer == SerializationFormats.STRING) {
+ // write string (for StringRRDD)
printOut.println(elem)
}
}
@@ -180,9 +186,41 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
}.start()
}
- protected def openDataStream(input: InputStream): Closeable
+ protected def readData(length: Int): U
- protected def read(): U
+ protected def read(): U = {
+ try {
+ val length = dataStream.readInt()
+
+ length match {
+ case SpecialLengths.TIMING_DATA =>
+ // Timing data from R worker
+ val boot = dataStream.readDouble - bootTime
+ val init = dataStream.readDouble
+ val broadcast = dataStream.readDouble
+ val input = dataStream.readDouble
+ val compute = dataStream.readDouble
+ val output = dataStream.readDouble
+ logInfo(
+ ("Times: boot = %.3f s, init = %.3f s, broadcast = %.3f s, " +
+ "read-input = %.3f s, compute = %.3f s, write-output = %.3f s, " +
+ "total = %.3f s").format(
+ boot,
+ init,
+ broadcast,
+ input,
+ compute,
+ output,
+ boot + init + broadcast + input + compute + output))
+ read()
+ case length if length >= 0 =>
+ readData(length)
+ }
+ } catch {
+ case eof: EOFException =>
+ throw new SparkException("R worker exited unexpectedly (crashed)", eof)
+ }
+ }
}
/**
@@ -202,31 +240,16 @@ private class PairwiseRRDD[T: ClassTag](
SerializationFormats.BYTE, packageNames, rLibDir,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
- private var dataStream: DataInputStream = _
-
- override protected def openDataStream(input: InputStream): Closeable = {
- dataStream = new DataInputStream(input)
- dataStream
- }
-
- override protected def read(): (Int, Array[Byte]) = {
- try {
- val length = dataStream.readInt()
-
- length match {
- case length if length == 2 =>
- val hashedKey = dataStream.readInt()
- val contentPairsLength = dataStream.readInt()
- val contentPairs = new Array[Byte](contentPairsLength)
- dataStream.readFully(contentPairs)
- (hashedKey, contentPairs)
- case _ => null // End of input
- }
- } catch {
- case eof: EOFException => {
- throw new SparkException("R worker exited unexpectedly (crashed)", eof)
- }
- }
+ override protected def readData(length: Int): (Int, Array[Byte]) = {
+ length match {
+ case length if length == 2 =>
+ val hashedKey = dataStream.readInt()
+ val contentPairsLength = dataStream.readInt()
+ val contentPairs = new Array[Byte](contentPairsLength)
+ dataStream.readFully(contentPairs)
+ (hashedKey, contentPairs)
+ case _ => null
+ }
}
lazy val asJavaPairRDD : JavaPairRDD[Int, Array[Byte]] = JavaPairRDD.fromRDD(this)
@@ -247,28 +270,13 @@ private class RRDD[T: ClassTag](
parent, -1, func, deserializer, serializer, packageNames, rLibDir,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
- private var dataStream: DataInputStream = _
-
- override protected def openDataStream(input: InputStream): Closeable = {
- dataStream = new DataInputStream(input)
- dataStream
- }
-
- override protected def read(): Array[Byte] = {
- try {
- val length = dataStream.readInt()
-
- length match {
- case length if length > 0 =>
- val obj = new Array[Byte](length)
- dataStream.readFully(obj, 0, length)
- obj
- case _ => null
- }
- } catch {
- case eof: EOFException => {
- throw new SparkException("R worker exited unexpectedly (crashed)", eof)
- }
+ override protected def readData(length: Int): Array[Byte] = {
+ length match {
+ case length if length > 0 =>
+ val obj = new Array[Byte](length)
+ dataStream.readFully(obj)
+ obj
+ case _ => null
}
}
@@ -289,26 +297,21 @@ private class StringRRDD[T: ClassTag](
parent, -1, func, deserializer, SerializationFormats.STRING, packageNames, rLibDir,
broadcastVars.map(x => x.asInstanceOf[Broadcast[Object]])) {
- private var dataStream: BufferedReader = _
-
- override protected def openDataStream(input: InputStream): Closeable = {
- dataStream = new BufferedReader(new InputStreamReader(input))
- dataStream
- }
-
- override protected def read(): String = {
- try {
- dataStream.readLine()
- } catch {
- case e: IOException => {
- throw new SparkException("R worker exited unexpectedly (crashed)", e)
- }
+ override protected def readData(length: Int): String = {
+ length match {
+ case length if length > 0 =>
+ SerDe.readStringBytes(dataStream, length)
+ case _ => null
}
}
lazy val asJavaRDD : JavaRDD[String] = JavaRDD.fromRDD(this)
}
+private object SpecialLengths {
+ val TIMING_DATA = -1
+}
+
private[r] class BufferedStreamThread(
in: InputStream,
name: String,
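An editorial aside on the refactor above: read() in BaseRRDD now owns the framing loop, where a negative length is the TIMING_DATA sentinel and a non-negative length is a payload size handed to the subclass's readData(). A self-contained sketch of that framing convention; writeRecord/readRecord are illustrative names, not Spark API:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object FramingSketch {
  val TIMING_DATA = -1   // mirrors SpecialLengths.TIMING_DATA above

  // Frame = 4-byte big-endian length, then `length` payload bytes.
  def writeRecord(out: DataOutputStream, payload: Array[Byte]): Unit = {
    out.writeInt(payload.length)
    out.write(payload)
  }

  def readRecord(in: DataInputStream): Array[Byte] =
    in.readInt() match {
      case TIMING_DATA =>
        // In RRDD six timing doubles follow the sentinel; consume and move on.
        (1 to 6).foreach(_ => in.readDouble())
        readRecord(in)
      case n if n >= 0 =>
        val buf = new Array[Byte](n)
        in.readFully(buf)
        buf
    }

  def main(args: Array[String]): Unit = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    writeRecord(out, "hello".getBytes("UTF-8"))
    val in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray))
    println(new String(readRecord(in), "UTF-8"))   // prints: hello
  }
}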
diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
index ccb2a371f4..371dfe454d 100644
--- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
+++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala
@@ -85,13 +85,17 @@ private[spark] object SerDe {
in.readDouble()
}
+ def readStringBytes(in: DataInputStream, len: Int): String = {
+ val bytes = new Array[Byte](len)
+ in.readFully(bytes)
+ assert(bytes(len - 1) == 0)
+ val str = new String(bytes.dropRight(1), "UTF-8")
+ str
+ }
+
def readString(in: DataInputStream): String = {
val len = in.readInt()
- val asciiBytes = new Array[Byte](len)
- in.readFully(asciiBytes)
- assert(asciiBytes(len - 1) == 0)
- val str = new String(asciiBytes.dropRight(1).map(_.toChar))
- str
+ readStringBytes(in, len)
}
def readBoolean(in: DataInputStream): Boolean = {
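One more editorial note on the SerDe change above: readString previously mapped each byte to a char, which only works for ASCII; the new readStringBytes drops R's trailing NUL and decodes the remaining bytes as UTF-8. A tiny sketch contrasting the two behaviors; oldDecode/newDecode are illustrative names, not part of SerDe:

object StringDecodeSketch {
  // Old approach: one char per byte (mangles multi-byte UTF-8 sequences).
  def oldDecode(bytes: Array[Byte]): String =
    new String(bytes.dropRight(1).map(_.toChar))

  // New approach: drop the trailing NUL, decode the rest as UTF-8.
  def newDecode(bytes: Array[Byte]): String = {
    assert(bytes(bytes.length - 1) == 0)   // R sends NUL-terminated strings
    new String(bytes.dropRight(1), "UTF-8")
  }

  def main(args: Array[String]): Unit = {
    val payload = "héllo".getBytes("UTF-8") :+ 0.toByte   // 'é' is two bytes in UTF-8
    println(newDecode(payload))   // héllo
    println(oldDecode(payload))   // mangled: each UTF-8 byte became its own char
  }
}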