Panel |
---|
Description of MapReduce concept: |
http://java.dzone.com/articles/how-hadoop-mapreduce-works
Idea of mapper & reducer
Data flow for a single MapReduce job
Hadoop - my first MapReduce (M/R) code
- install VMware on the local machine
- import Hadoop server from http://www.cloudera.com/hadoop-training-virtual-machine
- fire VM up
- To use Streamers add system variable:
export SJAR=/usr/lib/hadoop/contrib/streaming/hadoop-0.20.1+133-streaming.jar
Now you can writhe M/R in any language - upload Shakespeare text to HDFS (Hadoop Distributed File System)
cd ~/git/data (is distributed w/ Cloudera VM)
tar vzxf shakespeare.tar.gz
check nothing is in HDFShadoop fs -ls /user/training - add text file to HDFS hadoop fs -put input /user/training/inputShak
and check again hadoop fs -ls /user/training - Execute M/R job under hadoop using 'cat' & 'wc'
Code Block hadoop jar $SJAR \ -mapper cat \ -reducer wc \ -input inputShak \ -output outputShak
- inspect output in
hadoop fs -cat outputShak/p*
175376 948516 5398106
- inspect output in
Task: write own Map & Reduce counting frequency of words:
mapper : read text data from stdin
write "<key> <value>" to stdout (<key>=word, <value>=1)
example:
$ echo "foo foo quux labs foo bar quux" | ./mapper.py
- Python Mapper
reducer : read a stream of "<word> 1" from stdinCode Block title mapp1.py borderStyle solid #!/usr/bin/env python # my 1st mapper: writes <word> 1 import sys data = sys.stdin.readlines() for ln in data: L=ln.split() for key in L:scp balewski@deltag5.lns.mit.edu:"0x/mySetup.sh" . if len(key)>1: print key,1
write "<word> <count>" to stdoutCode Block title redu1.py borderStyle solid #!/usr/bin/env python # my 1st reducer: reads: <word> <vlue., sums key values from the same consecutive key, writes <word> <sum> import sys data = sys.stdin.readlines() myKey="" myVal=0 for ln in data:scp balewski@deltag5.lns.mit.edu:"0x/mySetup.sh" . #print ln, L=ln.split() #print L nw=len(L)/2 for i in range(nw): #print i key=L[0+2*i] val=int(L[2*i+1]) #print key,val,nw if myKey==key: myVal=myVal+val else: if len(myKey)>0: print myKey, myVal, myKey=key myVal=val if len(myKey)>0: print myKey, myVal,
- Execute:
Code Block hadoop jar $SJAR \ -mapper $(pwd)/mapp1.py \ -reducer $(pwd)/redu1.py \ -input inputShak \ -output outputShak3
- Python Mapper
Cloudera - my 1st EC2 cluster deployed
- follow instruction http://archive.cloudera.com/docs/ec2.html
- Item 2.1 . I uploaded 3 tar files: 'client script', 'boto', and 'simplejson'.
- Un-tarred all 3: tar vzxf ....
- execute twice , in 'boto' and 'simplejson' directories
Code Block sudo python setup.py install
- move hadoop-ec2 to permanent place & add to path (for easier use)
Code Block tar vzxf cloudera-for-hadoop-on-ec2-py-0.3.0-beta..tar.gz sudo mv cloudera-for-hadoop-on-ec2-py-0.3.0-beta /opt/ export HADOOP_EC2_HOME=/opt/cloudera-for-hadoop-on-ec2-py-0.3.0-beta export PATH=$PATH:$HADOOP_EC2_HOME
- exported by hand environment variables
AWS_ACCESS_KEY_ID - Your AWS Access Key ID
AWS_SECRET_ACCESS_KEY - Your AWS Secret Access Key - create a directory called ~/.hadoop-ec2 w/ file ec2-clusters.cfg with content:
Code Block [my-hadoop-cluster] ami=ami-6159bf08 instance_type=m1.small key_name=janAmazonKey2 availability_zone=us-east-1a private_key=/home/training/.ec2/id_rsa-janAmazonKey2 ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no
- filer a cluster of 1 server+2 nodes
hadoop-ec2 launch-cluster my-hadoop-cluster 2 - define bunch of variables in /usr/src/hadoop/contrib/ec2/bin/hadoop-ec2-env.sh
- Item 2.1 . I uploaded 3 tar files: 'client script', 'boto', and 'simplejson'.
1+2 machines are up and running:
Code Block |
---|
Waiting for master to start (Reservation:r-d0d424b8) ............................................... master i-1cbe3974 ami-6159bf08 ec2-67-202-49-216.compute-1.amazonaws.com ip-10-244-182-63.ec2.internal running janAmazonKey c1.medium 2009-10-26T02:31:17.000Z us-east-1c Waiting for slaves to start ............................................. slave i-f2be399a ami-6159bf08 ec2-67-202-14-97.compute-1.amazonaws.com ip-10-244-182-127.ec2.internal running janAmazonKey c1.medium 2009-10-26T02:32:19.000Z us-east-1c slave i-f4be399c ami-6159bf08 ec2-75-101-201-30.compute-1.amazonaws.com ip-10-242-19-81.ec2.internal running janAmazonKey c1.medium 2009-10-26T02:32:19.000Z us-east-1c Waiting for jobtracker to start Waiting for 2 tasktrackers to start .................................................1.............2. Browse the cluster at http://ec2-67-202-49-216.compute-1.amazonaws.com/ |
To login to cluster execute:
hadoop-ec2 login my-hadoop-cluster
To terminate cluster execute:
hadoop-ec2 terminate-cluster my-hadoop-cluster
and say 'yes' !!
To Deleting the EC2 security groups
hadoop-ec2 delete-cluster my-hadoop-cluster
To actually brows them I need to:
- execute ./hadoop-ec2 proxy my-hadoop-cluster
Resulting with:
export HADOOP_EC2_PROXY_PID=20873;
echo Proxy pid 20873; - and add proxy to the fireFox - so far it did not worked.
Functional packages I made
- Very simple word count example which works TAR BALL
- Compute PageRank TAR BALL for medium-set wiki-pages from the Harvard class by Hanspeter Pfister, code still have problems in deploying it. It crashes on reduce if full 11M lines is processed on 10-node EC2 cluster
- here is tar-ball of the finale version of my code
- to upload it more often I used command
scp balewski@deltag5.lns.mit.edu:"0x/mySetup.sh" .
./mySetup.sh -f l -v11 -D
it containsCode Block training 495 2009-11-11 20:25 abcd-pages training 290 2009-11-11 20:25 cleanup.py training 1374 2009-11-14 19:47 mappPR.py training 2302 2009-11-14 18:30 pageRankCommon.py training 2648 2009-11-14 18:31 pageRankCommon.pyc training 1034 2009-11-14 19:25 reduPR.py training 7251 2009-11-14 11:33 runPageRank.sh training 1806 2009-11-14 18:34 wiki2mappPR.py
- to upload data set to hadoop HDFS by hand I did
hadoop fs -put wL-pages-iter0 wL-pages-iter0 - to execute full map/reduce job w/ 3 iterations:
cleaniup all, write raw file, use 4map+2red, init Map, 3 x M/R, final sort
/runPageRank.sh -X -w -D 4.2 -m -I 0.3 -i -f
- to upload it more often I used command