- 4F78202D4744A3284C00C4AAC9C055F4ABAE95EEA1C51C4ACD519A9723A990D4D1FC336254140F75B7D75995A781935A2A6250DD19C7E7610B6643365A47938F+ 8CEB7B9145164136BE138992805EAF4B9565362E11A87AF8D28E0F172B45400FE324CD0A24F4221DBD8E694972B03931C7594D0E9F7BBD064F2C86CD0ED25DD0blatta/lib/state.py(3 . 6)(3 . 7)
132 import imp
133 import hashlib
134 import logging
135 import threading
136 from itertools import chain
137
138 class State(object):
(17 . 37)(18 . 39)
140 if State.__instance != None:
141 raise Exception("This class is a singleton")
142 else:
143 self.socket = socket
144 self.conn = sqlite3.connect(db_path, check_same_thread=False)
145 self.cursor = self.conn.cursor()
146 self.cursor.execute("create table if not exists at(handle_id integer,\
147 address text not null,\
148 port integer not null,\
149 active_at datetime default null,\
150 updated_at datetime default current_timestamp,\
151 unique(handle_id, address, port))")
152
153 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
154
155 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
156 peer_id integer,\
157 handle text,\
158 unique(handle))")
159
160 self.cursor.execute("create table if not exists keys(peer_id intenger,\
161 key text,\
162 used_at datetime default current_timestamp,\
163 unique(key))")
164
165 self.cursor.execute("create table if not exists logs(\
166 handle text not null,\
167 peer_id integer,\
168 message_bytes blob not null,\
169 created_at datetime default current_timestamp)")
170
171 self.cursor.execute("create table if not exists dedup_queue(\
172 hash text not null,\
173 created_at datetime default current_timestamp)")
174 self.write_lock = threading.Lock()
175 with self.write_lock:
176 self.socket = socket
177 self.conn = sqlite3.connect(db_path, check_same_thread=False)
178 self.cursor = self.conn.cursor()
179 self.cursor.execute("create table if not exists at(handle_id integer,\
180 address text not null,\
181 port integer not null,\
182 active_at datetime default null,\
183 updated_at datetime default current_timestamp,\
184 unique(handle_id, address, port))")
185
186 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
187
188 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
189 peer_id integer,\
190 handle text,\
191 unique(handle))")
192
193 self.cursor.execute("create table if not exists keys(peer_id intenger,\
194 key text,\
195 used_at datetime default current_timestamp,\
196 unique(key))")
197
198 self.cursor.execute("create table if not exists logs(\
199 handle text not null,\
200 peer_id integer,\
201 message_bytes blob not null,\
202 created_at datetime default current_timestamp)")
203
204 self.cursor.execute("create table if not exists dedup_queue(\
205 hash text not null,\
206 created_at datetime default current_timestamp)")
207 State.__instance = self
208
209 def get_at(self, handle=None):
(76 . 22)(79 . 24)
211
212
213 def is_duplicate_message(self, message_hash):
214 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
215 self.conn.commit()
216 result = self.cursor.execute("select hash from dedup_queue where hash=?",
217 (message_hash,)).fetchone()
218 logging.debug("checking if %s is dupe" % message_hash)
219 if(result != None):
220 return True
221 else:
222 return False
223 with self.write_lock:
224 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
225 self.conn.commit()
226 result = self.cursor.execute("select hash from dedup_queue where hash=?",
227 (message_hash,)).fetchone()
228 logging.debug("checking if %s is dupe" % message_hash)
229 if(result != None):
230 return True
231 else:
232 return False
233
234 def add_to_dedup_queue(self, message_hash):
235 self.cursor.execute("insert into dedup_queue(hash)\
236 values(?)",
237 (message_hash,))
238 logging.debug("added %s to dedup" % message_hash)
239 self.conn.commit()
240 with self.write_lock:
241 self.cursor.execute("insert into dedup_queue(hash)\
242 values(?)",
243 (message_hash,))
244 logging.debug("added %s to dedup" % message_hash)
245 self.conn.commit()
246
247 def get_last_message_hash(self, handle, peer_id=None):
248 if peer_id:
(112 . 102)(117 . 109)
250 return "\x00" * 32
251
252 def log(self, handle, message_bytes, peer=None):
253 if peer != None:
254 peer_id = peer.peer_id
255 else:
256 peer_id = None
257 with self.write_lock:
258 if peer != None:
259 peer_id = peer.peer_id
260 else:
261 peer_id = None
262
263 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
264 values(?, ?, ?)",
265 (handle, peer_id, buffer(message_bytes)))
266 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
267 values(?, ?, ?)",
268 (handle, peer_id, buffer(message_bytes)))
269
270 def import_at_and_wot(self, at_path):
271 wot = imp.load_source('wot', at_path)
272 for peer in wot.peers:
273 results = self.cursor.execute("select * from handles where handle=? limit 1",
274 (peer["name"],)).fetchall()
275 if len(results) == 0:
276 key = peer["key"]
277 port = peer["port"]
278 address = peer["address"]
279 self.cursor.execute("insert into wot(peer_id) values(null)")
280 peer_id = self.cursor.lastrowid
281 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
282 (peer_id, peer["name"]))
283 handle_id = self.cursor.lastrowid
284 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
285 (handle_id, peer["address"], peer["port"], None))
286 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
287 (peer_id, key))
288 with self.write_lock:
289 wot = imp.load_source('wot', at_path)
290 for peer in wot.peers:
291 results = self.cursor.execute("select * from handles where handle=? limit 1",
292 (peer["name"],)).fetchall()
293 if len(results) == 0:
294 key = peer["key"]
295 port = peer["port"]
296 address = peer["address"]
297 self.cursor.execute("insert into wot(peer_id) values(null)")
298 peer_id = self.cursor.lastrowid
299 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
300 (peer_id, peer["name"]))
301 handle_id = self.cursor.lastrowid
302 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
303 (handle_id, peer["address"], peer["port"], None))
304 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
305 (peer_id, key))
306
307 self.conn.commit()
308 self.conn.commit()
309
310 def update_at(self, peer, set_active_at=True):
311 row = self.cursor.execute("select handle_id from handles where handle=?",
312 (peer["handle"],)).fetchone()
313 if row != None:
314 handle_id = row[0]
315 else:
316 return
317
318 try:
319 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
320 (handle_id, peer["address"], peer["port"]))
321 except sqlite3.IntegrityError as ex:
322 self.cursor.execute("update at set updated_at = current_timestamp\
323 where handle_id=? and address=? and port=?",
324 (handle_id, peer["address"], peer["port"]))
325 if set_active_at:
326 self.cursor.execute("update at set active_at = current_timestamp\
327 where handle_id=? and address=? and port=?",
328 (handle_id, peer["address"], peer["port"]))
329 self.conn.commit()
330 with self.write_lock:
331 row = self.cursor.execute("select handle_id from handles where handle=?",
332 (peer["handle"],)).fetchone()
333 if row != None:
334 handle_id = row[0]
335 else:
336 return
337
338 try:
339 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
340 (handle_id, peer["address"], peer["port"]))
341 except sqlite3.IntegrityError as ex:
342 self.cursor.execute("update at set updated_at = current_timestamp\
343 where handle_id=? and address=? and port=?",
344 (handle_id, peer["address"], peer["port"]))
345 if set_active_at:
346 self.cursor.execute("update at set active_at = current_timestamp\
347 where handle_id=? and address=? and port=?",
348 (handle_id, peer["address"], peer["port"]))
349 self.conn.commit()
350
351 def add_peer(self, handle):
352 self.cursor.execute("insert into wot(peer_id) values(null)")
353 peer_id = self.cursor.lastrowid
354 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
355 (peer_id, handle))
356 self.conn.commit()
357 with self.write_lock:
358 self.cursor.execute("insert into wot(peer_id) values(null)")
359 peer_id = self.cursor.lastrowid
360 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
361 (peer_id, handle))
362 self.conn.commit()
363
364
365 def remove_peer(self, handle):
366 # get peer id
367
368 result = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()
369 if result == None:
370 return
371 else:
372 peer_id = result[0]
373 # get all aliases
374 with self.write_lock:
375 # get peer id
376 result = self.cursor.execute("select peer_id from handles where handle=?",
377 (handle,)).fetchone()
378 if result == None:
379 return
380 else:
381 peer_id = result[0]
382 # get all aliases
383
384 handle_ids = self.get_handle_ids_for_peer(peer_id)
385 for handle_id in handle_ids:
386 # delete at entries for each alias
387 handle_ids = self.get_handle_ids_for_peer(peer_id)
388 for handle_id in handle_ids:
389 # delete at entries for each alias
390
391 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
392 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
393
394 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
395 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
396
397 # delete all keys for peer id
398 # delete all keys for peer id
399
400 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
401
402 # delete peer from wot
403
404 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
405 self.conn.commit()
406 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
407
408 # delete peer from wot
409
410 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
411 self.conn.commit()
412
413
414 def add_key(self, handle, key):
415 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
416 if peer_id != None:
417 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
418 self.conn.commit()
419 with self.write_lock:
420 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
421 if peer_id != None:
422 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
423 self.conn.commit()
424
425 def remove_key(self, key):
426 self.cursor.execute("delete from keys where key=?", (key,))
427 self.conn.commit()
428 with self.write_lock:
429 self.cursor.execute("delete from keys where key=?", (key,))
430 self.conn.commit()
431
432 def get_handle_ids_for_peer(self, peer_id):
433 return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?",
(236 . 8)(248 . 11)
435 for peer_id in peer_ids:
436 handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
437 peer = self.get_peer_by_handle(handle)
438 if not (self.is_duplicate(peers, peer)):
439 peers.append(peer)
440 if self.is_duplicate(peers, peer):
441 continue
442 if peer.address == None or peer.port == None:
443 continue
444 peers.append(peer)
445 return peers
446
447
(259 . 8)(274 . 8)
449 return Peer(self.socket, {
450 "handles": handles,
451 "peer_id": handle_info[1],
452 "address": address[0] if address else "",
453 "port": address[1] if address else "",
454 "address": address[0] if address else None,
455 "port": address[1] if address else None,
456 "keys": keys
457 })
458