本文适合有自定义窗口需求的读者阅读。读者在阅读前最好能知道 Flink 中窗口组件以及阅读过 SessionWindow,CountWindow,TimeWindow 的源码。因为这个自定义窗口的思路是借鉴了它们的实现。
需求分析
比如说我们需要实现检测恶意登录的功能,一小时内某个 ip 登录了 5 个账号,则认为这个 ip 有问题。把 ip 拉黑一小时,并且输出这一小时内所登录的全部账号。在这个 ip 一小时内没有新账号登录后则认为恢复正常。
ip | 账号 | 窗口 | 输出 |
---|---|---|---|
13.108.77.58 | user1 | user1 | 无 |
13.108.77.58 | user2 | user1,user2 | 无 |
13.108.77.58 | user3 | user1,user2,user3 | 无 |
13.108.77.58 | user4 | user1,user2,user3,user4 | 无 |
13.108.77.58 | user4 | user1,user2,user3,user4 | 无 |
13.108.77.58 | user5 | user2,user3,user4,user5 | user1,user2,user3,user4,user5 |
13.108.77.58 | user6 | user3,user4,user5,user6 | user6 |
首先我们看1-5行,user1-4到达时,窗口未到阈值,不会被触发,同时窗口会对重复的用户 Id 去重。
在第 6 行时,达到了阈值,把之前所有的作弊用户 id 都输出,同时删除最旧的元素。
在最后一行,user6 到达后,只输出user6,并且删除最旧的元素。
同时我们需要实现当一段时间没有元素到达时,删除这个窗口,有点类似于 session window 的实现。
ip | 账号 | 窗口 | 输出 |
---|---|---|---|
13.108.77.58 | user1 | user1 | 无 |
13.108.77.58 | user2 | user1,user2 | 无 |
13.108.77.58 | user3 | user1,user2,user3 | 无 |
13.108.77.58 | user4 | user1,user2,user3,user4 | 无 |
一小时没有元素到达:删除窗口
实现思路分析
按照需求听起来我们需要实现一个带有 distinct count 功能的 session window。
-
Trigger
- 注册计时器并且删除前一个计时器,当时间到了以后触发 PURGE 删除窗口
- 记录窗口元素个数,当个数到达阈值时触发 FIRE
-
Evictor
- 删除过期数据,因为可能会有其实元素已经过期但是由于没有触发 FIRE,数据还保留着
- 给 window 中的元素去重
- 触发 process 后,如果滑动窗口大于等于阈值,则删除最后一个元素,保证窗口只持有阈值数量的数据
-
process
- 更新去重后的窗口大小
- 判断去重后的窗口元素是否阈值达到阈值,如果是则输出
具体实现
基础事件定义
为了避免重复输出,首先我们需要定义好基础事件,在向外输出前会判断一下是否输出过了,如果输出过了则不重复输出。
/**
* 带有事件和是否输出过标记的基础事件
*
* @author zjmeow
*/
@Data
public abstract class BaseEvent {
private long time;
private boolean outputted;
}
继承基础事件,简单的开发一个登录事件,事件中只有用户 id。
/**
* 登录事件
*
* @author zjmeow
*/
@Data
public class LoginEvent extends BaseEvent {
private String userId;
private String ip;
}
接下来自定义个数据源进行测试,这些都没什么好讲的。
/**
* 测试数据源
*
* @author zjmeow
*/
public class EventSource implements SourceFunction<LoginEvent> {
private boolean running = true;
public static final int GAP_TIME = 500;
@Override
public void run(SourceContext<LoginEvent> ctx) throws Exception {
while (running) {
LoginEvent event = new LoginEvent();
event.setIp("127.0.0.1");
// 生成用户id
event.setUserId(System.currentTimeMillis() + "");
ctx.collect(event);
Thread.sleep(GAP_TIME);
}
}
@Override
public void cancel() {
running = false;
}
}
trigger 实现
Trigger 需要实现
- 注册计时器并且删除前一个计时器,当时间到了以后触发 PURGE 删除窗口
- 记录窗口元素个数,当个数到达阈值时触发 FIRE
为了避免逻辑过于复杂,这里只实现 EventTime 的 Trigger,如果需要同时实现 ProcessTime 和 EventTime,把 TimeCharacteristic 传入构造函数,然后在相应的地方进行替换即可。
/**
* 类似于加上 distinct count 功能的 session windows
* 一段时间没
*
* @author zjmeow
*/
public class CountTriggerWithTimeout extends Trigger<Object, GlobalWindow> {
/**
* 时间到了删除窗口
*/
private Long sessionTimeout;
/**
* 窗口最大容纳元素
*/
private int maxCount;
/**
* 最后一个元素到达时间
*/
private ReducingStateDescriptor<Long> lastSeenDesc = new ReducingStateDescriptor<>("last-seen", new Sum(),
Long.class);
/**
* 窗口中元素的个数
*/
private ReducingStateDescriptor<Long> countDesc = new ReducingStateDescriptor<>("count", new Sum(), Long.class);
public CountTriggerWithTimeout(int maxCount, long timeout) {
this.maxCount = maxCount;
this.sessionTimeout = timeout;
}
@Override
public TriggerResult onElement(Object element, long timestamp, GlobalWindow globalWindow, TriggerContext context)
throws Exception {
// 注册定时器,窗口超过一定时间没有元素到达则会触发
context.registerEventTimeTimer(timestamp + sessionTimeout);
// 以下代码用于删除上次的定时器
ReducingState<Long> lastSeenState = context.getPartitionedState(this.lastSeenDesc);
// 初始化 ReducingState
lastSeenState.add(0L);
long lastSeen = lastSeenState.get();
if (lastSeen != 0) {
context.deleteEventTimeTimer(lastSeen + sessionTimeout);
}
lastSeenState.add(timestamp - lastSeen);
//以下代码用于更新窗口大小
ReducingState<Long> count = context.getPartitionedState(this.countDesc);
count.add(1L);
long num = count.get();
// 计数用于优化性能,没到阈值不fire,这个计数会在Process中也更新
if (num >= maxCount) {
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
/**
* 计时器触发,一段时间没有元素到达,删除窗口
*/
@Override
public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
ReducingState<Long> lastSeenState = ctx.getPartitionedState(lastSeenDesc);
Long lastSeen = lastSeenState.get();
// 一段时间没元素到达直接删了
if (time - lastSeen >= sessionTimeout) {
System.out.println("CTX: " + ctx + " Purge Time " + time + " last seen " + lastSeen);
return TriggerResult.PURGE;
}
return TriggerResult.CONTINUE;
}
}
Evictor实现
这里有个坑,window 中的元素是无序的,并不会讲究先来后到,所以这里需要自己排序。
在 Trigger 触发后,需要把元素丢给 Evictor,Evictor 进行以下操作:
- 删除过期数据,因为可能会有其实元素已经过期但是由于没有触发 FIRE,数据还保留着
- 给 window 中的元素去重
- 触发 process 后,如果滑动窗口大于等于阈值,则删除最后一个元素,保证窗口只持有阈值数量的数据
/**
* 攒够了去重然后丢给process判断
*
* @author zjmeow
*/
public class DistinctEvictor<T> implements Evictor<T, Window> {
/**
* 为了做的通用,利用反射和字段名来提取需要去重的字段
*/
private String fieldName;
private Class<T> clazz;
private int threshold;
private long timeout;
public DistinctEvictor(String fieldName, Class<T> clazz, int threshold, long timeout) {
this.fieldName = fieldName;
this.clazz = clazz;
this.threshold = threshold;
this.timeout = timeout;
}
/**
* 根据字段去重
*/
@Override
public void evictBefore(Iterable<TimestampedValue<T>> iterable, int size, Window window,
EvictorContext context) {
long watermark = context.getCurrentWatermark();
Iterator<TimestampedValue<T>> iterator = iterable.iterator();
Set<String> has = new HashSet<>();
// 遍历窗口,删除超时元素和重复元素
while (iterator.hasNext()) {
TimestampedValue<T> timestampedValue = iterator.next();
T t = timestampedValue.getValue();
long time = timestampedValue.getTimestamp();
// 删除已经超时的元素
if (watermark - time > timeout) {
iterator.remove();
System.out.println("remove watermark - time:" + (watermark - time) / 3600000);
continue;
}
String val = null;
try {
Field field = clazz.getDeclaredField(fieldName);
field.setAccessible(true);
val = (String) field.get(t);
} catch (IllegalAccessException | NoSuchFieldException e) {
e.printStackTrace();
}
// 去重
if (has.contains(val)) {
iterator.remove();
}
has.add(val);
}
}
/**
* 在这里删除掉过期元素
*/
@Override
public void evictAfter(Iterable<TimestampedValue<T>> iterable, int size, Window window,
EvictorContext context) {
if (size >= threshold) {
removeOld(iterable);
}
}
/**
* 删掉最早到来的
* 注意,window里的元素是没有顺序的,也就是说如果想要删除最先到来的需要遍历整个window
*/
private void removeOld(Iterable<TimestampedValue<T>> iterable) {
Iterator<TimestampedValue<T>> iterator = iterable.iterator();
long smallest = Long.MAX_VALUE;
while (iterator.hasNext()) {
TimestampedValue<T> t = iterator.next();
smallest = Math.min(t.getTimestamp(), smallest);
}
Iterator<TimestampedValue<T>> removeIterator = iterable.iterator();
while (removeIterator.hasNext()) {
TimestampedValue<T> t = removeIterator.next();
if (t.getTimestamp() == smallest) {
removeIterator.remove();
System.out.println("remove one element size:" + Iterables.size(iterable) + " element: " + t);
return;
}
}
}
}
Process 实现
process 没有什么好讲的
- 更新去重后的窗口大小
- 判断去重后的窗口元素是否阈值达到阈值,如果是则输出
/**
* 处理元素
*
* @author zjmeow
*/
public class CountProcess<T extends BaseEvent, W extends Window> extends
ProcessWindowFunction<T, T, String, W> {
private int threshold;
private ReducingStateDescriptor<Long> count = new ReducingStateDescriptor<>("count", new Sum(), Long.class);
public CountProcess(int threshold) {
this.threshold = threshold;
}
@Override
public void process(String key, Context context, Iterable<T> iterable, Collector<T> collector) throws Exception {
// todo 优化 size方法会去遍历一次,这里可以少遍历几次
int size = Iterables.size(iterable);
ReducingState<Long> windowSize = context.windowState().getReducingState(count);
windowSize.add(size - windowSize.get());
if (size < threshold) {
return;
}
// 需要遍历一遍确认没有输出过,以免重复输出
for (T t : iterable) {
if (!t.isHadOutput()) {
collector.collect(t);
t.setHadOutput(true);
}
}
}
}
最后把这些元素组装起来
/**
* 自定义窗口,相当于count distinct time window 的结合
* 参考 https://stackoverflow.com/questions/49783676/apache-flink-count-window-with-timeout
*
* @author zjmeow
*/
public class Application {
private static final long WINDOW_TIME = Time.days(1).toMilliseconds();
private static final int HOLD = 5;
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<LoginEvent> dataStream = env.addSource(new EventSource());
dataStream = dataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.milliseconds(1)) {
@Override
public long extractTimestamp(LoginEvent event) {
return event.getTime();
}
});
dataStream.print();
dataStream.keyBy(LoginEvent::getIp)
.window(GlobalWindows.create())
.trigger(new CountTriggerWithTimeout(HOLD, WINDOW_TIME))
.evictor(new DistinctEvictor<>("userId", LoginEvent.class, HOLD, WINDOW_TIME))
.process(new CountProcess<>(HOLD))
.addSink(new SinkFunction<LoginEvent>() {
@Override
public void invoke(LoginEvent value, Context context) throws Exception {
System.out.println("blacklist:" + value);
}
});
env.execute();
}
}
总结
为啥不用其他方式实现
count window:首先如果某个 ip 只登录了一次就再也没登录过,count window 会越累计越多,得不到及时清除。
time window:需要在滑动和响应耗时上进行取舍,而且如果重复数据量大的话每次滑动会进行大量计算。
session window:没去重功能会导致大量重复触发
SQL:SQL 虽然能实现这个功能,但是会出现重复输出的问题,暂时不知道怎么解决,可能是我太菜了。
当然还可以使用 redis 的 zset 来实现这个功能,Flink 解析事件后写入,key 为 id,zset 内保存用户 id 和时间戳,设置好过期时间就能自动删除过期元素了。
如果有更好的方法,欢迎探讨。
优缺点
优点:
- 由于去重的以及最大只存储窗口大小的设计,存储占用少
- 响应极快
- 重复计算少
不足:
-
不知道还有没有更好以及更简洁的实现。
-
会有一部分数据在 state 中没有及时清除,但是也在可接受范围之内
-
在 process 中更新了元素个数同步给 trigger,感觉这个实现不是很优雅
完整代码实现:https://github.com/zjmeow/flink-window
参考
apache-flink-count-window-with-timeout
本文由 鸡米 创作,采用 知识共享署名4.0
国际许可协议进行许可
本站文章除注明转载/出处外,均为本站原创或翻译,转载前请务必署名
最后编辑时间为: Dec 8,2021