Data increasingly shapes decisions in the film industry. This project performs movie analytics with Scala and Apache Spark, using Scala's expressiveness and Spark's distributed processing to inform content-creation and investment decisions. It analyzes the MovieLens movie, rating, and user datasets with Spark RDDs, Spark SQL, and Spark DataFrames to answer questions such as which genres are most common, which movies are rated most often, and how each movie is rated on average. These insights can support better decisions and a better movie-watching experience for audiences.
- Apache Spark: High-performance distributed computing system for large-scale data processing and analytics.
- Scala: Statically typed programming language blending functional and object-oriented concepts, running on the JVM for enhanced developer productivity.
- Download and install Python 3.11
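- Download and install the Java Development Kit (JDK) 17
- Download Spark 3.4.1 pre-built for Apache Hadoop 3.3 and later
- Extract the downloaded Spark archive
- Download Hadoop `winutils` from GitHub
- Create the following folder structure:
  - `C:\SPARK`
  - `C:\HADOOP\bin`
- Copy the extracted Spark files into `C:\SPARK` and the Hadoop `winutils` files into `C:\HADOOP\bin`
- Set the environment variables for Java, Hadoop, and Spark (typically `JAVA_HOME`, `HADOOP_HOME` pointing to `C:\HADOOP`, and `SPARK_HOME` pointing to `C:\SPARK`) and add their `bin` folders to `PATH`
- In CMD, from `C:\SPARK\bin`, run the command `spark-shell`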
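A quick pre-flight check can catch a missing environment variable before `spark-shell` fails with a less obvious error. This is a minimal sketch that assumes the conventional names `JAVA_HOME`, `HADOOP_HOME`, and `SPARK_HOME`; the `EnvCheck` object and its `missing` helper are hypothetical and not part of this repository.

```scala
object EnvCheck {
  // Assumed variable names; adjust if your setup uses different ones.
  val required: List[String] = List("JAVA_HOME", "HADOOP_HOME", "SPARK_HOME")

  // Returns the required variables that are absent or blank in `env`.
  def missing(env: Map[String, String]): List[String] =
    required.filter(name => env.get(name).forall(_.trim.isEmpty))

  // Checks the real process environment and prints the result.
  def main(args: Array[String]): Unit = {
    val gaps = missing(sys.env)
    if (gaps.isEmpty) println("All required variables are set.")
    else println(s"Missing: ${gaps.mkString(", ")}")
  }
}
```

Taking the environment as a `Map` parameter keeps the check testable without touching the real system settings.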
We have three datasets, `movies.dat`, `ratings.dat`, and `users.dat`, described below. Each file holds one record per line, with fields separated by `::`.
- `ratings.dat`: user ratings, with the columns `UserID::MovieID::Rating::Timestamp`:
  - UserID: Ranges from 1 to 6040. Each user has rated at least 20 movies. References UserID in `users.dat`.
  - MovieID: Ranges from 1 to 3952. References MovieID in `movies.dat`.
  - Rating: Whole-star ratings on a scale of 1 to 5 stars.
  - Timestamp: Seconds since the epoch, as returned by `time(2)`.
- `users.dat`: user demographics, with the columns `UserID::Gender::Age::Occupation::Zip-code`:
  - UserID: Ranges from 1 to 6040 and is unique for each user.
  - Gender: `M` for male and `F` for female.
  - Age: Stored as a code for one of the ranges under 18, 18-24, 25-34, 35-44, 45-49, 50-55, and 56+.
  - Occupation: Stored as a numeric code (0-20) for categories such as farmer, artist, homemaker, customer service, health care, retired, scientist, engineer, tradesman, unemployed, executive/managerial, K-12 student, educator, programmer, and lawyer.
  - Zip-code: Postal code provided by the user.
- `movies.dat`: movie information, with the columns `MovieID::Title::Genres`:
  - MovieID: Ranges from 1 to 3952 and is unique for each movie.
  - Title: The title as listed on IMDb, including the release year in parentheses, e.g. `Toy Story (1995)`.
  - Genres: A pipe-separated (`|`) list drawn from 18 genres such as action, adventure, fantasy, drama, war, western, thriller, horror, musical, and animation.
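The coded fields above can be decoded with a few lines of plain Scala. This sketch assumes the files are the MovieLens 1M release (the ID ranges above match it), where age is stored as one of the codes 1, 18, 25, 35, 45, 50, or 56; `FieldDecoding` and the sample lines are illustrative, not taken from the project.

```scala
import java.time.Instant

object FieldDecoding {
  // Assumed MovieLens 1M age codes and the ranges they stand for.
  val ageRanges: Map[Int, String] = Map(
    1 -> "Under 18", 18 -> "18-24", 25 -> "25-34", 35 -> "35-44",
    45 -> "45-49", 50 -> "50-55", 56 -> "56+"
  )

  // Splits a users.dat line (UserID::Gender::Age::Occupation::Zip-code)
  // and returns the user id with a readable age range.
  def userAge(line: String): (Int, String) = {
    val fields = line.split("::")
    val ageCode = fields(2).toInt
    (fields(0).toInt, ageRanges.getOrElse(ageCode, s"unknown ($ageCode)"))
  }

  // Converts the Timestamp column of a ratings.dat line (seconds since the epoch) to an Instant.
  def ratedAt(line: String): Instant =
    Instant.ofEpochSecond(line.split("::")(3).toLong)
}
```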
- Prepare the Movies dataset: Clean the `::`-delimited data and extract the release year and genres.
- Prepare the Users dataset: Load the `::`-delimited file into a DataFrame and specify the schema programmatically.
- Prepare the Ratings dataset: Load the `::`-delimited file into a DataFrame and specify the schema programmatically.
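The parsing behind these preparation steps can be sketched without Spark, assuming the column order described for each file. `PrepareDatasets` and its case classes are hypothetical names. In `spark-shell`, the same functions could be applied inside `sc.textFile(...).map(...)` and the resulting RDD of case classes turned into a DataFrame with `toDF()`; building an explicit `StructType` and calling `spark.createDataFrame(rowRDD, schema)` is the programmatic-schema alternative.

```scala
object PrepareDatasets {
  final case class Movie(movieId: Int, title: String, year: Option[Int], genres: List[String])
  final case class User(userId: Int, gender: String, age: Int, occupation: Int, zipCode: String)
  final case class Rating(userId: Int, movieId: Int, rating: Int, timestamp: Long)

  // A title followed by a four-digit year in parentheses at the end.
  private val TitleWithYear = """^(.*)\s+\((\d{4})\)\s*$""".r

  // MovieID::Title::Genres -> year split out of the title, genres split on '|'.
  def parseMovie(line: String): Movie = {
    val fields = line.split("::", 3)
    val (title, year) = fields(1) match {
      case TitleWithYear(name, y) => (name, Some(y.toInt))
      case other                  => (other, None)
    }
    Movie(fields(0).toInt, title, year, fields(2).split('|').toList)
  }

  // UserID::Gender::Age::Occupation::Zip-code
  def parseUser(line: String): User = {
    val f = line.split("::", 5)
    User(f(0).toInt, f(1), f(2).toInt, f(3).toInt, f(4))
  }

  // UserID::MovieID::Rating::Timestamp
  def parseRating(line: String): Rating = {
    val f = line.split("::", 4)
    Rating(f(0).toInt, f(1).toInt, f(2).toInt, f(3).toLong)
  }
}
```

Returning `Option[Int]` for the year keeps titles without a trailing year in the dataset instead of failing the whole load.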
To run these preparation steps, execute `sh execute.sh` in a terminal from the directory that contains the script.
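// Movies released in the most recent year (run in spark-shell, where sc is predefined)
val movies_rdd = sc.textFile("../../Movielens/movies.dat")
// The title is the second "::"-separated field
val movie_nm = movies_rdd.map(line => line.split("::")(1))
// The year is the text between the last "(" and the last ")" of the title
val year = movie_nm.map(line => line.substring(line.lastIndexOf("(") + 1, line.lastIndexOf(")")))
// Four-digit years compare correctly as strings
val latest = year.max
movie_nm.filter(line => line.contains("(" + latest + ")")).saveAsTextFile("result")
System.exit(0)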
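The latest-movies logic can be exercised on a small in-memory list before running it against the full file. This sketch swaps the RDD for a plain `Seq` and filters on the extracted year rather than on a substring match; `LatestMovies` and the sample titles are illustrative.

```scala
object LatestMovies {
  // Text between the last "(" and the last ")" of a title, as in the RDD job.
  def yearOf(title: String): String =
    title.substring(title.lastIndexOf("(") + 1, title.lastIndexOf(")"))

  // Titles whose extracted year equals the most recent year in `titles`.
  def latest(titles: Seq[String]): Seq[String] = {
    val newest = titles.map(yearOf).max // lexicographic max is correct for four-digit years
    titles.filter(t => yearOf(t) == newest)
  }
}
```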
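// Distinct genres in alphabetical order
val movies_rdd = sc.textFile("../../Movielens/movies.dat")
// Genres are the third "::"-separated field, e.g. "Animation|Children's|Comedy"
val genres = movies_rdd.map(line => line.split("::")(2))
val genres_flat = genres.flatMap(line => line.split('|'))
// Sort on the full genre name; sorting on the first character alone leaves same-initial genres unordered
val genres_distinct_sorted = genres_flat.distinct().sortBy(genre => genre)
genres_distinct_sorted.saveAsTextFile("result")
System.exit(0)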
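Sorting on `_(0)` compares only the first character, so genres sharing an initial (for example `Children's`, `Comedy`, and `Crime`) are not guaranteed to come out in alphabetical order relative to each other. The sketch below sorts on the whole name, using plain collections instead of an RDD; `DistinctGenres` and the sample lines are hypothetical.

```scala
object DistinctGenres {
  // Third "::" field split on '|', de-duplicated, and sorted by the full genre name.
  def distinctGenres(lines: Seq[String]): Seq[String] =
    lines.map(_.split("::")(2)).flatMap(_.split('|')).distinct.sorted
}
```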
// Number of movies in each genre
val movies_rdd = sc.textFile("../../Movielens/movies.dat")
val genre = movies_rdd.map(line => line.split("::")(2))
// split takes a regex here, so the pipe separator is escaped
val flat_genre = genre.flatMap(line => line.split("\\|"))
// Pair each genre with 1, then sum the ones per genre
val genre_kv = flat_genre.map(g => (g, 1))
val genre_count = genre_kv.reduceByKey((a, b) => a + b)
val genre_sort = genre_count.sortByKey()
genre_sort.saveAsTextFile("result-csv")
System.exit(0)
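Mapping each genre to 1 and then calling `reduceByKey` is a word count over genres. A collection-based equivalent, handy for testing the parsing on a few lines, groups identical genres and counts each group; `GenreCounts` is a hypothetical name and the sample lines are made up.

```scala
object GenreCounts {
  // Movies per genre, sorted by genre name (like sortByKey on the RDD).
  def countByGenre(lines: Seq[String]): List[(String, Int)] =
    lines
      .flatMap(_.split("::")(2).split('|'))
      .groupBy(identity)
      .map { case (genre, occurrences) => (genre, occurrences.size) }
      .toList
      .sortBy(_._1)
}
```

On an RDD, `reduceByKey` is preferable to `groupByKey` followed by a count because it combines the ones within each partition before the shuffle.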
// Number of movie titles starting with each letter or digit
val movies_rdd = sc.textFile("../../Movielens/movies.dat")
val movies = movies_rdd.map(line => line.split("::")(1))
// First word of each title; empty strings are dropped so that head is safe
val string_flat = movies.map(line => line.split(" ")(0)).filter(word => word.nonEmpty)
// Titles whose first character is a letter: count per upper-cased letter
val movies_letter = string_flat.filter(word => Character.isLetter(word.head)).map(word => (word.head.toUpper, 1))
val movies_letter_count = movies_letter.reduceByKey((a, b) => a + b).sortByKey()
// Titles whose first character is a digit: count per digit
val movies_digit = string_flat.filter(word => Character.isDigit(word.head)).map(word => (word.head, 1))
val movies_digit_count = movies_digit.reduceByKey((a, b) => a + b).sortByKey()
// Digits first, then letters; coalesce(1) writes a single file without reshuffling the sorted rows
movies_digit_count.union(movies_letter_count).coalesce(1).saveAsTextFile("result-csv")
System.exit(0)
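The title-initial rule (letters upper-cased and counted, digits counted separately, anything else ignored) can be checked on a handful of titles with plain Scala. This sketch trims leading spaces and takes the first character of the title, which for a non-empty first word is the same character the RDD job inspects; `TitleInitials` and the sample titles are illustrative.

```scala
object TitleInitials {
  // Digit counts first, then upper-cased letter counts, each sorted by character.
  def countByInitial(titles: Seq[String]): List[(Char, Int)] = {
    val initials = titles.map(_.trim).filter(_.nonEmpty).map(_.head)
    val digits = initials.filter(_.isDigit)
    val letters = initials.filter(_.isLetter).map(_.toUpper)
    def tally(cs: Seq[Char]): List[(Char, Int)] =
      cs.groupBy(identity).map { case (c, hits) => (c, hits.size) }.toList.sortBy(_._1)
    tally(digits) ++ tally(letters) // same order as the union in the RDD job
  }
}
```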
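// Ten most-rated movies with their titles
val ratingsRDD = sc.textFile("../../Movielens/ratings.dat")
// MovieID is the second "::"-separated field of ratings.dat
val movies = ratingsRDD.map(line => line.split("::")(1).toInt)
// Count ratings per MovieID
val movies_pair = movies.map(mv => (mv, 1))
val movies_count = movies_pair.reduceByKey((x, y) => x + y)
// Sort by count, descending, and bring the top ten back to the driver
val movies_sorted = movies_count.sortBy(x => x._2, false, 1)
val mv_top10List = movies_sorted.take(10).toList
val mv_top10RDD = sc.parallelize(mv_top10List)
// (MovieID, Title) pairs from movies.dat
val mv_names = sc.textFile("../../Movielens/movies.dat").map(line => line.split("::")).map(fields => (fields(0).toInt, fields(1)))
// join gives (MovieID, (Title, Count)); sorting into a single partition keeps the order when writing
// Note: titles containing commas are written unquoted
val join_out = mv_names.join(mv_top10RDD)
join_out.sortBy(x => x._2._2, false, 1).map(x => x._1 + "," + x._2._1 + "," + x._2._2).saveAsTextFile("Top-10-CSV")
System.exit(0)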
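The top-10 job counts ratings per movie, keeps the largest counts, and joins in the titles. The same steps on in-memory data look like this; `TopRated`, the tie-break on MovieID, and the sample records are assumptions for illustration, not part of the original job.

```scala
object TopRated {
  // (MovieID, Title, RatingCount) for the n most-rated movies.
  // Ties are broken by ascending MovieID so the result is deterministic.
  def topN(ratingLines: Seq[String], movieLines: Seq[String], n: Int): List[(Int, String, Int)] = {
    val counts = ratingLines
      .map(_.split("::")(1).toInt)
      .groupBy(identity)
      .map { case (id, hits) => (id, hits.size) }
    val titles = movieLines.map { l => val f = l.split("::"); f(0).toInt -> f(1) }.toMap
    counts.toList
      .sortBy { case (id, c) => (-c, id) }
      .take(n)
      .map { case (id, c) => (id, titles.getOrElse(id, "unknown"), c) }
  }
}
```

On the RDD itself, `movies_count.top(10)(Ordering.by[(Int, Int), Int](_._2))` returns the ten highest counts to the driver without sorting the whole dataset first.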
// Average rating per movie, rounded to two decimal places
// Requires the sparkdatalake.ratings table to exist
spark.sql("""Select movieid,
cast(avg(rating) as decimal(16,2)) as Average_ratings
from sparkdatalake.ratings
group by movieid
order by cast(movieid as int) asc
""").coalesce(1).write.format("csv").option("header","true").save("result")
System.exit(0)
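The query averages each movie's ratings and casts the result to `DECIMAL(16,2)`. The sketch below reproduces that on plain collections, rounding half-up to two places, which is assumed here to match Spark's rounding when casting to a decimal; `AverageRatings` and the sample ratings are hypothetical.

```scala
import scala.math.BigDecimal.RoundingMode

object AverageRatings {
  // (MovieID, average rating) sorted by MovieID, rounded half-up to two decimals.
  def averages(ratingLines: Seq[String]): List[(Int, BigDecimal)] =
    ratingLines
      .map { l => val f = l.split("::"); (f(1).toInt, f(2).toInt) }
      .groupBy(_._1)
      .toList
      .sortBy(_._1)
      .map { case (id, pairs) =>
        val stars = pairs.map(_._2)
        id -> (BigDecimal(stars.sum) / stars.size).setScale(2, RoundingMode.HALF_UP)
      }
}
```

Using `BigDecimal` instead of `Double` keeps the two-decimal output exact, the same reason the query casts to a decimal type.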
// Oldest movies, using Spark SQL over a temporary view
val movies_rdd = sc.textFile("../../Movielens/movies.dat")
// Method 1: convert the existing RDD into a DataFrame with toDF (one column named "value"; spark-shell imports spark.implicits._ automatically)
// and register it as a view, because spark.sql can only query registered tables or views
movies_rdd.toDF.createOrReplaceTempView("movies_view")
// Parse the id, the title, and the four digits just before the closing ")" of the title
spark.sql(""" select
split(value,'::')[0] as movieid,
split(value,'::')[1] as moviename,
substring(split(value,'::')[1],length(split(value,'::')[1])-4,4) as year
from movies_view """).createOrReplaceTempView("movies")
// To view the parsed records, use spark.sql("select * from movies").show()
// Keep every movie whose year equals the earliest year
spark.sql("Select * from movies m1 where m1.year=(Select min(m2.year) from movies m2)").coalesce(1).rdd.saveAsTextFile("result")
System.exit(0)
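The `substring(..., length(...) - 4, 4)` expression assumes every title ends exactly with `(YYYY)`; a trailing space or a title without a year would produce a wrong `year` value. A regex-based extraction that tolerates trailing spaces and skips undated titles is sketched below in plain Scala; `OldestMovies` and the sample titles are illustrative. Inside Spark SQL, the built-in `regexp_extract` function can play the same role.

```scala
object OldestMovies {
  // Four-digit year in parentheses at the end of a title, trailing spaces allowed.
  private val TrailingYear = """.*\((\d{4})\)\s*$""".r

  def yearOf(title: String): Option[Int] = title match {
    case TrailingYear(y) => Some(y.toInt)
    case _               => None
  }

  // Every title that shares the earliest year; titles without a parsable year are ignored.
  def oldest(titles: Seq[String]): Seq[String] = {
    val dated = titles.flatMap(t => yearOf(t).map(y => (t, y)))
    if (dated.isEmpty) Seq.empty
    else {
      val first = dated.map(_._2).min
      dated.collect { case (t, y) if y == first => t }
    }
  }
}
```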
Clone this repository:
git clone https://github.com/Devubavariaa/DATA-ANALYSIS-AND-MOVIE-RECOMMENDATION-USING-SPARK