# Data pipeline to process and analyse Twitter data in a distributed fashion using Apache Spark and Airflow in AWS environment

This repository shows the development of a scalable data pipeline in [AWS](https://aws.amazon.com/de/) using parallelisation techniques via [Apache Spark](https://spark.apache.org/) on [Amazon EMR](https://aws.amazon.com/de/emr/) and orchestrating workflows via [Apache Airflow](https://airflow.apache.org/). The data analysis part consists of a simple sentiment analysis using a rule-based approach and a topic analysis using word frequencies by applying common NLP techniques.

The data pipeline is used for an existing [web application](https://subway.christopherkindl.com/) which allows end users to analyse housing prices based on the locations of subway stations. More precisely, users see the average housing price of properties within a radius of less than 1 km of a particular subway station. The new data pipeline shown in this repository makes the application richer by incorporating sentiment scoring and topic analysis, giving users a better sense of the common mood and an indication of what type of milieu lives in a particular area.

The Python-based web scraper, which uses `BeautifulSoup` to fetch geo-specific housing prices from property websites across London, is also provided in the [repository](https://github.com/christopherkindl/twitter-data-pipeline-using-airflow-and-apache-spark/tree/main/04_web_scraper) but will not be discussed in detail here.

## Prerequisites

1. [Git](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) to clone the repository
2. [AWS account](https://aws.amazon.com/de/) to run the pipeline in the cloud environment
3. [Twitter developer account](https://developer.twitter.com/en/apply-for-access) with access to the standard API to fetch tweets
4. Database - In this project, we use a [PostgreSQL](https://aws.amazon.com/de/rds/postgresql/what-is-postgresql/) database. Database-related code snippets (e.g. uploading data to the database) might differ when using other databases
5. Web hosting and a domain with [WordPress](https://wordpress.org/support/article/how-to-install-wordpress/) to run the client application

2. Select an existing [S3 bucket](https://s3.console.aws.amazon.com/) or create a new one and define the path where the Airflow DAG (the script which executes all tasks you want to run for the data pipeline) should be loaded from. The bucket name must start with `airflow-`

3. Upload a `requirements.txt` that contains the Python libraries needed to run the Airflow DAG; AWS will install them via `pip install`. Hint: If your DAG depends on libraries that are not available via pip, you can upload a `plugins.zip` in which you include the desired libraries as [Python wheels](https://medium.com/swlh/beginners-guide-to-create-python-wheel-7d45f8350a94)

4. Each environment runs in an [Amazon Virtual Private Cloud (VPC)](https://aws.amazon.com/de/vpc/) using private subnets in two [availability zones](https://aws.amazon.com/de/about-aws/global-infrastructure/regions_az/#Availability_Zones). AWS recommends using a `private network` for web server access. For simplicity, we select a `public network`, which allows us to log in over the internet. Lastly, we let MWAA create a new `security group`

5. For the environment class, we select `mw1.small` as it corresponds best to our DAG workload

MWAA provides variables to store and retrieve arbitrary content or settings as a simple key-value store within Airflow.

A sample [variables file](https://github.com/christopherkindl/twitter-data-pipeline-using-airflow-and-apache-spark/blob/main/01_airflow/airflow_variables.json) is provided in the repository that contains all variables used in this project.

Airflow also allows you to define connection objects. In our case, we need a connection to `AWS` itself (Airflow acts as an external system to AWS) and to our `database` in which the final results will be stored.
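
In the DAG code, stored variables are retrieved via Airflow's `Variable` API, while connections are referenced by their connection ID in hooks and operators. A minimal sketch (the variable names below are assumptions; the project's actual keys are listed in `airflow_variables.json`):

```Python
from airflow.models import Variable

# Illustrative only - the variable names are assumptions; the project's real
# keys are defined in 01_airflow/airflow_variables.json
S3_BUCKET = Variable.get("s3_bucket")
TWITTER_API_KEY = Variable.get("twitter_api_key")
```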

 


 

## 2. Tasks in the Airflow DAG

**Basic architecture**

A typical Airflow DAG consists of different tasks that fetch, transform or process data in various ways. Heavy data analysis tasks should not run within the MWAA environment itself due to its modest compute capacity. ML tasks are usually called via [Amazon SageMaker](https://aws.amazon.com/de/sagemaker/), whereas complex data analyses can be done in a distributed fashion on [Amazon EMR](https://aws.amazon.com/de/emr/). In our case, we run the data analysis on an Amazon EMR cluster using [Apache Spark](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-spark.html) (via its Python API, PySpark).

We can either write custom functions (e.g. to request data via the Twitter API) or make use of predefined modules, which typically trigger external activities (e.g. a data analysis in Spark on Amazon EMR).

```Python

def create_schema(**kwargs):
    # function body elided in this excerpt - it creates the schema and tables
    # in the PostgreSQL database (see the full DAG in the repository)
    ...


# qualify as a task (the remaining arguments are a plausible completion of the
# truncated snippet; the full call is in the repository's DAG file)
create_schema = PythonOperator(
    task_id='create_schema',
    python_callable=create_schema,
    op_kwargs=default_args,
    dag=dag,
)
```

The custom function above creates the schema and tables directly in the PostgreSQL database to which the final data will be uploaded. Note how `op_kwargs = default_args` lets the task access the general configuration information provided.

 

**Indicating the order of the tasks**

Tasks can be executed sequentially or simultaneously. The order can be indicated with the following example syntax:

```Python
# illustrative only - the concrete task names follow the full DAG in the
# repository; ">>" runs tasks in sequence, a list groups tasks that can run
# simultaneously
create_schema >> step_adder
create_schema >> [task_a, task_b]
```

**Motivation to use Spark for data analysis**

We want to set up an infrastructure that allows big data analysis in a distributed fashion. [Apache Spark](https://www.infoworld.com/article/3236869/what-is-apache-spark-the-big-data-platform-that-crushed-hadoop.html) as our big data framework of choice has two main advantages: First, Spark’s in-memory data engine can perform tasks very efficiently thanks to its parallelisation logic. Second, Spark’s developer-friendly API reduces much of the grunt work of distributed computing and can be accessed from various languages. In our case, we use [PySpark](https://pypi.org/project/pyspark/), a Python API to interface with Spark at a high level. This means it is suitable for interacting with an existing cluster but does not contain tools to set up a new, standalone cluster.
The parallelisation logic of a distributed architecture is the main driver to speed up processing and, thus, enable scalability. Using Spark’s DataFrame or Resilient Distributed Dataset (RDD) allows the data computation to be distributed across a cluster.
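
To make the parallelism concrete, here is a minimal PySpark sketch; the file path, format and column name are assumptions for illustration, and the actual analysis scripts live in `02_emr_spark_jobs/`:

```Python
from pyspark.sql import SparkSession

# Minimal sketch - path, format and column name are assumptions for illustration
spark = SparkSession.builder.appName("twitter-analysis").getOrCreate()

# Reading from HDFS partitions the rows across the worker nodes
tweets = spark.read.json("hdfs:///user/hadoop/twitter/tweets.json")

# Transformations like this run in parallel on the partitions of the cluster
tweets.groupBy("station").count().show()
```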

**Under the hood**

We use Amazon’s big data platform [EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-what-is-emr.html) to run our Spark cluster. A Spark cluster can be characterised by a master node that serves as the central coordinator and worker nodes on which the tasks/jobs are executed (= parallelisation). It requires a distributed storage layer, which in our case is [HDFS](https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html) (Hadoop Distributed File System). S3 object storage serves as our main data store, while HDFS acts as intermediate temporary storage from which the script reads the Twitter data and to which it writes the results. Temporary means that data written to HDFS disappears after the cluster is terminated. The reason to use HDFS is that it is substantially faster than writing the results directly to the S3 bucket.

**Interaction between Airflow and Amazon EMR**

Every step on the cluster is triggered by our Airflow DAG: First, we create the Spark cluster by providing specific configuration details and simultaneously launch Hadoop for the distributed data storage. In terms of the cluster configuration, we use one master node and two worker nodes, all running on an m5.xlarge [instance](https://aws.amazon.com/de/ec2/instance-types/) (16 GB RAM, 4 CPU cores), given the relatively small dataset size. Next, we trigger a bootstrap action to install the non-standard Python libraries ([vaderSentiment](https://pypi.org/project/vaderSentiment/) and [NLTK](https://pypi.org/project/nltk/) for NLP pre-processing steps) on which the sentiment and topic analysis scripts depend. The [file](https://github.com/christopherkindl/twitter-data-pipeline-using-airflow-and-apache-spark/blob/main/02_emr_spark_jobs/python-libraries.sh) is loaded from an S3 bucket and submitted in the form of a `bash` script.

Airflow offers predefined modules to quickly interact with Amazon EMR. The example below shows how an Amazon EMR cluster with Spark (PySpark) and Hadoop applications is created using `EmrCreateJobFlowOperator()`.

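The full operator call was elided from this excerpt, so the sketch below is an illustrative reconstruction rather than the project's exact configuration: the release label, log path, bucket names and connection IDs are assumptions, the import path depends on your Airflow/provider version, and only the instance types and the bootstrap script follow the description above.

```Python
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator

JOB_FLOW_OVERRIDES = {
    "Name": "london-housing-webapp",
    "ReleaseLabel": "emr-6.2.0",                    # assumption
    "LogUri": "s3://your-log-bucket/emr-logs/",     # providing an S3 path enables Spark job logs
    "Applications": [{"Name": "Spark"}, {"Name": "Hadoop"}],
    "BootstrapActions": [{
        "Name": "install python libraries",
        "ScriptBootstrapAction": {
            "Path": "s3://your-bucket/python-libraries.sh"  # assumed S3 location of the script
        },
    }],
    "Instances": {
        "InstanceGroups": [
            {"Name": "Master node", "Market": "ON_DEMAND", "InstanceRole": "MASTER",
             "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Worker nodes", "Market": "ON_DEMAND", "InstanceRole": "CORE",
             "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
        "TerminationProtected": False,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

cluster_creator = EmrCreateJobFlowOperator(
    task_id="create_emr_cluster",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id="aws_default",
    emr_conn_id="emr_default",
    dag=dag,
)
```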

## 4. Launch Airflow DAG

Upload the final Airflow DAG to the corresponding path as explained in the MWAA environment setup guide. Go to the Airflow user interface and start the DAG by switching the button to `On` (**Hint:** use a date in the past to trigger the DAG immediately).
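
If you prefer to script the upload instead of using the S3 console, a minimal sketch with `boto3` follows; the bucket name, key and local file name are assumptions:

```Python
import boto3

# Bucket, key and local file name are assumptions - adjust them to your MWAA setup
s3 = boto3.client("s3")
s3.upload_file(
    "01_airflow/london-housing-webapp.py",   # local DAG file
    "airflow-london-housing-webapp",         # MWAA bucket (must start with "airflow-")
    "dags/london-housing-webapp.py",         # DAG folder configured in MWAA
)
```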

Useful housekeeping things to know:
- Log files can be accessed by clicking on the coloured status squares which appear in the [Tree view](https://airflow.apache.org/docs/apache-airflow/stable/ui.html) mode
- When Spark steps are running, you can watch them directly in Amazon EMR (**AWS Management Console** > **EMR** > **Clusters**) and see how the steps are executed
- Log files of Spark jobs are not shown in the Airflow-generated log files; they have to be enabled when configuring the EMR cluster by providing an S3 path (see the example in the readme section **Interaction between Airflow and Amazon EMR**)

A more detailed description can be found here.

 

## 5. Connect database to the web application

For simplicity, this documentation does not cover the detailed development process of the website itself. The WordPress plugin [wpDataTables](https://wpdatatables.com/pricing/) allows us to easily access any common relational database (MySQL, PostgreSQL, etc.). NoSQL databases are not supported.

Once installed, you can connect to a database (**WordPress Website Admin Panel** > **wpDataTables** > **Settings** > **separate DB connection**) and run a query (**WordPress Website Admin Panel** > **wpDataTables** > **Create a Table/Chart**) that is automatically transformed into a table or chart:


**Using views to avoid complex queries on the client side**

To improve website performance, we avoid writing a complex query on the client side and instead create a view within the schema that already combines both data sources (housing prices, sentiment data). The topic analysis data has its own query due to its generalised form and is accessed directly, since it does not require any transformation steps on the client side.

**Hint:** Views can be easily created using a database administration tool, such as [pgAdmin](https://www.pgadmin.org/)
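
As an alternative to a GUI tool, the view can also be created programmatically. Below is a minimal sketch with `psycopg2`, where the connection details and the schema, table and column names are all assumptions; the real schema is created by the DAG's `create_schema` task:

```Python
import psycopg2

# Connection details and schema/table/column names are assumptions - the real
# schema is created by the DAG's create_schema task
conn = psycopg2.connect(
    host="your-rds-endpoint", dbname="postgres",
    user="postgres", password="your-password",
)

create_view = """
CREATE OR REPLACE VIEW analytics.station_overview AS
SELECT h.station, h.average_price, s.sentiment_score
FROM   analytics.housing_prices h
JOIN   analytics.sentiment      s ON s.station = h.station;
"""

# The connection context manager commits the statement on successful exit
with conn, conn.cursor() as cur:
    cur.execute(create_view)
conn.close()
```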

The figure below summarises the interaction between the client side and the database.

![alt image](https://github.com/christopherkindl/twitter-data-pipeline-using-airflow-and-apache-spark/blob/main/03_images/web_application.jpg)