Apache Beam: Windowing

What are Windows?

Windows group the elements of your data by their event timestamps. But why would you want to group by time?

So you can apply aggregations!

For example, suppose you have a stream of analytics data coming from mobile phones and want to count how many times users open your app each hour. To do that, you need to aggregate the data within fixed time bounds.

Bounded vs. Unbounded Collections

Many data processing frameworks run an ETL pipeline only on batches of data (for example, from a filesystem or database). With Apache Beam, you can run the same pipeline code on both streaming and batch data.

Bounded data comes from a fixed source, such as a file. Unbounded data comes from a continuously updating source, such as a message subscription. Windowing divides an unbounded stream into finite, time-based chunks, each of which can then be aggregated.

How to use Windowing Functions

Apply a windowing function to a PCollection before a grouping transform such as GroupByKey or Combine. The grouping transform then groups elements by window (derived from each element's timestamp) and by key, if one is present.
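To make "group by window and by key" concrete, here is a minimal plain-Java sketch of the idea. It does not use Beam and is not how Beam implements grouping; the `Event` class, the method name, and the sample data are hypothetical, and it assumes fixed windows aligned to timestamp 0.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class WindowedKeyCount {
    // Hypothetical event type: a key plus an event-time timestamp in milliseconds.
    static final class Event {
        final String key;
        final long timestampMillis;

        Event(String key, long timestampMillis) {
            this.key = key;
            this.timestampMillis = timestampMillis;
        }
    }

    // Counts events per (window start, key). Each event falls into exactly one
    // fixed window, identified by its start time, before being grouped by key.
    static Map<Long, Map<String, Long>> countPerWindowAndKey(
            List<Event> events, long windowSizeMillis) {
        Map<Long, Map<String, Long>> result = new TreeMap<>();
        for (Event e : events) {
            long windowStart =
                e.timestampMillis - Math.floorMod(e.timestampMillis, windowSizeMillis);
            result.computeIfAbsent(windowStart, w -> new TreeMap<>())
                  .merge(e.key, 1L, Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
            new Event("alice", 10_000L),
            new Event("bob", 20_000L),
            new Event("alice", 50_000L),
            new Event("alice", 70_000L));
        // One-minute windows: prints {0={alice=2, bob=1}, 60000={alice=1}}
        System.out.println(countPerWindowAndKey(events, 60_000L));
    }
}
```

Note that the same key ("alice") produces a separate count in each window, which is the behavior a grouping transform gives you once a PCollection has been windowed.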

Built-In Window Functions

Apache Beam provides four built-in window functions:

  1. Fixed Time Windows - Non-overlapping time intervals of a consistent duration

  2. Sliding Time Windows - Overlapping time intervals defined by a window duration and a period (how often a new window starts)

  3. Session Windows - Windows that group elements arriving within a certain gap duration of one another; a new window starts once the gap is exceeded. These windows apply on a per-key basis

  4. Single Global Window - The default window, which contains all data, with late data discarded by default
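The sliding and session definitions above are easier to follow with numbers. The following plain-Java sketch shows one way to work out which sliding windows contain a timestamp and how timestamps for a single key collapse into sessions. It is an illustration only, not Beam's implementation: the class and method names are hypothetical, window starts are assumed to be multiples of the period, and two elements are assumed to share a session when they are less than the gap duration apart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class WindowAssignmentSketch {
    // Sliding windows: returns the start of every window [start, start + size)
    // that contains ts, where window starts are multiples of the period.
    static List<Long> slidingWindowStarts(long ts, long sizeMillis, long periodMillis) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, periodMillis);
        for (long start = lastStart; start > ts - sizeMillis; start -= periodMillis) {
            starts.add(start);
        }
        return starts;
    }

    // Session windows for a single key: each element opens a window
    // [ts, ts + gap), and overlapping windows are merged into one session.
    // Returns {start, end} pairs, with end exclusive.
    static List<long[]> sessions(List<Long> timestamps, long gapMillis) {
        List<Long> sorted = new ArrayList<>(timestamps);
        Collections.sort(sorted);
        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts < last[1]) {
                last[1] = ts + gapMillis; // still inside the session: extend it
            } else {
                result.add(new long[] {ts, ts + gapMillis}); // gap exceeded: new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // 60-second windows starting every 30 seconds; an element at 75 s
        // belongs to the windows starting at 60 s and 30 s.
        System.out.println(slidingWindowStarts(75_000L, 60_000L, 30_000L)); // [60000, 30000]

        // 30-second gap: 0 s and 20 s share a session, 100 s starts a new one.
        for (long[] s : sessions(Arrays.asList(0L, 20_000L, 100_000L), 30_000L)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With a one-minute window and a 30-second period, every element lands in two windows, which is why aggregations over sliding windows count each element more than once across windows.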

Example: Counting Words with One-Minute Fixed Windows

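    import java.util.Arrays;
    import java.util.List;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    // Assumes `pipeline` is an existing org.apache.beam.sdk.Pipeline.

    // Create some input data with event timestamps (milliseconds since the epoch)
    List<String> inputData = Arrays.asList("foo", "bar", "foo", "foo");
    List<Long> timestamps =
        Arrays.asList(
            Duration.standardSeconds(15).getMillis(),
            Duration.standardSeconds(30).getMillis(),
            Duration.standardSeconds(45).getMillis(),
            Duration.standardSeconds(90).getMillis());

    // Create a PCollection from the input data, pairing each element with its timestamp
    PCollection<String> items = pipeline.apply(Create.timestamped(inputData, timestamps));

    // Assign each element to a one-minute fixed window based on its timestamp
    PCollection<String> windowedItems =
        items.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    // Count occurrences of each word per window:
    // the first minute yields foo=2 and bar=1; the second minute yields foo=1
    PCollection<KV<String, Long>> windowedCounts = windowedItems.apply(Count.perElement());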