9985-single-thread 1 from peer import Peer
genesis 2 import sqlite3
genesis 3 import imp
genesis 4 import hashlib
9987-embargoing 5 import logging
genesis 6 from itertools import chain
genesis 7
genesis 8 class State(object):
9987-embargoing 9 __instance = None
9987-embargoing 10 @staticmethod
9987-embargoing 11 def get_instance(socket=None, db_path=None):
9987-embargoing 12 if State.__instance == None:
9987-embargoing 13 State(socket, db_path)
9987-embargoing 14 return State.__instance
9987-embargoing 15
9987-embargoing 16 def __init__(self, socket, db_path):
9987-embargoing 17 if State.__instance != None:
9987-embargoing 18 raise Exception("This class is a singleton")
9987-embargoing 19 else:
9985-single-thread 20 self.socket = socket
9985-single-thread 21 self.conn = sqlite3.connect(db_path, check_same_thread=False)
9985-single-thread 22 cursor = self.cursor()
9985-single-thread 23 cursor.execute("create table if not exists at(handle_id integer,\
9985-single-thread 24 address text not null,\
9985-single-thread 25 port integer not null,\
9985-single-thread 26 active_at datetime default null,\
9985-single-thread 27 updated_at datetime default current_timestamp,\
9985-single-thread 28 unique(handle_id, address, port))")
9985-single-thread 29
9985-single-thread 30 cursor.execute("create table if not exists wot(peer_id integer primary key)")
9985-single-thread 31
9985-single-thread 32 cursor.execute("create table if not exists handles(handle_id integer primary key,\
9985-single-thread 33 peer_id integer,\
9985-single-thread 34 handle text,\
9985-single-thread 35 unique(handle))")
9985-single-thread 36
9985-single-thread 37 cursor.execute("create table if not exists keys(peer_id intenger,\
9985-single-thread 38 key text,\
9985-single-thread 39 used_at datetime default current_timestamp,\
9985-single-thread 40 unique(key))")
9985-single-thread 41
9985-single-thread 42 cursor.execute("create table if not exists logs(\
9985-single-thread 43 handle text not null,\
9985-single-thread 44 peer_id integer,\
9985-single-thread 45 message_bytes blob not null,\
9985-single-thread 46 created_at datetime default current_timestamp)")
9985-single-thread 47
9985-single-thread 48 cursor.execute("create table if not exists dedup_queue(\
9985-single-thread 49 hash text not null,\
9985-single-thread 50 created_at datetime default current_timestamp)")
9987-embargoing 51 State.__instance = self
9988-hash-dedup 52
9985-single-thread 53 def cursor(self):
9985-single-thread 54 return self.conn.cursor()
9985-single-thread 55
genesis 56 def get_at(self, handle=None):
9985-single-thread 57 cursor = self.cursor()
genesis 58 at = []
genesis 59 if handle == None:
9985-single-thread 60 results = cursor.execute("select handle_id,address,port,active_at from at\
genesis 61 order by updated_at desc").fetchall()
genesis 62 else:
9985-single-thread 63 result = cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 64 (handle,)).fetchone()
9992-handle-edge-... 65 if None != result:
9992-handle-edge-... 66 handle_id = result[0]
genesis 67 else:
genesis 68 return []
9985-single-thread 69 results = cursor.execute("select handle_id,address,port,active_at from at \
9992-handle-edge-... 70 where handle_id=? order by updated_at desc",
9992-handle-edge-... 71 (handle_id,)).fetchall()
genesis 72 for result in results:
genesis 73 handle_id, address, port, updated_at = result
9985-single-thread 74 h = cursor.execute("select handle from handles where handle_id=?",
genesis 75 (handle_id,)).fetchone()[0]
genesis 76 at.append({"handle": h,
genesis 77 "address": "%s:%s" % (address, port),
9987-embargoing 78 "active_at": updated_at if updated_at else "no packets received from this address"})
genesis 79 return at
genesis 80
genesis 81
9988-hash-dedup 82 def is_duplicate_message(self, message_hash):
9985-single-thread 83 cursor = self.cursor()
9985-single-thread 84 cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
9985-single-thread 85 self.conn.commit()
9985-single-thread 86 result = cursor.execute("select hash from dedup_queue where hash=?",
9985-single-thread 87 (message_hash,)).fetchone()
9985-single-thread 88 logging.debug("checking if %s is dupe" % message_hash)
9985-single-thread 89 if(result != None):
9985-single-thread 90 return True
9985-single-thread 91 else:
9985-single-thread 92 return False
9988-hash-dedup 93
9988-hash-dedup 94 def add_to_dedup_queue(self, message_hash):
9985-single-thread 95 cursor = self.cursor()
9985-single-thread 96 cursor.execute("insert into dedup_queue(hash)\
9985-single-thread 97 values(?)",
9985-single-thread 98 (message_hash,))
9985-single-thread 99 logging.debug("added %s to dedup" % message_hash)
9985-single-thread 100 self.conn.commit()
9988-hash-dedup 101
genesis 102 def get_last_message_hash(self, handle, peer_id=None):
9985-single-thread 103 cursor = self.cursor()
genesis 104 if peer_id:
9985-single-thread 105 message_bytes = cursor.execute("select message_bytes from logs\
genesis 106 where handle=? and peer_id=?\
genesis 107 order by created_at desc limit 1",
genesis 108 (handle, peer_id)).fetchone()
genesis 109
genesis 110 else:
9985-single-thread 111 message_bytes = cursor.execute("select message_bytes from logs\
genesis 112 where handle=? and peer_id is null\
genesis 113 order by created_at desc limit 1",
genesis 114 (handle,)).fetchone()
genesis 115
genesis 116 if message_bytes:
genesis 117 return hashlib.sha256(message_bytes[0][:]).digest()
genesis 118 else:
9987-embargoing 119 return "\x00" * 32
9987-embargoing 120
9987-embargoing 121 def log(self, handle, message_bytes, peer=None):
9985-single-thread 122 cursor = self.cursor()
9985-single-thread 123 if peer != None:
9985-single-thread 124 peer_id = peer.peer_id
9985-single-thread 125 else:
9985-single-thread 126 peer_id = None
genesis 127
9985-single-thread 128 cursor.execute("insert into logs(handle, peer_id, message_bytes)\
9985-single-thread 129 values(?, ?, ?)",
9985-single-thread 130 (handle, peer_id, buffer(message_bytes)))
genesis 131
genesis 132 def import_at_and_wot(self, at_path):
9985-single-thread 133 cursor = self.cursor()
9985-single-thread 134 wot = imp.load_source('wot', at_path)
9985-single-thread 135 for peer in wot.peers:
9985-single-thread 136 results = cursor.execute("select * from handles where handle=? limit 1",
9985-single-thread 137 (peer["name"],)).fetchall()
9985-single-thread 138 if len(results) == 0:
9985-single-thread 139 key = peer["key"]
9985-single-thread 140 port = peer["port"]
9985-single-thread 141 address = peer["address"]
9985-single-thread 142 cursor.execute("insert into wot(peer_id) values(null)")
9985-single-thread 143 peer_id = cursor.lastrowid
9985-single-thread 144 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 145 (peer_id, peer["name"]))
9985-single-thread 146 handle_id = cursor.lastrowid
9985-single-thread 147 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9985-single-thread 148 (handle_id, peer["address"], peer["port"], None))
9985-single-thread 149 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
9985-single-thread 150 (peer_id, key))
genesis 151
9985-single-thread 152 self.conn.commit()
genesis 153
9987-embargoing 154 def update_at(self, peer, set_active_at=True):
9985-single-thread 155 cursor = self.cursor()
9985-single-thread 156 row = cursor.execute("select handle_id from handles where handle=?",
9985-single-thread 157 (peer["handle"],)).fetchone()
9985-single-thread 158 if row != None:
9985-single-thread 159 handle_id = row[0]
9985-single-thread 160 else:
9985-single-thread 161 return
9985-single-thread 162
9985-single-thread 163 try:
9985-single-thread 164 cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
9985-single-thread 165 (handle_id, peer["address"], peer["port"]))
9985-single-thread 166 except sqlite3.IntegrityError as ex:
9985-single-thread 167 cursor.execute("update at set updated_at = current_timestamp\
9985-single-thread 168 where handle_id=? and address=? and port=?",
9985-single-thread 169 (handle_id, peer["address"], peer["port"]))
9985-single-thread 170 if set_active_at:
9985-single-thread 171 cursor.execute("update at set active_at = current_timestamp\
9985-single-thread 172 where handle_id=? and address=? and port=?",
9985-single-thread 173 (handle_id, peer["address"], peer["port"]))
9985-single-thread 174 self.conn.commit()
genesis 175
genesis 176 def add_peer(self, handle):
9985-single-thread 177 cursor = self.cursor()
9985-single-thread 178 cursor.execute("insert into wot(peer_id) values(null)")
9985-single-thread 179 peer_id = self.cursor.lastrowid
9985-single-thread 180 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 181 (peer_id, handle))
9985-single-thread 182 self.conn.commit()
9991-improved-log... 183
genesis 184
genesis 185 def remove_peer(self, handle):
9985-single-thread 186 cursor = self.cursor()
9985-single-thread 187
9985-single-thread 188
9985-single-thread 189 result = cursor.execute("select peer_id from handles where handle=?",
9985-single-thread 190 (handle,)).fetchone()
9985-single-thread 191 if result == None:
9985-single-thread 192 return
9985-single-thread 193 else:
9985-single-thread 194 peer_id = result[0]
9985-single-thread 195
genesis 196
9985-single-thread 197 handle_ids = self.get_handle_ids_for_peer(peer_id)
9985-single-thread 198 for handle_id in handle_ids:
9985-single-thread 199
genesis 200
9985-single-thread 201 cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 202
9985-single-thread 203 cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 204
9985-single-thread 205
genesis 206
9985-single-thread 207 cursor.execute("delete from keys where peer_id=?", (handle_id,))
9985-single-thread 208
9985-single-thread 209
9985-single-thread 210
9985-single-thread 211 cursor.execute("delete from wot where peer_id=?", (peer_id,))
9985-single-thread 212 self.conn.commit()
genesis 213
genesis 214
genesis 215 def add_key(self, handle, key):
9985-single-thread 216 cursor = self.cursor()
9985-single-thread 217 peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
9985-single-thread 218 if peer_id != None:
9985-single-thread 219 cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
9985-single-thread 220 self.conn.commit()
genesis 221
genesis 222 def remove_key(self, key):
9985-single-thread 223 cursor = self.cursor()
9985-single-thread 224 cursor.execute("delete from keys where key=?", (key,))
9985-single-thread 225 self.conn.commit()
genesis 226
genesis 227 def get_handle_ids_for_peer(self, peer_id):
9985-single-thread 228 cursor = self.cursor()
9985-single-thread 229 return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?",
genesis 230 (peer_id,)).fetchall()))
genesis 231
9989-show-wot-nicks 232 def get_peer_handles(self):
9985-single-thread 233 cursor = self.cursor()
9985-single-thread 234 handles = self.listify(cursor.execute("select handle from handles").fetchall())
9989-show-wot-nicks 235 return handles
9989-show-wot-nicks 236
genesis 237 def get_peers(self):
9985-single-thread 238 cursor = self.cursor()
genesis 239 peers = []
9985-single-thread 240 handles = cursor.execute("select handle from handles").fetchall()
genesis 241
genesis 242 for handle in handles:
genesis 243 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 244 if not (self.is_duplicate(peers, peer)):
genesis 245 peers.append(peer)
genesis 246 return peers
genesis 247
9987-embargoing 248 def listify(self, results):
9987-embargoing 249 return list(chain.from_iterable(results))
9987-embargoing 250
9987-embargoing 251 def get_keyed_peers(self):
9985-single-thread 252 cursor = self.cursor()
9985-single-thread 253 peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall())
9987-embargoing 254 peers = []
9987-embargoing 255 for peer_id in peer_ids:
9985-single-thread 256 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
9987-embargoing 257 peer = self.get_peer_by_handle(handle)
9986-rebroadcast-... 258 if self.is_duplicate(peers, peer):
9986-rebroadcast-... 259 continue
9986-rebroadcast-... 260 if peer.address == None or peer.port == None:
9986-rebroadcast-... 261 continue
9986-rebroadcast-... 262 peers.append(peer)
9987-embargoing 263 return peers
9987-embargoing 264
9987-embargoing 265
genesis 266 def get_peer_by_handle(self, handle):
9985-single-thread 267 cursor = self.cursor()
9985-single-thread 268 handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 269 (handle,)).fetchone()
genesis 270
genesis 271 if handle_info == None:
genesis 272 return None
genesis 273
9985-single-thread 274 address = cursor.execute("select address, port from at where handle_id=?\
genesis 275 order by updated_at desc limit 1",
genesis 276 (handle_info[0],)).fetchone()
9985-single-thread 277 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
9987-embargoing 278 (handle_info[1],)).fetchall())
9985-single-thread 279 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
genesis 280 order by used_at desc",
9987-embargoing 281 (handle_info[1],)).fetchall())
9987-embargoing 282 return Peer(self.socket, {
genesis 283 "handles": handles,
genesis 284 "peer_id": handle_info[1],
9986-rebroadcast-... 285 "address": address[0] if address else None,
9986-rebroadcast-... 286 "port": address[1] if address else None,
genesis 287 "keys": keys
genesis 288 })
9987-embargoing 289
genesis 290 def is_duplicate(self, peers, peer):
genesis 291 for existing_peer in peers:
genesis 292 if existing_peer.address == peer.address and existing_peer.port == peer.port:
genesis 293 return True
genesis 294 return False