
Master Spark First Application


First Project Source Code

The purpose of the application being created is to serve as a good end-to-end example: we read data from a CSV file and load it into a database.

For every Spark application you write, the very first step is to connect to the Spark master node. To connect to a master, you first establish a session:

1. Create a session - SparkSession spark = new SparkSession.Builder()

2. Give the application a name - .appName("CSV to DB")

3. Specify the master - .master("local")

4. Finish with the getOrCreate statement - .getOrCreate();
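
Putting steps 1 through 4 together, the session creation looks like this (a minimal sketch; SparkSession comes from org.apache.spark.sql, and "local" runs the master inside this JVM):

	// Connect to a local master and name the application
	SparkSession spark = new SparkSession.Builder()
			.appName("CSV to DB")
			.master("local")
			.getOrCreate();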

5. Get the data - use the Spark session's read() method and specify the format of the data we want to read - here it is CSV. It is important to note that other formats are also allowed, such as JSON, Parquet, and XML, so the options here are extensive. Then specify whether the file has a header or not - for a CSV file the header option can be true or false.

6. Specify the file location using .load - in this case the file is called name_and_comments.txt, located at src/main/resources/name_and_comments.txt.

7. It is not enough to read the file - we need to save it into a data structure. The data structure used here is a dataset of rows, Dataset<Row>, where the type in the angle brackets is Row. Use df as the variable name to indicate that this is a DataFrame. A sketch of the complete read call follows this step.
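
Putting steps 5 through 7 together, the read call looks like this (a sketch assuming spark is the session created in steps 1-4; Dataset and Row come from org.apache.spark.sql):

	// Read the CSV file (which has a header row) into a DataFrame
	Dataset<Row> df = spark.read()
			.format("csv")            // "json" or "parquet" would also work here
			.option("header", true)   // the first line of the file is a header
			.load("src/main/resources/name_and_comments.txt");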

8. Use Cmd+Shift+O (Organize Imports in Eclipse) to bring in the imports, and notice that Row is imported as org.apache.spark.sql.Row - make sure you get the right import.

9. What is a DataFrame? It is what the Dataset<Row> type represents - think of it as a database table, with rows and columns.

10. Use the DataFrame - call df.show() to display the contents of the DataFrame in the console.

11. Right-click in the editor window, scroll down to Run As > Java Application, and you can see the results in the terminal window.

12. You can also limit how much df.show() displays by passing a row count - df.show(2) shows only the first 2 rows.
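
Both forms side by side (the 20-row default is Spark's standard show() behavior):

	df.show();   // prints the first rows of the DataFrame (20 by default)
	df.show(2);  // prints only the first 2 rows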

13. Now we want to do another transformation - we want to see the comments that have numbers in them. You can see in the txt file that there are numbers in the comment section, so we will perform another transformation.

14. Put a condition on a column - in this case the comment column - and call a function named rlike. SQL has the LIKE operator, and databases also support a regex-style LIKE used for regular expressions; the rlike function here matches the column against a regular expression passed as a quoted string. To match sentences that have numbers in them, use the two backslashes with d+, i.e. "\\d+" - the d stands for digit, so this matches only comments that contain one or more digits.

15. To actually apply the transformation, reassign the result to the df variable:

	// Transformation: build a full_name column from last_name and first_name
	// (concat and lit are static imports from org.apache.spark.sql.functions)
	df = df.withColumn("full_name", concat(df.col("last_name"), lit(""), df.col("first_name")));

	// Transformation: keep only rows whose comment contains a digit
	df = df.filter(df.col("comment").rlike("\\d+"));
	df.show();
16. Re-run the file
17. You can also do an order by with .orderBy and specify the column we want to order by.
	a. .orderBy(df.col("last_name").asc()); - you can use ascending or descending order
18. It is good practice to split long chained calls across multiple lines in code.
19. You can combine the filter with the column transformation in a single chain of calls.
20. We can also create a new DataFrame so that the result does not conflict with the old DataFrame, for example:
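
A small sketch of that idea (filteredDf is just an illustrative variable name, not part of the course code):

	// Keep df unchanged and put the filtered rows into a separate DataFrame
	Dataset<Row> filteredDf = df.filter(df.col("comment").rlike("\\d+"));
	filteredDf.show();
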
21. We want to save this modified DataFrame into a database table.
22. Instead of showing the contents, we actually want this data saved into a database.
23. If you are familiar with JDBC: we are using the PostgreSQL database driver and giving it connection properties such as the driver class. The driver itself is pulled in as a project dependency - you can see the postgresql dependency in the pom.xml file.
24. To write to the database, use the DataFrame's write() method with SaveMode.Overwrite, which allows us to rerun this application and keep writing into the table we specify. Then call jdbc(), passing the connection URL, the table name (we will call it project1), and the properties object.
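
In code, the write step boils down to the following (SaveMode comes from org.apache.spark.sql; the connection URL and properties object are set up in the full Application.java below):

	// Overwrite the project1 table on every run
	df.write()
		.mode(SaveMode.Overwrite)
		.jdbc(dbConnectionUrl, "project1", prop);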

Application.java

package com.jobreadyprogrammer.spark;

import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.lit;

public class Application {

	public static void main(String[] args) {

		// Create a session connected to a local master
		SparkSession spark = new SparkSession.Builder()
				.appName("CSV to DB")
				.master("local")
				.getOrCreate();

		// Get the data: read the CSV file (with a header row) into a DataFrame
		Dataset<Row> df = spark.read().format("csv")
				.option("header", true)
				.load("src/main/resources/name_and_comments.txt");

		// df.show(3);

		// Transformations: add a full_name column, keep only comments that
		// contain digits, and order the result by last_name
		df = df.withColumn("full_name",
					concat(df.col("last_name"), lit(""), df.col("first_name")))
				.filter(df.col("comment").rlike("\\d+"))
				.orderBy(df.col("last_name").asc());

		// df.show();

		// Write the result to a PostgreSQL table named project1
		String dbConnectionUrl = "jdbc:postgresql://localhost/course_data";
		Properties prop = new Properties();
		prop.setProperty("driver", "org.postgresql.Driver");
		prop.setProperty("user", "postgres");     // adjust to your database user
		prop.setProperty("password", "password"); // adjust to your database password

		df.write()
			.mode(SaveMode.Overwrite)
			.jdbc(dbConnectionUrl, "project1", prop);
	}

}
