How to build a data extraction pipeline with Apache Airflow

[ad_1]


Data extraction pipelines might be hard to build and manage, so it’s a good idea to use a tool that can help you with these tasks. Apache Airflow is a popular open-source management workflow platform and in this article you’ll learn how to use it to automate your first workflow.

To follow along I’m assuming you already know how to create and run Bash and Python scripts. This tutorial was built using Ubuntu 20.04 with ImageMagick, tesseract and Python3 installed.

One important concept is that you’ll use Airflow only to automate and manage the tasks. This means that you’ll still have to design and break down your workflow into Bash and/or Python scripts.

To see how this works we’ll first create a workflow and run it manually, then we’ll see how to automate it using Airflow.

The task you’ll automate

In this tutorial you will extract data from a .pdf file and save it to a .csv file.
Screenshot from 2020-12-08 15-53-50.png
These are the main tasks to achieve that:

  1. Extract the text from the .pdf file and save it in a .txt file
  2. Extract the desired metadata from the text file and save it to a .csv file

To run the first task you’ll use the ImageMagick tool to convert the .pdf page to a .png file and then use tesseract to convert the image to a .txt file. These tasks will be defined in a Bash script. To extract the metadata you’ll use Python and regular expressions.

You’ll design the script like this:

  1. Receive the .pdf filename as a parameter
  2. Convert the page to a .png file
  3. Convert the image to a .txt file

This is the script to do all that:

#!/bin/bash
PDF_FILENAME="$1"
convert -density 600 "$PDF_FILENAME" "$PDF_FILENAME.png"
tesseract "$PDF_FILENAME.png" "$PDF_FILENAME"

Save this in a file named pdf_to_text.sh, then run chmod +x pdf_to_text.sh and finally run ./pdf_to_text.sh pdf_filename to create the .txt file. I’m using this pdf as an example.

2. Script to extract the metadata and save it to a .csv file

Now that you have the txt file it’s time to create the regex rule to extrat the data. The goal is to extract what happens in each hour of the meeting. You can extract the data by using a pattern that captures the hours and what happens immediately after that and before a line break. With regex you can do it with this pattern: (d:dd)-(d:dd) (.*n). You’ll then save this data into a .csv file. Here is the code:

import re
import csv
if __name__ == "__main__":
    pattern = "(d:dd)-(d:dd) (.*n)"
with open('metadata.csv', 'w', newline='') as file:
   writer = csv.writer(file)
   writer.writerow(["start_hour","end_hour","activity"])
 
   txt = open("extracted_text.txt", "r", encoding="ISO-8859–1").read()
   extracted_data = re.findall(pattern,txt)
   for data in extracted_data:
      moment = [data[0].strip(),data[1].strip(),data[2].strip()]
      writer.writerow(moment)

Save this in a file named extract_metadata.py and run it with python3 extrac_metadata.py. You can see that the file metadata.csv was created. Beautiful right?
Screenshot from 2020-12-08 15-59-52.png

In this article we’re not dealing with the best practices on how to create workflows, but if you want to learn more about it I highly recommend this talk by the creator of Airflow himself: Functional Data Engineering - A Set of Best Practices.

Ok, now that you have your workflow it’s time to automate it using Airflow.

To use Airflow we run a web server and then access the UI through the browser. You can schedule jobs to run automatically, so besides the server you’ll also need to run the scheduler. In a production setting we usually run it on a dedicated server, but here we’ll run it locally to understand how it works. You’ll create a virtual environment and run these commands to do install everything:

$ python3 -m venv .env
$ source .env/bin/activate
$ pip3 install apache-airflow
$ pip3 install cattrs==1.0.0. #I had to run this to work
$ airflow version # check if everything is ok
$ airflow initdb #start the database Airflow uses
$ airflow scheduler #start the scheduler

Then open another terminal window and run the server:

$ source .env/bin/activate
$ airflow webserver -p 8080

