
Backfilling a Database With Data From BigQuery

#databases #python #testing

Hot on the heels of writing a Python script for scraping my Goodreads "read" books, I was recently tasked with writing another Python script for backfilling some data in a database. Rather than using a scraper to collect the data, the problem to solve this time round was how to get data from a BigQuery table into our production database.

In our case, the BigQuery database and the production database were part of the same network, so there were no issues with authorisation. The solution turned out to be fairly simple in the end, but I wanted to jot this down in a blog post as I did have some key learnings from doing this task.

The Python script

As an example, let's say we have an Order table, where each row has a unique email address. There's another column in the table called name that currently lies empty. This is the column we want to backfill, using data from our BigQuery table. The BigQuery table also contains rows with email and name, so the data itself is all there in BQ - the question is just how to get it across.
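
For illustration, the Order table starts out looking something like this (the emails here are made up):

email              | name
-------------------+------
mario@example.com  | NULL
luigi@example.com  | NULL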

Let's start with the final Python script:

# get_names_script.py

import argparse

# backfill_names and logger live in the backfill module shown further down -
# adjust this import path to wherever that module sits in your project.
from app.order.backfill import backfill_names, logger

def get_args_parser():
    parser = argparse.ArgumentParser(
        description="Backfill name in order table"
    )

    parser.add_argument(
        "--do-update",
        action="store_true",
        help="Execute the updating of names in order table",
    )

    parser.add_argument(
        "--batch-size",
        type=int,
        help="Size of each batch fetched from orders table. Default is 1000.",
    )
    return parser


if __name__ == "__main__":
    parser = get_args_parser()
    args = parser.parse_args()

    batch_size = args.batch_size if args.batch_size else 1000
    is_dry_run = not args.do_update

    if not is_dry_run:
        print("[WARN] UPDATE mode enabled.")
        input("Press 'Enter' to continue")
        logger.info("[INFO] UPDATE mode enabled.")
    else:
        logger.info("[INFO] DRY RUN mode enabled.")

    backfill_names(
        batch_size=batch_size,
        is_dry_run=is_dry_run,
    )

What this script says is that, in the terminal, we want to be able to call python get_names_script.py with the optional arguments --do-update and --batch-size, and have the backfill_names method run. By default the script runs in "dry run" mode (where no updates are actually made to the database), fetching rows from our Order table in batches of 1,000.
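
For example, running it from the terminal looks like this:

# Dry run (the default) - nothing is written to the database
python get_names_script.py

# Actually perform the updates, fetching orders in batches of 500
python get_names_script.py --do-update --batch-size 500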

The main backfill_names method

Let's now look at what the main method actually does:

import logging
import sys
from typing import Dict, Optional

from app.order.database import managed_db
from app.order.names import NamesRepository
from app.order.repository import OrderRepository
from app.order.service import OrderService


handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)

root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)

logger = logging.getLogger(__name__)


def get_order_repo():
    with managed_db() as db:
        return OrderRepository(db)

def get_names_repo():
    return NamesRepository()

def get_names_lookup() -> Dict[str, str]:
    repo = get_names_repo()
    names_rows = repo.get_names()
    names_lookup = {
        row["email"]: row["names"]
        for row in names_rows
    }
    return names_lookup

def backfill_names(
    batch_size: int = 1000,
    is_dry_run: bool = True,
):

    order_repo_for_retrieving = get_order_repo()
    order_repo_for_writing = get_order_repo()
    order_service = OrderService(order_repo_for_writing)
    write_session = order_repo_for_writing.db

    names_lookup = get_names_lookup()
    logger.info(f"Found {len(names_lookup)} names")

    counters = {
        "updated": 0,
        "order_missing_email": 0,
        "no_email_found": 0,
    }

    try:
        for order in order_repo_for_retrieving.get_orders_without_names(
            batch_size=batch_size
        ):
            email = order.email

            if email is None:
                counters["order_missing_email"] += 1
                continue

            try:
                name = names_lookup[email]
            except KeyError:
                logger.warning(
                    f"Couldn't find name for order with email {email}"
                )
                counters["no_email_found"] += 1
                continue

            logger.debug(
                f"Updating order for {email} with name {name}"
            )
            counters["updated"] += 1

            if not is_dry_run:
                try:
                    order = write_session.merge(order, load=False)
                    order_service.update_order_with_name(
                        order=order, name=name
                    )
                except Exception as e:
                    logger.error(
                        f"Error updating order for {email} with name {name}: {e}"
                    )
                    logger.exception(e)

    except Exception as e:
        logger.error("Failed looping through orders")
        logger.exception(e)

    finally:
        skipped = (
            counters["order_missing_email"]
            + counters["no_email_found"]
        )

        logger.info(
            f"Orders: Updated {counters['updated']}, Order missing email {counters['order_missing_email']}, No email found {counters['no_email_found']}"
        )

        logger.info(
            f"Orders: Batches: { (counters['updated'] + skipped) / float(batch_size) }"
        )

Just looking at the backfill_names() method, the steps we're taking are:

  • To get two database connections (via our OrderRepository class) - one for retrieving the rows in the Order table (in batches of 1,000), and a second for doing the actual updating of the rows. Two connections were necessary because the first session streams rows in batches while we iterate over them; issuing updates through that same session mid-iteration risks interfering with the rows it still has to fetch, so keeping reads and writes on separate sessions avoids that mess.
  • We call get_names_lookup(), which queries our BQ table and processes the result on the fly into a dictionary where the key is the email and the value is the name (there's a small sketch of the resulting lookup just after this list). I'll explain the actual purpose of the get_names_repo() method below (in the bonus section), if you're wondering why we're just returning the NamesRepository.
  • The NamesRepository BQ query looks like this - we're using the Python google-cloud-bigquery package, which works really nicely:
# app.order.names

from google.cloud import bigquery


class NamesRepository:
    def __init__(self):
        self._bq_client = bigquery.Client()

    def get_names(
        self,
    ):
        query_str = """
            SELECT
              email,
              name,
            FROM
              `whatever-your-bq-table-is`
            WHERE
              name IS NOT NULL
        """
        query_result = self._bq_client.query(query_str).to_dataframe()
        return query_result[["email", "name"]].to_records()
  • The next step is to get all the Order table rows with empty name fields, in batches of 1,000. We then iterate through these and, for each row, get the order email, look up the name using the dictionary we created in the steps above, and update the row if a name is found.
    • There are a bunch of logs and error handling in between to keep us up to date on what's happening as the script is running. 🕵🏻
  • The script culminates by outputting a summary of the number of rows updated and skipped, and the total number of batches processed.
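
To make the lookup step concrete, the dictionary that get_names_lookup() returns ends up shaped like this (emails made up):

names_lookup = {
    "mario@example.com": "Mario Mario",
    "luigi@example.com": "Luigi Mario",
}

# Each order's email is then used as the key into this dictionary
name = names_lookup["mario@example.com"]  # -> "Mario Mario"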

How the batch retrieval is performed

First things first, we use SQLAlchemy as our ORM. The way we retrieve our Order table rows is by running:

order_repo_for_retrieving.get_orders_without_names(
	batch_size=batch_size
)

Our OrderRepository class looks like this (note the yield_per, which is documented in the SQLAlchemy docs):

class OrderRepository:
    def __init__(self, db: Session):
        self.db: Session = db

    def save_order(self, order: Order) -> Order:
        self.db.add(order)
        self.db.commit()
        return order

    def get_orders_without_names(self, batch_size: int = 1000):
        return (
            self.db.query(Order)
            .filter(Order.name.is_(None))
            .yield_per(batch_size)
        )
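
In other words, the query object returned here is lazy: nothing is fetched until we start iterating, and rows then arrive in chunks of batch_size. A rough sketch of how the script consumes it:

# Nothing is fetched until iteration starts; SQLAlchemy then pulls rows from the
# cursor in chunks of batch_size instead of loading the whole table into memory.
for order in order_repo_for_retrieving.get_orders_without_names(batch_size=1000):
    ...  # look up the name for order.email and update the row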

How the update is performed

Let's deep dive into how the updating of each row is actually performed.

if not is_dry_run:
    try:
        order = write_session.merge(order, load=False)
        order_service.update_order_with_name(
            order=order, name=name
        )
    except Exception as e:
        logger.error(
            f"Error updating order for {email} with name {name}: {e}"
        )
        logger.exception(e)

As mentioned above, we set up a dedicated database connection just for writing the updates to the database (write_session). Because each order object was retrieved by the first database connection, we need to call merge to attach it to the write session before updating it. You can read more about Session.merge in the SQLAlchemy docs.
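
Condensed, and ignoring the service/repository layers for a moment, the two-session dance amounts to something like this (a sketch of what the calls above boil down to, not our exact code):

# `order` was loaded by the retrieval session, so it isn't tracked by write_session.
# merge(load=False) copies it into write_session without re-SELECTing the row.
order = write_session.merge(order, load=False)
order.name = name          # the actual backfill (update_order_with_name)
write_session.add(order)   # save_order in OrderRepository
write_session.commit()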

The OrderService looks like this:

class OrderService:

    def __init__(self, repo: OrderRepository):
        self.repo = repo

    def save_order(self, order: Order) -> Order:
        return self.repo.save_order(order=order)

    def update_order_with_name(
        self,
        order,
        name: str,
    ) -> Order:
        order.name = name
        return self.save_order(order)

Bonus: How to test

Let me come back to why I introduced the seemingly redundant get_names_repo(). The reason was to allow for testing. This is what my script actually looks like (to allow for dependency injection for writing tests):

import csv


class NamesCSVRepository:
    def __init__(self, filename: str):
        self.filename = filename

    def get_names(self):
        with open(self.filename) as csv_file:
            csv_reader = csv.DictReader(csv_file)
            return list(iter(csv_reader))


def get_names_repo(names_csv_file: Optional[str] = None):
    if names_csv_file:
        return NamesCSVRepository(names_csv_file)
    else:
        return NamesRepository()

def get_names_lookup(
    names_csv_file: Optional[str] = None,
) -> Dict[str, str]:
    repo = get_names_repo(names_csv_file)
    names_rows = repo.get_names()
    names_lookup = {
        row["email"]: row["names"]
        for row in names_rows
    }
    return names_lookup

def backfill_names(
    batch_size: int = 1000,
    is_dry_run: bool = True,
    names_csv_file: Optional[str] = None,
    order_repo_for_retrieving: Optional[OrderRepository] = None,
    order_service: Optional[OrderService] = None,
):

    order_repo_for_retrieving = (
        order_repo_for_retrieving if order_repo_for_retrieving else get_order_repo()
    )
    order_repo_for_writing = get_order_repo()
    order_service = (
        order_service if order_service else OrderService(order_repo_for_writing)
    )
    write_session = (
        order_service.repo.db if order_service else order_repo_for_writing.db
    )

    names_lookup = get_names_lookup(names_csv_file)
    logger.info(f"Found {len(names_lookup)} names")

    # ... rest of script

The NamesCSVRepository serves as an alternative source for getting the name by email, which is what I use in my tests instead of calling the BigQuery table. What this means is that I can create a test CSV file (example format below) and pass its path in as the names_csv_file argument in my tests.

"email","name"
mario@example.com,Mario Mario
luigi@example.com,Luigi Mario
bowser@example.com,King Bowser
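
For completeness, here's a rough sketch of what a test using this can look like. It relies on pytest's tmp_path fixture, and the import path for get_names_lookup is an assumption (use whichever module you put it in):

# test_get_names_lookup.py - a minimal sketch, not the project's actual tests
from app.order.backfill import get_names_lookup  # import path is an assumption


def test_get_names_lookup_from_csv(tmp_path):
    # Write a small CSV fixture in the same format as the example above
    csv_file = tmp_path / "names.csv"
    csv_file.write_text(
        '"email","name"\n'
        "mario@example.com,Mario Mario\n"
        "luigi@example.com,Luigi Mario\n"
    )

    names_lookup = get_names_lookup(names_csv_file=str(csv_file))

    assert names_lookup == {
        "mario@example.com": "Mario Mario",
        "luigi@example.com": "Luigi Mario",
    }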
