Fanning Out Workloads

You can scale out individual Python functions to many containers using the .map() method.

You might use this to parallelize compute-heavy tasks, such as batch inference or data processing jobs.

from beam import function


@function(cpu=0.1)
def square(i: int):
    return i**2


def main():
    numbers = list(range(10))
    squared = []

    # Launch a remote container for each item in the list
    for result in square.map(numbers):
        print(result)
        squared.append(result)


if __name__ == "__main__":
    main()

When you run this module, 10 containers are spawned, one for each item in numbers:

$ python math-app.py

=> Building image
=> Using cached image
=> Syncing files

=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>

=> Function complete <a6a1c063-b0d7-4c62-b6b1-a7940b19fde9>
=> Function complete <531e1f2c-a4f2-4edf-9cb9-6240df959815>
=> Function complete <bc421f5a-e09b-42d4-8035-d3d13ca5c238>
=> Function complete <2a3dde03-20df-4805-8fb0-ec9743f2bde3>
=> Function complete <59b64517-7b4a-4260-8c65-d0fbb9b98a76>
=> Function complete <f0ab7790-e2fb-441f-8278-74856719a457>
=> Function complete <1256a9ac-c035-412a-ac65-c94248f1ce99>
=> Function complete <476189dd-ba28-4646-9911-96ef8794cb58>
=> Function complete <04ef44cd-ff64-4ef2-a087-00c01ce5a2e4>
=> Function complete <104a602c-93a7-43d5-983c-071f64d91a2c>
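Before deploying, it can help to check the function's logic locally. The sketch below is not Beam code and is not how .map() is implemented. It uses Python's standard concurrent.futures thread pool to show the same "one call per input item" fan-out on your own machine. The helper name local_map is hypothetical. Threads are used only for simplicity; real CPU-bound work would need processes or remote containers to run in parallel.

```python
from concurrent.futures import ThreadPoolExecutor


def square(i: int) -> int:
    return i**2


def local_map(fn, items, max_workers=4):
    # Hypothetical helper: runs fn once per item on a local thread pool.
    # Executor.map yields results in the same order as the inputs.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, items))


numbers = list(range(10))
squared = local_map(square, numbers)
print(squared)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```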

Passing Multiple Arguments

The .map() method can also parallelize functions that take multiple parameters. Pass a list of tuples, where each tuple holds one set of arguments for your function.

The example below counts the prime numbers in the half-open range [start, stop) for each tuple in ranges:

from beam import function

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True

@function(cpu=0.1)
def count_primes_in_range(start: int, stop: int) -> int:
    """
    Returns the number of prime numbers in the range [start, stop).
    """
    return sum(is_prime(i) for i in range(start, stop))

def main():
    # Each tuple represents (start, stop)
    ranges = [
        (0, 10),
        (10, 20),
        (20, 30)
    ]

    # .map() will launch a remote container for each tuple
    for result in count_primes_in_range.map(ranges):
        print(result)

if __name__ == "__main__":
    main()

In this example:

  1. ranges is a list of tuples (start, stop).
  2. Calling count_primes_in_range.map(ranges) spawns a remote execution for each tuple, passing (start, stop) to the function.
  3. Each remote call returns the number of prime numbers in that sub-range, which we print out.
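Step 2 describes Python's ordinary argument unpacking, where a tuple (start, stop) is spread into the start and stop parameters. As a local sanity check (plain Python, no Beam involved), itertools.starmap applies the same unpacking serially, which gives the expected counts to compare against the remote results.

```python
from itertools import starmap


def is_prime(n: int) -> bool:
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True


def count_primes_in_range(start: int, stop: int) -> int:
    # Number of primes in the half-open range [start, stop)
    return sum(is_prime(i) for i in range(start, stop))


ranges = [(0, 10), (10, 20), (20, 30)]

# starmap calls count_primes_in_range(*t) for each tuple t,
# so each (start, stop) tuple is unpacked into positional arguments.
counts = list(starmap(count_primes_in_range, ranges))
print(counts)       # [4, 4, 2]
print(sum(counts))  # 10 primes below 30
```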

With .map(), Beam takes care of distributing each item (or tuple of items) to separate containers for parallel processing. This approach makes it easy to scale out CPU-heavy or data-intensive tasks with minimal code.
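For larger workloads you will usually generate the argument tuples rather than write them by hand. The sketch below uses a hypothetical helper, make_ranges, that splits one large span into consecutive (start, stop) chunks. Since .map() runs one remote call per tuple, the chunk size controls how many containers are launched and how much work each one does.

```python
from typing import List, Tuple


def make_ranges(start: int, stop: int, chunk_size: int) -> List[Tuple[int, int]]:
    """Hypothetical helper: split [start, stop) into consecutive (start, stop) chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # The last chunk is clipped to stop, so the chunks exactly cover [start, stop).
    return [(lo, min(lo + chunk_size, stop)) for lo in range(start, stop, chunk_size)]


ranges = make_ranges(0, 1_000, 250)
print(ranges)  # [(0, 250), (250, 500), (500, 750), (750, 1000)]
```

Passing the resulting list to count_primes_in_range.map(ranges) would fan the four chunks out in the same way as the example above. Smaller chunks mean more containers with less work each; larger chunks mean fewer containers with more work each.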