1 """
This module drives the parallel execution of the training steps for the factor model.
3 """
4
5 from multiprocessing import Pool
6 import sys_appends
7 import time
8 import sys, getopt
9 import random
10
11
12
13
14
15 import psycopg2
16
def __init__(self, mid, max_rank, nRows, nCols, nSplits, connect_str, table_name):
    """
    Remember the run parameters and open a database session.

    Stores the model id, rank, row/column counts, split count and table
    name on the instance, connects with ``connect_str`` through psycopg2,
    switches the connection to autocommit isolation and keeps both the
    connection (``self.conn``) and a cursor on it (``self.psql``).
    """
    # Plain parameters, kept verbatim for the step queries.
    self.mid = mid
    self.nRows, self.nCols, self.nSplits = nRows, nCols, nSplits

    # One connection and one cursor per instance, in autocommit mode.
    self.conn = connection = psycopg2.connect(connect_str)
    connection.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    self.psql = connection.cursor()

    self.max_rank = max_rank
    self.table_name = table_name
31
33
34
# Body of the per-chunk step routine.  Its `def` line is not part of this
# listing, so only the statements are reproduced here; they read `self`,
# `seed`, `roundid` and `chunkid` from the enclosing definition.
t0 = time.time()
print("[take_step:%s,%s] Enter" % (roundid, chunkid))

# Prepare this session: library_setup(), then fetch the model instance `mid`.
self.psql.execute("SELECT library_setup();")
self.psql.execute("SELECT retrieve_model_instance_string(%s)" % (self.mid))
print("[take_step:%s,%s] Retrieved Model in %s" % (roundid, chunkid, time.time() - t0))

# Permutations are created from the epoch seed passed in by the caller.
self.psql.execute("SELECT create_permutations(%s,%s,%s)" % (self.nRows, self.nCols, seed))
print("[take_step:%s,%s] Permutations Created. Executing Gradient." % (roundid, chunkid))

t2 = time.time()
# Run take_step() over exactly the tuples whose round()/chunk() values match
# this (roundid, chunkid); COUNT(*) reports how many tuples were visited.
exec_str = (
    "SELECT COUNT(*) FROM (SELECT take_step(row - 1, col - 1, rating) FROM " + self.table_name
    + " WHERE round(%s,%s,%s,row-1,col-1) = %s AND chunk(%s,%s,%s,row-1,col-1) = %s) as t;"
) % (self.nSplits, self.nRows, self.nCols, roundid, self.nSplits, self.nRows, self.nCols, chunkid)

self.psql.execute(exec_str)
(nCount,) = self.psql.fetchone()
print("[take_step:%s,%s] Finished Step (%d tuples) in %s secs. Storing" % (roundid, chunkid, nCount, time.time() - t2))

t3 = time.time()

# Write this chunk's result back through overwrite_fragment().
self.psql.execute("SELECT overwrite_fragment(%s,%s,%s,%s,%s,%s,%s);" % (self.max_rank, self.nSplits, self.nRows, self.nCols, chunkid, roundid, self.mid))
print("[take_step:%s,%s] Complete in %f sec (update=%s)" % (roundid, chunkid, time.time() - t0, time.time() - t3))
60
63
def __init__(self, mid, max_rank, nRows, nCols, nsplits, connect_str, seed, chunkid, roundid, table_name):
    """
    Record the parameters that describe one chunk of work.

    Every argument is stored unchanged on the instance under its own name;
    nothing is opened or computed here.
    """
    # Model identifier, rank and row/column counts.
    self.mid, self.max_rank = mid, max_rank
    self.nRows, self.nCols, self.nsplits = nRows, nCols, nsplits
    # Where to connect and which table to read.
    self.connect_str, self.table_name = connect_str, table_name
    # Seed plus the (chunk, round) pair this descriptor stands for.
    self.seed, self.chunkid, self.roundid = seed, chunkid, roundid
