9979-presence 1 import calendar
9979-presence 2
9985-single-thread 3 from peer import Peer
9982-getdata 4 from message import EMPTY_CHAIN
9979-presence 5 from message import Message
genesis 6 import sqlite3
genesis 7 import imp
9982-getdata 8 import binascii
9987-embargoing 9 import logging
9984-unbork-at-co... 10 import datetime
9979-presence 11 import time
9982-getdata 12 import caribou
9982-getdata 13
genesis 14 from itertools import chain
genesis 15
9979-presence 16 KNOBS=(
9979-presence 17 {
9979-presence 18 'max_bounces': 3,
9982-getdata 19 'embargo_interval_seconds': 1,
9982-getdata 20 'rubbish_interval_seconds': 10,
9982-getdata 21 'nick': '',
9978-bugfixes 22 'order_buffer_check_seconds': 180,
9978-bugfixes 23 'order_buffer_expiration_seconds': 120,
9982-getdata 24 'short_buffer_expiration_seconds': 1,
9979-presence 25 'short_buffer_check_interval_seconds': 1,
9978-bugfixes 26 'getdata_requests_expiration_seconds': 10,
9979-presence 27 'peer_offline_interval_seconds': 60,
9979-presence 28 'peer_away_interval_seconds': 10 * 60,
9979-presence 29 'presence_check_seconds': 5,
9979-presence 30 }
9979-presence 31 )
9983-knobs 32
genesis 33 class State(object):
9982-getdata 34 def __init__(self, station, db_path=None):
9982-getdata 35 self.station = station
9982-getdata 36 if db_path:
9982-getdata 37 self.conn = sqlite3.connect(db_path)
9982-getdata 38 else:
9982-getdata 39 self.conn = sqlite3.connect("file::memory:")
9982-getdata 40
9982-getdata 41 cursor = self.cursor()
9982-getdata 42 cursor.execute("create table if not exists handle_self_chain(id integer primary key autoincrement,\
9982-getdata 43 handle string not null,\
9982-getdata 44 message_hash blob not null)")
9982-getdata 45
9982-getdata 46 cursor.execute("create table if not exists broadcast_self_chain(id integer primary key autoincrement,\
9982-getdata 47 message_hash blob not null)")
9982-getdata 48
9982-getdata 49 cursor.execute("create table if not exists net_chain(id integer primary key autoincrement,\
9982-getdata 50 message_hash blob not null)")
9982-getdata 51
9982-getdata 52 cursor.execute("create table if not exists at(handle_id integer,\
9982-getdata 53 address text not null,\
9982-getdata 54 port integer not null,\
9982-getdata 55 updated_at datetime default null,\
9982-getdata 56 unique(handle_id, address, port))")
9982-getdata 57
9982-getdata 58 cursor.execute("create table if not exists wot(peer_id integer primary key autoincrement)")
9982-getdata 59
9982-getdata 60 cursor.execute("create table if not exists handles(handle_id integer primary key,\
9982-getdata 61 peer_id integer,\
9982-getdata 62 handle text,\
9982-getdata 63 unique(handle))")
9982-getdata 64
9982-getdata 65 cursor.execute("create table if not exists keys(peer_id intenger,\
9982-getdata 66 key text,\
9982-getdata 67 used_at datetime default current_timestamp,\
9982-getdata 68 unique(key))")
9982-getdata 69
9982-getdata 70 cursor.execute("create table if not exists log(\
9982-getdata 71 message_bytes blob not null,\
9982-getdata 72 message_hash text not null, \
9982-getdata 73 command integer not null, \
9982-getdata 74 timestamp datetime not null, \
9982-getdata 75 created_at datetime default current_timestamp)")
9982-getdata 76
9982-getdata 77 cursor.execute("create table if not exists knobs(\
9982-getdata 78 name text not null,\
9982-getdata 79 value text not null)")
9982-getdata 80
9982-getdata 81
9982-getdata 82 if db_path:
9982-getdata 83 caribou.upgrade(db_path, "migrations")
9982-getdata 84
9982-getdata 85 self.conn.commit()
9988-hash-dedup 86
9985-single-thread 87 def cursor(self):
9985-single-thread 88 return self.conn.cursor()
9985-single-thread 89
9982-getdata 90 def update_handle_self_chain(self, handle, message_hash):
9982-getdata 91 cursor = self.cursor()
9982-getdata 92 cursor.execute("insert into handle_self_chain(handle, message_hash) values(?, ?)", (handle, buffer(message_hash)))
9982-getdata 93 self.conn.commit()
9982-getdata 94
9982-getdata 95 def get_handle_self_chain(self, handle):
9982-getdata 96 cursor = self.cursor()
9982-getdata 97 results = cursor.execute("select message_hash from handle_self_chain where handle=?\
9982-getdata 98 order by id desc limit 1", (handle,)).fetchone()
9982-getdata 99 if results is not None:
9982-getdata 100 return results[0][:]
9982-getdata 101 else:
9982-getdata 102 return EMPTY_CHAIN
9982-getdata 103
9982-getdata 104 def update_broadcast_self_chain(self, message_hash):
9982-getdata 105 cursor = self.cursor()
9982-getdata 106 cursor.execute("insert into broadcast_self_chain(message_hash) values(?)", (buffer(message_hash),))
9982-getdata 107 self.conn.commit()
9982-getdata 108
9982-getdata 109 def get_broadcast_self_chain(self):
9982-getdata 110 cursor = self.cursor()
9982-getdata 111 results = cursor.execute("select message_hash from broadcast_self_chain order by id desc limit 1").fetchone()
9982-getdata 112 if results is not None:
9982-getdata 113 return results[0][:]
9982-getdata 114 else:
9982-getdata 115 return EMPTY_CHAIN
9982-getdata 116
9982-getdata 117 def update_net_chain(self, message_hash):
9982-getdata 118 self.cursor().execute("insert into net_chain(message_hash) values(?)", (buffer(message_hash),))
9982-getdata 119 self.conn.commit()
9982-getdata 120
9982-getdata 121 def get_net_chain(self):
9982-getdata 122 cursor = self.cursor()
9982-getdata 123 results = cursor.execute("select message_hash from net_chain order by id desc limit 1").fetchone()
9982-getdata 124 if results is not None:
9982-getdata 125 return results[0][:]
9982-getdata 126 else:
9982-getdata 127 return EMPTY_CHAIN
9982-getdata 128
9983-knobs 129 def get_knobs(self):
9983-knobs 130 cursor = self.cursor()
9983-knobs 131 results = cursor.execute("select name, value from knobs order by name asc").fetchall()
9983-knobs 132 knobs = {}
9983-knobs 133 for result in results:
9983-knobs 134 knobs[result[0]] = result[1]
9983-knobs 135 for key in KNOBS.keys():
9983-knobs 136 if not knobs.get(key):
9983-knobs 137 knobs[key] = KNOBS[key]
9983-knobs 138 return knobs
9983-knobs 139
9983-knobs 140 def get_knob(self, knob_name):
9983-knobs 141 cursor = self.cursor()
9983-knobs 142 result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone()
9983-knobs 143 if result:
9983-knobs 144 return result[0]
9983-knobs 145 elif KNOBS.get(knob_name):
9983-knobs 146 return KNOBS.get(knob_name)
9983-knobs 147 else:
9983-knobs 148 return None
9983-knobs 149
9983-knobs 150 def set_knob(self, knob_name, knob_value):
9983-knobs 151 cursor = self.cursor()
9983-knobs 152 result = cursor.execute("select value from knobs where name=?", (knob_name,)).fetchone()
9983-knobs 153 if result:
9983-knobs 154 cursor.execute("update knobs set value=? where name=?", (knob_value, knob_name,))
9983-knobs 155 else:
9983-knobs 156 cursor.execute("insert into knobs(name, value) values(?, ?)", (knob_name, knob_value,))
9982-getdata 157
9982-getdata 158 self.conn.commit()
9982-getdata 159
9982-getdata 160 def get_latest_message_timestamp(self):
9982-getdata 161 cursor = self.cursor()
9982-getdata 162 result = cursor.execute("select timestamp from log order by timestamp desc limit 1").fetchone()
9982-getdata 163 if result:
9982-getdata 164 return result[0]
9982-getdata 165
genesis 166 def get_at(self, handle=None):
9985-single-thread 167 cursor = self.cursor()
genesis 168 at = []
genesis 169 if handle == None:
9979-presence 170 results = cursor.execute("select handle_id, address, port, updated_at, strftime('%s', updated_at) from at\
genesis 171 order by updated_at desc").fetchall()
genesis 172 else:
9985-single-thread 173 result = cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 174 (handle,)).fetchone()
9992-handle-edge-... 175 if None != result:
9992-handle-edge-... 176 handle_id = result[0]
genesis 177 else:
genesis 178 return []
9979-presence 179 results = cursor.execute("select handle_id, address, port, updated_at, strftime('%s', updated_at) from at \
9992-handle-edge-... 180 where handle_id=? order by updated_at desc",
9992-handle-edge-... 181 (handle_id,)).fetchall()
genesis 182 for result in results:
9979-presence 183 handle_id, address, port, updated_at_utc, updated_at_unixtime = result
9985-single-thread 184 h = cursor.execute("select handle from handles where handle_id=?",
genesis 185 (handle_id,)).fetchone()[0]
9979-presence 186 if updated_at_utc:
9978-bugfixes 187 if '.' not in updated_at_utc:
9978-bugfixes 188 updated_at_utc = updated_at_utc + '.0'
9979-presence 189 dt_format = '%Y-%m-%d %H:%M:%S.%f'
9979-presence 190 dt_utc = datetime.datetime.strptime(updated_at_utc, dt_format)
9979-presence 191 dt_local = self.utc_to_local(dt_utc)
9979-presence 192 updated_at = datetime.datetime.strftime(dt_local, dt_format)
9979-presence 193 else:
9979-presence 194 updated_at = "no packets received from this address"
9979-presence 195
9979-presence 196 at.append({
9979-presence 197 "handle": h,
9979-presence 198 "address": "%s:%s" % (address, port),
9979-presence 199 "active_at": updated_at,
9979-presence 200 "active_at_unixtime": int(updated_at_unixtime) if updated_at_unixtime else 0
9979-presence 201 })
genesis 202 return at
genesis 203
genesis 204 def import_at_and_wot(self, at_path):
9985-single-thread 205 cursor = self.cursor()
9985-single-thread 206 wot = imp.load_source('wot', at_path)
9985-single-thread 207 for peer in wot.peers:
9985-single-thread 208 results = cursor.execute("select * from handles where handle=? limit 1",
9985-single-thread 209 (peer["name"],)).fetchall()
9985-single-thread 210 if len(results) == 0:
9985-single-thread 211 key = peer["key"]
9985-single-thread 212 port = peer["port"]
9985-single-thread 213 address = peer["address"]
9985-single-thread 214 cursor.execute("insert into wot(peer_id) values(null)")
9985-single-thread 215 peer_id = cursor.lastrowid
9985-single-thread 216 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 217 (peer_id, peer["name"]))
9985-single-thread 218 handle_id = cursor.lastrowid
9985-single-thread 219 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9985-single-thread 220 (handle_id, peer["address"], peer["port"], None))
9985-single-thread 221 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
9985-single-thread 222 (peer_id, key))
9985-single-thread 223 self.conn.commit()
genesis 224
9987-embargoing 225 def update_at(self, peer, set_active_at=True):
9985-single-thread 226 cursor = self.cursor()
9985-single-thread 227 row = cursor.execute("select handle_id from handles where handle=?",
9985-single-thread 228 (peer["handle"],)).fetchone()
9985-single-thread 229 if row != None:
9985-single-thread 230 handle_id = row[0]
9985-single-thread 231 else:
9984-unbork-at-co... 232 raise Exception("handle not found")
9985-single-thread 233
9984-unbork-at-co... 234 at_entry = cursor.execute("select handle_id, address, port from at where handle_id=?",
9984-unbork-at-co... 235 (handle_id,)).fetchone()
9984-unbork-at-co... 236
9984-unbork-at-co... 237
9979-presence 238 timestamp = datetime.datetime.utcnow() if set_active_at else None
9984-unbork-at-co... 239 if at_entry == None:
9984-unbork-at-co... 240 cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9984-unbork-at-co... 241 (handle_id,
9984-unbork-at-co... 242 peer["address"],
9984-unbork-at-co... 243 peer["port"],
9984-unbork-at-co... 244 timestamp))
9984-unbork-at-co... 245 logging.debug("inserted new at entry for %s: %s:%d" % (
9984-unbork-at-co... 246 peer['handle'],
9984-unbork-at-co... 247 peer['address'],
9984-unbork-at-co... 248 peer['port']))
9984-unbork-at-co... 249
9982-getdata 250
9984-unbork-at-co... 251 else:
9984-unbork-at-co... 252 try:
9982-getdata 253 cursor.execute("update at set updated_at = ?,\
9982-getdata 254 address = ?,\
9982-getdata 255 port = ?\
9982-getdata 256 where handle_id=?",
9982-getdata 257 (timestamp,
9982-getdata 258 peer["address"],
9982-getdata 259 peer["port"],
9982-getdata 260 handle_id))
9982-getdata 261
9984-unbork-at-co... 262 except sqlite3.IntegrityError:
9984-unbork-at-co... 263 cursor.execute("delete from at where handle_id=?", (handle_id,))
9984-unbork-at-co... 264
9984-unbork-at-co... 265
9985-single-thread 266 self.conn.commit()
genesis 267
genesis 268 def add_peer(self, handle):
9985-single-thread 269 cursor = self.cursor()
9985-single-thread 270 cursor.execute("insert into wot(peer_id) values(null)")
9984-unbork-at-co... 271 peer_id = cursor.lastrowid
9985-single-thread 272 cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
9985-single-thread 273 (peer_id, handle))
9985-single-thread 274 self.conn.commit()
9991-improved-log... 275
genesis 276
genesis 277 def remove_peer(self, handle):
9985-single-thread 278 cursor = self.cursor()
9985-single-thread 279
9985-single-thread 280
9985-single-thread 281 result = cursor.execute("select peer_id from handles where handle=?",
9985-single-thread 282 (handle,)).fetchone()
9985-single-thread 283 if result == None:
9984-unbork-at-co... 284 raise Exception("handle not found")
9985-single-thread 285 else:
9985-single-thread 286 peer_id = result[0]
9985-single-thread 287
genesis 288
9985-single-thread 289 handle_ids = self.get_handle_ids_for_peer(peer_id)
9985-single-thread 290 for handle_id in handle_ids:
9985-single-thread 291
genesis 292
9985-single-thread 293 cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 294
9985-single-thread 295 cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 296
9985-single-thread 297
genesis 298
9985-single-thread 299 cursor.execute("delete from keys where peer_id=?", (handle_id,))
9985-single-thread 300
9985-single-thread 301
9985-single-thread 302
9985-single-thread 303 cursor.execute("delete from wot where peer_id=?", (peer_id,))
9985-single-thread 304 self.conn.commit()
genesis 305
genesis 306
genesis 307 def add_key(self, handle, key):
9985-single-thread 308 cursor = self.cursor()
9985-single-thread 309 peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
9985-single-thread 310 if peer_id != None:
9985-single-thread 311 cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
9985-single-thread 312 self.conn.commit()
genesis 313
genesis 314 def remove_key(self, key):
9985-single-thread 315 cursor = self.cursor()
9985-single-thread 316 cursor.execute("delete from keys where key=?", (key,))
9985-single-thread 317 self.conn.commit()
genesis 318
genesis 319 def get_handle_ids_for_peer(self, peer_id):
9985-single-thread 320 cursor = self.cursor()
9985-single-thread 321 return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?",
genesis 322 (peer_id,)).fetchall()))
genesis 323
9989-show-wot-nicks 324 def get_peer_handles(self):
9985-single-thread 325 cursor = self.cursor()
9985-single-thread 326 handles = self.listify(cursor.execute("select handle from handles").fetchall())
9989-show-wot-nicks 327 return handles
9989-show-wot-nicks 328
genesis 329 def get_peers(self):
9985-single-thread 330 cursor = self.cursor()
genesis 331 peers = []
9985-single-thread 332 handles = cursor.execute("select handle from handles").fetchall()
genesis 333
genesis 334 for handle in handles:
genesis 335 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 336 if not (self.is_duplicate(peers, peer)):
genesis 337 peers.append(peer)
genesis 338 return peers
genesis 339
9987-embargoing 340 def listify(self, results):
9987-embargoing 341 return list(chain.from_iterable(results))
9982-getdata 342
9982-getdata 343 def log_has_message(self, message_hash):
9982-getdata 344 cursor = self.cursor()
9982-getdata 345 result = cursor.execute("select exists(select 1 from log where message_hash=?)\
9982-getdata 346 limit 1", (binascii.hexlify(message_hash),)).fetchone()
9982-getdata 347 return result[0]
9982-getdata 348
9982-getdata 349 def log_message(self, message):
9982-getdata 350 cursor = self.cursor()
9982-getdata 351 message_hash_hex_string = binascii.hexlify(message.message_hash)
9982-getdata 352 cursor.execute("insert into log(message_hash, message_bytes, command, timestamp) values(?, ?, ?, ?)",
9982-getdata 353 (message_hash_hex_string,
9982-getdata 354 buffer(message.message_bytes),
9982-getdata 355 message.command,
9982-getdata 356 message.timestamp))
9982-getdata 357 self.conn.commit()
9982-getdata 358
9982-getdata 359 def get_message(self, message_hash):
9982-getdata 360 cursor = self.cursor()
9982-getdata 361 message_hash_hex_string = binascii.hexlify(message_hash)
9982-getdata 362 result = cursor.execute("select command, message_bytes from log where message_hash=? limit 1",
9982-getdata 363 (message_hash_hex_string,)).fetchone()
9982-getdata 364 if result:
9982-getdata 365 return result[0], result[1][:]
9982-getdata 366
9982-getdata 367 return None, None
9982-getdata 368
9982-getdata 369 def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[]):
9985-single-thread 370 cursor = self.cursor()
9982-getdata 371 peer_ids = self.listify(cursor.execute("select peer_id from keys\
9982-getdata 372 where peer_id not in (%s) order by random()" % ','.join('?'*len(exclude_ids)),
9982-getdata 373 exclude_ids).fetchall())
9987-embargoing 374 peers = []
9987-embargoing 375 for peer_id in peer_ids:
9985-single-thread 376 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
9987-embargoing 377 peer = self.get_peer_by_handle(handle)
9986-rebroadcast-... 378 if self.is_duplicate(peers, peer):
9986-rebroadcast-... 379 continue
9984-unbork-at-co... 380 if exclude_addressless and (peer.address == None or peer.port == None):
9986-rebroadcast-... 381 continue
9986-rebroadcast-... 382 peers.append(peer)
9987-embargoing 383 return peers
9987-embargoing 384
9979-presence 385
9979-presence 386 def handle_is_online(self, handle):
9979-presence 387
9979-presence 388
9978-bugfixes 389 try:
9978-bugfixes 390 at = self.get_at(handle)[0]
9978-bugfixes 391 except IndexError:
9978-bugfixes 392 return False
9978-bugfixes 393
9979-presence 394 if at["active_at_unixtime"] > time.time() - int(self.get_knob("peer_offline_interval_seconds")):
9979-presence 395 return True
9979-presence 396 else:
9979-presence 397 return False
9979-presence 398
9979-presence 399 def utc_to_local(self, utc_dt):
9979-presence 400
9979-presence 401 timestamp = calendar.timegm(utc_dt.timetuple())
9979-presence 402 local_dt = datetime.datetime.fromtimestamp(timestamp)
9979-presence 403 assert utc_dt.resolution >= datetime.timedelta(microseconds=1)
9979-presence 404 return local_dt.replace(microsecond=utc_dt.microsecond)
9979-presence 405
9979-presence 406 def handle_is_away(self, handle):
9979-presence 407
9979-presence 408 cursor = self.cursor()
9979-presence 409 away_interval_seconds = int(self.get_knob("peer_away_interval_seconds"))
9979-presence 410 dt = datetime.datetime.utcfromtimestamp(
9979-presence 411 time.time() - away_interval_seconds
9979-presence 412 )
9979-presence 413 raw_messages = cursor.execute("select message_bytes from log where created_at > ?", (dt,)).fetchall()
9979-presence 414 for message_bytes in raw_messages:
9979-presence 415 int_ts, self_chain, net_chain, speaker, body = Message._unpack_message(message_bytes[0][:])
9979-presence 416 if speaker == handle:
9979-presence 417 return False
9979-presence 418 return True
9979-presence 419
genesis 420 def get_peer_by_handle(self, handle):
9985-single-thread 421 cursor = self.cursor()
9985-single-thread 422 handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 423 (handle,)).fetchone()
genesis 424
genesis 425 if handle_info == None:
genesis 426 return None
genesis 427
9982-getdata 428 peer_id = handle_info[1]
9985-single-thread 429 address = cursor.execute("select address, port from at where handle_id=?\
genesis 430 order by updated_at desc limit 1",
genesis 431 (handle_info[0],)).fetchone()
9985-single-thread 432 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
9982-getdata 433 (peer_id,)).fetchall())
9985-single-thread 434 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
9982-getdata 435 order by random()",
9982-getdata 436 (peer_id,)).fetchall())
9982-getdata 437 return Peer(self.station.socket, {
genesis 438 "handles": handles,
genesis 439 "peer_id": handle_info[1],
9986-rebroadcast-... 440 "address": address[0] if address else None,
9986-rebroadcast-... 441 "port": address[1] if address else None,
genesis 442 "keys": keys
genesis 443 })
9987-embargoing 444
genesis 445 def is_duplicate(self, peers, peer):
genesis 446 for existing_peer in peers:
9982-getdata 447 if (not existing_peer.address is None
9982-getdata 448 and existing_peer.address == peer.address
9982-getdata 449 and existing_peer.port == peer.port):
genesis 450 return True
genesis 451 return False