案例分析

  • 实际开发中维表服务是经常遇到的。例如:

    • 在水务项目中,来自水源地的不同监测点(压力、温度、氯气浓度)等监测点数据的报警规则改变。
    • 用户行为数据中需要进行大区的转码
    • 商品信息中心只包含了商品Id等

    上述案例中都可以使用维表服务解决。

维表方案

预加载

  • 在flink中,凡是继承了RichFunction的算子,都含有open方法。 可以再此方法里面实现对数据的预加载。
 RichMapFunction richMapFunction = new RichMapFunction<String>() {
                    @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }        
                       @Override
            public Object map(String value) throws Exception {
                return null;
            }
     }
// 如果数据变更频次较低的,可以追加每分钟频次的时间调度器。
  • 缺点
    • 维度更新延迟
    • 加载到内存中,不适合数据量较大的场景

热存储关联

  • IO变为异步IO,可使用cache缓存热数据(以guava为例)
Cache<String, String> cache = CacheBuilder
                .newBuilder()
                .expireAfterAccess(300,TimeUnit.MILLISECONDS) // 按照时间访问过期
                .expireAfterWrite(300,TimeUnit.MICROSECONDS) // 写入时间过期
                .build();
// 维度数据加载延迟

广播维表

spark

  • spark的broacast变量定义为只读,所以只能手动更新
// 监测点上报数据格式: P001,10(pointKey,value)
val dataStream = KafkaUtils.createDirectStream[String,String](
      sc,LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String,String](topic1, kafkaParams))

// 报警规则数据:P001,10,1,1(pointKey,max,min,isRemove)
val broadCastStream = KafkaUtils.createDirectStream[String,String](
    sc,LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String,String](topic2, kafkaParams))

// 首次加载所有数据
var broadCastDomainMap = sc.sparkContext.broadcast(InitDomainMap)

// 对broadCastStream监测点报警规则进行广播
broadCastStream.foreachRDD(rdd=>{
  // 获取当前广播值
  val mapData = broadCastDomainMap.value
  // 删除广播
  broadCastDomainMap.unpersist()
  // 获取规则流中的数据,转为map
  val streamingMap = rdd.map(line => {
    val lines = line.value().split(",")
    (lines(0), new MonitorPoint(lines(0), lines(1).toLong, lines(2).toLong, lines.length >= 4 && lines(3).equals("1")))
  }).collect().toMap
  // 表示已删除的规则,移除广播。
  val removeKeys = streamingMap.filter(_._2.isRemove).map(_._1).toList
  val broadcastData = mapData.filter(line => !removeKeys.contains(line._1)) ++ (streamingMap.filter(!_._2.isRemove))
  // 重新进行广播
  broadCastDomainMap = rdd.sparkContext.broadcast(broadcastData)
})

// 监测数据和报警规则关联
dataStream.map(line=>{
  val lines = line.value().split(",")
  val value = broadCastDomainMap.value
  (lines(0),lines(1),value.get(lines(0)))
}).print()

  /**
   * 第一次加载所有数据
   */
  def InitDomainMap:Map[String,MonitorPoint] = {
    val map = Map(
      "P001" -> new MonitorPoint("P001",10,1,false),
      "P002" -> new MonitorPoint("P002",10,1,false)
    )
    map
  }
  • flink广播变量支持增量更新到State
// 存储维度信息
MapStateDescriptor<String, AlarmParam> alarmData = 
                                  new MapStateDescriptor<String,AlarmParam>(
                                                  "alarmData", 
                                                  Types.STRING,
                                                  Types.POJO(AlarmParam.class)
                                                );
// kafka数据流
DataStreamSource<String> dataStreamSource = env.addSource(flinkKafkaConsumer010);

// 报警数据流
DataStreamSource<String> broadcastStreamSource = env.addSource(broadcastStream);

// 处理函数
BroadcastProcessFunction processFunction = 
  new BroadcastProcessFunction<String, String, Tuple3<String, Long, AlarmParam>>() {
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector out){
          String[] split = value.split(",");
          // 获取广播数据
          ReadOnlyBroadcastState broadcastState = ctx.getBroadcastState(alarmData);
          out.collect(1,2,broadcastState.get(1)); // 关联数据
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector out){
          BroadcastState broadcastState = ctx.getBroadcastState(alarmData);
          String[] split = value.split(",");
          boolean isRemove = split[3].equals("1");
          if (split.length >= 4 && !isRemove) {
            // 增量更新数据
            broadcastState.put(split[0], new AlarmParam(1,2,3,4));
          } else {
            // 已删除数据从广播去除
            broadcastState.remove(split[0]);
          }
        }
      };

        SingleOutputStreamOperatorresult = dataStreamSource
                .connect(broadcastStreamSource.broadcast(alarmData))
                .process(processFunction);
        result.print();