Name:

Student Number:

BSc CS/TI or minor student?:

Instructions

The total number of points is 75. You have 180 minutes: Good Luck!

Open Questions

Please provide brief answers to the following questions. Be precice!

Data processing with FP

  1. (2 points) Describe what is a relation and what is a Key/Value pair and comment on their differences.

Relation: a set of tuples which share the same type(s). Keys are always unique. Key/Value pair: a 2-tuple where the key is the (unique) identifier for the value. It can contain duplicate keys.

Differences: in a relation the types are predefined while using key/value pairs this is not the case. This makes key-value pairs more flexible while relations provide more (type) safety.

  1. Provide the function signatures for the following functions of the type List[A].

    • (1 point) foldL: foldL(list: List[A], f: (acc: B, x: A) => B, init: B) : B
    • (1 point) reduceR: reduceR(list: List[A], init: B, f: (acc: A, x: B) => B) : B
    • (1 point) flatMap: flatMap(list: List[A], f: A => List[B]) : List[B]
    • (1 point) scan: scan(list: List[A], f: (B, A) => B, init: B) : List[B]
    • (1 point) groupBy: groupBy(list: List[A], f: => K) : Map[K, List[A]]
  2. (2 points) Implement groupBy for List[A] using foldL.

def groupBy(list: List[A], classifier : (A) => String)) : Map[String, List[A]] = {
    def helper(map : Map[String, List[A]], item : A) : Map[String, List[A]] = {
        var key = classifier(item)
        
        if (map.contains(key)) {
            val values: List[A] = (map.get(key)).get
            return map + (key -> (values :+ item))
        } else {
            return map + (key -> (item :: Nil))
        }
    }


    return foldL(list, Map[String, List[A]](), helper)
}
  1. (2 points) Implement foldL for List[A] using foldR
def foldL(list: List[A], f: (B, A) => B, init: B) : B = {
    var functionSwap = (x: A, y: B) => f(y, x)
    return foldR(list.reverse, functionSwap, init)
}
  1. (2 points) Implement leftJoin(xs:[(K, A)], ys:[(K, B)]) for KV pairs xs and ys where the type K signifies the key. What is the return type?
def leftJoin[K,A,B](xs:List[(K, A)], ys:List[(K, B)]) : List[(K, (A, Option[B]))] = {

  xs.flatMap { x =>
    val inBoth = ys.filter(y => y._1 == x._1)

    if (inBoth.size > 0) {
      inBoth.map(i => (x._1, (x._2, Some(i._2))))
    } else {
      List((x._1, (x._2, None)))
    }
  }
}

val xs = List((1,2), (2,3), (1,4))
val ys = List((1,2))

leftJoin(xs, ys)
  1. (2 points) How would you rewrite (in any language) the following 2 examples without side-effects?

Example 1, in Scala

var sum = 0
for (i <- 1 to 10) {
  sum = sum + i
}
var toSum = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
var summed = toSum.reduce(_ + _)

Example 2, in Python

x = {}
for i in [1,2,3,4,5,6]:
  key = i % 2
  if key in x.keys():
    x[key].append(i)
  else:
    x[key] = [i]
val toGroup = List(1, 2, 3, 4, 5, 6)
toGroup.groupBy(x => x % 2)
  1. (2 points) Describe what monads are and why they are useful. Give an example of a useful monad application.

They are a design pattern which define how functions can be used together to build generic types. They wrap values, that can only be transformed to another monad. They are useful to deal with side-effects.

Example: Option[T], which deals with null values, or Future[T], which masks network latency.

Distributed systems/databases/filesystems

  1. (2 points) What is Amdahl’s law and what are its implications?

Amdahl’s law is a formula to calculate the maximum improvement if you parallelize part of the system. It gives us an upper bound to the speedup that we can obtain through parallelization. One implication is that performance gains through paralellization drop sharply even if a very small fraction (e.g. 5%) of our computation is serial.

  1. (2 points) What is the CAP conjecture and what are its implications?

The CAP conjecture consists of the following guarantees: - Consistency - Availability - Partition tolerance

