This is an experimental feature: we’re still evaluating its usefulness, and it may be removed from the product if it doesn’t prove useful. Please try it out and let us know what you think!

There are certain cases where you’ll want to send events between different apps running on Beam.

A common scenario is a model inference and retraining pipeline, where the inference app (App 2) needs to use the latest version of a model trained by the retraining app (App 1).

View the Code

See the code for this example on GitHub.

Invoking Functions in Other Apps

This example demonstrates how to invoke functions in other apps on Beam. Specifically, we cover the scenario with an inference and a retraining function.

The retraining function needs a way to tell the inference function to use the latest model.

We use an experimental.Signal(), which is a special type of event listener that can be triggered from the retrain function.

App 1: Retraining App

This is the retraining app. Below, we register a Signal that will fire an event to our inference app, which is subscribed to this Signal event.

from beam import endpoint, experimental

@endpoint(name="trainer")
def train():
    # Send a signal to another app letting it know that it needs to reload the models
    s = experimental.Signal(name="reload-model")
    s.set(ttl=60)

App 2: Inference App

Below is the inference app, which needs to re-run its on_start function (load_latest_model) when retraining is finished.

You’ll notice that a Signal is registered with a handler that tells us which function to run when an event is fired.

from beam import endpoint, Volume, experimental, Image

VOLUME_NAME = "brand_classifier"
CACHE_PATH = f"./{VOLUME_NAME}-cache"


def load_latest_model():
    # Load the model and tokenizer into module-level globals
    global model, tokenizer
    print("Loading latest...")
    model = lambda x: x + 1  # This is just example code
    tokenizer = None  # Placeholder; this example does not use a tokenizer

    s.clear()  # Clear the signal so it doesn't fire again


# Set a signal handler - when invoked, it will run the handler function
s = experimental.Signal(
    name="reload-model",
    handler=load_latest_model,
)


@endpoint(
    name="inference",
    volumes=[Volume(name=VOLUME_NAME, mount_path=CACHE_PATH)],
    image=Image(python_packages=["transformers", "torch"]),
    on_start=load_latest_model,
)
def predict(**inputs):
    global model, tokenizer  # These will have the latest values

    return {"success": "true"}
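
The reload pattern above relies on predict reading module-level globals that the handler rebinds. The sketch below isolates that idea in plain Python with no Beam imports, so it can be run locally. The model_version counter and the offset-based "model" are hypothetical stand-ins added only to make each reload visible; they are not part of the Beam example.

```python
# Plain-Python sketch of the reload pattern; no Beam imports are needed to run it.
model = None
model_version = 0  # hypothetical counter, used only to make reloads visible


def load_latest_model():
    """Rebind the module-level model, as the on_start / Signal handler would."""
    global model, model_version
    model_version += 1
    offset = model_version  # stand-in for weights read from the volume
    model = lambda x: x + offset


def predict(x):
    """Read the global at call time, so a reload is picked up on the next request."""
    return {"prediction": model(x), "model_version": model_version}


load_latest_model()    # what on_start does when the container boots
first = predict(10)    # served by version 1
load_latest_model()    # what the signal handler does after retraining
second = predict(10)   # served by version 2
print(first, second)
```

Because predict looks up model on every call rather than capturing it once, requests that arrive after the handler runs are served by the new model without restarting the container.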

To test this example, you can open two terminal windows:

  • In window 1, serve and invoke the inference function
  • In window 2, serve and invoke the retrain function

Look at the logs in window 1: you’ll see that the signal fired and load_latest_model ran again.

Clearing Signals

Signals refresh every second while a container is running, until signal.clear() is called. We recommend calling signal.clear() after each signal invocation.
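
To see why clearing matters, here is a toy, in-memory model of the refresh loop. It is a sketch under stated assumptions, not Beam's implementation: ToySignal, its poll method, and the explicit now argument are hypothetical, one loop iteration stands in for one 1-second refresh, and ttl is assumed to mean "the signal stays set for this many seconds".

```python
class ToySignal:
    """In-memory stand-in for a named signal. NOT Beam's implementation."""

    _flags = {}  # shared store: signal name -> expiry time in seconds

    def __init__(self, name, handler=None):
        self.name = name
        self.handler = handler

    def set(self, ttl, now):
        # Raise the flag until now + ttl (assumed meaning of ttl)
        ToySignal._flags[self.name] = now + ttl

    def clear(self):
        # Lower the flag so later refreshes do nothing
        ToySignal._flags.pop(self.name, None)

    def poll(self, now):
        # One refresh tick: run the handler if the flag is raised and not expired
        expiry = ToySignal._flags.get(self.name)
        if expiry is not None and now < expiry and self.handler:
            self.handler()


def count_reloads(clear_in_handler, ticks=5, ttl=3):
    """Count how many times the handler runs over `ticks` one-second refreshes."""
    reloads = []

    def handler():
        reloads.append(1)
        if clear_in_handler:
            listener.clear()

    listener = ToySignal("reload-model", handler=handler)
    ToySignal("reload-model").set(ttl=ttl, now=0)  # the "retraining app" side
    for second in range(ticks):
        listener.poll(now=second)
    listener.clear()  # reset the shared store between runs
    return len(reloads)


with_clear = count_reloads(clear_in_handler=True)      # handler runs once
without_clear = count_reloads(clear_in_handler=False)  # handler runs until the ttl expires
print(with_clear, without_clear)  # prints "1 3"
```

In this model, a handler that clears the signal reloads once per set call, while a handler that does not clear it reloads on every refresh until the ttl runs out.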