import calendar from peer import Peer from message import EMPTY_CHAIN from message import Message import sqlite3 import imp import binascii import logging import datetime import time import caribou from itertools import chain KNOBS=( { 'max_bounces': 3, 'embargo_interval_seconds': 1, 'rubbish_interval_seconds': 10, 'nick': '', 'order_buffer_check_seconds': 180, 'order_buffer_expiration_seconds': 120, 'short_buffer_expiration_seconds': 1, 'short_buffer_check_interval_seconds': 1, 'getdata_requests_expiration_seconds': 10, 'peer_offline_interval_seconds': 60, 'peer_away_interval_seconds': 10 * 60, 'presence_check_seconds': 5, } ) class State(object): def __init__(self, station, db_path=None): self.station = station if db_path: self.conn = sqlite3.connect(db_path) else: self.conn = sqlite3.connect("file::memory:") cursor = self.cursor() cursor.execute("create table if not exists handle_self_chain(id integer primary key autoincrement,\ handle string not null,\ message_hash blob not null)") cursor.execute("create table if not exists broadcast_self_chain(id integer primary key autoincrement,\ message_hash blob not null)") cursor.execute("create table if not exists net_chain(id integer primary key autoincrement,\ message_hash blob not null)") cursor.execute("create table if not exists at(handle_id integer,\ address text not null,\ port integer not null,\ updated_at datetime default null,\ unique(handle_id, address, port))") cursor.execute("create table if not exists wot(peer_id integer primary key autoincrement)") cursor.execute("create table if not exists handles(handle_id integer primary key,\ peer_id integer,\ handle text,\ unique(handle))") cursor.execute("create table if not exists keys(peer_id intenger,\ key text,\ used_at datetime default current_timestamp,\ unique(key))") cursor.execute("create table if not exists log(\ message_bytes blob not null,\ message_hash text not null, \ command integer not null, \ timestamp datetime not null, \ created_at datetime default current_timestamp)") cursor.execute("create table if not exists knobs(\ name text not null,\ value text not null)") # migrate the db if necessary if db_path: caribou.upgrade(db_path, "migrations") self.conn.commit() def cursor(self): return self.conn.cursor() def update_handle_self_chain(self, handle, message_hash): cursor = self.cursor() cursor.execute("insert into handle_self_chain(handle, message_hash) values(?, ?)", (handle, buffer(message_hash))) self.conn.commit() def get_handle_self_chain(self, handle): cursor = self.cursor() results = cursor.execute("select message_hash from handle_self_chain where handle=?\ order by id desc limit 1", (handle,)).fetchone() if results is not None: return results[0][:] else: return EMPTY_CHAIN def update_broadcast_self_chain(self, message_hash): cursor = self.cursor() cursor.execute("insert into broadcast_self_chain(message_hash) values(?)", (buffer(message_hash),)) self.conn.commit() def get_broadcast_self_chain(self): cursor = self.cursor() results = cursor.execute("select message_hash from broadcast_self_chain order by id desc limit 1").fetchone() if results is not None: return results[0][:] else: return EMPTY_CHAIN def update_net_chain(self, message_hash): self.cursor().execute("insert into net_chain(message_hash) values(?)", (buffer(message_hash),)) self.conn.commit() def get_net_chain(self): cursor = self.cursor() results = cursor.execute("select message_hash from net_chain order by id desc limit 1").fetchone() if results is not None: return results[0][:] else: return EMPTY_CHAIN def get_knobs(self): cursor = self.cursor() results = cursor.execute("select name, value from knobs order by name asc").fetchall() knobs = {} for result in results: knobs[result[0]] = result[1] for key in KNOBS.keys(): if not knobs.get(key): knobs[key] = KNOBS[key] return knobs def get_knob(self, knob_name): cursor = self.cursor() result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone() if result: return result[0] elif KNOBS.get(knob_name): return KNOBS.get(knob_name) else: return None def set_knob(self, knob_name, knob_value): cursor = self.cursor() result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone() if result: cursor.execute("update knobs set value=? where name=?", (knob_value, knob_name,)) else: cursor.execute("insert into knobs(name, value) values(?, ?)", (knob_name, knob_value,)) self.conn.commit() def get_latest_message_timestamp(self): cursor = self.cursor() result = cursor.execute("select timestamp from log order by timestamp desc limit 1").fetchone() if result: return result[0] def get_at(self, handle=None): cursor = self.cursor() at = [] if handle == None: results = cursor.execute("select handle_id, address, port, updated_at, strftime('%s', updated_at) from at\ order by updated_at desc").fetchall() else: result = cursor.execute("select handle_id from handles where handle=?", (handle,)).fetchone() if None != result: handle_id = result[0] else: return [] results = cursor.execute("select handle_id, address, port, updated_at, strftime('%s', updated_at) from at \ where handle_id=? order by updated_at desc", (handle_id,)).fetchall() for result in results: handle_id, address, port, updated_at_utc, updated_at_unixtime = result h = cursor.execute("select handle from handles where handle_id=?", (handle_id,)).fetchone()[0] if updated_at_utc: if '.' not in updated_at_utc: updated_at_utc = updated_at_utc + '.0' dt_format = '%Y-%m-%d %H:%M:%S.%f' dt_utc = datetime.datetime.strptime(updated_at_utc, dt_format) dt_local = self.utc_to_local(dt_utc) updated_at = datetime.datetime.strftime(dt_local, dt_format) else: updated_at = "no packets received from this address" at.append({ "handle": h, "address": "%s:%s" % (address, port), "active_at": updated_at, "active_at_unixtime": int(updated_at_unixtime) if updated_at_unixtime else 0 }) return at def import_at_and_wot(self, at_path): cursor = self.cursor() wot = imp.load_source('wot', at_path) for peer in wot.peers: results = cursor.execute("select * from handles where handle=? limit 1", (peer["name"],)).fetchall() if len(results) == 0: key = peer["key"] port = peer["port"] address = peer["address"] cursor.execute("insert into wot(peer_id) values(null)") peer_id = cursor.lastrowid cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, peer["name"])) handle_id = cursor.lastrowid cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", (handle_id, peer["address"], peer["port"], None)) cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def update_at(self, peer, set_active_at=True): cursor = self.cursor() row = cursor.execute("select handle_id from handles where handle=?", (peer["handle"],)).fetchone() if row != None: handle_id = row[0] else: raise Exception("handle not found") at_entry = cursor.execute("select handle_id, address, port from at where handle_id=?", (handle_id,)).fetchone() # if there are no AT entries for this handle, insert one timestamp = datetime.datetime.utcnow() if set_active_at else None if at_entry == None: cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", (handle_id, peer["address"], peer["port"], timestamp)) logging.debug("inserted new at entry for %s: %s:%d" % ( peer['handle'], peer['address'], peer['port'])) # otherwise just update the existing entry else: try: cursor.execute("update at set updated_at = ?,\ address = ?,\ port = ?\ where handle_id=?", (timestamp, peer["address"], peer["port"], handle_id)) except sqlite3.IntegrityError: cursor.execute("delete from at where handle_id=?", (handle_id,)) self.conn.commit() def add_peer(self, handle): cursor = self.cursor() cursor.execute("insert into wot(peer_id) values(null)") peer_id = cursor.lastrowid cursor.execute("insert into handles(peer_id, handle) values(?, ?)", (peer_id, handle)) self.conn.commit() def remove_peer(self, handle): cursor = self.cursor() # get peer id result = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone() if result == None: raise Exception("handle not found") else: peer_id = result[0] # get all aliases handle_ids = self.get_handle_ids_for_peer(peer_id) for handle_id in handle_ids: # delete at entries for each alias cursor.execute("delete from at where handle_id=?", (handle_id,)) cursor.execute("delete from handles where peer_id=?", (peer_id,)) # delete all keys for peer id cursor.execute("delete from keys where peer_id=?", (handle_id,)) # delete peer from wot cursor.execute("delete from wot where peer_id=?", (peer_id,)) self.conn.commit() def add_key(self, handle, key): cursor = self.cursor() peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] if peer_id != None: cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) self.conn.commit() def remove_key(self, key): cursor = self.cursor() cursor.execute("delete from keys where key=?", (key,)) self.conn.commit() def get_handle_ids_for_peer(self, peer_id): cursor = self.cursor() return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?", (peer_id,)).fetchall())) def get_peer_handles(self): cursor = self.cursor() handles = self.listify(cursor.execute("select handle from handles").fetchall()) return handles def get_peers(self): cursor = self.cursor() peers = [] handles = cursor.execute("select handle from handles").fetchall() for handle in handles: peer = self.get_peer_by_handle(handle[0]) if not (self.is_duplicate(peers, peer)): peers.append(peer) return peers def listify(self, results): return list(chain.from_iterable(results)) def log_has_message(self, message_hash): cursor = self.cursor() result = cursor.execute("select exists(select 1 from log where message_hash=?)\ limit 1", (binascii.hexlify(message_hash),)).fetchone() return result[0] def log_message(self, message): cursor = self.cursor() message_hash_hex_string = binascii.hexlify(message.message_hash) cursor.execute("insert into log(message_hash, message_bytes, command, timestamp) values(?, ?, ?, ?)", (message_hash_hex_string, buffer(message.message_bytes), message.command, message.timestamp)) self.conn.commit() def get_message(self, message_hash): cursor = self.cursor() message_hash_hex_string = binascii.hexlify(message_hash) result = cursor.execute("select command, message_bytes from log where message_hash=? limit 1", (message_hash_hex_string,)).fetchone() if result: return result[0], result[1][:] return None, None def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[]): cursor = self.cursor() peer_ids = self.listify(cursor.execute("select peer_id from keys\ where peer_id not in (%s) order by random()" % ','.join('?'*len(exclude_ids)), exclude_ids).fetchall()) peers = [] for peer_id in peer_ids: handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] peer = self.get_peer_by_handle(handle) if self.is_duplicate(peers, peer): continue if exclude_addressless and (peer.address == None or peer.port == None): continue peers.append(peer) return peers def handle_is_online(self, handle): # last rubbish message from peer associated with handle is # sufficiently recent try: at = self.get_at(handle)[0] except IndexError: return False if at["active_at_unixtime"] > time.time() - int(self.get_knob("peer_offline_interval_seconds")): return True else: return False def utc_to_local(self, utc_dt): # get integer timestamp to avoid precision lost timestamp = calendar.timegm(utc_dt.timetuple()) local_dt = datetime.datetime.fromtimestamp(timestamp) assert utc_dt.resolution >= datetime.timedelta(microseconds=1) return local_dt.replace(microsecond=utc_dt.microsecond) def handle_is_away(self, handle): # last broadcast or dm is sufficiently old cursor = self.cursor() away_interval_seconds = int(self.get_knob("peer_away_interval_seconds")) dt = datetime.datetime.utcfromtimestamp( time.time() - away_interval_seconds ) raw_messages = cursor.execute("select message_bytes from log where created_at > ?", (dt,)).fetchall() for message_bytes in raw_messages: int_ts, self_chain, net_chain, speaker, body = Message._unpack_message(message_bytes[0][:]) if speaker == handle: return False return True def get_peer_by_handle(self, handle): cursor = self.cursor() handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?", (handle,)).fetchone() if handle_info == None: return None peer_id = handle_info[1] address = cursor.execute("select address, port from at where handle_id=?\ order by updated_at desc limit 1", (handle_info[0],)).fetchone() handles = self.listify(cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchall()) keys = self.listify(cursor.execute("select key from keys where peer_id=?\ order by random()", (peer_id,)).fetchall()) return Peer(self.station.socket, { "handles": handles, "peer_id": handle_info[1], "address": address[0] if address else None, "port": address[1] if address else None, "keys": keys }) def is_duplicate(self, peers, peer): for existing_peer in peers: if (not existing_peer.address is None and existing_peer.address == peer.address and existing_peer.port == peer.port): return True return False