Spark/Flink广播完成作业配置动态更新
作者头像
  • 米伽慧展
  • 2019-12-03 12:40:01 1

关键词:Spark Flink 动态广播

前言

在实时计算作业中,常常需要动态更改一些配置。例如,实时日志提取和转换(ETL)服务需要在日志格式或字段发生变化时仍能正确解析;实时自然语言处理(NLP)服务需要及时识别新增的类别词汇和停用词;实时风控服务则需要根据业务情况调整触发警报的规则。

那么问题来了:每次配置变化是否都需要手动修改代码并重启作业呢?显然不是,因为实时任务的目标是7x24小时不间断运行。Spark Streaming和Flink的广播机制都可以实现这一目标,本文将分别介绍这两种解决方案。

Spark Streaming的广播机制

Spark Core 中的广播机制主要用于将只读数据缓存在 Driver 和 Executor 之间共享。虽然 Spark 文档提到广播变量的设计初衷是作为只读缓存,但原生并不支持广播变量的动态更新。因此,我们需要进行一些自定义处理。下面提供一个示例代码,展示了如何定期更新广播变量。

```java public class PeriodicBroadcastUpdater { private static final int UPDATE_PERIOD = 60 * 1000; // 更新周期为60秒 private static volatile PeriodicBroadcastUpdater instance; private Broadcast broadcast; private long lastUpdate = 0L;

private PeriodicBroadcastUpdater() {}

public static PeriodicBroadcastUpdater getInstance() {
    if (instance == null) {
        synchronized (PeriodicBroadcastUpdater.class) {
            if (instance == null) {
                instance = new PeriodicBroadcastUpdater();
            }
        }
    }
    return instance;
}

public String updateAndGet(SparkContext sc) {
    long now = System.currentTimeMillis();
    long offset = now - lastUpdate;
    if (offset > UPDATE_PERIOD || broadcast == null) {
        if (broadcast != null) {
            broadcast.unpersist();
        }
        lastUpdate = now;
        String value = fetchBroadcastValue();
        broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
    }
    return broadcast.getValue();
}

private String fetchBroadcastValue() {
    // 实际逻辑
    return "example";
}

} ```

在 Streaming 主程序中,可以这样使用:

java dStream.transform(rdd -> { String broadcastValue = PeriodicBroadcastUpdater.getInstance().updateAndGet(rdd.context()); // 使用广播变量处理RDD });

这种方法解决了问题,但并不是完美的,因为广播数据的更新仍然是周期性的,并且周期不能太短(需要考虑外部存储的压力)。接下来,我们将探讨 Flink 的处理方式。

Flink的广播机制

Flink 中也有类似于 Spark 的广播变量功能,但 Flink 引入了更加灵活的广播状态(Broadcast State),这是一种特殊的 Operator State。它可以将一个流中的数据(通常是较大的批数据)广播到下游算子的所有并发实例中,从而实现真正的低延迟动态更新。

Flink 使用 MapStateDescriptor 来描述广播状态,方便存储多种不同的广播数据。下面是一个示例:

```java MapStateDescriptor broadcastStateDesc = new MapStateDescriptor<>( "broadcast-state-desc", Types.STRING, Types.STRING );

// 将控制流转换为广播流 BroadcastStream broadcastStream = controlStream .setParallelism(1) .broadcast(broadcastStateDesc);

// 将数据流与广播流连接 BroadcastConnectedStream connectedStream = sourceStream.connect(broadcastStream);

connectedStream.process(new BroadcastProcessFunction() { @Override public void processElement(String value, ReadOnlyContext ctx, Collector out) throws Exception { ReadOnlyBroadcastState state = ctx.getBroadcastState(broadcastStateDesc); for (Map.Entry entry : state.immutableEntries()) { String key = entry.getKey(); String value = entry.getValue(); // 根据广播数据处理原始数据流 } out.collect(value); }

@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
    BroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
    // 对广播数据进行转换并写入广播状态
    state.put("some_key", value);
}

}); ```

可以看出,BroadcastProcessFunction 的行为与 RichCoFlatMapFunction 和 CoProcessFunction 类似。其基本思路是通过 processBroadcastElement() 方法从广播流中获取数据,进行必要的转换后以键值对的形式写入广播状态。而 processElement() 方法则从广播状态中获取广播数据,并将其与原始数据流结合处理。这意味着广播状态起到了两个流之间的桥梁作用。

此外,值得注意的是,processElement() 方法获取的 Context 实例是 ReadOnlyContext,这意味着只有在广播流一侧才能修改广播状态,而数据流一侧只能读取广播状态。这种设计提供了重要的一致性保证,避免了不同操作员实例产生截然不同结果的可能性。

希望以上内容对你有所帮助!

    本文来源:图灵汇
责任编辑: : 米伽慧展
声明:本文系图灵汇原创稿件,版权属图灵汇所有,未经授权不得转载,已经协议授权的媒体下载使用时须注明"稿件来源:图灵汇",违者将依法追究责任。
    分享
作业配置完成广播更新动态FlinkSpark
    下一篇