Working with Apache Airflow
Let’s begin by talking about Airflow. In a nutshell, it is a platform to programmatically author, schedule and monitor workflows. With Airflow you define workflows as Directed Acyclic Graphs (DAGs) of tasks. Put simply, Airflow helps you automate scripts so that they run in the right order at the right time. It is written in Python, but it can run programs written in any language.
What do We Mean by DAG?
According to Wikipedia, a Directed Acyclic Graph or DAG is a finite directed graph with no directed cycles. It consists of finitely many vertices and edges, with each edge directed from one vertex to another, such that there is no way to start at any vertex v and follow a consistently directed sequence of edges that eventually loops back to v again.
In layman’s terms, this means you can be a daughter to your mother, but your mother cannot in turn be a daughter to you; allowing both relationships at once is what it would mean to have a directed cycle, and that is exactly what a DAG rules out.
When you talk about Airflow, you need to know that every workflow is a DAG and DAGs are made up of operators. An operator is in charge of defining a single task, and many types are available (a minimal example using a couple of them follows the list below):
BashOperator – executes a bash command
PythonOperator – calls an arbitrary Python function
EmailOperator – sends an email
SimpleHttpOperator – sends an HTTP request
MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, etc. – executes a SQL command
Sensor – waits for a specific time, file, database row, S3 key, etc.
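As a quick illustration, here is a minimal sketch of a DAG that chains a BashOperator and a PythonOperator. The DAG id, schedule and callback function are illustrative choices, not anything prescribed by Airflow.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    # Arbitrary Python callable invoked by the PythonOperator
    print('Hello from Airflow!')


# Default arguments applied to every task in this DAG
default_args = {'owner': 'airflow', 'start_date': datetime(2018, 9, 1)}

dag = DAG('hello_world_dag', default_args=default_args, schedule_interval='@daily')

say_date = BashOperator(task_id='print_date', bash_command='date', dag=dag)
say_hello = PythonOperator(task_id='print_hello', python_callable=print_hello, dag=dag)

# print_date runs first, then print_hello
say_date >> say_hello

Saved into the dags folder (created later in this walkthrough), a file like this is all it takes for Airflow to pick up a new workflow.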
Setting Up and Installing
Since Airflow is written in Python, the recommended way to install it is with pip.
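A minimal sketch, assuming the package is published on PyPI under the name apache-airflow (on some 1.10.x releases you may also need to export SLUGIFY_USES_TEXT_UNIDECODE=yes before installing):

pip install apache-airflow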
To check that it has been installed, run the command airflow version; it should display something like:
[2018-09-22 15:59:23,880] {__init__.py:51} INFO - Using executor SequentialExecutor
____________ _____________
____ |__( )_________ __/__ /________ ___
___ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.10.0
The next step is to install mysqlclient and integrate MySQL into your workflows.
pip install mysqlclient
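Once the client library is installed, a MySqlOperator task can be added to a DAG. A minimal sketch, assuming a connection named my_mysql_conn has been configured in Airflow and a table named logs exists; both names are placeholders, as is the dag object defined in the earlier example:

from airflow.operators.mysql_operator import MySqlOperator

# Runs a SQL statement against the connection configured as 'my_mysql_conn'
purge_old_logs = MySqlOperator(
    task_id='purge_old_logs',
    mysql_conn_id='my_mysql_conn',   # hypothetical connection id set up in the Airflow UI
    sql='DELETE FROM logs WHERE created_at < NOW() - INTERVAL 30 DAY;',
    dag=dag,                         # the DAG object defined earlier
)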
Prior to moving any further, we advise you to create a folder titled airflow_home. After this you will use the export command to point the AIRFLOW_HOME environment variable at it.
export AIRFLOW_HOME="$(pwd)/airflow_home"
Make sure you are in the folder one level above airflow_home before you run the export command. Inside airflow_home, create an additional folder titled dags.
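Putting those steps together, a minimal sketch of the shell commands (the folder names are the ones used in this walkthrough):

mkdir airflow_home                              # home directory for Airflow
export AIRFLOW_HOME="$(pwd)/airflow_home"       # run from the folder above airflow_home
mkdir airflow_home/dags                         # DAG definition files live here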
If you set load_examples=False, the default examples will not be loaded on the web interface.
At this point you must call airflow initdb from within the airflow_home folder. Once this is finished it generates airflow.db, airflow.cfg and unittests.cfg.
airflow.db is a SQLite database file that stores everything needed to execute workflows. The purpose of airflow.cfg is to hold all the initial configuration settings that keep operations going.
Within airflow.cfg you will see the sql_alchemy_conn parameter, with a value pointing at ../airflow_home/airflow.db
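If you would rather keep this metadata in the MySQL instance set up earlier, sql_alchemy_conn can point at it instead. A sketch, assuming a local database named airflow_db and a user airflow_user (both names are placeholders):

# default SQLite backend
sql_alchemy_conn = sqlite:////full/path/to/airflow_home/airflow.db

# hypothetical MySQL backend, using the mysqlclient driver installed above
sql_alchemy_conn = mysql://airflow_user:airflow_pass@localhost:3306/airflow_db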
Now start the web server with the command airflow webserver. On startup the output should look like this:
[2018-09-20 22:36:24,943] {__init__.py:51} INFO - Using executor SequentialExecutor
/anaconda3/anaconda/lib/python3.6/site-packages/airflow/bin/cli.py:1595: DeprecationWarning: The celeryd_concurrency option in [celery] has been renamed to worker_concurrency - the old setting has been used, but please update your config.
  default=conf.get('celery', 'worker_concurrency')),
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
v1.10.0
[2018-09-19 14:21:42,340] {__init__.py:57} INFO - Using executor SequentialExecutor
____________ ___________
______ |__( )_________ __/__ /________ ___
___ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
/anaconda3/anaconda/lib/python3.6/site-packages/flask/exthook.py:71: ExtDeprecationWarning: Importing flask.ext.cache is deprecated, use flask_cache instead.
  .format(x=modname), ExtDeprecationWarning
[2018-09-19 14:21:43,119] [48995] {models.py:167} INFO - Filling up the DagBag from /Development/airflow_home/dags
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
When you visit 0.0.0.0:8080 you will see a number of entries; these are the example DAGs that ship with the Airflow installation. They can be hidden by opening the airflow.cfg file and setting load_examples to False.
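The relevant line in airflow.cfg looks like this (restart the web server after changing it):

# in airflow.cfg, under the [core] section
load_examples = False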
DAG Runs tells you how many times a specific DAG has been executed. Recent Tasks shows the tasks in a DAG that are currently running, along with their status. Schedule is much like the one you would have used when scheduling a cron job; it controls the timing behind a DAG trigger.
If you look at the screen where you created and executed a DAG earlier, you will see boxes titled success, running and failed at the top right-hand side, which act as a legend. The colour around each task box identifies the operator used for that task in this specific DAG.
This is how you can implement a comprehensive workflow system to help automate workflows.