from lib.peer import Peer import sqlite3 import imp import hashlib from itertools import chain class State(object): def __init__(self, server, db_path): self.server = server self.conn = sqlite3.connect(db_path) self.cursor = self.conn.cursor() self.cursor.execute("create table if not exists at(handle_id integer,\ address text not null,\ port integer not null,\ active_at datetime default null,\ updated_at datetime default current_timestamp,\ unique(handle_id, address, port))") self.cursor.execute("create table if not exists wot(peer_id integer primary key)") self.cursor.execute("create table if not exists handles(handle_id integer primary key,\ peer_id integer,\ handle text,\ unique(handle))") self.cursor.execute("create table if not exists keys(peer_id intenger,\ key text,\ used_at datetime default current_timestamp,\ unique(key))") self.cursor.execute("create table if not exists logs(\ handle text not null,\ peer_id integer,\ message_bytes blob not null,\ created_at datetime default current_timestamp)") def get_at(self, handle=None): at = [] if handle == None: results = self.cursor.execute("select handle_id,address,port,active_at from at\ order by updated_at desc").fetchall() else: result = self.cursor.execute("select handle_id from handles where handle=?", (handle,)).fetchone() if None != result: handle_id = result[0] else: return [] results = self.cursor.execute("select handle_id,address,port,active_at from at \ where handle_id=? order by updated_at desc", (handle_id,)).fetchall() for result in results: handle_id, address, port, updated_at = result h = self.cursor.execute("select handle from handles where handle_id=?", (handle_id,)).fetchone()[0] at.append({"handle": h, "address": "%s:%s" % (address, port), "active_at": updated_at}) return at def get_last_message_hash(self, handle, peer_id=None): if peer_id: message_bytes = self.cursor.execute("select message_bytes from logs\ where handle=? and peer_id=?\ order by created_at desc limit 1", (handle, peer_id)).fetchone() else: message_bytes = self.cursor.execute("select message_bytes from logs\ where handle=? and peer_id is null\ order by created_at desc limit 1", (handle,)).fetchone() if message_bytes: return hashlib.sha256(message_bytes[0][:]).digest() else: return "0" * 32 def log(self, handle, message_bytes, peer_id=None): self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\ values(?, ?, ?)", (handle, peer_id, buffer(message_bytes))) def import_at_and_wot(self, at_path): wot = imp.load_source('wot', at_path) for peer in wot.peers: results = self.cursor.execute("select * from handles where handle=? limit 1", (peer["name"],)).fetchall() if len(results) == 0: key = peer["key"] port = peer["port"] address = peer["address"] self.cursor.execute("insert into wot(peer_id) values(null)") peer_id = self.cursor.lastrowid self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, peer["name"])) handle_id = self.cursor.lastrowid self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", (handle_id, peer["address"], peer["port"], None)) self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def update_address_table(self, peer, set_active_at=True): row = self.cursor.execute("select handle_id from handles where handle=?", (peer["handle"],)).fetchone() if row != None: handle_id = row[0] else: return try: self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)", (handle_id, peer["address"], peer["port"])) except sqlite3.IntegrityError as ex: self.cursor.execute("update at set updated_at = current_timestamp\ where handle_id=? and address=? and port=?", (handle_id, peer["address"], peer["port"])) if set_active_at: self.cursor.execute("update at set active_at = current_timestamp\ where handle_id=? and address=? and port=?", (handle_id, peer["address"], peer["port"])) self.conn.commit() def add_peer(self, handle): self.cursor.execute("insert into wot(peer_id) values(null)") peer_id = self.cursor.lastrowid self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, handle)) self.conn.commit() def remove_peer(self, handle): # get peer id result = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone() if result == None: return else: peer_id = result[0] # get all aliases handle_ids = self.get_handle_ids_for_peer(peer_id) for handle_id in handle_ids: # delete at entries for each alias self.cursor.execute("delete from at where handle_id=?", (handle_id,)) self.cursor.execute("delete from handles where peer_id=?", (peer_id,)) # delete all keys for peer id self.cursor.execute("delete from keys where peer_id=?", (handle_id,)) # delete peer from wot self.cursor.execute("delete from wot where peer_id=?", (peer_id,)) self.conn.commit() def add_key(self, handle, key): peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] if peer_id != None: self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def remove_key(self, key): self.cursor.execute("delete from keys where key=?", (key,)) self.conn.commit() def get_handle_ids_for_peer(self, peer_id): return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?", (peer_id,)).fetchall())) def get_peer_handles(self): handles = list(chain.from_iterable(self.cursor.execute("select handle from handles").fetchall())) return handles def get_peers(self): peers = [] handles = self.cursor.execute("select handle from handles").fetchall() for handle in handles: peer = self.get_peer_by_handle(handle[0]) if not (self.is_duplicate(peers, peer)): peers.append(peer) return peers def get_peer_by_handle(self, handle): handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?", (handle,)).fetchone() if handle_info == None: return None address = self.cursor.execute("select address, port from at where handle_id=?\ order by updated_at desc limit 1", (handle_info[0],)).fetchone() handles = list(chain.from_iterable(self.cursor.execute("select handle from handles where peer_id=?", (handle_info[1],)).fetchall())) keys = list(chain.from_iterable(self.cursor.execute("select key from keys where peer_id=?\ order by used_at desc", (handle_info[1],)).fetchall())) return Peer(self.server, { "handles": handles, "peer_id": handle_info[1], "address": address[0] if address else "", "port": address[1] if address else "", "keys": keys }) def is_duplicate(self, peers, peer): for existing_peer in peers: if existing_peer.address == peer.address and existing_peer.port == peer.port: return True return False