> ## Documentation Index
> Fetch the complete documentation index at: https://docs.beam.cloud/llms.txt
> Use this file to discover all available pages before exploring further.

# Web Scraping with Beam Functions

In this example, we'll demonstrate how to build a Wikipedia web scraper using Beam functions. While you could run this on a local computer, Beam provides access to more powerful computational resources, allowing you to add advanced features to your webscraper using large language models or OCR models.

<video autoPlay muted loop playsInline className="w-full aspect-video" src="https://mintcdn.com/slai-beam/vg5aTEbpFmupCYom/img/v2/web-scraping.mp4?fit=max&auto=format&n=vg5aTEbpFmupCYom&q=85&s=1f1f14c6f4a741da43added9f2f1c61b" data-path="img/v2/web-scraping.mp4" />

<Card title="View the Code" icon="github" href="https://github.com/beam-cloud/examples/tree/main/web_scraping">
  See the code for this example on Github.
</Card>

## Defining our Scraping Function

We will start by defining our scraping function. This is the Beam function that will be invoked remotely. We use the [`Image` class](/v2/environment/custom-images) from the `beam` SDK to install these packages in the container running your code.

```python theme={null}
from beam import Image, function


@function(image=Image().add_python_packages(["requests", "beautifulsoup4"]))
def scrape_page(url):
    import requests
    from bs4 import BeautifulSoup

    response = requests.get(url)
    if response.status_code != 200:
        return {"url": url, "title": "", "content": "", "links": []}

    soup = BeautifulSoup(response.text, "html.parser")
    title = soup.find(id="firstHeading").text
    content = soup.find(id="mw-content-text").find(class_="mw-parser-output")

    if not content:
        return {"url": url, "title": title, "content": "", "links": []}

    paragraphs = [p.text for p in content.find_all("p", recursive=False)]
    links = [urljoin(url, link["href"]) for link in content.find_all("a", href=True)]

    return {
        "url": url,
        "title": title,
        "content": "\n\n".join(paragraphs),
        "links": links,
    }
```

Our function takes in a URL, fetches the page's HTML, and then uses [BeautifulSoup](https://beautiful-soup-4.readthedocs.io/en/latest/) to extract the page's title, content, and links. It returns that content in a dictionary so that our crawler can invoke new functions with the links found on the page. If we wanted, we could add more functionality to this function to extract or process the content in a variety of ways. For example, we could add a language model to summarize the content or use an OCR model to extract text from an image.

## Building a Batch Crawler with Beam's Function Map

Next, we'll build a crawler that will use Beam's `map` method to invoke our `scrape_page` function on a list of URLs. Below, is our `__init__` method for the crawler.

```python theme={null}
class WikipediaCrawler:
    def __init__(self, start_url, max_pages=100, batch_size=5):
        self.start_url = start_url
        self.max_pages = max_pages
        self.batch_size = batch_size
        self.visited_pages = set()
        self.pages_to_visit = [start_url]
        self.scraped_data = {}
```

Our crawler takes in a starting URL, a maximum number of pages to scrape, and a batch size. The batch size determines how many remote function invocations we will make at a time.

Next, we'll define the actual `crawl` method along with a helper method to determine if a URL is a valid Wikipedia URL.

```python theme={null}
    def is_wikipedia_url(self, url):
        parsed_url = urlparse(url)
        return parsed_url.netloc.endswith(
            "wikipedia.org"
        ) and parsed_url.path.startswith("/wiki/")

    def crawl(self):
        while len(self.scraped_data) < self.max_pages and self.pages_to_visit:
            # Create a batch of 5 pages to scrape that we have not yet visited
            batch = []
            while len(batch) < self.batch_size and self.pages_to_visit:
                p = self.pages_to_visit.pop(0)
                if p not in self.visited_pages:
                    batch.append(p)

            for result in scrape_page.map(batch):
                # Save the result and collect new links
                self.scraped_data[result["url"]] = result
                if len(self.scraped_data) < self.max_pages:
                    new_links = [
                        link
                        for link in result["links"]
                        if self.is_wikipedia_url(link)
                        and link not in self.visited_pages
                        and link not in self.pages_to_visit
                    ]
                    self.pages_to_visit.extend(new_links)

        print(f"Crawling completed. Scraped {len(self.scraped_data)} pages.")

    def get_scraped_data(self):
        return self.scraped_data
```

The crawl method runs continuously until we have scraped the maximum number of pages or there are no more pages to visit. It creates a batch of URLs to scrape and then passes them to the `scrape_page` function's `map` method. This allows us to scrape multiple pages in parallel. After the pages are scraped, we collect any new links that we want to visit and add them to the `pages_to_visit` list.

## Running the Batch Crawler

Finally, we can run our crawler. Below is the code for our `main` function which initializes the crawler and runs the crawl method.

```python theme={null}
if __name__ == "__main__":
    start_url = "https://en.wikipedia.org/wiki/Web_scraping"
    crawler = WikipediaCrawler(start_url, max_pages=20)
    crawler.crawl()

    # Write the scraped data to a file
    with open("scraped_data.json", "w") as f:
        json.dump(crawler.get_scraped_data(), f)
```

