Enter a hostname, see if Common Crawl has it

This checks CC-MAIN-2023-50, which was crawled in November/December 2023. I may update this in future to check the latest crawl, but for now, that’s what we have.

Give it a try here:

About

For a project I’m diving into, I needed to check whether a handful of websites showed up in the latest Common Crawl data. This was no small feat: the index alone is made up of 300 files, adding up to over 250GB when compressed. Uncompressed, you’re looking at a few terabytes, which was way more than I had room for.

So, how to tackle this beast? I’ve been getting pretty deep into Spark recently, so I whipped up a Spark job that sifts through the compressed files, plucks the JSON data out of each line, and extracts the hostname. With Spark’s groupBy, I could then tally up how many pages each hostname had, though I’m not showing those counts just yet.
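
Fetching those 300 files in the first place isn’t shown in the code below. If you want to reproduce that step, something along these lines should work; the URL pattern is an assumption based on Common Crawl’s published index layout, so verify it against the crawl’s cc-index.paths listing before relying on it.

import os
import urllib.request

# CC-MAIN-2023-50's cdx index is split into 300 gzipped shards: cdx-00000.gz .. cdx-00299.gz
# (base URL assumed from Common Crawl's published index layout)
BASE = "https://data.commoncrawl.org/cc-index/collections/CC-MAIN-2023-50/indexes"

os.makedirs("data", exist_ok=True)
for i in range(300):
    name = f"cdx-{i:05d}.gz"
    urllib.request.urlretrieve(f"{BASE}/{name}", f"data/{name}")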

Spark did a great job keeping my 32 cores busy processing all that data.

Then came the next hurdle: figuring out a low-cost, low-hassle way to share these results. Instead of dumping the data into a database and dealing with everything that entails, I went for a Bloom filter. It’s a clever, compact data structure that can tell you when something is probably in the set, with a super slim chance (0.0001%) of getting that wrong, and it’s always right when it says something isn’t there.
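
A tiny illustration of that behaviour, using the same pybloom-live library as the code further down (the hostnames here are made up):

from pybloom_live import BloomFilter

bf = BloomFilter(capacity=1000, error_rate=0.000001)
bf.add("example.com")

print("example.com" in bf)          # True: the filter says "probably present"
print("never-added.example" in bf)  # False (a "no" from a Bloom filter is always definite)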

I later realized there are probably some simpler ways to answer this question on Common Crawl’s own examples page. But hey, it was a cool experiment, and now we’ve got this page to show for it.

Code below…

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType
import json

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("CountDocumentsByHostname") \
    .config("spark.executor.memory", "1g") \
    .config("spark.driver.memory", "8g") \
    .config("spark.cores.max", "30") \
    .getOrCreate()

# Define a UDF: each cdx index line has a JSON payload starting at the first '{';
# parse it and pull out the hostname
def extract_hostname(line):
    try:
        # Find the starting position of the first '{'
        start_index = line.index('{')
        # Extract the substring from the first '{' to the end of the line
        json_str = line[start_index:]
        # Parse the JSON string
        json_data = json.loads(json_str)
        # Extract the URL and then the hostname from the URL
        url = json_data['url']
        hostname = url.split("//")[-1].split("/")[0].split(":")[0]
        return hostname
    except Exception as e:
        # Return a recognizable error message or code if parsing fails
        return "Error parsing JSON: " + str(e)

# Register the UDF with Spark
extract_hostname_udf = udf(extract_hostname, StringType())

# Load the compressed cdx index files into a DataFrame, one row per line
# (Spark decompresses the .gz files transparently)
data = spark.read.text("data/cdx-*.gz")

# Apply the UDF to add a hostname column to every row
hostnames = data.withColumn("hostname", extract_hostname_udf(col("value")))

hostname_counts = hostnames.groupBy("hostname").count()
# Uncomment to preview the counts
#hostname_counts.show()

# Output path for the Parquet files
parquet_file_path = "hostname_counts"

# Write the counts out as Parquet
hostname_counts.write.mode("overwrite").parquet(parquet_file_path)

# Stop the Spark session
#spark.stop()

And then, to read that data back in and write the Bloom filter:

from pyspark.sql import SparkSession
from pybloom_live import BloomFilter
import math
import pickle


# Initialize SparkSession
spark = SparkSession.builder \
    .appName("URLsToBloomFilter") \
    .config("spark.executor.memory", "2g") \
    .config("spark.driver.memory", "16g") \
    .config("spark.cores.max", "30") \
    .config("spark.driver.maxResultSize", "5g") \
    .getOrCreate()

# Read the Parquet files containing the hostname counts
parquet_file_path = "hostname_counts/*.parquet"
counts_df = spark.read.parquet(parquet_file_path)

# Collect the hostnames to a list on the driver (fine at this scale; consider
# something more incremental for larger datasets)
hostnames = [row['hostname'] for row in counts_df.collect()]

# Bloom filter parameter tuning
n = len(hostnames)  # Number of items expected to be stored in the Bloom filter
p = 0.000001  # False positive probability

# Estimate the optimal size of the Bloom filter and the number of hash functions.
# These formulas come from standard Bloom filter theory; pybloom_live does this
# sizing itself from capacity and error_rate, so m and k here are informational only.
m = -n * math.log(p) / (math.log(2)**2)  # Optimal number of bits
k = (m/n) * math.log(2)  # Optimal number of hash functions

# Initialize the Bloom filter
bf = BloomFilter(capacity=n, error_rate=p)

# Add every hostname to the Bloom filter
for hostname in hostnames:
    bf.add(hostname)


# Serialize the Bloom filter so the lookup page can load it later
with open('hostname_bloom_filter.bf', 'wb') as bf_file:
    pickle.dump(bf, bf_file)

# Clean up Spark session
spark.stop()
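
The lookup behind the form at the top of this post then only needs that serialized filter, not Spark or the Parquet data. Roughly like this (a sketch of the idea, with a made-up helper name, not the exact code serving the page):

import pickle
# pybloom-live must be installed: pickle imports it to reconstruct the BloomFilter

# Load the serialized filter once at startup
with open('hostname_bloom_filter.bf', 'rb') as bf_file:
    bf = pickle.load(bf_file)

def maybe_in_common_crawl(hostname):
    # True means "almost certainly in CC-MAIN-2023-50"; False means "definitely not"
    return hostname.strip() in bf

print(maybe_in_common_crawl("example.com"))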