Spark Source Code: SparkContext

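Below is the SparkContext source from the Spark 1.x codebase. Before reading it, a minimal driver program helps show what the class is for. This is only an illustrative sketch (the local[2] master, app name, and data are arbitrary), but every call it makes — the SparkConf-based constructor, parallelize(), and stop() — is defined and documented in the listing that follows.

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextDemo {
  def main(args: Array[String]): Unit = {
    // spark.master and spark.app.name must be set; otherwise the primary
    // constructor throws a SparkException during initialization.
    val conf = new SparkConf().setMaster("local[2]").setAppName("SparkContextDemo")
    val sc = new SparkContext(conf)
    try {
      // parallelize() distributes a local Scala collection to form an RDD.
      val rdd = sc.parallelize(1 to 100, numSlices = 4)
      println(s"sum = ${rdd.sum()}")
    } finally {
      // Only one SparkContext may be active per JVM, so stop() it when done.
      sc.stop()
    }
  }
}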

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import scala.language.implicitConversions

import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID

import scala.collection.JavaConverters._
import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
import scala.util.control.NonFatal

import org.apache.commons.lang.SerializationUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
  FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
  TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
  FixedLengthBinaryInputFormat}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
  SparkDeploySchedulerBackend, SimrSchedulerBackend}
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
import org.apache.spark.ui.{SparkUI, ConsoleProgressBar}
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util._

/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
 * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration.
Any settings in *   this config overrides the default configs as well as system properties. */ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient { // The call site where this SparkContext was constructed.   private val creationSite: CallSite = Utils.getCallSite() // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active   private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false) // In order to prevent multiple SparkContexts from being active at the same time, mark this   // context as having started construction.   // NOTE: this must be placed at the beginning of the SparkContext constructor.   SparkContext.markPartiallyConstructed(this, allowMultipleContexts) val startTime = System.currentTimeMillis() private[spark] val stopped: AtomicBoolean = new AtomicBoolean(false) private def assertNotStopped(): Unit = { if (stopped.get()) { val activeContext = SparkContext.activeContext.get() val activeCreationSite = if (activeContext == null) { "(No active SparkContext.)"         } else { activeContext.creationSite.longForm } throw new IllegalStateException( s"""Cannot call methods on a stopped SparkContext.            |This stopped SparkContext was created at:            |            |${creationSite.longForm}           |            |The currently active SparkContext was created at:            |            |$activeCreationSite """.stripMargin) } } /**   * Create a SparkContext that loads settings from system properties (for instance, when   * launching with ./bin/spark-submit).   */  def this() = this(new SparkConf()) /**   * :: DeveloperApi ::   * Alternative constructor for setting preferred locations where Spark will create executors.   *   * @param config a [[org.apache.spark.SparkConf]] object specifying other Spark parameters   * @param preferredNodeLocationData not used. Left for backward compatibility.   */  @deprecated("Passing in preferred locations has no effect at all, see SPARK-8949", "1.5.0") @DeveloperApi   def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) logWarning("Passing in preferred locations has no effect at all, see SPARK-8949") } /**   * Alternative constructor that allows setting common Spark properties directly   *   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).   * @param appName A name for your application, to display on the cluster web UI   * @param conf a [[org.apache.spark.SparkConf]] object specifying other Spark parameters   */  def this(master: String, appName: String, conf: SparkConf) = this(SparkContext.updatedConf(conf, master, appName)) /**   * Alternative constructor that allows setting common Spark properties directly   *   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).   * @param appName A name for your application, to display on the cluster web UI.   * @param sparkHome Location where Spark is installed on cluster nodes.   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file   *             system or HDFS, HTTP, HTTPS, or FTP URLs.   * @param environment Environment variables to set on worker nodes.   * @param preferredNodeLocationData not used. Left for backward compatibility.   
*/  @deprecated("Passing in preferred locations has no effect at all, see SPARK-10921", "1.6.0") def this( master: String,       appName: String,       sparkHome: String = null,       jars: Seq[String] = Nil,       environment: Map[String, String] = Map(),       preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = { this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment)) if (preferredNodeLocationData.nonEmpty) { logWarning("Passing in preferred locations has no effect at all, see SPARK-8949") } } // NOTE: The below constructors could be consolidated using default arguments. Due to   // Scala bug SI-8479, however, this causes the compile step to fail when generating docs.   // Until we have a good workaround for that bug the constructors remain broken out.   /**   * Alternative constructor that allows setting common Spark properties directly   *   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).   * @param appName A name for your application, to display on the cluster web UI.   */  private[spark] def this(master: String, appName: String) = this(master, appName, null, Nil, Map()) /**   * Alternative constructor that allows setting common Spark properties directly   *   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).   * @param appName A name for your application, to display on the cluster web UI.   * @param sparkHome Location where Spark is installed on cluster nodes.   */  private[spark] def this(master: String, appName: String, sparkHome: String) = this(master, appName, sparkHome, Nil, Map()) /**   * Alternative constructor that allows setting common Spark properties directly   *   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).   * @param appName A name for your application, to display on the cluster web UI.   * @param sparkHome Location where Spark is installed on cluster nodes.   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file   *             system or HDFS, HTTP, HTTPS, or FTP URLs.   */  private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) = this(master, appName, sparkHome, jars, Map()) // log out Spark Version in Spark driver log   logInfo(s"Running Spark version $SPARK_VERSION") /* ------------------------------------------------------------------------------------- *    | Private variables. These variables keep the internal state of the context, and are    |    | not accessible by the outside world. They're mutable since we want to initialize all  |    | of them to some neutral value ahead of time, so that calling "stop()" while the       |    | constructor is still running is safe.                                                 
|    * ------------------------------------------------------------------------------------- */   private var _conf: SparkConf = _ private var _eventLogDir: Option[URI] = None private var _eventLogCodec: Option[String] = None private var _env: SparkEnv = _ private var _metadataCleaner: MetadataCleaner = _ private var _jobProgressListener: JobProgressListener = _ private var _statusTracker: SparkStatusTracker = _ private var _progressBar: Option[ConsoleProgressBar] = None private var _ui: Option[SparkUI] = None private var _hadoopConfiguration: Configuration = _ private var _executorMemory: Int = _ private var _schedulerBackend: SchedulerBackend = _ private var _taskScheduler: TaskScheduler = _ private var _heartbeatReceiver: RpcEndpointRef = _ @volatile private var _dagScheduler: DAGScheduler = _ private var _applicationId: String = _ private var _applicationAttemptId: Option[String] = None private var _eventLogger: Option[EventLoggingListener] = None private var _executorAllocationManager: Option[ExecutorAllocationManager] = None private var _cleaner: Option[ContextCleaner] = None private var _listenerBusStarted: Boolean = false  private var _jars: Seq[String] = _ private var _files: Seq[String] = _ private var _shutdownHookRef: AnyRef = _ /* ------------------------------------------------------------------------------------- *    | Accessors and public fields. These provide access to the internal state of the        |    | context.                                                                              |    * ------------------------------------------------------------------------------------- */   private[spark] def conf: SparkConf = _conf  /**   * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be   * changed at runtime.   */  def getConf: SparkConf = conf.clone() def jars: Seq[String] = _jars  def files: Seq[String] = _files  def master: String = _conf.get("spark.master") def appName: String = _conf.get("spark.app.name") private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false) private[spark] def eventLogDir: Option[URI] = _eventLogDir  private[spark] def eventLogCodec: Option[String] = _eventLogCodec  // Generate the random name for a temp folder in external block store.   // Add a timestamp as the suffix here to make it more safe   val externalBlockStoreFolderName = "spark-" + randomUUID.toString() @deprecated("Use externalBlockStoreFolderName instead.", "1.4.0") val tachyonFolderName = externalBlockStoreFolderName  def isLocal: Boolean = (master == "local" || master.startsWith("local[")) /**   * @return true if context is stopped or in the midst of stopping.   
*/  def isStopped: Boolean = stopped.get() // An asynchronous listener bus for Spark events   private[spark] val listenerBus = new LiveListenerBus // This function allows components created by SparkEnv to be mocked in unit tests:   private[spark] def createSparkEnv( conf: SparkConf,       isLocal: Boolean,       listenerBus: LiveListenerBus): SparkEnv = { SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master)) } private[spark] def env: SparkEnv = _env  // Used to store a URL for each static file/jar together with the file's local timestamp   private[spark] val addedFiles = HashMap[String, Long]() private[spark] val addedJars = HashMap[String, Long]() // Keeps track of all persisted RDDs   private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]] private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner  private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener  def statusTracker: SparkStatusTracker = _statusTracker  private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar  private[spark] def ui: Option[SparkUI] = _ui  /**   * A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.   *   * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you   * plan to set some global configurations for all Hadoop RDDs.   */  def hadoopConfiguration: Configuration = _hadoopConfiguration  private[spark] def executorMemory: Int = _executorMemory  // Environment variables to pass to our executors.   private[spark] val executorEnvs = HashMap[String, String]() // Set SPARK_USER for user who is running SparkContext.   val sparkUser = Utils.getCurrentUserName() private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend  private[spark] def schedulerBackend_=(sb: SchedulerBackend): Unit = { _schedulerBackend = sb } private[spark] def taskScheduler: TaskScheduler = _taskScheduler  private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = { _taskScheduler = ts } private[spark] def dagScheduler: DAGScheduler = _dagScheduler  private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = { _dagScheduler = ds } /**   * A unique identifier for the Spark application.   * Its format depends on the scheduler implementation.   * (i.e.   *  in case of local spark app something like 'local-1433865536131'   *  in case of YARN something like 'application_1433865536131_34483'   * )   */  def applicationId: String = _applicationId  def applicationAttemptId: Option[String] = _applicationAttemptId  def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null  private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger  private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] = _executorAllocationManager  private[spark] def cleaner: Option[ContextCleaner] = _cleaner  private[spark] var checkpointDir: Option[String] = None // Thread Local variable that can be used by users to pass information down the stack   protected[spark] val localProperties = new InheritableThreadLocal[Properties] { override protected def childValue(parent: Properties): Properties = { // Note: make a clone such that changes in the parent properties aren't reflected in       // the those of the children threads, which has confusing semantics (SPARK-10563).       
SerializationUtils.clone(parent).asInstanceOf[Properties] } override protected def initialValue(): Properties = new Properties() } /* ------------------------------------------------------------------------------------- *    | Initialization. This code initializes the context in a manner that is exception-safe. |    | All internal fields holding state are initialized here, and any error prompts the     |    | stop() method to be called.                                                           |    * ------------------------------------------------------------------------------------- */   private def warnSparkMem(value: String): String = { logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " + "deprecated, please use spark.executor.memory instead.") value } /** Control our logLevel. This overrides any user-defined log settings.   * @param logLevel The desired log level as a string.   * Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN   */  def setLogLevel(logLevel: String) { val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN") if (!validLevels.contains(logLevel)) { throw new IllegalArgumentException( s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}") } Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel)) } try { _conf = config.clone() _conf.validateSettings() if (!_conf.contains("spark.master")) { throw new SparkException("A master URL must be set in your configuration") } if (!_conf.contains("spark.app.name")) { throw new SparkException("An application name must be set in your configuration") } // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster     // yarn-standalone is deprecated, but still supported     if ((master == "yarn-cluster" || master == "yarn-standalone") && !_conf.contains("spark.yarn.app.id")) { throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " + "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.") } if (_conf.getBoolean("spark.logConf", false)) { logInfo("Spark configuration:\n" + _conf.toDebugString) } // Set Spark driver host and port system properties     _conf.setIfMissing("spark.driver.host", Utils.localHostName()) _conf.setIfMissing("spark.driver.port", "0") _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER) _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)) .toSeq.flatten _eventLogDir = if (isEventLogEnabled) { val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR) .stripSuffix("/") Some(Utils.resolveURI(unresolvedDir)) } else { None } _eventLogCodec = { val compress = _conf.getBoolean("spark.eventLog.compress", false) if (compress && isEventLogEnabled) { Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName) } else { None } } _conf.set("spark.externalBlockStore.folderName", externalBlockStoreFolderName) if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true") // "_jobProgressListener" should be set up before creating SparkEnv because when creating     // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.     
_jobProgressListener = new JobProgressListener(_conf) listenerBus.addListener(jobProgressListener) // Create the Spark execution environment (cache, map output tracker, etc)     _env = createSparkEnv(_conf, isLocal, listenerBus) SparkEnv.set(_env) _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf) _statusTracker = new SparkStatusTracker(this) _progressBar = if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) { Some(new ConsoleProgressBar(this)) } else { None } _ui = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,           _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI         None } // Bind the UI before starting the task scheduler to communicate     // the bound port to the cluster manager properly     _ui.foreach(_.bind()) _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf) // Add each JAR given through the constructor     if (jars != null) { jars.foreach(addJar) } if (files != null) { files.foreach(addFile) } _executorMemory = _conf.getOption("spark.executor.memory") .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY"))) .orElse(Option(System.getenv("SPARK_MEM")) .map(warnSparkMem)) .map(Utils.memoryStringToMb) .getOrElse(1024) // Convert java options to env vars as a work around     // since we can't set env vars directly in sbt.     for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing")) value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { executorEnvs(envKey) = value } Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v => executorEnvs("SPARK_PREPEND_CLASSES") = v } // The Mesos scheduler backend relies on this environment variable to set executor memory.     // TODO: Set this only in the Mesos scheduler.    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"     executorEnvs ++= _conf.getExecutorEnv executorEnvs("SPARK_USER") = sparkUser    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will     // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)     _heartbeatReceiver = env.rpcEnv.setupEndpoint( HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this)) // Create and start the scheduler     val (sched, ts) = SparkContext.createTaskScheduler(this, master) _schedulerBackend = sched _taskScheduler = ts _dagScheduler = new DAGScheduler(this) _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet) // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's     // constructor     _taskScheduler.start() _applicationId = _taskScheduler.applicationId() _applicationAttemptId = taskScheduler.applicationAttemptId() _conf.set("spark.app.id", _applicationId) _ui.foreach(_.setAppId(_applicationId)) _env.blockManager.initialize(_applicationId) // The metrics system for Driver need to be set spark.app.id to app ID.     // So it should start after we get app ID from the task scheduler and set spark.app.id.     metricsSystem.start() // Attach the driver metrics servlet handler to the web ui after the metrics system is started.     
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler))) _eventLogger = if (isEventLogEnabled) { val logger = new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,             _conf, _hadoopConfiguration) logger.start() listenerBus.addListener(logger) Some(logger) } else { None } // Optionally scale number of executors dynamically based on workload. Exposed for testing.     val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf) if (!dynamicAllocationEnabled && _conf.getBoolean("spark.dynamicAllocation.enabled", false)) { logWarning("Dynamic Allocation and num executors both set, thus dynamic allocation disabled.") } _executorAllocationManager = if (dynamicAllocationEnabled) { Some(new ExecutorAllocationManager(this, listenerBus, _conf)) } else { None } _executorAllocationManager.foreach(_.start()) _cleaner = if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) { Some(new ContextCleaner(this)) } else { None } _cleaner.foreach(_.start()) setupAndStartListenerBus() postEnvironmentUpdate() postApplicationStart() // Post init     _taskScheduler.postStartHook() _env.metricsSystem.registerSource(_dagScheduler.metricsSource) _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager)) _executorAllocationManager.foreach { e => _env.metricsSystem.registerSource(e.executorAllocationManagerSource) } // Make sure the context is stopped if the user forgets about it. This avoids leaving     // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM     // is killed, though.     _shutdownHookRef = ShutdownHookManager.addShutdownHook( ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () => logInfo("Invoking stop() from shutdown hook") stop() } } catch { case NonFatal(e) => logError("Error initializing SparkContext.", e) try { stop() } catch { case NonFatal(inner) => logError("Error stopping SparkContext after init error.", inner) } finally { throw e } } /**   * Called by the web UI to obtain executor thread dumps.  This method may be expensive.   * Logs an error and returns None if we failed to obtain a thread dump, which could occur due   * to an executor being dead or unresponsive or due to network issues while sending the thread   * dump message back to the driver.   */  private[spark] def getExecutorThreadDump(executorId: String): Option[Array[ThreadStackTrace]] = { try { if (executorId == SparkContext.DRIVER_IDENTIFIER) { Some(Utils.getThreadDump()) } else { val endpointRef = env.blockManager.master.getExecutorEndpointRef(executorId).get Some(endpointRef.askWithRetry[Array[ThreadStackTrace]](TriggerThreadDump)) } } catch { case e: Exception => logError(s"Exception getting thread dump from executor $executorId", e) None } } private[spark] def getLocalProperties: Properties = localProperties.get() private[spark] def setLocalProperties(props: Properties) { localProperties.set(props) } @deprecated("Properties no longer need to be explicitly initialized.", "1.0.0") def initLocalProperties() { localProperties.set(new Properties()) } /**   * Set a local property that affects jobs submitted from this thread, such as the   * Spark fair scheduler pool.   */  def setLocalProperty(key: String, value: String) { if (value == null) { localProperties.get.remove(key) } else { localProperties.get.setProperty(key, value) } } /**   * Get a local property set in this thread, or null if it is missing. See   * [[org.apache.spark.SparkContext.setLocalProperty]].   
*/  def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).orNull /** Set a human readable description of the current job. */  def setJobDescription(value: String) { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) } /**   * Assigns a group ID to all the jobs started by this thread until the group ID is set to a   * different value or cleared.   *   * Often, a unit of execution in an application consists of multiple Spark actions or jobs.   * Application programmers can use this method to group all those jobs together and give a   * group description. Once set, the Spark web UI will associate such jobs with this group.   *   * The application can also use [[org.apache.spark.SparkContext.cancelJobGroup]] to cancel all   * running jobs in this group. For example,   * {{{    * // In the main thread:   * sc.setJobGroup("some_job_to_cancel", "some job description")   * sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()   *   * // In a separate thread:   * sc.cancelJobGroup("some_job_to_cancel")   * }}}    *   * If interruptOnCancel is set to true for the job group, then job cancellation will result   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.   */  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description) setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId) // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids     // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup     // APIs to also take advantage of this property (e.g., internal job failures or canceling from     // JobProgressTab UI) on a per-job basis.     setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString) } /** Clear the current thread's job group ID and its description. */  def clearJobGroup() { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null) setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null) setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null) } /**   * Execute a block of code in a scope such that all new RDDs created in this body will   * be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.   *   * Note: Return statements are NOT allowed in the given body.   */  private[spark] def withScope[U](body: => U): U = RDDOperationScope.withScope[U](this)(body) // Methods for creating RDDs   /** Distribute a local Scala collection to form an RDD.   *   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the   * modified collection. Pass a copy of the argument to avoid this.   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.   
*/  def parallelize[T: ClassTag]( seq: Seq[T],       numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) } /**   * Creates a new RDD[Long] containing elements from `start` to `end`(exclusive), increased by   * `step` every element.   *   * @note if we need to cache this RDD, we should make sure each partition does not exceed limit.   *   * @param start the start value.   * @param end the end value.   * @param step the incremental step   * @param numSlices the partition number of the new RDD.   * @return   */  def range( start: Long,       end: Long,       step: Long = 1,       numSlices: Int = defaultParallelism): RDD[Long] = withScope { assertNotStopped() // when step is 0, range will run infinitely     require(step != 0, "step cannot be 0") val numElements: BigInt = { val safeStart = BigInt(start) val safeEnd = BigInt(end) if ((safeEnd - safeStart) % step == 0 || safeEnd > safeStart ^ step > 0) { (safeEnd - safeStart) / step } else { // the remainder has the same sign with range, could add 1 more         (safeEnd - safeStart) / step + 1       } } parallelize(0 until numSlices, numSlices).mapPartitionsWithIndex((i, _) => { val partitionStart = (i * numElements) / numSlices * step + start val partitionEnd = (((i + 1) * numElements) / numSlices) * step + start def getSafeMargin(bi: BigInt): Long = if (bi.isValidLong) { bi.toLong } else if (bi > 0) { Long.MaxValue        } else { Long.MinValue        } val safePartitionStart = getSafeMargin(partitionStart) val safePartitionEnd = getSafeMargin(partitionEnd) new Iterator[Long] { private[this] var number: Long = safePartitionStart private[this] var overflow: Boolean = false        override def hasNext = if (!overflow) { if (step > 0) { number < safePartitionEnd } else { number > safePartitionEnd } } else false        override def next() = { val ret = number          number += step if (number < ret ^ step < 0) { // we have Long.MaxValue + Long.MaxValue < Long.MaxValue             // and Long.MinValue + Long.MinValue > Long.MinValue, so iff the step causes a step             // back, we are pretty sure that we have an overflow.             overflow = true          } ret } } }) } /** Distribute a local Scala collection to form an RDD.   *   * This method is identical to `parallelize`.   */  def makeRDD[T: ClassTag]( seq: Seq[T],       numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) } /** Distribute a local Scala collection to form an RDD, with one or more    * location preferences (hostnames of Spark nodes) for each object.    * Create a new partition for each collection item. */  def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = withScope { assertNotStopped() val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) } /**   * Read a text file from HDFS, a local file system (available on all nodes), or any   * Hadoop-supported file system URI, and return it as an RDD of Strings.   */  def textFile( path: String,       minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],       minPartitions).map(pair => pair._2.toString) } /**   * Read a directory of text files from HDFS, a local file system (available on all nodes), or any   * Hadoop-supported file system URI. 
Each file is read as a single record and returned in a   * key-value pair, where the key is the path of each file, the value is the content of each file.   *   * <p> For example, if you have the following files:   * {{{    *   hdfs://a-hdfs-path/part-00000   *   hdfs://a-hdfs-path/part-00001   *   ...   *   hdfs://a-hdfs-path/part-nnnnn   * }}}    *   * Do `val rdd = sparkContext.wholeTextFile("hdfs://a-hdfs-path")`,   *   * <p> then `rdd` contains   * {{{    *   (a-hdfs-path/part-00000, its content)   *   (a-hdfs-path/part-00001, its content)   *   ...   *   (a-hdfs-path/part-nnnnn, its content)   * }}}    *   * @note Small files are preferred, large file is also allowable, but may cause bad performance.   * @note On some filesystems, `.../path/&#42;` can be a more efficient way to read all files   *       in a directory rather than `.../path/` or `.../path`    *   * @param path Directory to the input data files, the path can be comma separated paths as the   *             list of inputs.   * @param minPartitions A suggestion value of the minimal splitting number for input data.   */  def wholeTextFiles( path: String,       minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking     // comma separated files as input. (see SPARK-7155)     NewFileInputFormat.setInputPaths(job, path) val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) new WholeTextFileRDD( this,       classOf[WholeTextFileInputFormat],       classOf[Text],       classOf[Text],       updateConf,       minPartitions).setName(path).map(record => (record._1.toString, record._2.toString)) } /**   * Get an RDD for a Hadoop-readable dataset as PortableDataStream for each file   * (useful for binary data)   *   * For example, if you have the following files:   * {{{    *   hdfs://a-hdfs-path/part-00000   *   hdfs://a-hdfs-path/part-00001   *   ...   *   hdfs://a-hdfs-path/part-nnnnn   * }}}    *   * Do   * `val rdd = sparkContext.binaryFiles("hdfs://a-hdfs-path")`,   *   * then `rdd` contains   * {{{    *   (a-hdfs-path/part-00000, its content)   *   (a-hdfs-path/part-00001, its content)   *   ...   *   (a-hdfs-path/part-nnnnn, its content)   * }}}    *   * @note Small files are preferred; very large files may cause bad performance.   * @note On some filesystems, `.../path/&#42;` can be a more efficient way to read all files   *       in a directory rather than `.../path/` or `.../path`    *   * @param path Directory to the input data files, the path can be comma separated paths as the   *             list of inputs.   * @param minPartitions A suggestion value of the minimal splitting number for input data.   */  def binaryFiles( path: String,       minPartitions: Int = defaultMinPartitions): RDD[(String, PortableDataStream)] = withScope { assertNotStopped() val job = new NewHadoopJob(hadoopConfiguration) // Use setInputPaths so that binaryFiles aligns with hadoopFile/textFile in taking     // comma separated files as input. (see SPARK-7155)     NewFileInputFormat.setInputPaths(job, path) val updateConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) new BinaryFileRDD( this,       classOf[StreamInputFormat],       classOf[String],       classOf[PortableDataStream],       updateConf,       minPartitions).setName(path) } /**   * Load data from a flat binary file, assuming the length of each record is constant.   
*   * '''Note:''' We ensure that the byte array for each record in the resulting RDD   * has the provided record length.   *   * @param path Directory to the input data files, the path can be comma separated paths as the   *             list of inputs.   * @param recordLength The length at which to split the records   * @param conf Configuration for setting up the dataset.   *   * @return An RDD of data with values, represented as byte arrays   */  def binaryRecords( path: String,       recordLength: Int,       conf: Configuration = hadoopConfiguration): RDD[Array[Byte]] = withScope { assertNotStopped() conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength) val br = newAPIHadoopFile[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](path,       classOf[FixedLengthBinaryInputFormat],       classOf[LongWritable],       classOf[BytesWritable],       conf = conf) val data = br.map { case (k, v) => val bytes = v.getBytes assert(bytes.length == recordLength, "Byte array does not have correct length") bytes } data } /**   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf given its InputFormat and other   * necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable),   * using the older MapReduce API (`org.apache.hadoop.mapred`).   *   * @param conf JobConf for setting up the dataset. Note: This will be put into a Broadcast.   *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make   *             sure you won't modify the conf. A safe approach is always creating a new conf for   *             a new RDD.   * @param inputFormatClass Class of the InputFormat   * @param keyClass Class of the keys   * @param valueClass Class of the values   * @param minPartitions Minimum number of Hadoop Splits to generate.   *   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle   * operation will create many references to the same object.   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first   * copy them using a `map` function.   */  def hadoopRDD[K, V]( conf: JobConf,       inputFormatClass: Class[_ <: InputFormat[K, V]],       keyClass: Class[K],       valueClass: Class[V],       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // Add necessary security credentials to the JobConf before broadcasting it.     SparkHadoopUtil.get.addCredentials(conf) new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions) } /** Get an RDD for a Hadoop file with an arbitrary InputFormat   *   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle   * operation will create many references to the same object.   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first   * copy them using a `map` function.   */  def hadoopFile[K, V]( path: String,       inputFormatClass: Class[_ <: InputFormat[K, V]],       keyClass: Class[K],       valueClass: Class[V],       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.     
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this,       confBroadcast,       Some(setInputPathsFunc),       inputFormatClass,       keyClass,       valueClass,       minPartitions).setName(path) } /**   * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,   * values and the InputFormat so that users don't need to pass them directly. Instead, callers   * can just write, for example,   * {{{    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minPartitions)   * }}}    *   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle   * operation will create many references to the same object.   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first   * copy them using a `map` function.   */  def hadoopFile[K, V, F <: InputFormat[K, V]] (path: String, minPartitions: Int) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { hadoopFile(path,       fm.runtimeClass.asInstanceOf[Class[F]],       km.runtimeClass.asInstanceOf[Class[K]],       vm.runtimeClass.asInstanceOf[Class[V]],       minPartitions) } /**   * Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,   * values and the InputFormat so that users don't need to pass them directly. Instead, callers   * can just write, for example,   * {{{    * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)   * }}}    *   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle   * operation will create many references to the same object.   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first   * copy them using a `map` function.   */  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { hadoopFile[K, V, F](path, defaultMinPartitions) } /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]] (path: String) (implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = withScope { newAPIHadoopFile( path,       fm.runtimeClass.asInstanceOf[Class[F]],       km.runtimeClass.asInstanceOf[Class[K]],       vm.runtimeClass.asInstanceOf[Class[V]]) } /**   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat   * and extra configuration options to pass to the input format.   *   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle   * operation will create many references to the same object.   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first   * copy them using a `map` function.   
*/  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( path: String,       fClass: Class[F],       kClass: Class[K],       vClass: Class[V],       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope { assertNotStopped() // The call to new NewHadoopJob automatically adds security credentials to conf,     // so we don't need to explicitly add them ourselves     val job = new NewHadoopJob(conf) // Use setInputPaths so that newAPIHadoopFile aligns with hadoopFile/textFile in taking     // comma separated files as input. (see SPARK-7155)     NewFileInputFormat.setInputPaths(job, path) val updatedConf = SparkHadoopUtil.get.getConfigurationFromJobContext(job) new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf).setName(path) } /**   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat   * and extra configuration options to pass to the input format.   *   * @param conf Configuration for setting up the dataset. Note: This will be put into a Broadcast.   *             Therefore if you plan to reuse this conf to create multiple RDDs, you need to make   *             sure you won't modify the conf. A safe approach is always creating a new conf for   *             a new RDD.   * @param fClass Class of the InputFormat   * @param kClass Class of the keys   * @param vClass Class of the values   *   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle   * operation will create many references to the same object.   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first   * copy them using a `map` function.   */  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( conf: Configuration = hadoopConfiguration,       fClass: Class[F],       kClass: Class[K],       vClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() // Add necessary security credentials to the JobConf. Required to access secure HDFS.     val jconf = new JobConf(conf) SparkHadoopUtil.get.addCredentials(jconf) new NewHadoopRDD(this, fClass, kClass, vClass, jconf) } /** Get an RDD for a Hadoop SequenceFile with given key and value types.    *    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each    * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle    * operation will create many references to the same object.    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first    * copy them using a `map` function.    */  def sequenceFile[K, V](path: String,       keyClass: Class[K],       valueClass: Class[V],       minPartitions: Int ): RDD[(K, V)] = withScope { assertNotStopped() val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions) } /** Get an RDD for a Hadoop SequenceFile with given key and value types.    *    * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each    * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle    * operation will create many references to the same object.    * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first    * copy them using a `map` function.    
* */  def sequenceFile[K, V]( path: String,       keyClass: Class[K],       valueClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() sequenceFile(path, keyClass, valueClass, defaultMinPartitions) } /**   * Version of sequenceFile() for types implicitly convertible to Writables through a   * WritableConverter. For example, to access a SequenceFile where the keys are Text and the   * values are IntWritable, you could simply write   * {{{    * sparkContext.sequenceFile[String, Int](path, ...)   * }}}    *   * WritableConverters are provided in a somewhat strange way (by an implicit function) to support   * both subclasses of Writable and types for which we define a converter (e.g. Int to   * IntWritable). The most natural thing would've been to have implicit objects for the   * converters, but then we couldn't have an object for every subclass of Writable (you can't   * have a parameterized singleton object). We use functions instead to create a new converter   * for the appropriate type. In addition, we pass the converter a ClassTag of its type to   * allow it to figure out the Writable class to use in the subclass case.   *   * '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable object for each   * record, directly caching the returned RDD or directly passing it to an aggregation or shuffle   * operation will create many references to the same object.   * If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first   * copy them using a `map` function.   */   def sequenceFile[K, V] (path: String, minPartitions: Int = defaultMinPartitions) (implicit km: ClassTag[K], vm: ClassTag[V],         kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = { withScope { assertNotStopped() val kc = clean(kcf)() val vc = clean(vcf)() val format = classOf[SequenceFileInputFormat[Writable, Writable]] val writables = hadoopFile(path, format,         kc.writableClass(km).asInstanceOf[Class[Writable]],         vc.writableClass(vm).asInstanceOf[Class[Writable]], minPartitions) writables.map { case (k, v) => (kc.convert(k), vc.convert(v)) } } } /**   * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and   * BytesWritable values that contain a serialized partition. This is still an experimental   * storage format and may not be supported exactly as is in future Spark releases. It will also   * be pretty slow if you use the default serializer (Java serialization),   * though the nice thing about it is that there's very little effort required to save arbitrary   * objects.   */  def objectFile[T: ClassTag]( path: String,       minPartitions: Int = defaultMinPartitions): RDD[T] = withScope { assertNotStopped() sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minPartitions) .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader)) } protected[spark] def checkpointFile[T: ClassTag](path: String): RDD[T] = withScope { new ReliableCheckpointRDD[T](this, path) } /** Build the union of a list of RDDs. */  def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope { val partitioners = rdds.flatMap(_.partitioner).toSet if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) { new PartitionerAwareUnionRDD(this, rdds) } else { new UnionRDD(this, rdds) } } /** Build the union of a list of RDDs passed as variable-length arguments. 
*/  def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope { union(Seq(first) ++ rest) } /** Get an RDD that has no partitions or elements. */  def emptyRDD[T: ClassTag]: EmptyRDD[T] = new EmptyRDD[T](this) // Methods for creating shared variables   /**   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"   * values to using the `+=` method. Only the driver can access the accumulator's `value`.   */  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]): Accumulator[T] = { val acc = new Accumulator(initialValue, param) cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } /**   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the   * driver can access the accumulator's `value`.   */  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) : Accumulator[T] = { val acc = new Accumulator(initialValue, param, Some(name)) cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } /**   * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values   * with `+=`. Only the driver can access the accumuable's `value`.   * @tparam R accumulator result type   * @tparam T type that can be added to the accumulator   */  def accumulable[R, T](initialValue: R)(implicit param: AccumulableParam[R, T]) : Accumulable[R, T] = { val acc = new Accumulable(initialValue, param) cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } /**   * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the   * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can   * access the accumuable's `value`.   * @tparam R accumulator result type   * @tparam T type that can be added to the accumulator   */  def accumulable[R, T](initialValue: R, name: String)(implicit param: AccumulableParam[R, T]) : Accumulable[R, T] = { val acc = new Accumulable(initialValue, param, Some(name)) cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } /**   * Create an accumulator from a "mutable collection" type.   *   * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by   * standard mutable collections. So you can use this with mutable Map, Set, etc.   */  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] (initialValue: R): Accumulable[R, T] = { val param = new GrowableAccumulableParam[R, T] val acc = new Accumulable(initialValue, param) cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc } /**   * Broadcast a read-only variable to the cluster, returning a   * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.   * The variable will be sent to each cluster only once.   
*/  def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) { // This is a warning instead of an exception in order to avoid breaking user programs that       // might have created RDD broadcast variables but not used them:       logWarning("Can not directly broadcast RDDs; instead, call collect() and "         + "broadcast the result (see SPARK-5063)") } val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc } /**   * Add a file to be downloaded with this Spark job on every node.   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported   * filesystems), or an HTTP, HTTPS or FTP URI.  To access the file in Spark jobs,   * use `SparkFiles.get(fileName)` to find its download location.   */  def addFile(path: String): Unit = { addFile(path, false) } /**   * Add a file to be downloaded with this Spark job on every node.   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported   * filesystems), or an HTTP, HTTPS or FTP URI.  To access the file in Spark jobs,   * use `SparkFiles.get(fileName)` to find its download location.   *   * A directory can be given if the recursive option is set to true. Currently directories are only   * supported for Hadoop-supported filesystems.   */  def addFile(path: String, recursive: Boolean): Unit = { val uri = new URI(path) val schemeCorrectedPath = uri.getScheme match { case null | "local" => new File(path).getCanonicalFile.toURI.toString case _ => path } val hadoopPath = new Path(schemeCorrectedPath) val scheme = new URI(schemeCorrectedPath).getScheme if (!Array("http", "https", "ftp").contains(scheme)) { val fs = hadoopPath.getFileSystem(hadoopConfiguration) if (!fs.exists(hadoopPath)) { throw new FileNotFoundException(s"Added file $hadoopPath does not exist.") } val isDir = fs.getFileStatus(hadoopPath).isDir if (!isLocal && scheme == "file" && isDir) { throw new SparkException(s"addFile does not support local directories when not running " + "local mode.") } if (!recursive && isDir) { throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " + "turned on.") } } val key = if (!isLocal && scheme == "file") { env.rpcEnv.fileServer.addFile(new File(uri.getPath)) } else { schemeCorrectedPath } val timestamp = System.currentTimeMillis addedFiles(key) = timestamp // Fetch the file locally in case a job is executed using DAGScheduler.runLocally().     Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,       hadoopConfiguration, timestamp, useCache = false) logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) postEnvironmentUpdate() } /**   * :: DeveloperApi ::   * Register a listener to receive up-calls from events that happen during execution.   */  @DeveloperApi   def addSparkListener(listener: SparkListener) { listenerBus.addListener(listener) } /**   * Update the cluster manager on our scheduling needs. Three bits of information are included   * to help it make decisions.   * @param numExecutors The total number of executors we'd like to have. 
The cluster manager   *                     shouldn't kill any running executor to reach this number, but,   *                     if all existing executors were to die, this is the number of executors   *                     we'd want to be allocated.   * @param localityAwareTasks The number of tasks in all active stages that have a locality   *                           preferences. This includes running, pending, and completed tasks.   * @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages   *                             that would like to like to run on that host.   *                             This includes running, pending, and completed tasks.   * @return whether the request is acknowledged by the cluster manager.   */  private[spark] override def requestTotalExecutors( numExecutors: Int,       localityAwareTasks: Int,       hostToLocalTaskCount: scala.collection.immutable.Map[String, Int] ): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestTotalExecutors(numExecutors, localityAwareTasks, hostToLocalTaskCount) case _ => logWarning("Requesting executors is only supported in coarse-grained mode") false    } } /**   * :: DeveloperApi ::   * Request an additional number of executors from the cluster manager.   * @return whether the request is received.   */  @DeveloperApi   override def requestExecutors(numAdditionalExecutors: Int): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) case _ => logWarning("Requesting executors is only supported in coarse-grained mode") false    } } /**   * :: DeveloperApi ::   * Request that the cluster manager kill the specified executors.   *   * Note: This is an indication to the cluster manager that the application wishes to adjust   * its resource usage downwards. If the application wishes to replace the executors it kills   * through this method with new ones, it should follow up explicitly with a call to   * {{SparkContext#requestExecutors}}.   *   * @return whether the request is received.   */  @DeveloperApi   override def killExecutors(executorIds: Seq[String]): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(executorIds, replace = false, force = true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false    } } /**   * :: DeveloperApi ::   * Request that the cluster manager kill the specified executor.   *   * Note: This is an indication to the cluster manager that the application wishes to adjust   * its resource usage downwards. If the application wishes to replace the executor it kills   * through this method with a new one, it should follow up explicitly with a call to   * {{SparkContext#requestExecutors}}.   *   * @return whether the request is received.   */  @DeveloperApi   override def killExecutor(executorId: String): Boolean = super.killExecutor(executorId) /**   * Request that the cluster manager kill the specified executor without adjusting the   * application resource requirements.   *   * The effect is that a new executor will be launched in place of the one killed by   * this request. This assumes the cluster manager will automatically and eventually   * fulfill all missing application resource requests.   *   * Note: The replace is by no means guaranteed; another application on the same cluster   * can steal the window of opportunity and acquire this application's resources in the   * mean time.   
*   * @return whether the request is received.   */  private[spark] def killAndReplaceExecutor(executorId: String): Boolean = { schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.killExecutors(Seq(executorId), replace = true, force = true) case _ => logWarning("Killing executors is only supported in coarse-grained mode") false    } } /** The version of Spark on which this application is running. */  def version: String = SPARK_VERSION  /**   * Return a map from the slave to the max memory available for caching and the remaining   * memory available for caching.   */  def getExecutorMemoryStatus: Map[String, (Long, Long)] = { assertNotStopped() env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => (blockManagerId.host + ":" + blockManagerId.port, mem) } } /**   * :: DeveloperApi ::   * Return information about what RDDs are cached, if they are in mem or on disk, how much space   * they take, etc.   */  @DeveloperApi   def getRDDStorageInfo: Array[RDDInfo] = { getRDDStorageInfo(_ => true) } private[spark] def getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo] = { assertNotStopped() val rddInfos = persistentRdds.values.filter(filter).map(RDDInfo.fromRdd).toArray StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus) rddInfos.filter(_.isCached) } /**   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.   * Note that this does not necessarily mean the caching or computation was successful.   */  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap /**   * :: DeveloperApi ::   * Return information about blocks stored in all of the slaves   */  @DeveloperApi   def getExecutorStorageStatus: Array[StorageStatus] = { assertNotStopped() env.blockManager.master.getStorageStatus } /**   * :: DeveloperApi ::   * Return pools for fair scheduler   */  @DeveloperApi   def getAllPools: Seq[Schedulable] = { assertNotStopped() // TODO(xiajunluan): We should take nested pools into account    taskScheduler.rootPool.schedulableQueue.asScala.toSeq } /**   * :: DeveloperApi ::   * Return the pool associated with the given name, if one exists   */  @DeveloperApi   def getPoolForName(pool: String): Option[Schedulable] = { assertNotStopped() Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)) } /**   * Return current scheduling mode   */  def getSchedulingMode: SchedulingMode.SchedulingMode = { assertNotStopped() taskScheduler.schedulingMode } /**   * Clear the job's list of files added by `addFile` so that they do not get downloaded to   * any new nodes.   
*/  @deprecated("adding files no longer creates local copies that need to be deleted", "1.0.0") def clearFiles() { addedFiles.clear() } /**   * Gets the locality information associated with the partition in a particular rdd   * @param rdd of interest   * @param partition to be looked up for locality   * @return list of preferred locations for the partition   */  private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { dagScheduler.getPreferredLocs(rdd, partition) } /**   * Register an RDD to be persisted in memory and/or disk storage   */  private[spark] def persistRDD(rdd: RDD[_]) { persistentRdds(rdd.id) = rdd } /**   * Unpersist an RDD from memory and/or disk storage   */  private[spark] def unpersistRDD(rddId: Int, blocking: Boolean = true) { env.blockManager.master.removeRdd(rddId, blocking) persistentRdds.remove(rddId) listenerBus.post(SparkListenerUnpersistRDD(rddId)) } /**   * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported   * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.   */  def addJar(path: String) { if (path == null) { logWarning("null specified as parameter to addJar") } else { var key = ""       if (path.contains("\\")) { // For local paths with backslashes on Windows, URI throws an exception         key = env.rpcEnv.fileServer.addJar(new File(path)) } else { val uri = new URI(path) key = uri.getScheme match { // A JAR file which exists only on the driver node           case null | "file" => // yarn-standalone is deprecated, but still supported             if (SparkHadoopUtil.get.isYarnMode() && (master == "yarn-standalone" || master == "yarn-cluster")) { // In order for this to work in yarn-cluster mode the user must specify the               // --addJars option to the client to upload the file into the distributed cache               // of the AM to make it show up in the current working directory.               val fileName = new Path(uri.getPath).getName() try { env.rpcEnv.fileServer.addJar(new File(fileName)) } catch { case e: Exception => // For now just log an error but allow to go through so spark examples work.                   // The spark examples don't really need the jar distributed since its also                   // the app jar.                   logError("Error adding jar (" + e + "), was the --addJars option used?") null              } } else { try { env.rpcEnv.fileServer.addJar(new File(uri.getPath)) } catch { case exc: FileNotFoundException => logError(s"Jar not found at $path") null                case e: Exception => // For now just log an error but allow to go through so spark examples work.                   // The spark examples don't really need the jar distributed since its also                   // the app jar.                   logError("Error adding jar (" + e + "), was the --addJars option used?") null              } } // A JAR file which exists locally on every worker node           case "local" => "file:" + uri.getPath case _ => path } } if (key != null) { addedJars(key) = System.currentTimeMillis logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) } } postEnvironmentUpdate() } /**   * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to   * any new nodes.   
*/  @deprecated("adding jars no longer creates local copies that need to be deleted", "1.0.0") def clearJars() { addedJars.clear() } // Shut down the SparkContext.   def stop() { if (AsynchronousListenerBus.withinListenerThread.value) { throw new SparkException("Cannot stop SparkContext within listener thread of" + " AsynchronousListenerBus") } // Use the stopping variable to ensure no contention for the stop scenario.     // Still track the stopped variable for use elsewhere in the code.     if (!stopped.compareAndSet(false, true)) { logInfo("SparkContext already stopped.") return    } if (_shutdownHookRef != null) { ShutdownHookManager.removeShutdownHook(_shutdownHookRef) } Utils.tryLogNonFatalError { postApplicationEnd() } Utils.tryLogNonFatalError { _ui.foreach(_.stop()) } if (env != null) { Utils.tryLogNonFatalError { env.metricsSystem.report() } } if (metadataCleaner != null) { Utils.tryLogNonFatalError { metadataCleaner.cancel() } } Utils.tryLogNonFatalError { _cleaner.foreach(_.stop()) } Utils.tryLogNonFatalError { _executorAllocationManager.foreach(_.stop()) } if (_dagScheduler != null) { Utils.tryLogNonFatalError { _dagScheduler.stop() } _dagScheduler = null    } if (_listenerBusStarted) { Utils.tryLogNonFatalError { listenerBus.stop() _listenerBusStarted = false      } } Utils.tryLogNonFatalError { _eventLogger.foreach(_.stop()) } if (env != null && _heartbeatReceiver != null) { Utils.tryLogNonFatalError { env.rpcEnv.stop(_heartbeatReceiver) } } Utils.tryLogNonFatalError { _progressBar.foreach(_.stop()) } _taskScheduler = null    // TODO: Cache.stop()?    if (_env != null) { Utils.tryLogNonFatalError { _env.stop() } SparkEnv.set(null) } // Unset YARN mode system env variable, to allow switching between cluster types.     System.clearProperty("SPARK_YARN_MODE") SparkContext.clearActiveContext() logInfo("Successfully stopped SparkContext") } /**   * Get Spark's home location from either a value set through the constructor,   * or the spark.home Java property, or the SPARK_HOME environment variable   * (in that order of preference). If neither of these is set, return None.   */  private[spark] def getSparkHome(): Option[String] = { conf.getOption("spark.home").orElse(Option(System.getenv("SPARK_HOME"))) } /**   * Set the thread-local property for overriding the call sites   * of actions and RDDs.   */  def setCallSite(shortCallSite: String) { setLocalProperty(CallSite.SHORT_FORM, shortCallSite) } /**   * Set the thread-local property for overriding the call sites   * of actions and RDDs.   */  private[spark] def setCallSite(callSite: CallSite) { setLocalProperty(CallSite.SHORT_FORM, callSite.shortForm) setLocalProperty(CallSite.LONG_FORM, callSite.longForm) } /**   * Clear the thread-local property for overriding the call sites   * of actions and RDDs.   */  def clearCallSite() { setLocalProperty(CallSite.SHORT_FORM, null) setLocalProperty(CallSite.LONG_FORM, null) } /**   * Capture the current user callsite and return a formatted version for printing. If the user   * has overridden the call site using `setCallSite()`, this will return the user's version.   */  private[spark] def getCallSite(): CallSite = { val callSite = Utils.getCallSite() CallSite( Option(getLocalProperty(CallSite.SHORT_FORM)).getOrElse(callSite.shortForm),       Option(getLocalProperty(CallSite.LONG_FORM)).getOrElse(callSite.longForm) ) } /**   * Run a function on a given set of partitions in an RDD and pass the results to the given   * handler function. This is the main entry point for all actions in Spark.   
*/  def runJob[T, U: ClassTag]( rdd: RDD[T],       func: (TaskContext, Iterator[T]) => U,       partitions: Seq[Int],       resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) } dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() } /**   * Run a function on a given set of partitions in an RDD and return the results as an array.   */  def runJob[T, U: ClassTag]( rdd: RDD[T],       func: (TaskContext, Iterator[T]) => U,       partitions: Seq[Int]): Array[U] = { val results = new Array[U](partitions.size) runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res) results } /**   * Run a job on a given set of partitions of an RDD, but take a function of type   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.   */  def runJob[T, U: ClassTag]( rdd: RDD[T],       func: Iterator[T] => U,       partitions: Seq[Int]): Array[U] = { val cleanedFunc = clean(func) runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) } /**   * Run a function on a given set of partitions in an RDD and pass the results to the given   * handler function. This is the main entry point for all actions in Spark.   *   * The allowLocal flag is deprecated as of Spark 1.5.0+.   */  @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0") def runJob[T, U: ClassTag]( rdd: RDD[T],       func: (TaskContext, Iterator[T]) => U,       partitions: Seq[Int],       allowLocal: Boolean,       resultHandler: (Int, U) => Unit): Unit = { if (allowLocal) { logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+") } runJob(rdd, func, partitions, resultHandler) } /**   * Run a function on a given set of partitions in an RDD and return the results as an array.   *   * The allowLocal flag is deprecated as of Spark 1.5.0+.   */  @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0") def runJob[T, U: ClassTag]( rdd: RDD[T],       func: (TaskContext, Iterator[T]) => U,       partitions: Seq[Int],       allowLocal: Boolean ): Array[U] = { if (allowLocal) { logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+") } runJob(rdd, func, partitions) } /**   * Run a job on a given set of partitions of an RDD, but take a function of type   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.   *   * The allowLocal argument is deprecated as of Spark 1.5.0+.   */  @deprecated("use the version of runJob without the allowLocal parameter", "1.5.0") def runJob[T, U: ClassTag]( rdd: RDD[T],       func: Iterator[T] => U,       partitions: Seq[Int],       allowLocal: Boolean ): Array[U] = { if (allowLocal) { logWarning("sc.runJob with allowLocal=true is deprecated in Spark 1.5.0+") } runJob(rdd, func, partitions) } /**   * Run a job on all partitions in an RDD and return the results in an array.   */  def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) } /**   * Run a job on all partitions in an RDD and return the results in an array.   
*/  def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { runJob(rdd, func, 0 until rdd.partitions.length) } /**   * Run a job on all partitions in an RDD and pass the results to a handler function.   */  def runJob[T, U: ClassTag]( rdd: RDD[T],     processPartition: (TaskContext, Iterator[T]) => U,     resultHandler: (Int, U) => Unit) { runJob[T, U](rdd, processPartition, 0 until rdd.partitions.length, resultHandler) } /**   * Run a job on all partitions in an RDD and pass the results to a handler function.   */  def runJob[T, U: ClassTag]( rdd: RDD[T],       processPartition: Iterator[T] => U,       resultHandler: (Int, U) => Unit) { val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter) runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler) } /**   * :: DeveloperApi ::   * Run a job that can return approximate results.   */  @DeveloperApi   def runApproximateJob[T, U, R]( rdd: RDD[T],       func: (TaskContext, Iterator[T]) => U,       evaluator: ApproximateEvaluator[U, R],       timeout: Long): PartialResult[R] = { assertNotStopped() val callSite = getCallSite logInfo("Starting job: " + callSite.shortForm) val start = System.nanoTime val cleanedFunc = clean(func) val result = dagScheduler.runApproximateJob(rdd, cleanedFunc, evaluator, callSite, timeout,       localProperties.get) logInfo( "Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s") result } /**   * Submit a job for execution and return a FutureJob holding the result.   */  def submitJob[T, U, R]( rdd: RDD[T],       processPartition: Iterator[T] => U,       partitions: Seq[Int],       resultHandler: (Int, U) => Unit,       resultFunc: => R): SimpleFutureAction[R] = { assertNotStopped() val cleanF = clean(processPartition) val callSite = getCallSite val waiter = dagScheduler.submitJob( rdd,       (context: TaskContext, iter: Iterator[T]) => cleanF(iter),       partitions,       callSite,       resultHandler,       localProperties.get) new SimpleFutureAction(waiter, resultFunc) } /**   * Submit a map stage for execution. This is currently an internal API only, but might be   * promoted to DeveloperApi in the future.   */  private[spark] def submitMapStage[K, V, C](dependency: ShuffleDependency[K, V, C]) : SimpleFutureAction[MapOutputStatistics] = { assertNotStopped() val callSite = getCallSite() var result: MapOutputStatistics = null    val waiter = dagScheduler.submitMapStage( dependency,       (r: MapOutputStatistics) => { result = r },       callSite,       localProperties.get) new SimpleFutureAction[MapOutputStatistics](waiter, result) } /**   * Cancel active jobs for the specified group. See [[org.apache.spark.SparkContext.setJobGroup]]    * for more information.   */  def cancelJobGroup(groupId: String) { assertNotStopped() dagScheduler.cancelJobGroup(groupId) } /** Cancel all jobs that have been scheduled or are running.  
*/  def cancelAllJobs() { assertNotStopped() dagScheduler.cancelAllJobs() } /** Cancel a given job if it's scheduled or running */  private[spark] def cancelJob(jobId: Int) { dagScheduler.cancelJob(jobId) } /** Cancel a given stage and all jobs associated with it */  private[spark] def cancelStage(stageId: Int) { dagScheduler.cancelStage(stageId) } /**   * Clean a closure to make it ready to serialized and send to tasks   * (removes unreferenced variables in $outer's, updates REPL variables)   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively   * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>    * if not.   *   * @param f the closure to clean   * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability   * @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not   *   serializable   */  private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { ClosureCleaner.clean(f, checkSerializable) f } /**   * Set the directory under which RDDs are going to be checkpointed. The directory must   * be a HDFS path if running on a cluster.   */  def setCheckpointDir(directory: String) { // If we are running on a cluster, log a warning if the directory is local.     // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from     // its own local file system, which is incorrect because the checkpoint files     // are actually on the executor machines.     if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) { logWarning("Checkpoint directory must be non-local " + "if Spark is running on a cluster: " + directory) } checkpointDir = Option(directory).map { dir => val path = new Path(dir, UUID.randomUUID().toString) val fs = path.getFileSystem(hadoopConfiguration) fs.mkdirs(path) fs.getFileStatus(path).getPath.toString } } def getCheckpointDir: Option[String] = checkpointDir  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */  def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism } /** Default min number of partitions for Hadoop RDDs when not given by user */  @deprecated("use defaultMinPartitions", "1.0.0") def defaultMinSplits: Int = math.min(defaultParallelism, 2) /**   * Default min number of partitions for Hadoop RDDs when not given by user   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718   */  def defaultMinPartitions: Int = math.min(defaultParallelism, 2) private val nextShuffleId = new AtomicInteger(0) private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() private val nextRddId = new AtomicInteger(0) /** Register a new RDD, returning its RDD ID */  private[spark] def newRddId(): Int = nextRddId.getAndIncrement() /**   * Registers listeners specified in spark.extraListeners, then starts the listener bus.   * This should be called after all internal listeners have been registered with the listener bus   * (e.g. after the web UI and event logging listeners have been registered).   
*/  private def setupAndStartListenerBus(): Unit = { // Use reflection to instantiate listeners specified via `spark.extraListeners`     try { val listenerClassNames: Seq[String] = conf.get("spark.extraListeners", "").split(',').map(_.trim).filter(_ != "") for (className <- listenerClassNames) { // Use reflection to find the right constructor         val constructors = { val listenerClass = Utils.classForName(className) listenerClass.getConstructors.asInstanceOf[Array[Constructor[_ <: SparkListener]]] } val constructorTakingSparkConf = constructors.find { c => c.getParameterTypes.sameElements(Array(classOf[SparkConf])) } lazy val zeroArgumentConstructor = constructors.find { c => c.getParameterTypes.isEmpty } val listener: SparkListener = { if (constructorTakingSparkConf.isDefined) { constructorTakingSparkConf.get.newInstance(conf) } else if (zeroArgumentConstructor.isDefined) { zeroArgumentConstructor.get.newInstance() } else { throw new SparkException( s"$className did not have a zero-argument constructor or a" + " single-argument constructor that accepts SparkConf. Note: if the class is" + " defined inside of another Scala class, then its constructors may accept an" + " implicit parameter that references the enclosing class; in this case, you must" + " define the listener as a top-level class in order to prevent this extra" + " parameter from breaking Spark's ability to find a valid constructor.") } } listenerBus.addListener(listener) logInfo(s"Registered listener $className") } } catch { case e: Exception => try { stop() } finally { throw new SparkException(s"Exception when registering SparkListener", e) } } listenerBus.start(this) _listenerBusStarted = true  } /** Post the application start event */  private def postApplicationStart() { // Note: this code assumes that the task scheduler has been initialized and has contacted     // the cluster manager to get an application ID (in case the cluster manager provides one).     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),       startTime, sparkUser, applicationAttemptId, schedulerBackend.getDriverLogUrls)) } /** Post the application end event */  private def postApplicationEnd() { listenerBus.post(SparkListenerApplicationEnd(System.currentTimeMillis)) } /** Post the environment update event once the task scheduler is ready */  private def postEnvironmentUpdate() { if (taskScheduler != null) { val schedulingMode = getSchedulingMode.toString val addedJarPaths = addedJars.keys.toSeq val addedFilePaths = addedFiles.keys.toSeq val environmentDetails = SparkEnv.environmentDetails(conf, schedulingMode, addedJarPaths,         addedFilePaths) val environmentUpdate = SparkListenerEnvironmentUpdate(environmentDetails) listenerBus.post(environmentUpdate) } } /** Called by MetadataCleaner to clean up the persistentRdds map periodically */  private[spark] def cleanup(cleanupTime: Long) { persistentRdds.clearOldValues(cleanupTime) } // In order to prevent multiple SparkContexts from being active at the same time, mark this   // context as having finished construction.   // NOTE: this must be placed at the end of the SparkContext constructor.   SparkContext.setActiveContext(this, allowMultipleContexts) } /** * The SparkContext object contains a number of implicit conversions and parameters for use with * various Spark features. */ object SparkContext extends Logging { /**   * Lock that guards access to global variables that track SparkContext construction.   
*/  private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() /**   * The active, fully-constructed SparkContext.  If no SparkContext is active, then this is `null`.   *   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.   */  private val activeContext: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null) /**   * Points to a partially-constructed SparkContext if some thread is in the SparkContext   * constructor, or `None` if no SparkContext is being constructed.   *   * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK   */  private var contextBeingConstructed: Option[SparkContext] = None /**   * Called to ensure that no other SparkContext is running in this JVM.   *   * Throws an exception if a running context is detected and logs a warning if another thread is   * constructing a SparkContext.  This warning is necessary because the current locking scheme   * prevents us from reliably distinguishing between cases where another context is being   * constructed and cases where another constructor threw an exception.   */  private def assertNoOtherContextIsRunning( sc: SparkContext,       allowMultipleContexts: Boolean): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { contextBeingConstructed.foreach { otherContext => if (otherContext ne sc) {  // checks for reference equality           // Since otherContext might point to a partially-constructed context, guard against           // its creationSite field being null:           val otherContextCreationSite = Option(otherContext.creationSite).map(_.longForm).getOrElse("unknown location") val warnMsg = "Another SparkContext is being constructed (or threw an exception in its" + " constructor).  This may indicate an error, since only one SparkContext may be" + " running in this JVM (see SPARK-2243)." + s" The other SparkContext was created at:\n$otherContextCreationSite"           logWarning(warnMsg) } if (activeContext.get() != null) { val ctx = activeContext.get() val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." + " To ignore this error, set spark.driver.allowMultipleContexts = true. " + s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"           val exception = new SparkException(errMsg) if (allowMultipleContexts) { logWarning("Multiple running SparkContexts detected in the same JVM!", exception) } else { throw exception } } } } } /**   * This function may be used to get or instantiate a SparkContext and register it as a   * singleton object. Because we can only have one active SparkContext per JVM,   * this is useful when applications may wish to share a SparkContext.   *   * Note: This function cannot be used to create multiple SparkContext instances   * even if multiple contexts are allowed.   */  def getOrCreate(config: SparkConf): SparkContext = { // Synchronize to ensure that multiple create requests don't trigger an exception     // from assertNoOtherContextIsRunning within setActiveContext     SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { if (activeContext.get() == null) { setActiveContext(new SparkContext(config), allowMultipleContexts = false) } activeContext.get() } } /**   * This function may be used to get or instantiate a SparkContext and register it as a   * singleton object. Because we can only have one active SparkContext per JVM,   * this is useful when applications may wish to share a SparkContext.   *   * This method allows not passing a SparkConf (useful if just retrieving).   
*   * Note: This function cannot be used to create multiple SparkContext instances   * even if multiple contexts are allowed.   */  def getOrCreate(): SparkContext = { getOrCreate(new SparkConf()) } /**   * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is   * running.  Throws an exception if a running context is detected and logs a warning if another   * thread is constructing a SparkContext.  This warning is necessary because the current locking   * scheme prevents us from reliably distinguishing between cases where another context is being   * constructed and cases where another constructor threw an exception.   */  private[spark] def markPartiallyConstructed( sc: SparkContext,       allowMultipleContexts: Boolean): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { assertNoOtherContextIsRunning(sc, allowMultipleContexts) contextBeingConstructed = Some(sc) } } /**   * Called at the end of the SparkContext constructor to ensure that no other SparkContext has   * raced with this constructor and started.   */  private[spark] def setActiveContext( sc: SparkContext,       allowMultipleContexts: Boolean): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { assertNoOtherContextIsRunning(sc, allowMultipleContexts) contextBeingConstructed = None activeContext.set(sc) } } /**   * Clears the active SparkContext metadata.  This is called by `SparkContext#stop()`.  It's   * also called in unit tests to prevent a flood of warnings from test suites that don't / can't   * properly clean up their SparkContexts.   */  private[spark] def clearActiveContext(): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { activeContext.set(null) } } private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"   private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"   private[spark] val RDD_SCOPE_KEY = "spark.rdd.scope"   private[spark] val RDD_SCOPE_NO_OVERRIDE_KEY = "spark.rdd.scope.noOverride"   /**   * Executor id for the driver.  In earlier versions of Spark, this was `<driver>`, but this was   * changed to `driver` because the angle brackets caused escaping issues in URLs and XML (see   * SPARK-6716 for more details).   */  private[spark] val DRIVER_IDENTIFIER = "driver"   /**   * Legacy version of DRIVER_IDENTIFIER, retained for backwards-compatibility.   */  private[spark] val LEGACY_DRIVER_IDENTIFIER = "<driver>"   // The following deprecated objects have already been copied to `object AccumulatorParam` to   // make the compiler find them automatically. They are duplicate codes only for backward   // compatibility, please update `object AccumulatorParam` accordingly if you plan to modify the   // following ones.   @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + "backward compatibility.", "1.3.0") object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double): Double = 0.0   } @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + "backward compatibility.", "1.3.0") object IntAccumulatorParam extends AccumulatorParam[Int] { def addInPlace(t1: Int, t2: Int): Int = t1 + t2 def zero(initialValue: Int): Int = 0   } @deprecated("Replaced by implicit objects in AccumulatorParam. 
This is kept here only for " + "backward compatibility.", "1.3.0") object LongAccumulatorParam extends AccumulatorParam[Long] { def addInPlace(t1: Long, t2: Long): Long = t1 + t2 def zero(initialValue: Long): Long = 0L   } @deprecated("Replaced by implicit objects in AccumulatorParam. This is kept here only for " + "backward compatibility.", "1.3.0") object FloatAccumulatorParam extends AccumulatorParam[Float] { def addInPlace(t1: Float, t2: Float): Float = t1 + t2 def zero(initialValue: Float): Float = 0f   } // The following deprecated functions have already been moved to `object RDD` to   // make the compiler find them automatically. They are still kept here for backward compatibility   // and just call the corresponding functions in `object RDD`.   @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = RDD.rddToPairRDDFunctions(rdd) @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]): AsyncRDDActions[T] = RDD.rddToAsyncRDDActions(rdd) @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag]( rdd: RDD[(K, V)]): SequenceFileRDDFunctions[K, V] = { val kf = implicitly[K => Writable] val vf = implicitly[V => Writable] // Set the Writable class to null and `SequenceFileRDDFunctions` will use Reflection to get it     implicit val keyWritableFactory = new WritableFactory[K](_ => null, kf) implicit val valueWritableFactory = new WritableFactory[V](_ => null, vf) RDD.rddToSequenceFileRDDFunctions(rdd) } @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag]( rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)] = RDD.rddToOrderedRDDFunctions(rdd) @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]): DoubleRDDFunctions = RDD.doubleRDDToDoubleRDDFunctions(rdd) @deprecated("Replaced by implicit functions in the RDD companion object. This is " + "kept here only for backward compatibility.", "1.3.0") def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]): DoubleRDDFunctions = RDD.numericRDDToDoubleRDDFunctions(rdd) // The following deprecated functions have already been moved to `object WritableFactory` to   // make the compiler find them automatically. They are still kept here for backward compatibility.   @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + "kept here only for backward compatibility.", "1.3.0") implicit def intToIntWritable(i: Int): IntWritable = new IntWritable(i) @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + "kept here only for backward compatibility.", "1.3.0") implicit def longToLongWritable(l: Long): LongWritable = new LongWritable(l) @deprecated("Replaced by implicit functions in the WritableFactory companion object. 
This is " + "kept here only for backward compatibility.", "1.3.0") implicit def floatToFloatWritable(f: Float): FloatWritable = new FloatWritable(f) @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + "kept here only for backward compatibility.", "1.3.0") implicit def doubleToDoubleWritable(d: Double): DoubleWritable = new DoubleWritable(d) @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + "kept here only for backward compatibility.", "1.3.0") implicit def boolToBoolWritable (b: Boolean): BooleanWritable = new BooleanWritable(b) @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + "kept here only for backward compatibility.", "1.3.0") implicit def bytesToBytesWritable (aob: Array[Byte]): BytesWritable = new BytesWritable(aob) @deprecated("Replaced by implicit functions in the WritableFactory companion object. This is " + "kept here only for backward compatibility.", "1.3.0") implicit def stringToText(s: String): Text = new Text(s) private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]) : ArrayWritable = { def anyToWritable[U <% Writable](u: U): Writable = u new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]],         arr.map(x => anyToWritable(x)).toArray) } // The following deprecated functions have already been moved to `object WritableConverter` to   // make the compiler find them automatically. They are still kept here for backward compatibility   // and just call the corresponding functions in `object WritableConverter`.   @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.3.0") def intWritableConverter(): WritableConverter[Int] = WritableConverter.intWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.3.0") def longWritableConverter(): WritableConverter[Long] = WritableConverter.longWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.3.0") def doubleWritableConverter(): WritableConverter[Double] = WritableConverter.doubleWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.3.0") def floatWritableConverter(): WritableConverter[Float] = WritableConverter.floatWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.3.0") def booleanWritableConverter(): WritableConverter[Boolean] = WritableConverter.booleanWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.3.0") def bytesWritableConverter(): WritableConverter[Array[Byte]] = WritableConverter.bytesWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. This is kept here only for " + "backward compatibility.", "1.3.0") def stringWritableConverter(): WritableConverter[String] = WritableConverter.stringWritableConverter() @deprecated("Replaced by implicit functions in WritableConverter. 
This is kept here only for " + "backward compatibility.", "1.3.0") def writableWritableConverter[T <: Writable](): WritableConverter[T] = WritableConverter.writableWritableConverter() /**   * Find the JAR from which a given class was loaded, to make it easy for users to pass   * their JARs to SparkContext.   */  def jarOfClass(cls: Class[_]): Option[String] = { val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class") if (uri != null) { val uriStr = uri.toString if (uriStr.startsWith("jar:file:")) { // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class",         // so pull out the /path/foo.jar         Some(uriStr.substring("jar:file:".length, uriStr.indexOf('!'))) } else { None } } else { None } } /**   * Find the JAR that contains the class of a particular object, to make it easy for users   * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in   * your driver program.   */  def jarOfObject(obj: AnyRef): Option[String] = jarOfClass(obj.getClass) /**   * Creates a modified version of a SparkConf with the parameters that can be passed separately   * to SparkContext, to make it easier to write SparkContext's constructors. This ignores   * parameters that are passed as the default value of null, instead of throwing an exception   * like SparkConf would.   */  private[spark] def updatedConf( conf: SparkConf,       master: String,       appName: String,       sparkHome: String = null,       jars: Seq[String] = Nil,       environment: Map[String, String] = Map()): SparkConf = { val res = conf.clone() res.setMaster(master) res.setAppName(appName) if (sparkHome != null) { res.setSparkHome(sparkHome) } if (jars != null && !jars.isEmpty) { res.setJars(jars) } res.setExecutorEnv(environment.toSeq) res } /**   * The number of driver cores to use for execution in local mode, 0 otherwise.   */  private[spark] def numDriverCores(master: String): Int = { def convertToInt(threads: String): Int = { if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt } master match { case "local" => 1       case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads) case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads) case _ => 0 // driver is not used for execution     } } /**   * Create a task scheduler based on a given master URL.   * Return a 2-tuple of the scheduler backend and the task scheduler.   */  private def createTaskScheduler( sc: SparkContext,       master: String): (SchedulerBackend, TaskScheduler) = { import SparkMasterRegex._ // When running locally, don't try to re-execute tasks on failure.     val MAX_LOCAL_TASK_FAILURES = 1     master match { case "local" => val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(sc.getConf, scheduler, 1) scheduler.initialize(backend) (backend, scheduler) case LOCAL_N_REGEX(threads) => def localCpuCount: Int = Runtime.getRuntime.availableProcessors() // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.         
val threadCount = if (threads == "*") localCpuCount else threads.toInt if (threadCount <= 0) { throw new SparkException(s"Asked to run locally with $threadCount threads") } val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true) val backend = new LocalBackend(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => def localCpuCount: Int = Runtime.getRuntime.availableProcessors() // local[*, M] means the number of cores on the computer with M failures         // local[N, M] means exactly N threads with M failures         val threadCount = if (threads == "*") localCpuCount else threads.toInt val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true) val backend = new LocalBackend(sc.getConf, scheduler, threadCount) scheduler.initialize(backend) (backend, scheduler) case SPARK_REGEX(sparkUrl) => val scheduler = new TaskSchedulerImpl(sc) val masterUrls = sparkUrl.split(",").map("spark://" + _) val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) (backend, scheduler) case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.         val memoryPerSlaveInt = memoryPerSlave.toInt if (sc.executorMemory > memoryPerSlaveInt) { throw new SparkException( "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format( memoryPerSlaveInt, sc.executorMemory)) } val scheduler = new TaskSchedulerImpl(sc) val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf) val masterUrls = localCluster.start() val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls) scheduler.initialize(backend) backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { localCluster.stop() } (backend, scheduler) case "yarn-standalone" | "yarn-cluster" => if (master == "yarn-standalone") { logWarning( "\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.") } val scheduler = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { // TODO: Enumerate the exact reasons why it can fail          // But irrespective of it, it means we cannot proceed !           
case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } val backend = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend") val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } scheduler.initialize(backend) (backend, scheduler) case "yarn-client" => val scheduler = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } val backend = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend") val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } scheduler.initialize(backend) (backend, scheduler) case MESOS_REGEX(mesosUrl) => MesosNativeLibrary.load() val scheduler = new TaskSchedulerImpl(sc) val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", defaultValue = true) val backend = if (coarseGrained) { new CoarseMesosSchedulerBackend(scheduler, sc, mesosUrl, sc.env.securityManager) } else { new MesosSchedulerBackend(scheduler, sc, mesosUrl) } scheduler.initialize(backend) (backend, scheduler) case SIMR_REGEX(simrUrl) => val scheduler = new TaskSchedulerImpl(sc) val backend = new SimrSchedulerBackend(scheduler, sc, simrUrl) scheduler.initialize(backend) (backend, scheduler) case zkUrl if zkUrl.startsWith("zk://") => logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " + "in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.") createTaskScheduler(sc, "mesos://" + zkUrl) case _ => throw new SparkException("Could not parse Master URL: '" + master + "'") } } } /** * A collection of regexes for extracting information from the master string. */ private object SparkMasterRegex { // Regular expression used for local[N] and local[*] master formats   val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks   val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r // Regular expression for simulating a Spark cluster of [N, cores, memory] locally   val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r // Regular expression for connecting to Spark deploy clusters   val SPARK_REGEX = """spark://(.*)""".r // Regular expression for connection to Mesos cluster by mesos:// or mesos://zk:// url   val MESOS_REGEX = """mesos://(.*)""".r // Regular expression for connection to Simr cluster   val SIMR_REGEX = """simr://(.*)""".r } /** * A class encapsulating how to convert some type T to Writable. It stores both the Writable class * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion. * The getter for the writable class takes a ClassTag[T] in case this is a generic object * that doesn't know the type of T when it is created. 
 * This sounds strange but is necessary to
 * support converting subclasses of Writable to themselves (writableWritableConverter).
 */
private[spark] class WritableConverter[T](
    val writableClass: ClassTag[T] => Class[_ <: Writable],
    val convert: Writable => T) extends Serializable

object WritableConverter {

  // Helper objects for converting common types to Writable
  private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
    : WritableConverter[T] = {
    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
  }

  // The following implicit functions were in SparkContext before 1.3 and users had to
  // `import SparkContext._` to enable them. Now we move them here to make the compiler find
  // them automatically. However, we still keep the old functions in SparkContext for backward
  // compatibility and forward to the following functions directly.

  implicit def intWritableConverter(): WritableConverter[Int] =
    simpleWritableConverter[Int, IntWritable](_.get)

  implicit def longWritableConverter(): WritableConverter[Long] =
    simpleWritableConverter[Long, LongWritable](_.get)

  implicit def doubleWritableConverter(): WritableConverter[Double] =
    simpleWritableConverter[Double, DoubleWritable](_.get)

  implicit def floatWritableConverter(): WritableConverter[Float] =
    simpleWritableConverter[Float, FloatWritable](_.get)

  implicit def booleanWritableConverter(): WritableConverter[Boolean] =
    simpleWritableConverter[Boolean, BooleanWritable](_.get)

  implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
    simpleWritableConverter[Array[Byte], BytesWritable] { bw =>
      // getBytes method returns an array which is longer than the data to be returned
      Arrays.copyOfRange(bw.getBytes, 0, bw.getLength)
    }
  }

  implicit def stringWritableConverter(): WritableConverter[String] =
    simpleWritableConverter[String, Text](_.toString)

  implicit def writableWritableConverter[T <: Writable](): WritableConverter[T] =
    new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
}

/**
 * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
 * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
 * The Writable class will be used in `SequenceFileRDDFunctions`.
 */
private[spark] class WritableFactory[T](
    val writableClass: ClassTag[T] => Class[_ <: Writable],
    val convert: T => Writable) extends Serializable

object WritableFactory {

  private[spark] def simpleWritableFactory[T: ClassTag, W <: Writable : ClassTag](convert: T => W)
    : WritableFactory[T] = {
    val writableClass = implicitly[ClassTag[W]].runtimeClass.asInstanceOf[Class[W]]
    new WritableFactory[T](_ => writableClass, convert)
  }

  implicit def intWritableFactory: WritableFactory[Int] =
    simpleWritableFactory(new IntWritable(_))

  implicit def longWritableFactory: WritableFactory[Long] =
    simpleWritableFactory(new LongWritable(_))

  implicit def floatWritableFactory: WritableFactory[Float] =
    simpleWritableFactory(new FloatWritable(_))

  implicit def doubleWritableFactory: WritableFactory[Double] =
    simpleWritableFactory(new DoubleWritable(_))

  implicit def booleanWritableFactory: WritableFactory[Boolean] =
    simpleWritableFactory(new BooleanWritable(_))

  implicit def bytesWritableFactory: WritableFactory[Array[Byte]] =
    simpleWritableFactory(new BytesWritable(_))

  implicit def stringWritableFactory: WritableFactory[String] =
    simpleWritableFactory(new Text(_))

  implicit def writableWritableFactory[T <: Writable: ClassTag]: WritableFactory[T] =
    simpleWritableFactory(w => w)
}
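
The WritableConverter and WritableFactory machinery above is what lets the typed sequence-file APIs hide Hadoop's Writable classes from the caller. A minimal round-trip sketch follows; it assumes a local master, and the app name and scratch directory are invented for the example (only sequenceFile and saveAsSequenceFile are real Spark APIs here).

import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("seqfile-sketch").setMaster("local[2]"))
    // The output path must not exist yet, so resolve a fresh subdirectory of a temp dir.
    val out = Files.createTempDirectory("seqfile-sketch").resolve("data").toString
    // WritableFactory implicits turn Int/String into IntWritable/Text on the write side ...
    sc.parallelize(Seq(1 -> "one", 2 -> "two")).saveAsSequenceFile(out)
    // ... and WritableConverter implicits turn them back on the read side.
    val roundTrip = sc.sequenceFile[Int, String](out).collect().toSeq
    println(roundTrip)
    sc.stop()
  }
}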
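
A small usage sketch of the broadcast method defined above: ship a read-only lookup table to every executor once instead of capturing it in each task closure. The map contents and app name are made up for illustration; broadcasting an RDD itself only triggers the SPARK-5063 warning logged by the method, so collect first if that is what you actually need.

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]"))
    // Broadcast a small lookup table; tasks read it through bc.value.
    val bc = sc.broadcast(Map(1 -> "one", 2 -> "two", 3 -> "three"))
    val named = sc.parallelize(Seq(1, 2, 3, 4)).map(i => bc.value.getOrElse(i, "unknown"))
    println(named.collect().mkString(", "))
    sc.stop()
  }
}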
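
addFile and SparkFiles in practice, as a sketch: the driver registers a local file, and tasks resolve their per-node copy with SparkFiles.get rather than reusing the driver-side path. The temp file and its contents are fabricated for the example.

import java.io.File
import java.nio.file.Files
import scala.io.Source
import org.apache.spark.{SparkConf, SparkContext, SparkFiles}

object AddFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("addfile-sketch").setMaster("local[2]"))
    // Write a tiny lookup file on the driver and register it for distribution.
    val rates = File.createTempFile("rates", ".csv")
    Files.write(rates.toPath, "USD,1.00\nEUR,0.92\n".getBytes("UTF-8"))
    sc.addFile(rates.getAbsolutePath)
    val fileName = rates.getName
    val looked = sc.parallelize(Seq("USD", "EUR")).map { ccy =>
      // Resolve the copy that was downloaded on this node, not the driver path.
      val local = SparkFiles.get(fileName)
      val line = Source.fromFile(local).getLines().find(_.startsWith(ccy))
      ccy -> line.map(_.split(",")(1).toDouble)
    }
    looked.collect().foreach(println)
    sc.stop()
  }
}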
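
A sketch of the two listener registration paths described above: programmatic addSparkListener, and reflective loading via spark.extraListeners, which per setupAndStartListenerBus requires a top-level class with a zero-argument or SparkConf constructor. The class and app names are invented for the example.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

// Top-level with a zero-argument constructor, so it would also satisfy the
// reflective spark.extraListeners path.
class JobLoggingListener extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} ended: ${jobEnd.jobResult}")
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
    println(s"Stage ${stage.stageInfo.stageId} completed ${stage.stageInfo.numTasks} tasks")
}

object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("listener-sketch").setMaster("local[2]")
    // Alternative to the addSparkListener call below:
    // conf.set("spark.extraListeners", "JobLoggingListener")
    val sc = new SparkContext(conf)
    sc.addSparkListener(new JobLoggingListener)
    sc.parallelize(1 to 1000).count()
    sc.stop()
  }
}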
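
The runJob overloads above are the entry point that actions such as count and collect ultimately call. A sketch that runs a function over only the first two partitions of an RDD; results come back indexed by position in the partitions argument.

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object RunJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("runjob-sketch").setMaster("local[4]"))
    val rdd = sc.parallelize(1 to 1000, 8)
    // Count the elements of partitions 0 and 1 only.
    val counts: Array[Int] = sc.runJob(
      rdd,
      (ctx: TaskContext, it: Iterator[Int]) => it.size,
      Seq(0, 1))
    counts.zipWithIndex.foreach { case (n, i) => println(s"partition $i has $n elements") }
    sc.stop()
  }
}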
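
requestExecutors, requestTotalExecutors and killExecutors only do real work against a coarse-grained backend (standalone, YARN, coarse-grained Mesos); elsewhere they log a warning and return false, as the pattern matches above show. A sketch with made-up executor IDs:

import org.apache.spark.{SparkConf, SparkContext}

object ExecutorAllocationSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("alloc-sketch").setMaster("local[2]"))
    // On a local master both calls below simply warn and return false.
    val granted = sc.requestExecutors(numAdditionalExecutors = 2)
    val released = sc.killExecutors(Seq("1", "2")) // executor IDs are illustrative
    println(s"request acknowledged=$granted, kill acknowledged=$released")
    sc.stop()
  }
}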
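
getExecutorMemoryStatus and getRDDStorageInfo are the programmatic view of what the storage tab of the web UI shows. A sketch that caches an RDD, forces it to materialize, and then prints per-executor memory and per-RDD cache statistics; the numbers will of course vary by machine.

import org.apache.spark.{SparkConf, SparkContext}

object StorageStatusSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("storage-sketch").setMaster("local[2]"))
    val data = sc.parallelize(1 to 100000).cache()
    data.count() // materialize so the block manager has something to report
    sc.getExecutorMemoryStatus.foreach { case (hostPort, (maxMem, remaining)) =>
      println(s"$hostPort: $remaining of $maxMem bytes free for caching")
    }
    sc.getRDDStorageInfo.foreach { info =>
      println(s"RDD ${info.id}: ${info.numCachedPartitions} cached partitions, ${info.memSize} bytes in memory")
    }
    sc.stop()
  }
}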
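
setCheckpointDir pairs with RDD.checkpoint(): as the warning inside the method says, the directory should be on HDFS (or another shared filesystem) when running on a cluster; a purely local path is only fine in local mode. The path below is illustrative.

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("checkpoint-sketch").setMaster("local[2]"))
    sc.setCheckpointDir("/tmp/spark-checkpoints") // use an HDFS path on a real cluster
    val derived = sc.parallelize(1 to 10).map(_ * 2)
    derived.checkpoint()  // marks the RDD; the data is written by the first action
    derived.count()
    println(s"isCheckpointed=${derived.isCheckpointed}, dir=${sc.getCheckpointDir}")
    sc.stop()
  }
}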
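
Because only one SparkContext may be active per JVM, shared code paths usually go through SparkContext.getOrCreate rather than the constructor; calling it a second time just hands back the existing context, as the locking in the companion object above guarantees. A sketch:

import org.apache.spark.{SparkConf, SparkContext}

object GetOrCreateSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("get-or-create-sketch").setMaster("local[2]")
    val first = SparkContext.getOrCreate(conf)
    val second = SparkContext.getOrCreate() // no conf needed when only retrieving
    println(first eq second)                // true: the same JVM-wide context
    first.stop()
  }
}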
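
createTaskScheduler and SparkMasterRegex together define which master URL forms are accepted. The sketch below restates those cases as comments and sets one of them; the concrete hosts, ports and sizes are placeholders.

import org.apache.spark.SparkConf

object MasterUrlSketch {
  def main(args: Array[String]): Unit = {
    // Master URL shapes recognized by createTaskScheduler / SparkMasterRegex:
    //   "local"                       - one thread, no task retries
    //   "local[4]" or "local[*]"      - N threads, or one per core
    //   "local[4,2]"                  - 4 threads, up to 2 task failures
    //   "local-cluster[2,2,1024]"     - in-process cluster: 2 workers x 2 cores x 1024 MB
    //   "spark://host:7077"           - standalone mode (comma-separate multiple masters)
    //   "yarn-client" / "yarn-cluster" ("yarn-standalone" is deprecated)
    //   "mesos://host:5050" or "mesos://zk://host:2181" (bare "zk://" is deprecated)
    //   "simr://path"
    val conf = new SparkConf().setAppName("master-url-sketch").setMaster("local[4,2]")
    println(conf.get("spark.master"))
  }
}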
