aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/receiver/Receiver.scala
blob: 5157ca62dc44942c17a2387f3ad6b5a425100138 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.receiver

import java.nio.ByteBuffer

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.StorageLevel

/**
 * :: DeveloperApi ::
 * Abstract class of a receiver that can be run on worker nodes to receive external data. A
 * custom receiver can be defined by defining the functions `onStart()` and `onStop()`. `onStart()`
 * should define the setup steps necessary to start receiving data,
 * and `onStop()` should define the cleanup steps necessary to stop receiving data.
 * Exceptions while receiving can be handled by restarting the receiver with `restart(...)`,
 * by stopping it completely with `stop(...)`, or by reporting them to the driver with
 * `reportError(...)` while the receiving of data continues.
 *
 * A custom receiver in Scala would look like this.
 *
 * {{{
 *  class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {
 *      def onStart(): Unit = {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors need to be handled.
 *
 *          // See corresponding method documentation for more details
 *      }
 *
 *      def onStop(): Unit = {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *      }
 *  }
 * }}}
 *
 * A custom receiver in Java would look like this.
 *
 * {{{
 * class MyReceiver extends Receiver<String> {
 *     public MyReceiver(StorageLevel storageLevel) {
 *         super(storageLevel);
 *     }
 *
 *     public void onStart() {
 *          // Setup stuff (start threads, open sockets, etc.) to start receiving data.
 *          // Must start new thread to receive data, as onStart() must be non-blocking.
 *
 *          // Call store(...) in those threads to store received data into Spark's memory.
 *
 *          // Call stop(...), restart(...) or reportError(...) on any thread based on how
 *          // different errors need to be handled.
 *
 *          // See corresponding method documentation for more details
 *     }
 *
 *     public void onStop() {
 *          // Cleanup stuff (stop threads, close sockets, etc.) to stop receiving data.
 *     }
 * }
 * }}}
 *
 * All `store`, `restart`, `stop`, `reportError`, `isStarted` and `isStopped` calls are delegated
 * to the attached [[ReceiverSupervisor]]; calling any of them before a supervisor has been
 * attached fails the assertion in `supervisor`.
 *
 * @tparam T type of the data items handed to `store(...)`
 * @param storageLevel storage level carried by this receiver. It is not used inside this class;
 *                     presumably it is the level at which received blocks are stored (confirm
 *                     against the supervisor implementation).
 */
