EMMA Coverage Report (generated Tue Aug 23 05:57:12 CDT 2011)
[all classes][felix.optimizer]

COVERAGE SUMMARY FOR SOURCE FILE [Scheduler.java]

nameclass, %method, %block, %line, %
Scheduler.java100% (2/2)100% (10/10)95%  (643/675)95%  (122.7/129)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class Scheduler100% (1/1)100% (8/8)95%  (631/663)95%  (120.7/127)
orderOperators (OperatorBucketGraph): ExecutionPlan 100% (1/1)84%  (147/175)85%  (34.1/40)
parseCommonPredicates (ExecutionPlan): void 100% (1/1)97%  (138/142)98%  (20.6/21)
Scheduler (Felix, FelixQuery, FelixCommandOptions): void 100% (1/1)100% (12/12)100% (5/5)
dataDecomposition (HashSet, CostModel): OperatorBucketGraph 100% (1/1)100% (257/257)100% (43/43)
getOperatorBucketGraph (): OperatorBucketGraph 100% (1/1)100% (3/3)100% (1/1)
rank (Collection): ArrayList 100% (1/1)100% (16/16)100% (4/4)
ruleDecomposition (CostModel): HashSet 100% (1/1)100% (14/14)100% (3/3)
schedule (): ExecutionPlan 100% (1/1)100% (44/44)100% (10/10)
     
class Scheduler$1100% (1/1)100% (2/2)100% (12/12)100% (3/3)
Scheduler$1 (Scheduler): void 100% (1/1)100% (6/6)100% (2/2)
compare (ConcurrentOperatorsBucket, ConcurrentOperatorsBucket): int 100% (1/1)100% (6/6)100% (1/1)

1package felix.optimizer;
2 
3import java.util.ArrayList;
4import java.util.Collection;
5import java.util.Collections;
6import java.util.Comparator;
7import java.util.HashMap;
8import java.util.HashSet;
9 
10import tuffy.mln.Literal;
11import tuffy.mln.Predicate;
12import tuffy.ra.Expression;
13import tuffy.util.Config;
14 
15 
16import felix.dstruct.ConcurrentOperatorsBucket;
17import felix.dstruct.ExecutionPlan;
18import felix.dstruct.FelixClause;
19import felix.dstruct.FelixPredicate;
20import felix.dstruct.FelixQuery;
21import felix.dstruct.OperatorBucketGraph;
22import felix.dstruct.StatOperator;
23import felix.dstruct.StatOperator.OPType;
24import felix.main.Felix;
25import felix.parser.FelixCommandOptions;
26import felix.util.FelixUIMan;
27 
/**
 * A scheduler takes a {@link FelixQuery} as input and outputs a physical
 * execution plan that is ready to be fed into {@link Executor}.
 * The output of a scheduler, i.e., an {@link ExecutionPlan}, can be regarded
 * as a description of the physical plan, which contains the execution
 * order of the different statistical operators.
 * 
 * <br/>
 * TODO: + better cycle support for operator graph.
 * 
 * 
 * @author Ce Zhang
 *
 */
