Custom dynamic partitioning
Now that we've discussed most of the concepts related to partitioning (distributing tuples to the partitions, unifying the output of partitions, and using the built-in stateless partitioners for both static and dynamic partitioning), let us consider an advanced topic: custom dynamic partitioning for potentially stateful operators.
As noted earlier, the new set of partitions is constructed in the definePartitions() method of the Partitioner interface:
Collection<Partition<T>> definePartitions(Collection<Partition<T>> partitions, PartitioningContext context);
It is important to remember that this method is invoked in the Application Master, not in any of the containers running the partitions.
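To make the shape of such a method concrete, here is a minimal, self-contained sketch of a stateful repartitioning step. It does not use the real Apex classes: Partition, PartitioningContext, SimplePartition, CountingOperator, and the TARGET constant are hypothetical stand-ins defined inside the example so that it compiles on its own, and the assumption that operator state is a single counter that can be summed and split evenly is ours, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class CustomPartitionSketch {

    // Hypothetical, simplified stand-ins for the Apex Partition and
    // PartitioningContext types; the real interfaces have more methods.
    public interface Partition<T> {
        T getPartitionedInstance();
    }

    public interface PartitioningContext {
        // Assumed here to be 0 unless the operator is partitioned in parallel.
        int getParallelPartitionCount();
    }

    public static final class SimplePartition<T> implements Partition<T> {
        private final T instance;

        public SimplePartition(T instance) {
            this.instance = instance;
        }

        @Override
        public T getPartitionedInstance() {
            return instance;
        }
    }

    // A toy stateful operator: its only state is a running count.
    public static final class CountingOperator {
        public long count;

        public CountingOperator(long count) {
            this.count = count;
        }
    }

    // Hypothetical desired number of partitions when no parallel count is given.
    public static final int TARGET = 3;

    public static Collection<Partition<CountingOperator>> definePartitions(
            Collection<Partition<CountingOperator>> partitions,
            PartitioningContext context) {
        // Pool the state held by the currently existing partitions.
        long total = 0;
        for (Partition<CountingOperator> p : partitions) {
            total += p.getPartitionedInstance().count;
        }

        // Honor the parallel partition count if one is set, else use TARGET.
        int parallel = context.getParallelPartitionCount();
        int n = parallel > 0 ? parallel : TARGET;

        // Build the new partitions, spreading the pooled state as evenly as
        // possible so that the overall total is preserved.
        long base = total / n;
        long remainder = total % n;
        List<Partition<CountingOperator>> result = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            long share = base + (i < remainder ? 1 : 0);
            result.add(new SimplePartition<>(new CountingOperator(share)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Partition<CountingOperator>> current = new ArrayList<>();
        current.add(new SimplePartition<>(new CountingOperator(5)));
        current.add(new SimplePartition<>(new CountingOperator(2)));

        for (Partition<CountingOperator> p : definePartitions(current, () -> 0)) {
            System.out.println(p.getPartitionedInstance().count);
        }
    }
}
```

The point of the sketch is the pattern rather than the specific policy: read the state out of the existing partitions, decide how many partitions are needed, and return a fresh collection whose operator instances carry the redistributed state. A real implementation would also have to assign partition keys and masks to the input ports, which this example deliberately omits.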
The first argument is the collection of currently existing partitions. We've already seen an example implementation of this method earlier in the context of defining partition masks and keys. That example creates a list of new partitions using...