Home
Welcome to the Spark wiki!
The application we are building is a good end-to-end example: we read data from a CSV file and load it into a database.
For every Spark application that you write, the very first step is to connect to the Spark master node. To connect to a master, you first establish a session:
- Create a session: SparkSession spark = new SparkSession.Builder()
- Give the application a name: .appName("CSV to DB")
- Specify the master: .master("local")
- Finish the chain with the get-or-create call: .getOrCreate();
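Put together, the session setup from the steps above looks like this:

SparkSession spark = new SparkSession.Builder()
    .appName("CSV to DB")   // name shown in the Spark UI
    .master("local")        // run locally rather than on a cluster
    .getOrCreate();         // reuse a running session if one already exists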
- Next, get the data. For this we can use the Spark session: call read() and then specify the format of the data we want to read; here it is CSV. It is important to note that other formats are also allowed, such as JSON, Parquet, and XML, so the options are endless. Then specify whether the file has a header: for CSV files the header option can be true or false.
- We have to specify the file location using .load(); in this case the file is called name_and_comments.txt and lives at src/main/resources/name_and_comments.txt.
- It is not enough to read the file; we need to store the result in a data structure. The structure we use here is a dataset of rows: Dataset<Row>, where the angle brackets declare the type the dataset holds (Row). Name the variable df to indicate that it is a DataFrame.
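Combining the last three steps, the read looks like this:

Dataset<Row> df = spark.read().format("csv")
    .option("header", true)                              // first line contains column names
    .load("src/main/resources/name_and_comments.txt");   // path relative to the project root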
- Use Cmd+Shift+O to bring in the imports, and notice that Row comes from org.apache.spark.sql.Row; make sure you get the right import.
- What is a DataFrame, the structure identified by the Dataset<Row> type? Think of it as a database table: it has rows and columns.
- Call df.show() to display the contents of the data frame in the console.
- Right-click in the editor window, choose Run As > Java Application, and you can see the results in the terminal window.
- You can also limit the output of df.show(): df.show(2) displays only the first 2 rows (the argument is a row count, not a column count).
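To make the row/column distinction concrete, here is a small sketch; the select() line is an aside and not part of the original walkthrough:

df.show(2);                                    // first 2 rows, all columns
df.select("first_name", "comment").show(2);    // first 2 rows, only two chosen columns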
- Now we want another transformation: we want to see only the comments that have numbers in them. You can see in the txt file that there are numbers in the comment section, so we will perform another transformation.
- Put a condition on a column, in this case the comment column, and then call a function named rlike. SQL has the LIKE operator, and databases also support a regex version of it; rlike matches the column against a regular expression given in quotes. To match sentences that have numbers in them, use "\\d+": d stands for digit, + means one or more, and the two backslashes are needed to escape \d inside a Java string.
- To actually apply the transformation, reassign the result to the df variable:

// Transformation
df = df.withColumn("full_name",
        concat(df.col("last_name"), lit(", "), df.col("first_name")));  // lit(", ") supplies the separator
df = df.filter(df.col("comment").rlike("\\d+"));
df.show();
- Re-run the file.
- You can also order results with .orderBy(), specifying the column to order by: .orderBy(df.col("last_name").asc()); you can use ascending (.asc()) or descending (.desc()) order.
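For example, applied to our data frame:

df = df.orderBy(df.col("last_name").asc());      // A to Z
// df = df.orderBy(df.col("last_name").desc());  // Z to A instead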
- It is good practice to split long call chains across several lines in code.
- You can combine the filter with the withColumn call in a single chain.
- We can also assign the result to a new data frame so it does not conflict with the old one, as sketched below.
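A minimal sketch of that chaining; the variable name filteredDf is just an illustrative choice:

Dataset<Row> filteredDf = df
    .withColumn("full_name",
        concat(df.col("last_name"), lit(", "), df.col("first_name")))
    .filter(df.col("comment").rlike("\\d+"))
    .orderBy(df.col("last_name").asc());
filteredDf.show();   // the original df is left untouched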
- Instead of just showing the contents, I want to save this modified data frame into a database table.
- If you are familiar with JDBC: we are using the PostgreSQL database driver and giving it properties such as the driver class. The driver comes from the postgresql dependency, which you can see in the pom file.
- To write to the database, we use the data frame's write(). SaveMode.Overwrite allows us to rerun this application and keep writing into the table we specify. Then call jdbc(), passing the connection URL, the table name (we will call it project1), and the properties.
package com.jobreadyprogrammer.spark;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

public class Application {

    public static void main(String[] args) {

        // Create a session
        SparkSession spark = new SparkSession.Builder()
                .appName("CSV to DB")
                .master("local")
                .getOrCreate();

        // Get the data
        Dataset<Row> df = spark.read().format("csv")
                .option("header", true)
                .load("src/main/resources/name_and_comments.txt");
        // df.show(3);

        // Transformation: build full_name, keep only comments containing digits,
        // and sort by last name
        df = df.withColumn("full_name",
                    concat(df.col("last_name"), lit(", "), df.col("first_name")))
                .filter(df.col("comment").rlike("\\d+"))
                .orderBy(df.col("last_name").asc());
        // df.show();

        // Write the result into a PostgreSQL table
        String dbConnectionUrl = "jdbc:postgresql://localhost/course_data";
        Properties prop = new Properties();
        prop.setProperty("driver", "org.postgresql.Driver");
        prop.setProperty("user", "postgres");
        prop.setProperty("password", "password");

        df.write()
                .mode(SaveMode.Overwrite)
                .jdbc(dbConnectionUrl, "project1", prop);
    }
}