from lib.peer import Peer import sqlite3 import imp import hashlib import logging import threading from itertools import chain class State(object): __instance = None @staticmethod def get_instance(socket=None, db_path=None): if State.__instance == None: State(socket, db_path) return State.__instance def __init__(self, socket, db_path): if State.__instance != None: raise Exception("This class is a singleton") else: self.write_lock = threading.Lock() with self.write_lock: self.socket = socket self.conn = sqlite3.connect(db_path, check_same_thread=False) self.cursor = self.conn.cursor() self.cursor.execute("create table if not exists at(handle_id integer,\ address text not null,\ port integer not null,\ active_at datetime default null,\ updated_at datetime default current_timestamp,\ unique(handle_id, address, port))") self.cursor.execute("create table if not exists wot(peer_id integer primary key)") self.cursor.execute("create table if not exists handles(handle_id integer primary key,\ peer_id integer,\ handle text,\ unique(handle))") self.cursor.execute("create table if not exists keys(peer_id intenger,\ key text,\ used_at datetime default current_timestamp,\ unique(key))") self.cursor.execute("create table if not exists logs(\ handle text not null,\ peer_id integer,\ message_bytes blob not null,\ created_at datetime default current_timestamp)") self.cursor.execute("create table if not exists dedup_queue(\ hash text not null,\ created_at datetime default current_timestamp)") State.__instance = self def get_at(self, handle=None): at = [] if handle == None: results = self.cursor.execute("select handle_id,address,port,active_at from at\ order by updated_at desc").fetchall() else: result = self.cursor.execute("select handle_id from handles where handle=?", (handle,)).fetchone() if None != result: handle_id = result[0] else: return [] results = self.cursor.execute("select handle_id,address,port,active_at from at \ where handle_id=? order by updated_at desc", (handle_id,)).fetchall() for result in results: handle_id, address, port, updated_at = result h = self.cursor.execute("select handle from handles where handle_id=?", (handle_id,)).fetchone()[0] at.append({"handle": h, "address": "%s:%s" % (address, port), "active_at": updated_at if updated_at else "no packets received from this address"}) return at def is_duplicate_message(self, message_hash): with self.write_lock: self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')") self.conn.commit() result = self.cursor.execute("select hash from dedup_queue where hash=?", (message_hash,)).fetchone() logging.debug("checking if %s is dupe" % message_hash) if(result != None): return True else: return False def add_to_dedup_queue(self, message_hash): with self.write_lock: self.cursor.execute("insert into dedup_queue(hash)\ values(?)", (message_hash,)) logging.debug("added %s to dedup" % message_hash) self.conn.commit() def get_last_message_hash(self, handle, peer_id=None): if peer_id: message_bytes = self.cursor.execute("select message_bytes from logs\ where handle=? and peer_id=?\ order by created_at desc limit 1", (handle, peer_id)).fetchone() else: message_bytes = self.cursor.execute("select message_bytes from logs\ where handle=? and peer_id is null\ order by created_at desc limit 1", (handle,)).fetchone() if message_bytes: return hashlib.sha256(message_bytes[0][:]).digest() else: return "\x00" * 32 def log(self, handle, message_bytes, peer=None): with self.write_lock: if peer != None: peer_id = peer.peer_id else: peer_id = None self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\ values(?, ?, ?)", (handle, peer_id, buffer(message_bytes))) def import_at_and_wot(self, at_path): with self.write_lock: wot = imp.load_source('wot', at_path) for peer in wot.peers: results = self.cursor.execute("select * from handles where handle=? limit 1", (peer["name"],)).fetchall() if len(results) == 0: key = peer["key"] port = peer["port"] address = peer["address"] self.cursor.execute("insert into wot(peer_id) values(null)") peer_id = self.cursor.lastrowid self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, peer["name"])) handle_id = self.cursor.lastrowid self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", (handle_id, peer["address"], peer["port"], None)) self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def update_at(self, peer, set_active_at=True): with self.write_lock: row = self.cursor.execute("select handle_id from handles where handle=?", (peer["handle"],)).fetchone() if row != None: handle_id = row[0] else: return try: self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)", (handle_id, peer["address"], peer["port"])) except sqlite3.IntegrityError as ex: self.cursor.execute("update at set updated_at = current_timestamp\ where handle_id=? and address=? and port=?", (handle_id, peer["address"], peer["port"])) if set_active_at: self.cursor.execute("update at set active_at = current_timestamp\ where handle_id=? and address=? and port=?", (handle_id, peer["address"], peer["port"])) self.conn.commit() def add_peer(self, handle): with self.write_lock: self.cursor.execute("insert into wot(peer_id) values(null)") peer_id = self.cursor.lastrowid self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, handle)) self.conn.commit() def remove_peer(self, handle): with self.write_lock: # get peer id result = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone() if result == None: return else: peer_id = result[0] # get all aliases handle_ids = self.get_handle_ids_for_peer(peer_id) for handle_id in handle_ids: # delete at entries for each alias self.cursor.execute("delete from at where handle_id=?", (handle_id,)) self.cursor.execute("delete from handles where peer_id=?", (peer_id,)) # delete all keys for peer id self.cursor.execute("delete from keys where peer_id=?", (handle_id,)) # delete peer from wot self.cursor.execute("delete from wot where peer_id=?", (peer_id,)) self.conn.commit() def add_key(self, handle, key): with self.write_lock: peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] if peer_id != None: self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def remove_key(self, key): with self.write_lock: self.cursor.execute("delete from keys where key=?", (key,)) self.conn.commit() def get_handle_ids_for_peer(self, peer_id): return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?", (peer_id,)).fetchall())) def get_peer_handles(self): handles = self.listify(self.cursor.execute("select handle from handles").fetchall()) return handles def get_peers(self): peers = [] handles = self.cursor.execute("select handle from handles").fetchall() for handle in handles: peer = self.get_peer_by_handle(handle[0]) if not (self.is_duplicate(peers, peer)): peers.append(peer) return peers def listify(self, results): return list(chain.from_iterable(results)) def get_keyed_peers(self): peer_ids = self.listify(self.cursor.execute("select peer_id from keys").fetchall()) peers = [] for peer_id in peer_ids: handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] peer = self.get_peer_by_handle(handle) if self.is_duplicate(peers, peer): continue if peer.address == None or peer.port == None: continue peers.append(peer) return peers def get_peer_by_handle(self, handle): handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?", (handle,)).fetchone() if handle_info == None: return None address = self.cursor.execute("select address, port from at where handle_id=?\ order by updated_at desc limit 1", (handle_info[0],)).fetchone() handles = self.listify(self.cursor.execute("select handle from handles where peer_id=?", (handle_info[1],)).fetchall()) keys = self.listify(self.cursor.execute("select key from keys where peer_id=?\ order by used_at desc", (handle_info[1],)).fetchall()) return Peer(self.socket, { "handles": handles, "peer_id": handle_info[1], "address": address[0] if address else None, "port": address[1] if address else None, "keys": keys }) def is_duplicate(self, peers, peer): for existing_peer in peers: if existing_peer.address == peer.address and existing_peer.port == peer.port: return True return False