Rashid Mohammed
Kingmoh preaches data science

Kingmoh preaches data science

Introduction to Apache Airflow

Introduction to Apache Airflow

Setting up and creating a basic pipeline

Rashid Mohammed's photo
Rashid Mohammed
·May 14, 2022·

8 min read

Table of contents

Apache airflow is a tool that is used to schedule and monitor workflows. It's one of the several tools I planned to learn on my journey to becoming a full data professional. The first resource I stumbled upon when I searched "introduction to airflow" on google was a documentation of a workshop led by Tania Allard in 2019. Nonetheless the documentation is loaded with great content, most of the airflow commands used are deprecated thus, I decided to create an updated version of her documentation. In this blog, I discuss the prerequistes for this airflow project and a basic data pipeline. I'll encapsulate these tasks within airflow's functionality in another blog.


Getting Started

The prerequisites for this project are discussed below. Airflow will be installed in the next blog.

Python (preferrably 3.7+)

As Tania mentions in her workshop, it can be difficult to install python packages individually. A recommended solution is to use Miniconda which provides a variety of packages. Make sure you confirm the checksum of the installer after downloading it. For example, the image below shows the checksum of the Miniconda installer (bash version) for mac. Screen Shot 2022-05-14 at 1.29.12 pm.png

>> # Show checksum with openssl
>> openssl dgst -sha256 Miniconda3-latest-MacOSX-x86_64.sh
SHA256(Miniconda3-latest-MacOSX-x86_64.sh)= 7717253055e7c09339cd3d0815a0b1986b9138dcfcb8ec33b9733df32dd40eaa

If you don't prefer Miniconda, you can also create your own virtual environments using venv or pipenv.

Docker

Microsoft does not offer SQL Server for mac so I run an instance of SQL Server using docker. You can go here to download the docker image.

Unfortunately, this image does not work on Apple Silicon

A database (MySQL 8.0+, MsSQL)

MySQL and MsSQL are two of several popular database management systems. I'll be using these two for different purposes in this blog. For macOS users, peruse this gist by nrollr to successfully install MySQL using homebrew. You can also run both databases with docker.

Nice to Haves

Git

Git is a tool used for creating versions of a software. It does this by keeping track of all changes to the source code of the software. You can download it here

GitHub

GitHub is a web-based service that provides internet hosting for version control and software development using Git. You can create a basic account and a limited number of private repositories at no cost. Go here to sign up.


Creating a basic pipeline in Python

Tania provides a great explanation of automation and pipelines here. In this section, I modify an ETL pipeline example provided on the Airflow documentation website. Airflow decided to use PostgreSQL in their tutorial however I'll use MsSQL.

Setting up your MsSQL with docker

Firstly, download the image from dockerhub.

docker pull mcr.microsoft.com/mssql/server

Spin up a new instance by using the following command

docker run -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=${PASSWORD}" --name $NAME -p $HOST_PORT:1433 -d mcr.microsoft.com/mssql/server

It can be a little daunting to connect to SQL Server on Mac but there is light at the end of the tunnel. mkleehammer has a great documentation on the pyodbc driver on GitHub. The steps provided below will work for users trying to connect to SQL Server from Mac OSX. Users of other platforms must consult the documentation.

Install FreeTDS and unixODBC

The connection to SQL Server will be established by using the unixODBC driver manager and the FreeTDS driver.

brew install unixodbc freetds

Edit the freetds.conf configuration file

The freetds.conf file is located at /usr/local/etc/ by default. Just include the following at the end of the script.

[MYMSSQL]
host = localhost
port = 1433
tds version = 7.3

Test the connection

tsql -S MYMSSQL -U myuser -P mypassword

Output:

locale is "en_AU.UTF-8"
locale charset is "UTF-8"
using default charset "UTF-8"
1>

Edit the odbcinst.ini and odbc.ini configuration files

To find the locations of odbcinst.ini and odbc.ini, run

odbcinst -j

We are now ready to edit both files. Firstly, include the following in odbcinst.ini.

[FreeTDS]
Description=FreeTDS Driver for Linux & MSSQL
Driver=/usr/local/lib/libtdsodbc.so
Setup=/usr/local/lib/libtdsodbc.so
UsageCount=1

Similarly, include the following in odbc.ini

[MYMSSQL]
Description         = Test to SQLServer
Driver              = FreeTDS
Servername          = MYMSSQL

Connect to SQL Server with pyodbc

Now it should be possible to connect to SQL Server using pyodbc, for example

import pyodbc

# the DSN value should be the name of the entry in odbc.ini, not freetds.conf
cnxn = pyodbc.connect('DSN=MYMSSQL;UID=myuser;PWD=mypassword')
crsr = cnxn.cursor()
rows = crsr.execute("select @@VERSION").fetchall()
print(rows)
crsr.close()
cnxn.close()


Getting data from an API into SQL Server

We're going to create a python script that performs the following.

  1. Creates an employees relation.
  2. Creates an employees_temp relation.
  3. Gets data from an API
  4. Uploads the data into the employees_temp relation.
  5. Upserts the data into the employees relation.

The SQL commands used are provided below. I use a database called jade. Feel free to use the name of your database.

Create the employees relation

use jade;

if object_id('dbo.employees') is not null
   drop table dbo.employees;

create table dbo.employees (
   "Serial Number" int primary key,
   "Company Name" varchar(1000),
   "Employee Markme" varchar(1000),
   "Description" varchar(1000),
   "Leave" int
);

Create the employees_temp relation

use jade;

