在实时计算作业中,常常需要动态更改一些配置。例如,实时日志提取和转换(ETL)服务需要在日志格式或字段发生变化时仍能正确解析;实时自然语言处理(NLP)服务需要及时识别新增的类别词汇和停用词;实时风控服务则需要根据业务情况调整触发警报的规则。
那么问题来了:每次配置变化是否都需要手动修改代码并重启作业呢?显然不是,因为实时任务的目标是7x24小时不间断运行。Spark Streaming和Flink的广播机制都可以实现这一目标,本文将分别介绍这两种解决方案。
Spark Core 中的广播机制主要用于将只读数据缓存在 Driver 和 Executor 之间共享。虽然 Spark 文档提到广播变量的设计初衷是作为只读缓存,但原生并不支持广播变量的动态更新。因此,我们需要进行一些自定义处理。下面提供一个示例代码,展示了如何定期更新广播变量。
```java
public class PeriodicBroadcastUpdater {
private static final int UPDATE_PERIOD = 60 * 1000; // 更新周期为60秒
private static volatile PeriodicBroadcastUpdater instance;
private Broadcast
private PeriodicBroadcastUpdater() {}
public static PeriodicBroadcastUpdater getInstance() {
if (instance == null) {
synchronized (PeriodicBroadcastUpdater.class) {
if (instance == null) {
instance = new PeriodicBroadcastUpdater();
}
}
}
return instance;
}
public String updateAndGet(SparkContext sc) {
long now = System.currentTimeMillis();
long offset = now - lastUpdate;
if (offset > UPDATE_PERIOD || broadcast == null) {
if (broadcast != null) {
broadcast.unpersist();
}
lastUpdate = now;
String value = fetchBroadcastValue();
broadcast = JavaSparkContext.fromSparkContext(sc).broadcast(value);
}
return broadcast.getValue();
}
private String fetchBroadcastValue() {
// 实际逻辑
return "example";
}
} ```
在 Streaming 主程序中,可以这样使用:
java
dStream.transform(rdd -> {
String broadcastValue = PeriodicBroadcastUpdater.getInstance().updateAndGet(rdd.context());
// 使用广播变量处理RDD
});
这种方法解决了问题,但并不是完美的,因为广播数据的更新仍然是周期性的,并且周期不能太短(需要考虑外部存储的压力)。接下来,我们将探讨 Flink 的处理方式。
Flink 中也有类似于 Spark 的广播变量功能,但 Flink 引入了更加灵活的广播状态(Broadcast State),这是一种特殊的 Operator State。它可以将一个流中的数据(通常是较大的批数据)广播到下游算子的所有并发实例中,从而实现真正的低延迟动态更新。
Flink 使用 MapStateDescriptor 来描述广播状态,方便存储多种不同的广播数据。下面是一个示例:
```java
MapStateDescriptor
// 将控制流转换为广播流
BroadcastStream
// 将数据流与广播流连接
BroadcastConnectedStream
connectedStream.process(new BroadcastProcessFunction
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
BroadcastState<String, String> state = ctx.getBroadcastState(broadcastStateDesc);
// 对广播数据进行转换并写入广播状态
state.put("some_key", value);
}
}); ```
可以看出,BroadcastProcessFunction 的行为与 RichCoFlatMapFunction 和 CoProcessFunction 类似。其基本思路是通过 processBroadcastElement()
方法从广播流中获取数据,进行必要的转换后以键值对的形式写入广播状态。而 processElement()
方法则从广播状态中获取广播数据,并将其与原始数据流结合处理。这意味着广播状态起到了两个流之间的桥梁作用。
此外,值得注意的是,processElement()
方法获取的 Context 实例是 ReadOnlyContext,这意味着只有在广播流一侧才能修改广播状态,而数据流一侧只能读取广播状态。这种设计提供了重要的一致性保证,避免了不同操作员实例产生截然不同结果的可能性。
希望以上内容对你有所帮助!