This example builds a basic movie recommendation system. It demonstrates the following capabilities:
  1. Training a model using the MovieLens dataset
  2. Saving the trained model to a Volume
  3. Retrieving the trained model from a Volume during inference
  4. Deploying a REST API that accepts a user ID and returns customized recommendations for that user

Defining the runtime

This is the runtime our code will run in. We’ll define the compute settings and Python packages to install.
app.py
from beam import App, Runtime, Image, Output, Volume

inference_app = App(
    name="movie-recommendation-example",
    runtime=Runtime(
        cpu=1,
        memory="8Gi",
        image=Image(
            python_version="python3.8",
            python_packages=["numpy", "torch", "pandas", "matplotlib"],
        ),
    ),
    volumes=[Volume(name="trained_models", path="./trained_models")],
)

Using Volumes to save trained models

We’re going to mount a Volume, which is a writable data store.
  • During training, we will save our models to this volume.
  • During inference, we will retrieve our trained model.
We will access the volume at this path: ./trained_models
volumes=[Volume(name="trained_models", path="./trained_models")],
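The save-then-retrieve pattern described above can be sketched without Beam or PyTorch. This is only an illustration under stated assumptions: a temporary directory stands in for the mounted ./trained_models path, pickle stands in for torch.save and torch.load, and the helper names save_weights and load_weights are hypothetical.

```python
import pickle
import tempfile
from pathlib import Path


def save_weights(weights: dict, volume_dir: Path, filename: str = "model.pt") -> Path:
    """Write the weights to a file inside the mounted directory."""
    volume_dir.mkdir(parents=True, exist_ok=True)
    target = volume_dir / filename
    with target.open("wb") as f:
        pickle.dump(weights, f)
    return target


def load_weights(volume_dir: Path, filename: str = "model.pt") -> dict:
    """Read the weights back, failing clearly if training has not run yet."""
    target = volume_dir / filename
    if not target.exists():
        raise FileNotFoundError(f"No trained model at {target}; run training first")
    with target.open("rb") as f:
        return pickle.load(f)


# A temporary directory stands in for ./trained_models in this sketch.
with tempfile.TemporaryDirectory() as tmp:
    volume_dir = Path(tmp) / "trained_models"
    # "Training" side: persist a toy dictionary of weights.
    saved_path = save_weights({"fc1.bias": [0.1, -0.2]}, volume_dir)
    # "Inference" side: read the same file back from the same path.
    restored = load_weights(volume_dir)
```

The point of the sketch is that training and inference only need to agree on the directory and the filename; everything else about the two apps can differ.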

Training the model

We use an embedding layer for both the user and movie, to compress the respective one-hot encoded vectors into rich, compact representations that are easier to model. These two representations are concatenated into a single vector and then passed into a simple fully connected neural network.
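To make the idea concrete, here is a small pure-Python sketch (no PyTorch) of what an embedding lookup does. It assumes toy table sizes of 5 users and 7 movies, and the helper names are hypothetical. It shows that looking up a row is equivalent to multiplying a one-hot vector by the embedding table, and that concatenating two 8-dimensional embeddings gives the 16 input features the first dense layer expects.

```python
import random

random.seed(0)
EMBEDDING_DIM = 8  # same embedding size as the model in train.py


def make_embedding_table(num_rows, dim):
    """Randomly initialised table: one row of `dim` floats per ID."""
    return [[random.uniform(-1, 1) for _ in range(dim)] for _ in range(num_rows)]


def one_hot(index, size):
    """One-hot encode an ID as a list of floats."""
    return [1.0 if i == index else 0.0 for i in range(size)]


def one_hot_times_table(vec, table):
    """Multiply a one-hot row vector by the table (vec @ table)."""
    dim = len(table[0])
    return [sum(vec[r] * table[r][c] for r in range(len(table))) for c in range(dim)]


user_table = make_embedding_table(num_rows=5, dim=EMBEDDING_DIM)   # 5 toy users
movie_table = make_embedding_table(num_rows=7, dim=EMBEDDING_DIM)  # 7 toy movies

user_id, movie_id = 3, 6

# An embedding layer is a row lookup...
user_vec = user_table[user_id]
movie_vec = movie_table[movie_id]

# ...which gives the same result as the one-hot matrix product.
via_one_hot = one_hot_times_table(one_hot(user_id, 5), user_table)

# Concatenate the two embeddings into a single feature vector.
features = user_vec + movie_vec
```

In the real model the table entries are learned during training rather than left at their random initial values.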
train.py
"""
Trains a neural collaborative filtering recommender model on the MovieLens dataset
"""
from beam import App, Runtime, Image, Volume

import random

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader


from dataset import MovieLensDataset

device = "cpu"

# The environment we'll use for training the model
training_app = App(
    name="movie-recommendation-training",
    runtime=Runtime(
        cpu=1,
        memory="16Gi",
        image=Image(
            python_version="python3.8",
            python_packages=["numpy", "torch", "pandas", "matplotlib"],
        ),
    ),
    volumes=[Volume(name="trained_models", path="./trained_models")],
)



class NCF(nn.Module):
    """
    Use an embedding layer for both the user and movie, to compress the
    respective one-hot encoded vectors into rich, compact representations
    that are easier to model.
    """

    def __init__(self, num_users, num_items):
        super().__init__()
        # User embedding layer
        self.user_embedding = nn.Embedding(num_embeddings=num_users, embedding_dim=8)
        # Movie embedding layer
        self.item_embedding = nn.Embedding(num_embeddings=num_items, embedding_dim=8)
        self.fc1 = nn.Linear(in_features=16, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=32)
        self.output = nn.Linear(in_features=32, out_features=1)

    def forward(self, user, item):
        # Embedding
        u = self.user_embedding(user)
        i = self.item_embedding(item)
        x = torch.cat([u, i], dim=-1)

        # Dense layers
        x = self.fc1(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.2, training=self.training)
        x = self.fc2(x)
        x = F.relu(x)
        x = F.dropout(x, p=0.2, training=self.training)

        # Output
        x = self.output(x)
        x = torch.sigmoid(x)
        return x


def load_model():
    # Load MovieLens data
    dataset_train = MovieLensDataset(
        "./data/ratings.csv",
        train=True,
        train_size=0.8,
        negatives=128,
    )
    dataset_test = MovieLensDataset(
        "./data/ratings.csv",
        train=False,
        train_size=0.8,
        negatives=32,
    )
    dataset_test_positives = MovieLensDataset(
        "./data/ratings.csv",
        train=False,
        train_size=0.8,
        negatives=0,
    )
    print(
        "Loaded {} training samples and {} test samples".format(
            len(dataset_train), len(dataset_test)
        )
    )

    # Setup model
    num_users = max(dataset_train.users) + 1
    num_movies = max(dataset_train.movies) + 1

    model = NCF(num_users, num_movies).to(device)
    return model


def train():
    # load movielens data
    dataset_train = MovieLensDataset(
        "./data/ratings.csv",
        train=True,
        train_size=0.8,
        negatives=128,
    )
    dataset_test = MovieLensDataset(
        "./data/ratings.csv",
        train=False,
        train_size=0.8,
        negatives=32,
    )
    dataset_test_positives = MovieLensDataset(
        "./data/ratings.csv",
        train=False,
        train_size=0.8,
        negatives=0,
    )
    print(
        "Loaded {} training samples and {} test samples".format(
            len(dataset_train), len(dataset_test)
        )
    )

    loader_train = DataLoader(dataset_train, batch_size=1024, shuffle=True)
    loader_test = DataLoader(dataset_test, batch_size=1024)
    loader_test_positives = DataLoader(dataset_test_positives, batch_size=1024)

    unique_movies = set(dataset_train.movies)
    unique_users = set(dataset_train.users)

    # Setup model
    model = load_model()
    optimizer = torch.optim.Adam(model.parameters())

    # *** Training ***
    for epoch in range(0, 20):
        # To begin the training process, the model weights are randomly initialized
        # We use an Adam optimizer with a binary cross entropy loss function to minimize error in predicting interactions between users and movies.
        model.train()
        train_loss = 0
        for batch_idx, (user, movie, label) in enumerate(loader_train):
            user, movie, label = user.to(device), movie.to(device), label.to(device)
            optimizer.zero_grad()
            output = model(user, movie)
            loss = F.binary_cross_entropy(output, label.view(-1, 1).float())
            loss.backward()
            train_loss += loss.item()
            optimizer.step()

        train_loss /= len(loader_train)
        print("Train epoch: {}, avg loss: {:.6f}".format(epoch, train_loss))

        # Test
        model.eval()
        test_loss = 0
        hits = 0
        with torch.no_grad():
            # Loss
            for user, movie, label in loader_test:
                user, movie, label = user.to(device), movie.to(device), label.to(device)
                output = model(user, movie)
                test_loss += F.binary_cross_entropy(
                    output, label.view(-1, 1).float()
                ).item()

            # Calculates hit rate -- basically, given N total samples, including 1 positive sample, what is the probability
            # that the positive sample will appear in the top K results. We can refer to this as “hit rate @ K / N”.

            # Hit rate @ 10
            k = 10
            total = 1000
            hit_thresholds = {}
            for u in unique_users:
                negatives = random.sample(
                    [
                        m
                        for m in unique_movies
                        if m not in dataset_test_positives.user_movies[u]
                    ],
                    total,
                )
                negatives = torch.tensor(negatives).to(device)
                user = torch.tensor([u] * total).to(device)
                output = model(user, negatives)
                top_k = torch.topk(output.flatten(), k)
                hit_thresholds[u] = top_k.values[k - 1].item()

            for user, movie, label in loader_test_positives:
                user, movie, label = user.to(device), movie.to(device), label.to(device)
                output = model(user, movie)
                for u, o in zip(user, output):
                    if o.item() > hit_thresholds[u.item()]:
                        hits += 1

        test_loss /= len(loader_test)
        hit_rate = hits / len(dataset_test_positives)

        print(
            "Test set: avg loss: {:.4f}, hit rate: {}/{} ({:.2f}%)\n".format(
                test_loss,
                hits,
                len(dataset_test_positives),
                100.0 * hit_rate,
            )
        )

    return model


# This function trains the model and saves it to the Beam Volume 
@training_app.run()
def run_training_pipeline():
    # Trains a model and saves the state_dict to the persistent volume
    trained_model = train()
    persistent_volume_path = "/volumes/trained_models/model.pt"
    torch.save(trained_model.state_dict(), persistent_volume_path)
There’s a lot going on here, but the part to note is the function at the end, which saves the trained model’s state_dict to a Volume. The function is wrapped in the run() decorator, which lets us run it asynchronously on Beam.
@training_app.run()
def run_training_pipeline():
    # Trains a model and saves the state_dict to the persistent volume
    trained_model = train()
    persistent_volume_path = "/volumes/trained_models/model.pt"
    torch.save(trained_model.state_dict(), persistent_volume_path)

Running the training script

To run the training script, open your shell and kick off a run:
beam run train.py:run_training_pipeline
This command will containerize your app and run it in a remote container. The training logs will stream to your shell. Because the app runs remotely on Beam, it will keep running even if you close your terminal window. The output will look something like this:
(.venv) user@MacBook movie-recommendations % beam run train.py:run_training_pipeline
 i  Using cached image.
  App initialized.
 i  Uploading files...
  Container scheduled, logs will appear below.
 Starting container... 29s (Estimated: 5m0s)
Starting app...
Loading handler in 'train.py:run_training_pipeline'...
Running task: a22e62e1-97f4-4381-9a47-426aacd3b5a2
Loaded 158499 training samples and 39937 test samples
Loaded 158499 training samples and 39937 test samples
Train epoch: 0, avg loss: 0.674729
Test set: avg loss: 0.6624, hit rate: 301/20417 (1.47%)

Train epoch: 1, avg loss: 0.625152
Test set: avg loss: 0.6426, hit rate: 399/20417 (1.95%)
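The hit rate in these logs follows the thresholding approach in train.py: for each user, the k-th highest score among sampled negatives becomes a threshold, and a held-out positive counts as a hit if it scores above that threshold. The sketch below reproduces that logic in plain Python on made-up scores; the function names and the toy numbers are assumptions for illustration, not values from the real run.

```python
def kth_largest(scores, k):
    """Return the k-th highest value in a list of scores."""
    return sorted(scores, reverse=True)[k - 1]


def hit_rate_at_k(positive_scores, negative_scores, k):
    """
    positive_scores: {user: [scores of held-out positive movies]}
    negative_scores: {user: [scores of sampled negative movies]}
    Returns (hits, total_positives).
    """
    hits = 0
    total = 0
    for user, positives in positive_scores.items():
        # A positive must beat the k-th best negative to land in the top k.
        threshold = kth_largest(negative_scores[user], k)
        hits += sum(1 for score in positives if score > threshold)
        total += len(positives)
    return hits, total


# Toy scores for two users (made up for illustration).
negatives = {
    1: [0.9, 0.8, 0.7, 0.2, 0.1],
    2: [0.6, 0.5, 0.4, 0.3, 0.2],
}
positives = {
    1: [0.75, 0.5],  # 0.75 beats the threshold 0.7, 0.5 does not
    2: [0.95],       # beats the threshold 0.4
}

hits, total = hit_rate_at_k(positives, negatives, k=3)
hit_rate_percent = 100.0 * hits / total
```

The percentages in the logs are computed the same way: 301 hits out of 20417 test positives is about 1.47%.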

Making Predictions

The whole point of a recommendation system is to make predictions dynamically, and that requires us to deploy an API. We’ll write a function that takes in a user ID, and returns a list of movies that the user is predicted to enjoy watching.
run.py
"""
This function returns the top 'N' unseen movie recommendations for a specific user.

1. Loads the user viewing history
2. Filters out any previously viewed movies
3. Scores all unseen movie candidates
4. Returns the top N results.
"""

import torch
import pandas as pd

from dataset import MovieLensDataset
from train import load_model

# The path where we can retrieve the trained model
volume_path = "./trained_models/model.pt"


def run_inference(**inputs):
    user_id = int(inputs["user_id"])
    number_of_recommendations = int(inputs["number_of_recommendations"])

    dataset = MovieLensDataset("./data/ratings.csv", train_size=0, negatives=0)
    movies = pd.read_csv("./data/movies.csv", index_col="movieId")

    # Gather all items that user has not interacted with
    unseen = torch.tensor(
        [m for m in movies.index if m not in dataset.user_movies[user_id]]
    )

    # Load trained model
    model = load_model()
    model.load_state_dict(torch.load(str(volume_path)))
    model.eval()

    # Predict recommendation scores
    pred = model(torch.tensor([user_id] * len(unseen)), unseen)

    top_k = torch.topk(pred.flatten(), number_of_recommendations)
    # Format the scores into usable results
    recs = []
    for i, score in zip(top_k.indices, top_k.values):
        m = unseen[i].item()
        recs.append(
            {
                "title": movies.loc[m].title,
                "genres": movies.loc[m].genres,
                "movie_id": m,
                "score": score.item(),
            }
        )

    print(f'recommendations: {recs}')

    # Returns top N unseen recommendations
    return {"recommendations": recs}


if __name__ == "__main__":
    prediction = run_inference(user_id=42, number_of_recommendations=10)
    print(prediction)
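
The filter, score, and rank steps in run_inference can be sketched without the model or the dataset. In this minimal illustration the scoring function is a plain dictionary lookup standing in for the trained network, and the name top_n_unseen and the toy scores are hypothetical.

```python
import heapq


def top_n_unseen(all_movie_ids, seen_movie_ids, score_fn, n):
    """Return the n highest-scoring movies the user has not seen yet."""
    # 1. Filter out anything the user has already interacted with.
    unseen = [m for m in all_movie_ids if m not in seen_movie_ids]
    # 2. Score every remaining candidate.
    scored = ((score_fn(m), m) for m in unseen)
    # 3. Keep only the n best (score, movie_id) pairs, highest first.
    best = heapq.nlargest(n, scored)
    return [{"movie_id": m, "score": s} for s, m in best]


# Toy scores standing in for model predictions.
toy_scores = {1: 0.2, 2: 0.9, 3: 0.5, 4: 0.7, 5: 0.95}
seen = {5}  # the user already watched movie 5, so it must not be recommended

recs = top_n_unseen(toy_scores.keys(), seen, toy_scores.get, n=2)
```

Movie 5 has the highest score but is excluded because it was already seen, so the result contains movies 2 and 4.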

Deployment

We’re going to deploy the app as a REST API, which will allow us to generate movie recommendations for users in real time. Above the run_inference function, add the rest_api decorator from the inference_app we defined earlier:
app.py
@inference_app.rest_api()
def run_inference(**inputs):
    ...
Now, go back to the shell and run this command to deploy the app. Replace app.py with the name of the file that contains your run_inference function:
beam deploy app.py:run_inference
You’ll see some logs appear in your shell, showing the deployment status:
(.venv) user@MacBook movie-recommendations % beam deploy app.py:run_inference
 i  Copying files in workspace './'...
  Done.
 i  Adding python requirements...
 i  	numpy
 i  	torch
 i  	pandas
 i  	matplotlib
 i  	beam-sdk==0.14.0rc11
  Done.
 i  Creating deployment package...
  Done.
 i  Checking deployment package size...
 i  Uploading deployment package...
  Done.
 i  Using cached image.
  <movie-recommendation-example> deployed successfully! 🎉
 i  ID: 64c2bb3238ce120009da1b58
 i  Version: 2
 i  Trigger type: rest_api
 i  CPU: 4000m
 i  GPU:
 i  Memory: 8Gi
 i  Runtime: python3.8
 i  Send requests to: https://apps.beam.cloud/owemo
 i  View deployment status at: https://www.beam.cloud/apps/owemo/deployment/64c2bb3238ce120009da1b58/logs

Calling the API

If you navigate to the link in the last line of the shell output, you’ll be able to log in to your Beam web dashboard and view the details of your deployment.
In the dashboard, you’ll be able to copy a cURL request that you can use to request predictions from the API. We’ll pass in a user ID and ask for 3 movie recommendations in response.
Request
  curl -X POST --compressed "https://apps.beam.cloud/per4g" \
   -H 'Accept: */*' \
   -H 'Accept-Encoding: gzip, deflate' \
   -H 'Authorization: Basic [YOUR_AUTH_TOKEN]' \
   -H 'Connection: keep-alive' \
   -H 'Content-Type: application/json' \
   -d '{"user_id": 42, "number_of_recommendations": 3}'
The movie recommendations will be returned as JSON:
Response
{
  "recommendations": [
    {
      "title": "Raiders of the Lost Ark (Indiana Jones and the Raiders of the Lost Ark) (1981)",
      "genres": "Action|Adventure",
      "movie_id": 1198,
      "score": 0.9955273270606995
    },
    {
      "title": "Mrs. Doubtfire (1993)",
      "genres": "Comedy|Drama",
      "movie_id": 500,
      "score": 0.9951636791229248
    },
    {
      "title": "Toy Story (1995)",
      "genres": "Adventure|Animation|Children|Comedy|Fantasy",
      "movie_id": 1,
      "score": 0.9950186014175415
    }
  ]
}
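
The same request can be made from Python. The sketch below uses only the standard library and mirrors the cURL example above; the URL and the [YOUR_AUTH_TOKEN] placeholder are copied from that example and need to be replaced with the values from your own dashboard. The helper names are hypothetical, and the Accept-Encoding header is left out so the response body arrives uncompressed.

```python
import json
import urllib.request


def build_recommendation_request(url, auth_token, user_id, number_of_recommendations):
    """Build (but do not send) a POST request matching the cURL example."""
    payload = json.dumps(
        {"user_id": user_id, "number_of_recommendations": number_of_recommendations}
    ).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        method="POST",
        headers={
            "Authorization": f"Basic {auth_token}",
            "Content-Type": "application/json",
        },
    )


def fetch_recommendations(request, timeout=30):
    """Send the request and return the list under the "recommendations" key."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = json.loads(response.read().decode("utf-8"))
    return body["recommendations"]


request = build_recommendation_request(
    url="https://apps.beam.cloud/per4g",  # placeholder URL from the example above
    auth_token="[YOUR_AUTH_TOKEN]",       # placeholder, replace with your token
    user_id=42,
    number_of_recommendations=3,
)
# Uncomment once the URL and token point at your own deployment:
# for rec in fetch_recommendations(request):
#     print(rec["title"], rec["score"])
```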