Fanning Out Workloads
You can scale out individual Python functions to many containers using the .map() method.
You might use this to parallelize computationally heavy tasks, such as batch inference or data processing jobs.
from beam import function

@function(cpu=0.1)
def square(i: int) -> int:
    return i**2

def main():
    numbers = list(range(10))
    squared = []

    # Run a remote container for every item in the list
    for result in square.map(numbers):
        print(result)
        squared.append(result)

if __name__ == "__main__":
    main()
When we run this Python module, 10 containers will be spawned to run the workload:
$ python math-app.py
=> Building image
=> Using cached image
=> Syncing files
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Running function: <map-example:square>
=> Function complete <a6a1c063-b0d7-4c62-b6b1-a7940b19fde9>
=> Function complete <531e1f2c-a4f2-4edf-9cb9-6240df959815>
=> Function complete <bc421f5a-e09b-42d4-8035-d3d13ca5c238>
=> Function complete <2a3dde03-20df-4805-8fb0-ec9743f2bde3>
=> Function complete <59b64517-7b4a-4260-8c65-d0fbb9b98a76>
=> Function complete <f0ab7790-e2fb-441f-8278-74856719a457>
=> Function complete <1256a9ac-c035-412a-ac65-c94248f1ce99>
=> Function complete <476189dd-ba28-4646-9911-96ef8794cb58>
=> Function complete <04ef44cd-ff64-4ef2-a087-00c01ce5a2e4>
=> Function complete <104a602c-93a7-43d5-983c-071f64d91a2c>
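Before fanning out to containers, it can help to check a function's logic locally. The sketch below is a standard-library stand-in for square.map(numbers) built on concurrent.futures, not on Beam; local_map is a hypothetical helper, and it says nothing about how Beam actually schedules containers. A thread pool keeps the example simple; for genuinely CPU-bound local work, ProcessPoolExecutor is the usual drop-in swap, since threads share the GIL.

```python
from concurrent.futures import ThreadPoolExecutor


def square(i: int) -> int:
    # Plain local version of the function; no Beam decorator here.
    return i**2


def local_map(fn, items, max_workers=4):
    # Hypothetical helper: run fn once per item on a local thread pool.
    # Executor.map yields results in input order, regardless of
    # which call finishes first.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fn, items))


if __name__ == "__main__":
    squared = local_map(square, range(10))
    print(squared)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Once the local results look right, switching back to the decorated function and calling square.map(numbers) moves the same work onto remote containers.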
Passing Multiple Arguments
The .map() method can also parallelize functions that take multiple parameters. Pass a list of tuples, where each tuple holds one set of arguments for your function.
Below is an example that counts how many prime numbers fall in the range [start, stop) for each tuple in ranges:
from beam import function

def is_prime(n: int) -> bool:
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True

@function(cpu=0.1)
def count_primes_in_range(start: int, stop: int) -> int:
    """
    Returns the number of prime numbers in the range [start, stop).
    """
    return sum(is_prime(i) for i in range(start, stop))

def main():
    # Each tuple represents (start, stop)
    ranges = [
        (0, 10),
        (10, 20),
        (20, 30),
    ]

    # .map() will launch a remote container for each tuple
    for result in count_primes_in_range.map(ranges):
        print(result)

if __name__ == "__main__":
    main()
In this example:
- ranges is a list of (start, stop) tuples.
- Calling count_primes_in_range.map(ranges) spawns a remote execution for each tuple, passing (start, stop) to the function.
- Each remote call returns the number of prime numbers in that sub-range, which we print out.
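The tuple-to-arguments mapping described above behaves like Python's itertools.starmap, which calls a function with each tuple unpacked as positional arguments. The local sketch below reproduces the example's results without any containers. It assumes, as the text describes, that each tuple is passed as positional arguments; it is an analogue for checking logic, not Beam's implementation.

```python
from itertools import starmap


def is_prime(n: int) -> bool:
    if n < 2:
        return False
    for i in range(2, int(n**0.5) + 1):
        if n % i == 0:
            return False
    return True


def count_primes_in_range(start: int, stop: int) -> int:
    # Local, undecorated copy of the remote function.
    return sum(is_prime(i) for i in range(start, stop))


ranges = [(0, 10), (10, 20), (20, 30)]

# starmap calls count_primes_in_range(*t) for each tuple t,
# so (0, 10) becomes count_primes_in_range(0, 10).
counts = list(starmap(count_primes_in_range, ranges))
print(counts)  # [4, 4, 2]
```

The three counts correspond to the primes 2, 3, 5, 7; then 11, 13, 17, 19; then 23, 29.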
With .map(), Beam takes care of distributing each item (or tuple of items) to separate containers for parallel processing. This approach makes it easy to scale out CPU-heavy or data-intensive tasks with minimal code.
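The ranges list in the prime-counting example is hard-coded. For a larger interval you would typically generate it instead. make_ranges below is a hypothetical helper (not part of Beam) that splits [start, stop) into consecutive half-open chunks, so each remote call processes a batch of numbers rather than a single item. The chunk size trades per-container overhead against the degree of parallelism.

```python
from __future__ import annotations


def make_ranges(start: int, stop: int, chunk_size: int) -> list[tuple[int, int]]:
    """Split [start, stop) into consecutive half-open (start, stop) chunks."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    # The last chunk is clipped to stop, so it may be shorter than chunk_size.
    return [
        (lo, min(lo + chunk_size, stop))
        for lo in range(start, stop, chunk_size)
    ]


ranges = make_ranges(0, 100, 30)
print(ranges)  # [(0, 30), (30, 60), (60, 90), (90, 100)]
```

Passing make_ranges(0, 100_000, 10_000) to count_primes_in_range.map(...) would, per the behavior described above, launch one remote call per chunk: 10 in this case.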