9985-single-thread 1 from peer import Peer
9982-getdata 2 from message import EMPTY_CHAIN
genesis 3 import sqlite3
genesis 4 import imp
9982-getdata 5 import binascii
9987-embargoing 6 import logging
9984-unbork-at-co... 7 import datetime
9982-getdata 8 import caribou
9982-getdata 9
genesis 10 from itertools import chain
genesis 11
9983-knobs 12 KNOBS=({'max_bounces': 3,
9982-getdata 13 'embargo_interval_seconds': 1,
9982-getdata 14 'rubbish_interval_seconds': 10,
9982-getdata 15 'nick': '',
9982-getdata 16 'order_buffer_check_seconds': 5 * 60,
9982-getdata 17 'order_buffer_expiration_seconds': 5 * 60,
9982-getdata 18 'short_buffer_expiration_seconds': 1,
9982-getdata 19 'short_buffer_check_interval_seconds': 1})
9983-knobs 20
genesis 21 class State(object):
9982-getdata 22 def __init__(self, station, db_path=None):
9982-getdata 23 self.station = station
9982-getdata 24 if db_path:
9982-getdata 25 self.conn = sqlite3.connect(db_path)
9982-getdata 26 else:
9982-getdata 27 self.conn = sqlite3.connect("file::memory:")
9982-getdata 28
9982-getdata 29 cursor = self.cursor()
9982-getdata 30 cursor.execute("create table if not exists handle_self_chain(id integer primary key autoincrement,\
9982-getdata 31 handle string not null,\
9982-getdata 32 message_hash blob not null)")
9982-getdata 33
9982-getdata 34 cursor.execute("create table if not exists broadcast_self_chain(id integer primary key autoincrement,\
9982-getdata 35 message_hash blob not null)")
9982-getdata 36
9982-getdata 37 cursor.execute("create table if not exists net_chain(id integer primary key autoincrement,\
9982-getdata 38 message_hash blob not null)")
9982-getdata 39
9982-getdata 40 cursor.execute("create table if not exists at(handle_id integer,\
9982-getdata 41 address text not null,\
9982-getdata 42 port integer not null,\
9982-getdata 43 updated_at datetime default null,\
9982-getdata 44 unique(handle_id, address, port))")
9982-getdata 45
9982-getdata 46 cursor.execute("create table if not exists wot(peer_id integer primary key autoincrement)")
9982-getdata 47
9982-getdata 48 cursor.execute("create table if not exists handles(handle_id integer primary key,\
9982-getdata 49 peer_id integer,\
9982-getdata 50 handle text,\
9982-getdata 51 unique(handle))")
9982-getdata 52
9982-getdata 53 cursor.execute("create table if not exists keys(peer_id intenger,\
9982-getdata 54 key text,\
9982-getdata 55 used_at datetime default current_timestamp,\
9982-getdata 56 unique(key))")
9982-getdata 57
9982-getdata 58 cursor.execute("create table if not exists log(\
9982-getdata 59 message_bytes blob not null,\
9982-getdata 60 message_hash text not null, \
9982-getdata 61 command integer not null, \
9982-getdata 62 timestamp datetime not null, \
9982-getdata 63 created_at datetime default current_timestamp)")
9982-getdata 64
9982-getdata 65 cursor.execute("create table if not exists knobs(\
9982-getdata 66 name text not null,\
9982-getdata 67 value text not null)")
9982-getdata 68
9982-getdata 69
9982-getdata 70 if db_path:
9982-getdata 71 caribou.upgrade(db_path, "migrations")
9982-getdata 72
9982-getdata 73 self.conn.commit()
9988-hash-dedup 74
9985-single-thread 75 def cursor(self):
9985-single-thread 76 return self.conn.cursor()
9985-single-thread 77
9982-getdata 78 def update_handle_self_chain(self, handle, message_hash):
9982-getdata 79 cursor = self.cursor()
9982-getdata 80 cursor.execute("insert into handle_self_chain(handle, message_hash) values(?, ?)", (handle, buffer(message_hash)))
9982-getdata 81 self.conn.commit()
9982-getdata 82
9982-getdata 83 def get_handle_self_chain(self, handle):
9982-getdata 84 cursor = self.cursor()
9982-getdata 85 results = cursor.execute("select message_hash from handle_self_chain where handle=?\
9982-getdata 86 order by id desc limit 1", (handle,)).fetchone()
9982-getdata 87 if results is not None:
9982-getdata 88 return results[0][:]
9982-getdata 89 else:
9982-getdata 90 return EMPTY_CHAIN
9982-getdata 91
9982-getdata 92 def update_broadcast_self_chain(self, message_hash):
9982-getdata 93 cursor = self.cursor()
9982-getdata 94 cursor.execute("insert into broadcast_self_chain(message_hash) values(?)", (buffer(message_hash),))
9982-getdata 95 self.conn.commit()
9982-getdata 96
9982-getdata 97 def get_broadcast_self_chain(self):
9982-getdata 98 cursor = self.cursor()
9982-getdata 99 results = cursor.execute("select message_hash from broadcast_self_chain order by id desc limit 1").fetchone()
9982-getdata 100 if results is not None:
9982-getdata 101 return results[0][:]
9982-getdata 102 else:
9982-getdata 103 return EMPTY_CHAIN
9982-getdata 104
9982-getdata 105 def update_net_chain(self, message_hash):
9982-getdata 106 self.cursor().execute("insert into net_chain(message_hash) values(?)", (buffer(message_hash),))
9982-getdata 107 self.conn.commit()
9982-getdata 108
9982-getdata 109 def get_net_chain(self):
9982-getdata 110 cursor = self.cursor()
9982-getdata 111 results = cursor.execute("select message_hash from net_chain order by id desc limit 1").fetchone()
9982-getdata 112 if results is not None:
9982-getdata 113 return results[0][:]
9982-getdata 114 else:
9982-getdata 115 return EMPTY_CHAIN
9982-getdata 116
9983-knobs 117 def get_knobs(self):
9983-knobs 118 cursor = self.cursor()
9983-knobs 119 results = cursor.execute("select name, value from knobs order by name asc").fetchall()
9983-knobs 120 knobs = {}
9983-knobs 121 for result in results:
9983-knobs 122 knobs[result[0]] = result[1]
9983-knobs 123 for key in KNOBS.keys():
9983-knobs 124 if not knobs.get(key):
9983-knobs 125 knobs[key] = KNOBS[key]
9983-knobs 126 return knobs
9983-knobs 127
9983-knobs 128 def get_knob(self, knob_name):
9983-knobs 129 cursor = self.cursor()
9983-knobs 130 result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone()
9983-knobs 131 if result:
9983-knobs 132 return result[0]
9983-knobs 133 elif KNOBS.get(knob_name):
9983-knobs 134 return KNOBS.get(knob_name)
9983-knobs 135 else:
9983-knobs 136 return None
9983-knobs 137
9983-knobs 138 def set_knob(self, knob_name, knob_value):
9983-knobs 139 cursor = self.cursor()
9983-knobs 140 result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone()
9983-knobs 141 if result:
9983-knobs 142 cursor.execute("update knobs set value=? where name=?", (knob_value, knob_name,))
9983-knobs 143 else:
9983-knobs 144 cursor.execute("insert into knobs(name, value) values(?, ?)", (knob_name, knob_value,))
9982-getdata 145
9982-getdata 146 self.conn.commit()
9982-getdata 147
9982-getdata 148 def get_latest_message_timestamp(self):
9982-getdata 149 cursor = self.cursor()
9982-getdata 150 result = cursor.execute("select timestamp from log order by timestamp desc limit 1").fetchone()
9982-getdata 151 if result:
9982-getdata 152 return result[0]
9982-getdata 153
genesis 154 def get_at(self, handle=None):
9985-single-thread 155 cursor = self.cursor()
genesis 156 at = []
genesis 157 if handle == None:
9984-unbork-at-co... 158 results = cursor.execute("select handle_id, address, port, updated_at from at\
genesis 159 order by updated_at desc").fetchall()
genesis 160 else:
9985-single-thread 161 result = cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 162 (handle,)).fetchone()
9992-handle-edge-... 163 if None != result:
9992-handle-edge-... 164 handle_id = result[0]
genesis 165 else:
genesis 166 return []
9984-unbork-at-co... 167 results = cursor.execute("select handle_id, address, port, updated_at from at \
9992-handle-edge-... 168 where handle_id=? order by updated_at desc",
9992-handle-edge-... 169 (handle_id,)).fetchall()
genesis 170 for result in results:
genesis 171 handle_id, address, port, updated_at = result
9985-single-thread 172 h = cursor.execute("select handle from handles where handle_id=?",
genesis 173 (handle_id,)).fetchone()[0]
genesis 174 at.append({"handle": h,
genesis 175 "address": "%s:%s" % (address, port),
9987-embargoing 176 "active_at": updated_at if updated_at else "no packets received from this address"})
genesis 177 return at
genesis 178
genesis 179 def import_at_and_wot(self, at_path):
9985-single-thread 180 cursor = self.cursor()
9985-single-thread 181 wot = imp.load_source('wot', at_path)
9985-single-thread 182 for peer in wot.peers:
9985-single-thread 183 results = cursor.execute("select * from handles where handle=? limit 1",
9985-single-thread 184 (peer["name"],)).fetchall()
9985-single-thread 185 if len(results) == 0:
9985-single-thread 186 key = peer["key"]
9985-single-thread 187 port = peer["port"]
9985-single-thread 188 address = peer["address"]
9985-single-thread 189 cursor.execute("insert into wot(peer_id) values(null)")
9985-single-thread 190 peer_id = cursor.lastrowid
9985-single-thread 191 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 192 (peer_id, peer["name"]))
9985-single-thread 193 handle_id = cursor.lastrowid
9985-single-thread 194 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9985-single-thread 195 (handle_id, peer["address"], peer["port"], None))
9985-single-thread 196 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
9985-single-thread 197 (peer_id, key))
9985-single-thread 198 self.conn.commit()
genesis 199
9987-embargoing 200 def update_at(self, peer, set_active_at=True):
9985-single-thread 201 cursor = self.cursor()
9985-single-thread 202 row = cursor.execute("select handle_id from handles where handle=?",
9985-single-thread 203 (peer["handle"],)).fetchone()
9985-single-thread 204 if row != None:
9985-single-thread 205 handle_id = row[0]
9985-single-thread 206 else:
9984-unbork-at-co... 207 raise Exception("handle not found")
9985-single-thread 208
9984-unbork-at-co... 209 at_entry = cursor.execute("select handle_id, address, port from at where handle_id=?",
9984-unbork-at-co... 210 (handle_id,)).fetchone()
9984-unbork-at-co... 211
9984-unbork-at-co... 212
9984-unbork-at-co... 213 timestamp = datetime.datetime.now() if set_active_at else None
9984-unbork-at-co... 214 if at_entry == None:
9984-unbork-at-co... 215 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9984-unbork-at-co... 216 (handle_id,
9984-unbork-at-co... 217 peer["address"],
9984-unbork-at-co... 218 peer["port"],
9984-unbork-at-co... 219 timestamp))
9984-unbork-at-co... 220 logging.debug("inserted new at entry for %s: %s:%d" % (
9984-unbork-at-co... 221 peer['handle'],
9984-unbork-at-co... 222 peer['address'],
9984-unbork-at-co... 223 peer['port']))
9984-unbork-at-co... 224
9982-getdata 225
9984-unbork-at-co... 226 else:
9984-unbork-at-co... 227 try:
9982-getdata 228 cursor.execute("update at set updated_at = ?,\
9982-getdata 229 address = ?,\
9982-getdata 230 port = ?\
9982-getdata 231 where handle_id=?",
9982-getdata 232 (timestamp,
9982-getdata 233 peer["address"],
9982-getdata 234 peer["port"],
9982-getdata 235 handle_id))
9982-getdata 236
9984-unbork-at-co... 237 except sqlite3.IntegrityError:
9984-unbork-at-co... 238 cursor.execute("delete from at where handle_id=?", (handle_id,))
9984-unbork-at-co... 239
9984-unbork-at-co... 240
9985-single-thread 241 self.conn.commit()
genesis 242
genesis 243 def add_peer(self, handle):
9985-single-thread 244 cursor = self.cursor()
9985-single-thread 245 cursor.execute("insert into wot(peer_id) values(null)")
9984-unbork-at-co... 246 peer_id = cursor.lastrowid
9985-single-thread 247 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 248 (peer_id, handle))
9985-single-thread 249 self.conn.commit()
9991-improved-log... 250
genesis 251
genesis 252 def remove_peer(self, handle):
9985-single-thread 253 cursor = self.cursor()
9985-single-thread 254
9985-single-thread 255
9985-single-thread 256 result = cursor.execute("select peer_id from handles where handle=?",
9985-single-thread 257 (handle,)).fetchone()
9985-single-thread 258 if result == None:
9984-unbork-at-co... 259 raise Exception("handle not found")
9985-single-thread 260 else:
9985-single-thread 261 peer_id = result[0]
9985-single-thread 262
genesis 263
9985-single-thread 264 handle_ids = self.get_handle_ids_for_peer(peer_id)
9985-single-thread 265 for handle_id in handle_ids:
9985-single-thread 266
genesis 267
9985-single-thread 268 cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 269
9985-single-thread 270 cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 271
9985-single-thread 272
genesis 273
9985-single-thread 274 cursor.execute("delete from keys where peer_id=?", (handle_id,))
9985-single-thread 275
9985-single-thread 276
9985-single-thread 277
9985-single-thread 278 cursor.execute("delete from wot where peer_id=?", (peer_id,))
9985-single-thread 279 self.conn.commit()
genesis 280
genesis 281
genesis 282 def add_key(self, handle, key):
9985-single-thread 283 cursor = self.cursor()
9985-single-thread 284 peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
9985-single-thread 285 if peer_id != None:
9985-single-thread 286 cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
9985-single-thread 287 self.conn.commit()
genesis 288
genesis 289 def remove_key(self, key):
9985-single-thread 290 cursor = self.cursor()
9985-single-thread 291 cursor.execute("delete from keys where key=?", (key,))
9985-single-thread 292 self.conn.commit()
genesis 293
genesis 294 def get_handle_ids_for_peer(self, peer_id):
9985-single-thread 295 cursor = self.cursor()
9985-single-thread 296 return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?",
genesis 297 (peer_id,)).fetchall()))
genesis 298
9989-show-wot-nicks 299 def get_peer_handles(self):
9985-single-thread 300 cursor = self.cursor()
9985-single-thread 301 handles = self.listify(cursor.execute("select handle from handles").fetchall())
9989-show-wot-nicks 302 return handles
9989-show-wot-nicks 303
genesis 304 def get_peers(self):
9985-single-thread 305 cursor = self.cursor()
genesis 306 peers = []
9985-single-thread 307 handles = cursor.execute("select handle from handles").fetchall()
genesis 308
genesis 309 for handle in handles:
genesis 310 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 311 if not (self.is_duplicate(peers, peer)):
genesis 312 peers.append(peer)
genesis 313 return peers
genesis 314
9987-embargoing 315 def listify(self, results):
9987-embargoing 316 return list(chain.from_iterable(results))
9982-getdata 317
9982-getdata 318 def log_has_message(self, message_hash):
9982-getdata 319 cursor = self.cursor()
9982-getdata 320 result = cursor.execute("select exists(select 1 from log where message_hash=?)\
9982-getdata 321 limit 1", (binascii.hexlify(message_hash),)).fetchone()
9982-getdata 322 return result[0]
9982-getdata 323
9982-getdata 324 def log_message(self, message):
9982-getdata 325 cursor = self.cursor()
9982-getdata 326 message_hash_hex_string = binascii.hexlify(message.message_hash)
9982-getdata 327 cursor.execute("insert into log(message_hash, message_bytes, command, timestamp) values(?, ?, ?, ?)",
9982-getdata 328 (message_hash_hex_string,
9982-getdata 329 buffer(message.message_bytes),
9982-getdata 330 message.command,
9982-getdata 331 message.timestamp))
9982-getdata 332 self.conn.commit()
9982-getdata 333
9982-getdata 334 def get_message(self, message_hash):
9982-getdata 335 cursor = self.cursor()
9982-getdata 336 message_hash_hex_string = binascii.hexlify(message_hash)
9982-getdata 337 result = cursor.execute("select command, message_bytes from log where message_hash=? limit 1",
9982-getdata 338 (message_hash_hex_string,)).fetchone()
9982-getdata 339 if result:
9982-getdata 340 return result[0], result[1][:]
9982-getdata 341
9982-getdata 342 return None, None
9982-getdata 343
9982-getdata 344 def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[]):
9985-single-thread 345 cursor = self.cursor()
9982-getdata 346 peer_ids = self.listify(cursor.execute("select peer_id from keys\
9982-getdata 347 where peer_id not in (%s) order by random()" % ','.join('?'*len(exclude_ids)),
9982-getdata 348 exclude_ids).fetchall())
9987-embargoing 349 peers = []
9987-embargoing 350 for peer_id in peer_ids:
9985-single-thread 351 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
9987-embargoing 352 peer = self.get_peer_by_handle(handle)
9986-rebroadcast-... 353 if self.is_duplicate(peers, peer):
9986-rebroadcast-... 354 continue
9984-unbork-at-co... 355 if exclude_addressless and (peer.address == None or peer.port == None):
9986-rebroadcast-... 356 continue
9986-rebroadcast-... 357 peers.append(peer)
9987-embargoing 358 return peers
9987-embargoing 359
9987-embargoing 360
genesis 361 def get_peer_by_handle(self, handle):
9985-single-thread 362 cursor = self.cursor()
9985-single-thread 363 handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 364 (handle,)).fetchone()
genesis 365
genesis 366 if handle_info == None:
genesis 367 return None
genesis 368
9982-getdata 369 peer_id = handle_info[1]
9985-single-thread 370 address = cursor.execute("select address, port from at where handle_id=?\
genesis 371 order by updated_at desc limit 1",
genesis 372 (handle_info[0],)).fetchone()
9985-single-thread 373 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
9982-getdata 374 (peer_id,)).fetchall())
9985-single-thread 375 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
9982-getdata 376 order by random()",
9982-getdata 377 (peer_id,)).fetchall())
9982-getdata 378 return Peer(self.station.socket, {
genesis 379 "handles": handles,
genesis 380 "peer_id": handle_info[1],
9986-rebroadcast-... 381 "address": address[0] if address else None,
9986-rebroadcast-... 382 "port": address[1] if address else None,
genesis 383 "keys": keys
genesis 384 })
9987-embargoing 385
genesis 386 def is_duplicate(self, peers, peer):
genesis 387 for existing_peer in peers:
9982-getdata 388 if (not existing_peer.address is None
9982-getdata 389 and existing_peer.address == peer.address
9982-getdata 390 and existing_peer.port == peer.port):
genesis 391 return True
genesis 392 return False