Daniel Sanches

Building a Data Pipeline with PySpark and AWS

Updated: Sep 27, 2022


Apache Spark is a framework used in cluster computing environments for analyzing big data. It works in a distributed environment, spreading the workload across a group of computers in a cluster to process large data sets more effectively. This open-source engine supports a wide array of programming languages, including Scala, Java, R, and Python.

In this article, I'll show you how to install PySpark on your Ubuntu machine and then build a basic ETL (extract-transform-load) pipeline to move data from a remote RDBMS system to an AWS S3 bucket. This ETL architecture can be used to transfer hundreds of gigabytes of data from any RDBMS database server (in this article we use MS SQL Server) to an Amazon S3 bucket. Key advantages of using Apache Spark: it can run workloads up to 100x faster than Hadoop MapReduce, and it is supported by Java, Scala, Python, R, and SQL.



Introduction and Requirements


To get started we need the following prerequisites:

  • A system running Ubuntu 18.04 or Ubuntu 20.04

  • A user account with sudo privileges

  • An AWS account with upload access to an S3 bucket

Before downloading and setting up Spark, you need to install the necessary package dependencies. Make sure the following packages are already set up on your system:

  • Python

  • JDK

  • Git

Confirm the installed dependencies by running these commands:



java -version; git --version; python --version



Development Case



1. Install PySpark

Download the version of Spark you want from Apache's official website. We will download Spark 3.0.3 with Hadoop 2.7, the current version at the time of writing. Change your working directory to /opt/spark (create it first if it does not exist), then use the wget command with the direct URL to download the Spark package.



cd /opt/spark
sudo wget https://downloads.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz 



Extract the saved package using the tar command. Once the untar process completes, the output shows the files that have been unpacked from the archive.


tar xvf spark-*
ls -lrt spark-*



2. Configure the Spark environment

Before starting a Spark server, we need to configure some environment variables. There are a few Spark directories we need to add to the default profile. Use the vi editor or any other editor to add these three lines to .profile:


vi ~/.profile

Insert these 3 lines at the end of the .profile file.


export SPARK_HOME=/opt/spark 
export PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin 
export PYSPARK_PYTHON=/usr/bin/python3

Save the changes and exit the editor. When you finish editing the file, load the .profile file in the command line by typing the command below. Alternatively, you can log out of the server and log back in for the changes to take effect.



source ~/.profile
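
If you want to double-check that the new variables are visible to Python before going further, a minimal sketch like the one below (run it with python3) simply prints them; the expected values are the ones we exported above:

import os

# Sanity check: print the environment variables exported in .profile
print(os.environ.get("SPARK_HOME"))      # expected: /opt/spark
print(os.environ.get("PYSPARK_PYTHON"))  # expected: /usr/bin/python3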



3. Start/Stop Spark Master & Worker

Go to the Spark installation directory /opt/spark/spark*. It has all the scripts required to start/stop the Spark services. Run this command to start the Spark master.



start-master.sh

To view the Spark web interface, open a web browser and go to http://127.0.0.1:8080 (8080 is the default port Spark uses; if you need to change it, do so in the start-master.sh script). Alternatively, you can replace 127.0.0.1 with the actual network IP address of your host machine.


The webpage shows your Spark master URL, worker nodes, CPU resource utilization, memory, running applications etc.



4. Resource allocation to the Spark worker

By default, when you start a worker instance it uses all the cores available in the machine. However, for practical reasons, you may want to limit the number of cores and amount of RAM allocated to each worker.

 
 start-slave.sh spark://0.0.0.0:8082 -c 4 -m 512M
 

Here, we have allocated 4 cores and 512 MB of RAM to the worker. Restart the worker instance with these options and confirm the new allocation on the master's web interface.


To stop the master instance that was started with the script above, run:


stop-master.sh

To stop a running worker process, enter this command:


stop-slave.sh


5. Set up the MS SQL connection

In this PySpark ETL we will connect to an MS SQL Server instance as the source system and run SQL queries to fetch data, so we have to download the necessary dependencies first.

  • Download the MS-SQL JDBC jar file (mssql-jdbc-9.2.1.jre8) from the Microsoft website and copy it to the /opt/spark/jars directory: https://www.microsoft.com/en-us/download/details.aspx?id=11774

  • Download the Spark SQL jar file (spark-sql_2.12-3.0.3.jar) from the Apache download site and copy it to the /opt/spark/jars directory: https://jar-download.com/?search_box=org.apache.spark+spark.sql

Then edit .profile and add the PySpark & Py4J classes to the Python path:



export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
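
After reloading .profile, a quick way to confirm that the PySpark and Py4J classes are now importable is a short check like the one below (a minimal sketch; the version printed should match the 3.0.3 build downloaded earlier):

# Verify that pyspark and py4j can be imported via the paths added above
import pyspark
import py4j

print(pyspark.__version__)  # expected: 3.0.3 for the build installed above
print(py4j.__version__)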


6. Set up the AWS S3 connection

To connect to AWS, we need to download three jar files and copy them to the /opt/spark/jars directory. First check the Hadoop version you are currently using; you can read it off any Hadoop jar present in your Spark installation. If the Hadoop version is 2.7.4, then download the hadoop-aws jar for that same version. For the AWS Java SDK, download the version that the hadoop-aws package was built against, so that all the versions are consistent with each other.

  • hadoop-aws-2.7.4.jar

  • aws-java-sdk-1.7.4.jar

  • jets3t-0.9.4.jar


sudo wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
sudo wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.4/hadoop-aws-2.7.4.jar
sudo wget https://repo1.maven.org/maven2/net/java/dev/jets3t/jets3t/0.9.4/jets3t-0.9.4.jar
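
If you want to confirm that the three jars actually landed in the Spark jars directory, a small optional check such as the following (a sketch, assuming the default /opt/spark/jars location used throughout this article) lists whatever matches each pattern:

import glob

# Optional check: list the AWS-related jars copied into the Spark jars directory
for pattern in ("hadoop-aws-*.jar", "aws-java-sdk-*.jar", "jets3t-*.jar"):
    print(pattern, "->", glob.glob("/opt/spark/jars/" + pattern))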


7. Python development

Create a working directory called ‘scripts’ to store all the Python scripts and configuration files. Create a file called “sqlfile.py” that will contain the SQL queries we want to run on the remote DB server.


vi sqlfile.py

Insert the following SQL query in the sqlfile.py file; it will extract the data. Prior to this step, it's advisable to test-run this SQL query on the server to get an idea of the number of records returned.


query1 = """(select * from [sales-data] where date >= '2021-01-01' and status = 'Completed')"""

Save and exit from the file. Create a configuration file called “config.ini” that will store the login credentials and DB parameters.



vi config.ini

Insert the following AWS and MSSQL connection parameters in the file. Note that we have created separate sections to store the AWS and MSSQL connection parameters. You can create as many DB connection instances as required, provided each one is kept under its own section (mssql1, mssql2, aws1, aws2, etc.).


[aws]
ACCESS_KEY=BBIYYTU6L4U47BGGG&^CF
SECRET_KEY=Uy76BBJabczF7h6vv+BFssqTVLDVkKYN/f31puYHtG
BUCKET_NAME=s3-bucket-name
DIRECTORY=sales-data-directory

[mssql]
url = jdbc:sqlserver://PPOP24888S08YTA.APAC.PAD.COMPANY-DSN.COM:1433;databaseName=Transactions
database = Transactions
user = MSSQL-USER
password = MSSQL-Password
dbtable = sales-data
filename = data_extract.csv

Save and exit from the file. Create a Python script called "Data-Extraction.py".



8. Import libraries for Spark & Boto3

Spark is implemented in Scala, a language that runs on the JVM, but since we are working with Python we will use PySpark, which you can think of as a Python-based wrapper on top of the Scala API (here we are using PySpark 3.0.3, which works with Python 3). We also use the AWS SDK for Python (Boto3) to create, configure, and manage AWS services such as Amazon EC2 and Amazon S3; the SDK provides an object-oriented API as well as low-level access to AWS services. Import the Python libraries for initiating a Spark session, query1 from sqlfile.py, and boto3.



from pyspark.sql import SparkSession
import shutil
import os
import glob
import boto3
from sqlfile import query1
from configparser import ConfigParser


9. Create a SparkSession

SparkSession provides a single point of entry to interact with the underlying Spark engine and allows programming Spark with DataFrame and Dataset APIs. Most importantly, it restricts the number of concepts and constructs a developer has to work with while interacting with Spark. At this point, you can use the ‘spark’ variable as your instance object to access its public methods and instances for the duration of your Spark job. Give a name to the application.


appName = "PySpark ETL Example - via MS-SQL JDBC"
master = "local"
spark = SparkSession \
    .builder \
    .master(master) \
    .appName(appName) \
    .config("spark.driver.extraClassPath", "/opt/spark/jars/mssql-jdbc-9.2.1.jre8.jar") \
    .getOrCreate()
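
As a quick sanity check, the session object can be inspected right away; this short sketch just prints the Spark version and the master the session is bound to:

# Confirm the session is up and pointing at the expected master
print(spark.version)              # e.g. 3.0.3
print(spark.sparkContext.master)  # e.g. local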


10. Read the configuration file

We have stored the parameters in a "config.ini" file to separate the static parameters from the Python code. This helps in writing cleaner code without any hard coding. The configparser module implements a basic configuration language that provides a structure similar to what we see in Microsoft Windows .ini files.



config = ConfigParser()
config.read('config.ini')

url = config.get('mssql', 'url')
user = config.get('mssql', 'user')
password = config.get('mssql', 'password')
dbtable = config.get('mssql', 'dbtable')
filename = config.get('mssql', 'filename')

ACCESS_KEY = config.get('aws', 'ACCESS_KEY')
SECRET_KEY = config.get('aws', 'SECRET_KEY')
BUCKET_NAME = config.get('aws', 'BUCKET_NAME')
DIRECTORY = config.get('aws', 'DIRECTORY')


11. Execute the data extraction

Spark includes a data source that can read data from other databases using JDBC. Run the SQL on the remote database by connecting with the connection parameters and the Microsoft SQL Server JDBC driver. Use the "query" option to run a select statement, or the "dbtable" option if you want to read a full table. The data returned by the SQL is stored in a Spark dataframe.



jdbcDF = spark.read.format("jdbc") \
    .option("url", url) \
    .option("query", query1) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
jdbcDF.show(5)
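
For reference, here is a minimal sketch of the alternative mentioned above: reading a whole table with the "dbtable" option instead of a query. It reuses the connection variables loaded from config.ini, and the fullTableDF name is just illustrative:

# Alternative: read the entire table instead of running a select query
# (the dbtable value comes from config.ini; hyphenated names need [brackets] on SQL Server)
fullTableDF = spark.read.format("jdbc") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()
fullTableDF.show(5)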


12. Save the dataframe as a CSV file

The dataframe can be stored on the server as a CSV file. Note that this step is optional: if you want to write the dataframe directly into an S3 bucket, it can be skipped. PySpark creates multiple partitions by default; to avoid this, we save the output as a single file using the coalesce(1) function. Next, we move the file to the designated output folder and delete the output directory that was created, since we only want to keep the single CSV (or the copy on the S3 bucket).


path = 'output'
# Write the dataframe as a single CSV file (with header) into the local 'output' directory
jdbcDF.coalesce(1).write.option("header","true").option("sep",",").mode("overwrite").csv(path)
# Move the generated part-*.csv file out of the output directory and rename it
shutil.move(glob.glob(os.getcwd() + '/' + path + '/' + r'*.csv')[0], os.getcwd() + '/' + filename)
# Remove the now-empty output directory
shutil.rmtree(os.getcwd() + '/' + path)
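
If you would rather skip the local file entirely, one alternative is to write the dataframe straight to S3 through the s3a connector. This is only a sketch: it assumes the hadoop-aws and aws-java-sdk jars from step 6 are on the classpath and reuses the keys read from config.ini.

# Alternative sketch: write the dataframe directly to S3 via the s3a connector
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("fs.s3a.access.key", ACCESS_KEY)
hconf.set("fs.s3a.secret.key", SECRET_KEY)
jdbcDF.coalesce(1).write.option("header", "true").mode("overwrite") \
    .csv("s3a://" + BUCKET_NAME + "/" + DIRECTORY)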


13. Copy the dataframe to the S3 bucket

First, create a ‘boto3’ session using the AWS access and secret key values. Retrieve the S3 bucket and subdirectory values where you want to upload the file. The upload_file() method accepts a file name, a bucket name, and an object name. The method handles large files by splitting them into smaller chunks and uploading each chunk in parallel.


session = boto3.Session(
    aws_access_key_id=ACCESS_KEY,
    aws_secret_access_key=SECRET_KEY,
)
bucket_name=BUCKET_NAME
s3_output_key=DIRECTORY + '/' + filename
s3 = session.resource('s3')
# Filename - File to upload
# Bucket - Bucket to upload to (the top level directory under AWS S3)
# Key - S3 object name (can contain subdirectories). If not specified then file_name is used
s3.meta.client.upload_file(Filename=filename, Bucket=bucket_name, Key=s3_output_key)
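
To make sure the upload really landed where expected, an optional follow-up sketch with the same session lists the objects under the target prefix:

# Optional check: list the objects stored under the chosen S3 prefix
bucket = s3.Bucket(bucket_name)
for obj in bucket.objects.filter(Prefix=DIRECTORY):
    print(obj.key, obj.size)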


14. Clean up files

After uploading the file to the S3 bucket, delete the file remaining on the server; if it is not found, print an error.



if os.path.isfile(filename):
    os.remove(filename)
else:
    print("Error: %s file not found" % filename)

__________________________________________________________________________________


Use Case Conclusion


Apache Spark is an architecture for extremely fast, real-time processing, as it performs in-memory computations to analyze data. It came about because Apache Hadoop MapReduce only performs batch processing and lacks real-time processing capability. Apache Spark was introduced to perform real-time stream processing as well as batch processing, in addition to supporting interactive queries and iterative algorithms.


Apache Spark has its own cluster manager where it can host applications, and it can also take advantage of Apache Hadoop by using HDFS (Hadoop Distributed File System) for storage and by running applications on YARN.


Spark was originally written in the Scala programming language. To support Python, the community released a tool called PySpark, which also makes it possible to work with RDDs (Resilient Distributed Datasets) from the Python programming language.
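
For instance, a minimal RDD sketch using the SparkSession created earlier looks like this (the numbers are just placeholder data):

# Tiny RDD example: distribute a small list and apply a transformation
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5])
print(rdd.map(lambda x: x * x).collect())  # [1, 4, 9, 16, 25]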


In this series we will see how to use PySpark, which combines the ease of the Python language with the speed of Spark. But we need the environment for that, and it is not trivial to set up: you need the base with Apache Hadoop (HDFS and MapReduce), a Spark Master to manage the Spark Workers (if we were talking about Star Wars, it would be something like the Master and his apprentices), and JupyterLab for editing the code. That is, basically, the environment.



__________________________________________________________________________________


Notes and Related


> Initially, our purpose with the blog is to make a variety of posts, but we will have a separate area for posts related to Constructor SO, which is our Project Portfolio (Agile and Scrum), in which each member of Constructor SO has their own area for their developments. In this way, each update in the SO Constructor area is followed by a post on the professional's blog, informing our readers and thus building an extensive overview of the released or versioned work;


> As for the developments of Space_One Labs, our idea is to launch and work on several projects in related areas at random, so as not to be limited to specific apps or software;


> All the cases described and developed here for this blog, to which I belong, that fall into the "BI Case" category are cases of fictitious companies, created and invented to contextualize the work and make it as alive and realistic as possible.




__________________________________________________________________________________


Daniel Sanches


Production Engineer, Universo - Universidade Salgado de Oliveira, specialized in Analytics and Data, Business and Technology.


SO Labs Developer Member of Research, Business Intelligence Analyst, Data Analyst and Business Analyst, Data Engineer, Blogger DS Space_One Labs | Space Members


Member SO Labs Research, Business Intelligence, Data Engineer, Data Analyst and Business Developer