It is speculated that a distributed system can only provide 2 of those 3 guarantees. In practice, a system can provide all 3 guarantees, while the network is working.

  1. (3 points) What are the 3 typical replication architectures? Describe one benefit and one drawback for each.
master-slave:
    benefit: Reads can scale 
    drawback: Bad availability if the master crashes
master-master:
    benefit: If a master crashes, there are still other masters available. Writes can scale.
    drawback: Might run in conflicts with concurrent writes
leaderless replication:
    benefit: Node failures can be tolerated more easily
    drawback: Client has to handle synchronization conflicts
  1. (2 points) Explain the guarantees given by each “letter” in an ACID compliant database.
A: atomic: a transaction is either completely executed or not.

C: consistent: a transaction will bring the database from one valid state to another.

I: isolated: transactions cannot read data from each other 
(depending on the isolation level) if a transaction is not yet completed. 

D: durable: if a transaction has been committed, it will be remain so, even after a crash or error
  1. (6 points) Consider a cluster of 6 machines running HDFS (1 namenode, 5 datanodes). Each node in the cluster has a total of 1TB hard disk space and 2GB of main memory available. The cluster uses a block-size of 64 MB and a replication factor of 3. The master maintains 100 bytes of metadata for each 64MB block. Imagine that we upload a 128GB file.
  • (2 points) How much data does each datanode store?
data * replication factor = 128 * 3 = 384GB total data
384 / 5 datanodes = 76.8GB per data node
(76.8 * 1000MB) / 64MB = 1200 blocks 
  • (4 points) Describe how HDFS will store the file.

The client requests the NameNode to write a file. The NameNode provides the address of the DataNodes. Then the client directly writes the data on the DataNodes. Internally the DataNodes will replicate the data 3 times. Once the data is replicated, the DataNode sends an acknowledgment to the client.

  1. (10 points) You are tasked to design a system that reliably delivers messages from one client to another (think Whatsapp). The process to deliver a message from client A to client B is the following:

    1. A writes outgoing messages to a queue
    2. The system retrieves it from the queue and stores it in a database
    3. The system notifies B that a message is waiting
    4. B connects, retrieves the message and marks it as read (let’s ignore read receipts for now)

The system needs to work 24/7 and be resilient to database and queue failures. You will be using ready-made databases and queues that support replication, sharding and transactions. How would you design it? Discuss the advantages and disadvantages of your design decisions.

This is an open question with no single correct solution. Some elements that would lead to a 10 point grade in this question are the following:

  • Ordering of messages only depends on the source (client A); therefore, a timestamp-based ordering is enough to guarantee that client B sees A’s messages in the correct order. If two messages arrive out of order on B, B can just sort them based on the timestamp.

  • Both the queue and the database need to be both highly available and high performance. This means that they need to employ replication. Multi-master replication can work nicely for the queue as it is write intensive and ordering of messages does not need to be maintained within the system. The database requires transactional semantics both for writing messages and acknoweledgements, so a simpler single-master replication scheme might be more suitable.

  • The queue and the database need to co-ordinate writes; to do this in a way that no messages are lot between 2 indpendent systems, we need to employ 2 phase commits.

  • It is difficult to scale millions of clients (B) connected to a single database that continuously ask the database whether there are new messages for them. So what we need is an Observation read pattern, where each client B connects to an intermediate system that sits between the database and the client and just maintains an open connection to B. When a new message transaction is committed in the database, the database notifies the intermediate system about a new message for each client B. The notification can happen through, for example, a stored procedure. The intermediate system then pushes the new message to B.

  • The intermediate system can also acknoweledge messages to the database after it pushes them to B and B acknoweledges receipt. There are no transaction requirements as the writes only affect a single record.

Spark

You are given the following dataset; it contains citation information in the IEEE CS format (lines that end with \ continue to the next line):

S. Ryza, U. Laserson, S. Owen, and J. Wills, Advanced analytics with spark: \
 Patterns for learning from data at scale. O’Reilly Media, Inc., 2015.
H. Karau, A. Konwinski, P. Wendell, and M. Zaharia, Learning spark: \
  Lightning-fast big data analysis. O’Reilly Media, Inc., 2015.
