Spark简介

  • Apache Spark™ is a unified analytics engine for large-scale data processing.

  • spark是针对于大规模数据处理的统一分析引擎

        spark是在Hadoop基础上的改进,是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。
    
        spark是基于内存计算框架,计算速度非常之快,但是它仅仅只是涉及到计算,并没有涉及到数据的存储,后期需要使用spark对接外部的数据源,比如hdfs。
    

四大特性

  • 速度快
* 运行速度提高100倍
  * Apache Spark使用最先进的DAG调度程序,查询优化程序和物理执行引擎,实现批量和流式数据的高性能。
* spark比mapreduce快的2个主要原因
  * 1、==基于内存==
    (1)mapreduce任务后期再计算的时候,每一个job的输出结果会落地到磁盘,后续有其他的job需要依赖于前面job的输出结果,这个时候就需要进行大量的磁盘io操作。性能就比较低。
    (2)spark任务后期再计算的时候,job的输出结果可以保存在内存中,后续有其他的job需要依赖于前面job的输出结果,这个时候就直接从内存中获取得到,避免了磁盘io操作,性能比较高
  * 2、==进程与线程==
    (1)mapreduce任务以进程的方式运行在yarn集群中,比如程序中有100个MapTask,一个task就需要一个进程,这些task要运行就需要开启100个进程。
    (2)spark任务以线程的方式运行在进程中,比如程序中有100个MapTask,后期一个task就对应一个线程,这里就不在是进程,这些task需要运行,这里可以极端一点:
    只需要开启1个进程,在这个进程中启动100个线程就可以了。
    进程中可以启动很多个线程,而开启一个进程与开启一个线程需要的时间和调度代价是不一样。 开启一个进程需要的时间远远大于开启一个线程。
  • 易用性
- 可以快速去编写spark程序通过 java/scala/python/R/SQL等不同语言
  • 通用性
- spark框架不在是一个简单的框架,可以把spark理解成一个==**生态系统**==,它内部是包含了很多模块,基于不同的应用场景可以选择对应的模块去使用
* ==**sparksql**==
  * 通过sql去开发spark程序做一些离线分析
* ==**sparkStreaming**==
  * 主要是用来解决公司有实时计算的这种场景
* ==**Mlib**==
  * 它封装了一些机器学习的算法库
* ==**Graphx**==
  * 图计算
  • 兼容性
- spark程序就是一个计算逻辑程序,这个任务要运行就需要计算资源(内存、cpu、磁盘),哪里可以给当前这个任务提供计算资源,就可以把spark程序提交到哪里去运行
* ==**standAlone**==
  * 它是spark自带的独立运行模式,整个任务的资源分配由spark集群的老大Master负责
* ==**yarn**==
  * 可以把spark程序提交到yarn中运行,整个任务的资源分配由yarn中的老大ResourceManager负责
* **mesos**
  * 它也是apache开源的一个类似于yarn的资源调度平台

集群架构

spark

  • ==Driver==

    • 它会执行客户端写好的main方法,它会构建一个名叫SparkContext对象
      • 该对象是所有spark程序的执行入口
  • ==Application==

    • 就是一个spark的应用程序,它是包含了客户端的代码和任务运行的资源信息
  • ==ClusterManager==

    • 它是给程序提供计算资源的外部服务
      • standAlone
        • 它是spark自带的集群模式,整个任务的资源分配由spark集群的老大Master负责
      • yarn
        • 可以把spark程序提交到yarn中运行,整个任务的资源分配由yarn中的老大ResourceManager负责
      • mesos
        • 它也是apache开源的一个类似于yarn的资源调度平台。
  • ==Master==

    • 它是整个spark集群的主节点,负责任务资源的分配
  • ==Worker==

    • 它是整个spark集群的从节点,负责任务计算的节点
  • ==Executor==

    • 它是一个进程,它会在worker节点启动该进程(计算资源)
  • ==Task==

    • spark任务是以task线程的方式运行在worker节点对应的executor进程中

