1 | package felix.executor; |
2 | |
3 | import java.io.BufferedWriter; |
4 | import java.io.File; |
5 | import java.sql.ResultSet; |
6 | import java.util.ArrayList; |
7 | import java.util.HashMap; |
8 | import java.util.HashSet; |
9 | |
10 | import com.google.common.io.Files; |
11 | |
12 | import tuffy.db.RDB; |
13 | import tuffy.infer.DataMover; |
14 | import tuffy.mln.MarkovLogicNetwork; |
15 | import tuffy.mln.Predicate; |
16 | import tuffy.parse.CommandOptions; |
17 | import tuffy.util.Config; |
18 | import tuffy.util.ExceptionMan; |
19 | import tuffy.util.FileMan; |
20 | import tuffy.util.StringMan; |
21 | import tuffy.util.UIMan; |
22 | |
23 | import felix.dstruct.ConcurrentOperatorsBucket; |
24 | import felix.dstruct.ExecutionPlan; |
25 | import felix.dstruct.FelixPredicate; |
26 | import felix.dstruct.FelixQuery; |
27 | import felix.dstruct.StatOperator.OPType; |
28 | import felix.main.Felix; |
29 | import felix.optimizer.DMOOptimizer; |
30 | import felix.parser.FelixCommandOptions; |
31 | import felix.util.FelixConfig; |
32 | import felix.util.FelixUIMan; |
33 | |
34 | |
35 | |
36 | /** |
37 | * Class for executing a given physical {@link ExecutionPlan} using |
38 | * dual decomposition. |
39 | * |
40 | * @author Ce Zhang |
41 | * |
42 | */ |
43 | public class DDExecutor { |
44 | |
45 | /** |
46 | * The execution plan to be executed. |
47 | */ |
48 | ExecutionPlan ep; |
49 | |
50 | /** |
51 | * The FelixQuery used by this Executor. |
52 | */ |
53 | FelixQuery fq; |
54 | |
55 | /** |
56 | * The FelixCommandOptions used by this Executor. |
57 | */ |
58 | FelixCommandOptions options; |
59 | |
60 | /** |
61 | * The DMOOptimizer used by this Executor. |
62 | */ |
63 | public DMOOptimizer dmoo; |
64 | |
65 | /** |
66 | * Execute this plan. If Felix runs in |
67 | * explain mode, just prints out the physical plan. |
68 | * |
69 | * <p> TODO: Find a better way to the explain mode (e.g., a graph). |
70 | */ |
71 | @SuppressWarnings("unchecked") |
72 | public void run(){ |
73 | |
74 | try{ |
75 | |
76 | int sum = -1; |
77 | int nIt = FelixConfig.nDDIT; |
78 | |
79 | for(int i = ep.operators.size()-1; i >=0; i --){ |
80 | ep.operators.get(i).setNCore(Config.getNumThreads()); |
81 | ep.operators.get(i).prepareDB44DD(); |
82 | } |
83 | |
84 | // the first run of DD replicates the NON-DD executor. |
85 | // this is used to decides the scope of the downstream MLN |
86 | // operator. |
87 | FelixConfig.isFirstRunOfDD = true; |
88 | |
89 | for(int i = ep.operators.size()-1; i >=0; i --){ |
90 | dmoo.optimizeDMO(ep.operators.get(i)); |
91 | ep.operators.get(i).run(); |
92 | |
93 | } |
94 | |
95 | BufferedWriter bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_0" ); |
96 | RDB newDB = RDB.getRDBbyConfig(Config.db_schema); |
97 | for(FelixPredicate fp : fq.getPredicates()){ |
98 | if(fp.hasQuery()){ |
99 | UIMan.println(">>> Dumping results for " + fp + "\n"); |
100 | this.dumpMapAnswerForPredicate(newDB, fp, bw); |
101 | } |
102 | } |
103 | bw.close(); |
104 | |
105 | File from = new File(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt)); |
106 | File to = new File(options.fout); |
107 | if(to.exists()){ |
108 | to.delete(); |
109 | } |
110 | Files.copy(from, to); |
111 | |
112 | FelixConfig.isFirstRunOfDD = false; |
113 | |
114 | // copy state of each table |
115 | HashSet<FelixPredicate> sharedPreds = new HashSet<FelixPredicate>(); |
116 | for(int i = ep.operators.size()-1; i >=0; i --){ |
117 | sharedPreds.addAll(ep.operators.get(i).dd_CommonOutput); |
118 | if(ep.operators.get(i).type == OPType.TUFFY){ |
119 | sharedPreds.addAll(ep.operators.get(i).outputPredicates); |
120 | } |
121 | } |
122 | for(FelixPredicate fp : sharedPreds){ |
123 | UIMan.println(">>> Backing up table " + fp.getName()); |
124 | newDB.dropTable("_copy_ori_" + fp.getRelName()); |
125 | String sql = "CREATE TABLE _copy_ori_" + fp.getRelName() |
126 | + " AS SELECT * FROM " + fp.getRelName() + ";"; |
127 | newDB.execute(sql); |
128 | } |
129 | newDB.close(); |
130 | |
131 | |
132 | while(sum != 0 && nIt-- > 0 ){ |
133 | |
134 | newDB = RDB.getRDBbyConfig(Config.db_schema); |
135 | |
136 | for(FelixPredicate fp : sharedPreds){ |
137 | UIMan.println(">>> Restoring table " + fp.getName()); |
138 | newDB.dropTable(fp.getRelName()); |
139 | newDB.dropView(fp.getRelName()); |
140 | String sql = "CREATE TABLE " + fp.getRelName() |
141 | + " AS SELECT * FROM _copy_ori_" + fp.getRelName() + ";"; |
142 | newDB.execute(sql); |
143 | } |
144 | for(FelixPredicate fp : fq.getAllPred()){ |
145 | if(fp.isCorefPredicate && !sharedPreds.contains(fp)){ |
146 | newDB.dropTable(fp.getRelName()); |
147 | newDB.dropView(fp.getRelName()); |
148 | newDB.execute(fp.viewDef); |
149 | } |
150 | } |
151 | newDB.commit(); |
152 | newDB.close(); |
153 | |
154 | for(int i = ep.operators.size()-1; i >=0; i --){ |
155 | dmoo.optimizeDMO(ep.operators.get(i)); |
156 | ep.operators.get(i).run(); |
157 | } |
158 | |
159 | bw = FileMan.getBufferedWriterMaybeGZ(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt) ); |
160 | newDB = RDB.getRDBbyConfig(Config.db_schema); |
161 | for(FelixPredicate fp : fq.getPredicates()){ |
162 | if(fp.hasQuery()){ |
163 | UIMan.println(">>> Dumping results for " + fp + "\n"); |
164 | this.dumpMapAnswerForPredicate(newDB, fp, bw); |
165 | } |
166 | } |
167 | newDB.close(); |
168 | bw.close(); |
169 | |
170 | from = new File(options.fout + "_snap_" + (FelixConfig.nDDIT-nIt)); |
171 | to = new File(options.fout); |
172 | if(to.exists()){ |
173 | to.delete(); |
174 | } |
175 | Files.copy(from, to); |
176 | |
177 | |
178 | |
179 | sum = 0; |
180 | |
181 | //CURRENTLY ASSUME ONE PREDICATE IS SHARED BY ONLY TWO OPERATORS |
182 | // this is ensured by Felix's compiler. |
183 | for(FelixPredicate fp : fq.getAllPred()){ |
184 | |
185 | ConcurrentOperatorsBucket cob1 = null; |
186 | ConcurrentOperatorsBucket cob2 = null; |
187 | |
188 | for(int i = ep.operators.size()-1; i >=0; i --){ |
189 | if(ep.operators.get(i).dd_CommonOutput.contains(fp)){ |
190 | if(cob1 == null){ |
191 | cob1 = ep.operators.get(i); |
192 | }else if(cob2 == null){ |
193 | cob2 = ep.operators.get(i); |
194 | }else{ |
195 | ExceptionMan.die("ERROR 12: " + |
196 | "There must be something wrong with the compiler"); |
197 | } |
198 | } |
199 | } |
200 | |
201 | if(cob1==null || cob2==null){ |
202 | continue; |
203 | } |
204 | |
205 | String tableName1; |
206 | String tableName2; |
207 | String priorTable1; |
208 | String priorTable2; |
209 | |
210 | tableName1 = cob1.dd_commonOutputPredicate_2_tableName.get(fp); |
211 | tableName2 = cob2.dd_commonOutputPredicate_2_tableName.get(fp); |
212 | |
213 | priorTable1 = cob1.dd_commonOutputPredicate_2_priorTableName.get(fp); |
214 | priorTable2 = cob2.dd_commonOutputPredicate_2_priorTableName.get(fp); |
215 | |
216 | |
217 | String args = StringMan.commaList(fp.getArgs()); |
218 | ArrayList<String> whereargsarray = new ArrayList<String>(); |
219 | for(String a : fp.getArgs()){ |
220 | whereargsarray.add("t0." + a + "=" + "t1." + a); |
221 | } |
222 | String whereargs = StringMan.join(" AND ", whereargsarray); |
223 | ArrayList<String> t0argsArray = new ArrayList<String>(); |
224 | for(String a : fp.getArgs()){ |
225 | t0argsArray.add("t0." + a); |
226 | } |
227 | String t0args = StringMan.commaList(t0argsArray); |
228 | |
229 | ArrayList<String> t1argsArray = new ArrayList<String>(); |
230 | for(String a : fp.getArgs()){ |
231 | t1argsArray.add("t1." + a); |
232 | } |
233 | String t1args = StringMan.commaList(t1argsArray); |
234 | |
235 | String priorArgs = StringMan.commaList(fp.getArgs()); |
236 | priorArgs = priorArgs + "," + "float_" + (fp.arity()+1); |
237 | priorArgs = "truth, club, " + priorArgs; |
238 | |
239 | if(options.marginal == true){ |
240 | |
241 | String deltaCase = "(CASE WHEN abs(t0.prior-t1.prior)<0.1 THEN " + 0 + |
242 | " ELSE 0.9*(t0.prior-t1.prior) END) AS prior11"; |
243 | String negDeltaCase = "(CASE WHEN abs(t1.prior-t0.prior)<0.1 THEN " + 0 + |
244 | " ELSE 0.9*(t1.prior-t0.prior) END) AS prior11"; |
245 | |
246 | |
247 | String sql = "(SELECT NULL::Float, TRUE, 2, " + t0args + ", " + deltaCase +" FROM " + tableName1 + " t0, " + tableName2 + " t1 " + " WHERE " |
248 | + whereargs+")"; |
249 | |
250 | sql = sql + " UNION ALL " + "(SELECT NULL::Float, TRUE, 2, " + t0args + ", t0.prior FROM " + tableName1 + " t0 " + |
251 | " WHERE (" + t0args + ") NOT IN (SELECT " + t1args + " FROM " + tableName2 + " t1) AND t0.prior>0.1)"; |
252 | |
253 | sql = "INSERT INTO " + priorTable2 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0"; |
254 | Felix.db.update(sql); |
255 | |
256 | |
257 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2); |
258 | |
259 | sql = "(SELECT NULL::Float, TRUE, 2, " + t0args + "," + negDeltaCase + " FROM " + tableName1 + " t0, " + tableName2 + " t1 " + " WHERE " |
260 | + whereargs+")"; |
261 | |
262 | sql = sql + " UNION ALL " + "(SELECT NULL::Float, TRUE, 2, " + t0args + ", -t0.prior FROM " + tableName2 + " t0 " + |
263 | " WHERE (" + t0args + ") NOT IN (SELECT " + t1args + " FROM " + tableName1 + " t1) AND t0.prior>0.1)"; |
264 | |
265 | sql = "INSERT INTO " + priorTable1 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt WHERE nt.prior11<>0"; |
266 | Felix.db.update(sql); |
267 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1); |
268 | |
269 | sum += Felix.db.getLastUpdateRowCount(); |
270 | |
271 | }else{ |
272 | String sql = "SELECT NULL::Float, TRUE, 2, " + args + ", " + (0.9) + " FROM " + tableName1 + " WHERE (" |
273 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")" |
274 | + " UNION ALL " + |
275 | "SELECT NULL::Float, TRUE, 2, " + args + ", " + (-0.9) + " FROM " + tableName2 + " WHERE (" |
276 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")"; |
277 | sql = "INSERT INTO " + priorTable2 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt"; |
278 | Felix.db.update(sql); |
279 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob2); |
280 | |
281 | sql = "SELECT NULL::Float, TRUE, 2, " + args + ", " + (-0.9) + " FROM " + tableName1 + " WHERE (" |
282 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName2 + ")" |
283 | + " UNION ALL " + |
284 | "SELECT NULL::Float, TRUE, 2, " + args + ", " + (0.9) + " FROM " + tableName2 + " WHERE (" |
285 | + args + ") NOT IN ( SELECT " + args + " FROM " + tableName1 + ")"; |
286 | sql = "INSERT INTO " + priorTable1 + "(prior, " + priorArgs + ")" + " SELECT * FROM (" + sql + ") nt"; |
287 | Felix.db.update(sql); |
288 | UIMan.println(">>> INSERT " + Felix.db.getLastUpdateRowCount() + " NEW TUPLES FOR " + fp + " OF " + cob1); |
289 | |
290 | sum += Felix.db.getLastUpdateRowCount(); |
291 | |
292 | } |
293 | } |
294 | |
295 | Felix.db.commit(); |
296 | |
297 | } |
298 | |
299 | |
300 | }catch(Exception e){ |
301 | e.printStackTrace(); |
302 | } |
303 | |
304 | |
305 | } |
306 | |
307 | /** |
308 | * The constructor. |
309 | * @param _ep |
310 | */ |
311 | public DDExecutor(ExecutionPlan _ep, FelixQuery _fq, FelixCommandOptions _options){ |
312 | ep = _ep; |
313 | dmoo = new DMOOptimizer(ep.getCostModel()); |
314 | fq = _fq; |
315 | options = _options; |
316 | } |
317 | |
318 | /** |
319 | * Output the results of this bucket. |
320 | * @param db |
321 | * @param fout |
322 | * @param p |
323 | */ |
324 | public void dumpMapAnswerForPredicate(RDB db, FelixPredicate p, BufferedWriter bufferedWriter) { |
325 | // spreadTruth(); |
326 | HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable(); |
327 | try { |
328 | |
329 | int digits = 4; |
330 | |
331 | String sql; |
332 | String tableName = p.getRelName(); |
333 | String predName = p.getName(); |
334 | |
335 | if(options.useDualDecomposition){ |
336 | //sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC"; |
337 | if(!p.isCorefMapPredicate){ |
338 | if(p.belongsTo != null && p.belongsTo.dd_commonOutputPredicate_2_tableName.containsKey(p)){ |
339 | sql = "SELECT * FROM " + p.belongsTo.dd_commonOutputPredicate_2_tableName.get(p) + " WHERE truth=TRUE ORDER BY prior DESC"; |
340 | }else{ |
341 | sql = "SELECT * FROM " + p.getRelName() + " WHERE truth=TRUE ORDER BY prior DESC"; |
342 | } |
343 | }else{ |
344 | sql = "SELECT * FROM " + tableName + " ORDER BY prior DESC"; |
345 | } |
346 | }else{ |
347 | sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC"; |
348 | } |
349 | |
350 | ResultSet rs = db.query(sql); |
351 | while(rs == null){ |
352 | rs = db.query(sql); |
353 | } |
354 | while(rs.next()) { |
355 | String line = predName + "("; |
356 | ArrayList<String> cs = new ArrayList<String>(); |
357 | int ct = 0; |
358 | for(String a : p.getArgs()) { |
359 | |
360 | |
361 | if(p.getTypeAt(ct).isProbArg == true || p.getTypeAt(ct).isNonSymbolicType()){ |
362 | cs.add(rs.getDouble(a)+""); |
363 | }else{ |
364 | long c = rs.getLong(a); |
365 | |
366 | |
367 | String v = StringMan.escapeJavaString(cmap.get(c)); |
368 | |
369 | //if(v.matches("^[0-9].*$") && !StringMan.escapeJavaString(v).contains(" ")){ |
370 | // cs.add("" + StringMan.escapeJavaString(v) + ""); |
371 | //}else{ |
372 | cs.add("\"" + StringMan.escapeJavaString(v) + "\""); |
373 | //} |
374 | } |
375 | ct ++; |
376 | } |
377 | line += StringMan.commaList(cs) + ")"; |
378 | |
379 | double prior = 1; |
380 | if(options.marginal){ |
381 | |
382 | double prob; |
383 | if(rs.getString("prior") == null){ |
384 | prob = 1; |
385 | }else{ |
386 | prob = Double.valueOf(rs.getString("prior")); |
387 | } |
388 | |
389 | if(Config.output_prolog_format){ |
390 | |
391 | line = "tuffyPrediction(" + UIMan.decimalRound(digits, prob) + |
392 | ", " + line + ")."; |
393 | }else{ |
394 | line = UIMan.decimalRound(digits, prob) + "\t" + line; |
395 | |
396 | } |
397 | |
398 | }else{ |
399 | line = line; |
400 | } |
401 | |
402 | if(prior >= options.minProb){ |
403 | bufferedWriter.append(line + "\n"); |
404 | } |
405 | |
406 | } |
407 | rs.close(); |
408 | //bufferedWriter.close(); |
409 | } catch (Exception e) { |
410 | ExceptionMan.handle(e); |
411 | } |
412 | } |
413 | } |
414 | |
415 | |
416 | |
417 | |