- 302F32CC4525B593ED2B8E61AE3610003F5811F59778E1085B3E6CB28DD87646D77758A48B7BB7643D41D7602524B2CB35F2D7C9D96FC1D3E4F5136CD5A1A6F8+ 770B11A71459E33F5F4D2C72CF55DCB79B18C711F3041D8D23375A1419224D309A50C4F10C466527DC7EE910DBDA120ABE877D68E8981C7F51CA4A401BF54775blatta/lib/state.py(1 . 65)(1 . 119)
2154 from peer import Peer
2155 from message import EMPTY_CHAIN
2156 import sqlite3
2157 import imp
2158 import hashlib
2159 import binascii
2160 import logging
2161 import datetime
2162 import caribou
2163
2164 from itertools import chain
2165
2166 KNOBS=({'max_bounces': 3,
2167 'embargo_interval': 1,
2168 'rubbish_interval': 10})
2169 'embargo_interval_seconds': 1,
2170 'rubbish_interval_seconds': 10,
2171 'nick': '',
2172 'order_buffer_check_seconds': 5 * 60,
2173 'order_buffer_expiration_seconds': 5 * 60,
2174 'short_buffer_expiration_seconds': 1,
2175 'short_buffer_check_interval_seconds': 1})
2176
2177 class State(object):
2178 __instance = None
2179 @staticmethod
2180 def get_instance(socket=None, db_path=None):
2181 if State.__instance == None:
2182 State(socket, db_path)
2183 return State.__instance
2184
2185 def __init__(self, socket, db_path):
2186 if State.__instance != None:
2187 raise Exception("This class is a singleton")
2188 else:
2189 self.socket = socket
2190 self.conn = sqlite3.connect(db_path, check_same_thread=False)
2191 cursor = self.cursor()
2192 cursor.execute("create table if not exists at(handle_id integer,\
2193 address text not null,\
2194 port integer not null,\
2195 updated_at datetime default null,\
2196 unique(handle_id, address, port))")
2197
2198 cursor.execute("create table if not exists wot(peer_id integer primary key)")
2199
2200 cursor.execute("create table if not exists handles(handle_id integer primary key,\
2201 peer_id integer,\
2202 handle text,\
2203 unique(handle))")
2204
2205 cursor.execute("create table if not exists keys(peer_id intenger,\
2206 key text,\
2207 used_at datetime default current_timestamp,\
2208 unique(key))")
2209
2210 cursor.execute("create table if not exists logs(\
2211 handle text not null,\
2212 peer_id integer,\
2213 message_bytes blob not null,\
2214 created_at datetime default current_timestamp)")
2215
2216 cursor.execute("create table if not exists dedup_queue(\
2217 hash text not null,\
2218 created_at datetime default current_timestamp)")
2219 cursor.execute("create table if not exists knobs(\
2220 name text not null,\
2221 value text not null)")
2222 State.__instance = self
2223 def __init__(self, station, db_path=None):
2224 self.station = station
2225 if db_path:
2226 self.conn = sqlite3.connect(db_path)
2227 else:
2228 self.conn = sqlite3.connect("file::memory:")
2229
2230 cursor = self.cursor()
2231 cursor.execute("create table if not exists handle_self_chain(id integer primary key autoincrement,\
2232 handle string not null,\
2233 message_hash blob not null)")
2234
2235 cursor.execute("create table if not exists broadcast_self_chain(id integer primary key autoincrement,\
2236 message_hash blob not null)")
2237
2238 cursor.execute("create table if not exists net_chain(id integer primary key autoincrement,\
2239 message_hash blob not null)")
2240
2241 cursor.execute("create table if not exists at(handle_id integer,\
2242 address text not null,\
2243 port integer not null,\
2244 updated_at datetime default null,\
2245 unique(handle_id, address, port))")
2246
2247 cursor.execute("create table if not exists wot(peer_id integer primary key autoincrement)")
2248
2249 cursor.execute("create table if not exists handles(handle_id integer primary key,\
2250 peer_id integer,\
2251 handle text,\
2252 unique(handle))")
2253
2254 cursor.execute("create table if not exists keys(peer_id intenger,\
2255 key text,\
2256 used_at datetime default current_timestamp,\
2257 unique(key))")
2258
2259 cursor.execute("create table if not exists log(\
2260 message_bytes blob not null,\
2261 message_hash text not null, \
2262 command integer not null, \
2263 timestamp datetime not null, \
2264 created_at datetime default current_timestamp)")
2265
2266 cursor.execute("create table if not exists knobs(\
2267 name text not null,\
2268 value text not null)")
2269
2270 # migrate the db if necessary
2271 if db_path:
2272 caribou.upgrade(db_path, "migrations")
2273
2274 self.conn.commit()
2275
2276 def cursor(self):
2277 return self.conn.cursor()
2278
2279 def update_handle_self_chain(self, handle, message_hash):
2280 cursor = self.cursor()
2281 cursor.execute("insert into handle_self_chain(handle, message_hash) values(?, ?)", (handle, buffer(message_hash)))
2282 self.conn.commit()
2283
2284 def get_handle_self_chain(self, handle):
2285 cursor = self.cursor()
2286 results = cursor.execute("select message_hash from handle_self_chain where handle=?\
2287 order by id desc limit 1", (handle,)).fetchone()
2288 if results is not None:
2289 return results[0][:]
2290 else:
2291 return EMPTY_CHAIN
2292
2293 def update_broadcast_self_chain(self, message_hash):
2294 cursor = self.cursor()
2295 cursor.execute("insert into broadcast_self_chain(message_hash) values(?)", (buffer(message_hash),))
2296 self.conn.commit()
2297
2298 def get_broadcast_self_chain(self):
2299 cursor = self.cursor()
2300 results = cursor.execute("select message_hash from broadcast_self_chain order by id desc limit 1").fetchone()
2301 if results is not None:
2302 return results[0][:]
2303 else:
2304 return EMPTY_CHAIN
2305
2306 def update_net_chain(self, message_hash):
2307 self.cursor().execute("insert into net_chain(message_hash) values(?)", (buffer(message_hash),))
2308 self.conn.commit()
2309
2310 def get_net_chain(self):
2311 cursor = self.cursor()
2312 results = cursor.execute("select message_hash from net_chain order by id desc limit 1").fetchone()
2313 if results is not None:
2314 return results[0][:]
2315 else:
2316 return EMPTY_CHAIN
2317
2318 def get_knobs(self):
2319 cursor = self.cursor()
2320 results = cursor.execute("select name, value from knobs order by name asc").fetchall()
(88 . 7)(142 . 15)
2322 cursor.execute("update knobs set value=? where name=?", (knob_value, knob_name,))
2323 else:
2324 cursor.execute("insert into knobs(name, value) values(?, ?)", (knob_name, knob_value,))
2325
2326
2327 self.conn.commit()
2328
2329 def get_latest_message_timestamp(self):
2330 cursor = self.cursor()
2331 result = cursor.execute("select timestamp from log order by timestamp desc limit 1").fetchone()
2332 if result:
2333 return result[0]
2334
2335 def get_at(self, handle=None):
2336 cursor = self.cursor()
2337 at = []
(114 . 57)(176 . 6)
2339 "active_at": updated_at if updated_at else "no packets received from this address"})
2340 return at
2341
2342
2343 def is_duplicate_message(self, message_hash):
2344 cursor = self.cursor()
2345 cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
2346 self.conn.commit()
2347 result = cursor.execute("select hash from dedup_queue where hash=?",
2348 (message_hash,)).fetchone()
2349 logging.debug("checking if %s is dupe" % message_hash)
2350 if(result != None):
2351 return True
2352 else:
2353 return False
2354
2355 def add_to_dedup_queue(self, message_hash):
2356 cursor = self.cursor()
2357 cursor.execute("insert into dedup_queue(hash)\
2358 values(?)",
2359 (message_hash,))
2360 logging.debug("added %s to dedup" % message_hash)
2361 self.conn.commit()
2362
2363 def get_last_message_hash(self, handle, peer_id=None):
2364 cursor = self.cursor()
2365 if peer_id:
2366 message_bytes = cursor.execute("select message_bytes from logs\
2367 where handle=? and peer_id=?\
2368 order by created_at desc limit 1",
2369 (handle, peer_id)).fetchone()
2370
2371 else:
2372 message_bytes = cursor.execute("select message_bytes from logs\
2373 where handle=? and peer_id is null\
2374 order by created_at desc limit 1",
2375 (handle,)).fetchone()
2376
2377 if message_bytes:
2378 return hashlib.sha256(message_bytes[0][:]).digest()
2379 else:
2380 return "\x00" * 32
2381
2382 def log(self, handle, message_bytes, peer=None):
2383 cursor = self.cursor()
2384 if peer != None:
2385 peer_id = peer.peer_id
2386 else:
2387 peer_id = None
2388
2389 cursor.execute("insert into logs(handle, peer_id, message_bytes)\
2390 values(?, ?, ?)",
2391 (handle, peer_id, buffer(message_bytes)))
2392
2393 def import_at_and_wot(self, at_path):
2394 cursor = self.cursor()
2395 wot = imp.load_source('wot', at_path)
(184 . 7)(195 . 6)
2397 (handle_id, peer["address"], peer["port"], None))
2398 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
2399 (peer_id, key))
2400
2401 self.conn.commit()
2402
2403 def update_at(self, peer, set_active_at=True):
(212 . 24)(222 . 18)
2405 peer['address'],
2406 peer['port']))
2407
2408 # otherwise update the existing entry if it differs
2409 # otherwise just update the existing entry
2410 else:
2411 try:
2412 if (at_entry[1] != peer['address'] or
2413 at_entry[2] != peer['port']):
2414 cursor.execute("update at set updated_at = ?,\
2415 address = ?,\
2416 port = ?\
2417 where handle_id=?",
2418 (timestamp,
2419 peer["address"],
2420 peer["port"],
2421 handle_id))
2422
2423 logging.debug("updated at entry for %s: %s:%d" % (
2424 peer['handle'],
2425 peer['address'],
2426 peer['port']))
2427 cursor.execute("update at set updated_at = ?,\
2428 address = ?,\
2429 port = ?\
2430 where handle_id=?",
2431 (timestamp,
2432 peer["address"],
2433 peer["port"],
2434 handle_id))
2435
2436 except sqlite3.IntegrityError:
2437 cursor.execute("delete from at where handle_id=?", (handle_id,))
2438
(310 . 10)(314 . 38)
2440
2441 def listify(self, results):
2442 return list(chain.from_iterable(results))
2443
2444 def get_keyed_peers(self, exclude_addressless=False):
2445
2446 def log_has_message(self, message_hash):
2447 cursor = self.cursor()
2448 result = cursor.execute("select exists(select 1 from log where message_hash=?)\
2449 limit 1", (binascii.hexlify(message_hash),)).fetchone()
2450 return result[0]
2451
2452 def log_message(self, message):
2453 cursor = self.cursor()
2454 message_hash_hex_string = binascii.hexlify(message.message_hash)
2455 cursor.execute("insert into log(message_hash, message_bytes, command, timestamp) values(?, ?, ?, ?)",
2456 (message_hash_hex_string,
2457 buffer(message.message_bytes),
2458 message.command,
2459 message.timestamp))
2460 self.conn.commit()
2461
2462 def get_message(self, message_hash):
2463 cursor = self.cursor()
2464 message_hash_hex_string = binascii.hexlify(message_hash)
2465 result = cursor.execute("select command, message_bytes from log where message_hash=? limit 1",
2466 (message_hash_hex_string,)).fetchone()
2467 if result:
2468 return result[0], result[1][:]
2469
2470 return None, None
2471
2472 def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[]):
2473 cursor = self.cursor()
2474 peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall())
2475 peer_ids = self.listify(cursor.execute("select peer_id from keys\
2476 where peer_id not in (%s) order by random()" % ','.join('?'*len(exclude_ids)),
2477 exclude_ids).fetchall())
2478 peers = []
2479 for peer_id in peer_ids:
2480 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
(334 . 15)(366 . 16)
2482 if handle_info == None:
2483 return None
2484
2485 peer_id = handle_info[1]
2486 address = cursor.execute("select address, port from at where handle_id=?\
2487 order by updated_at desc limit 1",
2488 (handle_info[0],)).fetchone()
2489 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
2490 (handle_info[1],)).fetchall())
2491 (peer_id,)).fetchall())
2492 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
2493 order by used_at desc",
2494 (handle_info[1],)).fetchall())
2495 return Peer(self.socket, {
2496 order by random()",
2497 (peer_id,)).fetchall())
2498 return Peer(self.station.socket, {
2499 "handles": handles,
2500 "peer_id": handle_info[1],
2501 "address": address[0] if address else None,
(352 . 6)(385 . 8)
2503
2504 def is_duplicate(self, peers, peer):
2505 for existing_peer in peers:
2506 if existing_peer.address == peer.address and existing_peer.port == peer.port:
2507 if (not existing_peer.address is None
2508 and existing_peer.address == peer.address
2509 and existing_peer.port == peer.port):
2510 return True
2511 return False