Flink 自定义窗口实践-恶意登录用户识别实现
in 大数据 with 0 comment

Flink 自定义窗口实践-恶意登录用户识别实现

in 大数据 with 0 comment

本文适合有自定义窗口需求的读者阅读。读者在阅读前最好能知道 Flink 中窗口组件以及阅读过 SessionWindow,CountWindow,TimeWindow 的源码。因为这个自定义窗口的思路是借鉴了它们的实现。

需求分析

比如说我们需要实现检测恶意登录的功能,一小时内某个 ip 登录了 5 个账号,则认为这个 ip 有问题。把 ip 拉黑一小时,并且输出这一小时内所登录的全部账号。在这个 ip 一小时内没有新账号登录后则认为恢复正常。

ip账号窗口输出
13.108.77.58user1user1
13.108.77.58user2user1,user2
13.108.77.58user3user1,user2,user3
13.108.77.58user4user1,user2,user3,user4
13.108.77.58user4user1,user2,user3,user4
13.108.77.58user5user2,user3,user4,user5user1,user2,user3,user4,user5
13.108.77.58user6user3,user4,user5,user6user6

首先我们看1-5行,user1-4到达时,窗口未到阈值,不会被触发,同时窗口会对重复的用户 Id 去重。

在第 6 行时,达到了阈值,把之前所有的作弊用户 id 都输出,同时删除最旧的元素。

在最后一行,user6 到达后,只输出user6,并且删除最旧的元素。

同时我们需要实现当一段时间没有元素到达时,删除这个窗口,有点类似于 session window 的实现。

ip账号窗口输出
13.108.77.58user1user1
13.108.77.58user2user1,user2
13.108.77.58user3user1,user2,user3
13.108.77.58user4user1,user2,user3,user4

一小时没有元素到达:删除窗口

实现思路分析

按照需求听起来我们需要实现一个带有 distinct count 功能的 session window。

具体实现

基础事件定义

为了避免重复输出,首先我们需要定义好基础事件,在向外输出前会判断一下是否输出过了,如果输出过了则不重复输出。

/**
 * 带有事件和是否输出过标记的基础事件
 *
 * @author zjmeow
 */
@Data
public abstract class BaseEvent {
    private long time;
    private boolean outputted;
}

继承基础事件,简单的开发一个登录事件,事件中只有用户 id。

/**
 * 登录事件
 *
 * @author zjmeow
 */
@Data
public class LoginEvent extends BaseEvent {

    private String userId;
    private String ip;
}

接下来自定义个数据源进行测试,这些都没什么好讲的。

/**
 * 测试数据源
 *
 * @author zjmeow
 */
public class EventSource implements SourceFunction<LoginEvent> {

    private boolean running = true;
    public static final int GAP_TIME = 500;