@DeveloperApi
abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable {

  /**
   * This method is called by the system when the receiver is started. This function
   * must initialize all resources (threads, buffers, etc.) necessary for receiving data.
   * This function must be non-blocking, so receiving the data must occur on a different
   * thread. Received data can be stored with Spark by calling `store(data)`.
   *
   * If there are errors in threads started here, then following options can be done
   * (i) `reportError(...)` can be called to report the error to the driver.
   * The receiving of data will continue uninterrupted.
   * (ii) `stop(...)` can be called to stop receiving data. This will call `onStop()` to
   * clear up all resources allocated (threads, buffers, etc.) during `onStart()`.
   * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
   * immediately, and then `onStart()` after a delay.
   */
  def onStart(): Unit

  /**
   * This method is called by the system when the receiver is stopped. All resources
   * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
   */
  def onStop(): Unit

  /** Override this to specify a preferred location (hostname). Defaults to `None`. */
  def preferredLocation: Option[String] = None

  /**
   * Store a single item of received data to Spark's memory.
   * These single items will be aggregated together into data blocks before
   * being pushed into Spark's memory.
   *
   * @param dataItem the item to store
   */
  def store(dataItem: T): Unit = {
    supervisor.pushSingle(dataItem)
  }

  /**
   * Store an ArrayBuffer of received data as a data block into Spark's memory.
   * No metadata is attached to the block.
   *
   * @param dataBuffer the items that make up the block
   */
  def store(dataBuffer: ArrayBuffer[T]): Unit = {
    supervisor.pushArrayBuffer(dataBuffer, None, None)
  }

  /**
   * Store an ArrayBuffer of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * @param dataBuffer the items that make up the block
   * @param metadata   metadata to associate with the block
   */
  def store(dataBuffer: ArrayBuffer[T], metadata: Any): Unit = {
    supervisor.pushArrayBuffer(dataBuffer, Some(metadata), None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * No metadata is attached to the block.
   *
   * @param dataIterator iterator over the items that make up the block
   */
  def store(dataIterator: Iterator[T]): Unit = {
    supervisor.pushIterator(dataIterator, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * This overload accepts a Java iterator, which is wrapped as a Scala iterator
   * before being handed to the supervisor.
   *
   * @param dataIterator Java iterator over the items that make up the block
   * @param metadata     metadata to associate with the block
   */
  def store(dataIterator: java.util.Iterator[T], metadata: Any): Unit = {
    supervisor.pushIterator(dataIterator.asScala, Some(metadata), None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * No metadata is attached to the block.
   *
   * This overload accepts a Java iterator, which is wrapped as a Scala iterator
   * before being handed to the supervisor.
   *
   * @param dataIterator Java iterator over the items that make up the block
   */
  def store(dataIterator: java.util.Iterator[T]): Unit = {
    supervisor.pushIterator(dataIterator.asScala, None, None)
  }

  /**
   * Store an iterator of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * @param dataIterator iterator over the items that make up the block
   * @param metadata     metadata to associate with the block
   */
  def store(dataIterator: Iterator[T], metadata: Any): Unit = {
    supervisor.pushIterator(dataIterator, Some(metadata), None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory. Note
   * that the data in the ByteBuffer must be serialized using the same serializer
   * that Spark is configured to use.
   *
   * @param bytes the serialized block contents
   */
  def store(bytes: ByteBuffer): Unit = {
    supervisor.pushBytes(bytes, None, None)
  }

  /**
   * Store the bytes of received data as a data block into Spark's memory.
   * The metadata will be associated with this block of data
   * for being used in the corresponding InputDStream.
   *
   * @param bytes    the serialized block contents
   * @param metadata metadata to associate with the block
   */
  def store(bytes: ByteBuffer, metadata: Any): Unit = {
    supervisor.pushBytes(bytes, Some(metadata), None)
  }

  /**
   * Report exceptions in receiving data.
   *
   * @param message   description of the error
   * @param throwable the exception being reported
   */
  def reportError(message: String, throwable: Throwable): Unit = {
    supervisor.reportError(message, throwable)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` will be reported to the driver.
   *
   * @param message reason for the restart
   */
  def restart(message: String): Unit = {
    supervisor.restartReceiver(message)
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread. The delay between the stopping and the starting
   * is defined by the Spark configuration `spark.streaming.receiverRestartDelay`.
   * The `message` and `error` will be reported to the driver.
   *
   * @param message reason for the restart
   * @param error   the exception that triggered the restart
   */
  def restart(message: String, error: Throwable): Unit = {
    supervisor.restartReceiver(message, Some(error))
  }

  /**
   * Restart the receiver. This method schedules the restart and returns
   * immediately. The stopping and subsequent starting of the receiver
   * (by calling `onStop()` and `onStart()`) is performed asynchronously
   * in a background thread.
   *
   * @param message     reason for the restart
   * @param error       the exception that triggered the restart
   * @param millisecond forwarded unchanged to the supervisor; presumably the delay, in
   *                    milliseconds, between the stopping and the starting (confirm against
   *                    `ReceiverSupervisor.restartReceiver`)
   */
  def restart(message: String, error: Throwable, millisecond: Int): Unit = {
    supervisor.restartReceiver(message, Some(error), millisecond)
  }

  /**
   * Stop the receiver completely.
   *
   * @param message reason for stopping
   */
  def stop(message: String): Unit = {
    supervisor.stop(message, None)
  }

  /**
   * Stop the receiver completely due to an exception.
   *
   * @param message reason for stopping
   * @param error   the exception that caused the stop
   */
  def stop(message: String, error: Throwable): Unit = {
    supervisor.stop(message, Some(error))
  }

  /** Check if the receiver has started or not, as reported by the attached supervisor. */
  def isStarted(): Boolean = {
    supervisor.isReceiverStarted()
  }

  /**
   * Check if receiver has been marked for stopping. Use this to identify when
   * the receiving of data should be stopped.
   */
  def isStopped(): Boolean = {
    supervisor.isReceiverStopped()
  }

  /**
   * Get the unique identifier of the receiver input stream that this
   * receiver is associated with. The value is -1 until `setReceiverId` has been called.
   */
  def streamId: Int = id

  /*
   * =================
   * Private methods
   * =================
   */

  /** Identifier of the stream this receiver is associated with; -1 until it has been set. */
  private var id: Int = -1

  /**
   * Handler object that runs the receiver. This is instantiated lazily in the worker.
   * It is marked `@transient`, so it is not serialized along with the receiver.
   */
  @transient private var _supervisor: ReceiverSupervisor = null

  /** Set the ID of the DStream that this receiver is associated with. */
  private[streaming] def setReceiverId(_id: Int): Unit = {
    id = _id
  }

  /**
   * Attach the supervisor that runs this receiver. A supervisor may only be attached once;
   * attaching a second one fails the assertion.
   */
  private[streaming] def attachSupervisor(exec: ReceiverSupervisor): Unit = {
    assert(_supervisor == null,
      "A ReceiverSupervisor has already been attached to the receiver.")
    _supervisor = exec
  }

  /** Get the attached supervisor, failing the assertion if none has been attached yet. */
  private[streaming] def supervisor: ReceiverSupervisor = {
    assert(_supervisor != null,
      "A ReceiverSupervisor has not been attached to the receiver yet. Maybe you are starting " +
        "some computation in the receiver before the Receiver.onStart() has been called.")
    _supervisor
  }
}