76
def chunk_maker(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, roundid, table_name):
    """
    Build the chunk descriptors for one round.

    Returns a list with one ``chunk_desc`` per chunk id in
    ``range(nsplits)``, all sharing the given model parameters, seed and
    ``roundid``.
    """
    return [
        chunk_desc(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, chunk_index, roundid, table_name)
        for chunk_index in range(nsplits)
    ]
82
88
def install_fresh_database(psql, mid, max_rank, b, stepsize, table_name):
    """
    Replace the stored model ``mid`` with a freshly initialized one.

    Reads MAX(row), MAX(col) and AVG(rating) from ``table_name``, deletes
    any ``lr_model_instance`` rows with this MID, then calls
    ``initialize_model`` and ``store_model`` through the cursor ``psql``.

    Returns the tuple ``(nRows, nCols)`` read from the table.
    Raises ValueError when the table yields no maxima/average (for example
    when it is empty); nothing is deleted in that case.
    """
    # NOTE(review): the `def` line is absent from this listing; the signature
    # is restored from the call in fresh_execute (and the names the body
    # uses) -- confirm against the original file.
    print("[INIT] Installing fresh model instance to mid=%d with rank=%d b=%f stepsize=%f" % (mid, max_rank, b, stepsize))

    # A table name cannot be bound as a query parameter, so it is interpolated.
    table_query = "SELECT MAX(row), MAX(col), AVG(rating) FROM %s" % table_name
    psql.execute(table_query)
    (nRows, nCols, mean,) = psql.fetchone()
    if nRows is None or nCols is None or mean is None:
        raise ValueError("table %s returned NULL for MAX(row)/MAX(col)/AVG(rating); cannot initialize a model" % table_name)

    # Values are bound by the driver instead of being formatted with %f,
    # which rounds to six decimals and turns a small stepsize into 0.
    psql.execute("DELETE FROM lr_model_instance WHERE MID=%s;", (int(mid),))
    psql.execute(
        "SELECT initialize_model(%s,%s,%s,%s,%s,%s)",
        (int(max_rank), int(nRows), int(nCols), float(b), float(mean), float(stepsize)),
    )
    psql.execute("SELECT store_model(%s);", (int(mid),))
    print("[INIT] Stored model mid=%d" % mid)
    return (nRows, nCols)
100
def get_database_stats(psql, mid, table_name):
    """
    Look up the sizes needed to continue training a stored model.

    Reads MAX(row) and MAX(col) from ``table_name`` and
    ``array_upper(L, 2)`` from the ``lr_model_instance`` row with this
    ``mid``, all through the cursor ``psql``.

    Returns ``(nrows, ncols, max_rank)``.
    Raises LookupError when no model instance with ``mid`` is stored.
    """
    # NOTE(review): the `def` line is absent from this listing; the signature
    # is restored from the call in stored_execute -- confirm.
    print("[INIT] Getting stats about mid=%d and table=%s" % (mid, table_name))

    # A table name cannot be bound as a query parameter, so it is interpolated.
    table_query = "SELECT MAX(row), MAX(col) FROM %s" % table_name
    psql.execute(table_query)
    (nrows, ncols) = psql.fetchone()

    psql.execute("SELECT array_upper(L,2) as l_rank FROM lr_model_instance WHERE mid=%s LIMIT 1", (int(mid),))
    rank_row = psql.fetchone()
    if rank_row is None:
        # Without this check the tuple unpacking below fails with an opaque
        # "NoneType is not iterable" error.
        raise LookupError("no stored model instance with mid=%d" % mid)
    (max_rank,) = rank_row
    return (nrows, ncols, max_rank)
