genesis 1 from lib.peer import Peer
genesis 2 import sqlite3
genesis 3 import imp
genesis 4 import hashlib
9987-embargoing 5 import logging
9986-rebroadcast-... 6 import threading
genesis 7 from itertools import chain
genesis 8
genesis 9 class State(object):
9987-embargoing 10 __instance = None
9987-embargoing 11 @staticmethod
9987-embargoing 12 def get_instance(socket=None, db_path=None):
9987-embargoing 13 if State.__instance == None:
9987-embargoing 14 State(socket, db_path)
9987-embargoing 15 return State.__instance
9987-embargoing 16
9987-embargoing 17 def __init__(self, socket, db_path):
9987-embargoing 18 if State.__instance != None:
9987-embargoing 19 raise Exception("This class is a singleton")
9987-embargoing 20 else:
9986-rebroadcast-... 21 self.write_lock = threading.Lock()
9986-rebroadcast-... 22 with self.write_lock:
9986-rebroadcast-... 23 self.socket = socket
9986-rebroadcast-... 24 self.conn = sqlite3.connect(db_path, check_same_thread=False)
9986-rebroadcast-... 25 self.cursor = self.conn.cursor()
9986-rebroadcast-... 26 self.cursor.execute("create table if not exists at(handle_id integer,\
9986-rebroadcast-... 27 address text not null,\
9986-rebroadcast-... 28 port integer not null,\
9986-rebroadcast-... 29 active_at datetime default null,\
9986-rebroadcast-... 30 updated_at datetime default current_timestamp,\
9986-rebroadcast-... 31 unique(handle_id, address, port))")
9986-rebroadcast-... 32
9986-rebroadcast-... 33 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
9986-rebroadcast-... 34
9986-rebroadcast-... 35 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
9986-rebroadcast-... 36 peer_id integer,\
9986-rebroadcast-... 37 handle text,\
9986-rebroadcast-... 38 unique(handle))")
9986-rebroadcast-... 39
9986-rebroadcast-... 40 self.cursor.execute("create table if not exists keys(peer_id intenger,\
9986-rebroadcast-... 41 key text,\
9986-rebroadcast-... 42 used_at datetime default current_timestamp,\
9986-rebroadcast-... 43 unique(key))")
9986-rebroadcast-... 44
9986-rebroadcast-... 45 self.cursor.execute("create table if not exists logs(\
9986-rebroadcast-... 46 handle text not null,\
9986-rebroadcast-... 47 peer_id integer,\
9986-rebroadcast-... 48 message_bytes blob not null,\
9986-rebroadcast-... 49 created_at datetime default current_timestamp)")
9986-rebroadcast-... 50
9986-rebroadcast-... 51 self.cursor.execute("create table if not exists dedup_queue(\
9986-rebroadcast-... 52 hash text not null,\
9986-rebroadcast-... 53 created_at datetime default current_timestamp)")
9987-embargoing 54 State.__instance = self
9988-hash-dedup 55
genesis 56 def get_at(self, handle=None):
genesis 57 at = []
genesis 58 if handle == None:
9991-improved-log... 59 results = self.cursor.execute("select handle_id,address,port,active_at from at\
genesis 60 order by updated_at desc").fetchall()
genesis 61 else:
9992-handle-edge-... 62 result = self.cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 63 (handle,)).fetchone()
9992-handle-edge-... 64 if None != result:
9992-handle-edge-... 65 handle_id = result[0]
genesis 66 else:
genesis 67 return []
9991-improved-log... 68 results = self.cursor.execute("select handle_id,address,port,active_at from at \
9992-handle-edge-... 69 where handle_id=? order by updated_at desc",
9992-handle-edge-... 70 (handle_id,)).fetchall()
genesis 71 for result in results:
genesis 72 handle_id, address, port, updated_at = result
genesis 73 h = self.cursor.execute("select handle from handles where handle_id=?",
genesis 74 (handle_id,)).fetchone()[0]
genesis 75 at.append({"handle": h,
genesis 76 "address": "%s:%s" % (address, port),
9987-embargoing 77 "active_at": updated_at if updated_at else "no packets received from this address"})
genesis 78 return at
genesis 79
genesis 80
9988-hash-dedup 81 def is_duplicate_message(self, message_hash):
9986-rebroadcast-... 82 with self.write_lock:
9986-rebroadcast-... 83 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
9986-rebroadcast-... 84 self.conn.commit()
9986-rebroadcast-... 85 result = self.cursor.execute("select hash from dedup_queue where hash=?",
9986-rebroadcast-... 86 (message_hash,)).fetchone()
9986-rebroadcast-... 87 logging.debug("checking if %s is dupe" % message_hash)
9986-rebroadcast-... 88 if(result != None):
9986-rebroadcast-... 89 return True
9986-rebroadcast-... 90 else:
9986-rebroadcast-... 91 return False
9988-hash-dedup 92
9988-hash-dedup 93 def add_to_dedup_queue(self, message_hash):
9986-rebroadcast-... 94 with self.write_lock:
9986-rebroadcast-... 95 self.cursor.execute("insert into dedup_queue(hash)\
9986-rebroadcast-... 96 values(?)",
9986-rebroadcast-... 97 (message_hash,))
9986-rebroadcast-... 98 logging.debug("added %s to dedup" % message_hash)
9986-rebroadcast-... 99 self.conn.commit()
9988-hash-dedup 100
genesis 101 def get_last_message_hash(self, handle, peer_id=None):
genesis 102 if peer_id:
genesis 103 message_bytes = self.cursor.execute("select message_bytes from logs\
genesis 104 where handle=? and peer_id=?\
genesis 105 order by created_at desc limit 1",
genesis 106 (handle, peer_id)).fetchone()
genesis 107
genesis 108 else:
genesis 109 message_bytes = self.cursor.execute("select message_bytes from logs\
genesis 110 where handle=? and peer_id is null\
genesis 111 order by created_at desc limit 1",
genesis 112 (handle,)).fetchone()
genesis 113
genesis 114 if message_bytes:
genesis 115 return hashlib.sha256(message_bytes[0][:]).digest()
genesis 116 else:
9987-embargoing 117 return "\x00" * 32
9987-embargoing 118
9987-embargoing 119 def log(self, handle, message_bytes, peer=None):
9986-rebroadcast-... 120 with self.write_lock:
9986-rebroadcast-... 121 if peer != None:
9986-rebroadcast-... 122 peer_id = peer.peer_id
9986-rebroadcast-... 123 else:
9986-rebroadcast-... 124 peer_id = None
genesis 125
9986-rebroadcast-... 126 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
9986-rebroadcast-... 127 values(?, ?, ?)",
9986-rebroadcast-... 128 (handle, peer_id, buffer(message_bytes)))
genesis 129
genesis 130 def import_at_and_wot(self, at_path):
9986-rebroadcast-... 131 with self.write_lock:
9986-rebroadcast-... 132 wot = imp.load_source('wot', at_path)
9986-rebroadcast-... 133 for peer in wot.peers:
9986-rebroadcast-... 134 results = self.cursor.execute("select * from handles where handle=? limit 1",
9986-rebroadcast-... 135 (peer["name"],)).fetchall()
9986-rebroadcast-... 136 if len(results) == 0:
9986-rebroadcast-... 137 key = peer["key"]
9986-rebroadcast-... 138 port = peer["port"]
9986-rebroadcast-... 139 address = peer["address"]
9986-rebroadcast-... 140 self.cursor.execute("insert into wot(peer_id) values(null)")
9986-rebroadcast-... 141 peer_id = self.cursor.lastrowid
9986-rebroadcast-... 142 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9986-rebroadcast-... 143 (peer_id, peer["name"]))
9986-rebroadcast-... 144 handle_id = self.cursor.lastrowid
9986-rebroadcast-... 145 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9986-rebroadcast-... 146 (handle_id, peer["address"], peer["port"], None))
9986-rebroadcast-... 147 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
9986-rebroadcast-... 148 (peer_id, key))
genesis 149
9986-rebroadcast-... 150 self.conn.commit()
genesis 151
9987-embargoing 152 def update_at(self, peer, set_active_at=True):
9986-rebroadcast-... 153 with self.write_lock:
9986-rebroadcast-... 154 row = self.cursor.execute("select handle_id from handles where handle=?",
9986-rebroadcast-... 155 (peer["handle"],)).fetchone()
9986-rebroadcast-... 156 if row != None:
9986-rebroadcast-... 157 handle_id = row[0]
9986-rebroadcast-... 158 else:
9986-rebroadcast-... 159 return
9986-rebroadcast-... 160
9986-rebroadcast-... 161 try:
9986-rebroadcast-... 162 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
9986-rebroadcast-... 163 (handle_id, peer["address"], peer["port"]))
9986-rebroadcast-... 164 except sqlite3.IntegrityError as ex:
9986-rebroadcast-... 165 self.cursor.execute("update at set updated_at = current_timestamp\
9986-rebroadcast-... 166 where handle_id=? and address=? and port=?",
9986-rebroadcast-... 167 (handle_id, peer["address"], peer["port"]))
9986-rebroadcast-... 168 if set_active_at:
9986-rebroadcast-... 169 self.cursor.execute("update at set active_at = current_timestamp\
9986-rebroadcast-... 170 where handle_id=? and address=? and port=?",
9986-rebroadcast-... 171 (handle_id, peer["address"], peer["port"]))
9986-rebroadcast-... 172 self.conn.commit()
genesis 173
genesis 174 def add_peer(self, handle):
9986-rebroadcast-... 175 with self.write_lock:
9986-rebroadcast-... 176 self.cursor.execute("insert into wot(peer_id) values(null)")
9986-rebroadcast-... 177 peer_id = self.cursor.lastrowid
9986-rebroadcast-... 178 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9986-rebroadcast-... 179 (peer_id, handle))
9986-rebroadcast-... 180 self.conn.commit()
9991-improved-log... 181
genesis 182
genesis 183 def remove_peer(self, handle):
9986-rebroadcast-... 184 with self.write_lock:
9986-rebroadcast-... 185
9986-rebroadcast-... 186 result = self.cursor.execute("select peer_id from handles where handle=?",
9986-rebroadcast-... 187 (handle,)).fetchone()
9986-rebroadcast-... 188 if result == None:
9986-rebroadcast-... 189 return
9986-rebroadcast-... 190 else:
9986-rebroadcast-... 191 peer_id = result[0]
9986-rebroadcast-... 192
genesis 193
9986-rebroadcast-... 194 handle_ids = self.get_handle_ids_for_peer(peer_id)
9986-rebroadcast-... 195 for handle_id in handle_ids:
9986-rebroadcast-... 196
genesis 197
9986-rebroadcast-... 198 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 199
9986-rebroadcast-... 200 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 201
9986-rebroadcast-... 202
genesis 203
9986-rebroadcast-... 204 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
9986-rebroadcast-... 205
9986-rebroadcast-... 206
9986-rebroadcast-... 207
9986-rebroadcast-... 208 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
9986-rebroadcast-... 209 self.conn.commit()
genesis 210
genesis 211
genesis 212 def add_key(self, handle, key):
9986-rebroadcast-... 213 with self.write_lock:
9986-rebroadcast-... 214 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
9986-rebroadcast-... 215 if peer_id != None:
9986-rebroadcast-... 216 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
9986-rebroadcast-... 217 self.conn.commit()
genesis 218
genesis 219 def remove_key(self, key):
9986-rebroadcast-... 220 with self.write_lock:
9986-rebroadcast-... 221 self.cursor.execute("delete from keys where key=?", (key,))
9986-rebroadcast-... 222 self.conn.commit()
genesis 223
genesis 224 def get_handle_ids_for_peer(self, peer_id):
genesis 225 return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?",
genesis 226 (peer_id,)).fetchall()))
genesis 227
9989-show-wot-nicks 228 def get_peer_handles(self):
9987-embargoing 229 handles = self.listify(self.cursor.execute("select handle from handles").fetchall())
9989-show-wot-nicks 230 return handles
9989-show-wot-nicks 231
genesis 232 def get_peers(self):
genesis 233 peers = []
genesis 234 handles = self.cursor.execute("select handle from handles").fetchall()
genesis 235
genesis 236 for handle in handles:
genesis 237 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 238 if not (self.is_duplicate(peers, peer)):
genesis 239 peers.append(peer)
genesis 240 return peers
genesis 241
9987-embargoing 242 def listify(self, results):
9987-embargoing 243 return list(chain.from_iterable(results))
9987-embargoing 244
9987-embargoing 245 def get_keyed_peers(self):
9987-embargoing 246 peer_ids = self.listify(self.cursor.execute("select peer_id from keys").fetchall())
9987-embargoing 247 peers = []
9987-embargoing 248 for peer_id in peer_ids:
9987-embargoing 249 handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
9987-embargoing 250 peer = self.get_peer_by_handle(handle)
9986-rebroadcast-... 251 if self.is_duplicate(peers, peer):
9986-rebroadcast-... 252 continue
9986-rebroadcast-... 253 if peer.address == None or peer.port == None:
9986-rebroadcast-... 254 continue
9986-rebroadcast-... 255 peers.append(peer)
9987-embargoing 256 return peers
9987-embargoing 257
9987-embargoing 258
genesis 259 def get_peer_by_handle(self, handle):
genesis 260 handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 261 (handle,)).fetchone()
genesis 262
genesis 263 if handle_info == None:
genesis 264 return None
genesis 265
genesis 266 address = self.cursor.execute("select address, port from at where handle_id=?\
genesis 267 order by updated_at desc limit 1",
genesis 268 (handle_info[0],)).fetchone()
9987-embargoing 269 handles = self.listify(self.cursor.execute("select handle from handles where peer_id=?",
9987-embargoing 270 (handle_info[1],)).fetchall())
9987-embargoing 271 keys = self.listify(self.cursor.execute("select key from keys where peer_id=?\
genesis 272 order by used_at desc",
9987-embargoing 273 (handle_info[1],)).fetchall())
9987-embargoing 274 return Peer(self.socket, {
genesis 275 "handles": handles,
genesis 276 "peer_id": handle_info[1],
9986-rebroadcast-... 277 "address": address[0] if address else None,
9986-rebroadcast-... 278 "port": address[1] if address else None,
genesis 279 "keys": keys
genesis 280 })
9987-embargoing 281
genesis 282 def is_duplicate(self, peers, peer):
genesis 283 for existing_peer in peers:
genesis 284 if existing_peer.address == peer.address and existing_peer.port == peer.port:
genesis 285 return True
genesis 286 return False