- 8CEB7B9145164136BE138992805EAF4B9565362E11A87AF8D28E0F172B45400FE324CD0A24F4221DBD8E694972B03931C7594D0E9F7BBD064F2C86CD0ED25DD0+ 034E0AF7BE67265B08FA39744E1AE5637F0B6BF28021748B964CBC1DCB5147F95B6B94EE94160946C54198669F4DAD4F1C98552ADE5B16BB78F327C20958777Fblatta/lib/state.py(1 . 9)(1 . 8)
358 from lib.peer import Peer
359 from peer import Peer
360 import sqlite3
361 import imp
362 import hashlib
363 import logging
364 import threading
365 from itertools import chain
366
367 class State(object):
(18 . 59)(17 . 61)
369 if State.__instance != None:
370 raise Exception("This class is a singleton")
371 else:
372 self.write_lock = threading.Lock()
373 with self.write_lock:
374 self.socket = socket
375 self.conn = sqlite3.connect(db_path, check_same_thread=False)
376 self.cursor = self.conn.cursor()
377 self.cursor.execute("create table if not exists at(handle_id integer,\
378 address text not null,\
379 port integer not null,\
380 active_at datetime default null,\
381 updated_at datetime default current_timestamp,\
382 unique(handle_id, address, port))")
383
384 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
385
386 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
387 peer_id integer,\
388 handle text,\
389 unique(handle))")
390
391 self.cursor.execute("create table if not exists keys(peer_id intenger,\
392 key text,\
393 used_at datetime default current_timestamp,\
394 unique(key))")
395
396 self.cursor.execute("create table if not exists logs(\
397 handle text not null,\
398 peer_id integer,\
399 message_bytes blob not null,\
400 created_at datetime default current_timestamp)")
401
402 self.cursor.execute("create table if not exists dedup_queue(\
403 hash text not null,\
404 created_at datetime default current_timestamp)")
405 self.socket = socket
406 self.conn = sqlite3.connect(db_path, check_same_thread=False)
407 cursor = self.cursor()
408 cursor.execute("create table if not exists at(handle_id integer,\
409 address text not null,\
410 port integer not null,\
411 active_at datetime default null,\
412 updated_at datetime default current_timestamp,\
413 unique(handle_id, address, port))")
414
415 cursor.execute("create table if not exists wot(peer_id integer primary key)")
416
417 cursor.execute("create table if not exists handles(handle_id integer primary key,\
418 peer_id integer,\
419 handle text,\
420 unique(handle))")
421
422 cursor.execute("create table if not exists keys(peer_id intenger,\
423 key text,\
424 used_at datetime default current_timestamp,\
425 unique(key))")
426
427 cursor.execute("create table if not exists logs(\
428 handle text not null,\
429 peer_id integer,\
430 message_bytes blob not null,\
431 created_at datetime default current_timestamp)")
432
433 cursor.execute("create table if not exists dedup_queue(\
434 hash text not null,\
435 created_at datetime default current_timestamp)")
436 State.__instance = self
437
438 def cursor(self):
439 return self.conn.cursor()
440
441 def get_at(self, handle=None):
442 cursor = self.cursor()
443 at = []
444 if handle == None:
445 results = self.cursor.execute("select handle_id,address,port,active_at from at\
446 results = cursor.execute("select handle_id,address,port,active_at from at\
447 order by updated_at desc").fetchall()
448 else:
449 result = self.cursor.execute("select handle_id from handles where handle=?",
450 result = cursor.execute("select handle_id from handles where handle=?",
451 (handle,)).fetchone()
452 if None != result:
453 handle_id = result[0]
454 else:
455 return []
456 results = self.cursor.execute("select handle_id,address,port,active_at from at \
457 results = cursor.execute("select handle_id,address,port,active_at from at \
458 where handle_id=? order by updated_at desc",
459 (handle_id,)).fetchall()
460 for result in results:
461 handle_id, address, port, updated_at = result
462 h = self.cursor.execute("select handle from handles where handle_id=?",
463 h = cursor.execute("select handle from handles where handle_id=?",
464 (handle_id,)).fetchone()[0]
465 at.append({"handle": h,
466 "address": "%s:%s" % (address, port),
(79 . 34)(80 . 35)
468
469
470 def is_duplicate_message(self, message_hash):
471 with self.write_lock:
472 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
473 self.conn.commit()
474 result = self.cursor.execute("select hash from dedup_queue where hash=?",
475 (message_hash,)).fetchone()
476 logging.debug("checking if %s is dupe" % message_hash)
477 if(result != None):
478 return True
479 else:
480 return False
481 cursor = self.cursor()
482 cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
483 self.conn.commit()
484 result = cursor.execute("select hash from dedup_queue where hash=?",
485 (message_hash,)).fetchone()
486 logging.debug("checking if %s is dupe" % message_hash)
487 if(result != None):
488 return True
489 else:
490 return False
491
492 def add_to_dedup_queue(self, message_hash):
493 with self.write_lock:
494 self.cursor.execute("insert into dedup_queue(hash)\
495 values(?)",
496 (message_hash,))
497 logging.debug("added %s to dedup" % message_hash)
498 self.conn.commit()
499 cursor = self.cursor()
500 cursor.execute("insert into dedup_queue(hash)\
501 values(?)",
502 (message_hash,))
503 logging.debug("added %s to dedup" % message_hash)
504 self.conn.commit()
505
506 def get_last_message_hash(self, handle, peer_id=None):
507 cursor = self.cursor()
508 if peer_id:
509 message_bytes = self.cursor.execute("select message_bytes from logs\
510 message_bytes = cursor.execute("select message_bytes from logs\
511 where handle=? and peer_id=?\
512 order by created_at desc limit 1",
513 (handle, peer_id)).fetchone()
514
515 else:
516 message_bytes = self.cursor.execute("select message_bytes from logs\
517 message_bytes = cursor.execute("select message_bytes from logs\
518 where handle=? and peer_id is null\
519 order by created_at desc limit 1",
520 (handle,)).fetchone()
(117 . 121)(119 . 125)
522 return "\x00" * 32
523
524 def log(self, handle, message_bytes, peer=None):
525 with self.write_lock:
526 if peer != None:
527 peer_id = peer.peer_id
528 else:
529 peer_id = None
530 cursor = self.cursor()
531 if peer != None:
532 peer_id = peer.peer_id
533 else:
534 peer_id = None
535
536 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
537 values(?, ?, ?)",
538 (handle, peer_id, buffer(message_bytes)))
539 cursor.execute("insert into logs(handle, peer_id, message_bytes)\
540 values(?, ?, ?)",
541 (handle, peer_id, buffer(message_bytes)))
542
543 def import_at_and_wot(self, at_path):
544 with self.write_lock:
545 wot = imp.load_source('wot', at_path)
546 for peer in wot.peers:
547 results = self.cursor.execute("select * from handles where handle=? limit 1",
548 (peer["name"],)).fetchall()
549 if len(results) == 0:
550 key = peer["key"]
551 port = peer["port"]
552 address = peer["address"]
553 self.cursor.execute("insert into wot(peer_id) values(null)")
554 peer_id = self.cursor.lastrowid
555 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
556 (peer_id, peer["name"]))
557 handle_id = self.cursor.lastrowid
558 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
559 (handle_id, peer["address"], peer["port"], None))
560 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
561 (peer_id, key))
562 cursor = self.cursor()
563 wot = imp.load_source('wot', at_path)
564 for peer in wot.peers:
565 results = cursor.execute("select * from handles where handle=? limit 1",
566 (peer["name"],)).fetchall()
567 if len(results) == 0:
568 key = peer["key"]
569 port = peer["port"]
570 address = peer["address"]
571 cursor.execute("insert into wot(peer_id) values(null)")
572 peer_id = cursor.lastrowid
573 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
574 (peer_id, peer["name"]))
575 handle_id = cursor.lastrowid
576 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
577 (handle_id, peer["address"], peer["port"], None))
578 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
579 (peer_id, key))
580
581 self.conn.commit()
582 self.conn.commit()
583
584 def update_at(self, peer, set_active_at=True):
585 with self.write_lock:
586 row = self.cursor.execute("select handle_id from handles where handle=?",
587 (peer["handle"],)).fetchone()
588 if row != None:
589 handle_id = row[0]
590 else:
591 return
592
593 try:
594 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
595 (handle_id, peer["address"], peer["port"]))
596 except sqlite3.IntegrityError as ex:
597 self.cursor.execute("update at set updated_at = current_timestamp\
598 where handle_id=? and address=? and port=?",
599 (handle_id, peer["address"], peer["port"]))
600 if set_active_at:
601 self.cursor.execute("update at set active_at = current_timestamp\
602 where handle_id=? and address=? and port=?",
603 (handle_id, peer["address"], peer["port"]))
604 self.conn.commit()
605 cursor = self.cursor()
606 row = cursor.execute("select handle_id from handles where handle=?",
607 (peer["handle"],)).fetchone()
608 if row != None:
609 handle_id = row[0]
610 else:
611 return
612
613 try:
614 cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
615 (handle_id, peer["address"], peer["port"]))
616 except sqlite3.IntegrityError as ex:
617 cursor.execute("update at set updated_at = current_timestamp\
618 where handle_id=? and address=? and port=?",
619 (handle_id, peer["address"], peer["port"]))
620 if set_active_at:
621 cursor.execute("update at set active_at = current_timestamp\
622 where handle_id=? and address=? and port=?",
623 (handle_id, peer["address"], peer["port"]))
624 self.conn.commit()
625
626 def add_peer(self, handle):
627 with self.write_lock:
628 self.cursor.execute("insert into wot(peer_id) values(null)")
629 peer_id = self.cursor.lastrowid
630 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
631 (peer_id, handle))
632 self.conn.commit()
633 cursor = self.cursor()
634 cursor.execute("insert into wot(peer_id) values(null)")
635 peer_id = self.cursor.lastrowid
636 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
637 (peer_id, handle))
638 self.conn.commit()
639
640
641 def remove_peer(self, handle):
642 with self.write_lock:
643 # get peer id
644 result = self.cursor.execute("select peer_id from handles where handle=?",
645 (handle,)).fetchone()
646 if result == None:
647 return
648 else:
649 peer_id = result[0]
650 # get all aliases
651 cursor = self.cursor()
652
653 # get peer id
654 result = cursor.execute("select peer_id from handles where handle=?",
655 (handle,)).fetchone()
656 if result == None:
657 return
658 else:
659 peer_id = result[0]
660 # get all aliases
661
662 handle_ids = self.get_handle_ids_for_peer(peer_id)
663 for handle_id in handle_ids:
664 # delete at entries for each alias
665 handle_ids = self.get_handle_ids_for_peer(peer_id)
666 for handle_id in handle_ids:
667 # delete at entries for each alias
668
669 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
670 cursor.execute("delete from at where handle_id=?", (handle_id,))
671
672 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
673 cursor.execute("delete from handles where peer_id=?", (peer_id,))
674
675 # delete all keys for peer id
676 # delete all keys for peer id
677
678 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
679
680 # delete peer from wot
681
682 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
683 self.conn.commit()
684 cursor.execute("delete from keys where peer_id=?", (handle_id,))
685
686 # delete peer from wot
687
688 cursor.execute("delete from wot where peer_id=?", (peer_id,))
689 self.conn.commit()
690
691
692 def add_key(self, handle, key):
693 with self.write_lock:
694 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
695 if peer_id != None:
696 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
697 self.conn.commit()
698 cursor = self.cursor()
699 peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
700 if peer_id != None:
701 cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
702 self.conn.commit()
703
704 def remove_key(self, key):
705 with self.write_lock:
706 self.cursor.execute("delete from keys where key=?", (key,))
707 self.conn.commit()
708 cursor = self.cursor()
709 cursor.execute("delete from keys where key=?", (key,))
710 self.conn.commit()
711
712 def get_handle_ids_for_peer(self, peer_id):
713 return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?",
714 cursor = self.cursor()
715 return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?",
716 (peer_id,)).fetchall()))
717
718 def get_peer_handles(self):
719 handles = self.listify(self.cursor.execute("select handle from handles").fetchall())
720 cursor = self.cursor()
721 handles = self.listify(cursor.execute("select handle from handles").fetchall())
722 return handles
723
724 def get_peers(self):
725 cursor = self.cursor()
726 peers = []
727 handles = self.cursor.execute("select handle from handles").fetchall()
728 handles = cursor.execute("select handle from handles").fetchall()
729
730 for handle in handles:
731 peer = self.get_peer_by_handle(handle[0])
(243 . 10)(249 . 11)
733 return list(chain.from_iterable(results))
734
735 def get_keyed_peers(self):
736 peer_ids = self.listify(self.cursor.execute("select peer_id from keys").fetchall())
737 cursor = self.cursor()
738 peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall())
739 peers = []
740 for peer_id in peer_ids:
741 handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
742 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
743 peer = self.get_peer_by_handle(handle)
744 if self.is_duplicate(peers, peer):
745 continue
(257 . 18)(264 . 19)
747
748
749 def get_peer_by_handle(self, handle):
750 handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?",
751 cursor = self.cursor()
752 handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?",
753 (handle,)).fetchone()
754
755 if handle_info == None:
756 return None
757
758 address = self.cursor.execute("select address, port from at where handle_id=?\
759 address = cursor.execute("select address, port from at where handle_id=?\
760 order by updated_at desc limit 1",
761 (handle_info[0],)).fetchone()
762 handles = self.listify(self.cursor.execute("select handle from handles where peer_id=?",
763 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
764 (handle_info[1],)).fetchall())
765 keys = self.listify(self.cursor.execute("select key from keys where peer_id=?\
766 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
767 order by used_at desc",
768 (handle_info[1],)).fetchall())
769 return Peer(self.socket, {