raw
genesis                 1 from lib.peer import Peer
genesis 2 import sqlite3
genesis 3 import imp
genesis 4 import hashlib
9987-embargoing 5 import logging
genesis 6 from itertools import chain
genesis 7
genesis 8 class State(object):
9987-embargoing 9 __instance = None
9987-embargoing 10 @staticmethod
9987-embargoing 11 def get_instance(socket=None, db_path=None):
9987-embargoing 12 if State.__instance == None:
9987-embargoing 13 State(socket, db_path)
9987-embargoing 14 return State.__instance
9987-embargoing 15
9987-embargoing 16 def __init__(self, socket, db_path):
9987-embargoing 17 if State.__instance != None:
9987-embargoing 18 raise Exception("This class is a singleton")
9987-embargoing 19 else:
9987-embargoing 20 self.socket = socket
9987-embargoing 21 self.conn = sqlite3.connect(db_path, check_same_thread=False)
9987-embargoing 22 self.cursor = self.conn.cursor()
9987-embargoing 23 self.cursor.execute("create table if not exists at(handle_id integer,\
9987-embargoing 24 address text not null,\
9987-embargoing 25 port integer not null,\
9987-embargoing 26 active_at datetime default null,\
9987-embargoing 27 updated_at datetime default current_timestamp,\
9987-embargoing 28 unique(handle_id, address, port))")
9987-embargoing 29
9987-embargoing 30 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
9987-embargoing 31
9987-embargoing 32 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
9987-embargoing 33 peer_id integer,\
9987-embargoing 34 handle text,\
9987-embargoing 35 unique(handle))")
9987-embargoing 36
9987-embargoing 37 self.cursor.execute("create table if not exists keys(peer_id intenger,\
9987-embargoing 38 key text,\
9987-embargoing 39 used_at datetime default current_timestamp,\
9987-embargoing 40 unique(key))")
9987-embargoing 41
9987-embargoing 42 self.cursor.execute("create table if not exists logs(\
9987-embargoing 43 handle text not null,\
9987-embargoing 44 peer_id integer,\
9987-embargoing 45 message_bytes blob not null,\
9987-embargoing 46 created_at datetime default current_timestamp)")
9987-embargoing 47
9987-embargoing 48 self.cursor.execute("create table if not exists dedup_queue(\
9987-embargoing 49 hash text not null,\
9987-embargoing 50 created_at datetime default current_timestamp)")
9987-embargoing 51 State.__instance = self
9988-hash-dedup 52
genesis 53 def get_at(self, handle=None):
genesis 54 at = []
genesis 55 if handle == None:
9991-improved-log... 56 results = self.cursor.execute("select handle_id,address,port,active_at from at\
genesis 57 order by updated_at desc").fetchall()
genesis 58 else:
9992-handle-edge-... 59 result = self.cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 60 (handle,)).fetchone()
9992-handle-edge-... 61 if None != result:
9992-handle-edge-... 62 handle_id = result[0]
genesis 63 else:
genesis 64 return []
9991-improved-log... 65 results = self.cursor.execute("select handle_id,address,port,active_at from at \
9992-handle-edge-... 66 where handle_id=? order by updated_at desc",
9992-handle-edge-... 67 (handle_id,)).fetchall()
genesis 68 for result in results:
genesis 69 handle_id, address, port, updated_at = result
genesis 70 h = self.cursor.execute("select handle from handles where handle_id=?",
genesis 71 (handle_id,)).fetchone()[0]
genesis 72 at.append({"handle": h,
genesis 73 "address": "%s:%s" % (address, port),
9987-embargoing 74 "active_at": updated_at if updated_at else "no packets received from this address"})
genesis 75 return at
genesis 76
genesis 77
9988-hash-dedup 78 def is_duplicate_message(self, message_hash):
9988-hash-dedup 79 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
9988-hash-dedup 80 self.conn.commit()
9988-hash-dedup 81 result = self.cursor.execute("select hash from dedup_queue where hash=?",
9988-hash-dedup 82 (message_hash,)).fetchone()
9987-embargoing 83 logging.debug("checking if %s is dupe" % message_hash)
9988-hash-dedup 84 if(result != None):
9988-hash-dedup 85 return True
9988-hash-dedup 86 else:
9988-hash-dedup 87 return False
9988-hash-dedup 88
9988-hash-dedup 89 def add_to_dedup_queue(self, message_hash):
9988-hash-dedup 90 self.cursor.execute("insert into dedup_queue(hash)\
9988-hash-dedup 91 values(?)",
9988-hash-dedup 92 (message_hash,))
9987-embargoing 93 logging.debug("added %s to dedup" % message_hash)
9988-hash-dedup 94 self.conn.commit()
9988-hash-dedup 95
genesis 96 def get_last_message_hash(self, handle, peer_id=None):
genesis 97 if peer_id:
genesis 98 message_bytes = self.cursor.execute("select message_bytes from logs\
genesis 99 where handle=? and peer_id=?\
genesis 100 order by created_at desc limit 1",
genesis 101 (handle, peer_id)).fetchone()
genesis 102
genesis 103 else:
genesis 104 message_bytes = self.cursor.execute("select message_bytes from logs\
genesis 105 where handle=? and peer_id is null\
genesis 106 order by created_at desc limit 1",
genesis 107 (handle,)).fetchone()
genesis 108
genesis 109 if message_bytes:
genesis 110 return hashlib.sha256(message_bytes[0][:]).digest()
genesis 111 else:
9987-embargoing 112 return "\x00" * 32
9987-embargoing 113
9987-embargoing 114 def log(self, handle, message_bytes, peer=None):
9987-embargoing 115 if peer != None:
9987-embargoing 116 peer_id = peer.peer_id
9987-embargoing 117 else:
9987-embargoing 118 peer_id = None
genesis 119
genesis 120 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
genesis 121 values(?, ?, ?)",
genesis 122 (handle, peer_id, buffer(message_bytes)))
genesis 123
genesis 124 def import_at_and_wot(self, at_path):
genesis 125 wot = imp.load_source('wot', at_path)
genesis 126 for peer in wot.peers:
genesis 127 results = self.cursor.execute("select * from handles where handle=? limit 1",
genesis 128 (peer["name"],)).fetchall()
genesis 129 if len(results) == 0:
genesis 130 key = peer["key"]
genesis 131 port = peer["port"]
genesis 132 address = peer["address"]
genesis 133 self.cursor.execute("insert into wot(peer_id) values(null)")
genesis 134 peer_id = self.cursor.lastrowid
genesis 135 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
genesis 136 (peer_id, peer["name"]))
genesis 137 handle_id = self.cursor.lastrowid
9991-improved-log... 138 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9991-improved-log... 139 (handle_id, peer["address"], peer["port"], None))
genesis 140 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
genesis 141 (peer_id, key))
genesis 142
genesis 143 self.conn.commit()
genesis 144
9987-embargoing 145 def update_at(self, peer, set_active_at=True):
genesis 146 row = self.cursor.execute("select handle_id from handles where handle=?",
genesis 147 (peer["handle"],)).fetchone()
genesis 148 if row != None:
genesis 149 handle_id = row[0]
genesis 150 else:
genesis 151 return
9991-improved-log... 152
genesis 153 try:
genesis 154 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
genesis 155 (handle_id, peer["address"], peer["port"]))
genesis 156 except sqlite3.IntegrityError as ex:
genesis 157 self.cursor.execute("update at set updated_at = current_timestamp\
genesis 158 where handle_id=? and address=? and port=?",
genesis 159 (handle_id, peer["address"], peer["port"]))
9991-improved-log... 160 if set_active_at:
9991-improved-log... 161 self.cursor.execute("update at set active_at = current_timestamp\
9991-improved-log... 162 where handle_id=? and address=? and port=?",
9991-improved-log... 163 (handle_id, peer["address"], peer["port"]))
9991-improved-log... 164 self.conn.commit()
genesis 165
genesis 166 def add_peer(self, handle):
genesis 167 self.cursor.execute("insert into wot(peer_id) values(null)")
genesis 168 peer_id = self.cursor.lastrowid
genesis 169 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
genesis 170 (peer_id, handle))
genesis 171 self.conn.commit()
9991-improved-log... 172
genesis 173
genesis 174 def remove_peer(self, handle):
genesis 175 # get peer id
genesis 176
9992-handle-edge-... 177 result = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()
9992-handle-edge-... 178 if result == None:
9992-handle-edge-... 179 return
genesis 180 else:
9992-handle-edge-... 181 peer_id = result[0]
genesis 182 # get all aliases
genesis 183
genesis 184 handle_ids = self.get_handle_ids_for_peer(peer_id)
genesis 185 for handle_id in handle_ids:
genesis 186 # delete at entries for each alias
genesis 187
genesis 188 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 189
genesis 190 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 191
genesis 192 # delete all keys for peer id
genesis 193
genesis 194 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
genesis 195
genesis 196 # delete peer from wot
genesis 197
genesis 198 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
genesis 199 self.conn.commit()
genesis 200
genesis 201
genesis 202 def add_key(self, handle, key):
genesis 203 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
genesis 204 if peer_id != None:
genesis 205 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
genesis 206 self.conn.commit()
genesis 207
genesis 208 def remove_key(self, key):
genesis 209 self.cursor.execute("delete from keys where key=?", (key,))
genesis 210 self.conn.commit()
genesis 211
genesis 212 def get_handle_ids_for_peer(self, peer_id):
genesis 213 return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?",
genesis 214 (peer_id,)).fetchall()))
genesis 215
9989-show-wot-nicks 216 def get_peer_handles(self):
9987-embargoing 217 handles = self.listify(self.cursor.execute("select handle from handles").fetchall())
9989-show-wot-nicks 218 return handles
9989-show-wot-nicks 219
genesis 220 def get_peers(self):
genesis 221 peers = []
genesis 222 handles = self.cursor.execute("select handle from handles").fetchall()
genesis 223
genesis 224 for handle in handles:
genesis 225 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 226 if not (self.is_duplicate(peers, peer)):
genesis 227 peers.append(peer)
genesis 228 return peers
genesis 229
9987-embargoing 230 def listify(self, results):
9987-embargoing 231 return list(chain.from_iterable(results))
9987-embargoing 232
9987-embargoing 233 def get_keyed_peers(self):
9987-embargoing 234 peer_ids = self.listify(self.cursor.execute("select peer_id from keys").fetchall())
9987-embargoing 235 peers = []
9987-embargoing 236 for peer_id in peer_ids:
9987-embargoing 237 handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
9987-embargoing 238 peer = self.get_peer_by_handle(handle)
9987-embargoing 239 if not (self.is_duplicate(peers, peer)):
9987-embargoing 240 peers.append(peer)
9987-embargoing 241 return peers
9987-embargoing 242
9987-embargoing 243
genesis 244 def get_peer_by_handle(self, handle):
genesis 245 handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 246 (handle,)).fetchone()
genesis 247
genesis 248 if handle_info == None:
genesis 249 return None
genesis 250
genesis 251 address = self.cursor.execute("select address, port from at where handle_id=?\
genesis 252 order by updated_at desc limit 1",
genesis 253 (handle_info[0],)).fetchone()
9987-embargoing 254 handles = self.listify(self.cursor.execute("select handle from handles where peer_id=?",
9987-embargoing 255 (handle_info[1],)).fetchall())
9987-embargoing 256 keys = self.listify(self.cursor.execute("select key from keys where peer_id=?\
genesis 257 order by used_at desc",
9987-embargoing 258 (handle_info[1],)).fetchall())
9987-embargoing 259 return Peer(self.socket, {
genesis 260 "handles": handles,
genesis 261 "peer_id": handle_info[1],
genesis 262 "address": address[0] if address else "",
genesis 263 "port": address[1] if address else "",
genesis 264 "keys": keys
genesis 265 })
9987-embargoing 266
genesis 267 def is_duplicate(self, peers, peer):
genesis 268 for existing_peer in peers:
genesis 269 if existing_peer.address == peer.address and existing_peer.port == peer.port:
genesis 270 return True
genesis 271 return False