启动停止

(1) 如何恢复到上一次活着master挂掉之前的状态?
    在高可用模式下,整个spark集群就有很多个master,其中只有一个master被zk选举成活着的master,其他的多个master都处于standby,同时把整个spark集群的元数据信息通过zk中节点进行保存。

    后期如果活着的master挂掉。首先zk会感知到活着的master挂掉,下面开始在多个处于standby中的master进行选举,再次产生一个活着的master,这个活着的master会读取保存在zk节点中的spark集群元数据信息,恢复到上一次master的状态。整个过程在恢复的时候经历过了很多个不同的阶段,每个阶段都需要一定时间,最终恢复到上个活着的master的转态,整个恢复过程一般需要1-2分钟。

(2) 在master的恢复阶段对任务的影响?
   a)对已经运行的任务是没有任何影响
         由于该任务正在运行,说明它已经拿到了计算资源,这个时候就不需要master。
   b) 对即将要提交的任务是有影响
         由于该任务需要有计算资源,这个时候会找活着的master去申请计算资源,由于没有一个活着的master,该任务是获取不到计算资源,也就是任务无法运行。

示例

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://node01:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
examples/jars/spark-examples_2.11-2.3.3.jar \
10

####参数说明
--class:指定包含main方法的主类
--master:指定spark集群master地址
--executor-memory:指定任务在运行的时候需要的每一个executor内存大小
--total-executor-cores: 指定任务在运行的时候需要总的cpu核数

### 注意
spark集群中有很多个master,并不知道哪一个master是活着的master,即使你知道哪一个master是活着的master,它也有可能下一秒就挂掉,这里就可以把所有master都罗列出来
--master spark://node01:7077,node02:7077,node03:7077

后期程序会轮训整个master列表,最终找到活着的master,然后向它申请计算资源,最后运行程序。

SparkRDD

  • RDD(Resilient Distributed Dataset)叫做==弹性分布式数据集==,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合.

    • Dataset: 就是一个集合,存储很多数据.
    • Distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算.
    • Resilient: 表示弹性,rdd的数据是可以保存在内存或者是磁盘中.
  • (1)A list of partitions

    • ==一个分区(Partition)列表,数据集的基本组成单位。==
    这里表示一个rdd有很多分区,每一个分区内部是包含了该rdd的部分数据,
spark中任务是以task线程的方式运行, 一个分区就对应一个task线程。

    用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值。
    val rdd=sparkContext.textFile("/words.txt")
    如果该文件的block块个数小于等于2,这里生产的RDD分区数就为2
    如果该文件的block块个数大于2,这里生产的RDD分区数就与block块个数保持一致
  • (2)A function for computing each split
    • ==一个计算每个分区的函数==
    Spark中RDD的计算是以分区为单位的,每个RDD都会实现compute计算函数以达到这个目的.
  • (3)A list of dependencies on other RDDs
    • ==一个rdd会依赖于其他多个rdd==
  这里就涉及到rdd与rdd之间的依赖关系,spark任务的容错机制就是根据这个特性(血统)而来。
  • (4)Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
    • ==一个Partitioner,即RDD的分区函数(可选项)==
当前Spark中实现了两种类型的分区函数,
一个是基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)
另外一个是基于范围的RangePartitioner。
只有对于key-value的RDD,并且产生shuffle,才会有Partitioner,

非key-value的RDD的Parititioner的值是None。
  • (5)Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    • ==一个列表,存储每个Partition的优先位置(可选项)==
这里涉及到数据的本地性,数据块位置最优。
spark任务在调度的时候会优先考虑存有数据的节点开启计算任务,减少数据的网络传输,提升计算效率。
  • 流程分析

rdd的五大属性

RDD的算子分类

  • 1、==transformation(转换)==
    • 根据已经存在的rdd转换生成一个新的rdd, 它是延迟加载,它不会立即执行
    • 例如
      • map / flatMap / reduceByKey 等
  • 2、==action (动作)==
    • 它会真正触发任务的运行
      • 将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中
    • 例如
      • collect / saveAsTextFile 等

