1. Window overview
- Real-world streams are generally unbounded. To process unbounded data, we usually divide the infinite stream into finite data sets and run the business logic on those.
- A window is a way to cut an infinite stream into finite pieces: it assigns stream elements to finite-size "buckets" that can then be analyzed.
1. Window types
- Time windows
  - Tumbling time window
  - Sliding time window
  - Session window
- Count windows
  - Tumbling count window
  - Sliding count window
2. Tumbling window
Schematic diagram of a tumbling window
- Data is split into windows of a fixed length.
- Windows are aligned in time, have a fixed length, and do not overlap, so each element belongs to exactly one window.
- Typical use: BI statistics, i.e. aggregations computed per time period.
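The alignment rule can be sketched in plain Scala. This is a minimal sketch, not Flink code: `TumblingWindowSketch`, `windowStart` and `windowFor` are hypothetical names, timestamps are assumed to be non-negative milliseconds, and the formula is modeled on the one Flink uses in `TimeWindow.getWindowStartWithOffset`.

```scala
// Plain-Scala sketch (no Flink dependency) of tumbling-window assignment.
// Hypothetical helper; assumes non-negative millisecond timestamps.
object TumblingWindowSketch {
  /** Start of the tumbling window that contains `timestamp`. */
  def windowStart(timestamp: Long, size: Long, offset: Long = 0L): Long =
    timestamp - (timestamp - offset + size) % size

  /** Half-open window [start, start + size) that contains `timestamp`. */
  def windowFor(timestamp: Long, size: Long, offset: Long = 0L): (Long, Long) = {
    val start = windowStart(timestamp, size, offset)
    (start, start + size)
  }
}
```

With a 5-second window and no offset, timestamp 7000 falls into [5000, 10000) and 4999 into [0, 5000), so every element lands in exactly one window.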
3. Sliding window
Schematic diagram of a sliding window
- A sliding window generalizes the fixed (tumbling) window: it is defined by a fixed window length plus a slide interval. When the slide equals the length, it behaves like a tumbling window.
- The window length is fixed, but windows can overlap, so one element can belong to several windows.
- Typical use: statistics over the most recent period, e.g. computing an interface's failure rate over the last 5 minutes to decide whether to raise an alarm.
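Because sliding windows overlap, the assigner returns several windows per element. The sketch below is plain Scala with hypothetical names (`SlidingWindowSketch`, `windowsFor`); it assumes non-negative millisecond timestamps and is modeled on the backwards walk from the latest aligned window start used by Flink's sliding assigners, though it is not their actual code.

```scala
// Plain-Scala sketch (no Flink dependency) of sliding-window assignment.
// Hypothetical helper; returns every window [start, start + size) containing timestamp.
object SlidingWindowSketch {
  def windowsFor(timestamp: Long, size: Long, slide: Long, offset: Long = 0L): List[(Long, Long)] = {
    // Latest window start that is <= timestamp and aligned to `slide`
    val lastStart = timestamp - (timestamp - offset + slide) % slide
    // Step back one slide at a time while the window still covers timestamp
    Iterator
      .iterate(lastStart)(_ - slide)
      .takeWhile(start => start > timestamp - size)
      .map(start => (start, start + size))
      .toList
  }
}
```

For a 10 s window sliding every 5 s, timestamp 7000 belongs to [5000, 15000) and [0, 10000); with a 5-minute window sliding every minute, each element is counted in five windows.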
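4. Session window
Schematic diagram of a session window
- A session window groups a series of events as long as new data keeps arriving within a specified timeout (the session gap). If no data arrives for longer than the gap, the current session closes and the next element starts a new window.
- Features: windows are not aligned in time and their length is not fixed; each key's sessions depend only on that key's data.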
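Session boundaries come from the data itself. This plain-Scala sketch uses hypothetical names (`SessionWindowSketch`, `sessions`) and models a single key: each element opens a window [t, t + gap), overlapping windows are merged, and a session ends at its last timestamp plus the gap. Treating a gap exactly equal to `gap` as still the same session is an assumption of the sketch.

```scala
// Plain-Scala sketch (no Flink dependency) of session windows for one key.
// A new session starts only when consecutive timestamps are more than `gap` apart.
object SessionWindowSketch {
  /** Returns (sessionStart, sessionEnd, timestamps) for each session, in time order. */
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long, List[Long])] = {
    val sorted = timestamps.sorted.toList
    // Each session is built newest-first; the head is its latest timestamp
    val grouped = sorted.foldLeft(List.empty[List[Long]]) {
      case (current :: done, t) if t - current.head <= gap => (t :: current) :: done
      case (acc, t)                                          => List(t) :: acc
    }
    grouped.reverse.map { s =>
      val ts = s.reverse
      (ts.head, ts.last + gap, ts)
    }
  }
}
```

With a 10 s gap, timestamps 1000, 3000, 20000 and 25000 form two sessions: [1000, 13000) and [20000, 35000).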
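5. Window API
- Tumbling time window
.timeWindow(Time size)
// Flink source of KeyedStream.timeWindow(Time size); deprecated since Flink 1.12 in favor of window(...)
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        // Processing time: windows follow the machine's wall clock
        return window(TumblingProcessingTimeWindows.of(size));
    } else {
        // Event time (or ingestion time): windows follow element timestamps
        return window(TumblingEventTimeWindows.of(size));
    }
}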
- Sliding time window
.timeWindow(Time size, Time slide)
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
    if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
        return window(SlidingProcessingTimeWindows.of(size, slide));
    } else {
        return window(SlidingEventTimeWindows.of(size, slide));
    }
}
- Session window
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
- Tumbling count window
.countWindow(size: Long)
def countWindow(size: Long): WindowedStream[T, K, GlobalWindow] = {
  new WindowedStream(javaStream.countWindow(size))
}
- Sliding count window
.countWindow(size: Long, slide: Long)
def countWindow(size: Long, slide: Long): WindowedStream[T, K, GlobalWindow] = {
  new WindowedStream(javaStream.countWindow(size, slide))
}
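Count windows fire on element counts rather than on time. The sketch below is plain Scala for one key with hypothetical names (`CountWindowSketch`, `tumbling`, `sliding`). It assumes the behavior of Flink's implementation, where countWindow(size) is a global window with a purging count trigger, and countWindow(size, slide) fires every `slide` elements while a count evictor keeps only the last `size`. In this sketch, leftover elements that never fill a tumbling count window are dropped.

```scala
// Plain-Scala sketch (no Flink dependency) of count-window firing for one key.
object CountWindowSketch {
  /** Tumbling count window: non-overlapping groups of exactly `size` elements. */
  def tumbling[T](elements: Seq[T], size: Int): List[List[T]] =
    elements.grouped(size).filter(_.size == size).map(_.toList).toList

  /** Sliding count window: after every `slide` elements, emit the last (at most) `size` seen. */
  def sliding[T](elements: Seq[T], size: Int, slide: Int): List[List[T]] =
    (slide to elements.size by slide).map { n =>
      elements.slice(math.max(0, n - size), n).toList
    }.toList
}
```

With elements 1 to 7, countWindow(3) yields [1, 2, 3] and [4, 5, 6]; countWindow(4, 2) yields [1, 2], [1, 2, 3, 4] and [3, 4, 5, 6].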
6. Window functions
A window function defines the computation performed on the data collected in a window. Window functions fall into two main categories:
1. Incremental aggregation function
Computes as each element arrives and keeps only a small piece of state (the running aggregate), which saves memory.
Commonly used incremental aggregation functions:
- ReduceFunction
- AggregateFunction
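Incremental aggregation keeps one value per window and updates it on each arrival. Below is a minimal plain-Scala sketch of the ReduceFunction idea; `IncrementalWindow` and `ReduceSketch` are hypothetical helpers standing in for Flink's window state. In Flink itself you would pass a `(T, T) => T` function to `reduce` on a windowed stream instead.

```scala
// Plain-Scala sketch (no Flink dependency) of ReduceFunction-style aggregation:
// the window holds a single value of the input type, combined with each new element.
final class IncrementalWindow[T](reduce: (T, T) => T) {
  private var state: Option[T] = None // the only thing kept per window

  def add(value: T): Unit =
    state = Some(state.fold(value)(acc => reduce(acc, value)))

  def result: Option[T] = state
}

object ReduceSketch {
  // Maximum temperature of one window, computed one reading at a time
  def maxTemperature(readings: Seq[Double]): Option[Double] = {
    val window = new IncrementalWindow[Double]((a, b) => math.max(a, b))
    readings.foreach(window.add)
    window.result
  }
}
```

For readings 20.5, 35.0 and 30.1 the state goes 20.5, 35.0, 35.0, and only that single value is stored between arrivals.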
// AggregateFunction (Flink source, Javadoc excerpt)
/**
 * @param <IN>  The type of the values that are aggregated (input values)
 * @param <ACC> The type of the accumulator (intermediate aggregate state).
 * @param <OUT> The type of the aggregated result
 */
public interface AggregateFunction<IN, ACC, OUT>
Example: average temperature per sensor over 5-second tumbling windows
import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object AvgTempByAggregateFunction {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)                 // one window per sensor ID
      .timeWindow(Time.seconds(5)) // 5-second tumbling windows
      .aggregate(new AvgTempAgg)   // incremental aggregation
      .print()

    env.execute()
  }

  // Input:       SensorReading
  // Accumulator: (sensor ID, number of readings, sum of temperatures)
  // Output:      (sensor ID, average temperature)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Create an empty accumulator when a new window is created for a key
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Called for every element: increment the count and add the temperature
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) =
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)

    // Called when the window fires: average = sum / count
    override def getResult(accumulator: (String, Long, Double)): (String, Double) =
      (accumulator._1, accumulator._3 / accumulator._2)

    // Combine two partial accumulators (needed when windows merge, e.g. session windows)
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) =
      (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2, a._3 + b._3)
  }
}
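The four methods of AvgTempAgg run in a fixed order: one `createAccumulator` per key and window, `add` per element, `getResult` when the window fires, and `merge` only when partial accumulators must be combined. The plain-Scala sketch below replays that lifecycle without Flink; `Agg`, `AvgAgg` and `accumulate` are hypothetical, and readings are simplified to (sensor ID, temperature) tuples.

```scala
// Plain-Scala sketch (no Flink dependency) of the AggregateFunction lifecycle.
// `Agg` is a hypothetical trait with the same four methods as Flink's AggregateFunction.
trait Agg[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC
  def getResult(acc: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Average temperature; accumulator = (sensor ID, count, temperature sum)
object AvgAgg extends Agg[(String, Double), (String, Long, Double), (String, Double)] {
  def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

  def add(value: (String, Double), acc: (String, Long, Double)): (String, Long, Double) =
    (value._1, acc._2 + 1, acc._3 + value._2)

  def getResult(acc: (String, Long, Double)): (String, Double) = (acc._1, acc._3 / acc._2)

  def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) =
    (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2, a._3 + b._3)

  // Fold readings into a fresh accumulator one element at a time, as a window would
  def accumulate(readings: Seq[(String, Double)]): (String, Long, Double) =
    readings.foldLeft(createAccumulator())((acc, r) => add(r, acc))
}
```

Averaging 10.0 and 20.0 in one accumulator and 30.0 in another, then merging, gives (s1, 20.0), the same as averaging all three readings at once.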
2. Full window function
- ProcessWindowFunction
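Buffers all elements of the window and iterates over them when the window fires. This costs more memory than incremental aggregation, but the function sees every element and can read window metadata through its Context.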
Source definition
/**
 * @tparam IN  The type of the input value.
 * @tparam OUT The type of the output value.
 * @tparam KEY The type of the key.
 * @tparam W   The type of the window.
 */
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window]
Example: the same average computed with a ProcessWindowFunction, also emitting the window bounds
import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByProcessWindowFunction {
  // Output record: sensor ID, average temperature, window start and end (epoch ms)
  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .process(new AvgTempFunc) // full window function
      .print()

    env.execute()
  }

  // Type parameters: input SensorReading, output AvgInfo,
  // key String (sensor ID), window TimeWindow
  class AvgTempFunc extends ProcessWindowFunction[SensorReading, AvgInfo, String, TimeWindow] {
    // Called once when the window fires, with all buffered elements for this key
    override def process(key: String, context: Context, elements: Iterable[SensorReading], out: Collector[AvgInfo]): Unit = {
      val count = elements.size                 // number of readings in the window
      val sum = elements.map(_.temperature).sum // sum of temperatures
      // Window bounds are epoch timestamps in milliseconds
      val windowStart = context.window.getStart
      val windowEnd = context.window.getEnd
      out.collect(AvgInfo(key, sum / count, windowStart, windowEnd))
    }
  }
}
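A full window function is the right tool when the result needs every element, for example a median, which cannot be maintained as a small running accumulator the way an average can. The sketch is plain Scala; `FullWindowSketch`, `process` and `MedianInfo` are hypothetical and only mirror the shape of `ProcessWindowFunction.process` without depending on Flink.

```scala
// Plain-Scala sketch (no Flink dependency) of a full window computation:
// all elements of the window are available at once when it fires.
object FullWindowSketch {
  final case class MedianInfo(key: String, median: Double, windowStart: Long, windowEnd: Long)

  def process(key: String, windowStart: Long, windowEnd: Long, elements: Iterable[Double]): MedianInfo = {
    require(elements.nonEmpty, "a window always holds at least one element")
    val sorted = elements.toVector.sorted // needs the whole window in memory
    val n = sorted.size
    val median =
      if (n % 2 == 1) sorted(n / 2)
      else (sorted(n / 2 - 1) + sorted(n / 2)) / 2.0
    MedianInfo(key, median, windowStart, windowEnd)
  }
}
```

Readings 30.0, 10.0 and 20.0 in the window [0, 5000) give a median of 20.0.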
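3. Combining incremental aggregation with a full window function
Pass both functions to aggregate(): the AggregateFunction pre-aggregates each element as it arrives, and when the window fires, the ProcessWindowFunction receives the single aggregated result together with the window's metadata. This keeps memory usage low while still exposing the window start and end.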
import com.atguigu.day2.{SensorReading, SensorSource}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object AvgTempByAggAndProcWindow {
  // Output record: sensor ID, average temperature, window start and end (epoch ms)
  case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val stream = env.addSource(new SensorSource)

    stream
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      // Pre-aggregate incrementally, then attach window metadata when the window fires
      .aggregate(new AvgTempAgg, new WindowResult)
      .print()

    env.execute()
  }

  // Input:       SensorReading
  // Accumulator: (sensor ID, number of readings, sum of temperatures)
  // Output:      (sensor ID, average temperature)
  class AvgTempAgg extends AggregateFunction[SensorReading, (String, Long, Double), (String, Double)] {
    // Empty accumulator for a newly created window
    override def createAccumulator(): (String, Long, Double) = ("", 0L, 0.0)

    // Called for every element: increment the count and add the temperature
    override def add(value: SensorReading, accumulator: (String, Long, Double)): (String, Long, Double) =
      (value.id, accumulator._2 + 1, accumulator._3 + value.temperature)

    // Called when the window fires; the result is handed to WindowResult
    override def getResult(accumulator: (String, Long, Double)): (String, Double) =
      (accumulator._1, accumulator._3 / accumulator._2)

    // Combine two partial accumulators (needed when windows merge, e.g. session windows)
    override def merge(a: (String, Long, Double), b: (String, Long, Double)): (String, Long, Double) =
      (if (a._1.nonEmpty) a._1 else b._1, a._2 + b._2, a._3 + b._3)
  }

  // Input:  the AggregateFunction's output (sensor ID, average temperature)
  // Output: AvgInfo enriched with the window bounds
  class WindowResult extends ProcessWindowFunction[(String, Double), AvgInfo, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Double)], out: Collector[AvgInfo]): Unit = {
      // The iterable holds exactly one element: the pre-aggregated result
      out.collect(AvgInfo(key, elements.head._2, context.window.getStart, context.window.getEnd))
    }
  }
}
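The data flow of the combined pattern can be sketched in plain Scala without Flink: the incremental step stores only a count and a sum, and the window step receives a single pre-aggregated value plus the window bounds. `CombinedSketch`, `preAggregate`, `windowResult` and `run` are hypothetical stand-ins for AvgTempAgg and WindowResult.

```scala
// Plain-Scala sketch (no Flink dependency) of incremental aggregation
// followed by a window function that sees only the aggregated value.
object CombinedSketch {
  final case class AvgInfo(id: String, avgTemp: Double, windowStart: Long, windowEnd: Long)

  // Incremental part: fold each reading into a (count, sum) accumulator
  def preAggregate(temps: Seq[Double]): Double = {
    val (count, sum) = temps.foldLeft((0L, 0.0)) { case ((c, s), t) => (c + 1, s + t) }
    sum / count
  }

  // Window part: `elements` holds exactly one value, the pre-aggregated result
  def windowResult(key: String, start: Long, end: Long, elements: Iterable[Double]): AvgInfo =
    AvgInfo(key, elements.head, start, end)

  def run(key: String, start: Long, end: Long, temps: Seq[Double]): AvgInfo =
    windowResult(key, start, end, Seq(preAggregate(temps)))
}
```

Readings 10.0, 20.0 and 30.0 in [0, 5000) produce AvgInfo(s1, 20.0, 0, 5000), while the per-window state only ever held a count and a sum.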