Assignment on Spark (Scala)

Gosia Migut and Georgios Gousios

In this assignment, we will use Spark to explore the MovieLens dataset, which contains user-generated ratings for movies. The dataset comes in three files:

  • ratings.dat contains the ratings in the following format: UserID::MovieID::Rating::Timestamp
  • users.dat contains demographic information about the users: UserID::Gender::Age::Occupation::Zip-code
  • movies.dat contains meta information about the movies: MovieID::Title::Genres

Refer to the README for the detailed description of the data.

Note: when using the files, use the file path data/[file].dat; otherwise automatic grading will fail.

Grade: This assignment is worth 105 points. You need to collect them all to get a 10! All graded cells include the expected answer. Your task is to write the code that produces the expected result. The automated grading process will be run on a different dataset.

Loading and parsing the file

Q1 (5 points): Download the ratings file, parse it, and load it into an RDD named ratings.

In [6]:
import org.apache.spark.rdd.RDD

case class Rating(user_ID: Integer, movie_ID: Integer, rating: Integer, timestamp: String)
case class Movie(movie_ID: Integer, title: String, genre: String)
case class User(user_ID: Integer, gender: String, age: Integer, occupation: String, zip_code: String)

def parseRatings(row: String): Rating = {
    // Each line has the format UserID::MovieID::Rating::Timestamp
    val fields = row.split("::")
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toInt, fields(3))
}
In [7]:
//load the data into an RDD and use the parseRatings function to parse it.
Out[7]:
MapPartitionsRDD[7] at map at <console>:25

Q2 (5 points): How many lines does the ratings RDD contain?

In [8]:

Out[8]:
1000209

Basic filtering and counting

Q3 (5 points): Count how many times the rating '1' has been given.

In [9]:

Out[9]:
56174

Q4 (5 points): Count how many unique movies have been rated.

In [10]:

Out[10]:
3706

Q5 (5 points): Which user gave the most ratings? Return the userID and the number of ratings.

In [11]:

Out[11]:
Array((4169,2314))

Q6 (5 points): Which user gave the most '5' ratings? Return the userID and the number of ratings.

In [12]:

Out[12]:
Array((4277,571))

Q7 (5 points): Which movie was rated the most times? Return the movieID and the number of ratings.

In [13]:

Out[13]:
Array((2858,3428))

Joining

Now we will look at two additional files from the MovieLens dataset.

Q8 (5 points): Read the movies and users files into RDDs. How many records are there in each RDD?

In [14]:
//load the movies dataset into an RDD, parse it, and cache it.
In [15]:
//how many records are in the movies RDD?
Out[15]:
3883
In [16]:
//load the users dataset into an RDD, parse it, and cache it.
In [17]:
//how many records are in the users RDD?
Out[17]:
6040

As you have probably noticed, there are more movies in the movies dataset than movies that have been rated.

Q9 (5 points): How many of the movies are a comedy?

In [18]:

Out[18]:
1200

Q10 (10 points): Which comedy has the most ratings? Return the title and the number of ratings. Answer this question by joining the two datasets.

In [19]:

Out[19]:
Array((American Beauty (1999),3428))

Q11 (10 points): For users under 18 years old (category 1), what is the frequency of each star rating? Return a list/array with the rating and the number of times it appears, e.g. Array((4,16), (1,3), (3,9), (5,62), (2,2))

In [20]:

Out[20]:
Array((4,8808), (2,2983), (1,2238), (3,6380), (5,6802))

Indexing

As you have noticed, typical operations on RDDs require grouping on a specific part of each record and then calculating counts for the resulting groups. While this can be achieved with the groupBy family of functions, it is often useful to create a structure called an inverted index. An inverted index creates a 1..n mapping from a record part to all occurrences of records with that part in the dataset. For example, if the dataset looks like the following:

col1,col2,col3
A,1,foo
B,1,bar
C,2,foo
D,3,baz
E,1,foobar

an inverted index on col2 would look like

1 -> [(A,1,foo), (B,1,bar), (E,1,foobar)]
2 -> [(C,2,foo)]
3 -> [(D,3,baz)]

Inverted indexes enable us to quickly access precalculated partitions of the dataset. Let's compute an inverted index on the movie_ID field of ratings.dat.
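The index from the example above can be sketched with plain Scala collections; Spark's RDD.groupBy behaves analogously. The Row case class and the sample data here are purely illustrative and are not part of the assignment's datasets:

```scala
// A minimal sketch of an inverted index, using plain Scala collections.
case class Row(col1: String, col2: Int, col3: String)

val data = List(
  Row("A", 1, "foo"),
  Row("B", 1, "bar"),
  Row("C", 2, "foo"),
  Row("D", 3, "baz"),
  Row("E", 1, "foobar")
)

// Group every record under its col2 value, yielding a Map[Int, List[Row]].
val index: Map[Int, List[Row]] = data.groupBy(_.col2)

// All records with col2 == 1 can now be looked up directly.
println(index(1).map(_.col1)) // List(A, B, E)
```

On an RDD the same idea reads as `ratings.groupBy(_.movie_ID)`, which yields an RDD of (key, iterable-of-records) pairs that can be filtered by key without rescanning the whole dataset.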

Q12 (5 points): Compute the number of unique users that rated the movies with movie_IDs 2858, 356 and 2329.

In [21]:

Out[21]:
4213

Measure the time (in seconds) it takes to make this computation.

In [22]:

1.022

Q13 (5 points): Create an inverted index on ratings, field movie_ID. Print the first item.

In [23]:

Out[23]:
ArrayBuffer(CompactBuffer(Rating(1,1,5,978824268), Rating(6,1,4,978237008), Rating(8,1,4,978233496), Rating(9,1,5,978225952), Rating(10,1,5,978226474), Rating(18,1,4,978154768), Rating(19,1,5,978555994), Rating(21,1,3,978139347), Rating(23,1,4,978463614), Rating(26,1,3,978130703), Rating(28,1,3,978985309), Rating(34,1,5,978102970), Rating(36,1,5,978061285), Rating(38,1,5,978046225), Rating(44,1,5,978019369), Rating(45,1,4,977990044), Rating(48,1,4,977975909), Rating(49,1,5,977972501), Rating(51,1,5,977947828), Rating(56,1,5,977938855), Rating(60,1,4,977931983), Rating(65,1,5,991368774), Rating(68,1,3,991376026), Rating(73,1,3,977867812), Rating(75,1,5,977851099), Rating(76,1,5,977847069), Rating(78,1,4,978570648), Rating(80,1,3,977786904), Rating(90,1,3,99...

Q14 (5 points): Compute the number of unique users that rated the movies with movie_IDs 2858, 356 and 2329, using the index.

In [24]:

Out[24]:
4213

Measure the time (in seconds) it takes to compute the same result using the index.

In [25]:

19.245

You should have noticed a difference in performance. Is the indexed version faster? If so, why? If not, why not? Discuss this with your partner.

Dataframes

Q15 (5 points): Create a data frame from the ratings RDD and count the number of lines in it. Also register the data frame as an SQL table.

In [26]:

Out[26]:
1000209

Q16 (5 points): Provide a statistical summary of the column containing ratings (use the Spark function that returns a table with count, mean, stddev, min, and max).

Hint: To select the correct column, you might first want to print the data types and names of the columns.

In [27]:

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|           1000209|
|   mean| 3.581564453029317|
| stddev|1.1171018453732544|
|    min|                 1|
|    max|                 5|
+-------+------------------+

Q17 (5 points): Count how many times the rating '1' has been given, by filtering the ratings DataFrame. Measure the execution time and compare it with the execution time of the same query using the RDD. Think for yourself about when it would be useful to use DataFrames and when not.

In [28]:

Out[28]:
56174

Q18 (5 points): Count how many times the rating '1' has been given, using an SQL query.

In [29]:

+--------+
|count(1)|
+--------+
|   56174|
+--------+

Q19 (5 points): Which user gave the most '5' ratings? Return the userID and the number of ratings, using an SQL query.

In [30]:

+-------+-----------+
|user_ID|num_ratings|
+-------+-----------+
|   4277|        571|
+-------+-----------+
only showing top 1 row