#!/usr/bin/env python
# Submit every *.sh script in an input directory to SGE through DRMAA
# and wait until all of the jobs have finished.

import kimps, drmaa


def argsOption():
    usage = "usage: %prog -o ./cluster_out/ ./input_dir/"
    parser = kimps.OptionParser( usage=usage )
    parser.add_option( "-o", "--cluster_out", dest="cluster_out",
                       help="directory for the SGE output streams" )
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error("incorrect number of arguments")
    return options, args


# main
options, args = argsOption()
input_dir = args[0]

inpath = input_dir
outpath = options.cluster_out
if not outpath:
    outpath = inpath            # default: write the SGE output next to the input scripts

s = drmaa.Session()
s.initialize()
jt = s.createJobTemplate()
joblist = []
#kimps_ctime.stime( kimps.sys._getframe().f_code.co_filename )
inlist = kimps.glob.glob( inpath + '*.sh' )
for onein in inlist:
    kimps.os.chmod( onein, 0766 )                   # make the script executable
    inname = onein.split('/')[-1]
    innametag = '.'.join(inname.split('.')[:-1])    # file name without its extension

    jt.remoteCommand = onein
    jt.nativeSpecification = " -V -S /bin/bash -j y -cwd -o " + outpath + innametag + ".qout"
    jt.joinFiles = True
    joblist.append( s.runJob( jt ) )                # submit and keep the job id

# block until every submitted job has finished
s.synchronize( joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False )

for curjob in joblist:
#   print "Collecting job "+curjob
    retval = s.wait( curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER )
#   print 'Job: '+ str(retval.jobId ) + 'finished with status '+ str(retval.hasExited)

s.deleteJobTemplate( jt )
s.exit()
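
For reference, the submitted jobs can also be polled instead of blocked on. A minimal sketch, assuming the same drmaa Python binding and the session and joblist from the script above; the helper name is mine, not part of the original script:

#!/usr/bin/env python
# Optional sketch: poll the state of the submitted jobs instead of blocking
# on synchronize(). jobStatus() and the drmaa.JobState constants come from
# the drmaa Python binding used above.
import time
import drmaa

def wait_by_polling(session, jobids, interval=30):
    finished = (drmaa.JobState.DONE, drmaa.JobState.FAILED)
    pending = list(jobids)
    while pending:
        # keep only the jobs that are not yet done or failed
        pending = [j for j in pending if session.jobStatus(j) not in finished]
        if pending:
            time.sleep(interval)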

Posted by 옥탑방람보

#!/usr/bin/env python
# Same submitter as in the previous post, but each job is run through the
# user's python interpreter (-S ~/bin/python) and the SGE output stream is
# written with a .qo suffix.

import kimps, drmaa


def argsOption():
    usage = "usage: %prog -o ./cluster_out/ ./input_dir/"
    parser = kimps.OptionParser( usage=usage )
    parser.add_option( "-o", "--cluster_out", dest="cluster_out",
                       help="directory for the SGE output streams" )
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error("incorrect number of arguments")
    return options, args


# main
options, args = argsOption()
input_dir = args[0]

inpath = input_dir
outpath = options.cluster_out
if not outpath:
    outpath = inpath            # default: write the SGE output next to the input scripts

s = drmaa.Session()
s.initialize()
jt = s.createJobTemplate()
joblist = []
#kimps_ctime.stime( kimps.sys._getframe().f_code.co_filename )
inlist = kimps.glob.glob( inpath + '*.sh' )
for onein in inlist:
    kimps.os.chmod( onein, 0766 )                   # make the script executable
    inname = onein.split('/')[-1]
    innametag = '.'.join(inname.split('.')[:-1])    # file name without its extension

    jt.remoteCommand = onein
    jt.nativeSpecification = " -V -S ~/bin/python -j y -cwd -o " + outpath + innametag + ".qo"
    jt.joinFiles = True
    joblist.append( s.runJob( jt ) )                # submit and keep the job id

# block until every submitted job has finished
s.synchronize( joblist, drmaa.Session.TIMEOUT_WAIT_FOREVER, False )

for curjob in joblist:
#   print "Collecting job "+curjob
    retval = s.wait( curjob, drmaa.Session.TIMEOUT_WAIT_FOREVER )
#   print 'Job: '+ str(retval.jobId ) + 'finished with status '+ str(retval.hasExited)

s.deleteJobTemplate( jt )
s.exit()

Posted by 옥탑방람보

[linux] chattr, lsattr


$> chattr +i filename
The file can no longer be modified, appended to, or deleted.

$> chattr +a filename
The file can only be appended to; it cannot be renamed or deleted.

$> lsattr
Shows whether a file's attributes have been changed with chattr.

* To remove each protection:
chattr -i filename
chattr -a filename

* Only root can use chattr and lsattr.
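
The same protections can be toggled from a script. A minimal Python sketch, assuming root and an ext2/3/4 filesystem; it just shells out to the real chattr/lsattr binaries, and the helper names are illustrative:

#!/usr/bin/env python
# Lock a file against modification with chattr +i and confirm the attribute
# with lsattr. Requires root and an ext2/3/4 filesystem.
# (subprocess.check_output needs Python 2.7 or later.)
import subprocess

def make_immutable(path):
    subprocess.check_call(['chattr', '+i', path])

def is_immutable(path):
    out = subprocess.check_output(['lsattr', path])   # e.g. "----i--------e-- /path"
    flags = out.split()[0]
    return 'i' in flags

if __name__ == '__main__':
    make_immutable('/root/logBackup/secure')
    print is_immutable('/root/logBackup/secure')      # True once the +i flag is set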

Posted by 옥탑방람보

This post only covers the /var/log/secure log file.
(The attached file describes syslog in more detail.)

1. Location of the secure file written by syslog: /var/log/secure

 

2. Controlling the secure file with logrotate
    $> vi /etc/logrotate.d/syslog
        /var/log/secure {
             sharedscripts
             postrotate
                 /bin/kill -HUP `cat /var/run/syslogd.pid 2> /dev/null` 2> /dev/null || true
                 /bin/kill -HUP `cat /var/run/rsyslogd.pid 2> /dev/null` 2> /dev/null || true
             endscript
             rotate 31                      # keep one month of logs
             daily                          # rotate every day
             create 600 root root
          }
    $> cat /etc/cron.daily/logrotate    # runs every day
    /usr/bin/logrotate /etc/logrotate.conf

3. Duplicate storage on the same server
    $> vi /etc/syslog.conf
        authpriv.*                  /var/log/secure
        authpriv.*                  /root/logBackup/secure
    $> /etc/init.d/syslog restart
4. Storage on an external server
    <Server sending the logs>
    $> vi /etc/syslog.conf
       authpriv.*                   @XXX.XXX.XXX.XXX
    $> /etc/init.d/syslog restart
    <Server receiving the logs>
    $> vi /etc/sysconfig/syslog
        change the existing setting to SYSLOGD_OPTIONS="-r -m 0"
    $> /etc/init.d/syslog restart
    * If both servers list each other in their hosts files, the logs can be recorded by hostname.

5. Parsing the /var/log/secure file
    $> vi montVarLogSecure.py

    #!/usr/bin/env python
    # Parse /var/log/secure and write a timestamped summary of login activity.

    import sys, re, os, time

    def getWho():
        # currently logged-in users
        whosys = os.popen( 'who' ).read()
        return whosys

    def getScreen():
        # active screen sessions
        screensys = os.popen( "screen -wls | awk -F 'in' '{print $1}'" ).read()
        return screensys

    def sucLogin( logfile ):
        # successful ssh logins
        loginsys = os.popen( "cat "+logfile+" | grep 'Accepted' | awk '{print $1 \" \" $2 \" \" $3 \" User: \" $9 \" \" }'" ).read()
    #   print "cat "+logfile+" | grep 'Accepted' | awk '{print $1 \" \" $2 \" \" $3 \" User: \" $9 \" \" }'"
        return loginsys

    def sudoSucLogin( logfile ):
        # sessions opened for root via sudo
        sudologin = os.popen( "cat "+logfile+" | grep 'session opened for user root' | awk '{print $1 \" \" $2 \" \" $3 \" Sudo User: \" $13 \" \" }' " ).read()
        return sudologin

    def invalidUser( logfile ):
        # attempts from non-existing accounts
        invalid = os.popen( "cat "+logfile+" | grep 'Invalid user' " ).read()
        return invalid

    def failedPassword( logfile ):
        # failed passwords for existing accounts
        failed = os.popen( "cat "+logfile+" | grep -v invalid | grep 'Failed password' ").read()
        return failed

    def refusedUser( logfile ):
        refused = os.popen( "cat "+logfile+" | grep 'refused' " ).read()
        return refused

    def deniedUser( logfile ):
        denied = os.popen( "cat "+logfile+" | grep -v cron | grep 'access denied\|Permission denied'" ).read()
        return denied

    def fatalNotices( logfile ):
        fatal = os.popen( "cat "+logfile+" | grep ssh | grep 'Permission denied\|fatal\|error' " ).read()
        return fatal

    def expireDate( logfile ):
        expire = os.popen( "cat "+logfile+" | grep 'changed password expiry\|expiration from' " ).read()
        return expire

    def top5ip( logfile ):
        # top 5 source IPs of failed ssh passwords
        top5 = os.popen( "awk 'gsub(\".*sshd.*Failed password for (invalid user )?\", \"\") {print $3}' "+logfile+" | sort | uniq -c | sort -rn | head -5" ).read()
        return top5

    try:
        if sys.argv[1:]:
            logfile = sys.argv[1]
        else:
            logfile = raw_input( "Please enter a log file to parse, e.g. /var/log/secure " )
        if sys.argv[2:]:
            outpath = sys.argv[2]
        else:
            outpath = raw_input( "Please enter an output path, e.g. ~/logOut/ " )
        os.system( 'mkdir -p '+outpath )
        outfile = outpath + time.strftime( '%Y%m%d%H%M%S' )
        ofh = open( outfile, 'w' )
        print >> ofh, "# Who is online: "
        print >> ofh, getWho()
        print >> ofh, "# Active Screen Sessions: "
        print >> ofh, getScreen()
        print >> ofh, "# List out successful ssh login attempts: "
        print >> ofh, sucLogin( logfile )
        print >> ofh, "# List out successful ssh login attempts from sudo users: "
        print >> ofh, sudoSucLogin( logfile )
        print >> ofh, "# List out ssh login attempts from non-existing and unauthorized user accounts: "
        print >> ofh, invalidUser( logfile )
        print >> ofh, "# List out ssh login attempts by authorized ssh accounts with failed password: "
        print >> ofh, failedPassword( logfile )
        print >> ofh, "# List out refused ssh login attempts: "
        print >> ofh, refusedUser( logfile )
        print >> ofh, "# List out denied ssh login attempts: "
        print >> ofh, deniedUser( logfile )
        print >> ofh, "# List out fatal and miscellaneous ssh session/restart notices: "
        print >> ofh, fatalNotices( logfile )
        print >> ofh, "# List out all successful system account expiration date changes: "
        print >> ofh, expireDate( logfile )
        print >> ofh, "# Top 5 attacker IP addresses: "
        print >> ofh, top5ip( logfile )
        ofh.close()
    except IOError, (errno, strerror):
        print "I/O Error (%s) : %s" % (errno, strerror)

     

    $> vi /etc/cron.daily/exe.cron

    python /usr/local/bin/montVarLogSecure.py /var/log/secure ~/logOut/

 

Posted by 옥탑방람보

How to enable X11 when connecting from a Windows PC to a Linux server over SSH

1. Enable SSH X11 forwarding
   A. $> vi /etc/ssh/sshd_config
      X11Forwarding yes
   B. $> /etc/init.d/sshd reload
2. Install and run Xming
   A. Download: http://sourceforge.net/projects/xming/
   B. Install it
   C. Run it; Xming is running once its tray icon appears
3. Connecting with Xshell (after starting Xming)
   A. Download: http://www.netsarang.co.kr/download/main.html
   B. Install it
   C. Open the file list, right-click the session, open Properties, go to Tunneling, check "Forward X11 connections to", and set X DISPLAY to localhost:0
   D. Connect to the session
4. Connecting with PuTTY (after starting Xming)
   A. Download: http://www.chiark.greenend.org.uk/~sgtatham/putty/download.html
   B. Install it
   C. Run it, select SSH > X11 under Category, check "Enable X11 forwarding", and enter localhost:0 in the X display location field
   D. Connect to the session
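
Once connected, it is easy to sanity-check the forwarding from the Linux side. A small Python sketch (not part of the original post), assuming an X client such as xclock is installed on the server:

#!/usr/bin/env python
# Check that X11 forwarding is active for the current SSH session.
import os, subprocess

display = os.environ.get('DISPLAY')
if not display:
    print 'DISPLAY is not set: X11 forwarding is not active for this session.'
else:
    print 'DISPLAY =', display      # typically localhost:10.0 for a forwarded session
    subprocess.call(['xclock'])     # a clock window should appear on the Windows desktop via Xming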

 

Posted by 옥탑방람보

1. Limit the password validity period

$> vi /etc/login.defs

PASS_MAX_DAYS 90

PASS_MIN_DAYS 0

PASS_MIN_LEN   8

PASS_WARN_AGE  7

 

2. Allow account locking

$> vi /etc/default/useradd

INACTIVE=0

 

3. Other password settings

$> vi /etc/pam.d/system-auth

auth        required      pam_env.so

auth        required      pam_tally.so per_user

auth        sufficient    pam_unix.so nullok try_first_pass

auth        requisite     pam_succeed_if.so uid >= 500 quiet

auth        required      pam_deny.so

 

account     required      pam_unix.so

account     sufficient    pam_succeed_if.so uid < 500 quiet

account     required      pam_permit.so

password    requisite     pam_cracklib.so try_first_pass retry=3 minlen=8 ucredit=-1 dcredit=-1 ocredit=-1 lcredit=-1

password    sufficient    pam_unix.so md5 shadow nullok try_first_pass use_authtok

password    required      pam_deny.so

 

session     optional      pam_keyinit.so revoke

session     required      pam_limits.so

session     [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid

session     required      pam_unix.so

 

(Explanation)

auth        required      pam_tally.so per_user

--> Follows the per-user limits set with faillog.

password    requisite     pam_cracklib.so try_first_pass retry=3 minlen=8 ucredit=-1 dcredit=-1 ocredit=-1 lcredit=-1

-->

retry=N : number of retries allowed after a failed password entry
difok=N : number of characters that must differ from the previous password; default 10 (50%)
minlen=N : minimum password length, credits included
dcredit=N : credit given for digits; default 1
ucredit=N : credit given for uppercase letters
lcredit=N : credit given for lowercase letters
ocredit=N : credit given for characters other than digits and letters

(A value of -1 makes that character class mandatory; for example, dcredit=-1 means the password must contain at least one digit.)
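
As a rough illustration of what this particular pam_cracklib line ends up demanding, here is a small Python sketch; it is not pam_cracklib itself, which additionally runs dictionary and similarity checks and the full credit arithmetic:

#!/usr/bin/env python
# Approximate check for minlen=8 ucredit=-1 dcredit=-1 lcredit=-1 ocredit=-1:
# at least 8 characters with at least one uppercase letter, one lowercase
# letter, one digit and one other character.
import re

def meets_policy(pw):
    return all([
        len(pw) >= 8,
        re.search(r'[A-Z]', pw),
        re.search(r'[a-z]', pw),
        re.search(r'[0-9]', pw),
        re.search(r'[^A-Za-z0-9]', pw),
    ])

print meets_policy('Passw0rd!')   # True
print meets_policy('password')    # False: no uppercase letter, digit or symbol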

 

4. Limit failed login attempts

*** Fix for the bug where /var/log/faillog grows to 128G after faillog -m 3 ***

$> patch < shadow-4.0.17-setmax.patch
    Target file: /usr/bin/faillog

$> faillog -u userid -m 3  # allow up to 3 wrong password entries for the account
$> faillog -u root -m 0   # a maximum of 0 for root means no limit

 

5. Re-enable a locked account

$> faillog -u [userId] -r

 

6. Exclude root from the lockout

$> faillog -u root -m 0

 

 

 

 

 

 

--------------- Older version of this post ------------------------------------------------

 

1. Limit the password validity period

$> vi /etc/login.defs

PASS_MAX_DAYS 90

PASS_MIN_DAYS 0

PASS_MIN_LEN   8

PASS_WARN_AGE  7

 

2. Allow account locking

$> vi /etc/default/useradd

INACTIVE=0

 

3. Other password settings

$> vi /etc/pam.d/system-auth

auth        required      pam_env.so

auth        required      pam_tally.so onerr=fail deny=3 reset

auth        sufficient    pam_unix.so nullok try_first_pass

auth        requisite     pam_succeed_if.so uid >= 500 quiet

auth        required      pam_deny.so

 

account     required      pam_unix.so

account     sufficient    pam_succeed_if.so uid < 500 quiet

account     required      pam_permit.so

password    requisite     pam_cracklib.so try_first_pass retry=3 minlen=8 ucredit=-1 dcredit=-1 ocredit=-1 lcredit=-1

password    sufficient    pam_unix.so md5 shadow nullok try_first_pass use_authtok

password    required      pam_deny.so

 

session     optional      pam_keyinit.so revoke

session     required      pam_limits.so

session     [success=1 default=ignore] pam_succeed_if.so service in crond quiet use_uid

session     required      pam_unix.so

 

(Explanation)

auth        required      pam_tally.so onerr=fail deny=3 reset

--> Lock the account after 3 or more failed login attempts.

password    requisite     pam_cracklib.so try_first_pass retry=3 minlen=8 ucredit=-1 dcredit=-1 ocredit=-1 lcredit=-1

-->

retry=N : number of retries allowed after a failed password entry
difok=N : number of characters that must differ from the previous password; default 10 (50%)
minlen=N : minimum password length, credits included
dcredit=N : credit given for digits; default 1
ucredit=N : credit given for uppercase letters
lcredit=N : credit given for lowercase letters
ocredit=N : credit given for characters other than digits and letters

(A value of -1 makes that character class mandatory; for example, dcredit=-1 means the password must contain at least one digit.)

 

4. Re-enable a locked account

$> faillog -u [userId] -r

 

5. Exclude root from the lockout

$> faillog -u root -m 0

Posted by 옥탑방람보

1. Install Gtk rpm

http://rpm.pbone.net/index.php3/stat/4/idpl/17756735/dir/fedora_17/com/gtkmm24-2.24.2-3.fc17.x86_64.rpm.html

2. Install gparted

http://rpmfind.net/linux/rpm2html/search.php?query=gparted&submit=Search+...&system=fedora&arch=

Posted by 옥탑방람보

http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

 

 

 

Writing An Hadoop MapReduce Program In Python

by Michael G. Noll on September 21, 2007 (last updated: June 17, 2012)

In this tutorial, I will describe how to write a simple MapReduce program for Hadoop in the Python programming language.


Motivation

Even though the Hadoop framework is written in Java, programs for Hadoop need not be coded in Java but can also be developed in other languages like Python or C++ (the latter since version 0.14.1). However, the documentation and the most prominent Python example on the Hadoop home page could make you think that you must translate your Python code using Jython into a Java jar file. Obviously, this is not very convenient and can even be problematic if you depend on Python features not provided by Jython. Another issue of the Jython approach is the overhead of writing your Python program in such a way that it can interact with Hadoop – just have a look at the example in <HADOOP_INSTALL>/src/examples/python/WordCount.py and you will see what I mean. I still recommend having at least a look at the Jython approach and maybe even at the new C++ MapReduce API called Pipes; it's really interesting.

That said, the ground is prepared for the purpose of this tutorial: writing a Hadoop MapReduce program in a more Pythonic way, i.e. in a way you should already be familiar with.

What we want to do

We will write a simple MapReduce program (see also Wikipedia) for Hadoop in Python but without using Jython to translate our code to Java jar files.

Our program will mimic the WordCount example, i.e. it reads text files and counts how often words occur. The input is text files and the output is text files, each line of which contains a word and the count of how often it occurred, separated by a tab.

Note: You can also use programming languages other than Python such as Perl or Ruby with the “technique” described in this tutorial. I wrote some words about what happens behind the scenes. Feel free to correct me if I’m wrong.

Prerequisites

You should have an Hadoop cluster up and running because we will get our hands dirty. If you don't have a cluster yet, my following tutorials might help you build one. The tutorials are tailored to Ubuntu Linux but the information also applies to other Linux/Unix variants.

Python MapReduce Code

The "trick" behind the following Python code is that we will use HadoopStreaming (see also the wiki entry) to help us pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We will simply use Python's sys.stdin to read input data and print our own output to sys.stdout. That's all we need to do because HadoopStreaming will take care of everything else! Amazing, isn't it? Well, at least I had a "wow" experience…

Map: mapper.py

Save the following code in the file /home/hduser/mapper.py. It will read data from STDIN (standard input), split it into words and output a list of lines mapping words to their (intermediate) counts to STDOUT (standard output). The Map script will not compute an (intermediate) sum of a word's occurrences. Instead, it will output "<word> 1" immediately – even though the <word> might occur multiple times in the input – and just let the subsequent Reduce step do the final sum count. Of course, you can change this behavior in your own scripts as you please, but we will keep it like that in this tutorial for didactic reasons :-)

Make sure the file has execution permission (chmod +x /home/hduser/mapper.py should do the trick) or you will run into problems.

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)

Reduce: reducer.py

Save the following code in the file /home/hduser/reducer.py. It will read the results of mapper.py from STDIN (standard input), and sum the occurrences of each word to a final count, and output its results to STDOUT (standard output).

Make sure the file has execution permission (chmod +x /home/hduser/reducer.py should do the trick) or you will run into problems.

#!/usr/bin/env python

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)

    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)

Test your code (cat data | map | sort | reduce)

I recommend testing your mapper.py and reducer.py scripts locally before using them in a MapReduce job. Otherwise your jobs might complete successfully but produce no job result data at all, or not the results you would have expected. If that happens, most likely it was you (or me) who screwed up.

Here are some ideas on how to test the functionality of the Map and Reduce scripts.

 # very basic test
 hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py
 foo     1
 foo     1
 quux    1
 labs    1
 foo     1
 bar     1
 quux    1
hduser@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hduser/mapper.py | sort -k1,1 | /home/hduser/reducer.py
 bar     1
 foo     3
 labs    1
 quux    2
 # using one of the ebooks as example input
 # (see below on where to get the ebooks)
 hduser@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hduser/mapper.py
 The     1
 Project 1
 Gutenberg       1
 EBook   1
 of      1
 [...]
 (you get the idea)

Running the Python Code on Hadoop

Download example input data

We will use three ebooks from Project Gutenberg for this example.

Download each ebook as a plain-text file in UTF-8 encoding and store the files in a temporary directory of your choice, for example /tmp/gutenberg.

hduser@ubuntu:~$ ls -l /tmp/gutenberg/
total 3604
-rw-r--r-- 1 hduser hadoop  674566 Feb  3 10:17 pg20417.txt
-rw-r--r-- 1 hduser hadoop 1573112 Feb  3 10:18 pg4300.txt
-rw-r--r-- 1 hduser hadoop 1423801 Feb  3 10:18 pg5000.txt
hduser@ubuntu:~$

Copy local example data to HDFS

Before we run the actual MapReduce job, we first have to copy the files from our local file system to Hadoop’s HDFS.

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -copyFromLocal /tmp/gutenberg /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls
Found 1 items
drwxr-xr-x   - hduser supergroup          0 2010-05-08 17:40 /user/hduser/gutenberg
hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg
Found 3 items
-rw-r--r--   3 hduser supergroup     674566 2011-03-10 11:38 /user/hduser/gutenberg/pg20417.txt
-rw-r--r--   3 hduser supergroup    1573112 2011-03-10 11:38 /user/hduser/gutenberg/pg4300.txt
-rw-r--r--   3 hduser supergroup    1423801 2011-03-10 11:38 /user/hduser/gutenberg/pg5000.txt
hduser@ubuntu:/usr/local/hadoop$

Run the MapReduce job

Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As I said above, we use HadoopStreaming to help us pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output).

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -file /home/hduser/mapper.py -mapper /home/hduser/mapper.py -file /home/hduser/reducer.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output

If you want to modify some Hadoop settings on the fly, like increasing the number of Reduce tasks, you can use the -D option:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -D mapred.reduce.tasks=16 ...

An important note about mapred.map.tasks: Hadoop does not honor mapred.map.tasks beyond considering it a hint. But it accepts the user-specified mapred.reduce.tasks and doesn't manipulate that. You cannot force mapred.map.tasks, but you can specify mapred.reduce.tasks.

The job will read all the files in the HDFS directory /user/hduser/gutenberg, process them, and store the results in the HDFS directory /user/hduser/gutenberg-output. In general, Hadoop will create one output file per reducer; in our case, however, it will only create a single file because the input files are very small.

Example output of the previous command in the console:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser/reducer.py -input /user/hduser/gutenberg/* -output /user/hduser/gutenberg-output
 additionalConfSpec_:null
 null=@@@userJobConfProps_.get(stream.shipped.hadoopstreaming
 packageJobJar: [/app/hadoop/tmp/hadoop-unjar54543/]
 [] /tmp/streamjob54544.jar tmpDir=null
 [...] INFO mapred.FileInputFormat: Total input paths to process : 7
 [...] INFO streaming.StreamJob: getLocalDirs(): [/app/hadoop/tmp/mapred/local]
 [...] INFO streaming.StreamJob: Running job: job_200803031615_0021
 [...]
 [...] INFO streaming.StreamJob:  map 0%  reduce 0%
 [...] INFO streaming.StreamJob:  map 43%  reduce 0%
 [...] INFO streaming.StreamJob:  map 86%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 0%
 [...] INFO streaming.StreamJob:  map 100%  reduce 33%
 [...] INFO streaming.StreamJob:  map 100%  reduce 70%
 [...] INFO streaming.StreamJob:  map 100%  reduce 77%
 [...] INFO streaming.StreamJob:  map 100%  reduce 100%
 [...] INFO streaming.StreamJob: Job complete: job_200803031615_0021
 [...] INFO streaming.StreamJob: Output: /user/hduser/gutenberg-output
hduser@ubuntu:/usr/local/hadoop$

As you can see in the output above, Hadoop also provides a basic web interface for statistics and information. When the Hadoop cluster is running, go to http://localhost:50030/ and browse around. Here’s a screenshot of the Hadoop web interface for the job we just ran.

[Screenshot: Hadoop's web interface, showing the details of the MapReduce job we just ran.]

Check if the result is successfully stored in HDFS directory /user/hduser/gutenberg-output:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -ls /user/hduser/gutenberg-output
 Found 1 items
 /user/hduser/gutenberg-output/part-00000     <r 1>   903193  2007-09-21 13:00
 hduser@ubuntu:/usr/local/hadoop$

You can then inspect the contents of the file with the dfs -cat command:

hduser@ubuntu:/usr/local/hadoop$ bin/hadoop dfs -cat /user/hduser/gutenberg-output/part-00000
 "(Lo)cra"       1
 "1490   1
 "1498," 1
 "35"    1
 "40,"   1
 "A      2
 "AS-IS".        2
 "A_     1
 "Absoluti       1
 [...]
 hduser@ubuntu:/usr/local/hadoop$

Note that in this specific output above the quote signs (“) enclosing the words have not been inserted by Hadoop. They are the result of how our Python code splits words, and in this case it matched the beginning of a quote in the ebook texts. Just inspect the part-00000 file further to see it for yourself.
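
If you prefer counts keyed on normalized words, a small variant of mapper.py (not part of the original tutorial) can strip punctuation and lower-case each token before emitting it; the normalization rule below is just one reasonable choice:

#!/usr/bin/env python
# mapper.py variant that normalizes tokens so that '"foo' and 'Foo' are
# counted together with 'foo'.
import sys, string

for line in sys.stdin:
    for word in line.strip().split():
        word = word.strip(string.punctuation).lower()
        if word:
            print '%s\t%s' % (word, 1)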

Improved Mapper and Reducer code: using Python iterators and generators

The Mapper and Reducer examples above should have given you an idea of how to create your first MapReduce application. The focus was code simplicity and ease of understanding, particularly for beginners of the Python programming language. In a real-world application however, you might want to optimize your code by using Python iterators and generators (an even better introduction in PDF) as some readers have pointed out.

Generally speaking, iterators and generators (functions that create iterators, for example with Python's yield statement) have the advantage that an element of a sequence is not produced until you actually need it. This can help a lot in terms of computational cost or memory consumption, depending on the task at hand.

Note: The following Map and Reduce scripts will only work “correctly” when being run in the Hadoop context, i.e. as Mapper and Reducer in a MapReduce job. This means that running the naive test “cat DATA | ./mapper.py | sort -k1,1 | ./reducer.py” will not work correctly anymore because some functionality is intentionally outsourced to Hadoop.

More precisely, we compute the sum of a word's occurrences, e.g. ("foo", 4), only if by chance the same word ("foo") appears multiple times in succession. In the majority of cases, however, we let Hadoop group the (key, value) pairs between the Map and the Reduce step because Hadoop is more efficient in this regard than our simple Python scripts.

mapper.py

#!/usr/bin/env python
"""A more advanced Mapper, using Python iterators and generators."""

import sys

def read_input(file):
    for line in file:
        # split the line into words
        yield line.split()

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_input(sys.stdin)
    for words in data:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        for word in words:
            print '%s%s%d' % (word, separator, 1)

if __name__ == "__main__":
    main()

reducer.py

#!/usr/bin/env python
"""A more advanced Reducer, using Python iterators and generators."""

from itertools import groupby
from operator import itemgetter
import sys

def read_mapper_output(file, separator='\t'):
    for line in file:
        yield line.rstrip().split(separator, 1)

def main(separator='\t'):
    # input comes from STDIN (standard input)
    data = read_mapper_output(sys.stdin, separator=separator)
    # groupby groups multiple word-count pairs by word,
    # and creates an iterator that returns consecutive keys and their group:
    #   current_word - string containing a word (the key)
    #   group - iterator yielding all ["<current_word>", "<count>"] items
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            total_count = sum(int(count) for current_word, count in group)
            print "%s%s%d" % (current_word, separator, total_count)
        except ValueError:
            # count was not a number, so silently discard this item
            pass

if __name__ == "__main__":
    main()
Posted by 옥탑방람보

1. Download and install BlazeDS (BlazeDS turnkey)

2. Set the Java and Tomcat environment variables (JAVA_HOME, CATALINA_HOME)

3. (Eclipse) Configure the server

4. (Eclipse) Create a Java project and write the classes

5. (Eclipse) Configure remote-config and server-config inside the Java project

6. (Flash Builder) Create the project and write the code, using the blazeds folder from the BlazeDS installation directory

7. (Flash Builder) Project properties: select BlazeDS under Flex Server and set Root folder, Root URL, Context root, and Output folder

8. (Flash Builder) Project: run the export build (targeting the Java project inside the Eclipse workspace)

Posted by 옥탑방람보