Apache Beam: MapElements and FlatMapElements Transforms
Use built-in Apache Beam transforms to simplify your pipelines
Overview
Have you ever had a transformation so simple that you wanted a one-liner instead of writing a new DoFn class or a custom PTransform?
You should use the built-in Apache Beam transforms MapElements and FlatMapElements!
MapElements
This convenient transform applies a simple one-to-one function to each element in the PCollection: for every input element, it produces exactly one output element. You can even pass a Java 8 lambda expression for additional simplicity!
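This sketch makes the one-to-one contract concrete without setting up a pipeline. It uses the JDK's java.util.stream rather than Beam. Stream.map stands in for MapElements here: it produces exactly one output per input, and the output type (Integer) can differ from the input type (String). The names OneToOneDemo and wordLengths are hypothetical and exist only for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java (non-Beam) illustration of one-to-one mapping semantics.
// Stream.map plays the role of MapElements: one output per input element.
class OneToOneDemo {
    static List<Integer> wordLengths(List<String> words) {
        return words.stream()
                .map(String::length) // exactly one Integer for each input String
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> words = List.of("apache", "beam", "map");
        System.out.println(wordLengths(words)); // [6, 4, 3]
    }
}
```

In a Beam pipeline, the equivalent step would be `MapElements.into(TypeDescriptors.integers()).via((String word) -> word.length())`.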
FlatMapElements
This transform is similar to MapElements, but the function you apply to each input element returns a collection of outputs (any Iterable). That collection may contain zero, one, or many elements. FlatMapElements adds each individual element of the collection to the output PCollection; it does not add the lists or iterables themselves.
For example, suppose you apply a function that returns the prime factorization of an integer. The output PCollection then contains all the factors of every input integer, combined into a single collection. If you had an input PCollection of [4, 6, 9], the output PCollection would be [2, 2, 2, 3, 3, 3] (in some order, because a PCollection has no guaranteed ordering). This is why the transform is named FlatMap: it flattens all the elements produced by the function.
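The flattening behavior can be sketched in plain Java. The sketch again uses java.util.stream rather than Beam so that it runs on its own. The hypothetical primeFactors helper uses simple trial division. Mapping it over [4, 6, 9] keeps one list per input. flatMap, the in-memory analogue of FlatMapElements, concatenates those lists into one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java (non-Beam) illustration of map vs. flatMap semantics.
class FlatMapDemo {
    // Hypothetical helper: prime factors of n (n >= 2), ascending, with repetition.
    static List<Integer> primeFactors(int n) {
        List<Integer> factors = new ArrayList<>();
        for (int p = 2; (long) p * p <= n; p++) {
            while (n % p == 0) {
                factors.add(p);
                n /= p;
            }
        }
        if (n > 1) {
            factors.add(n); // whatever remains is itself prime
        }
        return factors;
    }

    // map keeps one list per input element.
    static List<List<Integer>> mapped(List<Integer> inputs) {
        return inputs.stream().map(FlatMapDemo::primeFactors).collect(Collectors.toList());
    }

    // flatMap concatenates the per-element lists into a single list.
    static List<Integer> flatMapped(List<Integer> inputs) {
        return inputs.stream()
                .flatMap(x -> primeFactors(x).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> inputs = List.of(4, 6, 9);
        System.out.println(mapped(inputs));     // [[2, 2], [2, 3], [3, 3]]
        System.out.println(flatMapped(inputs)); // [2, 2, 2, 3, 3, 3]
    }
}
```

A Java stream preserves this order. A Beam PCollection does not, so only the multiset of factors carries over to a pipeline.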
Example 1: Remove a Character from Each String in a PCollection
Ok, let's say you have a PCollection of strings and you want to remove any "." characters (if they happen to be in the string).
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// The input PCollection.
PCollection<String> words = ...;
// Apply MapElements with a lambda that removes every "." from each word.
// Save the result as the PCollection newWords.
PCollection<String> newWords = words.apply(
    MapElements.into(TypeDescriptors.strings())
        .via((String word) -> word.replace(".", "")));
The .into(...) call specifies the output type as a TypeDescriptor, in this case TypeDescriptors.strings(). The .via(...) call specifies the function to apply to each element in the PCollection. The function does not have to be a lambda. It can also be a named class, such as an implementation of SerializableFunction or a subclass of SimpleFunction (see the MapElements JavaDocs).
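This sketch illustrates the named-function option by packaging the dot-removal logic from Example 1 as a small class. It is plain Java and implements java.util.function.Function only so that it can run without Beam. In a real pipeline you would instead implement Beam's SerializableFunction (or extend SimpleFunction) and pass an instance to .via(...). The class names RemoveDots and RemoveDotsDemo are hypothetical.

```java
import java.io.Serializable;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical named function holding the same logic as Example 1's lambda.
// It implements java.util.function.Function only so it runs without Beam; a Beam
// pipeline would use SerializableFunction<String, String> instead. Serializable
// mirrors Beam, whose user functions must be serializable to reach workers.
class RemoveDots implements Function<String, String>, Serializable {
    @Override
    public String apply(String word) {
        return word.replace(".", ""); // remove every "." character
    }
}

class RemoveDotsDemo {
    public static void main(String[] args) {
        List<String> words = List.of("e.g.", "beam", "end.");
        List<String> cleaned = words.stream()
                .map(new RemoveDots()) // a named class works wherever a lambda would
                .collect(Collectors.toList());
        System.out.println(cleaned); // [eg, beam, end]
    }
}
```

A named class takes a few more lines than a lambda. In exchange, you can unit-test it in isolation and reuse it across pipelines.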
Example 2: Extract Words from Sentences
What if you have a PCollection of full sentences and would like to transform it into a PCollection of individual words in the sentences?
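import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.InferableFunction;
import org.apache.beam.sdk.values.PCollection;

// Create a small in-memory PCollection of lines from an existing Pipeline.
PCollection<String> lines =
    pipeline.apply(Create.of("the quick brown fox", "jumps over the lazy", "dog"));

// Use FlatMapElements to split the lines in PCollection<String> into individual
// words. Each returned List is flattened into separate String elements.
PCollection<String> words =
    lines.apply(
        FlatMapElements.via(
            new InferableFunction<String, List<String>>() {
              @Override
              public List<String> apply(String line) {
                return Arrays.asList(line.split(" "));
              }
            }));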
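This sketch shows what Example 2 produces and checks one edge case of split(" "). It reproduces the same per-line splitting with java.util.stream's flatMap instead of Beam. Splitting on a single space yields empty strings when words are separated by more than one space. The splitOnWhitespace variant is a hypothetical refinement, not part of the original example: it splits on runs of whitespace and drops empty tokens.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java (non-Beam) illustration of Example 2's one-to-many splitting.
class SplitWordsDemo {
    // Same logic as the InferableFunction above: split each line on a single space.
    static List<String> splitOnSpace(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // Hypothetical variant: split on runs of whitespace and drop empty tokens.
    static List<String> splitOnWhitespace(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split("\\s+")))
                .filter(word -> !word.isEmpty())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("the quick brown fox", "jumps over the lazy", "dog");
        System.out.println(splitOnSpace(lines));
        // [the, quick, brown, fox, jumps, over, the, lazy, dog]

        List<String> messy = List.of("the  quick");   // two spaces between words
        System.out.println(splitOnSpace(messy));       // [the, , quick]
        System.out.println(splitOnWhitespace(messy));  // [the, quick]
    }
}
```

If your input may contain irregular spacing, you could apply the same regular expression and empty-token filter inside the function passed to FlatMapElements.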
Conclusion
The built-in MapElements and FlatMapElements transforms can simplify your pipelines and code base. For simple element-wise tasks, they let you apply a function directly instead of writing a custom DoFn class.
Check out other useful transforms from the official Apache Beam documentation.