    @Override
    public void run(SourceContext<LoginEvent> ctx) throws Exception {
        while (running) {
            LoginEvent event = new LoginEvent();
            event.setIp("127.0.0.1");
            // 生成用户id
            event.setUserId(System.currentTimeMillis() + "");
            ctx.collect(event);
            Thread.sleep(GAP_TIME);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

trigger 实现

Trigger 需要实现

为了避免逻辑过于复杂,这里只实现 EventTime 的 Trigger,如果需要同时实现 ProcessTime 和 EventTime,把 TimeCharacteristic 传入构造函数,然后在相应的地方进行替换即可。

/**
 * 类似于加上 distinct count 功能的 session windows
 * 一段时间没
 *
 * @author zjmeow
 */
public class CountTriggerWithTimeout extends Trigger<Object, GlobalWindow> {

    /**
     * 时间到了删除窗口
     */
    private Long sessionTimeout;
    /**
     * 窗口最大容纳元素
     */
    private int maxCount;
    /**
     * 最后一个元素到达时间
     */
    private ReducingStateDescriptor<Long> lastSeenDesc = new ReducingStateDescriptor<>("last-seen", new Sum(),
            Long.class);
    /**
     * 窗口中元素的个数
     */
    private ReducingStateDescriptor<Long> countDesc = new ReducingStateDescriptor<>("count", new Sum(), Long.class);

    public CountTriggerWithTimeout(int maxCount, long timeout) {
        this.maxCount = maxCount;
        this.sessionTimeout = timeout;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow globalWindow, TriggerContext context)
            throws Exception {
        // 注册定时器,窗口超过一定时间没有元素到达则会触发
        context.registerEventTimeTimer(timestamp + sessionTimeout);
        // 以下代码用于删除上次的定时器
        ReducingState<Long> lastSeenState = context.getPartitionedState(this.lastSeenDesc);
        // 初始化 ReducingState
        lastSeenState.add(0L);
        long lastSeen = lastSeenState.get();
        if (lastSeen != 0) {
            context.deleteEventTimeTimer(lastSeen + sessionTimeout);
        }
        lastSeenState.add(timestamp - lastSeen);
        //以下代码用于更新窗口大小
        ReducingState<Long> count = context.getPartitionedState(this.countDesc);
        count.add(1L);
        long num = count.get();
        // 计数用于优化性能,没到阈值不fire,这个计数会在Process中也更新
        if (num >= maxCount) {
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /**
     * 计时器触发,一段时间没有元素到达,删除窗口
     */
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        ReducingState<Long> lastSeenState = ctx.getPartitionedState(lastSeenDesc);
        Long lastSeen = lastSeenState.get();
        // 一段时间没元素到达直接删了
        if (time - lastSeen >= sessionTimeout) {
            System.out.println("CTX: " + ctx + " Purge Time " + time + " last seen " + lastSeen);
            return TriggerResult.PURGE;
        }
        return TriggerResult.CONTINUE;
    }
}

Evictor实现

这里有个坑,window 中的元素是无序的,并不会讲究先来后到,所以这里需要自己排序。

在 Trigger 触发后,需要把元素丢给 Evictor,Evictor 进行以下操作:

/**
 * 攒够了去重然后丢给process判断
 *
 * @author zjmeow
 */
public class DistinctEvictor<T> implements Evictor<T, Window> {

    /**
     * 为了做的通用,利用反射和字段名来提取需要去重的字段
     */
    private String fieldName;
    private Class<T> clazz;
    private int threshold;
    private long timeout;

    public DistinctEvictor(String fieldName, Class<T> clazz, int threshold, long timeout) {

        this.fieldName = fieldName;
        this.clazz = clazz;
        this.threshold = threshold;
        this.timeout = timeout;
    }

    /**
     * 根据字段去重
     */
    @Override
    public void evictBefore(Iterable<TimestampedValue<T>> iterable, int size, Window window,
            EvictorContext context) {
        long watermark = context.getCurrentWatermark();
        Iterator<TimestampedValue<T>> iterator = iterable.iterator();
        Set<String> has = new HashSet<>();
        // 遍历窗口,删除超时元素和重复元素
        while (iterator.hasNext()) {
            TimestampedValue<T> timestampedValue = iterator.next();
            T t = timestampedValue.getValue();
            long time = timestampedValue.getTimestamp();
            // 删除已经超时的元素
            if (watermark - time > timeout) {
                iterator.remove();
                System.out.println("remove watermark - time:" + (watermark - time) / 3600000);
                continue;
            }
            String val = null;
            try {
                Field field = clazz.getDeclaredField(fieldName);
                field.setAccessible(true);
                val = (String) field.get(t);
            } catch (IllegalAccessException | NoSuchFieldException e) {
                e.printStackTrace();
            }
            // 去重
            if (has.contains(val)) {
                iterator.remove();
            }
            has.add(val);
        }
    }

  
    /**
     * 在这里删除掉过期元素
     */
    @Override
    public void evictAfter(Iterable<TimestampedValue<T>> iterable, int size, Window window,
            EvictorContext context) {
        if (size >= threshold) {
            removeOld(iterable);
        }
    }

    /**
     * 删掉最早到来的
     * 注意,window里的元素是没有顺序的,也就是说如果想要删除最先到来的需要遍历整个window
     */
    private void removeOld(Iterable<TimestampedValue<T>> iterable) {
        Iterator<TimestampedValue<T>> iterator = iterable.iterator();
        long smallest = Long.MAX_VALUE;
        while (iterator.hasNext()) {
            TimestampedValue<T> t = iterator.next();
            smallest = Math.min(t.getTimestamp(), smallest);
        }
        Iterator<TimestampedValue<T>> removeIterator = iterable.iterator();
        while (removeIterator.hasNext()) {
            TimestampedValue<T> t = removeIterator.next();
            if (t.getTimestamp() == smallest) {
                removeIterator.remove();
                System.out.println("remove one element size:" + Iterables.size(iterable) + " element: " + t);
                return;
            }
        }

    }
}


Process 实现

process 没有什么好讲的


/**
 * 处理元素
 *
 * @author zjmeow
 */
public class CountProcess<T extends BaseEvent, W extends Window> extends
        ProcessWindowFunction<T, T, String, W> {

    private int threshold;
    private ReducingStateDescriptor<Long> count = new ReducingStateDescriptor<>("count", new Sum(), Long.class);

    public CountProcess(int threshold) {
        this.threshold = threshold;
    }


    @Override
    public void process(String key, Context context, Iterable<T> iterable, Collector<T> collector) throws Exception {
        // todo 优化 size方法会去遍历一次,这里可以少遍历几次
        int size = Iterables.size(iterable);
        ReducingState<Long> windowSize = context.windowState().getReducingState(count);
        windowSize.add(size - windowSize.get());
        if (size < threshold) {
            return;
        }
        // 需要遍历一遍确认没有输出过,以免重复输出
        for (T t : iterable) {
            if (!t.isHadOutput()) {
                collector.collect(t);
                t.setHadOutput(true);
            }
        }

    }


}

最后把这些元素组装起来

/**
 * 自定义窗口,相当于count distinct time window 的结合
 * 参考 https://stackoverflow.com/questions/49783676/apache-flink-count-window-with-timeout
 *
 * @author zjmeow
 */
public class Application {

    private static final long WINDOW_TIME = Time.days(1).toMilliseconds();

    private static final int HOLD = 5;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        DataStream<LoginEvent> dataStream = env.addSource(new EventSource());
        dataStream = dataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<LoginEvent>(Time.milliseconds(1)) {
                    @Override
                    public long extractTimestamp(LoginEvent event) {
                        return event.getTime();
                    }
                });
        dataStream.print();
        dataStream.keyBy(LoginEvent::getIp)
                .window(GlobalWindows.create())
                .trigger(new CountTriggerWithTimeout(HOLD, WINDOW_TIME))
                .evictor(new DistinctEvictor<>("userId", LoginEvent.class, HOLD, WINDOW_TIME))
                .process(new CountProcess<>(HOLD))
                .addSink(new SinkFunction<LoginEvent>() {
                    @Override
                    public void invoke(LoginEvent value, Context context) throws Exception {
                        System.out.println("blacklist:" + value);
                    }
                });
        env.execute();
    }
}

总结

为啥不用其他方式实现

count window:首先如果某个 ip 只登录了一次就再也没登录过,count window 会越累计越多,得不到及时清除。

time window:需要在滑动和响应耗时上进行取舍,而且如果重复数据量大的话每次滑动会进行大量计算。

session window:没去重功能会导致大量重复触发

SQL:SQL 虽然能实现这个功能,但是会出现重复输出的问题,暂时不知道怎么解决,可能是我太菜了。

当然还可以使用 redis 的 zset 来实现这个功能,Flink 解析事件后写入,key 为 id,zset 内保存用户 id 和时间戳,设置好过期时间就能自动删除过期元素了。

如果有更好的方法,欢迎探讨。

优缺点

优点:

  1. 由于去重的以及最大只存储窗口大小的设计,存储占用少
  2. 响应极快
  3. 重复计算少

不足:

  1. 不知道还有没有更好以及更简洁的实现。

  2. 会有一部分数据在 state 中没有及时清除,但是也在可接受范围之内

  3. 在 process 中更新了元素个数同步给 trigger,感觉这个实现不是很优雅

完整代码实现:https://github.com/zjmeow/flink-window

参考

apache-flink-count-window-with-timeout

Flink 原理与实现:Window 机制