B. Chambers and M. Zaharia, Spark: The definitive guide. O’Reilly Media, Inc., 2017.
M. Kleppmann, Designing data-intensive applications. O’Reilly Media, Inc., 2017.
H. Karau and R. Warren, High performance spark. O’Reilly Media, Inc., 2017.
T. H. Cormen, C. E. Leiserson, Ronald L. Rivest, and C. Stein, Introduction \
  to algorithms (3rd ed.). MIT press, 2009.
P. Louridas, Real world algorithms. MIT press, 2017.

The format looks like this:

author1, author2, ... , authorN, title. publisher, year.
  1. (2 points) Write code to load the data into an RDD. Then, convert the RDD into a DataFrame.
def parser(input: String) : (Array[String], String, String, Integer) = {
    var authors = input.split(",")
    var last2Elements = authors.takeRight(2)
    var titlePublisher = last2Elements(0).split(".", 2) //split on . max 2 elements

    
    (authors.dropRight(2), titlePublisher(0), titlePublisher(1), last2Elements(1)) 
}

val rdd = spark.textFile(data).map(x = parser(x))

//convert to dataframe
rdd.toDF()
  1. (2 points) Write a query (in SQL or programmatically) to print a list of publisher names along with the number of publications in the dataset. For the given dataset, the output should look like:
O’Reilly Media, Inc., 5
MIT press, 2
rdd.map(x => (x._3, 1)).reduceByKey((a, b) => a + b).sortBy(x => x._2, false).collect()
  1. (2 points) Write a query (in SQL or programmatically) to find the author with the most publications. In our case that should be: M. Zaharia.
rdd.map(x => x._1).flatMap(x => x).map(x => (x, 1)).reduceByKey((a, b) => a + b).sortBy(x => x._2, false).take(1)
  1. (2 points) In the 2 queries you have writen above, which operators cause a data shuffle?
reduceByKey and sortBy
  1. (2 points) Explain how Spark deals with fault tolerance.

Spark uses RDD lineage information to know which partition(s) to recompute in case of a node failure.

  1. (2 points) Explain how Spark schedules jobs on a cluster.

A job is initiated when an action is called on a RDD. The RDD dependency graph is then evaluated backwards and a graph of stages is build. Each stage consists of multiple tasks. Tasks are scheduled in parallel on cluster nodes.

Streaming (BSc CS/TI students only!)

Only answer the following questions if you are a BSc TI student!

  1. (3 points) Why do we need streaming windows? What is a range and what is a slice in a streaming window?

We need streaming windows to aggregate data on a relevant set of data.

  • Range: The range on which window operations are executed (e.g. 100 entries, 12 hours)
  • Slide: The frequency of the evaluation of a window range.
  1. (3 points) Say that we have 5 different event sources and 20 clients that can consume all of them; briefly describe 3 ways in which we can connect the sources to the clients, and comment on the relative drawbacks of each.
  • direct messaging: send events directly to clients, drawback is that both the sources and clients have to be online at the same time.

  • message brokers: adds some sort of ‘queue’ between source and client also functioning as load balancer. drawback is that the broker needs to ‘bookkeep’ the events in the queue and this broker is not scalable.

  • partitioned logs: ‘distributed’ message brokers, so easily scalable. Drawback is that using partitioned logs requires a more complex setup.

Unix (Minor students only!)

Only answer the following questions if you are a minor student!

  1. (2 points) Write a program that given a directory of text files, prints the 10 most common words in those files (across all files)
find . -type f |
xargs cat |
cut -f 1 -d ' ' |
sort |
uniq -c |
sort -n |
tail -n 10

For the next 2 questions, you are given a CSV file whose contents look like the following:

$ cat fileA
user_id, account_id, balance
10, 12, 1000
10, 1, 32
12, 122, 5
...

$ cat fileB
account_id, transaction_id, amount
12, 332, 21
122, 21, 20
...
  1. (2 points) Write a pipeline to find and report duplicate lines in fileA
sort fileA | uniq -c | sort -n | tr -s ' '|grep "^ [2-9]* .*"
  1. (2 points) Write a pipeline to list all transactions per user.
