From 59e206deb7346148412bbf5ba4ab626718fadf18 Mon Sep 17 00:00:00 2001 From: cafreeman Date: Fri, 17 Apr 2015 13:42:19 -0700 Subject: [SPARK-6807] [SparkR] Merge recent SparkR-pkg changes This PR pulls in recent changes in SparkR-pkg, including cartesian, intersection, sampleByKey, subtract, subtractByKey, except, and some API for StructType and StructField. Author: cafreeman Author: Davies Liu Author: Zongheng Yang Author: Shivaram Venkataraman Author: Shivaram Venkataraman Author: Sun Rui Closes #5436 from davies/R3 and squashes the following commits: c2b09be [Davies Liu] SQLTypes -> schema a5a02f2 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R3 168b7fe [Davies Liu] sort generics b1fe460 [Davies Liu] fix conflict in README.md e74c04e [Davies Liu] fix schema.R 4f5ac09 [Davies Liu] Merge branch 'master' of github.com:apache/spark into R5 41f8184 [Davies Liu] rm man ae78312 [Davies Liu] Merge pull request #237 from sun-rui/SPARKR-154_3 1bdcb63 [Zongheng Yang] Updates to README.md. 5a553e7 [cafreeman] Use object attribute instead of argument 71372d9 [cafreeman] Update docs and examples 8526d2e71 [cafreeman] Remove `tojson` functions 6ef5f2d [cafreeman] Fix spacing 7741d66 [cafreeman] Rename the SQL DataType function 141efd8 [Shivaram Venkataraman] Merge pull request #245 from hqzizania/upstream 9387402 [Davies Liu] fix style 40199eb [Shivaram Venkataraman] Move except into sorted position 07d0dbc [Sun Rui] [SPARKR-244] Fix test failure after integration of subtract() and subtractByKey() for RDD. 7e8caa3 [Shivaram Venkataraman] Merge pull request #246 from hlin09/fixCombineByKey ed66c81 [cafreeman] Update `subtract` to work with `generics.R` f3ba785 [cafreeman] Fixed duplicate export 275deb4 [cafreeman] Update `NAMESPACE` and tests 1a3b63d [cafreeman] new version of `CreateDF` 836c4bf [cafreeman] Update `createDataFrame` and `toDF` be5d5c1 [cafreeman] refactor schema functions 40338a4 [Zongheng Yang] Merge pull request #244 from sun-rui/SPARKR-154_5 20b97a6 [Zongheng Yang] Merge pull request #234 from hqzizania/assist ba54e34 [Shivaram Venkataraman] Merge pull request #238 from sun-rui/SPARKR-154_4 c9497a3 [Shivaram Venkataraman] Merge pull request #208 from lythesia/master b317aa7 [Zongheng Yang] Merge pull request #243 from hqzizania/master 136a07e [Zongheng Yang] Merge pull request #242 from hqzizania/stats cd66603 [cafreeman] new line at EOF 8b76e81 [Shivaram Venkataraman] Merge pull request #233 from redbaron/fail-early-on-missing-dep 7dd81b7 [cafreeman] Documentation 0e2a94f [cafreeman] Define functions for schema and fields --- R/pkg/inst/worker/worker.R | 59 ++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 54 insertions(+), 5 deletions(-) (limited to 'R/pkg/inst/worker') diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R index c6542928e8..014bf7bd7b 100644 --- a/R/pkg/inst/worker/worker.R +++ b/R/pkg/inst/worker/worker.R @@ -17,6 +17,23 @@ # Worker class +# Get current system time +currentTimeSecs <- function() { + as.numeric(Sys.time()) +} + +# Get elapsed time +elapsedSecs <- function() { + proc.time()[3] +} + +# Constants +specialLengths <- list(END_OF_STERAM = 0L, TIMING_DATA = -1L) + +# Timing R process boot +bootTime <- currentTimeSecs() +bootElap <- elapsedSecs() + rLibDir <- Sys.getenv("SPARKR_RLIBDIR") # Set libPaths to include SparkR package as loadNamespace needs this # TODO: Figure out if we can avoid this by not loading any objects that require @@ -37,7 +54,7 @@ serializer <- SparkR:::readString(inputCon) # Include packages as required packageNames <- unserialize(SparkR:::readRaw(inputCon)) for (pkg in packageNames) { - suppressPackageStartupMessages(require(as.character(pkg), character.only=TRUE)) + suppressPackageStartupMessages(library(as.character(pkg), character.only=TRUE)) } # read function dependencies @@ -46,6 +63,9 @@ computeFunc <- unserialize(SparkR:::readRawLen(inputCon, funcLen)) env <- environment(computeFunc) parent.env(env) <- .GlobalEnv # Attach under global environment. +# Timing init envs for computing +initElap <- elapsedSecs() + # Read and set broadcast variables numBroadcastVars <- SparkR:::readInt(inputCon) if (numBroadcastVars > 0) { @@ -56,6 +76,9 @@ if (numBroadcastVars > 0) { } } +# Timing broadcast +broadcastElap <- elapsedSecs() + # If -1: read as normal RDD; if >= 0, treat as pairwise RDD and treat the int # as number of partitions to create. numPartitions <- SparkR:::readInt(inputCon) @@ -73,14 +96,23 @@ if (isEmpty != 0) { } else if (deserializer == "row") { data <- SparkR:::readDeserializeRows(inputCon) } + # Timing reading input data for execution + inputElap <- elapsedSecs() + output <- computeFunc(partition, data) + # Timing computing + computeElap <- elapsedSecs() + if (serializer == "byte") { SparkR:::writeRawSerialize(outputCon, output) } else if (serializer == "row") { SparkR:::writeRowSerialize(outputCon, output) } else { - SparkR:::writeStrings(outputCon, output) + # write lines one-by-one with flag + lapply(output, function(line) SparkR:::writeString(outputCon, line)) } + # Timing output + outputElap <- elapsedSecs() } else { if (deserializer == "byte") { # Now read as many characters as described in funcLen @@ -90,6 +122,8 @@ if (isEmpty != 0) { } else if (deserializer == "row") { data <- SparkR:::readDeserializeRows(inputCon) } + # Timing reading input data for execution + inputElap <- elapsedSecs() res <- new.env() @@ -107,6 +141,8 @@ if (isEmpty != 0) { res[[bucket]] <- acc } invisible(lapply(data, hashTupleToEnvir)) + # Timing computing + computeElap <- elapsedSecs() # Step 2: write out all of the environment as key-value pairs. for (name in ls(res)) { @@ -116,13 +152,26 @@ if (isEmpty != 0) { length(res[[name]]$data) <- res[[name]]$counter SparkR:::writeRawSerialize(outputCon, res[[name]]$data) } + # Timing output + outputElap <- elapsedSecs() } +} else { + inputElap <- broadcastElap + computeElap <- broadcastElap + outputElap <- broadcastElap } +# Report timing +SparkR:::writeInt(outputCon, specialLengths$TIMING_DATA) +SparkR:::writeDouble(outputCon, bootTime) +SparkR:::writeDouble(outputCon, initElap - bootElap) # init +SparkR:::writeDouble(outputCon, broadcastElap - initElap) # broadcast +SparkR:::writeDouble(outputCon, inputElap - broadcastElap) # input +SparkR:::writeDouble(outputCon, computeElap - inputElap) # compute +SparkR:::writeDouble(outputCon, outputElap - computeElap) # output + # End of output -if (serializer %in% c("byte", "row")) { - SparkR:::writeInt(outputCon, 0L) -} +SparkR:::writeInt(outputCon, specialLengths$END_OF_STERAM) close(outputCon) close(inputCon) -- cgit v1.2.3