The basic concepts of Flink window and window API

1. Window overview

  • Real-world data streams are generally unbounded. To process unbounded data, we usually split the infinite stream into finite data sets and run the business logic on each of them.
  • A window is the mechanism that cuts an infinite stream into finite pieces: it distributes the stream's elements into finite-size "buckets" that can be analysed individually.

Window types

  1. Time window
  • Tumbling time window
  • Sliding time window
  • Session window
  2. Count window
  • Tumbling count window
  • Sliding count window

2. Tumbling window

Schematic diagram of a tumbling window

  1. The data is split into windows of a fixed length.
  2. Windows are aligned in time, every window has the same length, and windows do not overlap, so each element belongs to exactly one window.
  3. Typical use: BI statistics and similar reports that aggregate the data of each time period.
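To make "time-aligned, non-overlapping" concrete, here is a minimal plain-Scala sketch (no Flink dependency) of how an element's timestamp maps to exactly one tumbling window. The arithmetic is modelled on the alignment Flink performs in `TimeWindow.getWindowStartWithOffset`; the object and method names are hypothetical, and all times are assumed to be in milliseconds.

```scala
// Hypothetical helper, not part of Flink: tumbling-window assignment in plain Scala.
object TumblingWindowSketch {
  // Start of the tumbling window that contains `timestamp`.
  // floorMod keeps the alignment correct for negative timestamps as well.
  def windowStart(timestamp: Long, size: Long, offset: Long = 0L): Long =
    timestamp - java.lang.Math.floorMod(timestamp - offset, size)

  // Windows are half-open intervals [start, start + size), so each
  // timestamp lands in exactly one window.
  def assignWindow(timestamp: Long, size: Long): (Long, Long) = {
    val start = windowStart(timestamp, size)
    (start, start + size)
  }
}
```

For example, with 5-second windows a reading at 7000 ms falls into [5000, 10000), and a reading at 4999 ms falls into [0, 5000).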

3. Sliding window

Schematic diagram of a sliding window

  1. A sliding window is a generalization of the fixed (tumbling) window: it is defined by a fixed window length plus a slide interval. When the slide equals the length, it behaves like a tumbling window.
  2. The window length is fixed, and windows overlap when the slide is smaller than the length, so one element can belong to several windows.
  3. Typical use: statistics over the most recent period, for example computing an interface's failure rate over the last 5 minutes to decide whether to raise an alarm.
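The overlap can be seen by listing every window an element is assigned to. The following plain-Scala sketch (no Flink dependency, hypothetical names) follows the same idea as Flink's sliding assigners as we understand them: find the latest window start aligned to the slide, then step back by `slide` while the window still covers the timestamp. Offsets are omitted for brevity.

```scala
// Hypothetical helper, not part of Flink: sliding-window assignment in plain Scala.
object SlidingWindowSketch {
  // Start of the latest slide-aligned window that contains `timestamp`.
  private def lastStart(timestamp: Long, slide: Long): Long =
    timestamp - java.lang.Math.floorMod(timestamp, slide)

  // All windows [start, start + size) that contain `timestamp`, newest first.
  // With size = k * slide, every element is assigned to k windows.
  def assignWindows(timestamp: Long, size: Long, slide: Long): List[(Long, Long)] =
    Iterator
      .iterate(lastStart(timestamp, slide))(_ - slide)
      .takeWhile(_ > timestamp - size)
      .map(start => (start, start + size))
      .toList
}
```

For the alarm scenario above (5-minute windows sliding every minute), `assignWindows(125000L, 300000L, 60000L)` returns five windows, so each request is counted in five overlapping failure-rate windows.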

4. Session window

Schematic diagram of a session window

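  1. A session window groups a series of events separated by gaps shorter than a specified timeout (the session gap): when no new data arrives for the length of the gap, the current session window closes, and the next event opens a new one.
  2. Feature: windows are not aligned in time, and their length varies with the data.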
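The "new window after a period of silence" rule can be sketched in plain Scala (no Flink dependency; names are hypothetical). It models the merging behaviour of session windows: each event opens a candidate window [ts, ts + gap), and candidates that overlap are merged, so a session ends at its last event plus the gap.

```scala
// Hypothetical helper, not part of Flink: session-window grouping in plain Scala.
object SessionWindowSketch {
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted
      .foldLeft(List.empty[(Long, Long)]) {
        // Event arrives before the current session ends: extend the session.
        // Input is sorted, so ts + gap is never smaller than the old end.
        case ((start, end) :: done, ts) if ts < end => (start, ts + gap) :: done
        // Silence lasted at least `gap`: open a new session.
        case (acc, ts) => (ts, ts + gap) :: acc
      }
      .reverse
}
```

With a 10-second gap, events at 1 s, 3 s, 20 s and 25 s form two sessions, [1000, 13000) and [20000, 35000); the windows start at different times and have different lengths, which is what "not aligned" means.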

5. Window API

  1. Tumbling time windows

.timeWindow(Time size)

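// KeyedStream.timeWindow(Time size) in Flink's Java API (deprecated since Flink 1.12)
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() ==
            TimeCharacteristic.ProcessingTime) {
        // Processing time: tumbling windows driven by the machine clock
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        // Event time (or ingestion time): tumbling windows driven by element timestamps
        return window(TumblingEventTimeWindows.of(size));
    }
}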
 
  2. Sliding time window

.timeWindow(Time size, Time slide)

public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == 
                TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}
 
  3. Session window

.window(EventTimeSessionWindows.withGap(Time.seconds(10)))

  4. Tumbling count window

.countWindow(size: Long)

def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
    new WindowedStream(javaStream.countWindow(size))
}
 
  5. Sliding count window

.countWindow(size: Long, slide: Long)

def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
    new WindowedStream(javaStream.countWindow(size, slide))
}
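Both count windows are built on a single global window per key. As far as we know, Flink implements `countWindow(size)` with a purging count trigger, and `countWindow(size, slide)` with a count trigger of `slide` plus a count evictor of `size`. The plain-Scala sketch below (no Flink dependency, hypothetical names, one key only) simulates which elements each firing evaluates under that assumption.

```scala
// Hypothetical helper, not part of Flink: simulates count-window firings for one key.
object CountWindowSketch {
  // Tumbling count window: fires and purges every `size` elements;
  // a trailing group with fewer than `size` elements never fires.
  def tumbling[T](elements: Seq[T], size: Int): List[Seq[T]] =
    elements.grouped(size).filter(_.size == size).toList

  // Sliding count window: fires after every `slide` elements and
  // evaluates at most the last `size` elements seen so far.
  def sliding[T](elements: Seq[T], size: Int, slide: Int): List[Seq[T]] =
    (slide to elements.size by slide).map(n => elements.take(n).takeRight(size)).toList
}
```

Note that `sliding(1 to 5, 3, 2)` fires twice, with (1, 2) and then (2, 3, 4): early firings of a sliding count window can contain fewer than `size` elements.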
 

6. Window functions

A window function defines the computation performed on the data collected in a window. Window functions fall into two main categories:

1. Incremental aggregation function

The result is updated as each element arrives, so only a small accumulator state is kept instead of all elements (saving memory).

Commonly used incremental aggregate functions

  • ReduceFunction
  • AggregateFunction
// AggregateFunction type parameters:
//* @param <IN>  The type of the values that are aggregated (input values)
//* @param <ACC> The type of the accumulator (intermediate aggregate state).
//* @param <OUT> The type of the aggregated result
AggregateFunction<IN, ACC, OUT>
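The lifecycle of an incremental aggregate can be sketched without Flink. The trait below is a hypothetical stand-in that mirrors the four methods of `AggregateFunction` (`createAccumulator`, `add`, `getResult`, `merge`); it is not Flink's interface. Feeding elements one at a time through `add` means only the accumulator, here a (count, sum) pair, is ever stored.

```scala
// Hypothetical stand-in mirroring the four methods of Flink's AggregateFunction.
trait SimpleAggregate[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC
  def getResult(acc: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Average of doubles; the accumulator is (count, sum).
object AvgAgg extends SimpleAggregate[Double, (Long, Double), Double] {
  def createAccumulator(): (Long, Double) = (0L, 0.0)
  def add(value: Double, acc: (Long, Double)): (Long, Double) = (acc._1 + 1, acc._2 + value)
  def getResult(acc: (Long, Double)): Double = acc._2 / acc._1
  def merge(a: (Long, Double), b: (Long, Double)): (Long, Double) = (a._1 + b._1, a._2 + b._2)
}

object IncrementalWindow {
  // Folds elements in arrival order; the elements themselves are never buffered.
  def run[IN, ACC, OUT](agg: SimpleAggregate[IN, ACC, OUT], elements: Seq[IN]): OUT =
    agg.getResult(elements.foldLeft(agg.createAccumulator())((acc, x) => agg.add(x, acc)))
}
```

`merge` is only needed when two partial accumulators must be combined, for example when session windows merge; combining (2, 42.0) and (1, 24.0) gives the same average, 22.0, as processing all three readings directly.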
 

Example

import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object AvgTempByAggregateFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .aggregate(new AvgTempAgg)
      .print()

    env.execute()
  }

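  // Incremental aggregation: average temperature per key (sensor ID)
  // Input type: SensorReading
  // Accumulator type: (sensor ID, count, temperature sum)
  // Output type: (sensor ID, average temperature)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Create an empty accumulator for a new window (per key)
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Fold one incoming reading into the accumulator
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) = {
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)
    }

    // Compute the final result when the window fires
    override def getResult(accumulator: (String, Long, Double)): (String, Double) = {
      (accumulator._1, accumulator._3/accumulator._2)
    }

    // Merge two accumulators (needed, e.g., when session windows are merged)
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) = {
      (a._1, a._2 + b._2, a._3 + b._3)
    }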
  }
}
 

2. Full window function

  • ProcessWindowFunction

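All elements of the window are collected (buffered) first and iterated over when the window is evaluated. This costs more memory, but the function sees the whole window and can read window metadata through its Context.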

Source definition

  /**
   * @tparam IN The type of the input value.
   * @tparam OUT The type of the output value.
   * @tparam KEY The type of the key.
   * @tparam W The type of the window.
   */
  abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends AbstractRichFunction
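By contrast, a full window function only runs once the window's elements have all been buffered. The following plain-Scala sketch (no Flink dependency; `Reading` and the other names are hypothetical) groups readings by (sensor ID, tumbling window start), keeping every reading in memory, and only then computes each average, which is the same computation `AvgTempFunc` performs in the example that follows.

```scala
// Hypothetical helper, not part of Flink: buffer-then-evaluate, as a full window function does.
object FullWindowSketch {
  final case class Reading(id: String, timestamp: Long, temperature: Double)

  // Key = (sensor ID, window start). All readings are kept until evaluation,
  // so memory grows with the number of elements per window.
  def averages(readings: Seq[Reading], size: Long): Map[(String, Long), Double] = {
    val buffered: Map[(String, Long), Seq[Reading]] =
      readings.groupBy(r => (r.id, r.timestamp - java.lang.Math.floorMod(r.timestamp, size)))
    buffered.map { case (window, rs) => window -> (rs.map(_.temperature).sum / rs.size) }
  }
}
```

Buffering is what makes full window functions more expensive, but it also lets them compute results that cannot be updated incrementally, such as a median.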
 

Example

import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByProcessWindowFunction {

  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .process(new AvgTempFunc)
      .print()

    env.execute()
  }

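  // Full-window function: receives all readings of one window at once
  // Type parameters: input SensorReading, output AvgInfo,
  // key String (sensor ID), window TimeWindow
  class AvgTempFunc extends ProcessWindowFunction[SensorReading, AvgInfo, String, TimeWindow] {
    // Called once per key and window, when the window fires
    override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[AvgInfo]): Unit = {
      val count = elements.size // number of readings in the window
      var sum = 0.0 // running sum of temperatures
      for (r <- elements) {
        sum += r.temperature
      }
      // Window start and end timestamps, in milliseconds
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd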
      out.collect(AvgInfo(key, sum/count, windowStart, windowEnd))
    }
  }
}
 

3. Combining incremental aggregation with a full window function

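The following example passes both functions to aggregate(): AvgTempAgg averages the readings incrementally, so only a small accumulator is kept per window, and WindowResult then receives that single result together with the window context and adds the window's start and end timestamps.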

import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByAggAndProcWindow {

  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .aggregate(new AvgTempAgg, new WindowResult)
      .print()

    env.execute()
  }

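  // Incremental aggregation: average temperature per key (sensor ID)
  // Input type: SensorReading
  // Accumulator type: (sensor ID, count, temperature sum)
  // Output type: (sensor ID, average temperature)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Create an empty accumulator for a new window (per key)
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Fold one incoming reading into the accumulator
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) = {
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)
    }

    // Compute the final result when the window fires
    override def getResult(accumulator: (String, Long, Double)): (String, Double) = {
      (accumulator._1, accumulator._3/accumulator._2)
    }

    // Merge two accumulators (needed, e.g., when session windows are merged)
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) = {
      (a._1, a._2 + b._2, a._3 + b._3)
    }
  }

  // The input type of WindowResult is the output type of AvgTempAgg
  class WindowResult extends ProcessWindowFunction[(String, Double), AvgInfo, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[AvgInfo]): Unit = {
      // The Iterable holds exactly one element: the pre-aggregated result of AvgTempAgg
      out.collect(AvgInfo(key, elements.head._2, context.window.getStart, context.window.getEnd))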
    }
  }
}
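The data flow of the combined approach can be traced in a small plain-Scala sketch (no Flink dependency; all names are hypothetical). The incremental step reduces the window to one value, and the "process" step receives an Iterable containing exactly that one value, plus the window bounds, so the full window function no longer has to buffer the raw readings.

```scala
// Hypothetical helper, not part of Flink: traces pre-aggregation followed by a window function.
object CombinedWindowSketch {
  final case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  // Incremental part: only (count, sum) is kept while elements arrive.
  def preAggregate(temps: Seq[Double]): Double = {
    val (count, sum) = temps.foldLeft((0L, 0.0)) { case ((c, s), t) => (c + 1, s + t) }
    sum / count
  }

  // Full-window part: sees a single pre-aggregated value plus window metadata.
  def process(key: String, start: Long, end: Long, elements: Iterable[Double]): AvgInfo =
    AvgInfo(key, elements.head, start, end)

  // When the window fires, the pre-aggregated result is handed to process().
  def evaluate(key: String, start: Long, end: Long, temps: Seq[Double]): AvgInfo =
    process(key, start, end, Iterable(preAggregate(temps)))
}
```

This is the design trade-off of the combined form: memory use stays as low as with a pure AggregateFunction, while the output can still carry window information that only the full window function's context provides.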
 

7. Other Window API