Now go ahead and open https://localhost:8080 to access the Airflow UI.

Creating your first DAG

With Airflow you specify your workflow in a DAG (Directed Acyclic Graph). A DAG is a Python script that has the collection of all the tasks organized to reflect their relationships and dependencies, as stated here.

To specify the tasks you use Operators. There are BashOperators (to execute bash commands), PythonOperators (to call Python functions), MySqlOperators (to execute SQL commands) and so on. In this tutorial you’ll only use the BashOperator to run the scripts.

When you run a workflow it creates a DAG Run, an object representing an instantiation of the DAG in time. Since this is your first time using Airflow, in this tutorial we’ll trigger the DAG manually and not use the scheduler yet.

Let’s create the first_dag.py script to understand how all of this fits together.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'Déborah Mesquita',
    'start_date': days_ago(1)
    }

# Defining the DAG using Context Manager
with DAG(
        'extract-meeting-activities',
        default_args=default_args,
        schedule_interval=None,
        ) as dag:
        t1 = BashOperator(
                task_id = 'extract_text_from_pdf',
                bash_command = 'YOUR-LOCAL-PATH/airflow-tutorial/pdf_to_text.sh {{ dag_run.conf["working_path"] if dag_run else "" }} {{ dag_run.conf["pdf_filename"] if dag_run else "" }}',
        )

        t2 = BashOperator(
                task_id = 'extract_metadata_from_text',
                bash_command = 'python3 YOUR-LOCAL-PATH/airflow-tutorial/extract_metadata.py {{ dag_run.conf["working_path"] if dag_run else "" }}',
        )

        t1 >> t2 # Defining the task dependencies

You’re specifying two parameters into the default_args dict, owner and start date. To instantiate the DAG we are using a context manager. The first parameter is the name of the DAG (extract-meeting-activities), then we pass the default_args and set schedule_interval to None since we’ll trigger the workflow manually.

After that it’s time to create the tasks. In this workflow you’ll use the BashOperator since the workflow consists of running Bash and Python scripts.

Remember that the first script receives the pdf filename as a parameter? In this DAG you’ll pass this parameter when you trigger the workflow manually. The parameters are stored in the dag_run.conf dict and you can use the keys to access them.

For the workflow to work as expected you’ll need to make some changes in the scripts. When Airflow runs the scripts it does so in a temporary directory, meaning that it will not be the same directory we were running the scripts manually. So you’ll pass the PATH that has the pdf file, which will be the same path you want to store the .csv file as a parameter as well. You can see the final scripts here in this repository.

Then it’s time to define the task dependencies. You are using a bitshift operator to do that, >>, meaning that t1 runs first and t2 runs second.

Running your first workflow

By default, Airflow looks at the directory ~/airflow/dags to search for DAGs. Since we didn’t change the Airflow config this should be the for you too. So go ahead and copy the file first_dag.py to that directory.

$ cp first_dag.py ~/airflow/dags/

Now if you go to https://localhost:8080 you can see the DAG.
Screenshot from 2020-12-08 16-14-50.png

To trigger the DAG click on its name (extract-meeting-activities) and then click on “Trigger DAG”:
Screenshot from 2020-12-08 16-20-56.png

Now pass the parameter in the textbox and click the Trigger buttom. These are the parameters:

{"working_path": "YOUR-LOCAL-PATH/airflow-tutorial/", 
"pdf_filename": "sample_meeting_agenda.pdf"}

Then if you go to the “Tree View” you can see all the runs for this DAG. Green means everything went well, red means that the task has failed, and yellow means the task didn’t run yet and is up to retry. Check out the Task Lifecycle here. To check the logs for each task you can click on the square and then click on “View Log”.

Screenshot from 2020-12-08 16-26-01.png

And there you have it, your first workflow running with Airflow! Some good next steps could be learning more about DAG Runs, check the command line interface or use an operator to execute SQL commands.

Thanks for reading!

Read More …

[ad_2]


Write a comment