109
def fresh_execute(mid, nsplits, nepochs, table_name, probe_table, max_rank, b, initial_stepsize, diminish):
    """
    Install a fresh model under ``mid`` and train it for ``nepochs`` epochs.

    Connects to the database named by ``sys_appends.dbname`` in autocommit
    mode, (re)creates the model with ``install_fresh_database`` and starts
    a pool of ``nsplits`` worker processes.  Each epoch draws a random seed
    and runs ``nsplits`` rounds; every round maps ``perform_work`` over the
    chunk descriptors produced by ``chunk_maker``.  After each epoch the
    RMSE over ``probe_table`` is printed (when a probe table is given) and
    the stored stepsize is multiplied by ``diminish``.

    The pool and the connection are released even when an epoch fails.
    Returns None.
    """
    connect_str = 'dbname=' + sys_appends.dbname
    conn = psycopg2.connect(connect_str)
    try:
        psql = conn.cursor()
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        psql.execute("SELECT library_setup();")

        (nRows, nCols) = install_fresh_database(psql, mid, max_rank, b, initial_stepsize, table_name)

        pool = Pool(processes=nsplits)
        try:
            for epoch in range(nepochs):
                start_epoch = time.time()
                seed = random.randint(1, 1000000)
                print("[EPOCH] using seed=%d" % (seed))
                # One pool.map per round; the round index is passed as roundid.
                for i in range(nsplits):
                    print(pool.map(perform_work, chunk_maker(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)))
                print("[End Epoch] %s" % (time.time() - start_epoch))

                # NOTE(review): the listing lost its indentation; the RMSE
                # report and stepsize decay are assumed to run once per
                # epoch -- confirm against the original file.
                psql.execute("SELECT retrieve_model_instance_string(%s)", (int(mid),))
                if probe_table is not None:
                    psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
                    (rmse, ) = psql.fetchone()
                    print("[RMSE] %f" % (rmse))
                # Bound parameters keep the full precision of `diminish`
                # (formatting with %f rounds to six decimals).
                psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%s WHERE MID=%s", (float(diminish), int(mid)))
        finally:
            # Stop accepting work and wait for the workers to exit.
            pool.close()
            pool.join()
    finally:
        conn.close()
    return
135
def stored_execute(mid, nsplits, nepochs, table_name, probe_table, diminish):
    """
    Continue training the model already stored under ``mid``.

    Connects to the database named by ``sys_appends.dbname`` in autocommit
    mode, reads the table sizes and model rank with ``get_database_stats``
    and starts a pool of ``nsplits`` worker processes.  Each epoch draws a
    random seed and runs ``nsplits`` rounds; every round maps
    ``perform_work`` over the chunk descriptors produced by ``chunk_maker``.
    After each epoch the RMSE over ``probe_table`` is printed (when a probe
    table is given) and the stored stepsize is multiplied by ``diminish``.

    The pool, cursor and connection are released even when an epoch fails.
    Returns None.
    """
    connect_str = 'dbname=' + sys_appends.dbname
    conn = psycopg2.connect(connect_str)
    try:
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        psql = conn.cursor()
        try:
            psql.execute("SELECT library_setup();")

            (nRows, nCols, max_rank) = get_database_stats(psql, mid, table_name)

            pool = Pool(processes=nsplits)
            try:
                for epoch in range(nepochs):
                    start_epoch = time.time()
                    seed = random.randint(1, 1000000)
                    print("[EPOCH] using seed=%d" % (seed))
                    # One pool.map per round; the round index is passed as roundid.
                    for i in range(nsplits):
                        print(pool.map(perform_work, chunk_maker(mid, max_rank, nRows, nCols, nsplits, connect_str, seed, i, table_name)))
                    print("[End Epoch] %s" % (time.time() - start_epoch))

                    # NOTE(review): the listing lost its indentation; the
                    # RMSE report and stepsize decay are assumed to run once
                    # per epoch -- confirm against the original file.
                    psql.execute("SELECT retrieve_model_instance_string(%s)", (int(mid),))
                    if probe_table is not None:
                        psql.execute("select SQRT(SUM(mse(row - 1, col - 1, rating)))/SQRT(COUNT(*)) from %s" % probe_table)
                        (rmse, ) = psql.fetchone()
                        print("[RMSE] %f" % (rmse))
                    # Bound parameters keep the full precision of `diminish`
                    # (formatting with %f rounds to six decimals).
                    psql.execute("UPDATE lr_model_instance SET stepsize=stepsize*%s WHERE MID=%s", (float(diminish), int(mid)))
            finally:
                # Stop accepting work and wait for the workers to exit.
                pool.close()
                pool.join()
        finally:
            psql.close()
    finally:
        conn.close()