RDD的依赖关系

rdd-dependencies

  • RDD和它依赖的父RDD的关系有两种不同的类型

  • 窄依赖(narrow dependency)和宽依赖(wide dependency)

    • ==窄依赖==

      • 窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用

        • 总结:窄依赖我们形象的比喻为独生子女
        哪些算子操作是窄依赖:
            map/flatMap/filter/union等等
        
            所有的窄依赖不会产生shuffle
        
    • ==宽依赖==

      • 宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition

        • 总结:宽依赖我们形象的比喻为超生
        哪些算子操作是宽依赖:
            reduceByKey/sortByKey/groupBy/groupByKey/join等等
        
            所有的宽依赖会产生shuffle
        
    • 补充说明

      由上图可知,join分为宽依赖和窄依赖,如果RDD有相同的partitioner,那么将不会引起shuffle,这种join是窄依赖,反之就是宽依赖
      

lineage(血统)

  • RDD只支持粗粒度转换
    • 即只记录单个块上执行的单个操作。
  • 将创建RDD的一系列Lineage(即血统)记录下来,以便恢复丢失的分区
  • ==RDD的Lineage会记录RDD的元数据信息和转换行为,lineage保存了RDD的依赖关系,当该RDD的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区==。

RDD的缓存机制

    可以把一个rdd的数据缓存起来,后续有其他的job需要用到该rdd的结果数据,可以直接从缓存中获取得到,避免了重复计算。缓存是加快后续对该数据的访问操作。
  • RDD通过==persist方法==或==cache方法==可以将前面的计算结果缓存。
    • 但是并不是这两个方法被调用时立即缓存,而是==触发后面的action==时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

1569036662039

  • 通过查看源码发现==cache最终也是调用了persist方法==,默认的存储级别都是==仅在内存存储一份==,Spark的存储级别还有好多种,存储级别在==object StorageLevel==中定义的。

1569036703460

  • 使用演示
val rdd1=sc.textFile("/words.txt")
val rdd2=rdd1.flatMap(_.split(" "))
val rdd3=rdd2.cache
rdd3.collect

val rdd4=rdd3.map((_,1))
val rdd5=rdd4.persist(缓存级别)
rdd5.collect
  • cache和persist区别
    对RDD设置缓存成可以调用rdd的2个方法: 一个是cache,一个是persist
调用上面2个方法都可以对rdd的数据设置缓存,但不是立即就触发缓存执行,后面需要有action,才会触发缓存的执行。

cache方法和persist方法区别:
    cache:   默认是把数据缓存在内存中,其本质就是调用persist方法;
    persist:可以把数据缓存在内存或者是磁盘,有丰富的缓存级别,这些缓存级别都被定义在StorageLevel这个object中。
  • 清除缓存数据
  • 1、==自动清除==

    一个application应用程序结束之后,对应的缓存数据也就自动清除
    
  • 2、==手动清除==

    调用rdd的unpersist方法
    

RDD的checkpoint机制

  • 我们可以对rdd的数据进行缓存,保存在内存或者是磁盘中。

    • 后续就可以直接从内存或者磁盘中获取得到,但是它们不是特别安全。

    • cache

      它是直接把数据保存在内存中,后续操作起来速度比较快,直接从内存中获取得到。但这种方式很不安全,由于服务器挂掉或者是进程终止,会导致数据的丢失。
      
    • persist

      它可以把数据保存在本地磁盘中,后续可以从磁盘中获取得到该数据,但它也不是特别安全,由于系统管理员一些误操作删除了,或者是磁盘损坏,也有可能导致数据的丢失。
      
  • ==checkpoint(检查点)==

    它是提供了一种相对而言更加可靠的数据持久化方式。它是把数据保存在分布式文件系统,
    比如HDFS上。这里就是利用了HDFS高可用性,高容错性(多副本)来最大程度保证数据的安全性。
    

