EMMA Coverage Report (generated Tue Aug 23 05:57:12 CDT 2011)
[all classes][felix.io]

COVERAGE SUMMARY FOR SOURCE FILE [HadoopPostgreSQLPopulator.java]

name | class, % | method, % | block, % | line, %
HadoopPostgreSQLPopulator.java | 100% (1/1) | 60% (3/5) | 69% (237/345) | 67% (44/66)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %

class HadoopPostgreSQLPopulator | 100% (1/1) | 60% (3/5) | 69% (237/345) | 67% (44/66)
HadoopPostgreSQLPopulator (): void | | 0% (0/1) | 0% (0/3) | 0% (0/1)
dumpTableToHDFS (String, String, int, String): void | | 0% (0/1) | 0% (0/99) | 0% (0/19)
createAndPopulateTable (String, String, List, List, String): void | | 100% (1/1) | 96% (69/72) | 92% (12/13)
populateTable (String, String, List, List, String): void | | 100% (1/1) | 97% (112/115) | 96% (23/24)
createAndPopulateTableFromDir (String, String, List, List, String, int): void | | 100% (1/1) | 100% (56/56) | 100% (9/9)

1package felix.io;
2 
3import java.io.BufferedWriter;
4import java.io.OutputStreamWriter;
5import java.sql.ResultSet;
6import java.util.ArrayList;
7import java.util.Arrays;
8import java.util.List;
9import org.apache.hadoop.conf.Configuration;
10import org.apache.hadoop.fs.FSDataInputStream;
11import org.apache.hadoop.fs.FileSystem;
12import org.apache.hadoop.fs.Path;
13import org.postgresql.PGConnection;
14 
15import felix.util.FelixConfig;
16import tuffy.db.RDB;
17import tuffy.util.StringMan;
18import tuffy.util.UIMan;
19 
20/**
21 * Class for populating tables from 
22 * files/directories generated with hadoop.
23 * @author Ce Zhang
24 *
25 */
26public class HadoopPostgreSQLPopulator{
27        
28        /**
29         * Dumps given schema.table in to the given file.
30         * @param schema
31         * @param tableName
32         * @param nColumn
33         * @param filePath
34         */
35        public static void dumpTableToHDFS(
36                                                                                String           schema,
37                                                                                String           tableName,
38                                                                                int              nColumn,
39                                                                                String                         filePath){
40                try{
41                        Path pt=new Path(filePath);
42                        Configuration conf = new Configuration();
43                        
44                        conf.set("fs.default.name", FelixConfig.hdfsServer);
45                        conf.set("fs.defaultFS", FelixConfig.hdfsServer);
46                        
47                        FileSystem fs = FileSystem.get(conf);
48                        BufferedWriter br=new BufferedWriter(new OutputStreamWriter(fs.create(pt,true)));
49                        
50                        RDB db = RDB.getRDBbyConfig(schema);
51                        db.disableAutoCommitForNow();
52                        
53                        ResultSet rs = db.query("SELECT * FROM " + tableName);
54                        while(rs.next()){
55                                ArrayList<String> tmps = new ArrayList<String>();
56                                for(int i=0;i<nColumn;i++){
57                                        tmps.add(rs.getString(i+1).replaceAll("\t", "\\\\t").replaceAll("\n", "\\\\n").replaceAll("\r", "\\\\r"));
58                                }
59                                br.write(StringMan.join("\t", tmps) + "\n");
60                        }
61                        
62                        db.close();
63                        br.close();
64                }catch(Exception e){
65                        e.printStackTrace();
66                }
67                
68                
69        }
70        
71        
72        
73        /**
74         * Creates and populates the given schema.table from the given file.
75         * @param schema
76         * @param tableName
77         * @param columnNames
78         * @param columnTypes
79         * @param filePath
80         * @throws Exception
81         */
82        public static void createAndPopulateTable(        
83                                                                                String            schema,
84                                                                                String            tableName,
85                                                                                List<String>           columnNames,
86                                                                                List<String>           columnTypes,
87                                                                                String                      filePath      ) throws Exception{
88                
89                RDB db;
90                if(schema == null){
91                        db = RDB.getRDBbyConfig();
92                }else{
93                        db = RDB.getRDBbyConfig(schema);
94                }
95                db.dropTable(tableName);
96                
97                ArrayList<String> schemas = new ArrayList<String>();
98                for(int i=0;i<columnNames.size();i++){
99                        schemas.add(columnNames.get(i) + " " + columnTypes.get(i));
100                }
101                
102                db.dropTable(tableName);
103                
104                String sql = "CREATE TABLE " + tableName + " ( truth BOOL, prior FLOAT, " + StringMan.commaList(schemas) + " )";
105                db.execute(sql);
106                db.close();
107                
108                HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, filePath);
109                
110                
111        }
112        
113        //TODO: CHECK HDFS API AND MOVE TO MORE ELEGANT WAY
114        /**
115         * Populates the given schema.table from the given directory.
116         * @param schema
117         * @param tableName
118         * @param columnNames
119         * @param columnTypes
120         * @param dirPath
121         * @param nReducer
122         * @throws Exception
123         */
124        /*
125        public static void populateTableFromDir(        
126                                                                        String            schema,
127                                                                        String            tableName,
128                                                                        List<String>           columnNames,
129                                                                        List<String>           columnTypes,
130                                                                        String                      dirPath,
131                                                                        int                           nReducer) throws Exception{
132                
133                for(int i=0;i<nReducer;i++){
134                        String fileName = "" + nReducer;
135                        while(fileName.length() < 5){
136                                fileName = "0" + fileName;
137                        }
138                        fileName = "part-" + fileName;
139                        HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, fileName);
140                }
141        }
142        */
143        
144        //TODO: CHECK HDFS API AND MOVE TO MORE ELEGANT WAY
145        /**
146         * Creates and populates the given schema.table from the given directory.
147         * @param schema
148         * @param tableName
149         * @param columnNames
150         * @param columnTypes
151         * @param dirPath
152         * @param nReducer
153         * @throws Exception
154         */
155        public static void createAndPopulateTableFromDir(        
156                                                                        String            schema,
157                                                                        String            tableName,
158                                                                        List<String>           columnNames,
159                                                                        List<String>           columnTypes,
160                                                                        String                      dirPath,
161                                                                        int                           nReducer) throws Exception{
162 
163                for(int i=0;i<nReducer;i++){
164                        String fileName = "" + i;
165                        while(fileName.length() < 5){
166                                fileName = "0" + fileName;
167                        }
168                        fileName = dirPath + "/" + "part-r-" + fileName;
169                        if(i==0){
170                                HadoopPostgreSQLPopulator.createAndPopulateTable(schema, tableName, columnNames, columnTypes, fileName);
171                        }else{
172                                HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, fileName);
173                        }
174                }
175        }
176        
177        /**
178         * Populates the given schema.table from the given file.
179         * @param schema
180         * @param tableName
181         * @param columnNames
182         * @param columnTypes
183         * @param filePath
184         * @throws Exception
185         */
186        public static void populateTable(        
187                                                                                String            schema,
188                                                                                String            tableName,
189                                                                                List<String>           columnNames,
190                                                                                List<String>           columnTypes,
191                                                                                String                      filePath      ) throws Exception{
192 
193                RDB db;
194                if(schema == null){
195                        db = RDB.getRDBbyConfig();
196                }else{
197                        db = RDB.getRDBbyConfig(schema);
198                }
199                
200                ArrayList<String> schemas = new ArrayList<String>();
201                for(int i=0;i<columnNames.size();i++){
202                        schemas.add(columnNames.get(i) + " " + columnTypes.get(i));
203                }
204                String sql;
205                
206                Configuration conf = new Configuration();
207                //TOOD: HARD-WIRED ADDRESS IS NTO GOOD
208                conf.set("fs.default.name", FelixConfig.hdfsServer);
209                conf.set("fs.defaultFS", FelixConfig.hdfsServer);
210                
211        FileSystem fileSystem = FileSystem.get(conf);
212 
213        Path path = new Path(filePath);
214        if (!fileSystem.exists(path)) {
215            UIMan.verbose(2, "File " + filePath + " does not exists");
216            return;
217        }
218 
219        FSDataInputStream in = fileSystem.open(path);
220                PGConnection con = (PGConnection)db.getConnection();
221                sql = "COPY " + tableName + "( truth , prior , " + StringMan.commaList(columnNames) + ") " +
222                                " FROM STDIN WITH DELIMITER '\t'";
223                con.getCopyAPI().copyIn(sql, in);
224                
225                in.close();
226        fileSystem.close();
227                
228        db.commit();
229        db.close();
230        
231        
232        }
233        
234        /**
235         * Main test method.
236         * @param args
237         * @throws Exception
238         */
239        /*
240        public static void main(String[] args) throws Exception{
241        
242                
243                //HadoopPostgreSQLPopulator.populateTable(
244                //                                                null, 
245                //                                                "test", 
246                //                                                Arrays.asList("c1", "c2"), 
247                //                                                Arrays.asList("TEXT", "TEXT"),
248                //                                                "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut38/part-00000");
249                
250                
251                HadoopPostgreSQLPopulator.createAndPopulateTable(
252                                        null, 
253                                        "test", 
254                                        Arrays.asList("c1", "c2"), 
255                                        Arrays.asList("TEXT", "TEXT"),
256                                        "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut42/part-00000");
257        }
258        */
259        
260}

[all classes][felix.io]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov