Big Data with a Hadoop Cluster

Introduction

It’s rare to walk into a medium- to large-enterprise IT environment and not find some kind of “Big Data” setup. Information and analytics are becoming central to corporate decision-making. Today we’re going to talk about one of the better-known big data tools, Hadoop.

Hadoop is an Apache project inspired by Google’s papers on distributed storage (the Google File System) and MapReduce. It’s designed to be a reliable, scalable computing engine that runs on commodity hardware. It’s also named after a toy elephant! We’re going to look at several programmatic ways of interacting with Hadoop to analyze data: Mr. Job, Impala, Hive, Pig and Spark.

Hadoop Cluster

First, let’s take a look at the components that comprise Hadoop:

  • Hadoop Common: The “glue” libraries that make everything work; the foundation for everything else.
  • HDFS: A distributed file system, built for redundancy and reliability. Optimized for distributed computing workloads.
  • YARN: A job scheduler and resource manager.
  • Map Reduce: A programming model and engine for parallelizing computation over large data sets. Runs on YARN.

Although many more components are commonly associated with Hadoop, these four are the core. They provide the basic building blocks for distributed programming. This post also covers some related Hadoop projects that work at a higher level than a raw map reduce job:

  • Hadoop Streaming: Lets you write map and reduce steps in any language that reads standard input and writes standard output
  • Hive: A SQL-like engine that generates map reduce jobs to get query results
  • Impala: A SQL engine that shares Hive’s table definitions but queries data in HDFS directly, without generating map reduce jobs
  • Pig: A high-level language for creating map / reduce jobs
  • Spark: Can read data from HDFS, but processes it with its own engine rather than Map Reduce

Note that the diagram below is incomplete; it’s meant to highlight that Impala and Spark bypass Map Reduce for efficiency.

[Diagram (blog-hadoop.png): Hadoop components, showing Impala and Spark bypassing Map Reduce]

Problems Hadoop solves

Hadoop is useful to solve a number of problems, including:

  • Handling large data sets (terabytes or larger)
  • Multi-worker mapping (converting source data to a format ready to be reduced to a set of analytics)
  • Multi-worker reducing (reducing data that has been mapped to a set of analytics)
  • Redundancy and load balancing
  • Distributed storage / file system
  • Combining varied data sources / ETL

When to avoid Hadoop

  • Queuing: Don’t think of Hadoop as a replacement for a queuing engine; it’s not. It’s designed to analyze large datasets in a distributed fashion; it’s not a low-level multiprocessing tool or a queue.
  • Real-time work (in most cases; some tools make Hadoop capable of this, but it’s not its core strength and better tools exist):
    • On-demand querying
    • On-demand data loading
    • Updates
  • Smaller data sets: if a job can run on a single machine or is only gigabytes of data, Hadoop is usually overkill
  • Data warehousing: Hadoop is not a replacement for a data warehouse
  • Transactions
  • Reducing complexity (a Hadoop cluster adds operational overhead of its own)

A simple EMR job with Mr. Job

First, let’s start with a very basic map reduce job. We’re going to use a Python library called Mr. Job (mrjob), which runs jobs through Hadoop Streaming, passing data between stages over standard input and output. It can run jobs on Amazon EMR (Elastic MapReduce), and it has a very useful feature set:

  • A Python API for integrating with Hadoop
  • Uses the Hadoop Streaming JAR
  • Supports various serialization techniques
  • Supports mapping, reducing, combining
  • Supports multi-step jobs
  • Requires mrjob to be installed on the cluster
  • Has a “local mode” for easy testing

We’re going to get a character, line, and word count for a very simple text file:

This is a sample text file
It has some words in it
Hail hydra

From there, we’ll write a very simple map reduce job in Mr. Job that processes the file:

from mrjob.job import MRJob
 
class MRWordFrequencyCount(MRJob):
 
  def mapper(self, _, line):
    yield "chars", len(line)
    yield "words", len(line.split())
    yield "lines", 1
 
  def reducer(self, key, values):
    yield key, sum(values)
 
if __name__ == '__main__':
    MRWordFrequencyCount.run()

Running the job is simple:

python jobs/test_job.py --conf-path=/projects/jbs_hadoop/conf/mrjob.conf -r emr s3://data-imports-jbs/test.txt

Note that our mrjob.conf file holds our AWS account setup information. Mr. Job creates an EMR cluster on the fly, runs the job, then shuts the cluster down. The output is as follows:

Moving /var/folders/1j/ts0y38q1389407q
Streaming final output from /var/folders/1j/ts0y38q1389407q
"chars" 59
"lines" 3
"words" 14
removing tmp directory /var/folders/1j/ts0y38q1389407q
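Hadoop Streaming, which Mr. Job builds on, passes records between stages as lines of text on standard input and output (by default with a tab between key and value) and sorts mapper output by key before the reducer reads it. The plain-Python sketch below imitates that flow for the sample file so the counts above can be checked without a cluster. The function names are hypothetical, and `sorted()` stands in for Hadoop’s shuffle-and-sort phase. Mr. Job itself encodes keys and values as JSON by default, which is why the keys above print with quotes, so its wire format differs in detail.

```python
from itertools import groupby

SAMPLE_TEXT = """This is a sample text file
It has some words in it
Hail hydra
"""


def streaming_mapper(lines):
    """Emit 'key<TAB>value' lines, like a Streaming mapper writing to stdout."""
    for line in lines:
        line = line.rstrip("\n")  # the mapper sees each line without its newline
        yield "chars\t{0}".format(len(line))
        yield "words\t{0}".format(len(line.split()))
        yield "lines\t1"


def streaming_reducer(sorted_lines):
    """Read key-sorted 'key<TAB>value' lines and sum the values for each key."""
    pairs = (line.split("\t", 1) for line in sorted_lines)
    for key, group in groupby(pairs, key=lambda pair: pair[0]):
        yield "{0}\t{1}".format(key, sum(int(value) for _, value in group))


def run_local(text):
    mapped = streaming_mapper(text.splitlines(True))
    shuffled = sorted(mapped)  # stand-in for Hadoop's shuffle-and-sort phase
    return list(streaming_reducer(shuffled))


for output_line in run_local(SAMPLE_TEXT):
    print(output_line)  # prints chars 59, lines 3, words 14 (tab-separated)
```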

A more advanced EMR job

Next, let’s look at a more complicated example. We’re going to take some sample phone accelerometer data (with users and activity types) and gather some simple analytics about it.

Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt
0,1424696633908,1420076531913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand
1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,b,nexus4,nexus4_1,walk
2,1424696633918,1420076731923288855,-5.9950867,0.6535491999999999,8.204376,c,nexus4,nexus4_1,stand
3,1424696633919,1424696931928385290,-5.9427185,0.6761626999999999,8.128204,a,nexus4,galaxy4_1,sit
4,1424696633929,1434696631933420691,-5.991516000000001,0.64164734,8.135345,b,nexus4,nexus4_1,sit
5,1424696633929,1434696631938456091,-5.965332,0.6297455,8.128204,c,nexus4,nexus4_1,stand
6,1424696633938,1454696231943522009,-5.991516000000001,0.6356963999999999,.816272,a,nexus4,nexus4_1,stand
7,1424696633939,1454696631948496374,-5.915344,0.63093567,8.105591,b,nexus4,iphone4_1,walk
8,1424696633951,1434696631938456091,-5.984375,0.6940155,8.067505,c,nexus4,nexus4_1,stand

The first step is to write some utility code; its purpose is to get the min and max of two different columns (x and Creation_Time). It also specifies how to cast each column to the appropriate data type. Finally, it has a helper function to sum certain keys:

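import datetime

from mrjob.job import MRJob
from mrjob.protocol import PickleProtocol
from mrjob.step import MRStep


def from_epoch(string):
  # Creation_Time is a 19-digit nanosecond timestamp; the first 10 digits are seconds
  return datetime.datetime.fromtimestamp(
    float("{0}.{1}".format(string[:10], string[10:]))
  )


class CsvProtocol(object):
  # mrjob has no built-in CSV protocol; this minimal output protocol is an assumption.
  # It ignores the key and writes the (label, value) tuple as one comma-separated line.
  def write(self, key, value):
    return ",".join(str(item) for item in value).encode("utf-8")


class ActivityCount(MRJob):
  INTERNAL_PROTOCOL = PickleProtocol  # keeps tuples and datetimes intact between steps
  OUTPUT_PROTOCOL = CsvProtocol  # write output as CSV

  MAPPINGS = {
    # label: (column, cast as, aggregate function)
    'max_x': ('x', float, max),
    'min_x': ('x', float, min),
    'min_creation_time': ('Creation_Time', from_epoch, min),
    'max_creation_time': ('Creation_Time', from_epoch, max),
  }
  COUNTS = ("Device", "gt")
  PERCENTAGES = ("Device", "gt")

  # === helper functions

  def handle_stats(self, label, values):
    # apply the min/max function registered for this label
    _, _, function = self.MAPPINGS[label]

    return label, function(values)

  def handle_counts(self, label, counts):
    return label, sum(counts)

  def handle(self, key, values):
    if key in self.MAPPINGS:
      label, value = self.handle_stats(key, values)
    elif '_count' in key:
      label, value = self.handle_counts(key, values)
    else:
      raise ValueError("Did not recognize label: {0}".format(key))

    return label, value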
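To see what the `MAPPINGS` table produces without a cluster, here is a stdlib-only sketch that applies the same (column, cast, aggregate) triples to four of the sample rows in memory. It assumes a `from_epoch` that reads the first ten digits of `Creation_Time` as seconds, like the one in the Spark section below, but converts in UTC (an assumption, so the result doesn’t depend on the machine’s time zone). `SAMPLE_ROWS` and `compute_stats` are hypothetical names.

```python
import datetime

HEADERS = "Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt".split(",")

# Sample rows 0, 1, 2 and 7 from the data above
SAMPLE_ROWS = [
    "0,1424696633908,1420076531913248572,-5.958191,0.6880646,8.135345,a,nexus4,nexus4_1,stand",
    "1,1424696633909,1424696631918283972,-5.95224,0.6702118,8.136536,b,nexus4,nexus4_1,walk",
    "2,1424696633918,1420076731923288855,-5.9950867,0.6535491999999999,8.204376,c,nexus4,nexus4_1,stand",
    "7,1424696633939,1454696631948496374,-5.915344,0.63093567,8.105591,b,nexus4,iphone4_1,walk",
]


def from_epoch(string):
    # Assumption: first 10 digits are whole seconds, the rest is the fraction; UTC for repeatability.
    seconds = float("{0}.{1}".format(string[:10], string[10:]))
    return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)


MAPPINGS = {
    # label: (column, cast as, aggregate function)
    "max_x": ("x", float, max),
    "min_x": ("x", float, min),
    "min_creation_time": ("Creation_Time", from_epoch, min),
    "max_creation_time": ("Creation_Time", from_epoch, max),
}


def compute_stats(lines):
    """Cast each mapped column and aggregate it over all rows, like mapper + reducer combined."""
    rows = [dict(zip(HEADERS, line.split(","))) for line in lines]
    stats = {}
    for label, (column, cast_as, aggregate) in MAPPINGS.items():
        stats[label] = aggregate(cast_as(row[column]) for row in rows)
    return stats


stats = compute_stats(SAMPLE_ROWS)
print(stats["min_x"], stats["max_x"])
print(stats["min_creation_time"].date(), stats["max_creation_time"].date())
```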

You don’t need to focus too much on the boilerplate code above; just know that it’s used to compute minimums, maximums, and counts. Next, let’s define the steps needed to complete the map reduce job. We have a mapper (converts a single line of the text file into keys to be processed), a combiner (adds efficiency to the job; not required), a first-level reducer (takes a key and a set of values, performs a computation, then yields a key/value pair), and a final reducer that takes our counts and does some aggregate computations. Mr. Job lets us define multiple job steps to support this:

def steps(self):
  return [
    MRStep(mapper=self.mapper,
      combiner=self.combiner,
      reducer=self.reducer),
    MRStep(reducer=self.aggregate_reducer)
  ]

Here are the mapper, combiner, and the first reducer:

def mapper(self, _, line):
  # column names, in the order they appear in the CSV
  headers = "Index,Arrival_Time,Creation_Time,x,y,z,User,Model,Device,gt"
  values = line.split(',')

  # skip the header row
  if values[0] == 'Index':
    return

  row = dict(zip(headers.split(','), values))

  # emit one value per statistic, cast to the right type
  for label, mapping in self.MAPPINGS.items():
    column, cast_as, _ = mapping

    yield label, cast_as(row[column])

  # emit a 1 for each Device and activity value, e.g. "gt_count:walk"
  for count in self.COUNTS:
    yield "{0}_count:{1}".format(count, row[count]), 1

  yield "Total_count", 1

def combiner(self, key, values):
  # pre-aggregate on the mapper machine (local min/max/sum)
  yield self.handle(key, values)

def reducer(self, key, values):
  label, value = self.handle(key, values)

  # a None key sends every result to the single final reducer
  yield None, (label, value)

Notice that the reducer yields a “None” key. This ensures that all of the values computed so far get sent to the same reducer for final computation, below:
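The effect of the `None` key can be seen in a small in-memory simulation of a two-step job. This is not mrjob’s runner; it’s a hypothetical `run_step` helper that groups mapper output by key (standing in for the shuffle) and then feeds step one’s output into step two. The activity labels are the `gt` column of the nine sample rows. Because every record leaving the first reducer has the key `None`, the second reducer is called once and sees all of the per-key results, which is what lets it compute percentages of the total.

```python
from itertools import groupby


def run_step(records, mapper=None, reducer=None):
    """Simulate one job step: map, group by key (the shuffle), then reduce."""
    if mapper is not None:
        records = [pair for key, value in records for pair in mapper(key, value)]
    if reducer is None:
        return list(records)
    # Sort on repr(key) so None and string keys can be ordered together.
    records = sorted(records, key=lambda pair: repr(pair[0]))
    output = []
    for key, group in groupby(records, key=lambda pair: pair[0]):
        output.extend(reducer(key, (value for _, value in group)))
    return output


def count_mapper(_, activity):
    yield "gt_count:" + activity, 1
    yield "Total_count", 1


def count_reducer(key, values):
    # Yielding None as the key routes every result to one final reducer call.
    yield None, (key, sum(values))


def percentage_reducer(_, key_values):
    key_values = list(key_values)  # needed twice: once for the total, once per row
    total = dict(key_values)["Total_count"]
    for key, count in key_values:
        if key.startswith("gt_count:"):
            yield "gt_percentage:" + key.split(":")[1], count / total


ACTIVITIES = ["stand", "walk", "stand", "sit", "sit", "stand", "stand", "walk", "stand"]

step_one = run_step([(None, activity) for activity in ACTIVITIES],
                    mapper=count_mapper, reducer=count_reducer)
step_two = run_step(step_one, reducer=percentage_reducer)
print(step_two)
```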

def aggregate_reducer(self, _, key_values):
  # key_values is a generator; we need to loop over it more than once, and
  # there are fewer than a hundred rows, so it's safe to stuff them in a list
  key_values = list(key_values)

  for key, value in key_values:
    # yield for print statement
    # yield key, value

    # yield for CSV
    yield (
      None,
      (key, "{0}".format(value))
    )

  total_count = next(value for key, value in key_values if key == 'Total_count')

  for percentage in self.PERCENTAGES:
    # e.g. the "gt_count:walk" rows for the "gt" percentage
    prefix = "{0}_count:".format(percentage)
    applicable_rows = [(key, value) for key, value in key_values if key.startswith(prefix)]

    for key, value in applicable_rows:
      label = "{0}_percentage:{1}".format(percentage, key.split(':')[1])

      # yield for print statement
      # yield label, float(value) / float(total_count)

      # yield for CSV
      yield (
        None,
        (label, float(value) / float(total_count))
      )

To recap:

  • We take a single line from our text file, one at a time, and pass it into the mapper function. Many copies of this mapper can run at the same time on different machines, each handling a different part of the file.
  • We then pass the values created by the mapper into a combiner. The combiner is a pre-computation time-saver: it combines like values on the mapper machines so fewer values get passed to the reducers. Notice that the combiner and reducer use similar code; this is because each combiner sees only an incomplete data set (for example, only 100 out of 1,000 lines). It combines these local statistics and passes them through to the reducer: the min of 100 values is passed along instead of all 100 values, so if ten mappers each handle 100 lines, the reducer gets 10 values instead of 1,000.
  • The output from the mapper (with combiners) step is passed to multiple reducers. Unlike mappers, each reducer gets the complete set of values for a key (for example, so it can compute the absolute min/max, not just a local min/max).
  • Finally, in the job above, we yield the key “None” to the last reducer so a single reducer gets all values computed so far. This final reducer then computes percentages of totals and yields those results.
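The combiner savings described in the recap can be checked with a little arithmetic. This plain-Python sketch (no Hadoop involved; the random data and the split into ten equal chunks are illustrative assumptions) spreads 1,000 numbers across ten simulated mappers, lets each combiner reduce its chunk to a single local minimum, and confirms that the reducer reaches the same global minimum from 10 values as it would from all 1,000.

```python
import random


def split_into_chunks(values, num_mappers):
    """Give each simulated mapper an equal, contiguous slice of the input."""
    size = len(values) // num_mappers
    return [values[i * size:(i + 1) * size] for i in range(num_mappers)]


random.seed(42)
numbers = [random.uniform(-10.0, 10.0) for _ in range(1000)]

chunks = split_into_chunks(numbers, 10)       # each simulated mapper sees 100 values
combined = [min(chunk) for chunk in chunks]   # each combiner emits one local minimum
reducer_input_size = len(combined)            # values that reach the reducer

global_min = min(combined)                    # the reducer's final answer

print(reducer_input_size, global_min == min(numbers))  # prints: 10 True
```

This only works because min (like max and sum) gives the same answer no matter how the input is split and regrouped, which is why the combiner can reuse the reducer’s logic. An average, by contrast, can’t be combined by averaging the local averages.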

Additional Hadoop tools: Impala / Hive

Now that we’ve seen Hadoop map reduce at a very low level, let’s look at some higher-level abstractions that let us do similar operations with a much simpler mental model. Hive and Impala are SQL-like query engines that allow analysis directly on a Hadoop cluster. Once you define a table schema and load in data, you can query that data using the Hadoop cluster as the backend.

First, create a schema:

CREATE TABLE IF NOT EXISTS phone_accelerometer (
  Index int,
  Arrival_Time bigint,
  Creation_Time bigint,
  x decimal(38,20),
  y decimal(38,20),
  z decimal(38,20),
  User string,
  Model string,
  Device string,
  gt string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
;

Next, load the data into the table from HDFS:

LOAD DATA INPATH '/user/cloudera/Phones_accelerometer_truncated_impala.csv'
OVERWRITE INTO TABLE phone_accelerometer;

Now you’re able to write SQL-like queries to get results from that dataset in a much simpler fashion than writing a map-reduce job:

SELECT
  user,
  gt as activity,
  count(*) as activity_count,
  count(*) / SUM(COUNT(*)) over (partition by user) * 100 as activity_percent
FROM phone_accelerometer
GROUP BY user, gt;
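If the window function in that query is unfamiliar, this plain-Python sketch computes the same thing for the nine sample rows shown earlier: count rows per (user, activity), then divide each count by that user’s total, which is what `SUM(COUNT(*)) OVER (PARTITION BY user)` supplies. The `(User, gt)` pairs are copied from the sample data; `activity_percentages` is a hypothetical name.

```python
from collections import Counter

# (User, gt) pairs from the nine sample rows
ROWS = [("a", "stand"), ("b", "walk"), ("c", "stand"), ("a", "sit"), ("b", "sit"),
        ("c", "stand"), ("a", "stand"), ("b", "walk"), ("c", "stand")]


def activity_percentages(rows):
    activity_counts = Counter(rows)                  # GROUP BY user, gt -> count(*)
    user_totals = Counter(user for user, _ in rows)  # SUM(COUNT(*)) OVER (PARTITION BY user)
    return {
        (user, activity): (count, count / user_totals[user] * 100)
        for (user, activity), count in activity_counts.items()
    }


for (user, activity), (count, percent) in sorted(activity_percentages(ROWS).items()):
    print(user, activity, count, round(percent, 2))
```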

It’s worth noting that Hive generates Map Reduce jobs, while Impala runs directly against Hadoop and HDFS and is more efficient.

Additional Hadoop tools: Spark

Spark is another high-level tool for working with data on Hadoop. With its Python API (PySpark), you write ordinary Python that calls special operations such as map and reduceByKey to do the mapping and reducing:

%pyspark

import datetime

def from_epoch(string):
  # Creation_Time is a 19-digit nanosecond timestamp; the first 10 digits are seconds
  return datetime.datetime.fromtimestamp(
    float("{0}.{1}".format(string[:10], string[10:]))
  )

# `sc` is the SparkContext provided by the notebook
lines = sc.textFile('hdfs://sandbox.hortonworks.com/user/admin/Phones_accelerometer_truncated_pig.csv')

def extract_line_info(line):
  Index, Arrival_Time, Creation_Time, x, y, z, User, Model, Device, gt = line.split(',')

  dt = from_epoch(Creation_Time)

  # strip stray null bytes from the activity label
  return dt.year, dt.month, dt.day, gt.replace('\x00', '')

# extract info from lines
lines = lines.map(extract_line_info)

# keep one record per day per activity, so we count distinct days
lines = lines.distinct()

# key each record by month and activity
lines = lines.map(lambda info: ("{0}-{1}:{2}".format(info[0], info[1], info[3]), 1))

# count per key: distinct days per month per activity
lines = lines.reduceByKey(lambda a, b: a + b)

print(lines.collect())

Pay close attention to the lines.map and lines.reduceByKey calls; they aren’t additions to the Python language but methods on Spark’s RDD (Resilient Distributed Dataset) objects, and Spark runs them in parallel across the cluster. What is nice about this abstraction is that it lets you write more linear, understandable Python code that isn’t bogged down with the mechanics of map reduce jobs. It’s worth noting that Spark runs directly against Hadoop and HDFS, and does not have to use the Map Reduce layer.
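The RDD operations used above can be imitated in plain Python to see what each stage does. In the sketch below, `reduce_by_key` is a hypothetical stand-in for `reduceByKey`: it folds all values for a key with a two-argument function, which Spark requires to be associative and commutative so it can pre-reduce within partitions. The sample uses `Creation_Time`/activity pairs from the data above and converts timestamps in UTC (an assumption; the pipeline’s `from_epoch` uses the machine’s local time). Counting with and without de-duplicating the (year, month, day, activity) tuples shows the difference between counting readings and counting distinct days.

```python
import datetime
from functools import reduce
from operator import add

# (Creation_Time, gt) pairs from sample rows 0-5 and 8
SAMPLE = [
    ("1420076531913248572", "stand"),
    ("1424696631918283972", "walk"),
    ("1420076731923288855", "stand"),
    ("1424696931928385290", "sit"),
    ("1434696631933420691", "sit"),
    ("1434696631938456091", "stand"),
    ("1434696631938456091", "stand"),
]


def from_epoch(string):
    # Assumption: convert in UTC for repeatable output.
    seconds = float("{0}.{1}".format(string[:10], string[10:]))
    return datetime.datetime.fromtimestamp(seconds, tz=datetime.timezone.utc)


def reduce_by_key(pairs, func):
    """Hypothetical stand-in for RDD.reduceByKey: fold all values of each key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


def month_key(info):
    year, month, _, activity = info
    return "{0}-{1}:{2}".format(year, month, activity)


# like lines.map(extract_line_info)
infos = [(dt.year, dt.month, dt.day, gt) for dt, gt in ((from_epoch(ts), gt) for ts, gt in SAMPLE)]

readings = reduce_by_key([(month_key(info), 1) for info in infos], add)
distinct_days = reduce_by_key([(month_key(info), 1) for info in set(infos)], add)  # set() ~ distinct()

print(sorted(readings.items()))
print(sorted(distinct_days.items()))
```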

Additional Hadoop tools: Pig

Pig is another high-level tool that makes analysis on Hadoop easier; its scripting language, Pig Latin, describes a data flow one step at a time and can work against files in HDFS:

data = load '/user/cloudera/Phones_accelerometer_truncated_pig.csv' using PigStorage(',');
rows = FOREACH data GENERATE
  $0 as Index,
  GetYear(ToDate((long)SUBSTRING($2, 0, 13))) as Creation_Year,
  GetMonth(ToDate((long)SUBSTRING($2, 0, 13))) as Creation_Month,
  $3 as x,
  $4 as y,
  $5 as z,
  $6 as User,
  $7 as Model,
  $8 as Device,
  $9 as activity
;
 
grp_data = GROUP rows BY (Creation_Year, Creation_Month);
 
agg_data = FOREACH grp_data
  GENERATE group.Creation_Year as year, group.Creation_Month as month,
  COUNT(rows.Index) as activities;
 
dump agg_data;
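The key trick in the Pig script is turning the nanosecond `Creation_Time` into milliseconds with `SUBSTRING($2, 0, 13)` before calling `ToDate`. This stdlib sketch mirrors that logic for the nine sample timestamps, then groups by (year, month) and counts rows, like `GROUP ... BY` followed by `COUNT`. It interprets the milliseconds in UTC, which is an assumption: Pig resolves dates in its configured default time zone, so results near a month boundary could differ. `year_month` is a hypothetical name.

```python
import datetime
from collections import Counter

# Creation_Time values from the nine sample rows
CREATION_TIMES = [
    "1420076531913248572", "1424696631918283972", "1420076731923288855",
    "1424696931928385290", "1434696631933420691", "1434696631938456091",
    "1454696231943522009", "1454696631948496374", "1434696631938456091",
]


def year_month(creation_time):
    millis = int(creation_time[0:13])  # same 13 digits as SUBSTRING($2, 0, 13)
    dt = datetime.datetime.fromtimestamp(millis / 1000, tz=datetime.timezone.utc)
    return dt.year, dt.month


activities_per_month = Counter(year_month(ts) for ts in CREATION_TIMES)
for (year, month), activities in sorted(activities_per_month.items()):
    print(year, month, activities)
```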

Getting started

When you’re ready to get started with Hadoop, I’d recommend starting with a Cloudera, Hortonworks or MapR quick start VM. Each gives you a VM that is ready to run Map Reduce jobs (once you install the Mr. Job dependencies on the VM).
