Assignment on Distributed Databases

Distributed filesystems and databases are where Big Data lives, either temporarily or permanently. It is therefore important to get a practical feel for how things such as sharding, failover, and distributed querying work.

In this assignment, your task is to configure a distributed database cluster, insert some data, perform some queries, make it fail and observe the failover procedures.

For performing the tasks below, you will use MongoDB and your programming language of choice (the recommended one is Python). MongoDB can work in standalone mode, as a normal client-server database similar to MySQL. However, what is really interesting about it is that it supports both replication and sharding (albeit sacrificing ACID properties).
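For example, a minimal Python check that the driver can reach a running MongoDB instance (assuming pymongo is installed and a mongod or mongos listens on the default port 27017):

from pymongo import MongoClient

# Connect to the default host/port and issue a ping; raises if unreachable
client = MongoClient()
print(client.admin.command("ping"))  # {'ok': 1.0} on success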

Task 1: Configure a sharded cluster

Set up a MongoDB cluster with sharding and replication enabled. For that, you will need to read about both replication and sharding in MongoDB. You need to install the latest version of MongoDB in the course VM; see the official MongoDB installation documentation for instructions.

# Create a directory for the whole installation
mkdir mongoassignment
cd mongoassignment

# Create data directories for each node
mkdir cfg0 cfg1 cfg2
mkdir a0 a1 a2
mkdir b0 b1 b2

# configuration nodes
mongod --configsvr --replSet conf --port 26050 --logpath log.cfg0 --logappend --dbpath cfg0 --fork
mongod --configsvr --replSet conf --port 26051 --logpath log.cfg1 --logappend --dbpath cfg1 --fork
mongod --configsvr --replSet conf --port 26052 --logpath log.cfg2 --logappend --dbpath cfg2 --fork

# replica set 1 (the legacy --smallfiles flag was removed in MongoDB 4.2 and is omitted here)
mongod --shardsvr --replSet a --dbpath a0 --logpath log.a0 --port 27000 --logappend --fork
mongod --shardsvr --replSet a --dbpath a1 --logpath log.a1 --port 27001 --logappend --fork
mongod --shardsvr --replSet a --dbpath a2 --logpath log.a2 --port 27002 --logappend --fork

# replica set 2
mongod --shardsvr --replSet b --dbpath b0 --logpath log.b0 --port 28000 --logappend --fork
mongod --shardsvr --replSet b --dbpath b1 --logpath log.b1 --port 28001 --logappend --fork
mongod --shardsvr --replSet b --dbpath b2 --logpath log.b2 --port 28002 --logappend --fork

Then, you will need to configure the cluster:

# At the MongoDB console for replica set a, add all replica set nodes
mongo --port 27000
> config = {
      _id: "a",
      version: 1,
      members: [
         { _id: 0, host : "localhost:27000" },
         { _id: 1, host : "localhost:27001" },
         { _id: 2, host : "localhost:27002" }
      ]
   }
> rs.initiate(config)
a:PRIMARY>

# At the MongoDB console for replica set b, add all replica set nodes
mongo --port 28000
> config = {
      _id: "b",
      version: 1,
      members: [
         { _id: 0, host : "localhost:28000" },
         { _id: 1, host : "localhost:28001" },
         { _id: 2, host : "localhost:28002" }
      ]
   }
> rs.initiate(config)
b:PRIMARY>

# At the MongoDB console for the configuration servers replica set, add all
# replica set nodes
mongo --port 26050
configsvr> config = {
      _id: "conf",
      version: 1,
      members: [
         { _id: 0, host : "localhost:26050" },
         { _id: 1, host : "localhost:26051" },
         { _id: 2, host : "localhost:26052" }
      ]
   }
configsvr> rs.initiate(config)
conf:PRIMARY>

And finally, you can start the query router, connect to it, and set up sharding:

# query router (one is enough, more are possible)
mongos --configdb conf/localhost:26050,localhost:26051,localhost:26052 --logappend --logpath log.mongos0 --port 27017

### On a different terminal
mongo
> sh.status()
> sh.addShard("a/localhost:27000")
> sh.addShard("b/localhost:28000")
> sh.getBalancerState()

# Create a database and set up sharding on it
> show databases
> use any_db_name
> db.createCollection("any_collection_name")
> show databases
> sh.enableSharding("any_db_name")

# For each collection that must be sharded, specify a sharding key
> sh.shardCollection("any_db_name.any_collection_name", { "any_field_name" : 1 } )
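To sanity-check the result from Python as well, you can talk to the query router programmatically. A minimal sketch with pymongo, using the mongos started above on port 27017:

from pymongo import MongoClient

# Connect to the mongos query router, not to an individual shard
client = MongoClient("localhost", 27017)

# listShards is an admin command served by mongos; it should list both
# replica sets ("a" and "b") registered with sh.addShard() above
for shard in client.admin.command("listShards")["shards"]:
    print(shard["_id"], shard["host"])

# The config database holds the metadata that sh.status() pretty-prints
for coll in client.config.collections.find():
    print(coll["_id"], "sharded on", coll.get("key"))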

After you do the above, answer the following questions:

  • Given a dataset of GitHub commits, in JSON format, what would be the appropriate key to shard upon? Why?

The commit SHA. It scores well on all three key-selection criteria: for write distribution, SHA values are effectively random, so new commits land evenly across shards instead of hammering a single chunk; for cardinality, every commit has a unique hash, so the key space can be split as finely as needed; and for frequency, no single value repeats, so no shard receives a disproportionate share of the data.
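For illustration, the same choice expressed through the driver: a hedged sketch that shards a hypothetical github.commits collection on its sha field (the database, collection, and field names here are assumptions, not part of the assignment):

from pymongo import MongoClient

client = MongoClient("localhost", 27017)  # the mongos router

# Enable sharding for the (hypothetical) github database, then shard the
# commits collection on the SHA; since SHA values are effectively random,
# even a plain range-based key spreads writes evenly across shards
client.admin.command("enableSharding", "github")
client.admin.command("shardCollection", "github.commits", key={"sha": 1})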

  • If the total dataset is 1.000.000 items and we have a 4-way sharded cluster, estimate how many items will reside on each node given the sharding key you chose above.

Approximately 250.000 items per node: SHA values are uniformly distributed, so the chunks split evenly and each of the 4 shards holds roughly 1.000.000 / 4 = 250.000 items.

  • What algorithm does MongoDB use for leader election?

The Raft algorithm: since MongoDB 3.2 (replication protocol version 1), replica set elections are based on an extension of Raft's leader election.
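To see the outcome of an election, you can ask any member for the replica set status; a small pymongo sketch against replica set a from the setup above (the directConnection option requires a recent pymongo):

from pymongo import MongoClient

# Connect directly to one member of replica set "a"
client = MongoClient("localhost", 27000, directConnection=True)

# replSetGetStatus reports each member's state as seen by this node;
# the elected leader shows up with stateStr == "PRIMARY"
status = client.admin.command("replSetGetStatus")
for member in status["members"]:
    print(member["name"], member["stateStr"])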

  • How can we take an incremental (e.g. every day) backup from the cluster, without overloading it?

By adding a hidden, delayed replica set member to each shard and taking the backup from it. Hidden members never serve client reads, so dumping data from them does not load the rest of the cluster, and the replication delay additionally protects against accidental deletes being replicated into the backup.
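A sketch of how such a member could be configured from Python, run against the current primary of a shard; note that the delay field is called slaveDelay before MongoDB 5.0 and secondaryDelaySecs from 5.0 on, so adjust to your version:

from pymongo import MongoClient

client = MongoClient("localhost", 27000, directConnection=True)

# Fetch the current replica set configuration from the primary
cfg = client.admin.command("replSetGetConfig")["config"]

# Turn the third member into a hidden, delayed backup target:
# priority 0 so it can never become primary, hidden so clients never
# read from it, plus a one-hour replication delay
member = cfg["members"][2]
member["priority"] = 0
member["hidden"] = True
member["secondaryDelaySecs"] = 3600  # use "slaveDelay" before MongoDB 5.0

# A reconfiguration must bump the version number
cfg["version"] += 1
client.admin.command("replSetReconfig", cfg)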

Task 2: Creating data

Write a program that generates 100.000 values in the following format

{
  id: 123 //a unique integer
  name: "A random name from the list here http://www.gutenberg.org/files/3201/files/NAMES.TXT",
  address : [{ // An array populated with 1-10 elements, randomly
    street: "A random place from the list here http://www.gutenberg.org/files/3201/files/PLACES.TXT",
    number: 12 //A random integer in the range 1 -- 500
  }] 
}

The program should also be able to connect to a MongoDB database, using the appropriate MongoDB driver for your language of choice, and write the generated values. Make sure you print a message on the terminal every time you successfully write an entry in the database.

  • Copy the source code for your program here:
from pymongo import MongoClient
import random
from random import randint
import time

client = MongoClient()

# Load the name and place word lists downloaded from Project Gutenberg
with open("NAMES.TXT", "r", encoding="utf-8") as f:
    names = [x.strip() for x in f.readlines()]

with open("PLACES.TXT", "r", encoding="utf-8") as f:
    places = [x.strip() for x in f.readlines()]

addressData = []

# Generate 100.000 documents, each with 1-10 random addresses
for x in range(100000):
    addresses = []
    for y in range(randint(1, 10)):
        addresses.append({
            "street": random.choice(places),
            "number": randint(1, 500)
        })

    addressData.append({
        "id": x,
        "name": random.choice(names),
        "address": addresses
    })

db = client.data
db.addresses.delete_many({})  # start from an empty collection

# Bulk-insert all generated documents and time the operation
start = time.time()
result = db.addresses.insert_many(addressData)
end = time.time()
print(end - start)
  • On a single MongoDB instance (not the cluster), create a database called data and a collection called addresses. Using the program you wrote, insert the values to the addresses collection. How much time does it take?

3.595s

  • Create a database called data and a sharded collection called addresses. Select the appropriate sharding key and justify your selection. Insert the generated values to your MongoDB cluster. How much time does it take? If it takes more time than in the standalone mode, where does the extra time go? If it takes less, what makes it faster?

I sharded on the id field: the dataset has a fixed, known size, and the key scores well on cardinality (every value is unique) and frequency (each value appears exactly once).

9.297s

The sharded insert is slower because the extra time goes into routing every write through the mongos, replicating each write to the secondaries of its replica set, and splitting and migrating chunks while the insert runs. In addition, since id is monotonically increasing, all new documents initially target the same chunk, which limits write parallelism.

Task 3: Perform queries

Given the database of 100.000 items you created in the step above, run the following queries at the MongoDB command prompt, in both the standalone and the sharded database version.

Hint: queries in MongoDB return cursors (iterators) which are lazy; you need to retrieve all results in order to evaluate how much time it takes to execute them. You can use a function like the one below at the MongoDB prompt.

mongo> function timeit(query) {
  var a = Date.now(); 
  query.forEach(function(){}); 
  return (Date.now() - a);
}
mongo> timeit(db.addresses.find({id: 1}))
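The same caveat applies when timing queries from a driver: pymongo cursors are lazy too, so exhaust them before stopping the clock. A rough Python equivalent of the helper above (the function name is my own):

import time
from pymongo import MongoClient

db = MongoClient().data

def timeit(cursor):
    # Exhaust a lazy pymongo cursor and return the elapsed milliseconds
    start = time.time()
    for _ in cursor:  # force retrieval of every result
        pass
    return (time.time() - start) * 1000

print(timeit(db.addresses.find({"id": 1})))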

Fill in the table below with your answers:

| Description | Query | Result (standalone) | Time (standalone) | Result (cluster) | Time (cluster) |
| --- | --- | --- | --- | --- | --- |
| 1. Number of items in collection addresses | db.addresses.count() | 100000 | 4ms | 100000 | 4ms |
| 2. Items where the id is bigger than 50.000 | db.addresses.count({ "id": {$gt: 50000 }}) | 49999 | 47ms | 49999 | 19ms |
| 3. Names starting with either 'A' or 'K' | db.addresses.count({ $or: [{"name": /^A/}, {"name": /^K/}]}) | 11575 | 88ms | 11575 | 67ms |
| 4. Unique street names whose address is even | db.addresses.aggregate([{$unwind: "$address" }, { $group: { _id: { street: "$address.street", number: "$address.number" }}}, { $match: { "_id.number": {$mod: [ 2, 0 ] }}}], { allowDiskUse: true }).toArray().length | 261105 | 1880ms | 261105 | 7054ms |
| 5. Number of people based on their address | db.addresses.aggregate([{$unwind: "$address" }, { $group: { _id: { street: "$address.street", number: "$address.number" }, count: {$sum: 1}}}, {$match: { count: { $gte: 2 }}}], {allowDiskUse: true}).toArray().length | 27662 | 1820ms | 27662 | 6823ms |
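For reference, the aggregation in query 5 translates almost verbatim to the driver; a pymongo sketch of the same pipeline, where $unwind flattens the embedded address array, $group counts people per exact street/number pair, and $match keeps addresses shared by at least two people:

from pymongo import MongoClient

db = MongoClient().data

pipeline = [
    {"$unwind": "$address"},
    {"$group": {
        "_id": {"street": "$address.street", "number": "$address.number"},
        "count": {"$sum": 1},
    }},
    {"$match": {"count": {"$gte": 2}}},
]

# allowDiskUse lets the $group stage spill to disk on large datasets
shared = list(db.addresses.aggregate(pipeline, allowDiskUse=True))
print(len(shared))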

Task 4: Making it fail

Delete all the data from your cluster. Start your data generator again. While the data generator works, remove one cluster node. You can kill a MongoDB process, remove a network cable, stop a VM, or stop a Docker container; the exact process of removing cluster nodes depends on how you set up the cluster.

After you removed the cluster node, you should observe the cluster reconfigure itself. What were the steps that were taken? How long did the process take? What happened on the client side? How many records were in the database after the data generator finished? Describe the failover process below.

Hint: Make sure you configure MongoDB to print its logs to a file, so that you can inspect it at a later time.

When the primary of one replica set was killed, the remaining members noticed the missed heartbeats, held an election, and one of the secondaries stepped up as the new primary; with the default election timeout (electionTimeoutMillis is 10 seconds) this takes on the order of ten seconds. No data is redistributed in this step, since each secondary already holds a full copy. On the client side, writes failed while no primary existed and succeeded again once the driver reconnected to the new primary; after the data generator finished there were still 100000 records in the database.
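A hedged sketch of how a data generator can ride out the election on the client side (the retry helper and its parameters are my own, not part of the assignment):

import time
from pymongo import MongoClient
from pymongo.errors import AutoReconnect

client = MongoClient()  # the mongos router on the default port
collection = client.data.addresses

def insert_with_retry(doc, retries=10):
    # Retry the insert while the replica set elects a new primary
    for attempt in range(retries):
        try:
            return collection.insert_one(doc)
        except AutoReconnect:
            time.sleep(1)  # no primary yet; back off and try again
    raise RuntimeError(f"no primary elected after {retries} retries")

Recent drivers also enable retryable writes by default (the retryWrites option), which transparently retry a failed write once after a failover.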

Restart the failed node. What happens?

The restarted node rejoins its replica set as a secondary and catches up by replaying the operations it missed from the primary's oplog; no new election takes place unless the returning member has a higher priority than the current primary.