161
def usage(argv):
    """
    Print a one-line synopsis of the command line.

    ``argv`` is the program's argument vector; only ``argv[0]`` (the program
    name) is used.  The synopsis lists the options the option parser
    accepts: ``-m`` and ``-t`` are required, ``-f`` continues from the
    stored model instead of installing a fresh one.
    """
    # NOTE(review): the `def` line is absent from this listing; the signature
    # is restored from the `usage(sys.argv)` calls -- confirm.
    synopsis = (
        "%s: -m <mid> -t <table_name> [-f] [--parallel <nprocs>] [-e <epochs>] "
        "[--probe <probe_table>] [-r <rank>] [-b <b>] [-s <stepsize>] [-d <diminish>]"
    )
    print(synopsis % (argv[0],))
164
def main():
    """
    Parse the command line and run training on the requested table.

    Options (short/long):
      -m/--mid       model id (required, int)
      -t/--table     ratings table name (required)
      -f             continue from the stored model instead of a fresh one
      --parallel     number of worker processes (default 1)
      -e/--epochs    number of epochs (default 10)
      -p/--probe     probe table for the RMSE report (default none)
      -r/--rank      model rank (default 10)
      -b             value passed as ``b`` to the fresh install (default 1.5)
      -s/--stepsize  initial stepsize (default 1e-2)
      -d/--diminish  per-epoch stepsize multiplier (default 0.8)

    Exits with status 2 on an unknown option or when ``mid`` or the table
    name is missing.
    """
    # NOTE(review): the `def main():` line is absent from this listing; it is
    # restored from the module-level `main()` call -- confirm.
    long_opts = ["mid=", "parallel=", "epochs=", "table=", "probe=", "rank=", "stepsize=", "diminish="]
    try:
        opts, _ = getopt.getopt(sys.argv[1:], "fm:p:e:t:r:b:s:d:", long_opts)
    except getopt.GetoptError as err:
        # The exception must be bound to a name before it can be printed.
        print(str(err))
        usage(sys.argv)
        sys.exit(2)

    mid = None
    nthreads = 1
    epochs = 10
    rank = 10
    table_name = None
    probe_name = None
    stepsize = 1e-2
    diminish = 0.8
    B = 1.5
    exec_fresh = True

    for o, a in opts:
        if o in ("-m", "--mid"):
            mid = int(a)
        elif o == "-f":
            exec_fresh = False
        elif o == "--parallel":
            # Compared with ==: `o in ("--parallel")` is a substring test on
            # a plain string and also matched "-p", hijacking the probe flag.
            nthreads = int(a)
        elif o in ("-e", "--epochs"):
            epochs = int(a)
        elif o in ("-t", "--table"):
            table_name = a
        elif o in ("-p", "--probe"):
            probe_name = a
        elif o in ("-r", "--rank"):
            rank = int(a)
        elif o == "-b":
            B = float(a)
        elif o in ("-s", "--stepsize"):
            stepsize = float(a)
        elif o in ("-d", "--diminish"):
            diminish = float(a)

    if (mid is None) or (table_name is None):
        print("mid and table name are required")
        usage(sys.argv)
        # Non-zero status, matching the option-parsing failure above.
        sys.exit(2)

    print("Execute on table %s with mid %d using %d processes" % (table_name, mid, nthreads))

    if exec_fresh:
        fresh_execute(mid, nthreads, epochs, table_name, probe_name, rank, B, stepsize, diminish)
    else:
        stored_execute(mid, nthreads, epochs, table_name, probe_name, diminish)
220
221
# Start a run only when this file is executed as a script.  Importing the
# module (which multiprocessing itself does on platforms that start workers
# by re-importing the main module) must not launch another training run.
if __name__ == "__main__":
    main()
223