from peer import Peer import sqlite3 import imp import hashlib import logging import datetime from itertools import chain KNOBS=({'max_bounces': 3, 'embargo_interval': 1, 'rubbish_interval': 10}) class State(object): __instance = None @staticmethod def get_instance(socket=None, db_path=None): if State.__instance == None: State(socket, db_path) return State.__instance def __init__(self, socket, db_path): if State.__instance != None: raise Exception("This class is a singleton") else: self.socket = socket self.conn = sqlite3.connect(db_path, check_same_thread=False) cursor = self.cursor() cursor.execute("create table if not exists at(handle_id integer,\ address text not null,\ port integer not null,\ updated_at datetime default null,\ unique(handle_id, address, port))") cursor.execute("create table if not exists wot(peer_id integer primary key)") cursor.execute("create table if not exists handles(handle_id integer primary key,\ peer_id integer,\ handle text,\ unique(handle))") cursor.execute("create table if not exists keys(peer_id intenger,\ key text,\ used_at datetime default current_timestamp,\ unique(key))") cursor.execute("create table if not exists logs(\ handle text not null,\ peer_id integer,\ message_bytes blob not null,\ created_at datetime default current_timestamp)") cursor.execute("create table if not exists dedup_queue(\ hash text not null,\ created_at datetime default current_timestamp)") cursor.execute("create table if not exists knobs(\ name text not null,\ value text not null)") State.__instance = self def cursor(self): return self.conn.cursor() def get_knobs(self): cursor = self.cursor() results = cursor.execute("select name, value from knobs order by name asc").fetchall() knobs = {} for result in results: knobs[result[0]] = result[1] for key in KNOBS.keys(): if not knobs.get(key): knobs[key] = KNOBS[key] return knobs def get_knob(self, knob_name): cursor = self.cursor() result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone() if result: return result[0] elif KNOBS.get(knob_name): return KNOBS.get(knob_name) else: return None def set_knob(self, knob_name, knob_value): cursor = self.cursor() result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone() if result: cursor.execute("update knobs set value=? where name=?", (knob_value, knob_name,)) else: cursor.execute("insert into knobs(name, value) values(?, ?)", (knob_name, knob_value,)) def get_at(self, handle=None): cursor = self.cursor() at = [] if handle == None: results = cursor.execute("select handle_id, address, port, updated_at from at\ order by updated_at desc").fetchall() else: result = cursor.execute("select handle_id from handles where handle=?", (handle,)).fetchone() if None != result: handle_id = result[0] else: return [] results = cursor.execute("select handle_id, address, port, updated_at from at \ where handle_id=? order by updated_at desc", (handle_id,)).fetchall() for result in results: handle_id, address, port, updated_at = result h = cursor.execute("select handle from handles where handle_id=?", (handle_id,)).fetchone()[0] at.append({"handle": h, "address": "%s:%s" % (address, port), "active_at": updated_at if updated_at else "no packets received from this address"}) return at def is_duplicate_message(self, message_hash): cursor = self.cursor() cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')") self.conn.commit() result = cursor.execute("select hash from dedup_queue where hash=?", (message_hash,)).fetchone() logging.debug("checking if %s is dupe" % message_hash) if(result != None): return True else: return False def add_to_dedup_queue(self, message_hash): cursor = self.cursor() cursor.execute("insert into dedup_queue(hash)\ values(?)", (message_hash,)) logging.debug("added %s to dedup" % message_hash) self.conn.commit() def get_last_message_hash(self, handle, peer_id=None): cursor = self.cursor() if peer_id: message_bytes = cursor.execute("select message_bytes from logs\ where handle=? and peer_id=?\ order by created_at desc limit 1", (handle, peer_id)).fetchone() else: message_bytes = cursor.execute("select message_bytes from logs\ where handle=? and peer_id is null\ order by created_at desc limit 1", (handle,)).fetchone() if message_bytes: return hashlib.sha256(message_bytes[0][:]).digest() else: return "\x00" * 32 def log(self, handle, message_bytes, peer=None): cursor = self.cursor() if peer != None: peer_id = peer.peer_id else: peer_id = None cursor.execute("insert into logs(handle, peer_id, message_bytes)\ values(?, ?, ?)", (handle, peer_id, buffer(message_bytes))) def import_at_and_wot(self, at_path): cursor = self.cursor() wot = imp.load_source('wot', at_path) for peer in wot.peers: results = cursor.execute("select * from handles where handle=? limit 1", (peer["name"],)).fetchall() if len(results) == 0: key = peer["key"] port = peer["port"] address = peer["address"] cursor.execute("insert into wot(peer_id) values(null)") peer_id = cursor.lastrowid cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, peer["name"])) handle_id = cursor.lastrowid cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", (handle_id, peer["address"], peer["port"], None)) cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def update_at(self, peer, set_active_at=True): cursor = self.cursor() row = cursor.execute("select handle_id from handles where handle=?", (peer["handle"],)).fetchone() if row != None: handle_id = row[0] else: raise Exception("handle not found") at_entry = cursor.execute("select handle_id, address, port from at where handle_id=?", (handle_id,)).fetchone() # if there are no AT entries for this handle, insert one timestamp = datetime.datetime.now() if set_active_at else None if at_entry == None: cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", (handle_id, peer["address"], peer["port"], timestamp)) logging.debug("inserted new at entry for %s: %s:%d" % ( peer['handle'], peer['address'], peer['port'])) # otherwise update the existing entry if it differs else: try: if (at_entry[1] != peer['address'] or at_entry[2] != peer['port']): cursor.execute("update at set updated_at = ?,\ address = ?,\ port = ?\ where handle_id=?", (timestamp, peer["address"], peer["port"], handle_id)) logging.debug("updated at entry for %s: %s:%d" % ( peer['handle'], peer['address'], peer['port'])) except sqlite3.IntegrityError: cursor.execute("delete from at where handle_id=?", (handle_id,)) self.conn.commit() def add_peer(self, handle): cursor = self.cursor() cursor.execute("insert into wot(peer_id) values(null)") peer_id = cursor.lastrowid cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, handle)) self.conn.commit() def remove_peer(self, handle): cursor = self.cursor() # get peer id result = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone() if result == None: raise Exception("handle not found") else: peer_id = result[0] # get all aliases handle_ids = self.get_handle_ids_for_peer(peer_id) for handle_id in handle_ids: # delete at entries for each alias cursor.execute("delete from at where handle_id=?", (handle_id,)) cursor.execute("delete from handles where peer_id=?", (peer_id,)) # delete all keys for peer id cursor.execute("delete from keys where peer_id=?", (handle_id,)) # delete peer from wot cursor.execute("delete from wot where peer_id=?", (peer_id,)) self.conn.commit() def add_key(self, handle, key): cursor = self.cursor() peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] if peer_id != None: cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def remove_key(self, key): cursor = self.cursor() cursor.execute("delete from keys where key=?", (key,)) self.conn.commit() def get_handle_ids_for_peer(self, peer_id): cursor = self.cursor() return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?", (peer_id,)).fetchall())) def get_peer_handles(self): cursor = self.cursor() handles = self.listify(cursor.execute("select handle from handles").fetchall()) return handles def get_peers(self): cursor = self.cursor() peers = [] handles = cursor.execute("select handle from handles").fetchall() for handle in handles: peer = self.get_peer_by_handle(handle[0]) if not (self.is_duplicate(peers, peer)): peers.append(peer) return peers def listify(self, results): return list(chain.from_iterable(results)) def get_keyed_peers(self, exclude_addressless=False): cursor = self.cursor() peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall()) peers = [] for peer_id in peer_ids: handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] peer = self.get_peer_by_handle(handle) if self.is_duplicate(peers, peer): continue if exclude_addressless and (peer.address == None or peer.port == None): continue peers.append(peer) return peers def get_peer_by_handle(self, handle): cursor = self.cursor() handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?", (handle,)).fetchone() if handle_info == None: return None address = cursor.execute("select address, port from at where handle_id=?\ order by updated_at desc limit 1", (handle_info[0],)).fetchone() handles = self.listify(cursor.execute("select handle from handles where peer_id=?", (handle_info[1],)).fetchall()) keys = self.listify(cursor.execute("select key from keys where peer_id=?\ order by used_at desc", (handle_info[1],)).fetchall()) return Peer(self.socket, { "handles": handles, "peer_id": handle_info[1], "address": address[0] if address else None, "port": address[1] if address else None, "keys": keys }) def is_duplicate(self, peers, peer): for existing_peer in peers: if existing_peer.address == peer.address and existing_peer.port == peer.port: return True return False