如何设置checkpoint

  • 1、在hdfs上设置一个checkpoint目录

    sc.setCheckpointDir("hdfs://node01:8020/checkpoint") 
    
  • 2、对需要做checkpoint操作的rdd调用checkpoint方法

    val rdd1=sc.textFile("/words.txt")
    rdd1.checkpoint
    val rdd2=rdd1.flatMap(_.split(" ")) 
    
  • 3、最后需要有一个action操作去触发任务的运行

    rdd2.collect
    

cache、persist、checkpoint三者区别

  • ==cache和persist==

    • cache默认数据缓存在内存中
    • persist可以把数据保存在内存或者磁盘中
    • 后续要触发 cache 和 persist 持久化操作,需要有一个action操作
    • 它不会开启其他新的任务,一个action操作就对应一个job
    • 它不会改变rdd的依赖关系,程序运行完成后对应的缓存数据就自动消失
  • ==checkpoint==

    • 可以把数据持久化写入到hdfs上
    • 后续要触发checkpoint持久化操作,需要有一个action操作,后续会开启新的job执行checkpoint操作
    • 它会改变rdd的依赖关系,后续数据丢失了不能够在通过血统进行数据的恢复。
    • 程序运行完成后对应的checkpoint数据就不会消失
   sc.setCheckpointDir("/checkpoint")
   val rdd1=sc.textFile("/words.txt")
   val rdd2=rdd1.cache
   rdd2.checkpoint
   val rdd3=rdd2.flatMap(_.split(" "))
   rdd3.collect

   checkpoint操作要执行需要有一个action操作,一个action操作对应后续的一个job。该job执行完成之后,它会再次单独开启另外一个job来执行 rdd1.checkpoint操作。

   对checkpoint在使用的时候进行优化,在调用checkpoint操作之前,可以先来做一个cache操作,缓存对应rdd的结果数据,后续就可以直接从cache中获取到rdd的数据写入到指定checkpoint目录中

DAG有向无环图生成

  • ==DAG(Directed Acyclic Graph)== 叫做有向无环图(有方向,无闭环,代表着数据的流向),原始的RDD通过一系列的转换就形成了DAG。

  • 下图是基于单词统计逻辑得到的DAG有向无环图

1569047954944

DAG划分stage

  • stage是什么

    • ==一个Job会被拆分为多组Task,每组任务被称为一个stage==
    • stage表示不同的调度阶段,一个spark job会对应产生很多个stage
      • stage类型一共有2种
        • ==ShuffleMapStage==
          • 最后一个shuffle之前的所有变换的Stage叫ShuffleMapStage
            • 它对应的task是shuffleMapTask
        • ==ResultStage==
          • 最后一个shuffle之后操作的Stage叫ResultStage,它是最后一个Stage。
            • 它对应的task是ResultTask
  • 为什么要划分stage

根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)
对于窄依赖,partition的转换处理在一个Stage中完成计算
对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,

由于划分完stage之后,在同一个stage中只有窄依赖,没有宽依赖,可以实现流水线计算,
stage中的每一个分区对应一个task,在同一个stage中就有很多可以并行运行的task。
  • 如何划分stage
    • ==划分stage的依据就是宽依赖==
(1) 首先根据rdd的算子操作顺序生成DAG有向无环图,接下里从最后一个rdd往前推,创建一个新的stage,把该rdd加入到该stage中,它是最后一个stage。

(2) 在往前推的过程中运行遇到了窄依赖就把该rdd加入到本stage中,如果遇到了宽依赖,就从宽依赖切开,那么最后一个stage也就结束了。

(3) 重新创建一个新的stage,按照第二个步骤继续往前推,一直到最开始的rdd,整个划分stage也就结束了

划分stage

stage与stage之间的关系

    划分完stage之后,每一个stage中有很多可以并行运行的task,后期把每一个stage中的task封装在一个taskSet集合中,最后把一个一个的taskSet集合提交到worker节点上的executor进程中运行。

