본문 바로가기
BigData/Flink

Flink 훑어보기

by jeff제프 2020. 11. 20.

flink ?

"플링크는 구분된 데이터(bounded data) 및 연속 데이터(unbounded data) 스트림에 대해 stateful 하게 처리하기 위한 프레임워크 (분산 처리 엔진)이다"

알아듣기 쉽게 말하면, 

스트림 형태로 계속 오는 데이터들을 다루는데 강점이 있는 프레임워크다.

또한, 데이터를 단순히 처리하고 흘려보내는 것이 아니라, 상태를 저장하고 컨트롤이 가능하다. 

어렵게 생각하지 말고 일단 코드를 차근차근 보자. 

object Handler {


  def handle[E <: StreamExecutionEnvironment](env: E) = {
	Properties properties = new Properties();
  	properties.setProperty("bootstrap.servers", "localhost:9092");
  	properties.setProperty("group.id", "test");


    val stream = env
      //데이터 소스를 추가한다
      .addSource(new FlinkKafkaConsumer<>("카프카토픽", new SimpleStringSchema(), properties))
      .filter( (o : PredefinedMessage) => {
      //걸러낸다
      })
      .map((o: PredefinedMessage) => {
      //무언가를 한다
      })

      
      .keyBy( (o: Tuple2[Useridentifier, CustomLog1]) => {
      //그룹화 시킨다
        o.f0.getUniqueId()
      })


      //어떤 처리를 한다
      .process(new someProcess)

      //저장소에 저장을 한다
      .addSink(JdbcSink.sink(
                "insert into books (id, name, mode, isComplete, isOffline) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t._1.id);
                    ps.setString(2, t._1.name);
                    ps.setString(3, t._2.mode);
                    ps.setBoolean(4, t._2.isComplete);
                    ps.setBoolean(5, t._2.isOffline);
                    
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://디비주소/스키마")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("디비계정")
                        .withPassword("디비비번")
                        .build()));


  }
}

 

어떤가? 상당히 단순하고 직관적이다.

 

현실적인 예시를 추가하면 아래와 같다.

그냥 이전 프레임에서 넘어온 데이터를 어떤 부분을 어떻게 처리할지 명시만 해주는게 끝이다.

object Handler {


  def handle[E <: StreamExecutionEnvironment](env: E) = {
	Properties properties = new Properties();
  	properties.setProperty("bootstrap.servers", "localhost:9092");
  	properties.setProperty("group.id", "test");


    val stream = env
      .addSource(new FlinkKafkaConsumer<>("카프카토픽", new SimpleStringSchema(), properties))
      .filter(_.`type` == "CustomLog1")
      .filter( (o : PredefinedMessage) => {
        ((o.createdate >= Meta.WEEK_1.getMillis() )
          && (o.createdate < Meta.LASTDAY.getMillis() ))
      })
      .map((o: PredefinedMessage) => {
        val parsed = JsonPath.parse(new String(o.dynamicItems.toByteArray()))
        new Tuple2[Useridentifier, CustomLog1](
          Useridentifier(
            id = parsed.read[String](s"$$.id"),
            name = parsed.read[String](s"$$.${o.`type`}.name"),
          ),
          CustomLog1(
            mode = parsed.read[Int](s"$$.${o.`type`}.mode").toString,
            isComplete = (parsed.read[Int](s"$$.${o.`type`}.iscomplete").toString == "1"),
            isOffline = (parsed.read[Int](s"$$.${o.`type`}.isoffline").toString == "1")
          )
        )
      })
      //타입힌트 명시
      .returns(createTypeInformation[Tuple2[Useridentifier, CustomLog1]])
      .name("mapping")

      //비정상 로그는 제외
      .filter((o) => {
        ((o.f1.isOffline == false)  && (o.f1.mode == "3"))
      })

      //그룹바이
      .keyBy( (o: Tuple2[Useridentifier, CustomLog1]) => {
        o.f0.getUniqueId()
      })


      //처리
      .process(new someProcess)
      .returns(createTypeInformation[ModelFromProcess])
      .name("result")


	//저장소에 저장 과정
    stream.addSink(JdbcSink.sink(
                "insert into books (id, name, mode, isComplete, isOffline) values (?,?,?,?,?)",
                (ps, t) -> {
                    ps.setInt(1, t._1.id);
                    ps.setString(2, t._1.name);
                    ps.setString(3, t._2.mode);
                    ps.setBoolean(4, t._2.isComplete);
                    ps.setBoolean(5, t._2.isOffline);
                    
                },
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://디비주소/스키마")
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUsername("디비계정")
                        .withPassword("디비비번")
                        .build()));


  }
}