Working with Apache Airflow

Let’s begin by talking about Airflow. In a nutshell, it is a platform to programmatically author, schedule and monitor workflows. You use Airflow to author workflows as Directed Acyclic Graphs (DAGs) of tasks. Put simply, Airflow automates your scripts so that operations run on a defined schedule. It is written in Python, but it can run programs regardless of the language they are coded in.

What do We Mean by DAG?

According to Wikipedia, a Directed Acyclic Graph, or DAG, is a finite directed graph with no directed cycles. It consists of finitely many vertices and edges, with each edge directed from one vertex to another, such that there is no way to start at any vertex v and follow a consistently directed sequence of edges that eventually loops back to v again.

In layman’s terms, this means you can be a daughter to your mother but not the other way around; a relationship that ran in both directions would constitute a directed cycle.
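To make the “no directed cycles” property concrete, here is a minimal sketch in plain Python, independent of Airflow (the graphs and the helper name are illustrative), that checks whether a directed graph is acyclic:

```python
# Minimal cycle check for a directed graph given as {node: [children]}.
# Illustrative helper, not part of Airflow itself.
def is_acyclic(graph):
    visited, in_path = set(), set()

    def visit(node):
        if node in in_path:   # reached a node already on the current path: cycle
            return False
        if node in visited:   # already fully explored, no cycle through here
            return True
        in_path.add(node)
        for child in graph.get(node, []):
            if not visit(child):
                return False
        in_path.remove(node)
        visited.add(node)
        return True

    return all(visit(node) for node in graph)

dag = {"extract": ["transform"], "transform": ["load"], "load": []}
cycle = {"a": ["b"], "b": ["a"]}  # a -> b -> a: a directed cycle
print(is_acyclic(dag))    # True
print(is_acyclic(cycle))  # False
```

Airflow performs an equivalent check when it loads a DAG, which is why a workflow can never feed back into itself.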

In Airflow, every workflow is a DAG, and DAGs are made up of operators. An operator describes a single task in a workflow, and many types are available:

BashOperator – executes a bash command

PythonOperator – calls an arbitrary Python function

EmailOperator – sends an email

SimpleHttpOperator – sends an HTTP request

MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. – executes a SQL command

Sensor – waits for a specific time, file, database row, S3 key, etc.

Setting Up and Installing

Since Airflow is written in Python, the recommended way to install it is through the pip tool:

pip install apache-airflow

To check that it has been installed, run the command airflow version; it should display something like:

[2018-09-22 15:59:23,880] {__init__.py:51} INFO - Using executor SequentialExecutor

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
 v1.10.0

The next step is to install mysqlclient so that you can integrate MySQL into your workflows:

pip install mysqlclient

Before moving any further, we advise you to create a folder named airflow_home and then use the export command to point the AIRFLOW_HOME environment variable at it:

export AIRFLOW_HOME="$(pwd)/airflow_home"

Make sure you are in the folder one level above airflow_home before you run the export command. Inside airflow_home, create an additional folder named dags.

If you set load_examples=False, the default example DAGs will not be loaded on the web interface.

Next, call airflow initdb from within the airflow_home folder. Once this is finished, it generates airflow.db, airflow.cfg and unittests.cfg.

airflow.db is an SQLite file that stores all the metadata needed to execute workflows, while airflow.cfg holds the configuration settings that keep operations going.

Within airflow.cfg, the sql_alchemy_conn parameter will be visible, pointing at the airflow.db file inside airflow_home.
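For reference, the relevant lines of airflow.cfg look roughly like this (the paths are illustrative, taken from the example output in this article; the connection string is a SQLAlchemy URL):

```ini
[core]
# Where the scheduler looks for DAG definition files
dags_folder = /Development/airflow_home/dags
# Metadata database; SQLite by default
sql_alchemy_conn = sqlite:////Development/airflow_home/airflow.db
# Set to False to hide the bundled example DAGs
load_examples = True
```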
Now start the web server with the airflow webserver command; upon initiation the output should look like:
[2018-09-20 22:36:24,943] {__init__.py:51} INFO - Using executor SequentialExecutor

/anaconda3/anaconda/lib/python3.6/site-packages/airflow/bin/cli.py:1595: DeprecationWarning: The celeryd_concurrency option in [celery] has been renamed to worker_concurrency - the old setting has been used, but please update your config.
  default=conf.get('celery', 'worker_concurrency')),

  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
 v1.10.0

/anaconda3/anaconda/lib/python3.6/site-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning

[2018-09-19 14:21:43,119] [48995] {models.py:167} INFO - Filling up the DagBag from /Development/airflow_home/dags

Running the Gunicorn Server with:

Workers: 4 sync

Host: 0.0.0.0:8080

When you visit 0.0.0.0:8080, a number of entries are visible; these are the examples bundled with the Airflow installation. They can be hidden by going into the airflow.cfg file and setting load_examples to False.

DAG Runs tells you the number of times a specific DAG has been executed. Recent Tasks shows the tasks in a DAG that are currently running, along with their statuses. Schedule works much like the one you would use when scheduling a cron job; it is in charge of the timing behind a DAG trigger.

If you look at the screen where you created and executed a DAG earlier, you will see boxes titled success, running and failed on the top right-hand side, which function as a legend. The colors around the boxes represent the various operators utilized in this specific DAG.

This is how you can implement a comprehensive workflow system to help automate workflows.