rdd与rdd之间存在依赖关系,stage与stage之前也存在依赖关系,前面stage中的task先运行,运行完成了再运行后面stage中的task,也就是说后面stage中的task输入数据是前面stage中task的输出结果数据。

stage

Spark任务调度

spark任务调度

- Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler
- 按照rdd的一系列操作顺序,来生成DAG有向无环图
- DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler
- 所有task运行完成,整个任务也就结束了

spark的运行架构

spark

(1) Driver端向资源管理器Master发送注册和申请计算资源的请求

(2) Master通知对应的worker节点启动executor进程(计算资源)

(3) executor进程向Driver端发送注册并且申请task请求

(4) Driver端运行客户端的main方法,构建SparkContext对象,在SparkContext对象内部依次构建DAGScheduler和TaskScheduler

(5) 按照客户端代码洪rdd的一系列操作顺序,生成DAG有向无环图

(6) DAGScheduler拿到DAG有向无环图之后,按照宽依赖进行stage的划分。每一个stage内部有很多可以并行运行的task,最后封装在一个一个的taskSet集合中,然后把taskSet发送给TaskScheduler

(7) TaskScheduler得到taskSet集合之后,依次遍历取出每一个task提交到worker节点上的executor进程中运行

(8) 所有task运行完成,Driver端向Master发送注销请求,Master通知Worker关闭executor进程,Worker上的计算资源得到释放,最后整个任务也就结束了。
  • 基于wordcount程序剖析spark任务的提交、划分、调度流程

job-scheduler-running

自定义分区

  • 在对RDD数据进行分区时,默认使用的是==HashPartitioner==

  • 该函数对key进行哈希,然后对分区总数取模,取模结果相同的就会被分到同一个partition中

    HashPartitioner分区逻辑:
        key.hashcode % 分区总数 = 分区号
    
  • 如果嫌HashPartitioner功能单一,可以自定义partitioner

  • 实现自定义partitioner大致分为3个步骤

    • 1、继承==org.apache.spark.Partitioner==
    • 2、重写==numPartitions==方法
    • 3、重写==getPartition==方法
//1、对应上面的rdd数据进行自定义分区
 val result: RDD[(String, Int)] = wordLengthRDD.partitionBy(new MyPartitioner(3))

//2、自定义分区
class MyPartitioner(num:Int) extends Partitioner{
  //指定rdd的总的分区数
  override def numPartitions: Int = {
    num
  }
  //消息按照key的某种规则进入到指定的分区号中
  override def getPartition(key: Any): Int ={
    //这里的key就是单词
    val length: Int = key.toString.length
    length match {
      case 4 =>0
      case 5 =>1
    }
  }
}

共享变量(broadcast variable)

  • ​ Spark中分布式执行的代码需要==传递到各个Executor的Task上运行==。对于一些只读、固定的数据(比如从DB中读出的数据),每次都需要Driver广播到各个Task上,这样效率低下。
  • ​ 广播变量允许==将变量只广播给各个Executor==。该Executor上的各个Task再从所在节点的BlockManager获取变量,而不是从Driver获取变量,以减少通信的成本,减少内存的占用,从而提升了效率。

广播变量

广播变量使用
(1) 通过对一个类型T的对象调用 SparkContext.broadcast创建出一个Broadcast[T]对象。
    任何可序列化的类型都可以这么实现
(2) 通过 value 属性访问该对象的值
(3) 变量只会被发到各个节点一次,应作为只读值处理(修改这个值不会影响到别的节点)
  • 使用广播变量代码示例
val word="spark"
val rddData = rdd.collect
//通过调用sparkContext对象的broadcast方法把数据广播出去
val broadCast = sc.broadcast(word)
val broadRddData = sc.broadcast(rddData)

//在executor中通过调用广播变量的value属性获取广播变量的值,分布式环境下广播变量通过网络传输需要序列化
val rdd2=rdd1.flatMap(_.split(" ")).filter(x=>x.equals(broadCast.value))
广播变量使用注意事项
1、不能将一个RDD使用广播变量广播出去

