learn and grow up

flink中function注意事项

字数统计: 394阅读时长: 1 min
2020/12/12 Share

写在前面

​ 最近在使用flink进行业务数据的实时计算以展示报表,在开发自测过程中遇到一个NPE,刚接触flink的我是一头雾水,硬啃了一早上源码,才找到问题,发现是flink序列,特此记录下

正文

  1. 问题如下:

    1. 死锁
  2. 分析:

    1. 通过debug分析,最初new的实例和最终sink的function不是一个实例。

      1. 最初
      2. 最终
    2. 那么猜测是实例被替换或者拷贝,那么先查阅代码发现sinkMQ线程初始化如下:

      1. init
      2. 可以看到这里的实例已经是被”替换“过的了,那么我们继续跟踪源代码,发现org.apache.flink.streaming.runtime.tasks.StreamTask#operatorChain是在这里被初始化的,那么继续debug跟踪该方法:org.apache.flink.streaming.runtime.tasks.StreamTask#beforeInvoke。跟踪步骤如下:
        1. init1
        2. init2
        3. init3
        4. 可以看到这里是把sinkFunction序列化为二进制数据,然后使用自定义classLoader等来进行了反序列化。
      3. 看到这里就明白了,flink会把我们定义的各种function序列化保存起来,方便分派给各个slot task,但是可以看到我们前面的自定义sinkFunction内确对charset字段使用了transient关键字,导致该字段在序列化和反序列化后变为了null
    3. 解决方案:

      1. 现在尝试去掉transient,启动直接报错了,原来是flink在准备阶段就做了是否可以被序列化的校验,因为charset该类没有实现Serializable接口,所以直接报错了,如下:
        1. clean
      2. 所以可以这么改:
        1. fix
          1. 问题解决。
    4. 思考:

CATALOG
  1. 1. 写在前面
  2. 2. 正文