This code initializes the crawler with a starting URL and a maximum number of pages to scrape. It then runs the crawl method and writes the scraped data to a file. You can run this code like any other Python script:

```bash theme={null}
python batch_crawl.py
```

When you run this code, you should see output that looks like the following:

```bash theme={null}
=> Building image
=> Using cached image
=> Syncing files
...
=> Uploading
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 727.9/727.9 kB 0:00:00
=> Files synced
=> Running function: <app2:scrape_page>
=> Function complete <21f88938-8b82-465c-8b16-8bc0259e1997>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Function complete <0f384b7a-98da-400e-bcc5-abacf7f239ef>
=> Function complete <16da6df3-955d-4ad7-a8ec-c6456ab6de1e>
=> Function complete <2dd4c91b-a48f-4d7e-ada3-485633539ee5>
=> Function complete <04452bb5-f642-43e3-9d0f-9cb7532c0d4b>
=> Function complete <7ba94632-1907-415a-acad-c37a2cddd97e>
```

The output shows five function invocations in parallel. Once the scraping is complete, you can see the results in the `scraped_data.json` file. It will look something like this:

```json theme={null}
{
    "https://en.wikipedia.org/wiki/Web_scraping": {
        "url": "https://en.wikipedia.org/wiki/Web_scraping",
        "title": "Web scraping",
        "content": "Web scraping, web harvesting, or web data extraction ...",
        "links": [
            "https://en.wikipedia.org/wiki/Data_scraping",
            ...
        ]
    },
    "https://en.wikipedia.org/wiki/Wikipedia:Verifiability": {
        "url": "https://en.wikipedia.org/wiki/Wikipedia:Verifiability",
        "title": "Wikipedia:Verifiability",
        "content": "\n\n\nIn the English Wikipedia, ...",
        "links": [
            ...
        ]
    },
    ...
}
```

## Building a Continuous Crawler with Beam Functions and Threads

The batched web crawler is a good starting point, but it requires waiting for a full batch to finish before starting any new jobs. If we want to keep our crawler limit continuously saturated, we can use Beam functions in conjunction with Python threads.

To do this, we will use the same `scrape_page` function, but instead of using the `map` method, we will use a thread pool to invoke the function in parallel. Below is the code for our `WikipediaCrawler` class with a continuous crawl method.

```python theme={null}
    def process_scraped_page(self, result):
        if not result or len(self.scraped_data) >= self.max_pages:
            return

        self.scraped_data[result["url"]] = result
        if len(self.scraped_data) < self.max_pages:
            new_links = filter(self.is_wikipedia_url, result["links"])
            self.pages_to_visit.extend(new_links)

    def crawl(self):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = {}
            while len(self.scraped_data) < self.max_pages and (
                self.pages_to_visit or futures
            ):
                # Start new tasks if we have capacity and pages to visit
                while len(futures) < 5 and self.pages_to_visit:
                    url = self.pages_to_visit.pop(0)
                    self.visited_pages.add(url)
                    future = executor.submit(scrape_page.remote, url)
                    futures[future] = url

                # Wait for any task to complete
                if futures:
                    done, _ = concurrent.futures.wait(
                        futures, return_when=concurrent.futures.FIRST_COMPLETED
                    )
                    for future in done:
                        url = futures.pop(future)
                        try:
                            result = future.result()
                            self.process_scraped_page(result)
                        except Exception as e:
                            print(f"Error processing {url}: {str(e)}")

        print(f"Crawling completed. Scraped {len(self.scraped_data)} pages.")
```

This code is more complex than the batch crawler, but it allows us to better utilize our compute resources. Instead of having containers sitting idle while other containers complete their work, we immediately send a new function invocation as soon as another one completes. To do this, we track the futures returned by the `executor.submit` method and wait for any of them to complete using the `concurrent.futures.wait` method. We specify that we only want to wait for one of the futures to complete using the `concurrent.futures.FIRST_COMPLETED` constant. This means that as soon as any future completes, we will process the result and add new work to the pool.

## Running the Continuous Crawler

To run the continuous crawler, you can use the same `main` function as before. When you run this code, you should see output that looks like the following:

```bash theme={null}
=> Building image
=> Using cached image
=> Syncing files
...
=> Files already synced
=> Running function: <app:scrape_page>
=> Function complete <5c059d2c-2570-4ac1-8c8b-11d96543d197>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Function complete <4f0ff5f6-12db-448f-acad-66362485c988>
=> Running function: <app:scrape_page>
=> Function complete <b4ada76f-a567-4b43-9d40-0481f3fccd6f>
=> Running function: <app:scrape_page>
=> Function complete <44ae4cbe-6de1-4442-a28c-9adb32937a03>
=> Running function: <app:scrape_page>
=> Function complete <5788c95d-afb7-428d-b906-d0378f099d58>
=> Running function: <app:scrape_page>
=> Function complete <dedfaf60-9430-4ece-83b8-58c73ad88f30>
```

As you can see, as soon as one function invocation completes, we immediately start a new one.