2、广播变量只能在Driver端定义,不能在Executor端定义

3、在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值

4、如果executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本

5、如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本

累加器(accumulator)

  • 累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变
  • ==累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。可以使用累加器来进行全局的计数
val accumulator = sc.accumulator(0); 
    val result = linesRDD.map(s => {
      accumulator.add(1) //有一条数据就增加1
    })

序列化问题

  • spark是分布式执行引擎,其核心抽象是弹性分布式数据集RDD,其代表了分布在不同节点的数据。Spark的计算是在executor上分布式执行的,故用户开发的关于RDD的map,flatMap,reduceByKey等transformation 操作(闭包)有如下执行过程:
    • (1)代码中对象在driver本地序列化
    • (2)对象序列化后传输到远程executor节点
    • (3)远程executor节点反序列化对象
    • (4)最终远程节点执行
  • 故对象在执行中需要序列化通过网络传输,则必须经过序列化过程。

spark的任务序列化异常

  • 在编写spark程序中,由于在map,foreachPartition等算子==内部使用了外部定义的变量和函数==,从而引发Task未序列化问题。
  • 然而spark算子在计算过程中使用外部变量在许多情形下确实在所难免,比如在filter算子根据外部指定的条件进行过滤,map根据相应的配置进行变换。
  • 经常会出现“==org.apache.spark.SparkException: Task not serializable==”这个错误
    • 其原因就在于这些算子使用了==外部的变量==,但是这个变量不能序列化。
    • 当前类使用了“extends Serializable”声明支持序列化,但是由于某些字段==不支持序列化==,仍然会导致整个类序列化时出现问题,最终导致出现Task未序列化问题。

解决序列化的办法

  • (1) 如果函数中使用了该类对象,该类要实现序列化
    • ==类 extends Serializable==
  • (2) 如果函数中使用了该类对象的成员变量,该类除了要实现序列化之外,所有的成员变量必须要实现序列化
  • (3) 对于不能序列化的成员变量使用==“@transient”==标注,告诉编译器不需要序列化
  • (4) 也可将依赖的变量独立放到一个小的class中,让这个class支持序列化,这样做可以减少网络传输量,提高效率。
  • (5) 可以把对象的创建直接在该函数中构建这样避免需要序列化

application、job、stage、task之间的关系

application

  • 一个application就是一个应用程序,包含了客户端所有的代码和计算资源
  • 一个action操作对应一个DAG有向无环图,即一个action操作就是一个job
  • 一个job中包含了大量的宽依赖,按照宽依赖进行stage划分,一个job产生了很多个stage
  • 一个stage中有很多分区,一个分区就是一个task,即一个stage中有很多个task
  • ==总结==
    • 一个application包含了很多个job
    • 一个job包含了很多个stage
    • 一个stage包含了很多个task

Spark内存计算框架

spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
# cluster / client
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
/kfly/install/spark-2.3.3-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.3.jar \
10
  • yarn-cluster模式

    • spark程序的==Driver程序在YARN中运行==,运行结果不能在客户端显示,并且客户端可以在启动应用程序后消失应用的。

    • 最好运行那些将结果最终保存在外部存储介质(如HDFS、Redis、Mysql),客户端的终端显示的仅是作为YARN的job的简单运行状况。

yarn-cluster

  • yarn-client模式
    • spark程序的==Driver运行在Client上==,应用程序运行结果会在客户端显示,所有适合运行结果有输出的应用程序(如spark-shell)

yarn-client

最大的区别就是Driver端的位置不一样。

yarn-cluster: Driver端运行在yarn集群中,与ApplicationMaster进程在一起。
yarn-client:  Driver端运行在提交任务的客户端,与ApplicationMaster进程没关系,经常用于进行测试

