Apache Beam: Windowing
What are Windows?
Windows are a way to group your data by event time. But why would you want to group by time?
So you can apply aggregations!
For example, suppose you have a stream of analytics data coming from mobile phones and want to count how many times your app is opened per hour. In this scenario, you need to aggregate data within time bounds (here, one hour).
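To make the per-hour count concrete, here is a minimal plain-Java sketch of the idea. It is not Apache Beam code: the class name HourlyCountSketch, the method countPerHour, and the sample timestamps are hypothetical, and it assumes non-negative timestamps in milliseconds since the epoch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only (hypothetical names); this is not Apache Beam code.
final class HourlyCountSketch {
    static final long HOUR_MILLIS = 60L * 60L * 1000L;

    // Maps each event timestamp (assumed non-negative epoch millis) to the start
    // of its hour, then counts how many events fall into each hour.
    static Map<Long, Integer> countPerHour(List<Long> eventTimesMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimesMillis) {
            long hourStart = ts - (ts % HOUR_MILLIS);
            counts.merge(hourStart, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical "app opened" events 5, 40 and 70 minutes after the epoch.
        List<Long> opens = Arrays.asList(5 * 60_000L, 40 * 60_000L, 70 * 60_000L);
        System.out.println(countPerHour(opens)); // prints {0=2, 3600000=1}
    }
}
```

The first two events share the hour starting at 0 and the third lands in the next hour, which is the kind of time-bounded aggregation that windows give you.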
Bounded vs. Unbounded Collections
In many other data processing frameworks, an ETL pipeline runs only on batches of data (for example, from a filesystem or database). With Apache Beam, you can run your pipeline on BOTH streaming and batch data with the same code.
Bounded data is data that comes from a fixed source like a file. Unbounded data comes from a continuously updating source like a subscription. Windowing is a mechanism that divides your unbounded data stream into finite, bounded chunks, which can then be aggregated.
How to use Windowing Functions
You should use the windowing functions on a PCollection before you apply a grouping transform, like GroupByKey or Combine. These transforms will then group on both the timestamp (using the window) and the key (if one is present).
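One way to picture grouping on both the window and the key is as grouping by the pair (window, key). The plain-Java sketch below is a mental model only, not how Beam implements GroupByKey or Combine. The class and method names are hypothetical, and it assumes fixed windows aligned to the epoch and non-negative timestamps in milliseconds.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java mental model only (hypothetical names); not Beam's implementation.
final class WindowedGroupingSketch {
    // Counts elements per (window start, key) pair, using fixed windows of the
    // given size. Timestamps are assumed to be non-negative milliseconds.
    static Map<Long, Map<String, Long>> countPerWindowAndKey(
            List<String> keys, List<Long> timestampsMillis, long windowSizeMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (int i = 0; i < keys.size(); i++) {
            long ts = timestampsMillis.get(i);
            long windowStart = ts - (ts % windowSizeMillis);
            // First group by window, then by key inside that window.
            result.computeIfAbsent(windowStart, w -> new TreeMap<>())
                  .merge(keys.get(i), 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("foo", "bar", "foo", "foo");
        List<Long> timestamps = Arrays.asList(15_000L, 30_000L, 45_000L, 90_000L);
        System.out.println(countPerWindowAndKey(keys, timestamps, 60_000L));
        // prints {0={bar=1, foo=2}, 60000={foo=1}}
    }
}
```

The same key ("foo") produces separate results in separate windows, because the window is part of what gets grouped on.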
Built-In Window Functions
Apache Beam provides four built-in window functions:
Fixed Time Windows - Non-overlapping time intervals with consistent duration
Sliding Time Windows - Overlapping time intervals defined by a window duration and a period (how often a new window starts)
Session Windows - Windows that group elements occurring within a certain gap duration of one another. Session windows are applied on a per-key basis
Single Global Window - The default window, which contains all data; no data is considered late
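Sliding and session windows are harder to picture than fixed windows, so here is a rough plain-Java sketch of how elements could be assigned to them. This is an illustration under stated assumptions, not Beam's implementation: the class and method names are hypothetical, timestamps are non-negative milliseconds, sliding windows are aligned to the epoch, and the session logic handles the elements of a single key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java illustration only (hypothetical names); not Beam's implementation.
final class WindowAssignmentSketch {
    // Sliding windows: returns the start of every window [start, start + size)
    // that contains ts, where window starts are multiples of periodMillis.
    static List<Long> slidingWindowStarts(long ts, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % periodMillis);
        for (long start = lastStart; start > ts - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    // Session windows for ONE key: each element opens an interval [ts, ts + gap),
    // and overlapping intervals are merged into a single session [start, end).
    static List<long[]> sessions(List<Long> timestampsMillis, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestampsMillis);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                last[1] = ts + gapMillis; // element extends the current session
            } else {
                result.add(new long[] {ts, ts + gapMillis}); // start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // One-minute windows that start every 30 seconds: each element is in two windows.
        System.out.println(slidingWindowStarts(90_000L, 60_000L, 30_000L)); // prints [90000, 60000]
        // Elements at 15s, 30s, 45s and 90s with a 30-second gap form two sessions.
        for (long[] s : sessions(Arrays.asList(15_000L, 30_000L, 45_000L, 90_000L), 30_000L)) {
            System.out.println(Arrays.toString(s)); // prints [15000, 75000] then [90000, 120000]
        }
    }
}
```

With a one-minute duration and a 30-second period, the element at 90 seconds belongs to two overlapping windows. In the session example, the first three elements are each within 30 seconds of the previous one and merge into one session, while the element at 90 seconds starts a new one.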
Example: Counting Words with One-Minute Fixed Windows
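import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Create some input data with timestamps (milliseconds since the epoch)
List<String> inputData = Arrays.asList("foo", "bar", "foo", "foo");
List<Long> timestamps =
    Arrays.asList(
        Duration.standardSeconds(15).getMillis(),
        Duration.standardSeconds(30).getMillis(),
        Duration.standardSeconds(45).getMillis(),
        Duration.standardSeconds(90).getMillis());
// Create a PCollection from the input data with timestamps
PCollection<String> items = pipeline.apply(Create.timestamped(inputData, timestamps));
// Create a windowed PCollection: the first three elements fall in the
// window [0s, 60s) and the last one in [60s, 120s)
PCollection<String> windowedItems =
    items.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
// Count the occurrences of each word within each window
PCollection<KV<String, Long>> windowedCounts = windowedItems.apply(Count.perElement());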