9985-single-thread 1 from peer import Peer
genesis 2 import sqlite3
genesis 3 import imp
genesis 4 import hashlib
9987-embargoing 5 import logging
9984-unbork-at-co... 6 import datetime
genesis 7 from itertools import chain
genesis 8
9983-knobs 9 KNOBS=({'max_bounces': 3,
9983-knobs 10 'embargo_interval': 1,
9983-knobs 11 'rubbish_interval': 10})
9983-knobs 12
genesis 13 class State(object):
9987-embargoing 14 __instance = None
9987-embargoing 15 @staticmethod
9987-embargoing 16 def get_instance(socket=None, db_path=None):
9987-embargoing 17 if State.__instance == None:
9987-embargoing 18 State(socket, db_path)
9987-embargoing 19 return State.__instance
9987-embargoing 20
9987-embargoing 21 def __init__(self, socket, db_path):
9987-embargoing 22 if State.__instance != None:
9987-embargoing 23 raise Exception("This class is a singleton")
9987-embargoing 24 else:
9985-single-thread 25 self.socket = socket
9985-single-thread 26 self.conn = sqlite3.connect(db_path, check_same_thread=False)
9985-single-thread 27 cursor = self.cursor()
9985-single-thread 28 cursor.execute("create table if not exists at(handle_id integer,\
9985-single-thread 29 address text not null,\
9985-single-thread 30 port integer not null,\
9984-unbork-at-co... 31 updated_at datetime default null,\
9985-single-thread 32 unique(handle_id, address, port))")
9985-single-thread 33
9985-single-thread 34 cursor.execute("create table if not exists wot(peer_id integer primary key)")
9985-single-thread 35
9985-single-thread 36 cursor.execute("create table if not exists handles(handle_id integer primary key,\
9985-single-thread 37 peer_id integer,\
9985-single-thread 38 handle text,\
9985-single-thread 39 unique(handle))")
9985-single-thread 40
9985-single-thread 41 cursor.execute("create table if not exists keys(peer_id intenger,\
9985-single-thread 42 key text,\
9985-single-thread 43 used_at datetime default current_timestamp,\
9985-single-thread 44 unique(key))")
9985-single-thread 45
9985-single-thread 46 cursor.execute("create table if not exists logs(\
9985-single-thread 47 handle text not null,\
9985-single-thread 48 peer_id integer,\
9985-single-thread 49 message_bytes blob not null,\
9985-single-thread 50 created_at datetime default current_timestamp)")
9985-single-thread 51
9985-single-thread 52 cursor.execute("create table if not exists dedup_queue(\
9985-single-thread 53 hash text not null,\
9985-single-thread 54 created_at datetime default current_timestamp)")
9983-knobs 55 cursor.execute("create table if not exists knobs(\
9983-knobs 56 name text not null,\
9983-knobs 57 value text not null)")
9987-embargoing 58 State.__instance = self
9988-hash-dedup 59
9985-single-thread 60 def cursor(self):
9985-single-thread 61 return self.conn.cursor()
9985-single-thread 62
9983-knobs 63 def get_knobs(self):
9983-knobs 64 cursor = self.cursor()
9983-knobs 65 results = cursor.execute("select name, value from knobs order by name asc").fetchall()
9983-knobs 66 knobs = {}
9983-knobs 67 for result in results:
9983-knobs 68 knobs[result[0]] = result[1]
9983-knobs 69 for key in KNOBS.keys():
9983-knobs 70 if not knobs.get(key):
9983-knobs 71 knobs[key] = KNOBS[key]
9983-knobs 72 return knobs
9983-knobs 73
9983-knobs 74 def get_knob(self, knob_name):
9983-knobs 75 cursor = self.cursor()
9983-knobs 76 result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone()
9983-knobs 77 if result:
9983-knobs 78 return result[0]
9983-knobs 79 elif KNOBS.get(knob_name):
9983-knobs 80 return KNOBS.get(knob_name)
9983-knobs 81 else:
9983-knobs 82 return None
9983-knobs 83
9983-knobs 84 def set_knob(self, knob_name, knob_value):
9983-knobs 85 cursor = self.cursor()
9983-knobs 86 result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone()
9983-knobs 87 if result:
9983-knobs 88 cursor.execute("update knobs set value=? where name=?", (knob_value, knob_name,))
9983-knobs 89 else:
9983-knobs 90 cursor.execute("insert into knobs(name, value) values(?, ?)", (knob_name, knob_value,))
9983-knobs 91
genesis 92 def get_at(self, handle=None):
9985-single-thread 93 cursor = self.cursor()
genesis 94 at = []
genesis 95 if handle == None:
9984-unbork-at-co... 96 results = cursor.execute("select handle_id, address, port, updated_at from at\
genesis 97 order by updated_at desc").fetchall()
genesis 98 else:
9985-single-thread 99 result = cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 100 (handle,)).fetchone()
9992-handle-edge-... 101 if None != result:
9992-handle-edge-... 102 handle_id = result[0]
genesis 103 else:
genesis 104 return []
9984-unbork-at-co... 105 results = cursor.execute("select handle_id, address, port, updated_at from at \
9992-handle-edge-... 106 where handle_id=? order by updated_at desc",
9992-handle-edge-... 107 (handle_id,)).fetchall()
genesis 108 for result in results:
genesis 109 handle_id, address, port, updated_at = result
9985-single-thread 110 h = cursor.execute("select handle from handles where handle_id=?",
genesis 111 (handle_id,)).fetchone()[0]
genesis 112 at.append({"handle": h,
genesis 113 "address": "%s:%s" % (address, port),
9987-embargoing 114 "active_at": updated_at if updated_at else "no packets received from this address"})
genesis 115 return at
genesis 116
genesis 117
9988-hash-dedup 118 def is_duplicate_message(self, message_hash):
9985-single-thread 119 cursor = self.cursor()
9985-single-thread 120 cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
9985-single-thread 121 self.conn.commit()
9985-single-thread 122 result = cursor.execute("select hash from dedup_queue where hash=?",
9985-single-thread 123 (message_hash,)).fetchone()
9985-single-thread 124 logging.debug("checking if %s is dupe" % message_hash)
9985-single-thread 125 if(result != None):
9985-single-thread 126 return True
9985-single-thread 127 else:
9985-single-thread 128 return False
9988-hash-dedup 129
9988-hash-dedup 130 def add_to_dedup_queue(self, message_hash):
9985-single-thread 131 cursor = self.cursor()
9985-single-thread 132 cursor.execute("insert into dedup_queue(hash)\
9985-single-thread 133 values(?)",
9985-single-thread 134 (message_hash,))
9985-single-thread 135 logging.debug("added %s to dedup" % message_hash)
9985-single-thread 136 self.conn.commit()
9988-hash-dedup 137
genesis 138 def get_last_message_hash(self, handle, peer_id=None):
9985-single-thread 139 cursor = self.cursor()
genesis 140 if peer_id:
9985-single-thread 141 message_bytes = cursor.execute("select message_bytes from logs\
genesis 142 where handle=? and peer_id=?\
genesis 143 order by created_at desc limit 1",
genesis 144 (handle, peer_id)).fetchone()
genesis 145
genesis 146 else:
9985-single-thread 147 message_bytes = cursor.execute("select message_bytes from logs\
genesis 148 where handle=? and peer_id is null\
genesis 149 order by created_at desc limit 1",
genesis 150 (handle,)).fetchone()
genesis 151
genesis 152 if message_bytes:
genesis 153 return hashlib.sha256(message_bytes[0][:]).digest()
genesis 154 else:
9987-embargoing 155 return "\x00" * 32
9987-embargoing 156
9987-embargoing 157 def log(self, handle, message_bytes, peer=None):
9985-single-thread 158 cursor = self.cursor()
9985-single-thread 159 if peer != None:
9985-single-thread 160 peer_id = peer.peer_id
9985-single-thread 161 else:
9985-single-thread 162 peer_id = None
genesis 163
9985-single-thread 164 cursor.execute("insert into logs(handle, peer_id, message_bytes)\
9985-single-thread 165 values(?, ?, ?)",
9985-single-thread 166 (handle, peer_id, buffer(message_bytes)))
genesis 167
genesis 168 def import_at_and_wot(self, at_path):
9985-single-thread 169 cursor = self.cursor()
9985-single-thread 170 wot = imp.load_source('wot', at_path)
9985-single-thread 171 for peer in wot.peers:
9985-single-thread 172 results = cursor.execute("select * from handles where handle=? limit 1",
9985-single-thread 173 (peer["name"],)).fetchall()
9985-single-thread 174 if len(results) == 0:
9985-single-thread 175 key = peer["key"]
9985-single-thread 176 port = peer["port"]
9985-single-thread 177 address = peer["address"]
9985-single-thread 178 cursor.execute("insert into wot(peer_id) values(null)")
9985-single-thread 179 peer_id = cursor.lastrowid
9985-single-thread 180 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 181 (peer_id, peer["name"]))
9985-single-thread 182 handle_id = cursor.lastrowid
9985-single-thread 183 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9985-single-thread 184 (handle_id, peer["address"], peer["port"], None))
9985-single-thread 185 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
9985-single-thread 186 (peer_id, key))
genesis 187
9985-single-thread 188 self.conn.commit()
genesis 189
9987-embargoing 190 def update_at(self, peer, set_active_at=True):
9985-single-thread 191 cursor = self.cursor()
9985-single-thread 192 row = cursor.execute("select handle_id from handles where handle=?",
9985-single-thread 193 (peer["handle"],)).fetchone()
9985-single-thread 194 if row != None:
9985-single-thread 195 handle_id = row[0]
9985-single-thread 196 else:
9984-unbork-at-co... 197 raise Exception("handle not found")
9985-single-thread 198
9984-unbork-at-co... 199 at_entry = cursor.execute("select handle_id, address, port from at where handle_id=?",
9984-unbork-at-co... 200 (handle_id,)).fetchone()
9984-unbork-at-co... 201
9984-unbork-at-co... 202
9984-unbork-at-co... 203 timestamp = datetime.datetime.now() if set_active_at else None
9984-unbork-at-co... 204 if at_entry == None:
9984-unbork-at-co... 205 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9984-unbork-at-co... 206 (handle_id,
9984-unbork-at-co... 207 peer["address"],
9984-unbork-at-co... 208 peer["port"],
9984-unbork-at-co... 209 timestamp))
9984-unbork-at-co... 210 logging.debug("inserted new at entry for %s: %s:%d" % (
9984-unbork-at-co... 211 peer['handle'],
9984-unbork-at-co... 212 peer['address'],
9984-unbork-at-co... 213 peer['port']))
9984-unbork-at-co... 214
9984-unbork-at-co... 215
9984-unbork-at-co... 216 else:
9984-unbork-at-co... 217 try:
9984-unbork-at-co... 218 if (at_entry[1] != peer['address'] or
9984-unbork-at-co... 219 at_entry[2] != peer['port']):
9984-unbork-at-co... 220 cursor.execute("update at set updated_at = ?,\
9984-unbork-at-co... 221 address = ?,\
9984-unbork-at-co... 222 port = ?\
9984-unbork-at-co... 223 where handle_id=?",
9984-unbork-at-co... 224 (timestamp,
9984-unbork-at-co... 225 peer["address"],
9984-unbork-at-co... 226 peer["port"],
9984-unbork-at-co... 227 handle_id))
9984-unbork-at-co... 228
9984-unbork-at-co... 229 logging.debug("updated at entry for %s: %s:%d" % (
9984-unbork-at-co... 230 peer['handle'],
9984-unbork-at-co... 231 peer['address'],
9984-unbork-at-co... 232 peer['port']))
9984-unbork-at-co... 233 except sqlite3.IntegrityError:
9984-unbork-at-co... 234 cursor.execute("delete from at where handle_id=?", (handle_id,))
9984-unbork-at-co... 235
9984-unbork-at-co... 236
9985-single-thread 237 self.conn.commit()
genesis 238
genesis 239 def add_peer(self, handle):
9985-single-thread 240 cursor = self.cursor()
9985-single-thread 241 cursor.execute("insert into wot(peer_id) values(null)")
9984-unbork-at-co... 242 peer_id = cursor.lastrowid
9985-single-thread 243 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 244 (peer_id, handle))
9985-single-thread 245 self.conn.commit()
9991-improved-log... 246
genesis 247
genesis 248 def remove_peer(self, handle):
9985-single-thread 249 cursor = self.cursor()
9985-single-thread 250
9985-single-thread 251
9985-single-thread 252 result = cursor.execute("select peer_id from handles where handle=?",
9985-single-thread 253 (handle,)).fetchone()
9985-single-thread 254 if result == None:
9984-unbork-at-co... 255 raise Exception("handle not found")
9985-single-thread 256 else:
9985-single-thread 257 peer_id = result[0]
9985-single-thread 258
genesis 259
9985-single-thread 260 handle_ids = self.get_handle_ids_for_peer(peer_id)
9985-single-thread 261 for handle_id in handle_ids:
9985-single-thread 262
genesis 263
9985-single-thread 264 cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 265
9985-single-thread 266 cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 267
9985-single-thread 268
genesis 269
9985-single-thread 270 cursor.execute("delete from keys where peer_id=?", (handle_id,))
9985-single-thread 271
9985-single-thread 272
9985-single-thread 273
9985-single-thread 274 cursor.execute("delete from wot where peer_id=?", (peer_id,))
9985-single-thread 275 self.conn.commit()
genesis 276
genesis 277
genesis 278 def add_key(self, handle, key):
9985-single-thread 279 cursor = self.cursor()
9985-single-thread 280 peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
9985-single-thread 281 if peer_id != None:
9985-single-thread 282 cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
9985-single-thread 283 self.conn.commit()
genesis 284
genesis 285 def remove_key(self, key):
9985-single-thread 286 cursor = self.cursor()
9985-single-thread 287 cursor.execute("delete from keys where key=?", (key,))
9985-single-thread 288 self.conn.commit()
genesis 289
genesis 290 def get_handle_ids_for_peer(self, peer_id):
9985-single-thread 291 cursor = self.cursor()
9985-single-thread 292 return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?",
genesis 293 (peer_id,)).fetchall()))
genesis 294
9989-show-wot-nicks 295 def get_peer_handles(self):
9985-single-thread 296 cursor = self.cursor()
9985-single-thread 297 handles = self.listify(cursor.execute("select handle from handles").fetchall())
9989-show-wot-nicks 298 return handles
9989-show-wot-nicks 299
genesis 300 def get_peers(self):
9985-single-thread 301 cursor = self.cursor()
genesis 302 peers = []
9985-single-thread 303 handles = cursor.execute("select handle from handles").fetchall()
genesis 304
genesis 305 for handle in handles:
genesis 306 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 307 if not (self.is_duplicate(peers, peer)):
genesis 308 peers.append(peer)
genesis 309 return peers
genesis 310
9987-embargoing 311 def listify(self, results):
9987-embargoing 312 return list(chain.from_iterable(results))
9987-embargoing 313
9984-unbork-at-co... 314 def get_keyed_peers(self, exclude_addressless=False):
9985-single-thread 315 cursor = self.cursor()
9985-single-thread 316 peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall())
9987-embargoing 317 peers = []
9987-embargoing 318 for peer_id in peer_ids:
9985-single-thread 319 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
9987-embargoing 320 peer = self.get_peer_by_handle(handle)
9986-rebroadcast-... 321 if self.is_duplicate(peers, peer):
9986-rebroadcast-... 322 continue
9984-unbork-at-co... 323 if exclude_addressless and (peer.address == None or peer.port == None):
9986-rebroadcast-... 324 continue
9986-rebroadcast-... 325 peers.append(peer)
9987-embargoing 326 return peers
9987-embargoing 327
9987-embargoing 328
genesis 329 def get_peer_by_handle(self, handle):
9985-single-thread 330 cursor = self.cursor()
9985-single-thread 331 handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 332 (handle,)).fetchone()
genesis 333
genesis 334 if handle_info == None:
genesis 335 return None
genesis 336
9985-single-thread 337 address = cursor.execute("select address, port from at where handle_id=?\
genesis 338 order by updated_at desc limit 1",
genesis 339 (handle_info[0],)).fetchone()
9985-single-thread 340 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
9987-embargoing 341 (handle_info[1],)).fetchall())
9985-single-thread 342 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
genesis 343 order by used_at desc",
9987-embargoing 344 (handle_info[1],)).fetchall())
9987-embargoing 345 return Peer(self.socket, {
genesis 346 "handles": handles,
genesis 347 "peer_id": handle_info[1],
9986-rebroadcast-... 348 "address": address[0] if address else None,
9986-rebroadcast-... 349 "port": address[1] if address else None,
genesis 350 "keys": keys
genesis 351 })
9987-embargoing 352
genesis 353 def is_duplicate(self, peers, peer):
genesis 354 for existing_peer in peers:
genesis 355 if existing_peer.address == peer.address and existing_peer.port == peer.port:
genesis 356 return True
genesis 357 return False