join -1 2 -2 1 -t ',' <(cat fileA|sed '1d'|sort -n -k 2 -t ',') <(cat fileB|sed '1d'| sort -n)

Multiple choice questions

Please select and circle only one answer per question.

  1. (1 point) What is the correct function signature for reduce on Spark RDDs?

    1. RDD[A].reduce(f: (A,B) -> B)
    2. RDD[A].reduce(f: (A,A) -> A) <—
    3. RDD[A].reduce(init: B, seqOp: (A, B) -> A, combOp: (B, B) -> B)
    4. RDD[A].reduce(init:A, f: (A,A) -> A)
  2. (1 point) Distributed systems:

    1. Share memory
    2. Share CPUs
    3. Share networks
    4. None of the above <—
  3. (1 point) Lamport timestamps

    1. enable us to establish total causal ordering of events
    2. enable us to establish a total order of events
    3. given 2 events A and B, enable us to determine which one happend first
    4. none of the above <—
  4. (1 point) The serializable transaction isolation level protects transcations against:

    1. Dirty reads
    2. Non-repeatable reads
    3. Phantom reads
    4. all of the above <—
  5. (1 point) Which of the following methods is part of the Observer interface, for dealing with push-based data consumption?

    1. def subscribe(obs: Observer[A]): Unit
    2. def onNext(a: A): Unit <—
    3. def map(f: (A) -> B): [B]
    4. def onExit(): Unit
  6. (1 point) An transformation in Spark:

    1. Schedules a pipeline run <—
    2. Runs a pipeline and reports a result
    3. Triggers a data shuffle
    4. Does nothing
  7. (1 point) Which of the following is a likely computation order of applying reduceR to a list of 10 integers with the ‘+’ operator?

    1. (((((((((0 + 1) + 2) + 3) + 4) + 5) + 6) + 7) + 8) + 9)
    2. (((1 + 2) + (3 + 4)) + 5) + (((6 + 7) + (8 + 9)) + 0)
    3. (1 + (2 + (3 + (4 + (5 + (6 + (7 + (8 + (9 + 0))))))))) <—
    4. (1 + 2) + (3 + 4) + (5 + 6) + (7 + 8) + (9 + 0)
  8. (1 point) Collaborative filtering:

    1. Allows us to predict a user’s preferences using his/her previous choices
    2. Allows us to predict a user’s preferences using other similar user’s preferences <—
    3. Is a technique where swarms of collaborating users inspect and filter content before it reaches its destination
    4. Is used by Google to filter out search results
  9. (1 point) What is precision in the context of Machine Learning? (\(TP\) = True Positive, \(FP\) = False Positive, \(TN\) = True Negative, \(FN\) = False Negative)

    1. \(\frac{TP}{TP + FP}\) <—
    2. \(\frac{TP}{TP + FN}\)
    3. \(\frac{FP}{FP + TN}\)
    4. None of the above
  10. (1 point) What is Byzantine fault tolerance?

    1. Resilience against multiple node failures
    2. Resilience against suboptimal voting <—
    3. Tolerating node failures when electing cluster leaders
    4. A mechanism used by the Byzantine empire for the good of its people
  11. (1 point) What is eventual consistency?

    1. At any time, the system is linearisable
    2. At any time, concurrent reads from any node return the same values
    3. If writes stop, all reads will return the same value after a while <—
    4. If writes stop, a distributed system will become consistent
  12. (1 point) Which of the following RDD API calls is a performance killer in Spark?

    1. reduceByKey
    2. keyBy
    3. groupByKey <—
    4. aggregatebyKey
  13. (1 point) Copy-on-write is a technique to:

    1. Enable efficient immutable data
    2. Implement filesystems
    3. Share read-only buffers
    4. All the above <—
  14. (1 point) What is the correct signature for rightOuterJoin on Spark RDDs?

    1. RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (Option[V], W))] <–
    2. RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
    3. RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (V, W))]
    4. RDD[(K,V)].rightOuterJoin(other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  15. (1 point) A GFS chunkserver/HDFS datanode is responsible to:

    1. Store filesystem path information
    2. Split the data into partitions
    3. Store data partitions <—
    4. Replicate the data onto multiple disks