EMMA Coverage Report (generated Tue Aug 23 05:57:12 CDT 2011)

COVERAGE SUMMARY FOR SOURCE FILE [TestHadoop.java]

name             class, %    method, %   block, %       line, %
TestHadoop.java  100% (3/3)  82% (9/11)  83% (445/535)  82% (90/110)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name                                              class, %    method, %   block, %        line, %

class TestHadoop                                  100% (1/1)  60%  (3/5)  71%  (203/287)  70%  (44/63)
  main (String []): void                                      0%   (0/1)  0%   (0/22)     0%   (0/7)
  post (String []): void                                      0%   (0/1)  0%   (0/50)     0%   (0/9)
  run (String []): int                                        100% (1/1)  93%  (157/169)  92%  (33/36)
  TestHadoop (String, String): void                           100% (1/1)  100% (13/13)    100% (6/6)
  executeHadoopProgram (String []): void                      100% (1/1)  100% (33/33)    100% (5/5)

class TestHadoop$Reduce                           100% (1/1)  100% (3/3)  95%  (122/128)  96%  (24/25)
  setup (Reducer$Context): void                               100% (1/1)  83%  (30/36)    88%  (7/8)
  TestHadoop$Reduce (): void                                  100% (1/1)  100% (3/3)      100% (1/1)
  reduce (Text, Iterable, Reducer$Context): void              100% (1/1)  100% (89/89)    100% (16/16)

class TestHadoop$Map                              100% (1/1)  100% (3/3)  100% (120/120)  100% (22/22)
  TestHadoop$Map (): void                                     100% (1/1)  100% (8/8)      100% (2/2)
  map (LongWritable, Text, Mapper$Context): void              100% (1/1)  100% (81/81)    100% (13/13)
  setup (Mapper$Context): void                                100% (1/1)  100% (31/31)    100% (7/7)

package felix.io;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.python.core.PyList;
import org.python.core.PyLong;
import org.python.core.PyObject;
import org.python.core.PyString;

import felix.thirdpart.XmlInputFormat;
import felix.util.FelixConfig;

/**
 * This class communicates with Hadoop: it translates the given MAP and
 * REDUCE scripts into calls against the Hadoop MapReduce API.
 * @author czhang
 */
public class TestHadoop extends Configured implements Tool {

        /**
         * MAP: wraps the user-supplied Python map script.
         * @author czhang
         */
        public static class Map extends Mapper<LongWritable, Text, Text, Text> {

                public static PythonExecutor pyMap;

                public static String _inputvalue;

                @Override
                public void setup(Context context) {

                        // Prepend the output buffers so the harness can read the
                        // script's emitted keys/values back after each run.
                        pyMap = new PythonExecutor(
                                        "_felix_donotusemyname_outkey=[]\n" + "_felix_donotusemyname_outvalue=[]\n" +
                                        context.getConfiguration().get("pyMapScript"));
                        _inputvalue = context.getConfiguration().get("mapinputvalue");

                        if(context.getConfiguration().get("pyMapInitScript") != null){
                                pyMap.execSingle(context.getConfiguration().get("pyMapInitScript"));
                        }
                }

                @Override
                public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

                        FileSplit fileSplit = (FileSplit) context.getInputSplit();
                        String fileName = fileSplit.getPath().getName();

                        // Expose the record offset, file name, and input value to the script.
                        pyMap.set("_offset", new PyLong(key.toString()));
                        pyMap.set("_filename", new PyString(fileName));
                        pyMap.set(_inputvalue, new PyString(value.toString()));

                        pyMap.run();
                        PyList outKey = (PyList) pyMap.get("_felix_donotusemyname_outkey");
                        PyList outValue = (PyList) pyMap.get("_felix_donotusemyname_outvalue");

                        for(int i = 0; i < outKey.__len__(); i++){
                                PyObject _key = outKey.__getitem__(i);
                                PyObject _value = outValue.__getitem__(i);

                                context.write(new Text("" + _key), new Text("" + _value));
                        }
                }

        }
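
        // For illustration, a map script in the style of the sample in main() below:
        //
        //     for k in _input.split(' '):
        //         _outkey.append(k)
        //         _outvalue.append('1')
        //
        // The script reads each record from the variable named by the
        // "mapinputvalue" configuration key (here assumed to be "_input"), plus
        // _offset and _filename; its appended keys/values are assumed to be
        // surfaced to the harness through the _felix_donotusemyname_* lists
        // that map() reads back.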

        /**
         * REDUCE: wraps the user-supplied Python reduce script.
         * @author czhang
         */
        public static class Reduce extends Reducer<Text, Text, Text, Text> {

                public static PythonExecutor pyReducer;

                public static String _inputkey;
                public static String _inputvalues;

                @Override
                public void setup(Context context) {
                        pyReducer = new PythonExecutor(
                                        "_felix_donotusemyname_outkey=[]\n" + "_felix_donotusemyname_outvalue=[]\n" +
                                        context.getConfiguration().get("pyReduceScript"));
                        _inputkey = context.getConfiguration().get("reduceinputkey");
                        _inputvalues = context.getConfiguration().get("reduceinputvalues");

                        if(context.getConfiguration().get("pyReduceInitScript") != null){
                                pyReducer.execSingle(context.getConfiguration().get("pyReduceInitScript"));
                        }
                }

                @Override
                protected void reduce(Text _key, Iterable<Text> _values, Context context) throws IOException, InterruptedException {

                        ArrayList<PyString> args = new ArrayList<PyString>();
                        Iterator<Text> it = _values.iterator();
                        while(it.hasNext()){
                                args.add(new PyString(it.next().toString()));
                        }

                        PyList values = new PyList(args);
                        PyObject inputkey = new PyString(_key.toString());

                        pyReducer.set(_inputvalues, values);
                        pyReducer.set(_inputkey, inputkey);

                        pyReducer.run();
                        PyList outKey = (PyList) pyReducer.get("_felix_donotusemyname_outkey");
                        PyList outValue = (PyList) pyReducer.get("_felix_donotusemyname_outvalue");

                        for(int i = 0; i < outKey.__len__(); i++){
                                PyObject key = outKey.__getitem__(i);
                                PyObject value = outValue.__getitem__(i);
                                context.write(new Text("True\t1\t" + key), new Text("" + value));
                        }
                }

        }
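
        // For illustration, a reduce script in the style of the sample in main():
        //
        //     _outkey.append(_inputkey)
        //     _outvalue.append(len(_inputvalues))
        //
        // The grouped key and the list of values appear under the variable names
        // given by the "reduceinputkey" and "reduceinputvalues" configuration
        // keys; emitted pairs are read back the same way as in the mapper.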

        /**
         * @deprecated
         * @param mapScript
         * @param reduceScript
         */
        public TestHadoop(String mapScript, String reduceScript){
                Map.pyMap = new PythonExecutor(mapScript);

                Reduce.pyReducer = new PythonExecutor(reduceScript);
        }

        /**
         * Execute a given task configuration (via arg0).
         * @param toPass
         * @throws Exception
         */
        public static void executeHadoopProgram(String[] toPass) throws Exception{

                // Run the job into a "_dir" staging directory; post() merges it.
                String[] dirtoPass = toPass.clone();
                dirtoPass[2] += "_dir";
                ToolRunner.run(new Configuration(), new TestHadoop(dirtoPass[3], dirtoPass[4]),
                                dirtoPass);
        }

        /**
         * @deprecated
         * @param toPass
         * @throws Exception
         */
        public static void post(String[] toPass) throws Exception{

                Configuration fsconf = new Configuration();
                fsconf.set("fs.default.name", FelixConfig.hdfsServer);
                FileSystem fileSystem = FileSystem.get(fsconf);

                // Merge the per-reducer outputs under "<output>_dir" into the single
                // file named by toPass[2], deleting the staging directory afterwards.
                String[] dirtoPass = toPass.clone();
                dirtoPass[2] += "_dir";

                FileUtil.copyMerge(fileSystem, new Path(dirtoPass[2]), fileSystem, new Path(toPass[2]), true,
                                fsconf, "");

                fileSystem.close();
        }

        /**
         * Test entry.
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception{

                // NOTE: run() reads up to twelve positional arguments (see the layout
                // comment there); this sample array predates that layout.
                String[] toPass = {

                                "hdfs://d-02.cs.wisc.edu:9000/felixNE/Entity.db",
                                //"hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut42/part-00000",

                                "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut100",

                                "for k in _input.split(' '):\n"+
                                "\t_outkey.append(k)\n"+
                                "\t_outvalue.append('1')\n",

                                "_outkey.append(_inputkey)\n"+
                                "_outvalue.append(len(_inputvalues))\n"

                };

                TestHadoop.executeHadoopProgram(toPass);
        }

        /**
         * Execute a given task configuration (via arg0).
         */
        @Override
        public int run(String[] arg0) throws Exception {

                Configuration conf = getConf();

                if(!FelixConfig.hadoopLocal){
                        conf.set("fs.defaultFS", FelixConfig.hdfsServer);
                        conf.set("mapred.job.tracker", FelixConfig.mrServer);
                        conf.set("mapred.child.java.opts", "-Xmx3192m");
                }
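
                // Layout of arg0, as read below:
                //   [0] input format ("xml" or "standard")
                //   [1] input path                  [2] output path
                //   [3] map script                  [4] reduce script
                //   [5] map input value variable    [6] reduce input key variable
                //   [7] reduce input values var.    [8] map init script
                //   [9] reduce init script          [10]/[11] XML start/end tags (xml mode)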
                conf.set("pyMapScript", arg0[3]);
                conf.set("pyReduceScript", arg0[4]);
                conf.set("mapinputvalue", arg0[5]);
                conf.set("reduceinputkey", arg0[6]);
                conf.set("reduceinputvalues", arg0[7]);
                conf.set("pyMapInitScript", arg0[8]);
                conf.set("pyReduceInitScript", arg0[9]);

                if(arg0[0].equals("xml")){
                        conf.set("xmlinput.start", arg0[10]);
                        conf.set("xmlinput.end", arg0[11]);
                }

                Job job = new Job(conf, "Felix_Run_On" + (new Date()).toLocaleString());

                job.setNumReduceTasks(FelixConfig.nReduce);

                job.setJarByClass(TestHadoop.class);

                job.setMapperClass(Map.class);
                job.setReducerClass(Reduce.class);

                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(Text.class);

                FileInputFormat.addInputPath(job, new Path(arg0[1]));
                FileOutputFormat.setOutputPath(job, new Path(arg0[2]));

                if(arg0[0].equals("xml")){
                        // XML records delimited by the configured start/end tags.
                        job.setInputFormatClass(XmlInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);
                }else if(arg0[0].equals("standard")){
                        // Plain text, one record per line.
                        job.setInputFormatClass(TextInputFormat.class);
                        job.setOutputFormatClass(TextOutputFormat.class);
                }

                job.waitForCompletion(true);
                return 0;
        }

}
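
For reference, a minimal driver sketch showing how the twelve-element argument
array that run() consumes might be assembled. The paths, tags, and variable
names here are hypothetical placeholders, and it is assumed (as the sample in
main() suggests) that PythonExecutor maps the user-facing _outkey/_outvalue
names onto the internal _felix_donotusemyname_* lists:

public class TestHadoopDriver {
        public static void main(String[] args) throws Exception {
                String[] toPass = {
                                "standard",                              // [0] input format
                                "hdfs://namenode:9000/in/corpus.txt",    // [1] input path (placeholder)
                                "hdfs://namenode:9000/out/wordcount",    // [2] output path ("_dir" is appended)
                                "for k in _input.split(' '):\n" +
                                "\t_outkey.append(k)\n" +
                                "\t_outvalue.append('1')\n",             // [3] map script
                                "_outkey.append(_inputkey)\n" +
                                "_outvalue.append(len(_inputvalues))\n", // [4] reduce script
                                "_input",                                // [5] map input value variable
                                "_inputkey",                             // [6] reduce input key variable
                                "_inputvalues",                          // [7] reduce input values variable
                                "",                                      // [8] map init script (none)
                                "",                                      // [9] reduce init script (none)
                                "<page>",                                // [10] XML start tag (unused here)
                                "</page>"                                // [11] XML end tag (unused here)
                };
                TestHadoop.executeHadoopProgram(toPass);
        }
}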

EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov