1 | package felix.dstruct; |
2 | |
3 | import java.util.ArrayList; |
4 | import java.util.Collection; |
5 | import java.util.HashMap; |
6 | import java.util.HashSet; |
7 | import java.util.List; |
8 | |
9 | import felix.dstruct.FelixPredicate.FPProperty; |
10 | import felix.optimizer.DataCracker1991; |
11 | import felix.parser.FelixCommandOptions; |
12 | import felix.util.FelixStringMan; |
13 | import felix.util.FelixUIMan; |
14 | |
15 | import tuffy.db.RDB; |
16 | import tuffy.mln.Predicate; |
17 | import tuffy.ra.ConjunctiveQuery; |
18 | import tuffy.ra.Expression; |
19 | import tuffy.util.ExceptionMan; |
20 | import tuffy.util.StringMan; |
21 | |
22 | |
23 | /** |
24 | * Abstract class of a statistical operator (e.g., Tuffy, Coref, CRF etc. ). |
25 | * A valid Felix operator should extend this class and implement methods |
26 | * like run(), prepare(), etc. |
27 | * |
28 | * @author Ce Zhang |
29 | * |
30 | */ |
31 | public abstract class StatOperator extends Thread implements Cloneable{ |
32 | |
33 | /** |
34 | * Parameter assigning the degree of data partitioning. |
35 | */ |
36 | public int partitionedInto = 1; |
37 | |
38 | public DataCracker1991 dc = null; |
39 | |
40 | public int dPart = 1; |
41 | |
42 | public int nPart = 1; |
43 | |
44 | /** |
45 | * @deprecated |
46 | */ |
47 | public boolean currentState = false; |
48 | |
49 | /** |
50 | * The set of predicates that are shared with other |
51 | * operators via dual decomposition. |
52 | */ |
53 | protected HashSet<FelixPredicate> dd_CommonOutput = new HashSet<FelixPredicate>(); |
54 | |
55 | /** |
56 | * Map from predicate to database table name |
57 | */ |
58 | protected HashMap<FelixPredicate, String> dd_commonOutputPredicate_2_tableName = |
59 | new HashMap<FelixPredicate, String>(); |
60 | |
61 | /** |
62 | * Type of StatOperator. |
63 | */ |
64 | public enum OPType {CRF, LR, COREF, TUFFY}; |
65 | |
66 | /** |
67 | * The human-readable representation of data partitions. This is only used in toString() |
68 | */ |
69 | public String dataCrackerSignature = null; |
70 | |
71 | /** |
72 | * Rules for Lagrangian Multipliers. |
73 | */ |
74 | public HashSet<FelixClause> dd_PriorClauses = new HashSet<FelixClause>(); |
75 | |
76 | /** |
77 | * Map from clauses to expressions partitioning the data. |
78 | */ |
79 | public HashMap<FelixClause, HashSet<Expression>> clauseConstraints = |
80 | new HashMap<FelixClause, HashSet<Expression>>(); |
81 | |
82 | /** |
83 | * The ConcurrentOperatorsBucket this StatOperator belongs to. |
84 | * StatOperators in one ConcurrentOperatorsBucket can be executed |
85 | * in parallel. |
86 | */ |
87 | public ConcurrentOperatorsBucket belongsToBucket = null; |
88 | |
89 | /** |
90 | * Whether this operator works in marginal or MAP mode. |
91 | */ |
92 | public boolean isMarginal = true; |
93 | |
94 | /** |
95 | * Options parsed from command line or configuration file. |
96 | */ |
97 | protected FelixCommandOptions options = null; |
98 | |
99 | /** |
100 | * Predicates computed by previous operators. |
101 | */ |
102 | protected HashSet<String> inputPredicateScope = new HashSet<String>(); |
103 | |
104 | /** |
105 | * Max time this operator can run. This parameter will be useful |
106 | * when there are cycles in the operator graph. |
107 | * @deprecated |
108 | */ |
109 | int maxRunTime = 5; |
110 | |
111 | /** |
112 | * Set of predicates that can potentially be shared with |
113 | * other operators via dual decomposition. |
114 | */ |
115 | public HashSet<FelixPredicate> commonCandidate = new HashSet<FelixPredicate>(); |
116 | |
117 | /** |
118 | * How many times this operator has run. |
119 | * @deprecated |
120 | */ |
121 | int currentRunTime = 0; |
122 | |
123 | /** |
124 | * Global counter of StatOperators ID. |
125 | */ |
126 | static int idCounter = 1; |
127 | |
128 | /** |
129 | * ID of this operator. Note that, the statistical operator returned by |
130 | * .clone() has a different ID. |
131 | */ |
132 | int id = -1; |
133 | |
134 | /** |
135 | * Type of this operator. |
136 | */ |
137 | public OPType type = null; |
138 | |
139 | /** |
140 | * Set of predicates whose values are used by this operator as inputs. |
141 | */ |
142 | public HashSet<FelixPredicate> inputPredicates = new HashSet<FelixPredicate>(); |
143 | |
144 | /** |
145 | * Set of predicates whose values are output by this operator. |
146 | */ |
147 | public HashSet<FelixPredicate> outputPredicates = new HashSet<FelixPredicate>(); |
148 | |
149 | /** |
150 | * Set of clauses assigned to this operator. |
151 | */ |
152 | public HashSet<FelixClause> allRelevantFelixClause = new HashSet<FelixClause>(); |
153 | |
154 | /** |
155 | * Database connection. |
156 | */ |
157 | public RDB db; |
158 | |
159 | /** |
160 | * Felix query. |
161 | */ |
162 | public FelixQuery fq; |
163 | |
164 | /** |
165 | * @deprecated |
166 | */ |
167 | public boolean isBinaryArbLR = false; |
168 | |
169 | /** |
170 | * The precedence of this operator. The larger this value, the earlier this operator |
171 | * will be run in the final physical plan. |
172 | */ |
173 | protected int precedence = -1; |
174 | |
175 | /** |
176 | * @deprecated |
177 | * @param _predicates |
178 | */ |
179 | public void pushPredicateScopes(HashSet<String> _predicates){ |
180 | this.inputPredicateScope.addAll(_predicates); |
181 | } |
182 | |
183 | /** |
184 | * Returns a clone of this statistical operator. |
185 | */ |
186 | @SuppressWarnings("unchecked") |
187 | public StatOperator clone(){ |
188 | try{ |
189 | StatOperator ret; |
190 | |
191 | if(this.type == OPType.LR){ |
192 | ret = this.getClass().getConstructor( |
193 | this.fq.getClass(), |
194 | this.outputPredicates.getClass(), |
195 | this.options.getClass()) |
196 | .newInstance(this.fq, this.outputPredicates, this.options); |
197 | }else{ |
198 | ret = this.getClass().getConstructor( |
199 | this.fq.getClass(), |
200 | this.outputPredicates.getClass(), |
201 | this.options.getClass()) |
202 | .newInstance(this.fq, this.outputPredicates, this.options); |
203 | } |
204 | |
205 | |
206 | for(FelixClause fc : this.allRelevantFelixClause){ |
207 | ret.registerRelevantClause(fc); |
208 | } |
209 | ret.sealDefinition(); |
210 | |
211 | |
212 | ret.clauseConstraints = (HashMap<FelixClause, HashSet<Expression>>) |
213 | this.clauseConstraints.clone(); |
214 | ret.belongsToBucket = this.belongsToBucket; |
215 | ret.isMarginal = this.isMarginal; |
216 | ret.precedence = this.precedence; |
217 | |
218 | return ret; |
219 | }catch(Exception e){ |
220 | e.printStackTrace(); |
221 | return null; |
222 | } |
223 | |
224 | } |
225 | |
226 | /** |
227 | * See {@link ConcurrentOperatorsBucket#nStartingRule}. |
228 | */ |
229 | public int nStartingRules = 0; |
230 | |
231 | /** |
232 | * Sets precedence of this statistical operator. |
233 | * @param _value |
234 | */ |
235 | public void setPrecedence(int _value){ |
236 | this.precedence = _value; |
237 | } |
238 | |
239 | /** |
240 | * Returns set of all clauses associated with the given property. |
241 | * @param col |
242 | * @param prop |
243 | * @return |
244 | */ |
245 | HashSet<FelixClause> getPropertyClausesUnion(Collection<FelixPredicate> col, FPProperty prop){ |
246 | HashSet<FelixClause> ret = new HashSet<FelixClause>(); |
247 | for(FelixPredicate fp : col){ |
248 | ret.addAll(fp.getPropertyClauses(prop)); |
249 | } |
250 | return ret; |
251 | } |
252 | |
253 | /** |
254 | * the constructor. |
255 | * @param _oriMLN |
256 | * @param _opt |
257 | */ |
258 | public StatOperator(FelixQuery _fq, HashSet<FelixPredicate> _goalPredicates, FelixCommandOptions _opt){ |
259 | this.outputPredicates.clear(); |
260 | this.outputPredicates.addAll(_goalPredicates); |
261 | options = _opt; |
262 | id = idCounter++; |
263 | fq = _fq; |
264 | this.isMarginal = _opt.marginal; |
265 | |
266 | } |
267 | |
268 | /** |
269 | * Registers clause to this statistical operator. |
270 | * @param fc |
271 | */ |
272 | public void registerRelevantClause(FelixClause fc){ |
273 | this.allRelevantFelixClause.add(fc); |
274 | } |
275 | |
276 | /** |
277 | * According to the clauses added via {@link #registerRelevantClause(FelixClause)}, |
278 | * parse {@link #outputPredicates}, {@link #inputPredicates}, {@link #nStartingRules}, |
279 | * and {@link #commonCandidate}. |
280 | * |
281 | */ |
282 | public void sealDefinition(){ |
283 | |
284 | for(FelixClause fc : this.allRelevantFelixClause){ |
285 | |
286 | int nOpen = 0; |
287 | |
288 | for(Predicate p : fc.getReferencedPredicates()){ |
289 | |
290 | if(!p.isClosedWorld() && !outputPredicates.contains(p)){ |
291 | nOpen ++; |
292 | } |
293 | |
294 | FelixPredicate fp = fq.getPredByName(p.getName()); |
295 | |
296 | if(this.outputPredicates.contains(fp)){ |
297 | continue; |
298 | } |
299 | |
300 | if(fp.isCorefMapPredicate){ |
301 | |
302 | this.inputPredicates.add(fp.getOriCorefPredicate()); |
303 | |
304 | }else{ |
305 | |
306 | if(fp.isClosedWorld() == true){ |
307 | continue; |
308 | } |
309 | |
310 | if(this.isBinaryArbLR == false){ |
311 | this.inputPredicates.add(fp); |
312 | }else{ |
313 | if(this.getPropertyClausesUnion(outputPredicates, FPProperty.NON_RECUR).contains(fc)){ |
314 | this.inputPredicates.add(fp); |
315 | } |
316 | } |
317 | } |
318 | |
319 | } |
320 | |
321 | for(Predicate p : fc.getReferencedPredicates()){ |
322 | if(!p.isClosedWorld() || fq.getPredByName(p.getName()).isCorefMapPredicate){ |
323 | this.commonCandidate.add(fq.getPredByName(p.getName())); |
324 | } |
325 | } |
326 | |
327 | if(nOpen == 0){ |
328 | this.nStartingRules ++; |
329 | } |
330 | |
331 | } |
332 | } |
333 | |
334 | |
335 | /** |
336 | * All {@link DataMovementOperator}s used in this statistical operator. |
337 | */ |
338 | protected List<DataMovementOperator> allDMOs = new ArrayList<DataMovementOperator>(); |
339 | |
340 | |
341 | /** |
342 | * Generate the operator-specified logic plan, i.e., all data movement operators |
343 | * that will be used for inference. |
344 | * |
345 | * This function should be invoked after a new instance of operator is created. |
346 | * Any valid Felix operator should implement this method. |
347 | */ |
348 | public abstract void prepare(); |
349 | |
350 | /** |
351 | * Method that executes this operator. Any valid Felix operator should implement |
352 | * this method. |
353 | */ |
354 | abstract public void run(); |
355 | |
356 | /** |
357 | * @deprecated |
358 | */ |
359 | abstract public void learn(); |
360 | |
361 | /** |
362 | * Human-readable representation of the logic plan. Any valid Felix operator should |
363 | * implement this method. |
364 | * |
365 | * TODO: need to think out a better to explain physical plan. (e.g., a graph?) |
366 | */ |
367 | abstract public String explain(); |
368 | |
369 | /** |
370 | * @deprecated |
371 | */ |
372 | public HashSet<String> throwAwayPredicatesNames = new HashSet<String>(); |
373 | |
374 | /** |
375 | * Get the target predicate of this StatOperator if this operator is |
376 | * CRF, LR or COREF. |
377 | * @return |
378 | */ |
379 | public FelixPredicate getTargetPredicateIfHasOnlyOne(){ |
380 | if(this.outputPredicates.size() != 1){ |
381 | ExceptionMan.die("Why this operator has >two or =zero output predicates?"); |
382 | } |
383 | return this.outputPredicates.iterator().next(); |
384 | } |
385 | |
386 | |
387 | /** |
388 | * Given a first order logic clause and a target predicate, translate it into |
389 | * a conjunctive query with properly assigned weight. |
390 | * |
391 | * @param target |
392 | * @param forceRecursive If there are multiple literals of the given predicate, |
393 | * this parameter specifies whether the generated conjunctive queries are recursive. |
394 | * CRF will set this parameter as FALSE, while COREF will set it as TRUE. |
395 | * @param props The property selected for the target predicate. |
396 | * @return |
397 | */ |
398 | public HashSet<ConjunctiveQuery> translateFelixClasesIntoFactorGraphEdgeQueries( |
399 | FelixPredicate target, |
400 | boolean forceRecursive, |
401 | HashSet<String> allowedOpenPredicates, |
402 | FPProperty... props){ |
403 | |
404 | HashSet<FelixClause> rules = new HashSet<FelixClause>(); |
405 | |
406 | for(FPProperty prop : props){ |
407 | rules.addAll(target.getPropertyClauses(prop)); |
408 | } |
409 | |
410 | if(options.useDualDecomposition){ |
411 | for(FPProperty prop : props){ |
412 | if(prop == FPProperty.NON_RECUR){ |
413 | rules.addAll(this.dd_PriorClauses); |
414 | } |
415 | } |
416 | } |
417 | |
418 | HashSet<ConjunctiveQuery> ret = new HashSet<ConjunctiveQuery> (); |
419 | |
420 | FelixUIMan.println(2, 0, "{" + StringMan.join(",", FelixStringMan.colToStringArray(props)) + |
421 | "} ConjunctiveQueries for " + this.toString()); |
422 | for(FelixClause rule : rules){ |
423 | |
424 | boolean flag = false; |
425 | boolean notRelated = true; |
426 | for(Predicate fp : rule.getReferencedPredicates()){ |
427 | if(!fp.isClosedWorld() && !fp.equals(target)){ |
428 | if(!allowedOpenPredicates.contains(fp.getName())){ |
429 | flag = true; |
430 | } |
431 | |
432 | if(this.throwAwayPredicatesNames.contains(fp.getName())){ |
433 | flag = true; |
434 | } |
435 | } |
436 | |
437 | if(fp.equals(target)){ |
438 | notRelated = false; |
439 | } |
440 | } |
441 | |
442 | if(notRelated == true){ |
443 | continue; |
444 | } |
445 | |
446 | if(flag == true){ |
447 | continue; |
448 | } |
449 | |
450 | if(this.isBinaryArbLR == true){ |
451 | rule.isBinaryLRRules = true; |
452 | } |
453 | |
454 | ConjunctiveQuery returnedCQ = rule.toSimplifiedFactorGraphQuery(this, target, forceRecursive); |
455 | ret.add(returnedCQ); |
456 | FelixUIMan.printobj(2, 0, returnedCQ); |
457 | } |
458 | |
459 | return ret; |
460 | } |
461 | |
462 | /** |
463 | * Get all DMOs used by this operator. |
464 | * @return |
465 | */ |
466 | public List<DataMovementOperator> getAllDMOs(){ |
467 | return this.allDMOs; |
468 | } |
469 | |
470 | /** |
471 | * Get the precedence of this operator. |
472 | * @return |
473 | */ |
474 | public int getPrecedence(){ |
475 | return this.precedence; |
476 | } |
477 | |
478 | public String toString(){ |
479 | String ret = FelixStringMan.indentHead(); |
480 | |
481 | ret += "{" + this.type + "} "; |
482 | ret += "Operator of {"; |
483 | ret += StringMan.join(",", FelixStringMan.colToStringArray(this.outputPredicates)); |
484 | ret += "} with {" + this.allRelevantFelixClause.size() + "} Relevant Clauses"; |
485 | |
486 | if(this.clauseConstraints.size() == 0){ |
487 | ret += " Communicate { " + |
488 | StringMan.join(",", FelixStringMan.colToStringArray(this.dd_CommonOutput)) |
489 | + "}"; |
490 | return ret; |
491 | } |
492 | |
493 | ret += " Partitioned by: "; |
494 | |
495 | if(dataCrackerSignature != null){ |
496 | ret += dataCrackerSignature; |
497 | } |
498 | |
499 | ret += " Communicating { " + |
500 | StringMan.join(",", FelixStringMan.colToStringArray(this.dd_CommonOutput)) |
501 | + "}"; |
502 | |
503 | return ret; |
504 | } |
505 | |
506 | public String toNoParString(){ |
507 | String ret = FelixStringMan.indentHead(); |
508 | |
509 | ret += "{" + this.type + "} "; |
510 | ret += "Operator of {"; |
511 | ret += StringMan.join(",", FelixStringMan.colToStringArray(this.outputPredicates)); |
512 | ret += "} "; |
513 | |
514 | ret += " Communicating { " + |
515 | StringMan.join(",", FelixStringMan.colToStringArray(this.dd_CommonOutput)) |
516 | + "}"; |
517 | |
518 | return ret; |
519 | } |
520 | |
521 | //TODO: Maybe add legibility-checking of rules? |
522 | } |
523 | |
524 | |
525 | |
526 | |
527 | |
528 | |
529 | |
530 | |
531 | |
532 | |