author     fireflyc <fireflyc@126.com>              2014-07-25 10:47:52 -0700
committer  Matei Zaharia <matei@databricks.com>     2014-07-25 10:47:52 -0700
commit     a2715ccd9437fcdfa0b15e85ab4d0cec91aadf07 (patch)
tree       015c3d7411b9efd054a47cfbaa47d2f52e6db110 /streaming
parent     184aa1c6c0ddf26b703bcabf55397ade17497465 (diff)
Replace println with log4j
Our program needs to receive a large amount of data and run for a long time. We set the log level to WARN, but messages such as "Storing iterator" and "received single" were still written to the log file (running on YARN), because they were emitted with println rather than through the logging framework.

Author: fireflyc <fireflyc@126.com>

Closes #1372 from fireflyc/fix-replace-stdout-log and squashes the following commits:

e684140 [fireflyc] 'info' modified into the 'debug'
fa22a38 [fireflyc] replace println to log4j
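The messages in question come from the store(...) helpers that a user-defined receiver actor calls after mixing in ActorHelper. The sketch below shows such an actor; the WordCountActor name and its message handling are hypothetical and only illustrate the call site. With this patch applied, the accompanying "Storing item" diagnostic goes through logDebug instead of println, so it only appears when the log level is DEBUG.

import akka.actor.Actor
import org.apache.spark.streaming.receiver.ActorHelper

// Hypothetical receiver actor used for illustration. Mixing in ActorHelper
// provides the store(...) methods changed in the diff below.
class WordCountActor extends Actor with ActorHelper {
  def receive = {
    case line: String =>
      // Hands the record to the supervising ActorReceiver. After this patch
      // the "Storing item" message is a logDebug call, so it is suppressed
      // when the log level is WARN.
      store(line)
  }
}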
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala  12
1 file changed, 7 insertions(+), 5 deletions(-)
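For the scenario described in the commit message (log level set to WARN), the change means the receiver diagnostics can now be silenced through the normal log4j configuration. A minimal sketch follows, assuming the log4j 1.x API that Spark bundled at the time; the programmatic call is illustrative, and the same effect can be achieved via log4j.properties.

import org.apache.log4j.{Level, Logger}

// Raise the level for the receiver package; with println replaced by logDebug,
// "Storing iterator" and "received single" no longer reach the log file.
Logger.getLogger("org.apache.spark.streaming.receiver").setLevel(Level.WARN)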
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
index 743be58950..1868a1ebc7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala
@@ -68,13 +68,13 @@ object ActorSupervisorStrategy {
* should be same.
*/
@DeveloperApi
-trait ActorHelper {
+trait ActorHelper extends Logging {
self: Actor => // to ensure that this can be added to Actor classes only
/** Store an iterator of received data as a data block into Spark's memory. */
def store[T](iter: Iterator[T]) {
- println("Storing iterator")
+ logDebug("Storing iterator")
context.parent ! IteratorData(iter)
}
@@ -84,6 +84,7 @@ trait ActorHelper {
* that Spark is configured to use.
*/
def store(bytes: ByteBuffer) {
+ logDebug("Storing Bytes")
context.parent ! ByteBufferData(bytes)
}
@@ -93,7 +94,7 @@ trait ActorHelper {
* being pushed into Spark's memory.
*/
def store[T](item: T) {
- println("Storing item")
+ logDebug("Storing item")
context.parent ! SingleItemData(item)
}
}
@@ -157,15 +158,16 @@ private[streaming] class ActorReceiver[T: ClassTag](
def receive = {
case IteratorData(iterator) =>
- println("received iterator")
+ logDebug("received iterator")
store(iterator.asInstanceOf[Iterator[T]])
case SingleItemData(msg) =>
- println("received single")
+ logDebug("received single")
store(msg.asInstanceOf[T])
n.incrementAndGet
case ByteBufferData(bytes) =>
+ logDebug("received bytes")
store(bytes)
case props: Props =>