1 | package felix.optimizer; |
2 | |
3 | import java.util.ArrayList; |
4 | import java.util.Collection; |
5 | import java.util.Collections; |
6 | import java.util.Comparator; |
7 | import java.util.HashMap; |
8 | import java.util.HashSet; |
9 | |
10 | import tuffy.mln.Literal; |
11 | import tuffy.mln.Predicate; |
12 | import tuffy.ra.Expression; |
13 | import tuffy.util.Config; |
14 | |
15 | |
16 | import felix.dstruct.ConcurrentOperatorsBucket; |
17 | import felix.dstruct.ExecutionPlan; |
18 | import felix.dstruct.FelixClause; |
19 | import felix.dstruct.FelixPredicate; |
20 | import felix.dstruct.FelixQuery; |
21 | import felix.dstruct.OperatorBucketGraph; |
22 | import felix.dstruct.StatOperator; |
23 | import felix.dstruct.StatOperator.OPType; |
24 | import felix.main.Felix; |
25 | import felix.parser.FelixCommandOptions; |
26 | import felix.util.FelixUIMan; |
27 | |
28 | /** |
29 | * The class of a scheduler, which takes as input a FelixQuery, and |
 * outputs a physical execution plan that is ready to be fed into {@link Executor}.
31 | * The |
32 | * output of a scheduler, i.e., {@link ExecutionPlan}, can be regarded |
33 | * as a description of the physical plan, which contains the execution |
34 | * order of different statistical operators. |
35 | * |
36 | * <br/> |
37 | * TODO: + better cycle support for operator graph. |
38 | * |
39 | * |
40 | * @author Ce Zhang |
41 | * |
42 | */ |
43 | public class Scheduler { |
44 | |
45 | /** |
46 | * Felix object. |
47 | */ |
48 | Felix parentFelix; |
49 | |
50 | /** |
51 | * Felix query to be scheduled. |
52 | */ |
53 | FelixQuery fq; |
54 | |
55 | /** |
56 | * Command line options. |
57 | */ |
58 | FelixCommandOptions options; |
59 | |
60 | /** |
61 | * Dependency graph between different operator buckets. |
62 | */ |
63 | OperatorBucketGraph obg; |
64 | |
65 | /** |
66 | * Partition the rules into different operators. |
67 | * @param cm |
68 | * @return |
69 | */ |
70 | public HashSet<StatOperator> ruleDecomposition(CostModel cm){ |
71 | OperatorSelector os = new OperatorSelector(fq, cm, options); |
72 | HashSet<StatOperator> ops = os.getOperators(); |
73 | return ops; |
74 | } |
75 | |
76 | /** |
77 | * Partition the data into different parts. This method will partition |
78 | * one operator returned by {@link Scheduler#ruleDecomposition} into different ones, |
79 | * with each of them deals with different portions of data. These operators |
80 | * will be put into a ConcurrentOperatorsBucket, which means they can be |
81 | * executed in parallel. |
82 | * @param ops |
83 | * @param cm |
84 | * @return |
85 | */ |
86 | public OperatorBucketGraph dataDecomposition(HashSet<StatOperator> ops, CostModel cm){ |
87 | |
88 | OperatorBucketGraph obg = new OperatorBucketGraph(); |
89 | int dpart = Config.getNumThreads()-1; |
90 | |
91 | if(Config.getNumThreads() > 1){ |
92 | for(StatOperator op : ops){ |
93 | DataCracker1991 dc = new DataCracker1991(); |
94 | dc.decompose(op); |
95 | |
96 | if(dc.isDecomposable == true && (options.decomposeTuffy || op.type != OPType.TUFFY)){ |
97 | |
98 | FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:"); |
99 | FelixUIMan.printobj(0, 1, op); |
100 | ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal); |
101 | |
102 | for(int i=0;i<dpart;i++){ |
103 | |
104 | StatOperator newOp = op.clone(); |
105 | newOp.dc = dc; |
106 | |
107 | HashMap<Predicate, Expression> toSig = new HashMap<Predicate, Expression>(); |
108 | for(FelixClause fc : op.allRelevantFelixClause){ |
109 | |
110 | newOp.dPart = dpart; |
111 | newOp.nPart = i; |
112 | |
113 | HashSet<Expression> e = dc.getExpressions(fc, null, dpart, i, false); |
114 | newOp.clauseConstraints.put(fc, e); |
115 | |
116 | for(FelixPredicate fop : newOp.outputPredicates){ |
117 | |
118 | Predicate _p = fop; |
119 | if(_p.isClosedWorld() || toSig.containsKey(_p) ){ |
120 | continue; |
121 | } |
122 | |
123 | HashSet<Expression> expForClause = dc.getExpressions(null, _p, dpart, i, true); |
124 | |
125 | if(expForClause.size() != 0){ |
126 | toSig.put(_p, expForClause.iterator().next()); |
127 | } |
128 | |
129 | } |
130 | } |
131 | newOp.dataCrackerSignature = toSig.toString(); |
132 | |
133 | newOp.partitionedInto = dpart; |
134 | |
135 | FelixUIMan.printobj(2, 2, newOp); |
136 | bucket.addOperator(newOp); |
137 | } |
138 | |
139 | obg.addOperator(bucket); |
140 | |
141 | }else{ |
142 | |
143 | FelixUIMan.println(0, 0, ">>> The following operator is not decomposable:\n\t" + op.toString() + "\n"); |
144 | |
145 | ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal); |
146 | bucket.addOperator(op); |
147 | obg.addOperator(bucket); |
148 | } |
149 | |
150 | } |
151 | }else{ |
152 | // do not partition |
153 | for(StatOperator op : ops){ |
154 | |
155 | FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:"); |
156 | FelixUIMan.printobj(0, 1, op); |
157 | |
158 | ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal); |
159 | bucket.addOperator(op); |
160 | obg.addOperator(bucket); |
161 | } |
162 | } |
163 | |
164 | obg.parseDependency(); |
165 | |
166 | //FelixUIMan.println(2, 0, obg.toString()); |
167 | |
168 | return obg; |
169 | } |
170 | |
171 | /** |
172 | * Returns list of operator buckets sorted by precedence. |
173 | * @param torank |
174 | * @return |
175 | */ |
176 | public ArrayList<ConcurrentOperatorsBucket> rank(Collection<ConcurrentOperatorsBucket> torank){ |
177 | ArrayList<ConcurrentOperatorsBucket> ret = new ArrayList<ConcurrentOperatorsBucket>(); |
178 | |
179 | ret.addAll(torank); |
180 | |
181 | Collections.sort(ret, new Comparator<ConcurrentOperatorsBucket>(){ |
182 | |
183 | @Override |
184 | public int compare(ConcurrentOperatorsBucket o1, |
185 | ConcurrentOperatorsBucket o2) { |
186 | // TODO Auto-generated method stub |
187 | return o1.precedence - o2.precedence; |
188 | } |
189 | |
190 | }); |
191 | |
192 | |
193 | return ret; |
194 | } |
195 | |
196 | /** |
197 | * Schedule the order of running the operators. |
198 | * @param obg |
199 | * @return |
200 | */ |
201 | public ExecutionPlan orderOperators(OperatorBucketGraph obg){ |
202 | ExecutionPlan ep = new ExecutionPlan(); |
203 | |
204 | // generate the order of execution |
205 | ArrayList<ConcurrentOperatorsBucket> HotList = new ArrayList<ConcurrentOperatorsBucket>(); |
206 | HashSet<ConcurrentOperatorsBucket> notFinished = new HashSet<ConcurrentOperatorsBucket>(); |
207 | notFinished.addAll(obg.getOperators()); |
208 | |
209 | while(notFinished.isEmpty() != true){ |
210 | ConcurrentOperatorsBucket op = null; |
211 | |
212 | int lowest = 10000; |
213 | int numberOfClause = 0; |
214 | // pick one with lowest precedence and put it in the last |
215 | for(ConcurrentOperatorsBucket aop : notFinished){ |
216 | if(lowest > aop.getPrecedence()){ |
217 | numberOfClause = aop.nStartingRule; |
218 | lowest = aop.getPrecedence(); |
219 | op = aop; |
220 | } |
221 | |
222 | if(lowest == aop.getPrecedence() && |
223 | aop.nStartingRule > numberOfClause){ |
224 | numberOfClause = aop.nStartingRule; |
225 | lowest = aop.getPrecedence(); |
226 | op = aop; |
227 | } |
228 | } |
229 | |
230 | HotList = new ArrayList<ConcurrentOperatorsBucket>(); |
231 | HotList.add(op); |
232 | |
233 | while(HotList.isEmpty() != true){ |
234 | |
235 | // TODO: CHANGE TO A SMARTER VERSION |
236 | ConcurrentOperatorsBucket current = rank(HotList).get(0); |
237 | HotList.remove(0); |
238 | |
239 | //put in the last position |
240 | ep.addOperatorAfter(current); |
241 | notFinished.remove(current); |
242 | |
243 | //put upstream opts in HotList, which will be added after current position |
244 | //in next iterations |
245 | HashSet<ConcurrentOperatorsBucket> upstreams = obg.upStreams.get(current); |
246 | HotList.addAll(upstreams); |
247 | HotList.retainAll(notFinished); |
248 | |
249 | //put downstream operators before current position |
250 | HashSet<ConcurrentOperatorsBucket> downstreams = obg.downStreams.get(current); |
251 | for(ConcurrentOperatorsBucket ooo : rank(downstreams)){ |
252 | if(notFinished.contains(ooo) && !HotList.contains(ooo)){ |
253 | ep.addOperatorBefore(ooo); |
254 | } |
255 | if(HotList.contains(ooo) && ooo.getPrecedence() == current.getPrecedence() |
256 | && ooo.nStartingRule < current.nStartingRule){ |
257 | ep.addOperatorBefore(ooo); |
258 | HotList.remove(ooo); |
259 | notFinished.remove(ooo); |
260 | } |
261 | if(!HotList.contains(ooo)){ |
262 | notFinished.remove(ooo); |
263 | } |
264 | } |
265 | |
266 | } |
267 | } |
268 | |
269 | return ep; |
270 | } |
271 | |
272 | /** |
273 | * Returns operator bucket graph. |
274 | * @return |
275 | */ |
276 | public OperatorBucketGraph getOperatorBucketGraph(){ |
277 | return obg; |
278 | } |
279 | |
280 | /** |
281 | * Parses common predicates among buckets. |
282 | * @param _ep |
283 | */ |
284 | public void parseCommonPredicates(ExecutionPlan _ep){ |
285 | |
286 | HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>> |
287 | commonPredicates = new HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>>(); |
288 | |
289 | for(ConcurrentOperatorsBucket cob : _ep.operators){ |
290 | |
291 | for(FelixPredicate fp : cob.commonCandidates){ |
292 | if(fp.isClosedWorld() && !fp.isCorefMap()) continue; |
293 | FelixPredicate _fp = fp; |
294 | //if(fp.isCorefMap()){ |
295 | // _fp = fp.getOriCorefPredicate(); |
296 | //} |
297 | if(!commonPredicates.containsKey(_fp)){ |
298 | commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>()); |
299 | } |
300 | commonPredicates.get(_fp).add(cob); |
301 | |
302 | if(_fp.isCorefPredicate && cob.outputPredicates.contains(_fp)){ |
303 | _fp = fp.corefMAPPredicate; |
304 | if(!commonPredicates.containsKey(_fp)){ |
305 | commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>()); |
306 | } |
307 | commonPredicates.get(_fp).add(cob); |
308 | } |
309 | |
310 | } |
311 | } |
312 | |
313 | for(FelixPredicate fp : commonPredicates.keySet()){ |
314 | if(commonPredicates.get(fp).size() > 1){ |
315 | _ep.dd_CommonPredicates.add(fp); |
316 | } |
317 | } |
318 | |
319 | _ep.dd_Predicate2OperatorBucket = commonPredicates; |
320 | |
321 | for(FelixPredicate fp : _ep.dd_CommonPredicates){ |
322 | for(ConcurrentOperatorsBucket cob : _ep.dd_Predicate2OperatorBucket.get(fp)){ |
323 | cob.addCommonOutput(fp); |
324 | } |
325 | } |
326 | |
327 | } |
328 | |
329 | /** |
330 | * The entry of this optimizer. |
331 | * @return |
332 | */ |
333 | public ExecutionPlan schedule(){ |
334 | |
335 | // decompose the whole MLN into different operators |
336 | HashSet<StatOperator> ops = this.ruleDecomposition(null); |
337 | |
338 | // decompose each operators into smaller operators dealing |
339 | // with different portion of data. |
340 | obg = this.dataDecomposition(ops, null); |
341 | |
342 | // optimize execution plan. |
343 | //note that, in the future we way want to merge 1) operator ordering and 2) DMO optimization part |
344 | //into an unified optimization model. however, currently, we only worry about them separately. |
345 | // therefore, current assumption is, the ordering of operators does not rely on the cost model. |
346 | ExecutionPlan ep = this.orderOperators(obg); |
347 | |
348 | if(options.useDualDecomposition){ |
349 | this.parseCommonPredicates(ep); |
350 | } |
351 | |
352 | FelixUIMan.println(">>> Serialized Execution Plan:"); |
353 | FelixUIMan.printobj(0, 1, ep); |
354 | |
355 | |
356 | CostModel cm = new CostModel(this.parentFelix); |
357 | |
358 | ep.setCostModel(cm); |
359 | |
360 | //DMOOptimizer dmoo = new DMOOptimizer(cm); |
361 | //this.optimizeDMO(ep, dmoo); |
362 | //dmoo.close(); |
363 | |
364 | return ep; |
365 | |
366 | } |
367 | |
368 | /** |
369 | * The constructor. |
370 | * @param _felix |
371 | * @param _fq |
372 | * @param _options |
373 | */ |
374 | public Scheduler(Felix _felix, FelixQuery _fq, FelixCommandOptions _options){ |
375 | parentFelix = _felix; |
376 | fq = _fq; |
377 | options = _options; |
378 | } |
379 | |
380 | |
381 | |
382 | } |
383 | |
384 | |
385 | |