collect 算子操作剖析

  • collect算子操作的作用

    • 1、它是一个action操作,会触发任务的运行

    • 2、它会把RDD的数据进行收集之后,以数组的形式返回给Driver端

      • ==默认Driver端的内存大小为1G,由参数 spark.driver.memory 设置==

      • 如果某个rdd的数据量超过了Driver端默认的1G内存,对rdd调用collect操作,这里会出现Driver端的内存溢出,所有这个collect操作存在一定的风险,实际开发代码一般不会使用。

      • ==实际企业中一般都会把该参数调大,比如5G/10G等==

        • 可以在代码中修改该参数,如下

          new SparkConf().set("spark.driver.memory","5G")
          
    比如说rdd的数据量达到了10G
    
    rdd.collect这个操作非常危险,很有可能出现driver端的内存不足
    

spark任务中资源参数剖析

  • ==–executor-memory==

    • 表示每一个executor进程需要的内存大小,它决定了后期操作数据的速度

    `
    比如说一个rdd的数据量大小为5g,这里给定的executor-memory为2g, 在这种情况下,内存是存储不下,它会把一部分数据保存在内存中,还有一部分数据保存在磁盘,后续需要用到该rdd的结果数据,可以从内存和磁盘中获取得到,这里就涉及到一定的磁盘io操作。

    ,这里给定的executor-memory为10g,这里数据就可以完全在内存中存储下,后续需要用到该rdd的数据,就可以直接从内存中获取,这样一来,避免了大量的磁盘io操作。性能得到提升。

在实际的工作,这里 –executor-memory 需要设置的大一点。
比如说10G/20G/30G等


- ==--total-executor-cores==

  - 表示任务运行需要总的cpu核数,它决定了任务并行运行的粒度

  ~~~
  比如说要处理100个task,注意一个cpu在同一时间只能处理一个task线程。

  如果给定的总的cpu核数是5个,这里就需要100/5=20个批次才可以把这100个task运行完成,如果平均每个task运行1分钟,这里最后一共运行20分钟。

  如果给定的总的cpu核数是20个,这里就需要100/20=5个批次才可以把这100个task运行完成,如果平均每个task运行1分钟,这里最后一共运行5分钟。

  如果如果给定的总的cpu核数是100个,这里就需要100/100=1个批次才可以把这100个task运行完成,如果平均每个task运行1分钟,这里最后一共运行1分钟。


  在实际的生产环境中,--total-executor-cores 这个参数一般也会设置的大一点,
  比如说 30个/50个/100个
  ~~~


- ==总结==

  后期对于spark程序的优化,可以从这2个参数入手,无论你把哪一个参数调大,对程序运行的效率来说都会达到一定程度的提升
  加大计算资源它是最直接、最有效果的优化手段。
  在计算资源有限的情况下,可以考虑其他方面,比如说代码层面,JVM层面等

### spark任务的调度模式

* Spark中的调度模式主要有两种:==FIFO 和 FAIR==
  * ==FIFO(先进先出)==
    * 默认情况下Spark的调度模式是FIFO,谁先提交谁先执行,后面的任务需要等待前面的任务执行。
  * ==FAIR(公平调度)==
    * 支持在调度池中为任务进行分组,不同的调度池权重不同,任务可以按照权重来决定执行顺序。避免大任务运行时间长,占用了大量的资源,后面小任务无法提交运行。

### spark任务的分配资源策略

* 给application分配资源选择worker(executor),现在有两种策略
  * ==尽量的打散==,即一个Application尽可能多的分配到不同的节点。这个可以通过设置spark.deploy.spreadOut来实现。默认值为true,即尽量的打散(默认)
    * 可以充分的发挥数据的本地性,提升执行效率

  * ==尽量的集中==,即一个Application尽量分配到尽可能少的节点。

  ```sh
  # 假如集群有两个节点,worker1,worker2。各 cores 4 memory 128G,需要分配 4cores。 32g
  # 1. 尽量的集中(尽可能分配更少的节点,worker1 4 32g)
  # 2. 尽量打散(尽可能多的分配,worker1 worker2按照顺序依次分配,不够再次循环)