博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark RDD类源码阅读
阅读量:6159 次
发布时间:2019-06-21

本文共 7295 字,大约阅读时间需要 24 分钟。

每天进步一点点~开搞~

abstract class RDD[T: ClassTag](  //@transient 注解表示将字段标记为瞬态的    @transient private var _sc: SparkContext,  // Seq是序列,元素有插入的先后顺序,可以有重复的元素。    @transient private var deps: Seq[Dependency[_]]  ) extends Serializable with Logging {    if (classOf[RDD[_]].isAssignableFrom(elementClassTag.runtimeClass)) {  user programs that     }//这里应该是声明sparkContext对象后才能使用RDD的调用  private def sc: SparkContext = {    if (_sc == null) {      throw new SparkException(        "RDD transformations and actions can only be invoked by the driver, not inside of other " +        "transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because " +        "the values transformation and count action cannot be performed inside of the rdd1.map " +        "transformation. For more information, see SPARK-5063.")    }    _sc  }//构建一个RDD应该是一对一的关系,比如子RDD对应唯一的父RDD  def this(@transient oneParent: RDD[_]) =    this(oneParent.context , List(new OneToOneDependency(oneParent)))  private[spark] def conf: SparkConf = _conf//sparkconf的设置def getConf: SparkConf = conf.clone()//获取相应的配置信息def jars: Seq[String] = _jarsdef files: Seq[String] = _filesdef master: String = _conf.get("spark.master")def appName: String = _conf.get("spark.app.name")private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)  private[spark] def eventLogDir: Option[URI] = _eventLogDir  private[spark] def eventLogCodec: Option[String] = _eventLogCodec//临时文件夹的名称为spark+随机时间戳  val externalBlockStoreFolderName = "spark-" + randomUUID.toString()//判断是否为local模式def isLocal: Boolean = (master == "local" || master.startsWith("local["))

 

//用于触发事件的监听 private[spark] val listenerBus = new LiveListenerBus// 该方法可用于测试用  private[spark] def createSparkEnv(      conf: SparkConf,      isLocal: Boolean,      listenerBus: LiveListenerBus): SparkEnv = {    SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))  }//加载env配置文件  private[spark] def env: SparkEnv = _env  private[spark] val addedFiles = HashMap[String, Long]()  private[spark] val addedJars = HashMap[String, Long]()//监听所有调用persist的RDD  private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]//重用配置hadoop Configuration  def hadoopConfiguration: Configuration = _hadoopConfiguration//用于设置executorMemory的内存数量  private[spark] def executorMemory: Int = _executorMemory  // 将环境参数传递给exeuctor  private[spark] val executorEnvs = HashMap[String, String]()  // 设置正在使用SparkContext的用户  val sparkUser = Utils.getCurrentUserName()//设置提交的appliaction的唯一标识。就是当提交给yarn或local模式时,申请资源的applaction名称  def applicationId: String = _applicationId  def applicationAttemptId: Option[String] = _applicationAttemptId  def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null  private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger  private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =    _executorAllocationManager  private[spark] def cleaner: Option[ContextCleaner] = _cleaner  private[spark] var checkpointDir: Option[String] = None// 用户可以使用本地变量来传递消息  protected[spark] val localProperties = new InheritableThreadLocal[Properties] {    override protected def childValue(parent: Properties): Properties = {//clone一下,防止父变量改变从而影响子变量semantics (SPARK-10563).      if (conf.get("spark.localProperties.clone", "false").toBoolean) {        SerializationUtils.clone(parent).asInstanceOf[Properties]      } else {        new Properties(parent)      }    }    override protected def initialValue(): Properties = new Properties()  } private def warnSparkMem(value: String): String = {    logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +      "deprecated, please use spark.executor.memory instead.")    value  }//设置log级别,包括ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARNdef setLogLevel(logLevel: String) {    val validLevels = Seq("ALL", "DEBUG", "ERROR", "FATAL", "INFO", "OFF", "TRACE", "WARN")    if (!validLevels.contains(logLevel)) {      throw new IllegalArgumentException(        s"Supplied level $logLevel did not match one of: ${validLevels.mkString(",")}")    }    Utils.setLogLevel(org.apache.log4j.Level.toLevel(logLevel))  }//不同模式的配置参数    if (!_conf.contains("spark.master")) {      throw new SparkException("A master URL must be set in your configuration")    }    if (!_conf.contains("spark.app.name")) {      throw new SparkException("An application name must be set in your configuration")    }    // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster    // yarn-standalone is deprecated, but still supported    if ((master == "yarn-cluster" || master == "yarn-standalone") &&        !_conf.contains("spark.yarn.app.id")) {      throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +        "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")    }    _conf.setIfMissing("spark.driver.host", Utils.localHostName())    _conf.setIfMissing("spark.driver.port", "0") _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)    _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))      .toSeq.flatten    _eventLogDir =      if (isEventLogEnabled) {        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)          .stripSuffix("/")        Some(Utils.resolveURI(unresolvedDir))      } else {        None      }    _eventLogCodec = {      val compress = _conf.getBoolean("spark.eventLog.compress", false)      if (compress && isEventLogEnabled) {        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)      } else {        None      }    }//jobProgressListener应该在创建sparkEnv之前,因为当创建sparkEnv时,一些信息将会被发送到jobProgressListener,否则就会丢失啦。    _jobProgressListener = new JobProgressListener(_conf)    listenerBus.addListener(jobProgressListener)_env = createSparkEnv(_conf, isLocal, listenerBus)    SparkEnv.set(_env)    _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)    _statusTracker = new SparkStatusTracker(this)    _progressBar =      if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {        Some(new ConsoleProgressBar(this))      } else {        None      }    _ui =      if (conf.getBoolean("spark.ui.enabled", true)) {        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,          _env.securityManager, appName, startTime = startTime))      } else {        None      }    if (jars != null) {      jars.foreach(addJar)    }    if (files != null) {      files.foreach(addFile)    }//获取启动app设置的参数变量,如果没有则获取配置文件中的    _executorMemory = _conf.getOption("spark.executor.memory")      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))      .orElse(Option(System.getenv("SPARK_MEM"))      .map(warnSparkMem))      .map(Utils.memoryStringToMb)      .getOrElse(1024)//500这里在创建HeartbeatReceiver 之前先创建createTaskScheduler,因为每个Executor在构造函数中检索HeartbeatReceiver    _heartbeatReceiver = env.rpcEnv.setupEndpoint(      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

 

转载地址:http://yusfa.baihongyu.com/

你可能感兴趣的文章
HttpWebRequest的GetResponse或GetRequestStream偶尔超时 + 总结各种超时死掉的可能和相应的解决办法...
查看>>
SparseArray
查看>>
第二章
查看>>
android背景选择器selector用法汇总
查看>>
[转]Paul Adams:为社交设计
查看>>
showdialog弹出窗口刷新问题
查看>>
java
查看>>
Vue.js连接后台数据jsp页面  ̄▽ ̄
查看>>
关于程序的单元测试
查看>>
mysql内存优化
查看>>
都市求生日记第一篇
查看>>
Java集合---HashMap源码剖析
查看>>
SQL优化技巧
查看>>
thead 固定,tbody 超出滚动(附带改变滚动条样式)
查看>>
Dijkstra算法
查看>>
css 动画 和 响应式布局和兼容性
查看>>
csrf 跨站请求伪造相关以及django的中间件
查看>>
MySQL数据类型--与MySQL零距离接触2-11MySQL自动编号
查看>>
生日小助手源码运行的步骤
查看>>
Configuration python CGI in XAMPP in win-7
查看>>