1 | package felix.io; |
2 | |
3 | import java.io.BufferedWriter; |
4 | import java.io.OutputStreamWriter; |
5 | import java.sql.ResultSet; |
6 | import java.util.ArrayList; |
7 | import java.util.Arrays; |
8 | import java.util.List; |
9 | import org.apache.hadoop.conf.Configuration; |
10 | import org.apache.hadoop.fs.FSDataInputStream; |
11 | import org.apache.hadoop.fs.FileSystem; |
12 | import org.apache.hadoop.fs.Path; |
13 | import org.postgresql.PGConnection; |
14 | |
15 | import felix.util.FelixConfig; |
16 | import tuffy.db.RDB; |
17 | import tuffy.util.StringMan; |
18 | import tuffy.util.UIMan; |
19 | |
20 | /** |
 * Class for populating PostgreSQL tables from
 * files/directories generated with Hadoop.
23 | * @author Ce Zhang |
24 | * |
25 | */ |
26 | public class HadoopPostgreSQLPopulator{ |
27 | |
28 | /** |
29 | * Dumps given schema.table in to the given file. |
30 | * @param schema |
31 | * @param tableName |
32 | * @param nColumn |
33 | * @param filePath |
34 | */ |
35 | public static void dumpTableToHDFS( |
36 | String schema, |
37 | String tableName, |
38 | int nColumn, |
39 | String filePath){ |
40 | try{ |
			Path pt = new Path(filePath);
42 | Configuration conf = new Configuration(); |
43 | |
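			// Set both the deprecated key (fs.default.name, Hadoop 1.x) and its
			// replacement (fs.defaultFS, Hadoop 2.x) so the configured HDFS
			// address takes effect regardless of the Hadoop version in use.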
44 | conf.set("fs.default.name", FelixConfig.hdfsServer); |
45 | conf.set("fs.defaultFS", FelixConfig.hdfsServer); |
46 | |
47 | FileSystem fs = FileSystem.get(conf); |
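			// Note: the boolean flag to fs.create overwrites any existing file at pt.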
			BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(fs.create(pt, true)));
49 | |
50 | RDB db = RDB.getRDBbyConfig(schema); |
51 | db.disableAutoCommitForNow(); |
52 | |
53 | ResultSet rs = db.query("SELECT * FROM " + tableName); |
			while(rs.next()){
				ArrayList<String> tmps = new ArrayList<String>();
				for(int i=0;i<nColumn;i++){
					// Guard against NULL columns and escape tabs/newlines so
					// each row stays on a single tab-separated line.
					String value = rs.getString(i+1);
					if(value == null) value = "";
					tmps.add(value.replaceAll("\t", "\\\\t").replaceAll("\n", "\\\\n").replaceAll("\r", "\\\\r"));
				}
				bw.write(StringMan.join("\t", tmps) + "\n");
			}
61 | |
62 | db.close(); |
			bw.close();
64 | }catch(Exception e){ |
65 | e.printStackTrace(); |
66 | } |
67 | |
68 | |
69 | } |
70 | |
71 | |
72 | |
73 | /** |
74 | * Creates and populates the given schema.table from the given file. |
75 | * @param schema |
76 | * @param tableName |
77 | * @param columnNames |
78 | * @param columnTypes |
79 | * @param filePath |
80 | * @throws Exception |
81 | */ |
82 | public static void createAndPopulateTable( |
83 | String schema, |
84 | String tableName, |
85 | List<String> columnNames, |
86 | List<String> columnTypes, |
87 | String filePath ) throws Exception{ |
88 | |
89 | RDB db; |
90 | if(schema == null){ |
91 | db = RDB.getRDBbyConfig(); |
92 | }else{ |
93 | db = RDB.getRDBbyConfig(schema); |
94 | } |
95 | db.dropTable(tableName); |
96 | |
97 | ArrayList<String> schemas = new ArrayList<String>(); |
98 | for(int i=0;i<columnNames.size();i++){ |
99 | schemas.add(columnNames.get(i) + " " + columnTypes.get(i)); |
100 | } |
101 | |
104 | String sql = "CREATE TABLE " + tableName + " ( truth BOOL, prior FLOAT, " + StringMan.commaList(schemas) + " )"; |
105 | db.execute(sql); |
106 | db.close(); |
107 | |
108 | HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, filePath); |
109 | |
110 | |
111 | } |
112 | |
113 | //TODO: CHECK HDFS API AND MOVE TO MORE ELEGANT WAY |
114 | /** |
115 | * Populates the given schema.table from the given directory. |
116 | * @param schema |
117 | * @param tableName |
118 | * @param columnNames |
119 | * @param columnTypes |
120 | * @param dirPath |
121 | * @param nReducer |
122 | * @throws Exception |
123 | */ |
124 | /* |
125 | public static void populateTableFromDir( |
126 | String schema, |
127 | String tableName, |
128 | List<String> columnNames, |
129 | List<String> columnTypes, |
130 | String dirPath, |
131 | int nReducer) throws Exception{ |
132 | |
133 | for(int i=0;i<nReducer;i++){ |
			String fileName = "" + i;
135 | while(fileName.length() < 5){ |
136 | fileName = "0" + fileName; |
137 | } |
138 | fileName = "part-" + fileName; |
139 | HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, fileName); |
140 | } |
141 | } |
142 | */ |
143 | |
144 | //TODO: CHECK HDFS API AND MOVE TO MORE ELEGANT WAY |
145 | /** |
146 | * Creates and populates the given schema.table from the given directory. |
147 | * @param schema |
148 | * @param tableName |
149 | * @param columnNames |
150 | * @param columnTypes |
151 | * @param dirPath |
152 | * @param nReducer |
153 | * @throws Exception |
154 | */ |
155 | public static void createAndPopulateTableFromDir( |
156 | String schema, |
157 | String tableName, |
158 | List<String> columnNames, |
159 | List<String> columnTypes, |
160 | String dirPath, |
161 | int nReducer) throws Exception{ |
162 | |
163 | for(int i=0;i<nReducer;i++){ |
164 | String fileName = "" + i; |
165 | while(fileName.length() < 5){ |
166 | fileName = "0" + fileName; |
167 | } |
168 | fileName = dirPath + "/" + "part-r-" + fileName; |
169 | if(i==0){ |
170 | HadoopPostgreSQLPopulator.createAndPopulateTable(schema, tableName, columnNames, columnTypes, fileName); |
171 | }else{ |
172 | HadoopPostgreSQLPopulator.populateTable(schema, tableName, columnNames, columnTypes, fileName); |
173 | } |
174 | } |
175 | } |
176 | |
177 | /** |
178 | * Populates the given schema.table from the given file. |
179 | * @param schema |
180 | * @param tableName |
181 | * @param columnNames |
182 | * @param columnTypes |
183 | * @param filePath |
184 | * @throws Exception |
185 | */ |
186 | public static void populateTable( |
187 | String schema, |
188 | String tableName, |
189 | List<String> columnNames, |
190 | List<String> columnTypes, |
191 | String filePath ) throws Exception{ |
192 | |
193 | RDB db; |
194 | if(schema == null){ |
195 | db = RDB.getRDBbyConfig(); |
196 | }else{ |
197 | db = RDB.getRDBbyConfig(schema); |
198 | } |
199 | |
206 | Configuration conf = new Configuration(); |
		//TODO: HARD-WIRED ADDRESS IS NOT GOOD
208 | conf.set("fs.default.name", FelixConfig.hdfsServer); |
209 | conf.set("fs.defaultFS", FelixConfig.hdfsServer); |
210 | |
211 | FileSystem fileSystem = FileSystem.get(conf); |
212 | |
213 | Path path = new Path(filePath); |
214 | if (!fileSystem.exists(path)) { |
			UIMan.verbose(2, "File " + filePath + " does not exist");
216 | return; |
217 | } |
218 | |
219 | FSDataInputStream in = fileSystem.open(path); |
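		// The cast below assumes the underlying JDBC connection comes from the
		// PostgreSQL driver; getCopyAPI() exposes its CopyManager, which streams
		// the HDFS file into the table via the COPY protocol.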
220 | PGConnection con = (PGConnection)db.getConnection(); |
221 | sql = "COPY " + tableName + "( truth , prior , " + StringMan.commaList(columnNames) + ") " + |
222 | " FROM STDIN WITH DELIMITER '\t'"; |
223 | con.getCopyAPI().copyIn(sql, in); |
224 | |
225 | in.close(); |
226 | fileSystem.close(); |
227 | |
228 | db.commit(); |
229 | db.close(); |
230 | |
231 | |
232 | } |
233 | |
234 | /** |
235 | * Main test method. |
236 | * @param args |
237 | * @throws Exception |
238 | */ |
239 | /* |
240 | public static void main(String[] args) throws Exception{ |
241 | |
242 | |
243 | //HadoopPostgreSQLPopulator.populateTable( |
244 | // null, |
245 | // "test", |
246 | // Arrays.asList("c1", "c2"), |
247 | // Arrays.asList("TEXT", "TEXT"), |
248 | // "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut38/part-00000"); |
249 | |
250 | |
251 | HadoopPostgreSQLPopulator.createAndPopulateTable( |
252 | null, |
253 | "test", |
254 | Arrays.asList("c1", "c2"), |
255 | Arrays.asList("TEXT", "TEXT"), |
256 | "hdfs://d-02.cs.wisc.edu:9000/firstMapReduceOut42/part-00000"); |
257 | } |
258 | */ |
259 | |
260 | } |