Diffstat (limited to 'R/pkg/inst/worker/worker.R')
 R/pkg/inst/worker/worker.R | 59 ++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 54 insertions(+), 5 deletions(-)
diff --git a/R/pkg/inst/worker/worker.R b/R/pkg/inst/worker/worker.R
index c6542928e8..014bf7bd7b 100644
--- a/R/pkg/inst/worker/worker.R
+++ b/R/pkg/inst/worker/worker.R
@@ -17,6 +17,23 @@
# Worker class
+# Get the current system time, in seconds since the epoch
+currentTimeSecs <- function() {
+ as.numeric(Sys.time())
+}
+
+# Get the elapsed time, in seconds, since this R process started
+elapsedSecs <- function() {
+ proc.time()[3]
+}
+
+# Constants
+specialLengths <- list(END_OF_STREAM = 0L, TIMING_DATA = -1L)
+
+# Timing R process boot
+bootTime <- currentTimeSecs()
+bootElap <- elapsedSecs()
+
rLibDir <- Sys.getenv("SPARKR_RLIBDIR")
# Set libPaths to include SparkR package as loadNamespace needs this
# TODO: Figure out if we can avoid this by not loading any objects that require
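
A note on the two clocks introduced above: Sys.time() gives wall-clock seconds
since the Unix epoch, while proc.time()[3] gives seconds elapsed since the R
process started, so differences of elapsedSecs() are unaffected by system
clock adjustments. A minimal illustration of the pattern the worker uses
(illustrative only, not part of the patch):

    # Time a phase by differencing proc.time()[3] readings
    phaseStart <- proc.time()[3]
    Sys.sleep(0.1)                              # stand-in for real work
    phaseElapsed <- proc.time()[3] - phaseStart
    cat("phase took", phaseElapsed, "seconds\n")
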
@@ -37,7 +54,7 @@ serializer <- SparkR:::readString(inputCon)
# Include packages as required
packageNames <- unserialize(SparkR:::readRaw(inputCon))
for (pkg in packageNames) {
- suppressPackageStartupMessages(require(as.character(pkg), character.only=TRUE))
+ suppressPackageStartupMessages(library(as.character(pkg), character.only=TRUE))
}
# read function dependencies
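
The switch from require() to library() matters for a worker: require() merely
warns and returns FALSE when a package is missing, letting the task limp on
until a confusing failure later, whereas library() fails fast with an error.
A small sketch of the difference ("noSuchPkg" is a hypothetical package name):

    # require() warns and returns FALSE; library() stops with an error
    ok <- suppressWarnings(require("noSuchPkg", character.only = TRUE))
    stopifnot(identical(ok, FALSE))
    tryCatch(library("noSuchPkg", character.only = TRUE),
             error = function(e) cat("fails fast:", conditionMessage(e), "\n"))
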
@@ -46,6 +63,9 @@ computeFunc <- unserialize(SparkR:::readRawLen(inputCon, funcLen))
env <- environment(computeFunc)
parent.env(env) <- .GlobalEnv # Attach under global environment.
+# Timing init envs for computing
+initElap <- elapsedSecs()
+
# Read and set broadcast variables
numBroadcastVars <- SparkR:::readInt(inputCon)
if (numBroadcastVars > 0) {
@@ -56,6 +76,9 @@ if (numBroadcastVars > 0) {
}
}
+# Timing broadcast
+broadcastElap <- elapsedSecs()
+
# If -1, read as a normal RDD; if >= 0, treat as a pairwise RDD and use the
# int as the number of partitions to create.
numPartitions <- SparkR:::readInt(inputCon)
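
For readers new to this protocol, the dispatch on numPartitions looks roughly
like this (an illustrative sketch; readInt and con stand in for
SparkR:::readInt and inputCon):

    numPartitions <- readInt(con)
    if (numPartitions == -1L) {
      # normal RDD: apply computeFunc to the partition as a whole
    } else {
      # pairwise RDD: hash keys into numPartitions buckets before writing
    }
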
@@ -73,14 +96,23 @@ if (isEmpty != 0) {
} else if (deserializer == "row") {
data <- SparkR:::readDeserializeRows(inputCon)
}
+ # Timing reading input data for execution
+ inputElap <- elapsedSecs()
+
output <- computeFunc(partition, data)
+ # Timing computing
+ computeElap <- elapsedSecs()
+
if (serializer == "byte") {
SparkR:::writeRawSerialize(outputCon, output)
} else if (serializer == "row") {
SparkR:::writeRowSerialize(outputCon, output)
} else {
- SparkR:::writeStrings(outputCon, output)
+ # Write lines one-by-one, each as a separate length-prefixed string
+ lapply(output, function(line) SparkR:::writeString(outputCon, line))
}
+ # Timing output
+ outputElap <- elapsedSecs()
} else {
if (deserializer == "byte") {
# Now read as many characters as described in funcLen
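
Replacing the single writeStrings() call with per-line writeString() calls
keeps the output a stream of individually length-prefixed records, which is
what lets the negative TIMING_DATA marker and the zero-length END_OF_STREAM
marker be spliced into the same stream below. A hypothetical sketch of such
framing (a stand-in, not SparkR's private implementation):

    # Each record: a 4-byte big-endian length prefix, then the payload bytes.
    # Zero or negative lengths are reserved for control markers.
    writeRecord <- function(con, line) {
      payload <- charToRaw(line)
      writeBin(length(payload), con, endian = "big")
      writeBin(payload, con)
    }
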
@@ -90,6 +122,8 @@ if (isEmpty != 0) {
} else if (deserializer == "row") {
data <- SparkR:::readDeserializeRows(inputCon)
}
+ # Timing reading input data for execution
+ inputElap <- elapsedSecs()
res <- new.env()
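
Here res is an R environment used as a mutable hash map: each key is a hash
bucket, and each value accumulates the tuples that fall into that bucket. The
core pattern, in miniature (bucketOf() is a stand-in for the worker's actual
hashing):

    acc <- new.env()                         # environments hash by name
    bucketOf <- function(key) as.character(nchar(key) %% 4L)
    addTuple <- function(key, value) {
      b <- bucketOf(key)
      acc[[b]] <- c(acc[[b]], list(value))   # acc[[b]] is NULL on first hit
    }
    addTuple("spark", 1); addTuple("r", 2)
    ls(acc)                                  # buckets populated so far
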
@@ -107,6 +141,8 @@ if (isEmpty != 0) {
res[[bucket]] <- acc
}
invisible(lapply(data, hashTupleToEnvir))
+ # Timing computing
+ computeElap <- elapsedSecs()
# Step 2: write out all of the environment as key-value pairs.
for (name in ls(res)) {
@@ -116,13 +152,26 @@ if (isEmpty != 0) {
length(res[[name]]$data) <- res[[name]]$counter
SparkR:::writeRawSerialize(outputCon, res[[name]]$data)
}
+ # Timing output
+ outputElap <- elapsedSecs()
}
+} else {
+ # Empty partition: no input, compute, or output phase, so pin all three
+ # markers to broadcastElap and the reported deltas come out as zero.
+ inputElap <- broadcastElap
+ computeElap <- broadcastElap
+ outputElap <- broadcastElap
}
+# Report timing
+SparkR:::writeInt(outputCon, specialLengths$TIMING_DATA)
+SparkR:::writeDouble(outputCon, bootTime)
+SparkR:::writeDouble(outputCon, initElap - bootElap) # init
+SparkR:::writeDouble(outputCon, broadcastElap - initElap) # broadcast
+SparkR:::writeDouble(outputCon, inputElap - broadcastElap) # input
+SparkR:::writeDouble(outputCon, computeElap - inputElap) # compute
+SparkR:::writeDouble(outputCon, outputElap - computeElap) # output
+
# End of output
-if (serializer %in% c("byte", "row")) {
- SparkR:::writeInt(outputCon, 0L)
-}
+SparkR:::writeInt(outputCon, specialLengths$END_OF_STREAM)
close(outputCon)
close(inputCon)
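
For completeness, a hedged sketch of how a consumer of this stream might
dispatch on the special lengths; readInt(), readDouble(), and readRecord()
are assumed helpers standing in for the JVM-side reader, not real SparkR APIs:

    repeat {
      len <- readInt(con)
      if (len == 0L) break                     # END_OF_STREAM
      if (len == -1L) {                        # TIMING_DATA
        # six doubles follow: bootTime, init, broadcast, input, compute, output
        timings <- vapply(1:6, function(i) readDouble(con), numeric(1))
      } else {
        record <- readRecord(con, len)         # ordinary payload of len bytes
      }
    }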