In this example, we’ll demonstrate how to build a Wikipedia web scraper using Beam functions. While you could run this on a local computer, Beam provides access to more powerful computational resources, allowing you to add advanced features to your web scraper, such as large language models or OCR models.

View the Code

See the code for this example on GitHub.

Defining our Scraping Function

We will start by defining our scraping function. This is the Beam function that will be invoked remotely. We use the Image class from the Beam SDK to install the packages our scraper needs, requests and beautifulsoup4, in the container that runs our code.

from beam import Image, function


@function(image=Image().add_python_packages(["requests", "beautifulsoup4"]))
def scrape_page(url):
    import requests
    from bs4 import BeautifulSoup
    from urllib.parse import urljoin

    response = requests.get(url)
    if response.status_code != 200:
        return {"url": url, "title": "", "content": "", "links": []}

    soup = BeautifulSoup(response.text, "html.parser")
    title = soup.find(id="firstHeading").text
    content = soup.find(id="mw-content-text").find(class_="mw-parser-output")

    if not content:
        return {"url": url, "title": title, "content": "", "links": []}

    paragraphs = [p.text for p in content.find_all("p", recursive=False)]
    links = [urljoin(url, link["href"]) for link in content.find_all("a", href=True)]

    return {
        "url": url,
        "title": title,
        "content": "\n\n".join(paragraphs),
        "links": links,
    }

Our function takes a URL, fetches the page’s HTML, and uses BeautifulSoup to extract the page’s title, content, and links. It returns that content in a dictionary so that our crawler can invoke new functions with the links found on the page. If we wanted, we could extend this function to process the content in a variety of ways. For example, we could use a language model to summarize the content or an OCR model to extract text from images.
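To make that concrete, here is a minimal sketch of what a summarization variant could look like. It is an illustration only: the extra packages, the gpu argument, the model name, and the scrape_and_summarize function are assumptions, not part of the example code on GitHub.

from beam import Image, function


# Hypothetical variant of scrape_page that also summarizes the article text.
# The gpu argument, the extra packages, and the model choice are assumptions.
@function(
    gpu="T4",
    image=Image().add_python_packages(
        ["requests", "beautifulsoup4", "transformers", "torch"]
    ),
)
def scrape_and_summarize(url):
    import requests
    from bs4 import BeautifulSoup
    from transformers import pipeline

    response = requests.get(url)
    if response.status_code != 200:
        return {"url": url, "summary": ""}

    soup = BeautifulSoup(response.text, "html.parser")
    content = soup.find(id="mw-content-text").find(class_="mw-parser-output")
    text = "\n\n".join(p.text for p in content.find_all("p", recursive=False))

    # Summarize the beginning of the article; the model has a limited input length
    summarizer = pipeline("summarization", model="sshleifer/distilbart-cnn-12-6")
    summary = summarizer(
        text[:3000], max_length=150, min_length=30, truncation=True
    )[0]["summary_text"]

    return {"url": url, "summary": summary}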

Building a Batch Crawler with Beam’s Function Map

Next, we’ll build a crawler that will use Beam’s map method to invoke our scrape_page function on a list of URLs. Below is the __init__ method for the crawler.

class WikipediaCrawler:
    def __init__(self, start_url, max_pages=100, batch_size=5):
        self.start_url = start_url
        self.max_pages = max_pages
        self.batch_size = batch_size
        self.visited_pages = set()
        self.pages_to_visit = [start_url]
        self.scraped_data = {}

Our crawler takes in a starting URL, a maximum number of pages to scrape, and a batch size. The batch size determines how many remote function invocations we will make at a time.
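For example, assuming the class above, you could construct a crawler that issues up to ten remote invocations per batch like this (the values here are arbitrary illustrations):

# Hypothetical instantiation; the max_pages and batch_size values are arbitrary
crawler = WikipediaCrawler(
    "https://en.wikipedia.org/wiki/Web_scraping",
    max_pages=50,   # stop once 50 pages have been scraped
    batch_size=10,  # invoke scrape_page on up to 10 URLs at a time
)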

Next, we’ll define the actual crawl method along with a helper method to determine if a URL is a valid Wikipedia URL.

    def is_wikipedia_url(self, url):
        parsed_url = urlparse(url)
        return parsed_url.netloc.endswith(
            "wikipedia.org"
        ) and parsed_url.path.startswith("/wiki/")

    def crawl(self):
        while len(self.scraped_data) < self.max_pages and self.pages_to_visit:
            # Build a batch of up to batch_size pages that we have not yet visited
            batch = []
            while len(batch) < self.batch_size and self.pages_to_visit:
                p = self.pages_to_visit.pop(0)
                if p not in self.visited_pages:
                    self.visited_pages.add(p)
                    batch.append(p)

            for result in scrape_page.map(batch):
                # Save the result and collect new links
                self.scraped_data[result["url"]] = result
                if len(self.scraped_data) < self.max_pages:
                    new_links = [
                        link
                        for link in result["links"]
                        if self.is_wikipedia_url(link)
                        and link not in self.visited_pages
                        and link not in self.pages_to_visit
                    ]
                    self.pages_to_visit.extend(new_links)

        print(f"Crawling completed. Scraped {len(self.scraped_data)} pages.")

    def get_scraped_data(self):
        return self.scraped_data

The crawl method runs continuously until we have scraped the maximum number of pages or there are no more pages to visit. It creates a batch of URLs to scrape and then passes them to the scrape_page function’s map method. This allows us to scrape multiple pages in parallel. After the pages are scraped, we collect any new links that we want to visit and add them to the pages_to_visit list.
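To make the invocation styles concrete, here is a minimal sketch comparing a single remote call with a mapped batch, independent of the crawler class; the URL list is just an illustration, and the remote method appears again in the continuous crawler later in this example:

# A single remote invocation runs in one container and returns one result
single = scrape_page.remote("https://en.wikipedia.org/wiki/Web_scraping")

# A mapped invocation fans a list of inputs out across containers in parallel
# and returns their results
urls = [
    "https://en.wikipedia.org/wiki/Web_scraping",
    "https://en.wikipedia.org/wiki/Data_scraping",
]
for result in scrape_page.map(urls):
    print(result["title"])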

Running the Batch Crawler

Finally, we can run our crawler. Below is the __main__ block that initializes the crawler and runs the crawl method.

import json

if __name__ == "__main__":
    start_url = "https://en.wikipedia.org/wiki/Web_scraping"
    crawler = WikipediaCrawler(start_url, max_pages=20)
    crawler.crawl()

    # Write the scraped data to a file
    with open("scraped_data.json", "w") as f:
        json.dump(crawler.get_scraped_data(), f)

This code initializes the crawler with a starting URL and a maximum number of pages to scrape. It then runs the crawl method and writes the scraped data to a file. You can run this code like any other Python script:

python batch_crawl.py

When you run this code, you should see output that looks like the following:

=> Building image
=> Using cached image
=> Syncing files
...
=> Uploading
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 727.9/727.9 kB 0:00:00
=> Files synced
=> Running function: <app2:scrape_page>
=> Function complete <21f88938-8b82-465c-8b16-8bc0259e1997>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Function complete <0f384b7a-98da-400e-bcc5-abacf7f239ef>
=> Function complete <16da6df3-955d-4ad7-a8ec-c6456ab6de1e>
=> Function complete <2dd4c91b-a48f-4d7e-ada3-485633539ee5>
=> Function complete <04452bb5-f642-43e3-9d0f-9cb7532c0d4b>
=> Function complete <7ba94632-1907-415a-acad-c37a2cddd97e>

The output shows five function invocations in parallel. Once the scraping is complete, you can see the results in the scraped_data.json file. It will look something like this:

{
    "https://en.wikipedia.org/wiki/Web_scraping": {
        "url": "https://en.wikipedia.org/wiki/Web_scraping",
        "title": "Web scraping",
        "content": "Web scraping, web harvesting, or web data extraction ...",
        "links": [
            "https://en.wikipedia.org/wiki/Data_scraping",
            ...
        ]
    },
    "https://en.wikipedia.org/wiki/Wikipedia:Verifiability": {
        "url": "https://en.wikipedia.org/wiki/Wikipedia:Verifiability",
        "title": "Wikipedia:Verifiability",
        "content": "\n\n\nIn the English Wikipedia, ...",
        "links": [
            ...
        ]
    },
    ...
}

Building a Continuous Crawler with Beam Functions and Threads

The batched web crawler is a good starting point, but it requires waiting for a full batch to finish before starting any new jobs. If we want to keep our concurrency limit continuously saturated, we can use Beam functions in conjunction with Python threads.

To do this, we will use the same scrape_page function, but instead of using the map method, we will use a thread pool to invoke the function in parallel. Below is the code for our WikipediaCrawler class with a continuous crawl method.

    def process_scraped_page(self, result):
        if not result or len(self.scraped_data) >= self.max_pages:
            return

        self.scraped_data[result["url"]] = result
        if len(self.scraped_data) < self.max_pages:
            new_links = [
                link
                for link in result["links"]
                if self.is_wikipedia_url(link)
                and link not in self.visited_pages
                and link not in self.pages_to_visit
            ]
            self.pages_to_visit.extend(new_links)

    def crawl(self):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = {}
            while len(self.scraped_data) < self.max_pages and (
                self.pages_to_visit or futures
            ):
                # Start new tasks if we have capacity and pages to visit
                while len(futures) < 5 and self.pages_to_visit:
                    url = self.pages_to_visit.pop(0)
                    self.visited_pages.add(url)
                    future = executor.submit(scrape_page.remote, url)
                    futures[future] = url

                # Wait for any task to complete
                if futures:
                    done, _ = concurrent.futures.wait(
                        futures, return_when=concurrent.futures.FIRST_COMPLETED
                    )
                    for future in done:
                        url = futures.pop(future)
                        try:
                            result = future.result()
                            self.process_scraped_page(result)
                        except Exception as e:
                            print(f"Error processing {url}: {str(e)}")

        print(f"Crawling completed. Scraped {len(self.scraped_data)} pages.")

This code is more complex than the batch crawler, but it makes better use of our compute resources. Instead of letting containers sit idle while others finish their work, we send a new function invocation as soon as any running one completes. To do this, we track the futures returned by executor.submit and call concurrent.futures.wait with the concurrent.futures.FIRST_COMPLETED constant, which makes wait return as soon as at least one future finishes. We then process the completed results and submit new work to the pool.
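The same pattern can be seen in isolation with plain Python, independent of Beam. In the toy sketch below, slow_task is a hypothetical stand-in for a remote scrape; the loop keeps three tasks in flight and refills the pool as each one finishes:

import concurrent.futures
import random
import time


def slow_task(n):
    # Stand-in for a remote scrape: sleep for a random amount of time
    time.sleep(random.uniform(0.1, 0.5))
    return n * n


pending = list(range(10))
results = []

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    futures = {}
    while pending or futures:
        # Keep up to three tasks in flight
        while len(futures) < 3 and pending:
            n = pending.pop(0)
            futures[executor.submit(slow_task, n)] = n

        # Block until at least one in-flight task finishes, then collect it
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )
        for future in done:
            futures.pop(future)
            results.append(future.result())

print(results)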

Running the Continuous Crawler

To run the continuous crawler, you can use the same __main__ block as before. When you run this code, you should see output that looks like the following:

=> Building image
=> Using cached image
=> Syncing files
...
=> Files already synced
=> Running function: <app:scrape_page>
=> Function complete <5c059d2c-2570-4ac1-8c8b-11d96543d197>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Function complete <4f0ff5f6-12db-448f-acad-66362485c988>
=> Running function: <app:scrape_page>
=> Function complete <b4ada76f-a567-4b43-9d40-0481f3fccd6f>
=> Running function: <app:scrape_page>
=> Function complete <44ae4cbe-6de1-4442-a28c-9adb32937a03>
=> Running function: <app:scrape_page>
=> Function complete <5788c95d-afb7-428d-b906-d0378f099d58>
=> Running function: <app:scrape_page>
=> Function complete <dedfaf60-9430-4ece-83b8-58c73ad88f30>

As you can see, as soon as one function invocation completes, we immediately start a new one.