if object_id('dbo.employees_temp') is not null
   drop table dbo.employees_temp;

create table dbo.employees_temp (
   "Serial Number" int primary key,
   "Company Name" varchar(1000),
   "Employee Markme" varchar(1000),
   "Description" varchar(1000),
   "Leave" int
);

Upload data into employees_temp

In a typical ETL process, this is where the stage tables are loaded.

use jade;

bulk insert dbo.employees_temp from '/var/opt/mssql/employees.csv' with (FIRSTROW=2, FORMAT='CSV')

Upsert data to employees from employees_temp

use jade;

merge
into dbo.employees as target
using (
   select distinct * from dbo.employees_temp
) as source
on target."Serial Number" = source."Serial Number"
when matched
  then update set target."Serial Number" = source."Serial Number"
when not matched
  then insert values (source."Serial Number", source."Company Name", source."Employee Markme", source."Description", source."Leave");

Now we need to create two functions; one to get the data from the API and another to execute the SQL statements above.

Function to get data from the API

def get_data():

   data_path = "./files/employees.csv"

   try:
      os.makedirs(os.path.dirname(data_path), exist_ok=False)
   except Exception as e:
      pass


   url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
   response = requests.get(url)

   with open(data_path, "w") as file:
      file.write(response.text)

Function to run SQL strings

def execute_sql(sql_string):
   try:
      with pyodbc.connect(CONNECTION_STRING) as connection:
         with connection.cursor() as cursor:
            cursor.execute(sql_string)
   except:
      print("An error occurred")
   else:
      print(f"Finished executing the following sql statement: {sql_string}")

Final Python Script

So the final python script should look like this:

import pyodbc
import requests
import os

create_employees = r"""
use jade;

if object_id('dbo.employees') is not null
   drop table dbo.employees;

create table dbo.employees (
   "Serial Number" int primary key,
   "Company Name" varchar(1000),
   "Employee Markme" varchar(1000),
   "Description" varchar(1000),
   "Leave" int
);
"""

create_employees_temp = r"""
use jade;

if object_id('dbo.employees_temp') is not null
   drop table dbo.employees_temp;

create table dbo.employees_temp (
   "Serial Number" int primary key,
   "Company Name" varchar(1000),
   "Employee Markme" varchar(1000),
   "Description" varchar(1000),
   "Leave" int
);
"""


bulk_insert = r"""
use jade;

-- make sure the employees.csv file exists on your file system
bulk insert dbo.employees_temp from '/var/opt/mssql/employees.csv' with (FIRSTROW=2, FORMAT='CSV')
"""


merge_data = r"""
use jade;

merge
into dbo.employees as target
using (
   select distinct * from dbo.employees_temp
) as source
on target."Serial Number" = source."Serial Number"
when matched
  then update set target."Serial Number" = source."Serial Number"
when not matched
  then insert values (source."Serial Number", source."Company Name", source."Employee Markme", source."Description", source."Leave");
"""

CONNECTION_STRING = 'DSN=MYMSSQL; UID=sa; PWD=yourStrongPassword@2022'


def execute_sql(sql_string):
   try:
      with pyodbc.connect(CONNECTION_STRING) as connection:
         with connection.cursor() as cursor:
            cursor.execute(sql_string)
   except:
      print("An error occurred")
   else:
      print(f"Finished executing the following sql statement: {sql_string}")


# get data from API
def get_data():

   data_path = "./files/employees.csv"

   try:
      os.makedirs(os.path.dirname(data_path), exist_ok=False)
   except Exception as e:
      pass


   url = "https://raw.githubusercontent.com/apache/airflow/main/docs/apache-airflow/pipeline_example.csv"
   response = requests.get(url)

   with open(data_path, "w") as file:
      file.write(response.text)


Testing

All tasks can be tested by calling the appropriate function at the bottom of the python script. For example, to test if the python script successfully creates the employees relation, you can include execute_sql(create_employees) at the bottom of the python script.

Output:

Finished executing the following sql statement:
use jade;

if object_id('dbo.employees') is not null
   drop table dbo.employees;

create table dbo.employees (
   "Serial Number" int primary key,
   "Company Name" varchar(1000),
   "Employee Markme" varchar(1000),
   "Description" varchar(1000),
   "Leave" int
);

Once all tests have passed, we can now use airflow to orchestrate the tasks, essentially creating a basic ETL pipeline. Since I'm using docker with SQL Server, the BULK INSERT command requires the source data file to be present in the docker container. Thus, after running the get_data function, the file is manually uploaded to my docker container with the following command using the terminal.

# datawarehouse is the container's name
docker cp "/files/employees.csv" datawarehouse:/var/opt/mssql/

This is another task that will be included in our pipeline in the next blog.


Conclusion

I understand it can be a tad difficult to use SQL Server on platforms other than windows and this is one of the reasons why I try to convert tutorials that use open sources tools like PostgreSQL or MySQL to ones that use MsSQL. In this blog, I introduced the task orchestration tool; apache airflow. I discussed the prerequisites for using airflow locally drawing on previous work from Tania. Lastly, I demonstrated and tested the tasks that we'll like to be orchestrated. In the next part of this blog, I'll use airflow to organise these tasks.


References

Tania Allard's Workshop documentation

airflow-tutorial.readthedocs.io/en/latest/s..

Connecting to SQL Server from Mac OSX

github.com/mkleehammer/pyodbc/wiki/Connecti..

Airflow's tutorial using PostgreSQL

airflow.apache.org/docs/apache-airflow/stab..

 
Share this