1 | package felix.dstruct; |
2 | |
3 | import java.util.HashMap; |
4 | import java.util.HashSet; |
5 | |
6 | import tuffy.mln.Predicate; |
7 | |
8 | |
9 | /** |
10 | * The class of an OperatorBucketGraph. Each operator graph contains 1) a set of |
11 | * {@link ConcurrentOperatorsBucket}; and 2) Dependencies between different buckets. |
12 | * Current assumption is DAG (although there are no data-structure-level constraints |
13 | * for DAG, some algorithms may become trivial when dealing with cycles). |
14 | * An OperatorBucketGraph is a complete description of the logic plan. |
15 | * |
16 | * @author Ce Zhang |
17 | * |
18 | */ |
19 | public class OperatorBucketGraph { |
20 | |
21 | /** |
22 | * Counter for buckets |
23 | * in this OperatorBucketGraph. |
24 | */ |
25 | int counter = 1; |
26 | |
27 | /** |
28 | * Set of operators in this OperatorBucketGraph. |
29 | */ |
30 | HashSet<ConcurrentOperatorsBucket> operators = new HashSet<ConcurrentOperatorsBucket>(); |
31 | |
32 | /** |
33 | * Map from the bucket to its upstream buckets. |
34 | */ |
35 | public HashMap<ConcurrentOperatorsBucket, HashSet<ConcurrentOperatorsBucket>> upStreams = |
36 | new HashMap<ConcurrentOperatorsBucket, HashSet<ConcurrentOperatorsBucket>>(); |
37 | |
38 | /** |
39 | * Map from the bucket to its downstream buckets. |
40 | */ |
41 | public HashMap<ConcurrentOperatorsBucket, HashSet<ConcurrentOperatorsBucket>> downStreams = |
42 | new HashMap<ConcurrentOperatorsBucket, HashSet<ConcurrentOperatorsBucket>>(); |
43 | |
44 | /** |
45 | * Add a new bucket to this OperatorBucketGraph. An ID will be |
46 | * automatically assigned to this bucket. |
47 | * @param sop statistical operator. |
48 | */ |
49 | public void addOperator(ConcurrentOperatorsBucket sop){ |
50 | assert(this.operators.contains(sop) == false); |
51 | |
52 | sop.id = this.counter ++; |
53 | |
54 | this.operators.add(sop); |
55 | this.upStreams.put(sop, new HashSet<ConcurrentOperatorsBucket>()); |
56 | this.downStreams.put(sop, new HashSet<ConcurrentOperatorsBucket>()); |
57 | } |
58 | |
59 | /** |
60 | * Add a new bucket relation to this OperatorBucketGraph. Here by ``bucket relation'' |
61 | * it means a directed edge between two buckets from upstream to downstream. |
62 | * @param upStream |
63 | * @param downStream |
64 | */ |
65 | private void addOperatorRelation(ConcurrentOperatorsBucket upStream, ConcurrentOperatorsBucket downStream){ |
66 | |
67 | assert(this.operators.contains(upStream) == true); |
68 | assert(this.operators.contains(downStream) == true); |
69 | |
70 | this.upStreams.get(downStream).add(upStream); |
71 | this.downStreams.get(upStream).add(downStream); |
72 | |
73 | } |
74 | |
75 | /** |
76 | * Parse the dependency between buckets. This function will transform |
77 | * the input/output predicates information of each statistical bucket |
78 | * into the upstream/downstream relations between statistical buckets. |
79 | * Note that, this upstream/downstream relation is only a preference and is |
80 | * not necessarily as the same as the final execution plan. |
81 | * |
82 | */ |
83 | public void parseDependency(){ |
84 | |
85 | HashMap<Predicate, HashSet<ConcurrentOperatorsBucket>> regardPredicateAsInput = |
86 | new HashMap<Predicate, HashSet<ConcurrentOperatorsBucket>>(); |
87 | HashMap<Predicate, HashSet<ConcurrentOperatorsBucket>> regardPredicateAsOutput = |
88 | new HashMap<Predicate, HashSet<ConcurrentOperatorsBucket>>(); |
89 | |
90 | for(ConcurrentOperatorsBucket op : this.operators){ |
91 | for(Predicate p : op.inputPredicates){ |
92 | if(!regardPredicateAsInput.containsKey(p)){ |
93 | regardPredicateAsInput.put(p, new HashSet<ConcurrentOperatorsBucket>()); |
94 | } |
95 | regardPredicateAsInput.get(p).add(op); |
96 | } |
97 | for(Predicate p : op.outputPredicates){ |
98 | if(!regardPredicateAsOutput.containsKey(p)){ |
99 | regardPredicateAsOutput.put(p, new HashSet<ConcurrentOperatorsBucket>()); |
100 | } |
101 | regardPredicateAsOutput.get(p).add(op); |
102 | } |
103 | } |
104 | |
105 | for(ConcurrentOperatorsBucket op : this.operators){ |
106 | for(Predicate p : op.inputPredicates){ |
107 | if(regardPredicateAsOutput.get(p) == null){ |
108 | continue; |
109 | } |
110 | for(ConcurrentOperatorsBucket op2 : regardPredicateAsOutput.get(p)){ |
111 | this.addOperatorRelation(op2, op); |
112 | } |
113 | } |
114 | } |
115 | |
116 | } |
117 | |
118 | /** |
119 | * Return the set of buckets in this OperatorBucketGraph. |
120 | */ |
121 | public HashSet<ConcurrentOperatorsBucket> getOperators(){ |
122 | return this.operators; |
123 | } |
124 | |
125 | /** |
126 | * Randomly pick one bucket in this OperatorBucketGraph. |
127 | */ |
128 | public ConcurrentOperatorsBucket getOneRandomOperator(){ |
129 | while(true){ |
130 | for(ConcurrentOperatorsBucket op : this.operators){ |
131 | if(Math.random() < 0.5){ |
132 | return op; |
133 | } |
134 | } |
135 | } |
136 | } |
137 | |
138 | /** |
139 | * Gets all upstream buckets of the given bucket. |
140 | */ |
141 | public HashSet<ConcurrentOperatorsBucket> getUpStreamOperator(ConcurrentOperatorsBucket sop){ |
142 | return this.upStreams.get(sop); |
143 | } |
144 | |
145 | /** |
146 | * Gets all downstream buckets of the given bucket. |
147 | */ |
148 | public HashSet<ConcurrentOperatorsBucket> getDownStreamOperator(ConcurrentOperatorsBucket oup){ |
149 | return this.downStreams.get(oup); |
150 | } |
151 | |
152 | } |