Apache Beam: Partition Transform

Overview

If you want to split a PCollection into multiple PCollections based on a function, use the Partition transform.

When You Should Use the Partition Transform

Use Partition when you want to split an input PCollection into a fixed number of output PCollections. For example, you might partition a collection of strings by length, then truncate the longer strings while leaving the shorter ones unchanged.
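The string-length use case can be sketched in plain Java, with no Beam dependency, so the routing logic can be checked on its own. Everything here is hypothetical: the class name, the MAX_LENGTH cutoff of 5, and the truncate helper are assumptions made for illustration. Only the shape of partitionFor (element and partition count in, index out) mirrors the PartitionFn used later in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java sketch; not Beam code.
class LengthPartitionSketch {
    // Assumed cutoff: strings longer than this count as "long".
    static final int MAX_LENGTH = 5;

    // Same shape as PartitionFn.partitionFor. This sketch assumes exactly
    // two partitions: index 0 for short strings, index 1 for long ones.
    static int partitionFor(String s, int numPartitions) {
        return s.length() <= MAX_LENGTH ? 0 : 1;
    }

    // Cuts a long string down to MAX_LENGTH characters.
    static String truncate(String s) {
        return s.substring(0, MAX_LENGTH);
    }

    public static void main(String[] args) {
        List<String> shortStrings = new ArrayList<>();
        List<String> longStrings = new ArrayList<>();
        for (String s : Arrays.asList("Amy", "Francis", "Bob", "Christopher")) {
            if (partitionFor(s, 2) == 0) {
                shortStrings.add(s);           // kept unchanged
            } else {
                longStrings.add(truncate(s));  // shortened to MAX_LENGTH
            }
        }
        System.out.println("short: " + shortStrings);
        System.out.println("long (truncated): " + longStrings);
    }
}
```

In a real pipeline the two lists would be the two PCollections returned by Partition, and the truncation would be a separate transform applied only to the second one.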

How to Use the Partition Transform

Apply the built-in Partition transform to a PCollection with the following arguments:

  1. Number of partitions

  2. Partition function - This function must return an integer from 0 (inclusive) up to the number of partitions (exclusive)
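One way to stay inside that range is to reduce a hash code with Math.floorMod. The sketch below is plain Java with no Beam dependency; the class and method names are hypothetical. It also shows why the % operator alone is not enough: in Java, % can return a negative value, which would fall outside the valid range.

```java
// Hypothetical plain-Java sketch of a partition index that always
// satisfies 0 <= index < numPartitions.
class HashPartitionSketch {
    // Math.floorMod returns a non-negative result for a positive divisor,
    // even when the hash code is negative.
    static int partitionFor(Object element, int numPartitions) {
        return Math.floorMod(element.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 3;
        // Integer.hashCode() is the int value itself, so -7 has a negative hash.
        System.out.println("-7 % 3 = " + (-7 % numPartitions));
        System.out.println("floorMod(-7, 3) = " + partitionFor(-7, numPartitions));
    }
}
```

Hash-based routing spreads elements across partitions without regard to their meaning, so it suits load-splitting rather than the value-based split shown in the percentile example.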

Example: Partition based on Percentile

    // Provide an int value with the desired number of result partitions, and a
    // PartitionFn that represents the partitioning function. In this example, we
    // define the PartitionFn in-line. Partition returns a PCollectionList
    // containing each of the resulting partitions as individual PCollection
    // objects.
    PCollection<Student> students =
        pipeline.apply(
            Create.of(
                Student.of("Amy", 88),
                Student.of("Bob", 87),
                Student.of("Chris", 49),
                Student.of("Dylan", 62),
                Student.of("Ellen", 78),
                Student.of("Francis", 53)));
    // Split students up into 10 partitions, by percentile:
    PCollectionList<Student> studentsByPercentile =
        students.apply(
            Partition.of(
                10,
                new Partition.PartitionFn<Student>() {
                  @Override
                  public int partitionFor(Student student, int numPartitions) {
                    return student.getPercentile() // 0..99
                        * numPartitions
                        / 100;
                  }
                }));

    // You can extract each partition from the PCollectionList using the get method,
    // as follows:
    PCollection<Student> fortiethPercentile = studentsByPercentile.get(4);
    // LogOutput is a user-defined DoFn that logs each element; it is not shown here.
    fortiethPercentile.apply(ParDo.of(new LogOutput<>("Fortieth percentile: ")));
    pipeline.run();
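The integer arithmetic in partitionFor can be checked in isolation. The sketch below is plain Java with no Beam dependency; the class and method names are hypothetical. It assumes that the second argument to Student.of is the value getPercentile() returns. The Student class is not shown in this article, so that is an assumption.

```java
// Hypothetical plain-Java sketch of the arithmetic used in partitionFor.
class PercentileBucketSketch {
    // Mirrors `percentile * numPartitions / 100` from the example.
    // Integer division truncates, so with 10 partitions the percentiles
    // 40..49 all map to index 4.
    static int bucket(int percentile, int numPartitions) {
        return percentile * numPartitions / 100;
    }

    public static void main(String[] args) {
        // Assumed percentiles, taken from the Student.of(...) calls above.
        int[] percentiles = {88, 87, 49, 62, 78, 53};
        for (int p : percentiles) {
            System.out.println(p + " -> partition " + bucket(p, 10));
        }
    }
}
```

Under that assumption, only the value 49 lands in partition 4, which is the partition retrieved with get(4). The formula relies on the percentile staying in 0..99: a value of 100 would give index 10, which is outside the valid range for 10 partitions.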

Conclusion

Check out other useful transforms in the official Apache Beam documentation.