Programming with CDH4 Hadoop in Springsource on Ubuntu

(A) Install Ubuntu 12.04 (Precise Pangolin), 64-bit for CDH4

(B) Perform the Prerequisite steps :

Enable SSH :

sudo service ssh restart
ps aux | grep -i ssh

If SSH is not running properly, remove and reinstall it:

sudo apt-get remove openssh-client openssh-server
sudo apt-get install openssh-client openssh-server
GENERATE KEY :

$ ssh-keygen -t rsa -P ""
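
For password-less SSH to localhost, the new public key also needs to be appended to the authorized keys. A minimal sketch, assuming the default key location:

$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
$ chmod 600 ~/.ssh/authorized_keys
$ ssh localhost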

DISABLE FIREWALL or OPEN THE FIREWALL for SSH PORT :

$ sudo ufw status

If it is active, disable it:

$ sudo ufw disable

or just open the SSH port:

$ sudo ufw allow 22

DISABLE IPV6 :

http://mysolvedproblem.blogspot.com/2012/05/installing-hadoop-on-ubuntu-linux-on.html
http://www.michael-noll.com/tutorials/running-hadoop-on-ubuntu-linux-single-node-cluster/

You will need to disable IP version 6 because Hadoop uses the 0.0.0.0 address in different configurations, and on Ubuntu that address can bind to IPv6 interfaces, which Hadoop does not handle well.
You will need to run the following command using a root account:

$sudo gedit /etc/sysctl.conf

This command opens sysctl.conf in a text editor; copy the following lines to the end of the file:

# disable ipv6
net.ipv6.conf.all.disable_ipv6 = 1
net.ipv6.conf.default.disable_ipv6 = 1
net.ipv6.conf.lo.disable_ipv6 = 1
$sudo sysctl -p
$cat /proc/sys/net/ipv6/conf/all/disable_ipv6

The cat command should print 1, confirming that IPv6 is disabled.

CHECK NETWORK ACCESS :

http://ubuntuforums.org/showthread.php?t=1528205
sudo lsof -i | grep ssh
netstat -l --numeric-ports | grep 22
sudo iptables --list
ssh localhost - should work fine
ALLOW PASSWORD-LESS ROOT LOGIN from SSH :

Unlock the root account:

sudo passwd -u root

Then become root, make sure root has the .ssh directory etc., and add your public key to .ssh/authorized_keys:

sudo su - root
ssh-keygen

Check one last time that your settings in /etc/ssh/sshd_config are correct:

PermitRootLogin yes
RSAAuthentication yes
PubkeyAuthentication yes

Ensure those settings take effect by restarting ssh:

/etc/init.d/ssh restart
https://help.ubuntu.com/community/SSH/OpenSSH/Configuring

ADD USER to SUDOERS :

sudo su
chmod 0440 /etc/sudoers
visudo

Add an entry like the following to give the user password-less sudo:

kaniska ALL=(ALL) NOPASSWD:ALL

VERIFY PORTS :

Ensure that the ports required by CDH are available!
https://ccp.cloudera.com/display/ENT4DOC/Configuring+Ports+for+Cloudera+Manager

tcp ports :  netstat -tlnp

udp ports :  netstat -ulnp
netstat -n | grep :<port-number>
fuser -n tcp PORT
lsof -i tcp@localhost:25
lsof -P | grep :<port-number>

look for open ports in a range :
nc -z host.example.com 20-30
nmap -sU example.com -p 68

http://serverfault.com/questions/14429/how-do-i-find-out-whether-a-port-is-available-on-ubuntu-8-04
http://serverfault.com/questions/188429/how-to-check-if-a-port-is-open-for-remote-systemubuntu

CORRECT LOOPBACK ADDRESS :

NOTES : http://denizdemir.com/2012/03/02/running-hadoop-cluster/
Make sure your /etc/hosts files have proper entries for the nodes in the cluster. 
Try to avoid lines like this:
127.0.1.1 server-61257.localdomain server-61257
Instead, only have localhost entry for loopback IP:
127.0.0.1 localhost

(C) Install CDH4

Cloudera has simplified the process of installing the complete Hadoop stack on a single node. The following is a great reference: https://ccp.cloudera.com/display/CDH4DOC/Installing+CDH4+on+a+Single+Linux+Node+in+Pseudo-distributed+Mode

If the prerequisite steps were performed correctly, one can expect to see the HDFS and MapReduce services up and running at http://localhost:7180/cmf/services/status
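
As a quick sanity check from the command line (a hedged sketch; the exact daemon list depends on which CDH4 services were installed):

$ sudo jps          # should list NameNode, DataNode, JobTracker, TaskTracker, SecondaryNameNode
$ hadoop fs -ls /   # HDFS should answer without connection errors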

(D) Prepare Development workspace

We assume that the HDFS and MapReduce services are up and running. It's okay if there are issues with other services like Hive, Hue, or Oozie; we can revisit them later, once we are done setting up a basic application in our programming environment.

First, we install SpringSource Tool Suite (STS) and launch it through a script. Ubuntu's global menu proxy poses some challenges when displaying STS, hence the UBUNTU_MENUPROXY=0 setting below.

#!/bin/bash
export UBUNTU_MENUPROXY=0
/home/kaniska/springsource/sts-3.1.0.RELEASE/STS

Make sure OpenJDK is uninstalled and the Oracle JDK is installed on the system.
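
One way to check and switch is sketched below; the package names and the JDK path /usr/lib/jvm/jdk1.6.0_45 are assumptions, so adjust them to your image and to wherever the Oracle JDK was unpacked:

$ sudo apt-get remove openjdk-6-jre openjdk-6-jdk
$ sudo update-alternatives --install /usr/bin/java java /usr/lib/jvm/jdk1.6.0_45/bin/java 1
$ sudo update-alternatives --config java
$ java -version    # should now report the Java HotSpot(TM) VM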

Next, install Maven 3 and specify the ‘user settings’ [/usr/share/maven3/conf/settings.xml] and ‘local repo path’ [/home/kaniska/.m2/repository] for Maven in STS.
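
For reference, a minimal settings.xml carrying the local repository path could look like this (a sketch; adjust the path to your own home directory):

<settings>
    <localRepository>/home/kaniska/.m2/repository</localRepository>
</settings>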

(E) Create your own Application

First, set up the project in STS. Import the following spring-hadoop sample app into the workspace and rename the project to whatever you like.

https://github.com/SpringSource/spring-data-book/tree/master/hadoop/wordcount
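
One way to pull the sample down before importing it into STS as an existing Maven project (assuming git is installed):

$ git clone https://github.com/SpringSource/spring-data-book.git
$ cd spring-data-book/hadoop/wordcount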

Add the correct Maven dependencies to the pom (matching your CDH4 version) in order to connect to the Cloudera Hadoop installed through steps A-C. Note that in our case the version was 2.0.0-cdh4.1.2.

<repository>
    <id>cloudera</id>
    <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.0.0-cdh4.1.2</version>
    <scope>compile</scope>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>2.0.0-mr1-cdh4.1.2</version>
    <scope>compile</scope>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.0.0-cdh4.1.2</version>
    <scope>compile</scope>
</dependency>

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-examples</artifactId>
    <version>2.0.0-mr1-cdh4.1.2</version>
    <scope>compile</scope>
</dependency>

Finally, it's time to create a cool app and debug the code from STS.

Let's download the US baseball data:

wget 'http://seanlahman.com/files/database/lahman591-csv.zip'

Then we copy the data into the HDFS folder /user/hadooptest/data/baseball/ (the folder must exist in HDFS with permissions that let our user write to it; see the sketch below).
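
A sketch of those steps, assuming the zip was downloaded to the current directory, our login user is kaniska, and hdfs is the HDFS superuser (the CDH4 default); on older shells the -p flag can be dropped, since mkdir created parent directories by default:

$ unzip lahman591-csv.zip
$ sudo -u hdfs hadoop fs -mkdir -p /user/hadooptest/data/baseball
$ sudo -u hdfs hadoop fs -chown -R kaniska /user/hadooptest
$ hadoop fs -put Batting.csv /user/hadooptest/data/baseball/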

Now we can create a simple MR program to calculate total runs per team for a specific year.

package com.hadoop.analyze.bigdata;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class BaseballDataAnalyzer {
     
      public static class Map extends MapReduceBase 
          implements Mapper<LongWritable, Text, Text, IntWritable> {          
             private Text team = new Text();

            public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable>  output, Reporter reporter) throws IOException {
                 // Convert the value from Text to a String so we can use the StringTokenizer on it.
                 String line = value.toString();
                 if(line.startsWith("playerID")) {
                  return;    
                 }
                 
                 // Split the line into fields, using comma as the delimiter
                 StringTokenizer tokenizer = new StringTokenizer(line, ",");
                 // We only care about (Team Id, Year, Run Count)
                 String teamId = null;
                 String year = null;
                 String runCount = null;
                 
                 for (int i = 0; i < 10 && tokenizer.hasMoreTokens(); i++) {
                     switch (i) {
                     case 1:
                         year = tokenizer.nextToken();
                         break;
                     case 3:
                         teamId = tokenizer.nextToken();
                         break;
                     case 8:
                         runCount = tokenizer.nextToken();
                         break;
                     default:
                         tokenizer.nextToken();
                         break;
                     }
                 }
                 if (teamId == null || year == null || runCount == null) {
                     // This is a bad record, throw it out
                     System.err.println("Warning, bad record!");
                     return;
                 }
                 // Only keep records for the year we are analyzing
                 if (!year.trim().equals("1956")) {
                     return;
                 }
                 team.set(teamId);
                 IntWritable runs = new IntWritable(Integer.parseInt(runCount));
                 output.collect(team, runs);
             }
         }

     public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
           public void reduce(Text team, Iterator<IntWritable> values, 
                   OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
             int totalRuns = 0; // running total of runs for this team
             while (values.hasNext()) {
                 int currentRuns = values.next().get();
                 totalRuns += currentRuns;
             }
             output.collect(team, new IntWritable(totalRuns));
           }
         }

    /**
      * @param args
      */
     public static void main(String[] args) {
            String[] params = new String[]{ "hdfs://localhost:8020/user/hadooptest/data/baseball/"  
                   ,"hdfs://localhost:8020/user/hadooptest/baseball_output1"};    
           
           JobConf conf = new JobConf(BaseballDataAnalyzer.class);
           conf.setJobName("Baseball Runs Analyzer");
           conf.setOutputKeyClass(Text.class);
           conf.setOutputValueClass(IntWritable.class);
           conf.setMapperClass(Map.class);
           conf.setReducerClass(Reduce.class);
           conf.setInputFormat(TextInputFormat.class);
           conf.setOutputFormat(TextOutputFormat.class);

           FileInputFormat.setInputPaths(conf, new Path(params[0]));
           FileOutputFormat.setOutputPath(conf, new Path(params[1]));
           try {
               JobClient.runJob(conf);
           } catch (IOException e) {
               e.printStackTrace();
           }
     }
}

(F) Verify the output data and debug MR logic

We can now verify the sum of totalRuns per team in the year 1956 by looking at the files under /user/hadooptest/baseball_output1

If any map/reduce logic is changed in the code, we can set a breakpoint in STS to verify our logic. As an exercise, we can try to find the Top 10 best-performing teams for a given year; a sketch follows below.
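
Here is a hedged sketch of one way to attack that exercise with the same old-style API. The class name TopTeamsReduce and the single-reducer trick are our own choices, not part of the sample; ties on the run total overwrite each other in the TreeMap, which we ignore for simplicity. Wire it in with conf.setReducerClass(TopTeamsReduce.class) and conf.setNumReduceTasks(1), and add import java.util.TreeMap; to the imports above.

public static class TopTeamsReduce extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    // team names keyed by run total; the TreeMap keeps them sorted ascending
    private final TreeMap<Integer, String> topTeams = new TreeMap<Integer, String>();
    private OutputCollector<Text, IntWritable> collector;

    public void reduce(Text team, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        collector = output; // keep a handle so close() can emit the final list
        int totalRuns = 0;
        while (values.hasNext()) {
            totalRuns += values.next().get();
        }
        topTeams.put(totalRuns, team.toString());
        if (topTeams.size() > 10) {
            topTeams.remove(topTeams.firstKey()); // evict the lowest total
        }
    }

    public void close() throws IOException {
        // emit the ten surviving teams, highest total first
        for (Integer runs : topTeams.descendingKeySet()) {
            collector.collect(new Text(topTeams.get(runs)), new IntWritable(runs));
        }
    }
}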

Now let's create a simple Python script to verify that our Java MR logic produces the correct output. This is just for fun 🙂

** It is assumed that the following program is executed from the folder where the Batting.csv file was extracted (the download link is specified in the previous step).

text = file("Batting.csv", "r")
dict1 = {}
for line in text:
  data = line.split(',')
  if data[0] == 'playerID': continue       # skip the CSV header row
  if data[1] != '1956': continue           # keep only the year we care about
  if not data[3] or not data[8]: continue  # skip malformed records
  run = int(data[8])
  team = data[3]
  dict1[team] = dict1.get(team, 0) + run

for team in dict1:
    print team, ' has total runs ', dict1[team]

> python test.py
CHA  has total runs  776
PHI  has total runs  668
BOS  has total runs  780
CLE  has total runs  712
NY1  has total runs  540
DET  has total runs  789

The output data should match the Team:TotalRuns counts reported in the following MR output file:  hadoop fs -cat /user/hadooptest/baseball_output1/part-00000
