In this example, we’ll demonstrate how to build a Wikipedia web scraper using Beam functions. While you could run this on a local computer, Beam provides access to more powerful computational resources, allowing you to add advanced features to your web scraper using large language models or OCR models.
We will start by defining our scraping function. This is the Beam function that will be invoked remotely. We use the Image class from the Beam SDK to install the requests and beautifulsoup4 packages in the container that runs the code.
from beam import Image, function


@function(image=Image().add_python_packages(["requests", "beautifulsoup4"]))
def scrape_page(url):
    # These imports run inside the remote container, where the packages are installed
    from urllib.parse import urljoin

    import requests
    from bs4 import BeautifulSoup

    response = requests.get(url)
    if response.status_code != 200:
        return {"url": url, "title": "", "content": "", "links": []}

    soup = BeautifulSoup(response.text, "html.parser")
    title = soup.find(id="firstHeading").text
    content = soup.find(id="mw-content-text").find(class_="mw-parser-output")
    if not content:
        return {"url": url, "title": title, "content": "", "links": []}

    # Keep only top-level paragraphs; resolve every href against the page URL
    paragraphs = [p.text for p in content.find_all("p", recursive=False)]
    links = [urljoin(url, link["href"]) for link in content.find_all("a", href=True)]

    return {
        "url": url,
        "title": title,
        "content": "\n\n".join(paragraphs),
        "links": links,
    }
Our function takes in a URL, fetches the page’s HTML, and then uses BeautifulSoup to extract the page’s title, content, and links. It returns that content in a dictionary so that our crawler can invoke new functions with the links found on the page. If we wanted, we could add more functionality to this function to extract or process the content in a variety of ways. For example, we could add a language model to summarize the content or use an OCR model to extract text from an image.
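The link extraction relies on urljoin to turn the href values found in the page into absolute URLs. The following is a small standalone sketch of that step. The href values are illustrative examples of the kinds of links that appear in article HTML, not output captured from a real page.

```python
from urllib.parse import urljoin

# The page the links were found on (the same start URL used later in this example).
page_url = "https://en.wikipedia.org/wiki/Web_scraping"

# Illustrative href values; these are assumptions, not scraped data.
hrefs = [
    "/wiki/Data_scraping",           # root-relative article link
    "#cite_note-1",                  # in-page anchor
    "//upload.wikimedia.org/a.png",  # protocol-relative link
    "https://example.com/page",      # already absolute
]

# Resolve each href against the URL of the page it came from.
resolved = [urljoin(page_url, href) for href in hrefs]

for href, full in zip(hrefs, resolved):
    print(f"{href!r} -> {full}")
```

Because every link comes back as an absolute URL, the crawler can later filter links by host and path without knowing which page they were found on.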
Next, we’ll build a crawler that will use Beam’s map method to invoke our scrape_page function on a list of URLs. Below is our __init__ method for the crawler.
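A minimal sketch of such an __init__ method, inferred from the attributes that the crawl method uses (scraped_data, pages_to_visit, visited_pages, max_pages, batch_size). The default values and the start_url attribute are assumptions; only max_pages=20 and a batch of five appear elsewhere in this example.

```python
class WikipediaCrawler:
    def __init__(self, start_url, max_pages=100, batch_size=5):
        # Default values are assumptions chosen for illustration.
        self.start_url = start_url
        self.max_pages = max_pages
        self.batch_size = batch_size

        # Queue of URLs still to scrape, seeded with the starting page.
        self.pages_to_visit = [start_url]
        # URLs that have already been handed to scrape_page.
        self.visited_pages = set()
        # Results keyed by URL, in the shape returned by scrape_page.
        self.scraped_data = {}


crawler = WikipediaCrawler("https://en.wikipedia.org/wiki/Web_scraping", max_pages=20)
print(crawler.pages_to_visit, crawler.batch_size)
```

A set is used for visited_pages so that membership checks stay cheap as the crawl grows, while pages_to_visit stays a list because the crawler pops from the front of it.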
Our crawler takes in a starting URL, a maximum number of pages to scrape, and a batch size. The batch size determines how many remote function invocations we will make at a time.
Next, we’ll define the actual crawl method along with a helper method to determine if a URL is a valid Wikipedia URL.
    # Requires `from urllib.parse import urlparse` at the top of the file

    def is_wikipedia_url(self, url):
        parsed_url = urlparse(url)
        return parsed_url.netloc.endswith(
            "wikipedia.org"
        ) and parsed_url.path.startswith("/wiki/")

    def crawl(self):
        while len(self.scraped_data) < self.max_pages and self.pages_to_visit:
            # Create a batch of up to batch_size pages that we have not yet visited
            batch = []
            while len(batch) < self.batch_size and self.pages_to_visit:
                p = self.pages_to_visit.pop(0)
                if p not in self.visited_pages:
                    # Mark the page as visited so it is never scraped twice
                    self.visited_pages.add(p)
                    batch.append(p)

            for result in scrape_page.map(batch):
                # Save the result and collect new links
                self.scraped_data[result["url"]] = result

                if len(self.scraped_data) < self.max_pages:
                    new_links = [
                        link
                        for link in result["links"]
                        if self.is_wikipedia_url(link)
                        and link not in self.visited_pages
                        and link not in self.pages_to_visit
                    ]
                    self.pages_to_visit.extend(new_links)

        print(f"Crawling completed. Scraped {len(self.scraped_data)} pages.")

    def get_scraped_data(self):
        return self.scraped_data
The crawl method loops until we have scraped the maximum number of pages or there are no more pages to visit. On each pass, it creates a batch of URLs to scrape and then passes them to the scrape_page function’s map method. This allows us to scrape multiple pages in parallel. After the pages are scraped, we collect any new links that we want to visit and add them to the pages_to_visit list.
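To see how the batching behaves without making any remote calls, here is a local dry run of the same loop. It is a sketch under stated assumptions: FAKE_LINKS is a hypothetical in-memory link graph, and fake_scrape_map is a stand-in for scrape_page.map that simply looks up links in that graph.

```python
# Hypothetical link graph standing in for Wikipedia pages.
FAKE_LINKS = {
    "A": ["B", "C", "D"],
    "B": ["A", "E"],
    "C": ["F"],
    "D": [],
    "E": [],
    "F": ["A"],
}


def fake_scrape_map(batch):
    """Local stand-in for scrape_page.map: yields one result dict per URL."""
    for url in batch:
        yield {"url": url, "links": FAKE_LINKS.get(url, [])}


def crawl(start, max_pages, batch_size):
    pages_to_visit = [start]
    visited = set()
    scraped = {}
    batches = []  # Recorded only so the batching can be inspected afterwards

    while len(scraped) < max_pages and pages_to_visit:
        # Fill a batch with unvisited pages, marking each one as visited.
        batch = []
        while len(batch) < batch_size and pages_to_visit:
            page = pages_to_visit.pop(0)
            if page not in visited:
                visited.add(page)
                batch.append(page)
        batches.append(batch)

        # Store results and queue links that are new to the crawler.
        for result in fake_scrape_map(batch):
            scraped[result["url"]] = result
            for link in result["links"]:
                if link not in visited and link not in pages_to_visit:
                    pages_to_visit.append(link)

    return batches, scraped


batches, scraped = crawl("A", max_pages=10, batch_size=2)
print(batches)  # [['A'], ['B', 'C'], ['D', 'E'], ['F']]
```

The first batch contains only the start page because no other links are known yet. Later batches fill up to batch_size as new links are discovered.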
Finally, we can run our crawler. Below is the code for our main function which initializes the crawler and runs the crawl method.
# json is needed to write the results; place this import at the top of the file
import json

if __name__ == "__main__":
    start_url = "https://en.wikipedia.org/wiki/Web_scraping"
    crawler = WikipediaCrawler(start_url, max_pages=20)
    crawler.crawl()

    # Write the scraped data to a file
    with open("scraped_data.json", "w") as f:
        json.dump(crawler.get_scraped_data(), f)
This code initializes the crawler with a starting URL and a maximum number of pages to scrape. It then runs the crawl method and writes the scraped data to a file. You can run this code like any other Python script:
python batch_crawl.py
When you run this code, you should see output that looks like the following:
=> Building image
=> Using cached image
=> Syncing files...
=> Uploading
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 727.9/727.9 kB 0:00:00
=> Files synced
=> Running function: <app2:scrape_page>
=> Function complete <21f88938-8b82-465c-8b16-8bc0259e1997>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Running function: <app2:scrape_page>
=> Function complete <0f384b7a-98da-400e-bcc5-abacf7f239ef>
=> Function complete <16da6df3-955d-4ad7-a8ec-c6456ab6de1e>
=> Function complete <2dd4c91b-a48f-4d7e-ada3-485633539ee5>
=> Function complete <04452bb5-f642-43e3-9d0f-9cb7532c0d4b>
=> Function complete <7ba94632-1907-415a-acad-c37a2cddd97e>
The output shows the start URL being scraped on its own, followed by a batch of five function invocations running in parallel. Once the scraping is complete, you can see the results in the scraped_data.json file. It will look something like this:
{
  "https://en.wikipedia.org/wiki/Web_scraping": {
    "url": "https://en.wikipedia.org/wiki/Web_scraping",
    "title": "Web scraping",
    "content": "Web scraping, web harvesting, or web data extraction ...",
    "links": [
      "https://en.wikipedia.org/wiki/Data_scraping",
      ...
    ]
  },
  "https://en.wikipedia.org/wiki/Wikipedia:Verifiability": {
    "url": "https://en.wikipedia.org/wiki/Wikipedia:Verifiability",
    "title": "Wikipedia:Verifiability",
    "content": "\n\n\nIn the English Wikipedia, ...",
    "links": [
      ...
    ]
  },
  ...
}
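Once the file exists, the results can be inspected with a few lines of Python. The sketch below assumes the file has the shape shown above. The helper names load_scraped_data and summarize are hypothetical, and the inline sample is illustrative data rather than real crawler output.

```python
import json


def load_scraped_data(path="scraped_data.json"):
    """Read the crawler's output file back into a dictionary."""
    with open(path) as f:
        return json.load(f)


def summarize(scraped_data):
    """Return a (title, word_count, link_count) tuple for each scraped page."""
    rows = []
    for page in scraped_data.values():
        word_count = len(page["content"].split())
        rows.append((page["title"], word_count, len(page["links"])))
    return rows


# Illustrative sample with the same shape as scraped_data.json.
sample = {
    "https://en.wikipedia.org/wiki/Web_scraping": {
        "url": "https://en.wikipedia.org/wiki/Web_scraping",
        "title": "Web scraping",
        "content": "Web scraping, web harvesting, or web data extraction",
        "links": ["https://en.wikipedia.org/wiki/Data_scraping"],
    }
}

rows = summarize(sample)
for title, word_count, link_count in rows:
    print(f"{title}: {word_count} words, {link_count} links")
```

To summarize a real crawl, pass the result of load_scraped_data() to summarize instead of the sample dictionary.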
Building a Continuous Crawler with Beam Functions and Threads
The batched web crawler is a good starting point, but it requires waiting for a full batch to finish before starting any new jobs. If we want to keep every available worker busy for the whole crawl, we can use Beam functions in conjunction with Python threads.
To do this, we will use the same scrape_page function, but instead of using the map method, we will use a thread pool to invoke the function in parallel. Below is the code for our WikipediaCrawler class with a continuous crawl method.
    # Requires `import concurrent.futures` at the top of the file

    def process_scraped_page(self, result):
        if not result or len(self.scraped_data) >= self.max_pages:
            return

        self.scraped_data[result["url"]] = result

        if len(self.scraped_data) < self.max_pages:
            # Queue only Wikipedia article links that have not been visited yet
            new_links = [
                link
                for link in result["links"]
                if self.is_wikipedia_url(link) and link not in self.visited_pages
            ]
            self.pages_to_visit.extend(new_links)

    def crawl(self):
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            futures = {}

            while len(self.scraped_data) < self.max_pages and (
                self.pages_to_visit or futures
            ):
                # Start new tasks if we have capacity and pages to visit
                while len(futures) < 5 and self.pages_to_visit:
                    url = self.pages_to_visit.pop(0)
                    if url in self.visited_pages:
                        continue  # Skip URLs that were already submitted
                    self.visited_pages.add(url)
                    future = executor.submit(scrape_page.remote, url)
                    futures[future] = url

                # Wait for any task to complete
                if futures:
                    done, _ = concurrent.futures.wait(
                        futures, return_when=concurrent.futures.FIRST_COMPLETED
                    )

                    for future in done:
                        url = futures.pop(future)
                        try:
                            result = future.result()
                            self.process_scraped_page(result)
                        except Exception as e:
                            print(f"Error processing {url}: {str(e)}")

        print(f"Crawling completed. Scraped {len(self.scraped_data)} pages.")
This code is more complex than the batch crawler, but it allows us to better utilize our compute resources. Instead of having containers sitting idle while other containers complete their work, we immediately send a new function invocation as soon as another one completes. To do this, we track the futures returned by the executor.submit method and wait for any of them to complete using the concurrent.futures.wait method. We specify that we only want to wait for one of the futures to complete using the concurrent.futures.FIRST_COMPLETED constant. This means that as soon as any future completes, we will process the result and add new work to the pool.
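The submit-and-wait pattern can be tried locally before involving any remote functions. In this sketch, fake_remote is a hypothetical stand-in for scrape_page.remote that just sleeps, and the task durations and the limit of three in-flight tasks are arbitrary illustration values.

```python
import concurrent.futures
import time


def fake_remote(task_id, seconds):
    """Local stand-in for scrape_page.remote: sleeps, then returns its id."""
    time.sleep(seconds)
    return task_id


# (task_id, duration) pairs; durations are arbitrary illustration values.
pending = [(i, 0.05 * (i % 3 + 1)) for i in range(8)]
max_in_flight = 3
completed = []
peak_in_flight = 0

with concurrent.futures.ThreadPoolExecutor(max_workers=max_in_flight) as executor:
    futures = {}
    while pending or futures:
        # Top up the pool whenever a slot is free.
        while len(futures) < max_in_flight and pending:
            task_id, seconds = pending.pop(0)
            futures[executor.submit(fake_remote, task_id, seconds)] = task_id
        peak_in_flight = max(peak_in_flight, len(futures))

        # Block until at least one in-flight task finishes.
        done, _ = concurrent.futures.wait(
            futures, return_when=concurrent.futures.FIRST_COMPLETED
        )
        for future in done:
            futures.pop(future)
            completed.append(future.result())

print(sorted(completed))  # [0, 1, 2, 3, 4, 5, 6, 7]
print(peak_in_flight)     # 3
```

The order in which tasks finish depends on their durations and on thread scheduling, which is why the results are sorted before printing. The number of in-flight tasks never exceeds max_in_flight, and a new task is submitted as soon as any slot frees up.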
To run the continuous crawler, you can use the same main function as before. When you run this code, you should see output that looks like the following:
=> Building image
=> Using cached image
=> Syncing files...
=> Files already synced
=> Running function: <app:scrape_page>
=> Function complete <5c059d2c-2570-4ac1-8c8b-11d96543d197>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Running function: <app:scrape_page>
=> Function complete <4f0ff5f6-12db-448f-acad-66362485c988>
=> Running function: <app:scrape_page>
=> Function complete <b4ada76f-a567-4b43-9d40-0481f3fccd6f>
=> Running function: <app:scrape_page>
=> Function complete <44ae4cbe-6de1-4442-a28c-9adb32937a03>
=> Running function: <app:scrape_page>
=> Function complete <5788c95d-afb7-428d-b906-d0378f099d58>
=> Running function: <app:scrape_page>
=> Function complete <dedfaf60-9430-4ece-83b8-58c73ad88f30>
As you can see, as soon as one function invocation completes, we immediately start a new one.