43public class Scheduler {
44 
        /**
         * Felix object. Set by the constructor and handed to the
         * {@link CostModel} that {@link #schedule()} attaches to the plan.
         */
        Felix parentFelix;
        
        /**
         * Felix query to be scheduled. Set by the constructor and passed to the
         * OperatorSelector in {@link #ruleDecomposition(CostModel)}.
         */
        FelixQuery fq;
        
        /**
         * Command line options. This class reads {@code decomposeTuffy},
         * {@code marginal} and {@code useDualDecomposition} from it.
         */
        FelixCommandOptions options;

        /**
         * Dependency graph between different operator buckets. Assigned by
         * {@link #schedule()} and exposed through {@link #getOperatorBucketGraph()};
         * nothing in this class assigns it before {@code schedule()} runs.
         */
        OperatorBucketGraph obg;
64        
65        /**
66         * Partition the rules into different operators.
67         * @param cm
68         * @return
69         */
70        public HashSet<StatOperator> ruleDecomposition(CostModel cm){
71                OperatorSelector os = new OperatorSelector(fq, cm, options);
72                HashSet<StatOperator> ops = os.getOperators();
73                return ops;
74        }
75        
76        /**
77         * Partition the data into different parts. This method will partition
78         * one operator returned by {@link Scheduler#ruleDecomposition} into different ones,
79         * with each of them deals with different portions of data. These operators
80         * will be put into a ConcurrentOperatorsBucket, which means they can be 
81         * executed in parallel.
82         * @param ops
83         * @param cm
84         * @return
85         */
86        public OperatorBucketGraph dataDecomposition(HashSet<StatOperator> ops, CostModel cm){
87                
88                OperatorBucketGraph obg = new OperatorBucketGraph();
89                int dpart = Config.getNumThreads()-1;
90                
91                if(Config.getNumThreads() > 1){
92                        for(StatOperator op : ops){
93                                DataCracker1991 dc = new DataCracker1991();
94                                dc.decompose(op);
95                                
96                                if(dc.isDecomposable == true && (options.decomposeTuffy || op.type != OPType.TUFFY)){
97                                        
98                                        FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:");
99                                        FelixUIMan.printobj(0, 1, op);
100                                        ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal);
101                                        
102                                        for(int i=0;i<dpart;i++){
103                                        
104                                                StatOperator newOp = op.clone();
105                                                newOp.dc = dc;
106                                                
107                                                HashMap<Predicate, Expression> toSig = new HashMap<Predicate, Expression>();
108                                                for(FelixClause fc : op.allRelevantFelixClause){
109                                                        
110                                                        newOp.dPart = dpart;
111                                                        newOp.nPart = i;
112                                                        
113                                                        HashSet<Expression> e = dc.getExpressions(fc, null, dpart, i, false);
114                                                        newOp.clauseConstraints.put(fc, e);
115                                                        
116                                                        for(FelixPredicate fop : newOp.outputPredicates){
117                                                                
118                                                                Predicate _p = fop;
119                                                                if(_p.isClosedWorld() || toSig.containsKey(_p) ){
120                                                                        continue;
121                                                                }
122                                                                
123                                                                HashSet<Expression> expForClause = dc.getExpressions(null, _p, dpart, i, true);
124                                                                
125                                                                if(expForClause.size() != 0){
126                                                                        toSig.put(_p, expForClause.iterator().next());
127                                                                }
128                                                                
129                                                        }
130                                                }
131                                                newOp.dataCrackerSignature = toSig.toString();
132                                                
133                                                newOp.partitionedInto = dpart;
134                                                
135                                                FelixUIMan.printobj(2, 2, newOp);
136                                                bucket.addOperator(newOp);
137                                        }
138                                        
139                                        obg.addOperator(bucket);
140                                        
141                                }else{
142                                        
143                                        FelixUIMan.println(0, 0, ">>> The following operator is not decomposable:\n\t" + op.toString() + "\n");
144                                        
145                                        ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal);
146                                        bucket.addOperator(op);
147                                        obg.addOperator(bucket);
148                                }
149                                
150                        }
151                }else{
152                        // do not partition
153                        for(StatOperator op : ops){
154 
155                                FelixUIMan.println(0, 0, ">>> Decomposing the following operator into " + dpart + " parts:");
156                                FelixUIMan.printobj(0, 1, op);
157                                        
158                                ConcurrentOperatorsBucket bucket = new ConcurrentOperatorsBucket(options.marginal);
159                                bucket.addOperator(op);
160                                obg.addOperator(bucket);
161                        }
162                }
163                
164                obg.parseDependency();
165                
166                //FelixUIMan.println(2, 0, obg.toString());
167                
168                return obg;
169        }
170        
171        /**
172         * Returns list of operator buckets sorted by precedence.
173         * @param torank
174         * @return
175         */
176        public ArrayList<ConcurrentOperatorsBucket> rank(Collection<ConcurrentOperatorsBucket> torank){
177                ArrayList<ConcurrentOperatorsBucket> ret = new ArrayList<ConcurrentOperatorsBucket>();
178                
179                ret.addAll(torank);
180                
181                Collections.sort(ret, new Comparator<ConcurrentOperatorsBucket>(){
182 
183                        @Override
184                        public int compare(ConcurrentOperatorsBucket o1,
185                                        ConcurrentOperatorsBucket o2) {
186                                // TODO Auto-generated method stub
187                                return o1.precedence - o2.precedence;
188                        }
189                        
190                });
191                
192                
193                return ret;
194        }
195        
196        /**
197         * Schedule the order of running the operators.
198         * @param obg
199         * @return
200         */
201        public ExecutionPlan orderOperators(OperatorBucketGraph obg){
202                ExecutionPlan ep = new ExecutionPlan();
203                
204                // generate the order of execution
205                ArrayList<ConcurrentOperatorsBucket> HotList = new ArrayList<ConcurrentOperatorsBucket>();
206                HashSet<ConcurrentOperatorsBucket> notFinished = new HashSet<ConcurrentOperatorsBucket>();
207                notFinished.addAll(obg.getOperators());
208                
209                while(notFinished.isEmpty() != true){
210                        ConcurrentOperatorsBucket op = null;
211                        
212                        int lowest = 10000;
213                        int numberOfClause = 0;
214                        // pick one with lowest precedence and put it in the last
215                        for(ConcurrentOperatorsBucket aop : notFinished){
216                                if(lowest > aop.getPrecedence()){
217                                        numberOfClause = aop.nStartingRule;
218                                        lowest = aop.getPrecedence();
219                                        op = aop;
220                                }
221                                
222                                if(lowest == aop.getPrecedence() && 
223                                                aop.nStartingRule > numberOfClause){
224                                        numberOfClause = aop.nStartingRule;
225                                        lowest = aop.getPrecedence();
226                                        op = aop;
227                                }
228                        }
229                        
230                        HotList = new ArrayList<ConcurrentOperatorsBucket>();
231                        HotList.add(op);
232                        
233                        while(HotList.isEmpty() != true){
234                        
235                                // TODO: CHANGE TO A SMARTER VERSION
236                                ConcurrentOperatorsBucket current = rank(HotList).get(0);
237                                HotList.remove(0);
238                                
239                                //put in the last position
240                                ep.addOperatorAfter(current);
241                                notFinished.remove(current);
242                                
243                                //put upstream opts in HotList, which will be added after current position
244                                //in next iterations
245                                HashSet<ConcurrentOperatorsBucket> upstreams = obg.upStreams.get(current);
246                                HotList.addAll(upstreams);
247                                HotList.retainAll(notFinished);
248                                                                
249                                //put downstream operators before current position
250                                HashSet<ConcurrentOperatorsBucket> downstreams = obg.downStreams.get(current);
251                                for(ConcurrentOperatorsBucket ooo : rank(downstreams)){
252                                        if(notFinished.contains(ooo) && !HotList.contains(ooo)){
253                                                ep.addOperatorBefore(ooo);
254                                        }
255                                        if(HotList.contains(ooo) && ooo.getPrecedence() == current.getPrecedence()
256                                                        && ooo.nStartingRule < current.nStartingRule){
257                                                ep.addOperatorBefore(ooo);
258                                                HotList.remove(ooo);
259                                                notFinished.remove(ooo);
260                                        }
261                                        if(!HotList.contains(ooo)){
262                                                notFinished.remove(ooo);
263                                        }
264                                }
265 
266                        }
267                }
268                
269                return ep;
270        }
271        
272        /**
273         * Returns operator bucket graph.
274         * @return
275         */
276        public OperatorBucketGraph getOperatorBucketGraph(){
277                return obg;
278        }
279        
280        /**
281         * Parses common predicates among buckets.
282         * @param _ep
283         */
284        public void parseCommonPredicates(ExecutionPlan _ep){
285                
286                HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>>
287                        commonPredicates = new HashMap<FelixPredicate, HashSet<ConcurrentOperatorsBucket>>();
288                
289                for(ConcurrentOperatorsBucket cob : _ep.operators){
290                                                
291                        for(FelixPredicate fp : cob.commonCandidates){
292                                if(fp.isClosedWorld() && !fp.isCorefMap()) continue;
293                                FelixPredicate _fp = fp;
294                                //if(fp.isCorefMap()){
295                                //        _fp = fp.getOriCorefPredicate();
296                                //}
297                                if(!commonPredicates.containsKey(_fp)){
298                                        commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>());
299                                }
300                                commonPredicates.get(_fp).add(cob);
301                                
302                                if(_fp.isCorefPredicate && cob.outputPredicates.contains(_fp)){
303                                        _fp = fp.corefMAPPredicate;
304                                        if(!commonPredicates.containsKey(_fp)){
305                                                commonPredicates.put(_fp, new HashSet<ConcurrentOperatorsBucket>());
306                                        }
307                                        commonPredicates.get(_fp).add(cob);
308                                }
309                                        
310                        }
311                }
312                
313                for(FelixPredicate fp : commonPredicates.keySet()){
314                        if(commonPredicates.get(fp).size() > 1){
315                                _ep.dd_CommonPredicates.add(fp);
316                        }
317                }
318                
319                _ep.dd_Predicate2OperatorBucket = commonPredicates;
320                
321                for(FelixPredicate fp : _ep.dd_CommonPredicates){
322                        for(ConcurrentOperatorsBucket cob : _ep.dd_Predicate2OperatorBucket.get(fp)){
323                                cob.addCommonOutput(fp);
324                        }
325                }
326                
327        }
328        
329        /**
330         * The entry of this optimizer.
331         * @return
332         */
333        public ExecutionPlan schedule(){
334                
335                // decompose the whole MLN into different operators
336                HashSet<StatOperator> ops = this.ruleDecomposition(null);
337 
338                // decompose each operators into smaller operators dealing
339                // with different portion of data.
340                obg = this.dataDecomposition(ops, null);
341                
342                // optimize execution plan.
343                        //note that, in the future we way want to merge 1) operator ordering and 2) DMO optimization part
344                        //into an unified optimization model. however, currently, we only worry about them separately.
345                // therefore, current assumption is, the ordering of operators does not rely on the cost model.
346                ExecutionPlan ep = this.orderOperators(obg);
347                                
348                if(options.useDualDecomposition){
349                        this.parseCommonPredicates(ep);
350                }
351                
352                FelixUIMan.println(">>> Serialized Execution Plan:");
353                FelixUIMan.printobj(0, 1, ep);
354 
355                
356                CostModel cm = new CostModel(this.parentFelix);
357                
358                ep.setCostModel(cm);
359                
360                //DMOOptimizer dmoo = new DMOOptimizer(cm);
361                //this.optimizeDMO(ep, dmoo);
362                //dmoo.close();
363        
364                return ep;
365                
366        }
367        
368        /**
369         * The constructor.
370         * @param _felix
371         * @param _fq
372         * @param _options
373         */
374        public Scheduler(Felix _felix, FelixQuery _fq, FelixCommandOptions _options){
375                parentFelix = _felix;
376                fq = _fq;
377                options = _options;
378        }
379        
380        
381        
382}
383 
384 
385 

[all classes][felix.optimizer]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov