aboutsummaryrefslogtreecommitdiff
path: root/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoopInit.scala
blob: f1febb9497c7803060c08ef1c8ee20feff6c7329 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// scalastyle:off

/* NSC -- new Scala compiler
 * Copyright 2005-2013 LAMP/EPFL
 * @author Paul Phillips
 */

package org.apache.spark.repl

import scala.tools.nsc._
import scala.tools.nsc.interpreter._

import scala.reflect.internal.util.Position
import scala.util.control.Exception.ignoring
import scala.tools.nsc.util.stackTraceString

import org.apache.spark.SPARK_VERSION

/**
 *  Machinery for the asynchronous initialization of the repl.
 */
private[repl] trait SparkILoopInit {
  self: SparkILoop =>

  /** Print a welcome message */
  def printWelcome() {
    echo("""Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version %s
      /_/
""".format(SPARK_VERSION))
    import Properties._
    val welcomeMsg = "Using Scala %s (%s, Java %s)".format(
      versionString, javaVmName, javaVersion)
    echo(welcomeMsg)
    echo("Type in expressions to have them evaluated.")
    echo("Type :help for more information.")
   }

  protected def asyncMessage(msg: String) {
    if (isReplInfo || isReplPower)
      echoAndRefresh(msg)
  }

  private val initLock = new java.util.concurrent.locks.ReentrantLock()
  private val initCompilerCondition = initLock.newCondition() // signal the compiler is initialized
  private val initLoopCondition = initLock.newCondition()     // signal the whole repl is initialized
  private val initStart = System.nanoTime

  private def withLock[T](body: => T): T = {
    initLock.lock()
    try body
    finally initLock.unlock()
  }
  // a condition used to ensure serial access to the compiler.
  @volatile private var initIsComplete = false
  @volatile private var initError: String = null
  private def elapsed() = "%.3f".format((System.nanoTime - initStart).toDouble / 1000000000L)

  // the method to be called when the interpreter is initialized.
  // Very important this method does nothing synchronous (i.e. do
  // not try to use the interpreter) because until it returns, the
  // repl's lazy val `global` is still locked.
  protected def initializedCallback() = withLock(initCompilerCondition.signal())

  // Spins off a thread which awaits a single message once the interpreter
  // has been initialized.
  protected def createAsyncListener() = {
    io.spawn {
      withLock(initCompilerCondition.await())
      asyncMessage("[info] compiler init time: " + elapsed() + " s.")
      postInitialization()
    }
  }

  // called from main repl loop
  protected def awaitInitialized(): Boolean = {
    if (!initIsComplete)
      withLock { while (!initIsComplete) initLoopCondition.await() }
    if (initError != null) {
      // scalastyle:off println
      println("""
        |Failed to initialize the REPL due to an unexpected error.
        |This is a bug, please, report it along with the error diagnostics printed below.
        |%s.""".stripMargin.format(initError)
      )
      // scalastyle:on println
      false
    } else true
  }
  // private def warningsThunks = List(
  //   () => intp.bind("lastWarnings", "" + typeTag[List[(Position, String)]], intp.lastWarnings _),
  // )

  protected def postInitThunks = List[Option[() => Unit]](
    Some(intp.setContextClassLoader _),
    if (isReplPower) Some(() => enablePowerMode(true)) else None
  ).flatten
  // ++ (
  //   warningsThunks
  // )
  // called once after init condition is signalled
  protected def postInitialization() {
    try {
      postInitThunks foreach (f => addThunk(f()))
      runThunks()
    } catch {
      case ex: Throwable =>
        initError = stackTraceString(ex)
        throw ex
    } finally {
      initIsComplete = true

      if (isAsync) {
        asyncMessage("[info] total init time: " + elapsed() + " s.")
        withLock(initLoopCondition.signal())
      }
    }
  }

  def initializeSpark() {
    intp.beQuietDuring {
      command("""
        @transient val sc = {
          val _sc = org.apache.spark.repl.Main.interp.createSparkContext()
          _sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
          println("Spark context available as 'sc' " +
            s"(master = ${_sc.master}, app id = ${_sc.applicationId}).")
          _sc
        }
        """)
      command("""
        @transient val spark = {
          val _session = org.apache.spark.repl.Main.interp.createSparkSession()
          println("Spark session available as 'spark'.")
          _session
        }
        """)
      command("import org.apache.spark.SparkContext._")
      command("import spark.implicits._")
      command("import spark.sql")
      command("import org.apache.spark.sql.functions._")
    }
  }

  // code to be executed only after the interpreter is initialized
  // and the lazy val `global` can be accessed without risk of deadlock.
  private var pendingThunks: List[() => Unit] = Nil
  protected def addThunk(body: => Unit) = synchronized {
    pendingThunks :+= (() => body)
  }
  protected def runThunks(): Unit = synchronized {
    if (pendingThunks.nonEmpty)
      logDebug("Clearing " + pendingThunks.size + " thunks.")

    while (pendingThunks.nonEmpty) {
      val thunk = pendingThunks.head
      pendingThunks = pendingThunks.tail
      thunk()
    }
  }
}