9 changes: 9 additions & 0 deletions chapters/chapter09/Dockerfile
@@ -0,0 +1,9 @@
ARG AIRFLOW_BASE_IMAGE="apache/airflow:1.10.12-python3.8"
FROM ${AIRFLOW_BASE_IMAGE}

USER root
COPY custom /opt/airflow/custom
RUN pip install -e /opt/airflow/custom && \
pip install pytest~=6.1.1 pytest-helpers-namespace~=2019.1.8 pytest-mock~=3.3.1 pytest-docker-tools~=1.0.0

USER airflow
30 changes: 30 additions & 0 deletions chapters/chapter09/README.md
@@ -0,0 +1,30 @@
# Chapter 9

Code accompanying Chapter 9 (about testing) of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow).

## Contents

Chapter 9 discusses testing. This folder contains a few DAGs for demonstration purposes; there is
little point in actually running them, but a Docker Compose file is included that sets up Airflow
so you can run them if you like.

The more interesting part of this chapter is the tests. A custom Docker image is built that includes
all the pytest goodies explained in the chapter, along with a custom package of operators and hooks
named `airflowbook`.
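
The intentionally faulty DAGs in this PR (a cycle, a `BashOperator` without a command, duplicate
task ids) exist to be caught by the DAG integrity test the chapter builds up. A minimal sketch of
that test, assuming a `tests/` folder sitting next to `dags/`:

```python
# tests/dag_integrity_test.py -- a sketch; the relative path to the
# dags folder is an assumption about the project layout.
import glob
import importlib.util
import os

import pytest
from airflow.models import DAG

DAG_FILES = glob.glob(
    os.path.join(os.path.dirname(__file__), "..", "dags/**/*.py"), recursive=True
)


@pytest.mark.parametrize("dag_file", DAG_FILES)
def test_dag_integrity(dag_file):
    # Import the DAG file as a throwaway module.
    module_name, _ = os.path.splitext(os.path.basename(dag_file))
    mod_spec = importlib.util.spec_from_file_location(module_name, dag_file)
    module = importlib.util.module_from_spec(mod_spec)
    mod_spec.loader.exec_module(module)

    # Every DAG file must define at least one DAG object...
    dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)]
    assert dag_objects

    # ...and none of the DAGs may contain a cycle (Airflow 1.10 API).
    for dag in dag_objects:
        dag.test_cycle()
```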

## Usage

To get started with the code examples, start Airflow with Docker Compose using the following command:

```bash
docker-compose up -d
```

The webserver takes a few seconds to initialize; after that you should be able to reach it at
http://localhost:8080.

To stop running the examples, run the following command:

```bash
docker-compose down -v
```
8 changes: 8 additions & 0 deletions chapters/chapter09/custom/setup.py
@@ -0,0 +1,8 @@
import setuptools

setuptools.setup(
name="airflowbook",
version="0.1",
packages=setuptools.find_packages(where="src"),
package_dir={"": "src"},
)
Empty file.
Empty file.
Empty file.
106 changes: 106 additions & 0 deletions chapters/chapter09/custom/src/airflowbook/hooks/movielens_hook.py
@@ -0,0 +1,106 @@
from airflow.hooks.base_hook import BaseHook


class MovielensHook(BaseHook):
"""Hook for our MovieLens API."""

def __init__(self, conn_id, retry=3):
super().__init__(source=None)
self._conn_id = conn_id
self._retry = retry

self._session = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.close()

def get_conn(self):
if self._session is None:
# Fetch config for the given connection (host, login, etc).
conn_config = self.get_connection(self._conn_id)
self._session = self._build_session(
auth=(conn_config.login, conn_config.password), retry=self._retry
)
return self._session

@staticmethod
def _build_session(auth=None, headers=None, retry=None):
"""
Helper method for building a requests Session.

Parameters
----------
        auth : Tuple[str, str]
Credentials (user, pass) to use for basic HTTP authentication.
headers : Dict[str, str]
Headers to include in every request.
retry : Union[int, requests.packages.urllib3.util.retry.Retry]
Configuration to use for handling retries when errors occur during
requests. Can be an int to specify the number of retries to perform
using default settings, or a Retry object which allows you to configure
more details such as which status codes to retry for, etc. See the
documentation of the Retry class for more details.
"""
import requests
from requests.adapters import HTTPAdapter

session = requests.Session()

if auth:
session.auth = auth

if headers:
session.headers.update(headers)

if retry:
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)

return session

def close(self):
if self._session:
self._session.close()
self._session = None

# API methods:
def get_movies(self):
raise NotImplementedError()

def get_users(self):
raise NotImplementedError()

def get_ratings(self, start_date=None, end_date=None, batch_size=100):
"""Fetches ratings between the given start/end date."""

        # Note: the API address is hardcoded for the demo setup; the Airflow
        # connection supplies only the credentials, not the host.
        yield from self._get_with_pagination(
            url="http://localhost:5000/ratings",
            params={"start_date": start_date, "end_date": end_date},
            batch_size=batch_size,
        )

def _get_with_pagination(self, url, params, batch_size=100):
"""
Fetches records using a get request with given url/params,
taking pagination into account.
"""

session = self.get_conn()

offset = 0
total = None
while total is None or offset < total:
response = session.get(
url, params={**params, **{"offset": offset, "limit": batch_size}}
)
response.raise_for_status()
response_json = response.json()

yield from response_json["result"]

offset += batch_size
total = response_json["total"]
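
A sketch of how a test could exercise this hook without an Airflow metastore, by stubbing
`get_connection` (pytest-mock is installed in the image, but plain `unittest.mock` suffices here;
all connection values are illustrative):

```python
# A sketch: stub out the connection lookup so get_conn() never hits a DB.
from unittest.mock import patch

from airflow.models import Connection

from airflowbook.hooks.movielens_hook import MovielensHook


def test_get_conn_uses_connection_credentials():
    connection = Connection(conn_id="test", login="airflow", password="airflow")
    with patch.object(MovielensHook, "get_connection", return_value=connection):
        hook = MovielensHook(conn_id="test")
        session = hook.get_conn()
        # The session should carry the credentials from the connection.
        assert session.auth == ("airflow", "airflow")
```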
Empty file.
@@ -0,0 +1,25 @@
import csv
import json

from airflow.models import BaseOperator


class JsonToCsvOperator(BaseOperator):
def __init__(self, input_path, output_path, **kwargs):
super().__init__(**kwargs)
self._input_path = input_path
self._output_path = output_path

def execute(self, context):
        # Read input JSON
with open(self._input_path, "r") as json_file:
data = json.load(json_file)

        # Determine the CSV columns from the union of keys across all rows
columns = {key for row in data for key in row.keys()}

        # Write output CSV
with open(self._output_path, mode="w") as csv_file:
writer = csv.DictWriter(csv_file, fieldnames=columns)
writer.writeheader()
writer.writerows(data)
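
A quick way to exercise this operator in a test is pytest's built-in `tmp_path` fixture. A sketch,
assuming the module is named `json_to_csv_operator` (the file name is not visible in this diff):

```python
import csv
import json

from airflowbook.operators.json_to_csv_operator import JsonToCsvOperator


def test_json_to_csv_operator(tmp_path):
    input_path = tmp_path / "input.json"
    output_path = tmp_path / "output.csv"
    input_path.write_text(json.dumps([{"a": 1, "b": 2}, {"a": 3, "b": 4}]))

    operator = JsonToCsvOperator(
        task_id="json_to_csv",
        input_path=str(input_path),
        output_path=str(output_path),
    )
    operator.execute(context={})

    # CSV round-trips everything as strings.
    with open(output_path) as f:
        rows = list(csv.DictReader(f))
    assert rows == [{"a": "1", "b": "2"}, {"a": "3", "b": "4"}]
```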
@@ -0,0 +1,100 @@
import json
from collections import Counter, defaultdict

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator

from airflowbook.hooks.movielens_hook import MovielensHook


class MovielensDownloadOperator(BaseOperator):
template_fields = ("_start_date", "_end_date", "_output_path")

def __init__(self, conn_id, start_date, end_date, output_path, **kwargs):
super().__init__(**kwargs)
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
self._output_path = output_path

def execute(self, context):
with MovielensHook(self._conn_id) as hook:
ratings = list(
hook.get_ratings(start_date=self._start_date, end_date=self._end_date)
)

with open(self._output_path, "w") as f:
f.write(json.dumps(ratings))


class MovielensPopularityOperator(BaseOperator):
def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs):
super().__init__(**kwargs)
self._conn_id = conn_id
self._start_date = start_date
self._end_date = end_date
self._min_ratings = min_ratings
self._top_n = top_n

def execute(self, context):
with MovielensHook(self._conn_id) as hook:
ratings = hook.get_ratings(
start_date=self._start_date, end_date=self._end_date
)

rating_sums = defaultdict(Counter)
for rating in ratings:
rating_sums[rating["movieId"]].update(count=1, rating=rating["rating"])

averages = {
movie_id: (
rating_counter["rating"] / rating_counter["count"],
rating_counter["count"],
)
for movie_id, rating_counter in rating_sums.items()
if rating_counter["count"] >= self._min_ratings
}
return sorted(averages.items(), key=lambda x: x[1], reverse=True)[
: self._top_n
]


class MovielensToPostgresOperator(BaseOperator):
template_fields = ("_start_date", "_end_date", "_insert_query")

def __init__(
self,
movielens_conn_id,
start_date,
end_date,
postgres_conn_id,
insert_query,
**kwargs
):
super().__init__(**kwargs)
self._movielens_conn_id = movielens_conn_id
self._start_date = start_date
self._end_date = end_date
self._postgres_conn_id = postgres_conn_id
self._insert_query = insert_query

def execute(self, context):
with MovielensHook(self._movielens_conn_id) as movielens_hook:
ratings = list(
movielens_hook.get_ratings(
start_date=self._start_date, end_date=self._end_date
)
)

        # Note: this breakpoint looks deliberate, matching the chapter's
        # debugging walkthrough; remove it to run the operator unattended.
        import pdb

        pdb.set_trace()

postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
insert_queries = [
self._insert_query.format(
",".join([str(_[1]) for _ in sorted(rating.items())])
)
for rating in ratings
]
postgres_hook.run(insert_queries)
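
To make the `Counter`-based aggregation in `MovielensPopularityOperator` concrete, here is what it
produces for a tiny hand-made ratings list (data made up for illustration):

```python
from collections import Counter, defaultdict

ratings = [
    {"movieId": 1, "rating": 4.0},
    {"movieId": 1, "rating": 5.0},
    {"movieId": 2, "rating": 3.0},
]

# Each movie gets a Counter tracking the number of ratings and their sum.
rating_sums = defaultdict(Counter)
for rating in ratings:
    rating_sums[rating["movieId"]].update(count=1, rating=rating["rating"])
# rating_sums == {1: Counter({"rating": 9.0, "count": 2}),
#                 2: Counter({"rating": 3.0, "count": 1})}

# With min_ratings=2, only movie 1 survives: average 4.5 over 2 ratings.
averages = {
    movie_id: (counter["rating"] / counter["count"], counter["count"])
    for movie_id, counter in rating_sums.items()
    if counter["count"] >= 2
}
print(averages)  # {1: (4.5, 2)}
```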
@@ -0,0 +1,37 @@
import csv
import io

from airflow.hooks.S3_hook import S3Hook
from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class PostgresToS3Operator(BaseOperator):
template_fields = ("_query", "_s3_key")

@apply_defaults
def __init__(
self, postgres_conn_id, query, s3_conn_id, s3_bucket, s3_key, **kwargs
):
super().__init__(**kwargs)
self._postgres_conn_id = postgres_conn_id
self._query = query
self._s3_conn_id = s3_conn_id
self._s3_bucket = s3_bucket
self._s3_key = s3_key

def execute(self, context):
postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id)
s3_hook = S3Hook(aws_conn_id=self._s3_conn_id)

results = postgres_hook.get_records(self._query)

data_buffer = io.StringIO()
csv_writer = csv.writer(data_buffer)
csv_writer.writerows(results)
data_buffer_binary = io.BytesIO(data_buffer.getvalue().encode())

s3_hook.get_conn().upload_fileobj(
Fileobj=data_buffer_binary, Bucket=self._s3_bucket, Key=self._s3_key
)
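
The Dockerfile in this PR installs pytest-docker-tools, which the chapter uses to test an operator
like this one against a real Postgres container. A minimal sketch of such a fixture (image tag and
credentials are illustrative):

```python
# conftest.py sketch: pytest-docker-tools fixtures for a throwaway Postgres.
from pytest_docker_tools import container, fetch

postgres_image = fetch(repository="postgres:11.1-alpine")

postgres = container(
    image="{postgres_image.id}",
    environment={
        "POSTGRES_USER": "testuser",
        "POSTGRES_PASSWORD": "testpass",
    },
    ports={"5432/tcp": None},
)
```

A test can then take `postgres` as a fixture argument and read the randomly mapped host port from
`postgres.ports["5432/tcp"][0]` to build the Airflow connection.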
@@ -3,8 +3,9 @@
 from airflow.operators.bash_operator import BashOperator
 
 dag = DAG(
-    dag_id="chapter8_bash_operator_no_command",
+    dag_id="chapter_9_bash_operator_no_command",
     start_date=airflow.utils.dates.days_ago(3),
+    description="This DAG is intentionally faulty to demonstrate the DAG integrity test.",
     schedule_interval=None,
 )
 
@@ -3,8 +3,9 @@
 from airflow.operators.dummy_operator import DummyOperator
 
 dag = DAG(
-    dag_id="chapter8_dag_cycle",
+    dag_id="chapter_9_dag_cycle",
     start_date=airflow.utils.dates.days_ago(3),
+    description="This DAG is intentionally faulty to demonstrate the DAG integrity test.",
     schedule_interval=None,
 )
 
@@ -3,7 +3,7 @@
 from airflow.operators.dummy_operator import DummyOperator
 
 dag = DAG(
-    dag_id="chapter8_duplicate_task_ids",
+    dag_id="chapter_9_duplicate_task_ids",
     start_date=airflow.utils.dates.days_ago(3),
     schedule_interval=None,
 )
@@ -4,7 +4,9 @@
 from airflow.operators.dummy_operator import DummyOperator
 
 dag = DAG(
-    dag_id="testme", start_date=airflow.utils.dates.days_ago(3), schedule_interval=None
+    dag_id="chapter_9_testme",
+    start_date=airflow.utils.dates.days_ago(3),
+    schedule_interval=None,
 )
 
 t1 = DummyOperator(task_id="test", dag=dag)