Backfilling a Database With Data From BigQuery
Hot on the heels of writing a Python script for scraping my Goodreads "read" books, I was recently tasked with writing another Python script for backfilling some data in a database. Rather than using a scraper to collect the data, the problem to be solved this time round was how to get data from a BigQuery database into our production database.
In our case, the BigQuery database and the production database were both part of the same network, so there were no issues with authorisation. The solution turned out to be fairly simple in the end, but I wanted to jot this down in a blog post as I did have some key learnings from doing this task.
The Python script
As an example, let's say we have an Order table, where each row has a unique email address. There's another column in the table called name that currently lies empty. This is the column we want to backfill, using data from our BigQuery table. The BigQuery table also contains rows with email and name, so the data itself is all there in BQ - the question is just how to get it.
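Concretely, the two tables look something like this before the backfill (these rows are made up purely for illustration):

Order table (production database):

email               | name
mario@example.com   | NULL
luigi@example.com   | NULL

BigQuery table:

email               | name
mario@example.com   | Mario Mario
luigi@example.com   | Luigi Mario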
Let's start with the final Python script:
# get_names_script.py
import argparse


def get_args_parser():
    parser = argparse.ArgumentParser(
        description="Backfill name in order table"
    )
    parser.add_argument(
        "--do-update",
        action="store_true",
        help="Execute the updating of names in order table",
    )
    parser.add_argument(
        "--batch-size",
        type=int,
        help="Size of each batch fetched from orders table. Default is 1000.",
    )
    return parser


if __name__ == "__main__":
    parser = get_args_parser()
    args = parser.parse_args()

    batch_size = args.batch_size if args.batch_size else 1000
    is_dry_run = not args.do_update

    if not is_dry_run:
        print("[WARN] UPDATE mode enabled.")
        input("Press 'Enter' to continue")
        logger.info("[INFO] UPDATE mode enabled.")
    else:
        logger.info("[INFO] DRY RUN mode enabled.")

    backfill_names(
        batch_size=batch_size,
        is_dry_run=is_dry_run,
    )
What this script is saying is that in the terminal, we want to be able to call python get_names_script.py with the optional arguments --do-update and --batch-size, and have the backfill_names method run. By default, the script runs in "dry run" mode (where no updates are actually made to the database), fetching batches of 1,000 rows from our Order table.
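Putting that together, running it from the terminal looks something like this:

# Dry run (the default): log what would be updated, write nothing
python get_names_script.py

# Dry run with a smaller batch size
python get_names_script.py --batch-size 500

# Actually perform the updates (the script asks for confirmation first)
python get_names_script.py --do-update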
The main backfill_names method
Let's now look at what the main method actually does:
import logging
import sys
from typing import Dict, Optional

from app.order.database import managed_db
from app.order.names import NamesRepository
from app.order.repository import OrderRepository
from app.order.service import OrderService

handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
root = logging.getLogger()
root.setLevel(logging.INFO)
root.addHandler(handler)
logger = logging.getLogger(__name__)


def get_order_repo():
    with managed_db() as db:
        return OrderRepository(db)


def get_names_repo():
    return NamesRepository()


def get_names_lookup() -> Dict[str, str]:
    repo = get_names_repo()
    names_rows = repo.get_names()
    names_lookup = {
        row["email"]: row["name"]
        for row in names_rows
    }
    return names_lookup


def backfill_names(
    batch_size: int = 1000,
    is_dry_run: bool = True,
):
    order_repo_for_retrieving = get_order_repo()
    order_repo_for_writing = get_order_repo()
    order_service = OrderService(order_repo_for_writing)
    write_session = order_repo_for_writing.db

    names_lookup = get_names_lookup()
    logger.info(f"Found {len(names_lookup)} names")

    counters = {
        "updated": 0,
        "order_missing_email": 0,
        "no_email_found": 0,
    }

    try:
        for order in order_repo_for_retrieving.get_orders_without_names(
            batch_size=batch_size
        ):
            email = order.email
            if email is None:
                counters["order_missing_email"] += 1
                continue

            try:
                name = names_lookup[email]
            except KeyError:
                logger.warning(
                    f"Couldn't find name for order with email {email}"
                )
                counters["no_email_found"] += 1
                continue

            logger.debug(
                f"Updating order for {email} with name {name}"
            )
            counters["updated"] += 1

            if not is_dry_run:
                try:
                    order = write_session.merge(order, load=False)
                    order_service.update_order_with_name(
                        order=order, name=name
                    )
                except Exception as e:
                    logger.error(
                        f"Error updating order for {email} with name {name}: {e}"
                    )
                    logger.exception(e)
    except Exception as e:
        logger.error("Failed looping through orders")
        logger.exception(e)
    finally:
        skipped = (
            counters["order_missing_email"]
            + counters["no_email_found"]
        )
        logger.info(
            f"Orders: Updated {counters['updated']}, Order missing email {counters['order_missing_email']}, No email found {counters['no_email_found']}"
        )
        logger.info(
            f"Orders: Batches: {(counters['updated'] + skipped) / float(batch_size)}"
        )
Just looking at the backfill_names() method, the steps we're taking are:
- To get two database connections (via our OrderRepository class) - one for retrieving the rows in the Order table (in batches of 1,000), and the second for doing the actual updating of the rows. Two connections were necessary because it otherwise gets messy: the database gets confused about rows being updated whilst it's retrieving potentially the same rows into memory.
- We call get_names_lookup(), which makes a call to our BQ table and processes the data on the fly to create a dictionary where the key is the email and the value is the name (there's an example of what this dictionary looks like just after this list). I'll explain what the actual use of the get_names_repo() method is below (in the bonus section), if you're wondering why we're just returning the NamesRepository.
- The NamesRepository BQ query looks like this - we're using the Python google-cloud-bigquery package, which works really nicely.
# app.order.names
from google.cloud import bigquery


class NamesRepository:
    def __init__(self):
        self._bq_client = bigquery.Client()

    def get_names(self):
        query_str = """
            SELECT
                email,
                name
            FROM
                `whatever-your-bq-table-is`
            WHERE
                name IS NOT NULL
        """
        query_result = self._bq_client.query(query_str).to_dataframe()
        return query_result[["email", "name"]].to_records()
- The next step is to get all the Order table rows with empty name fields, in batches of 1,000. We then iterate through these and, for each row, get the order's email, perform the lookup for the name using the dictionary we created in the steps above, and update the row if a name is found.
- There are a bunch of logs and error handling in between to keep us up to date on what's happening as the script is running. 🕵🏻
- The script culminates by outputting a summary of the number of rows updated, the number skipped, and the total number of batches processed.
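To make that lookup step concrete, the dictionary that get_names_lookup() returns is just a mapping from email to name, something like this (the values here are purely illustrative):

names_lookup = {
    "mario@example.com": "Mario Mario",
    "luigi@example.com": "Luigi Mario",
}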
How the batch retrieval is performed
First things first, we use SQLAlchemy as our ORM. The way we retrieve our Order table rows is by running:
order_repo_for_retrieving.get_orders_without_names(
    batch_size=batch_size
)
Our OrderRepository class looks like this (note the yield_per, which you can read about in the SQLAlchemy docs):
class OrderRepository:
    def __init__(self, db: Session):
        self.db: Session = db

    def save_order(self, order: Order) -> Order:
        self.db.add(order)
        self.db.commit()
        return order

    def get_orders_without_names(self, batch_size: int = 1000):
        return (
            self.db.query(Order)
            .filter(Order.name.is_(None))
            .yield_per(batch_size)
        )
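For reference, here's a minimal sketch of the Order model that these queries assume. The email and name columns are the ones used throughout this post; the table name, primary key and column types are just my guesses at what the real model looks like:

from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Order(Base):
    __tablename__ = "order"

    id = Column(Integer, primary_key=True)
    email = Column(String, unique=True)   # each row has a unique email address
    name = Column(String, nullable=True)  # the column being backfilled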
How the update is performed
Let's deep dive into how the updating of each row is actually performed.
if not is_dry_run:
    try:
        order = write_session.merge(order, load=False)
        order_service.update_order_with_name(
            order=order, name=name
        )
    except Exception as e:
        logger.error(
            f"Error updating order for {email} with name {name}: {e}"
        )
        logger.exception(e)
As mentioned above, we had to set up a dedicated database connection just for writing the updates to the database (write_session). To ensure we're updating the correct order (retrieved by the first database connection), we need to run the merge method: it attaches the order we fetched with the reading session to the writing session, and load=False skips the extra SELECT that merge would otherwise emit, since we trust the object's state is current. You can read more about it in the SQLAlchemy docs.
The OrderService looks like this:
class OrderService:
    def __init__(self, repo: OrderRepository):
        self.repo = repo

    def save_order(self, order: Order) -> Order:
        return self.repo.save_order(order=order)

    def update_order_with_name(
        self,
        order,
        name: str,
    ) -> Order:
        order.name = name
        return self.save_order(order)
Bonus: How to test
Let me come back to why I introduced the seemingly redundant get_names_repo(). The reason was to allow for testing. This is what my script actually looks like (with dependency injection so that tests can swap out the dependencies):
import csv


class NamesCSVRepository:
    def __init__(self, filename: str):
        self.filename = filename

    def get_names(self):
        with open(self.filename) as csv_file:
            csv_reader = csv.DictReader(csv_file)
            return list(iter(csv_reader))


def get_names_repo(names_csv_file: Optional[str] = None):
    if names_csv_file:
        return NamesCSVRepository(names_csv_file)
    else:
        return NamesRepository()


def get_names_lookup(
    names_csv_file: Optional[str] = None,
) -> Dict[str, str]:
    repo = get_names_repo(names_csv_file)
    names_rows = repo.get_names()
    names_lookup = {
        row["email"]: row["name"]
        for row in names_rows
    }
    return names_lookup


def backfill_names(
    batch_size: int = 1000,
    is_dry_run: bool = True,
    names_csv_file: Optional[str] = None,
    order_repo_for_retrieving: Optional[OrderRepository] = None,
    order_service: Optional[OrderService] = None,
):
    order_repo_for_retrieving = (
        order_repo_for_retrieving if order_repo_for_retrieving else get_order_repo()
    )
    order_repo_for_writing = get_order_repo()
    order_service = (
        order_service if order_service else OrderService(order_repo_for_writing)
    )
    write_session = (
        order_service.repo.db if order_service else order_repo_for_writing.db
    )

    names_lookup = get_names_lookup(names_csv_file)
    logger.info(f"Found {len(names_lookup)} names")
    # ... rest of script
The NamesCSVRepository serves as an alternative source for getting the name by email, which is what I use in my tests instead of calling the BigQuery table. This means I can create a test CSV file (example format below) and pass its path in as the names_csv_file argument in my tests.
"email","name"
[email protected],Mario Mario
[email protected],Luigi Mario
[email protected],King Bowser
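And here's a rough sketch of what a test for the CSV path could look like. The pytest tmp_path fixture, the example email addresses, and the assumption that get_names_script.py is importable as a module are mine, not from the original script:

# test_get_names_script.py - a minimal sketch, assuming pytest is installed
# and get_names_lookup() can be imported from the script shown above
from get_names_script import get_names_lookup


def test_get_names_lookup_from_csv(tmp_path):
    # Write a throwaway CSV in the same format as the example above
    csv_file = tmp_path / "names.csv"
    csv_file.write_text(
        '"email","name"\n'
        "mario@example.com,Mario Mario\n"
        "luigi@example.com,Luigi Mario\n"
    )

    names_lookup = get_names_lookup(names_csv_file=str(csv_file))

    assert names_lookup == {
        "mario@example.com": "Mario Mario",
        "luigi@example.com": "Luigi Mario",
    }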