The Back Pressure Mechanism

As a micro-batch streaming framework, Spark Streaming is in its ideal state when, as the official documentation puts it, "batches of data should be processed as fast as they are being generated": the processing time of each batch (batch_processing_time) should be smaller than, but reasonably close to, the configured batch interval (batch_interval). If batch_processing_time > batch_interval, the application cannot keep up; unprocessed data accumulates batch after batch and eventually causes Executor out-of-memory errors. If batch_processing_time << batch_interval, the system sits idle most of the time and the ingestion rate should be raised.

Two Ways of Receiving Data

Receiver Stream

  • Spark Streaming continuously receives external data through the Receiver component running inside an Executor, and uses the BlockManager to turn that data into Spark blocks for storage. A simplified diagram of the Receiver-based mechanism is shown below.

(figure: simplified diagram of the Receiver-based ingestion path)

  • To cap the rate at which a Receiver ingests data, set the configuration property spark.streaming.receiver.maxRate in SparkConf; its unit is records per second.

The advantage of these static rate limits (this one for Receiver streams, and the Direct-stream limit described below) is that they are very simple to set: estimate, from the actual business throughput, a rate at which batch processing time roughly balances the batch interval. The drawback is that whenever the workload changes, the only remedy is to edit the parameter by hand and restart the Streaming application. Moreover, a hand-tuned estimate may simply be wrong; both overly aggressive and overly conservative settings cause trouble.
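As a rough illustration of such an estimate (all numbers here are hypothetical, not recommendations), the static limit follows from observed throughput:

```python
# Hypothetical observation: a 10-second batch comfortably processes 50,000 records.
records_per_batch = 50_000
batch_interval_s = 10

# Static limit: let ingestion just keep pace with what processing can absorb.
max_rate = records_per_batch // batch_interval_s  # records/sec, per receiver
print(max_rate)  # 5000
```

This is the value one would put into spark.streaming.receiver.maxRate, and it stays correct only as long as the workload does.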

Direct Stream

  • If Kafka is consumed through a Direct Stream connection, no Receiver is involved, so the rate must instead be limited with the configuration property spark.streaming.kafka.maxRatePerPartition, whose unit is records per second per partition.

The Spark Streaming Back Pressure Mechanism

  • Static parameters are convenient, but adapting them to workload changes requires a manual edit and a restart. Spark 1.5 therefore introduced a dynamic rate control scheme that adjusts the rate limit automatically according to the current processing speed: the back pressure mechanism.
  • The configuration properties that control back pressure are as follows:
| Property | Default | Description |
| --- | --- | --- |
| spark.streaming.backpressure.enabled | false | Whether to enable the back pressure mechanism. |
| spark.streaming.backpressure.initialRate | (not set) | The initial maximum receiving rate. Applies only to Receiver streams, not to Direct streams. |
| spark.streaming.backpressure.rateEstimator | pid | The rate estimator. Spark ships only this one, but a custom estimator can be plugged in. |
| spark.streaming.backpressure.pid.proportional | 1.0 | Must be non-negative. Weight, in the overall control signal, of the difference between the current rate and the rate of the last batch. The default is usually fine. |
| spark.streaming.backpressure.pid.integral | 0.2 | Must be non-negative. Weight of the accumulated error in the overall control signal. The default is usually fine. |
| spark.streaming.backpressure.pid.derived | 0 | Must be non-negative. Weight of the rate of change of the error in the overall control signal. The default is usually fine. |
| spark.streaming.backpressure.pid.minRate | 100 | Must be positive. The minimum rate. |
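For reference, a minimal spark-defaults.conf fragment that turns back pressure on might look like this (the numeric values are illustrative only):

```properties
spark.streaming.backpressure.enabled        true
# Optional: cap the very first batch for Receiver streams
spark.streaming.backpressure.initialRate    1000
# Static upper bounds still apply on top of the dynamic rate
spark.streaming.receiver.maxRate            10000
spark.streaming.kafka.maxRatePerPartition   2000
```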

The PID-Based Rate Estimator

  • org.apache.spark.streaming.scheduler.rate.RateEstimator is a very short trait that only defines compute(), the method that calculates the rate limit. It has a companion object that creates rate estimator instances and reads the back pressure configuration properties listed above.
// The rate estimator trait
private[streaming] trait RateEstimator extends Serializable {
  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double]
}
// Companion object
object RateEstimator {
  def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
    conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
      case "pid" =>
        val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
        val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
        val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
        val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
        new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
      case estimator =>
        throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
    }
}
  • The only implementation of RateEstimator is PIDRateEstimator, which is why the spark.streaming.backpressure.rateEstimator property can only take the value pid.
private[streaming] class PIDRateEstimator(
    batchIntervalMillis: Long,
    proportional: Double,
    integral: Double,
    derivative: Double,
    minRate: Double
  ) extends RateEstimator with Logging {

  private var firstRun: Boolean = true
  private var latestTime: Long = -1L
  private var latestRate: Double = -1D
  private var latestError: Double = -1L

  require(
    batchIntervalMillis > 0,
    s"Specified batch interval $batchIntervalMillis in PIDRateEstimator is invalid.")
  require(
    proportional >= 0,
    s"Proportional term $proportional in PIDRateEstimator should be >= 0.")
  require(
    integral >= 0,
    s"Integral term $integral in PIDRateEstimator should be >= 0.")
  require(
    derivative >= 0,
    s"Derivative term $derivative in PIDRateEstimator should be >= 0.")
  require(
    minRate > 0,
    s"Minimum rate in PIDRateEstimator should be > 0")

  logInfo(s"Created PIDRateEstimator with proportional = $proportional, integral = $integral, " +
    s"derivative = $derivative, min rate = $minRate")

  def compute(
      time: Long, // in milliseconds
      numElements: Long,
      processingDelay: Long, // in milliseconds
      schedulingDelay: Long // in milliseconds
    ): Option[Double] = {
    logTrace(s"\ntime = $time, # records = $numElements, " +
      s"processing time = $processingDelay, scheduling delay = $schedulingDelay")
    this.synchronized {
      if (time > latestTime && numElements > 0 && processingDelay > 0) {
        val delaySinceUpdate = (time - latestTime).toDouble / 1000
        val processingRate = numElements.toDouble / processingDelay * 1000
        val error = latestRate - processingRate
        val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
        val dError = (error - latestError) / delaySinceUpdate
        val newRate = (latestRate - proportional * error -
                                    integral * historicalError -
                                    derivative * dError).max(minRate)
        logTrace(s"""
            | latestRate = $latestRate, error = $error
            | latestError = $latestError, historicalError = $historicalError
            | delaySinceUpdate = $delaySinceUpdate, dError = $dError
            """.stripMargin)
        latestTime = time
        if (firstRun) {
          latestRate = processingRate
          latestError = 0D
          firstRun = false
          logTrace("First run, rate estimation skipped")
          None
        } else {
          latestRate = newRate
          latestError = error
          logTrace(s"New rate = $newRate")
          Some(newRate)
        }
      } else {
        logTrace("Rate estimation skipped")
        None
      }
    }
  }
}
  • PIDRateEstimator applies the idea of the PID controller, a staple of industrial process control. A PID (Proportional-Integral-Derivative) controller is essentially a feedback loop: it compares a measured value against a setpoint and uses the difference between them to compute a new control input, which drives the measured value toward the setpoint.

(figure: block diagram of a PID feedback control loop)

u(t) = Kp * e(t) + Ki * ∫ e(τ) dτ + Kd * de(t)/dt

  • Here e(t) is the error, i.e. the difference between the setpoint and the measured (feedback) value. In other words, the proportional term responds to the current error, the integral term to the accumulated past error, and the derivative term to the predicted future error. The gains of the three terms are Kp, Ki, and Kd respectively.

  • Mapping the formula back onto the PIDRateEstimator source code:

    • The setpoint of the processing rate is the rate of the previous batch, latestRate; the measured value is the rate of the current batch, processingRate; the error is simply their difference.
    • The accumulated past error shows up as the speed at which data piles up during the scheduling delay, i.e. schedulingDelay * processingRate / batchIntervalMillis.
    • The future error is the time derivative of the error computed above.
  • Combining the three terms, the estimator uses the processing rates of the previous and current batches to derive a suitable rate limit for the next batch. The proportional gain Kp is controlled by spark.streaming.backpressure.pid.proportional (default 1.0), the integral gain Ki by spark.streaming.backpressure.pid.integral (default 0.2), and the derivative gain Kd by spark.streaming.backpressure.pid.derived (default 0.0).
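The update step can be sketched numerically. Below is a simplified Python transcription of the arithmetic in compute() for a non-first-run invocation (function name and the sample numbers are illustrative, not Spark API):

```python
def pid_new_rate(latest_rate, latest_error, latest_time_ms, time_ms,
                 num_elements, processing_delay_ms, scheduling_delay_ms,
                 batch_interval_ms, kp=1.0, ki=0.2, kd=0.0, min_rate=100.0):
    """Mirrors one step of PIDRateEstimator.compute()."""
    delay_since_update = (time_ms - latest_time_ms) / 1000.0        # seconds
    processing_rate = num_elements / processing_delay_ms * 1000.0   # records/sec
    error = latest_rate - processing_rate                           # P term input
    historical_error = scheduling_delay_ms * processing_rate / batch_interval_ms  # I term input
    d_error = (error - latest_error) / delay_since_update           # D term input
    return max(latest_rate - kp * error - ki * historical_error - kd * d_error,
               min_rate)

# Previous limit 1000 rec/s; this batch: 1500 records in 2500 ms (600 rec/s),
# scheduling delay 500 ms, batch interval 2000 ms.
rate = pid_new_rate(1000.0, 0.0, 0, 2000, 1500, 2500, 500, 2000)
print(round(rate))  # 570
```

The limit drops toward the observed 600 rec/s, minus an extra penalty (ki * 150) for the records that queued up during the scheduling delay.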

RateController

  • The class org.apache.spark.streaming.scheduler.RateController is the core of dynamic rate control.

// The abstract class extends the StreamingListener trait, i.e. it is a Streaming listener.
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
    extends StreamingListener with Serializable {
  // Listens for StreamingListenerBatchCompleted events, fired when a batch finishes processing.
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      // Timestamp at which processing finished.
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      // Actual processing time (from the start of the batch's first job to the end of its last job).
      workDelay <- batchCompleted.batchInfo.processingDelay
      // Scheduling delay (from submission to the Streaming JobScheduler until the first job starts).
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      // Number of input records in the batch.
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }
}

The Two Subclasses of RateController

  • The two subclasses correspond to the two data ingestion modes: Receiver and Direct.
ReceiverRateController
  • From the publish() method that the subclass ReceiverRateController implements, we can see that the new rate limit is published to the ReceiverTracker.

/**
 * A RateController that sends the new rate to receivers, via the receiver tracker.
 */
private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
  extends RateController(id, estimator) {
  override def publish(rate: Long): Unit =
    ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
}
    
Publishing the Rate Limit via RPC
  • Now look at ReceiverTracker which, as its name suggests, tracks the state of the Receivers. Its sendRateUpdate() method is shown below.

// 1. endpoint is a reference to an RPC endpoint, specifically the ReceiverTrackerEndpoint.
private var endpoint: RpcEndpointRef = null

/** Update a receiver's maximum ingestion rate */
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
  if (isTrackerStarted) {
    // 2. Wrap the stream ID and the new rate limit in an UpdateReceiverRateLimit
    //    message and send it to the endpoint.
    endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
  }
}
    
  • On receiving this message, ReceiverTrackerEndpoint re-wraps it as an UpdateRateLimit message and sends it to the RPC endpoint registered by the Receiver (located in the ReceiverSupervisorImpl class).

/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint(
  "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
    override val rpcEnv: RpcEnv = env.rpcEnv

    override def receive: PartialFunction[Any, Unit] = {
      case StopReceiver =>
        logInfo("Received stop signal")
        ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
      case CleanupOldBlocks(threshTime) =>
        logDebug("Received delete old batch signal")
        cleanupOldBlocks(threshTime)
      case UpdateRateLimit(eps) =>
        logInfo(s"Received a new rate limit: $eps.")
        // 1. On receiving this message, call BlockGenerator.updateRate() on every
        //    registered BlockGenerator.
        registeredBlockGenerators.asScala.foreach { bg =>
          bg.updateRate(eps)
        }
    }
  })
// 2. BlockGenerator, a subclass of RateLimiter, turns received stream data into blocks for
//    storage. The updateRate() method is implemented in the abstract class RateLimiter.
private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf,
    clock: Clock = new SystemClock()
  ) extends RateLimiter(conf) with Logging {}
// Note that Spark's RateLimiter builds on Guava's RateLimiter.
import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}

private[receiver] abstract class RateLimiter(conf: SparkConf) extends Logging {
  // treated as an upper limit
  private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
  private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
  def waitToPush() {
    rateLimiter.acquire()
  }
  /**
   * Return the current rate limit. If no limit has been set so far, it returns {{{Long.MaxValue}}}.
   */
  def getCurrentLimit: Long = rateLimiter.getRate.toLong

  /**
   * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by
   * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that.
   *
   * @param newRate A new rate in records per second. It has no effect if it's 0 or negative.
   */
  // 3. Update the rate.
  private[receiver] def updateRate(newRate: Long): Unit =
    if (newRate > 0) {
      if (maxRateLimit > 0) {
        rateLimiter.setRate(newRate.min(maxRateLimit))
      } else {
        rateLimiter.setRate(newRate)
      }
    }
  /**
   * Get the initial rateLimit to initial rateLimiter
   */
  private def getInitialRateLimit(): Long = {
    math.min(conf.getLong("spark.streaming.backpressure.initialRate", maxRateLimit), maxRateLimit)
  }
}
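waitToPush() blocks the receiving thread until Guava's limiter hands out a permit, and updateRate() clamps any published rate to the configured maximum. A toy limiter with the same clamping rule can be sketched as follows (this is a simplification, not Guava's actual smoothing algorithm; all names are hypothetical):

```python
import time

class SimpleRateLimiter:
    """Toy stand-in for Guava's RateLimiter: spaces permits evenly in time."""
    def __init__(self, permits_per_sec, max_rate_limit=0):
        self.max_rate_limit = max_rate_limit   # 0 means "no upper bound"
        self.interval = 1.0 / permits_per_sec  # seconds between permits
        self.next_free = time.monotonic()

    def acquire(self):
        # Equivalent of waitToPush(): sleep until the next permit slot is free.
        now = time.monotonic()
        wait = self.next_free - now
        self.next_free = max(self.next_free, now) + self.interval
        if wait > 0:
            time.sleep(wait)

    def update_rate(self, new_rate):
        # Mirrors RateLimiter.updateRate(): ignore non-positive rates and
        # never exceed the configured maximum.
        if new_rate > 0:
            effective = (min(new_rate, self.max_rate_limit)
                         if self.max_rate_limit > 0 else new_rate)
            self.interval = 1.0 / effective

limiter = SimpleRateLimiter(1000, max_rate_limit=500)
limiter.update_rate(2000)             # clamped to the 500/s ceiling
print(round(1.0 / limiter.interval))  # 500
```

The clamp is exactly why a dynamically published rate can never override a static spark.streaming.receiver.maxRate setting.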

(figure: end-to-end flow of a rate update, from RateController through ReceiverTracker to BlockGenerator)

DirectKafkaRateController
  • DirectKafkaRateController merely extends RateController and its publish() is a no-op, so the interesting logic lives in the DirectKafkaInputDStream class, which we examine next.
private[spark] class DirectKafkaInputDStream[K, V](
    _ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    ppc: PerPartitionConfig
  ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets 

  /**
   * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
   */
  override protected[streaming] val rateController: Option[RateController] = {
    if (RateController.isBackPressureEnabled(ssc.conf)) {
      Some(new DirectKafkaRateController(id,
        RateEstimator.create(ssc.conf, context.graph.batchDuration)))
    } else {
      None
    }
  }
  // The main logic is implemented here.
  protected[streaming] def maxMessagesPerPartition(
    offsets: Map[TopicPartition, Long]): Option[Map[TopicPartition, Long]] = {
    // Call RateController.getLatestRate() (defined as rateLimit.get()) to obtain the latest estimate.
    val estimatedRateLimit = rateController.map(_.getLatestRate())

    // calculate a per-partition rate limit based on current lag
    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        // Lag of each partition over the offset range; summed into totalLag below.
        val lagPerPartition = offsets.map { case (tp, offset) =>
          tp -> Math.max(offset - currentOffsets(tp), 0)
        }
        val totalLag = lagPerPartition.values.sum

        lagPerPartition.map { case (tp, lag) =>
          // The statically configured maxRatePerPartition for this partition.
          val maxRateLimitPerPartition = ppc.maxRatePerPartition(tp)
          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
          tp -> (if (maxRateLimitPerPartition > 0) {
            // Effective rate = min(configured maxRatePerPartition, estimated back pressure rate).
            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
        }
      case None => offsets.map { case (tp, offset) => tp -> ppc.maxRatePerPartition(tp) }
    }

    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }
  }
  /**
   * A RateController to retrieve the rate from RateEstimator.
   * It extends the RateController abstract class but leaves publish() as a no-op;
   * it exists only to hold and retrieve the latest rate.
   */
  private[streaming] class DirectKafkaRateController(id: Int, estimator: RateEstimator)
    extends RateController(id, estimator) {
    override def publish(rate: Long): Unit = ()
  }
}
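The per-partition arithmetic in maxMessagesPerPartition() can be condensed into a short sketch. For simplicity a single global max_rate_per_partition replaces the PerPartitionConfig, and all names and numbers are illustrative:

```python
def max_messages_per_partition(current_offsets, latest_offsets, estimated_rate,
                               max_rate_per_partition, batch_interval_ms):
    """Sketch of DirectKafkaInputDStream.maxMessagesPerPartition():
    split the estimated total rate across partitions in proportion to their lag,
    then convert records/sec into a per-batch record cap."""
    lag = {tp: max(latest_offsets[tp] - current_offsets[tp], 0) for tp in latest_offsets}
    total_lag = sum(lag.values())

    if estimated_rate is not None and estimated_rate > 0:
        effective = {}
        for tp, l in lag.items():
            backpressure_rate = round(l / total_lag * estimated_rate) if total_lag > 0 else 0
            effective[tp] = (min(backpressure_rate, max_rate_per_partition)
                             if max_rate_per_partition > 0 else backpressure_rate)
    else:
        # No estimate yet: fall back to the static per-partition limit.
        effective = {tp: max_rate_per_partition for tp in lag}

    if sum(effective.values()) <= 0:
        return None
    secs_per_batch = batch_interval_ms / 1000.0
    return {tp: int(secs_per_batch * r) for tp, r in effective.items()}

# Two partitions with lags 300 and 100; estimated rate 200 rec/s; 2 s batches; no static cap.
caps = max_messages_per_partition({0: 0, 1: 0}, {0: 300, 1: 100}, 200, 0, 2000)
print(caps)  # {0: 300, 1: 100}
```

Note how the partition with three times the lag receives three times the budget, so a skewed topic drains evenly instead of starving its most backlogged partitions.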