This tutorial introduces several key concepts:

  • Creating a container image
  • Running a custom ML model
  • Developing your app using Beam’s live reloading workflow
  • Pre-loading models and caching them in storage volumes
  • Autoscaling and concurrency

Set up your app

You’ll start by importing a Beam App and Runtime:

  • App is the namespace for a project. You’ll give it a unique name as an identifier.
  • Inside the App is a Runtime. The Runtime defines the image and hardware your container will run on.

If you’d like to make further customizations to your image — such as adding shell commands — you can do so using the commands argument. Read more about custom images.

from beam import App, Runtime, Image

app = App(
    name="inference-quickstart",
    runtime=Runtime(
        cpu=1,
        memory="8Gi",
        gpu="T4",
        image=Image(
            python_version="python3.9",
            python_packages=[
                "transformers",
                "torch",
            ],  # You can also add a path to a requirements.txt instead
        ),
    ),
)
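
As mentioned above, you can customize the image further by running shell commands at build time with the commands argument. A minimal sketch, where the packages installed and the list-of-shell-strings form are assumptions for illustration (see the custom images docs for details):

image = Image(
    python_version="python3.9",
    python_packages=["transformers", "torch"],
    # Assumed form: a list of shell commands run while the image is built
    commands=["apt-get update && apt-get install -y libsndfile1"],
)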

Running a custom ML model

We’ll create a new function to run inference on facebook/opt-125m via Hugging Face Transformers.

Since we’ll deploy this as a REST API, we add a rest_api() decorator above the inference function:

from beam import App, Runtime, Image
from transformers import AutoTokenizer, OPTForCausalLM

app = App(
    name="inference-quickstart",
    runtime=Runtime(
        cpu=1,
        memory="8Gi",
        gpu="T4",
        image=Image(
            python_version="python3.9",
            python_packages=[
                "transformers",
                "torch",
            ],  # You can also add a path to a requirements.txt instead
        ),
    ),
)

@app.rest_api()
def predict(**inputs):
    model = OPTForCausalLM.from_pretrained("facebook/opt-125m")
    tokenizer = AutoTokenizer.from_pretrained("facebook/opt-125m")

    prompt = inputs["prompt"]

    # Tokenize the prompt
    model_inputs = tokenizer(prompt, return_tensors="pt")

    # Generate
    generate_ids = model.generate(model_inputs.input_ids, max_length=30)
    result = tokenizer.batch_decode(
        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )[0]

    print(result)

    return {"prediction": result}

Developing your app on Beam

Beam includes a live-reloading feature that lets you run your code in the same environment it will run in when deployed to production.

By default, Beam will sync all the files in your working directory to the remote container. This allows you to use the files you have locally while developing. If you want to prevent some files from getting uploaded, you can create a .beamignore.
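
For example, a hypothetical .beamignore that keeps local artifacts out of the upload, assuming the same pattern syntax as a .gitignore:

.venv/
__pycache__/
*.ipynb
data/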

In your shell, run beam serve app.py. This will:

  1. Spin up a container
  2. Run it on a GPU
  3. Print a cURL request to invoke the API
  4. Stream the logs to your shell

You should keep this terminal window open while developing.

(.venv) user@MacBook demo % beam serve app.py
 i  Using cached image.
 ✓  App initialized.
 i  Uploading files...
 ✓  Container scheduled, logs will appear below.
⠴ Starting container... 5s (Estimated: 3m20s)

================= Call the API =================

curl -X POST 'https://apps.beam.cloud/serve/3dpga/650b636542ef2e000aef54fa' \
-H 'Accept: */*' \
-H 'Accept-Encoding: gzip, deflate' \
-H 'Connection: keep-alive' \
-H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
-H 'Content-Type: application/json' \
-d '{}'

============= Logs Streamed Below ==============

INFO:     | Starting app...
INFO:     | Loading handler in 'app.py:predict'...
INFO:     | Running loader in 'app.py:load_models'...
INFO:     | Ready for tasks.
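
You can paste the printed cURL command directly, or call the endpoint from Python. A minimal sketch, using the URL and auth token from your own serve output; the keys in the JSON body are passed to predict as keyword arguments:

import requests

# Replace with the URL and token printed by `beam serve`
url = "https://apps.beam.cloud/serve/3dpga/650b636542ef2e000aef54fa"
headers = {
    "Authorization": "Basic [YOUR_AUTH_TOKEN]",
    "Content-Type": "application/json",
}

# {"prompt": ...} becomes the `prompt` keyword argument of predict()
response = requests.post(url, headers=headers, json={"prompt": "The capital of France is"})
print(response.json())  # the return value of predict(), e.g. {"prediction": "..."}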

Now, head back to your IDE, and change a line of code. Hit save.

If you look closely at the shell running beam serve, you’ll notice the server reloading with your code changes.

You’ll use this workflow anytime you’re developing an app on Beam. Trust us — it makes the development process uniquely fast and painless.

Performance Optimizations

If you call the API via the cURL command, you’ll notice that the model is downloaded every time the API is invoked.

To improve performance, we’ll set up a function to pre-load your models and store them on disk between API calls.

Pre-loading

Beam includes a loader function, which you can pass to your function decorators. The loader is run exactly once when the container first starts:

def load_models():
    model = OPTForCausalLM.from_pretrained("facebook/opt-125m")
    tokenizer = AutoTokenizer.from_pretrained("facebook/opt-125m")

    return model, tokenizer

@app.rest_api(loader=load_models)
def predict(**inputs):
    # Retrieve cached model from loader
    model, tokenizer = inputs["context"]

    # Do something with the model and tokenizer...
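
Putting the pieces together, here’s the inference function from earlier with the loader wired in. The only change is that the model and tokenizer are read from the loader context instead of being downloaded on every call:

@app.rest_api(loader=load_models)
def predict(**inputs):
    # Retrieve the cached model and tokenizer from the loader
    model, tokenizer = inputs["context"]

    prompt = inputs["prompt"]

    # Tokenize the prompt
    model_inputs = tokenizer(prompt, return_tensors="pt")

    # Generate
    generate_ids = model.generate(model_inputs.input_ids, max_length=30)
    result = tokenizer.batch_decode(
        generate_ids, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )[0]

    return {"prediction": result}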

Cache in a storage volume

The loader prevents us from having to download the model multiple times, but we can avoid downloading the model entirely by caching it in a Storage Volume.

Beam allows you to create highly-available storage volumes that can be used across tasks. You might use volumes for things like storing model weights or large datasets:

from beam import App, Runtime, Image, Volume

# Beam Volume to store cached models
CACHE_PATH = "./cached_models"

app = App(
    name="inference-quickstart",
    runtime=Runtime(
        cpu=1,
        memory="8Gi",
        gpu="T4",
        image=Image(
            python_version="python3.9",
            python_packages=[
                "transformers",
                "torch",
            ],  # You can also add a path to a requirements.txt instead
        ),
    ),
    # Storage Volume for model weights
    volumes=[Volume(name="cached_models", path=CACHE_PATH)],
)

Now, these models can be automatically downloaded to the volume by using the cache_dir argument in transformers:

model = OPTForCausalLM.from_pretrained("facebook/opt-125m", cache_dir=CACHE_PATH)
tokenizer = AutoTokenizer.from_pretrained("facebook/opt-125m", cache_dir=CACHE_PATH)

These volumes are mounted directly to the container running your app, so you can read from and write to them like any other directory on disk.
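
For example, a minimal sketch that treats the mounted volume like an ordinary directory (the file name here is purely illustrative):

import os

# CACHE_PATH ("./cached_models") is the mount path of the Volume defined above
note_path = os.path.join(CACHE_PATH, "last_run.txt")

# Write to the volume like a local file
with open(note_path, "w") as f:
    f.write("model weights cached")

# Read it back on a later invocation
with open(note_path) as f:
    print(f.read())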

Add Autoscaling (Optional)

When you deploy a Task Queue or REST API, Beam creates a queueing system that manages each task that’s created when your API is called.

You can configure how Beam will scale based on either the number of tasks waiting in the queue (queue depth) or how long each task takes to process (request latency).

In this example, we’ll scale by request latency. If an incoming task takes more than 30 seconds to process, Beam will add replicas, up to a maximum of 3:

from beam import RequestLatencyAutoscaler

autoscaler = RequestLatencyAutoscaler(desired_latency=30, max_replicas=3)

@app.rest_api(autoscaler=autoscaler)
def predict(**inputs):
    ...
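
As noted above, Beam can also scale on queue depth instead of request latency. The class and parameter names in this sketch are assumptions for illustration only; check the autoscaling docs for the exact API:

from beam import QueueDepthAutoscaler  # assumed class name

# Assumption: add replicas once more than 5 tasks are waiting per replica, up to 3 replicas
autoscaler = QueueDepthAutoscaler(max_tasks_per_replica=5, max_replicas=3)

@app.rest_api(autoscaler=autoscaler)
def predict(**inputs):
    ...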

Deployment

With these performance optimizations in place, it’s time to deploy your API to create a persistent endpoint. In your shell, run this command to deploy your app:

beam deploy app.py

Monitoring Logs and Concurrency

In the dashboard, you can see the number of replicas running.

Your deployment starts at 0 replicas, scales up to the max_replicas you’ve defined in your autoscaler, and scales back down to 0 once the endpoint stops receiving traffic.

Summary

You’ve successfully created a highly performant serverless API for your ML model!

Further Reading