1 | package felix.executor; |
2 | |
3 | import java.io.BufferedWriter; |
4 | import java.sql.ResultSet; |
5 | import java.util.ArrayList; |
6 | import java.util.HashMap; |
7 | import java.util.HashSet; |
8 | |
9 | import tuffy.db.RDB; |
10 | import tuffy.infer.DataMover; |
11 | import tuffy.mln.MarkovLogicNetwork; |
12 | import tuffy.mln.Predicate; |
13 | import tuffy.parse.CommandOptions; |
14 | import tuffy.util.Config; |
15 | import tuffy.util.ExceptionMan; |
16 | import tuffy.util.FileMan; |
17 | import tuffy.util.StringMan; |
18 | import tuffy.util.UIMan; |
19 | |
20 | import felix.dstruct.ConcurrentOperatorsBucket; |
21 | import felix.dstruct.ExecutionPlan; |
22 | import felix.dstruct.FelixPredicate; |
23 | import felix.dstruct.FelixQuery; |
24 | import felix.dstruct.StatOperator.OPType; |
25 | import felix.optimizer.DMOOptimizer; |
26 | import felix.parser.FelixCommandOptions; |
27 | import felix.util.FelixConfig; |
28 | import felix.util.FelixUIMan; |
29 | |
30 | |
31 | |
32 | /** |
33 | * Class for executing a given physical {@link ExecutionPlan}. |
34 | * |
35 | * @deprecated |
36 | * |
37 | * @author Ce Zhang |
38 | * |
39 | */ |
40 | public class Executor { |
41 | |
42 | /** |
43 | * The execution plan to be executed. |
44 | */ |
45 | ExecutionPlan ep; |
46 | |
47 | /** |
48 | * The FelixQuery used by this Executor. |
49 | */ |
50 | FelixQuery fq; |
51 | |
52 | /** |
53 | * The FelixCommandOptions used by this Executor. |
54 | */ |
55 | FelixCommandOptions options; |
56 | |
57 | /** |
58 | * The DMOOptimizer used by this Executor. |
59 | */ |
60 | public DMOOptimizer dmoo; |
61 | |
62 | /** |
63 | * Execute this plan. If Felix runs in |
64 | * explain mode, just prints out the physical plan. |
65 | * |
66 | * <p> TODO: Find a better way to the explain mode (e.g., a graph). |
67 | */ |
68 | @SuppressWarnings("unchecked") |
69 | public void run(){ |
70 | try{ |
71 | |
72 | if(FelixConfig.explainMode){ |
73 | FelixUIMan.println(0, 0, "\nExecution Plan:\n"); |
74 | } |
75 | |
76 | ArrayList<ConcurrentOperatorsBucket> toBeJoined = new ArrayList<ConcurrentOperatorsBucket>(); |
77 | |
78 | HashSet<FelixPredicate> finishedPredicates = new HashSet<FelixPredicate>(); |
79 | |
80 | |
81 | for(int i = ep.operators.size()-1; i >=0; i --){ |
82 | |
83 | if(!FelixConfig.explainMode){ |
84 | //model for bucket core allocator |
85 | ep.operators.get(i).setNCore(Config.getNumThreads()); |
86 | ep.operators.get(i).pushPredicateScopes(finishedPredicates); |
87 | dmoo.optimizeDMO(ep.operators.get(i)); |
88 | ep.operators.get(i).start(); |
89 | |
90 | } |
91 | |
92 | HashSet<Predicate> currentOutput = (HashSet<Predicate>) |
93 | ep.operators.get(i).outputPredicates.clone(); |
94 | |
95 | HashSet<Predicate> nextInput = new HashSet<Predicate>(); |
96 | if(i>0){ |
97 | nextInput = (HashSet<Predicate>) |
98 | ep.operators.get(i-1).inputPredicates.clone(); |
99 | } |
100 | |
101 | currentOutput.retainAll(nextInput); |
102 | |
103 | //TODO: |
104 | if(currentOutput.size() != 0 || i == 0 || toBeJoined.size() == FelixConfig.nCores - 1){ |
105 | toBeJoined.add(ep.operators.get(i)); |
106 | |
107 | if(FelixConfig.explainMode){ |
108 | FelixUIMan.println("Concurrently:"); |
109 | } |
110 | |
111 | for(ConcurrentOperatorsBucket o : toBeJoined){ |
112 | if(FelixConfig.explainMode){ |
113 | FelixUIMan.println(0,0, o.toString()); |
114 | }else{ |
115 | o.myJoin(); |
116 | finishedPredicates.addAll(o.outputPredicates); |
117 | } |
118 | } |
119 | |
120 | toBeJoined.clear(); |
121 | |
122 | }else{ |
123 | toBeJoined.add(ep.operators.get(i)); |
124 | } |
125 | |
126 | } |
127 | |
128 | |
129 | if(!FelixConfig.explainMode){ |
130 | BufferedWriter bw = FileMan.getBufferedWriterMaybeGZ(options.fout); |
131 | RDB newDB = RDB.getRDBbyConfig(Config.db_schema); |
132 | for(FelixPredicate fp : fq.getPredicates()){ |
133 | |
134 | if(fp.hasQuery()){ |
135 | |
136 | UIMan.println(">>> Dumping results for " + fp + "\n"); |
137 | this.dumpMapAnswerForPredicate(newDB, fp, bw); |
138 | |
139 | } |
140 | } |
141 | newDB.close(); |
142 | bw.close(); |
143 | } |
144 | |
145 | }catch(Exception e){ |
146 | e.printStackTrace(); |
147 | } |
148 | |
149 | } |
150 | |
151 | /** |
152 | * The constructor. |
153 | * @param _ep |
154 | */ |
155 | public Executor(ExecutionPlan _ep, FelixQuery _fq, FelixCommandOptions _options){ |
156 | ep = _ep; |
157 | dmoo = new DMOOptimizer(ep.getCostModel()); |
158 | fq = _fq; |
159 | options = _options; |
160 | } |
161 | |
162 | /** |
163 | * Output the results of this bucket. |
164 | * @param db |
165 | * @param fout |
166 | * @param p |
167 | */ |
168 | public void dumpMapAnswerForPredicate(RDB db, FelixPredicate p, BufferedWriter bufferedWriter) { |
169 | |
170 | // spreadTruth(); |
171 | HashMap<Long,String> cmap = db.loadIdSymbolMapFromTable(); |
172 | try { |
173 | |
174 | int digits = 4; |
175 | |
176 | String sql; |
177 | String tableName = p.getRelName(); |
178 | String predName = p.getName(); |
179 | |
180 | sql = "SELECT * FROM " + tableName + " WHERE truth=TRUE ORDER BY prior DESC"; |
181 | |
182 | ResultSet rs = db.query(sql); |
183 | while(rs == null){ |
184 | rs = db.query(sql); |
185 | } |
186 | while(rs.next()) { |
187 | String line = predName + "("; |
188 | ArrayList<String> cs = new ArrayList<String>(); |
189 | int ct = 0; |
190 | for(String a : p.getArgs()) { |
191 | |
192 | |
193 | if(p.getTypeAt(ct).isProbArg == true || p.getTypeAt(ct).isNonSymbolicType()){ |
194 | cs.add(rs.getDouble(a)+""); |
195 | }else{ |
196 | long c = rs.getLong(a); |
197 | |
198 | |
199 | String v = StringMan.escapeJavaString(cmap.get(c)); |
200 | |
201 | //if(v.matches("^[0-9].*$") && !StringMan.escapeJavaString(v).contains(" ")){ |
202 | // cs.add("" + StringMan.escapeJavaString(v) + ""); |
203 | //}else{ |
204 | cs.add("\"" + StringMan.escapeJavaString(v) + "\""); |
205 | //} |
206 | } |
207 | ct ++; |
208 | } |
209 | line += StringMan.commaList(cs) + ")"; |
210 | |
211 | double prior = 1; |
212 | if(options.marginal){ |
213 | |
214 | double prob; |
215 | if(rs.getString("prior") == null){ |
216 | prob = 1; |
217 | }else{ |
218 | prob = Double.valueOf(rs.getString("prior")); |
219 | } |
220 | |
221 | if(Config.output_prolog_format){ |
222 | |
223 | line = "tuffyPrediction(" + UIMan.decimalRound(digits, prob) + |
224 | ", " + line + ")."; |
225 | }else{ |
226 | line = UIMan.decimalRound(digits, prob) + "\t" + line; |
227 | |
228 | } |
229 | |
230 | }else{ |
231 | line = line; |
232 | } |
233 | |
234 | if(prior >= options.minProb){ |
235 | bufferedWriter.append(line + "\n"); |
236 | } |
237 | |
238 | } |
239 | rs.close(); |
240 | //bufferedWriter.close(); |
241 | } catch (Exception e) { |
242 | ExceptionMan.handle(e); |
243 | } |
244 | } |
245 | } |
246 | |
247 | |
248 | |
249 | |