raw
9985-single-thread      1 from peer import Peer
genesis 2 import sqlite3
genesis 3 import imp
genesis 4 import hashlib
9987-embargoing 5 import logging
9984-unbork-at-co... 6 import datetime
genesis 7 from itertools import chain
genesis 8
genesis 9 class State(object):
9987-embargoing 10 __instance = None
9987-embargoing 11 @staticmethod
9987-embargoing 12 def get_instance(socket=None, db_path=None):
9987-embargoing 13 if State.__instance == None:
9987-embargoing 14 State(socket, db_path)
9987-embargoing 15 return State.__instance
9987-embargoing 16
9987-embargoing 17 def __init__(self, socket, db_path):
9987-embargoing 18 if State.__instance != None:
9987-embargoing 19 raise Exception("This class is a singleton")
9987-embargoing 20 else:
9985-single-thread 21 self.socket = socket
9985-single-thread 22 self.conn = sqlite3.connect(db_path, check_same_thread=False)
9985-single-thread 23 cursor = self.cursor()
9985-single-thread 24 cursor.execute("create table if not exists at(handle_id integer,\
9985-single-thread 25 address text not null,\
9985-single-thread 26 port integer not null,\
9984-unbork-at-co... 27 updated_at datetime default null,\
9985-single-thread 28 unique(handle_id, address, port))")
9985-single-thread 29
9985-single-thread 30 cursor.execute("create table if not exists wot(peer_id integer primary key)")
9985-single-thread 31
9985-single-thread 32 cursor.execute("create table if not exists handles(handle_id integer primary key,\
9985-single-thread 33 peer_id integer,\
9985-single-thread 34 handle text,\
9985-single-thread 35 unique(handle))")
9985-single-thread 36
9985-single-thread 37 cursor.execute("create table if not exists keys(peer_id intenger,\
9985-single-thread 38 key text,\
9985-single-thread 39 used_at datetime default current_timestamp,\
9985-single-thread 40 unique(key))")
9985-single-thread 41
9985-single-thread 42 cursor.execute("create table if not exists logs(\
9985-single-thread 43 handle text not null,\
9985-single-thread 44 peer_id integer,\
9985-single-thread 45 message_bytes blob not null,\
9985-single-thread 46 created_at datetime default current_timestamp)")
9985-single-thread 47
9985-single-thread 48 cursor.execute("create table if not exists dedup_queue(\
9985-single-thread 49 hash text not null,\
9985-single-thread 50 created_at datetime default current_timestamp)")
9987-embargoing 51 State.__instance = self
9988-hash-dedup 52
9985-single-thread 53 def cursor(self):
9985-single-thread 54 return self.conn.cursor()
9985-single-thread 55
genesis 56 def get_at(self, handle=None):
9985-single-thread 57 cursor = self.cursor()
genesis 58 at = []
genesis 59 if handle == None:
9984-unbork-at-co... 60 results = cursor.execute("select handle_id, address, port, updated_at from at\
genesis 61 order by updated_at desc").fetchall()
genesis 62 else:
9985-single-thread 63 result = cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 64 (handle,)).fetchone()
9992-handle-edge-... 65 if None != result:
9992-handle-edge-... 66 handle_id = result[0]
genesis 67 else:
genesis 68 return []
9984-unbork-at-co... 69 results = cursor.execute("select handle_id, address, port, updated_at from at \
9992-handle-edge-... 70 where handle_id=? order by updated_at desc",
9992-handle-edge-... 71 (handle_id,)).fetchall()
genesis 72 for result in results:
genesis 73 handle_id, address, port, updated_at = result
9985-single-thread 74 h = cursor.execute("select handle from handles where handle_id=?",
genesis 75 (handle_id,)).fetchone()[0]
genesis 76 at.append({"handle": h,
genesis 77 "address": "%s:%s" % (address, port),
9987-embargoing 78 "active_at": updated_at if updated_at else "no packets received from this address"})
genesis 79 return at
genesis 80
genesis 81
9988-hash-dedup 82 def is_duplicate_message(self, message_hash):
9985-single-thread 83 cursor = self.cursor()
9985-single-thread 84 cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
9985-single-thread 85 self.conn.commit()
9985-single-thread 86 result = cursor.execute("select hash from dedup_queue where hash=?",
9985-single-thread 87 (message_hash,)).fetchone()
9985-single-thread 88 logging.debug("checking if %s is dupe" % message_hash)
9985-single-thread 89 if(result != None):
9985-single-thread 90 return True
9985-single-thread 91 else:
9985-single-thread 92 return False
9988-hash-dedup 93
9988-hash-dedup 94 def add_to_dedup_queue(self, message_hash):
9985-single-thread 95 cursor = self.cursor()
9985-single-thread 96 cursor.execute("insert into dedup_queue(hash)\
9985-single-thread 97 values(?)",
9985-single-thread 98 (message_hash,))
9985-single-thread 99 logging.debug("added %s to dedup" % message_hash)
9985-single-thread 100 self.conn.commit()
9988-hash-dedup 101
genesis 102 def get_last_message_hash(self, handle, peer_id=None):
9985-single-thread 103 cursor = self.cursor()
genesis 104 if peer_id:
9985-single-thread 105 message_bytes = cursor.execute("select message_bytes from logs\
genesis 106 where handle=? and peer_id=?\
genesis 107 order by created_at desc limit 1",
genesis 108 (handle, peer_id)).fetchone()
genesis 109
genesis 110 else:
9985-single-thread 111 message_bytes = cursor.execute("select message_bytes from logs\
genesis 112 where handle=? and peer_id is null\
genesis 113 order by created_at desc limit 1",
genesis 114 (handle,)).fetchone()
genesis 115
genesis 116 if message_bytes:
genesis 117 return hashlib.sha256(message_bytes[0][:]).digest()
genesis 118 else:
9987-embargoing 119 return "\x00" * 32
9987-embargoing 120
9987-embargoing 121 def log(self, handle, message_bytes, peer=None):
9985-single-thread 122 cursor = self.cursor()
9985-single-thread 123 if peer != None:
9985-single-thread 124 peer_id = peer.peer_id
9985-single-thread 125 else:
9985-single-thread 126 peer_id = None
genesis 127
9985-single-thread 128 cursor.execute("insert into logs(handle, peer_id, message_bytes)\
9985-single-thread 129 values(?, ?, ?)",
9985-single-thread 130 (handle, peer_id, buffer(message_bytes)))
genesis 131
genesis 132 def import_at_and_wot(self, at_path):
9985-single-thread 133 cursor = self.cursor()
9985-single-thread 134 wot = imp.load_source('wot', at_path)
9985-single-thread 135 for peer in wot.peers:
9985-single-thread 136 results = cursor.execute("select * from handles where handle=? limit 1",
9985-single-thread 137 (peer["name"],)).fetchall()
9985-single-thread 138 if len(results) == 0:
9985-single-thread 139 key = peer["key"]
9985-single-thread 140 port = peer["port"]
9985-single-thread 141 address = peer["address"]
9985-single-thread 142 cursor.execute("insert into wot(peer_id) values(null)")
9985-single-thread 143 peer_id = cursor.lastrowid
9985-single-thread 144 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 145 (peer_id, peer["name"]))
9985-single-thread 146 handle_id = cursor.lastrowid
9985-single-thread 147 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9985-single-thread 148 (handle_id, peer["address"], peer["port"], None))
9985-single-thread 149 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
9985-single-thread 150 (peer_id, key))
genesis 151
9985-single-thread 152 self.conn.commit()
genesis 153
9987-embargoing 154 def update_at(self, peer, set_active_at=True):
9985-single-thread 155 cursor = self.cursor()
9985-single-thread 156 row = cursor.execute("select handle_id from handles where handle=?",
9985-single-thread 157 (peer["handle"],)).fetchone()
9985-single-thread 158 if row != None:
9985-single-thread 159 handle_id = row[0]
9985-single-thread 160 else:
9984-unbork-at-co... 161 raise Exception("handle not found")
9985-single-thread 162
9984-unbork-at-co... 163 at_entry = cursor.execute("select handle_id, address, port from at where handle_id=?",
9984-unbork-at-co... 164 (handle_id,)).fetchone()
9984-unbork-at-co... 165
9984-unbork-at-co... 166 # if there are no AT entries for this handle, insert one
9984-unbork-at-co... 167 timestamp = datetime.datetime.now() if set_active_at else None
9984-unbork-at-co... 168 if at_entry == None:
9984-unbork-at-co... 169 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9984-unbork-at-co... 170 (handle_id,
9984-unbork-at-co... 171 peer["address"],
9984-unbork-at-co... 172 peer["port"],
9984-unbork-at-co... 173 timestamp))
9984-unbork-at-co... 174 logging.debug("inserted new at entry for %s: %s:%d" % (
9984-unbork-at-co... 175 peer['handle'],
9984-unbork-at-co... 176 peer['address'],
9984-unbork-at-co... 177 peer['port']))
9984-unbork-at-co... 178
9984-unbork-at-co... 179 # otherwise update the existing entry if it differs
9984-unbork-at-co... 180 else:
9984-unbork-at-co... 181 try:
9984-unbork-at-co... 182 if (at_entry[1] != peer['address'] or
9984-unbork-at-co... 183 at_entry[2] != peer['port']):
9984-unbork-at-co... 184 cursor.execute("update at set updated_at = ?,\
9984-unbork-at-co... 185 address = ?,\
9984-unbork-at-co... 186 port = ?\
9984-unbork-at-co... 187 where handle_id=?",
9984-unbork-at-co... 188 (timestamp,
9984-unbork-at-co... 189 peer["address"],
9984-unbork-at-co... 190 peer["port"],
9984-unbork-at-co... 191 handle_id))
9984-unbork-at-co... 192
9984-unbork-at-co... 193 logging.debug("updated at entry for %s: %s:%d" % (
9984-unbork-at-co... 194 peer['handle'],
9984-unbork-at-co... 195 peer['address'],
9984-unbork-at-co... 196 peer['port']))
9984-unbork-at-co... 197 except sqlite3.IntegrityError:
9984-unbork-at-co... 198 cursor.execute("delete from at where handle_id=?", (handle_id,))
9984-unbork-at-co... 199
9984-unbork-at-co... 200
9985-single-thread 201 self.conn.commit()
genesis 202
genesis 203 def add_peer(self, handle):
9985-single-thread 204 cursor = self.cursor()
9985-single-thread 205 cursor.execute("insert into wot(peer_id) values(null)")
9984-unbork-at-co... 206 peer_id = cursor.lastrowid
9985-single-thread 207 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 208 (peer_id, handle))
9985-single-thread 209 self.conn.commit()
9991-improved-log... 210
genesis 211
genesis 212 def remove_peer(self, handle):
9985-single-thread 213 cursor = self.cursor()
9985-single-thread 214
9985-single-thread 215 # get peer id
9985-single-thread 216 result = cursor.execute("select peer_id from handles where handle=?",
9985-single-thread 217 (handle,)).fetchone()
9985-single-thread 218 if result == None:
9984-unbork-at-co... 219 raise Exception("handle not found")
9985-single-thread 220 else:
9985-single-thread 221 peer_id = result[0]
9985-single-thread 222 # get all aliases
genesis 223
9985-single-thread 224 handle_ids = self.get_handle_ids_for_peer(peer_id)
9985-single-thread 225 for handle_id in handle_ids:
9985-single-thread 226 # delete at entries for each alias
genesis 227
9985-single-thread 228 cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 229
9985-single-thread 230 cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 231
9985-single-thread 232 # delete all keys for peer id
genesis 233
9985-single-thread 234 cursor.execute("delete from keys where peer_id=?", (handle_id,))
9985-single-thread 235
9985-single-thread 236 # delete peer from wot
9985-single-thread 237
9985-single-thread 238 cursor.execute("delete from wot where peer_id=?", (peer_id,))
9985-single-thread 239 self.conn.commit()
genesis 240
genesis 241
genesis 242 def add_key(self, handle, key):
9985-single-thread 243 cursor = self.cursor()
9985-single-thread 244 peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
9985-single-thread 245 if peer_id != None:
9985-single-thread 246 cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
9985-single-thread 247 self.conn.commit()
genesis 248
genesis 249 def remove_key(self, key):
9985-single-thread 250 cursor = self.cursor()
9985-single-thread 251 cursor.execute("delete from keys where key=?", (key,))
9985-single-thread 252 self.conn.commit()
genesis 253
genesis 254 def get_handle_ids_for_peer(self, peer_id):
9985-single-thread 255 cursor = self.cursor()
9985-single-thread 256 return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?",
genesis 257 (peer_id,)).fetchall()))
genesis 258
9989-show-wot-nicks 259 def get_peer_handles(self):
9985-single-thread 260 cursor = self.cursor()
9985-single-thread 261 handles = self.listify(cursor.execute("select handle from handles").fetchall())
9989-show-wot-nicks 262 return handles
9989-show-wot-nicks 263
genesis 264 def get_peers(self):
9985-single-thread 265 cursor = self.cursor()
genesis 266 peers = []
9985-single-thread 267 handles = cursor.execute("select handle from handles").fetchall()
genesis 268
genesis 269 for handle in handles:
genesis 270 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 271 if not (self.is_duplicate(peers, peer)):
genesis 272 peers.append(peer)
genesis 273 return peers
genesis 274
9987-embargoing 275 def listify(self, results):
9987-embargoing 276 return list(chain.from_iterable(results))
9987-embargoing 277
9984-unbork-at-co... 278 def get_keyed_peers(self, exclude_addressless=False):
9985-single-thread 279 cursor = self.cursor()
9985-single-thread 280 peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall())
9987-embargoing 281 peers = []
9987-embargoing 282 for peer_id in peer_ids:
9985-single-thread 283 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
9987-embargoing 284 peer = self.get_peer_by_handle(handle)
9986-rebroadcast-... 285 if self.is_duplicate(peers, peer):
9986-rebroadcast-... 286 continue
9984-unbork-at-co... 287 if exclude_addressless and (peer.address == None or peer.port == None):
9986-rebroadcast-... 288 continue
9986-rebroadcast-... 289 peers.append(peer)
9987-embargoing 290 return peers
9987-embargoing 291
9987-embargoing 292
genesis 293 def get_peer_by_handle(self, handle):
9985-single-thread 294 cursor = self.cursor()
9985-single-thread 295 handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 296 (handle,)).fetchone()
genesis 297
genesis 298 if handle_info == None:
genesis 299 return None
genesis 300
9985-single-thread 301 address = cursor.execute("select address, port from at where handle_id=?\
genesis 302 order by updated_at desc limit 1",
genesis 303 (handle_info[0],)).fetchone()
9985-single-thread 304 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
9987-embargoing 305 (handle_info[1],)).fetchall())
9985-single-thread 306 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
genesis 307 order by used_at desc",
9987-embargoing 308 (handle_info[1],)).fetchall())
9987-embargoing 309 return Peer(self.socket, {
genesis 310 "handles": handles,
genesis 311 "peer_id": handle_info[1],
9986-rebroadcast-... 312 "address": address[0] if address else None,
9986-rebroadcast-... 313 "port": address[1] if address else None,
genesis 314 "keys": keys
genesis 315 })
9987-embargoing 316
genesis 317 def is_duplicate(self, peers, peer):
genesis 318 for existing_peer in peers:
genesis 319 if existing_peer.address == peer.address and existing_peer.port == peer.port:
genesis 320 return True
genesis 321 return False