From a162a73668be2b1b7c27e23bc8f5dcef89a1c186 Mon Sep 17 00:00:00 2001 From: Bas Harenslak Date: Sun, 18 Oct 2020 19:25:55 +0200 Subject: [PATCH 1/3] Rename ch8 to the new ch9 --- chapters/{chapter8 => chapter09}/dags/.airflowignore | 0 chapters/{chapter8 => chapter09}/dags/bash_operator_no_command.py | 0 chapters/{chapter8 => chapter09}/dags/dag_cycle.py | 0 chapters/{chapter8 => chapter09}/dags/duplicate_task_ids.py | 0 chapters/{chapter8 => chapter09}/dags/testme.py | 0 .../tests/airflowbook/operators/postgres-init.sql | 0 .../tests/airflowbook/operators/test_json_to_csv_operator.py | 0 .../tests/airflowbook/operators/test_movielens_operator.py | 0 .../tests/airflowbook/operators/test_movielens_operator2.py | 0 chapters/{chapter8 => chapter09}/tests/conftest.py | 0 .../tests/dags/chapter7/custom/test_operators.py | 0 .../tests/dags/chapter7/custom/test_operators_incorrect.py | 0 chapters/{chapter8 => chapter09}/tests/dags/conftest.py | 0 chapters/{chapter8 => chapter09}/tests/dags/test_dag_integrity.py | 0 14 files changed, 0 insertions(+), 0 deletions(-) rename chapters/{chapter8 => chapter09}/dags/.airflowignore (100%) rename chapters/{chapter8 => chapter09}/dags/bash_operator_no_command.py (100%) rename chapters/{chapter8 => chapter09}/dags/dag_cycle.py (100%) rename chapters/{chapter8 => chapter09}/dags/duplicate_task_ids.py (100%) rename chapters/{chapter8 => chapter09}/dags/testme.py (100%) rename chapters/{chapter8 => chapter09}/tests/airflowbook/operators/postgres-init.sql (100%) rename chapters/{chapter8 => chapter09}/tests/airflowbook/operators/test_json_to_csv_operator.py (100%) rename chapters/{chapter8 => chapter09}/tests/airflowbook/operators/test_movielens_operator.py (100%) rename chapters/{chapter8 => chapter09}/tests/airflowbook/operators/test_movielens_operator2.py (100%) rename chapters/{chapter8 => chapter09}/tests/conftest.py (100%) rename chapters/{chapter8 => chapter09}/tests/dags/chapter7/custom/test_operators.py (100%) rename 
chapters/{chapter8 => chapter09}/tests/dags/chapter7/custom/test_operators_incorrect.py (100%) rename chapters/{chapter8 => chapter09}/tests/dags/conftest.py (100%) rename chapters/{chapter8 => chapter09}/tests/dags/test_dag_integrity.py (100%) diff --git a/chapters/chapter8/dags/.airflowignore b/chapters/chapter09/dags/.airflowignore similarity index 100% rename from chapters/chapter8/dags/.airflowignore rename to chapters/chapter09/dags/.airflowignore diff --git a/chapters/chapter8/dags/bash_operator_no_command.py b/chapters/chapter09/dags/bash_operator_no_command.py similarity index 100% rename from chapters/chapter8/dags/bash_operator_no_command.py rename to chapters/chapter09/dags/bash_operator_no_command.py diff --git a/chapters/chapter8/dags/dag_cycle.py b/chapters/chapter09/dags/dag_cycle.py similarity index 100% rename from chapters/chapter8/dags/dag_cycle.py rename to chapters/chapter09/dags/dag_cycle.py diff --git a/chapters/chapter8/dags/duplicate_task_ids.py b/chapters/chapter09/dags/duplicate_task_ids.py similarity index 100% rename from chapters/chapter8/dags/duplicate_task_ids.py rename to chapters/chapter09/dags/duplicate_task_ids.py diff --git a/chapters/chapter8/dags/testme.py b/chapters/chapter09/dags/testme.py similarity index 100% rename from chapters/chapter8/dags/testme.py rename to chapters/chapter09/dags/testme.py diff --git a/chapters/chapter8/tests/airflowbook/operators/postgres-init.sql b/chapters/chapter09/tests/airflowbook/operators/postgres-init.sql similarity index 100% rename from chapters/chapter8/tests/airflowbook/operators/postgres-init.sql rename to chapters/chapter09/tests/airflowbook/operators/postgres-init.sql diff --git a/chapters/chapter8/tests/airflowbook/operators/test_json_to_csv_operator.py b/chapters/chapter09/tests/airflowbook/operators/test_json_to_csv_operator.py similarity index 100% rename from chapters/chapter8/tests/airflowbook/operators/test_json_to_csv_operator.py rename to 
chapters/chapter09/tests/airflowbook/operators/test_json_to_csv_operator.py diff --git a/chapters/chapter8/tests/airflowbook/operators/test_movielens_operator.py b/chapters/chapter09/tests/airflowbook/operators/test_movielens_operator.py similarity index 100% rename from chapters/chapter8/tests/airflowbook/operators/test_movielens_operator.py rename to chapters/chapter09/tests/airflowbook/operators/test_movielens_operator.py diff --git a/chapters/chapter8/tests/airflowbook/operators/test_movielens_operator2.py b/chapters/chapter09/tests/airflowbook/operators/test_movielens_operator2.py similarity index 100% rename from chapters/chapter8/tests/airflowbook/operators/test_movielens_operator2.py rename to chapters/chapter09/tests/airflowbook/operators/test_movielens_operator2.py diff --git a/chapters/chapter8/tests/conftest.py b/chapters/chapter09/tests/conftest.py similarity index 100% rename from chapters/chapter8/tests/conftest.py rename to chapters/chapter09/tests/conftest.py diff --git a/chapters/chapter8/tests/dags/chapter7/custom/test_operators.py b/chapters/chapter09/tests/dags/chapter7/custom/test_operators.py similarity index 100% rename from chapters/chapter8/tests/dags/chapter7/custom/test_operators.py rename to chapters/chapter09/tests/dags/chapter7/custom/test_operators.py diff --git a/chapters/chapter8/tests/dags/chapter7/custom/test_operators_incorrect.py b/chapters/chapter09/tests/dags/chapter7/custom/test_operators_incorrect.py similarity index 100% rename from chapters/chapter8/tests/dags/chapter7/custom/test_operators_incorrect.py rename to chapters/chapter09/tests/dags/chapter7/custom/test_operators_incorrect.py diff --git a/chapters/chapter8/tests/dags/conftest.py b/chapters/chapter09/tests/dags/conftest.py similarity index 100% rename from chapters/chapter8/tests/dags/conftest.py rename to chapters/chapter09/tests/dags/conftest.py diff --git a/chapters/chapter8/tests/dags/test_dag_integrity.py b/chapters/chapter09/tests/dags/test_dag_integrity.py 
similarity index 100% rename from chapters/chapter8/tests/dags/test_dag_integrity.py rename to chapters/chapter09/tests/dags/test_dag_integrity.py From 139dcf976d0587a913c77f35a53bc42e7e243d66 Mon Sep 17 00:00:00 2001 From: Bas Harenslak Date: Sun, 18 Oct 2020 19:29:13 +0200 Subject: [PATCH 2/3] Rename DAGs --- chapters/chapter09/dags/bash_operator_no_command.py | 3 ++- chapters/chapter09/dags/dag_cycle.py | 3 ++- chapters/chapter09/dags/duplicate_task_ids.py | 2 +- chapters/chapter09/dags/testme.py | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/chapters/chapter09/dags/bash_operator_no_command.py b/chapters/chapter09/dags/bash_operator_no_command.py index 11e1636b..83ba9126 100644 --- a/chapters/chapter09/dags/bash_operator_no_command.py +++ b/chapters/chapter09/dags/bash_operator_no_command.py @@ -3,8 +3,9 @@ from airflow.operators.bash_operator import BashOperator dag = DAG( - dag_id="chapter8_bash_operator_no_command", + dag_id="chapter_9_bash_operator_no_command", start_date=airflow.utils.dates.days_ago(3), + description="This DAG is intentionally faulty to demonstrate the DAG integrity test.", schedule_interval=None, ) diff --git a/chapters/chapter09/dags/dag_cycle.py b/chapters/chapter09/dags/dag_cycle.py index 11346798..0ed4483c 100644 --- a/chapters/chapter09/dags/dag_cycle.py +++ b/chapters/chapter09/dags/dag_cycle.py @@ -3,8 +3,9 @@ from airflow.operators.dummy_operator import DummyOperator dag = DAG( - dag_id="chapter8_dag_cycle", + dag_id="chapter_9_dag_cycle", start_date=airflow.utils.dates.days_ago(3), + description="This DAG is intentionally faulty to demonstrate the DAG integrity test.", schedule_interval=None, ) diff --git a/chapters/chapter09/dags/duplicate_task_ids.py b/chapters/chapter09/dags/duplicate_task_ids.py index 52f81d75..dab9746f 100644 --- a/chapters/chapter09/dags/duplicate_task_ids.py +++ b/chapters/chapter09/dags/duplicate_task_ids.py @@ -3,7 +3,7 @@ from airflow.operators.dummy_operator import DummyOperator 
dag = DAG( - dag_id="chapter8_duplicate_task_ids", + dag_id="chapter_9_duplicate_task_ids", start_date=airflow.utils.dates.days_ago(3), schedule_interval=None, ) diff --git a/chapters/chapter09/dags/testme.py b/chapters/chapter09/dags/testme.py index 4c6c0d2f..ae37e1d3 100644 --- a/chapters/chapter09/dags/testme.py +++ b/chapters/chapter09/dags/testme.py @@ -4,7 +4,7 @@ from airflow.operators.dummy_operator import DummyOperator dag = DAG( - dag_id="testme", start_date=airflow.utils.dates.days_ago(3), schedule_interval=None + dag_id="chapter_9_testme", start_date=airflow.utils.dates.days_ago(3), schedule_interval=None ) t1 = DummyOperator(task_id="test", dag=dag) From 33ff7abb1bd894f2d7ebd20c47b8c644fca8a696 Mon Sep 17 00:00:00 2001 From: Bas Harenslak Date: Sun, 18 Oct 2020 20:45:45 +0200 Subject: [PATCH 3/3] Some restructuring --- chapters/chapter09/Dockerfile | 9 ++ chapters/chapter09/README.md | 30 +++++ chapters/chapter09/custom/setup.py | 8 ++ chapters/chapter09/custom/src/__init__.py | 0 .../custom/src/airflowbook/__init__.py | 0 .../custom/src/airflowbook/hooks/__init__.py | 0 .../src/airflowbook/hooks/movielens_hook.py | 106 ++++++++++++++++++ .../src/airflowbook/operators/__init__.py | 0 .../operators/json_to_csv_operator.py | 25 +++++ .../operators/movielens_operator.py | 100 +++++++++++++++++ .../operators/postgres_to_s3_operator.py | 37 ++++++ chapters/chapter09/dags/testme.py | 4 +- chapters/chapter09/docker-compose.yml | 70 ++++++++++++ .../custom => airflowbook}/test_operators.py | 0 .../test_operators_incorrect.py | 0 .../tests/dags/test_dag_integrity.py | 2 +- 16 files changed, 389 insertions(+), 2 deletions(-) create mode 100644 chapters/chapter09/Dockerfile create mode 100644 chapters/chapter09/README.md create mode 100644 chapters/chapter09/custom/setup.py create mode 100644 chapters/chapter09/custom/src/__init__.py create mode 100644 chapters/chapter09/custom/src/airflowbook/__init__.py create mode 100644 
chapters/chapter09/custom/src/airflowbook/hooks/__init__.py create mode 100644 chapters/chapter09/custom/src/airflowbook/hooks/movielens_hook.py create mode 100644 chapters/chapter09/custom/src/airflowbook/operators/__init__.py create mode 100644 chapters/chapter09/custom/src/airflowbook/operators/json_to_csv_operator.py create mode 100644 chapters/chapter09/custom/src/airflowbook/operators/movielens_operator.py create mode 100644 chapters/chapter09/custom/src/airflowbook/operators/postgres_to_s3_operator.py create mode 100644 chapters/chapter09/docker-compose.yml rename chapters/chapter09/tests/dags/{chapter7/custom => airflowbook}/test_operators.py (100%) rename chapters/chapter09/tests/dags/{chapter7/custom => airflowbook}/test_operators_incorrect.py (100%) diff --git a/chapters/chapter09/Dockerfile b/chapters/chapter09/Dockerfile new file mode 100644 index 00000000..d8558fd3 --- /dev/null +++ b/chapters/chapter09/Dockerfile @@ -0,0 +1,9 @@ +ARG AIRFLOW_BASE_IMAGE="apache/airflow:1.10.12-python3.8" +FROM ${AIRFLOW_BASE_IMAGE} + +USER root +COPY custom /opt/airflow/custom +RUN pip install -e /opt/airflow/custom && \ + pip install pytest~=6.1.1 pytest-helpers-namespace~=2019.1.8 pytest-mock~=3.3.1 pytest-docker-tools~=1.0.0 + +USER airflow diff --git a/chapters/chapter09/README.md b/chapters/chapter09/README.md new file mode 100644 index 00000000..c2614643 --- /dev/null +++ b/chapters/chapter09/README.md @@ -0,0 +1,30 @@ +# Chapter 9 + +Code accompanying Chapter 9 (about testing) of the book [Data Pipelines with Apache Airflow](https://www.manning.com/books/data-pipelines-with-apache-airflow). + +## Contents + +Chapter 9 discusses the topic of testing. In this folder we have a few DAGs for demonstration purposes; +however, there's not really a point in actually running them. Nevertheless, a Docker Compose file was added +which sets up Airflow and allows you to run the DAGs. + +The more interesting part of this chapter is the tests. 
A custom Docker image was built which includes all +the pytest goodies explained in the chapter. A custom package with operators and hooks is also installed, +named `airflowbook`. + +## Usage + +To get started with the code examples, start Airflow with Docker Compose with the following command: + +```bash +docker-compose up -d +``` + +The webserver initializes a few things, so wait for a few seconds, and you should be able to access the +Airflow webserver at http://localhost:8080. + +To stop running the examples, run the following command: + +```bash +docker-compose down -v +``` diff --git a/chapters/chapter09/custom/setup.py b/chapters/chapter09/custom/setup.py new file mode 100644 index 00000000..5bb3d8d6 --- /dev/null +++ b/chapters/chapter09/custom/setup.py @@ -0,0 +1,8 @@ +import setuptools + +setuptools.setup( + name="airflowbook", + version="0.1", + packages=setuptools.find_packages(where="src"), + package_dir={"": "src"}, +) diff --git a/chapters/chapter09/custom/src/__init__.py b/chapters/chapter09/custom/src/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/chapters/chapter09/custom/src/airflowbook/__init__.py b/chapters/chapter09/custom/src/airflowbook/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/chapters/chapter09/custom/src/airflowbook/hooks/__init__.py b/chapters/chapter09/custom/src/airflowbook/hooks/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/chapters/chapter09/custom/src/airflowbook/hooks/movielens_hook.py b/chapters/chapter09/custom/src/airflowbook/hooks/movielens_hook.py new file mode 100644 index 00000000..a799b51e --- /dev/null +++ b/chapters/chapter09/custom/src/airflowbook/hooks/movielens_hook.py @@ -0,0 +1,106 @@ +from airflow.hooks.base_hook import BaseHook + + +class MovielensHook(BaseHook): + """Hook for our MovieLens API.""" + + def __init__(self, conn_id, retry=3): + super().__init__(source=None) + self._conn_id = conn_id + self._retry = retry + + self._session = 
None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + + def get_conn(self): + if self._session is None: + # Fetch config for the given connection (host, login, etc). + conn_config = self.get_connection(self._conn_id) + self._session = self._build_session( + auth=(conn_config.login, conn_config.password), retry=self._retry + ) + return self._session + + @staticmethod + def _build_session(auth=None, headers=None, retry=None): + """ + Helper method for building a requests Session. + + Parameters + ---------- + auth : Tuple[str] + Credentials (user, pass) to use for basic HTTP authentication. + headers : Dict[str, str] + Headers to include in every request. + retry : Union[int, requests.packages.urllib3.util.retry.Retry] + Configuration to use for handling retries when errors occur during + requests. Can be an int to specify the number of retries to perform + using default settings, or a Retry object which allows you to configure + more details such as which status codes to retry for, etc. See the + documentation of the Retry class for more details. 
+ """ + import requests + from requests.adapters import HTTPAdapter + + session = requests.Session() + + if auth: + session.auth = auth + + if headers: + session.headers.update(headers) + + if retry: + adapter = HTTPAdapter(max_retries=retry) + session.mount("http://", adapter) + session.mount("https://", adapter) + + return session + + def close(self): + if self._session: + self._session.close() + self._session = None + + # API methods: + def get_movies(self): + raise NotImplementedError() + + def get_users(self): + raise NotImplementedError() + + def get_ratings(self, start_date=None, end_date=None, batch_size=100): + """Fetches ratings between the given start/end date.""" + + yield from self._get_with_pagination( + url="http://localhost:5000/ratings", + params={"start_date": start_date, "end_date": end_date}, + batch_size=batch_size, + ) + + def _get_with_pagination(self, url, params, batch_size=100): + """ + Fetches records using a get request with given url/params, + taking pagination into account. 
+ """ + + session = self.get_conn() + + offset = 0 + total = None + while total is None or offset < total: + response = session.get( + url, params={**params, **{"offset": offset, "limit": batch_size}} + ) + response.raise_for_status() + response_json = response.json() + + yield from response_json["result"] + + offset += batch_size + total = response_json["total"] diff --git a/chapters/chapter09/custom/src/airflowbook/operators/__init__.py b/chapters/chapter09/custom/src/airflowbook/operators/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/chapters/chapter09/custom/src/airflowbook/operators/json_to_csv_operator.py b/chapters/chapter09/custom/src/airflowbook/operators/json_to_csv_operator.py new file mode 100644 index 00000000..faa82f39 --- /dev/null +++ b/chapters/chapter09/custom/src/airflowbook/operators/json_to_csv_operator.py @@ -0,0 +1,25 @@ +import csv +import json + +from airflow.models import BaseOperator + + +class JsonToCsvOperator(BaseOperator): + def __init__(self, input_path, output_path, **kwargs): + super().__init__(**kwargs) + self._input_path = input_path + self._output_path = output_path + + def execute(self, context): + # Read input JSON + with open(self._input_path, "r") as json_file: + data = json.load(json_file) + + # Get columns + columns = {key for row in data for key in row.keys()} + + # Write output CSV + with open(self._output_path, mode="w") as csv_file: + writer = csv.DictWriter(csv_file, fieldnames=columns) + writer.writeheader() + writer.writerows(data) diff --git a/chapters/chapter09/custom/src/airflowbook/operators/movielens_operator.py b/chapters/chapter09/custom/src/airflowbook/operators/movielens_operator.py new file mode 100644 index 00000000..f1d0cef9 --- /dev/null +++ b/chapters/chapter09/custom/src/airflowbook/operators/movielens_operator.py @@ -0,0 +1,100 @@ +import json +from collections import Counter, defaultdict + +from airflow.hooks.postgres_hook import PostgresHook +from airflow.models import 
BaseOperator + +from airflowbook.hooks.movielens_hook import MovielensHook + + +class MovielensDownloadOperator(BaseOperator): + template_fields = ("_start_date", "_end_date", "_output_path") + + def __init__(self, conn_id, start_date, end_date, output_path, **kwargs): + super().__init__(**kwargs) + self._conn_id = conn_id + self._start_date = start_date + self._end_date = end_date + self._output_path = output_path + + def execute(self, context): + with MovielensHook(self._conn_id) as hook: + ratings = list( + hook.get_ratings(start_date=self._start_date, end_date=self._end_date) + ) + + with open(self._output_path, "w") as f: + f.write(json.dumps(ratings)) + + +class MovielensPopularityOperator(BaseOperator): + def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs): + super().__init__(**kwargs) + self._conn_id = conn_id + self._start_date = start_date + self._end_date = end_date + self._min_ratings = min_ratings + self._top_n = top_n + + def execute(self, context): + with MovielensHook(self._conn_id) as hook: + ratings = hook.get_ratings( + start_date=self._start_date, end_date=self._end_date + ) + + rating_sums = defaultdict(Counter) + for rating in ratings: + rating_sums[rating["movieId"]].update(count=1, rating=rating["rating"]) + + averages = { + movie_id: ( + rating_counter["rating"] / rating_counter["count"], + rating_counter["count"], + ) + for movie_id, rating_counter in rating_sums.items() + if rating_counter["count"] >= self._min_ratings + } + return sorted(averages.items(), key=lambda x: x[1], reverse=True)[ + : self._top_n + ] + + +class MovielensToPostgresOperator(BaseOperator): + template_fields = ("_start_date", "_end_date", "_insert_query") + + def __init__( + self, + movielens_conn_id, + start_date, + end_date, + postgres_conn_id, + insert_query, + **kwargs + ): + super().__init__(**kwargs) + self._movielens_conn_id = movielens_conn_id + self._start_date = start_date + self._end_date = end_date + self._postgres_conn_id 
= postgres_conn_id + self._insert_query = insert_query + + def execute(self, context): + with MovielensHook(self._movielens_conn_id) as movielens_hook: + ratings = list( + movielens_hook.get_ratings( + start_date=self._start_date, end_date=self._end_date + ) + ) + + import pdb + + pdb.set_trace() + + postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id) + insert_queries = [ + self._insert_query.format( + ",".join([str(_[1]) for _ in sorted(rating.items())]) + ) + for rating in ratings + ] + postgres_hook.run(insert_queries) diff --git a/chapters/chapter09/custom/src/airflowbook/operators/postgres_to_s3_operator.py b/chapters/chapter09/custom/src/airflowbook/operators/postgres_to_s3_operator.py new file mode 100644 index 00000000..5f003ddb --- /dev/null +++ b/chapters/chapter09/custom/src/airflowbook/operators/postgres_to_s3_operator.py @@ -0,0 +1,37 @@ +import csv +import io + +from airflow.hooks.S3_hook import S3Hook +from airflow.hooks.postgres_hook import PostgresHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class PostgresToS3Operator(BaseOperator): + template_fields = ("_query", "_s3_key") + + @apply_defaults + def __init__( + self, postgres_conn_id, query, s3_conn_id, s3_bucket, s3_key, **kwargs + ): + super().__init__(**kwargs) + self._postgres_conn_id = postgres_conn_id + self._query = query + self._s3_conn_id = s3_conn_id + self._s3_bucket = s3_bucket + self._s3_key = s3_key + + def execute(self, context): + postgres_hook = PostgresHook(postgres_conn_id=self._postgres_conn_id) + s3_hook = S3Hook(aws_conn_id=self._s3_conn_id) + + results = postgres_hook.get_records(self._query) + + data_buffer = io.StringIO() + csv_writer = csv.writer(data_buffer) + csv_writer.writerows(results) + data_buffer_binary = io.BytesIO(data_buffer.getvalue().encode()) + + s3_hook.get_conn().upload_fileobj( + Fileobj=data_buffer_binary, Bucket=self._s3_bucket, Key=self._s3_key + ) diff --git 
a/chapters/chapter09/dags/testme.py b/chapters/chapter09/dags/testme.py index ae37e1d3..857fbecd 100644 --- a/chapters/chapter09/dags/testme.py +++ b/chapters/chapter09/dags/testme.py @@ -4,7 +4,9 @@ from airflow.operators.dummy_operator import DummyOperator dag = DAG( - dag_id="chapter_9_testme", start_date=airflow.utils.dates.days_ago(3), schedule_interval=None + dag_id="chapter_9_testme", + start_date=airflow.utils.dates.days_ago(3), + schedule_interval=None, ) t1 = DummyOperator(task_id="test", dag=dag) diff --git a/chapters/chapter09/docker-compose.yml b/chapters/chapter09/docker-compose.yml new file mode 100644 index 00000000..d322f68b --- /dev/null +++ b/chapters/chapter09/docker-compose.yml @@ -0,0 +1,70 @@ +version: '3.7' + +# ====================================== AIRFLOW ENVIRONMENT VARIABLES ======================================= +x-environment: &airflow_environment + - AIRFLOW__CORE__EXECUTOR=LocalExecutor + - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False + - AIRFLOW__CORE__LOAD_EXAMPLES=False + - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow + - AIRFLOW__CORE__STORE_DAG_CODE=True + - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True + - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True + - AIRFLOW__WEBSERVER__RBAC=False + - AIRFLOW_CONN_MY_POSTGRES=postgresql://airflow:airflow@wiki_results:5432/airflow + +x-airflow-image: &airflow_image apache/airflow:1.10.12-python3.8 +# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ====================================== + +services: + postgres: + image: postgres:12-alpine + environment: + - POSTGRES_USER=airflow + - POSTGRES_PASSWORD=airflow + - POSTGRES_DB=airflow + ports: + - "5432:5432" + + init: + build: + context: . 
+ args: + AIRFLOW_BASE_IMAGE: *airflow_image + depends_on: + - postgres + environment: *airflow_environment + entrypoint: /bin/bash + command: -c 'airflow upgradedb && sleep 5 && airflow create_user --username admin --password admin --firstname John --lastname Smith --role Admin --email admin@example.org' + + webserver: + build: + context: . + args: + AIRFLOW_BASE_IMAGE: *airflow_image + restart: always + depends_on: + - postgres + ports: + - "8080:8080" + volumes: + - logs:/opt/airflow/logs + environment: *airflow_environment + command: webserver + + scheduler: + build: + context: . + args: + AIRFLOW_BASE_IMAGE: *airflow_image + restart: always + depends_on: + - postgres + volumes: + - logs:/opt/airflow/logs + - ./dags:/opt/airflow/dags + - ./tests:/opt/airflow/tests + environment: *airflow_environment + command: scheduler + +volumes: + logs: diff --git a/chapters/chapter09/tests/dags/chapter7/custom/test_operators.py b/chapters/chapter09/tests/dags/airflowbook/test_operators.py similarity index 100% rename from chapters/chapter09/tests/dags/chapter7/custom/test_operators.py rename to chapters/chapter09/tests/dags/airflowbook/test_operators.py diff --git a/chapters/chapter09/tests/dags/chapter7/custom/test_operators_incorrect.py b/chapters/chapter09/tests/dags/airflowbook/test_operators_incorrect.py similarity index 100% rename from chapters/chapter09/tests/dags/chapter7/custom/test_operators_incorrect.py rename to chapters/chapter09/tests/dags/airflowbook/test_operators_incorrect.py diff --git a/chapters/chapter09/tests/dags/test_dag_integrity.py b/chapters/chapter09/tests/dags/test_dag_integrity.py index c86b7985..12da1f9e 100644 --- a/chapters/chapter09/tests/dags/test_dag_integrity.py +++ b/chapters/chapter09/tests/dags/test_dag_integrity.py @@ -8,7 +8,7 @@ from airflow.models import DAG DAG_PATH = os.path.join(os.path.dirname(__file__), "..", "..", "dags/**/*.py") -DAG_FILES = glob.glob(DAG_PATH) +DAG_FILES = glob.glob(DAG_PATH, recursive=True) 
@pytest.mark.parametrize("dag_file", DAG_FILES)