raw
genesis                 1 from lib.peer import Peer
genesis 2 import sqlite3
genesis 3 import imp
genesis 4 import hashlib
genesis 5 from itertools import chain
genesis 6
genesis 7 class State(object):
genesis 8
genesis 9 def __init__(self, server, db_path):
genesis 10 self.server = server
genesis 11 self.conn = sqlite3.connect(db_path)
genesis 12 self.cursor = self.conn.cursor()
genesis 13 self.cursor.execute("create table if not exists at(handle_id integer,\
genesis 14 address text not null,\
genesis 15 port integer not null,\
9991-improved-log... 16 active_at datetime default null,\
genesis 17 updated_at datetime default current_timestamp,\
genesis 18 unique(handle_id, address, port))")
genesis 19
genesis 20 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
genesis 21
genesis 22 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
genesis 23 peer_id integer,\
genesis 24 handle text,\
genesis 25 unique(handle))")
genesis 26
genesis 27 self.cursor.execute("create table if not exists keys(peer_id intenger,\
genesis 28 key text,\
genesis 29 used_at datetime default current_timestamp,\
genesis 30 unique(key))")
genesis 31
genesis 32 self.cursor.execute("create table if not exists logs(\
genesis 33 handle text not null,\
genesis 34 peer_id integer,\
genesis 35 message_bytes blob not null,\
genesis 36 created_at datetime default current_timestamp)")
genesis 37
9988-hash-dedup 38 self.cursor.execute("create table if not exists dedup_queue(\
9988-hash-dedup 39 hash text not null,\
9988-hash-dedup 40 created_at datetime default current_timestamp)")
9988-hash-dedup 41
genesis 42 def get_at(self, handle=None):
genesis 43 at = []
genesis 44 if handle == None:
9991-improved-log... 45 results = self.cursor.execute("select handle_id,address,port,active_at from at\
genesis 46 order by updated_at desc").fetchall()
genesis 47 else:
9992-handle-edge-... 48 result = self.cursor.execute("select handle_id from handles where handle=?",
9992-handle-edge-... 49 (handle,)).fetchone()
9992-handle-edge-... 50 if None != result:
9992-handle-edge-... 51 handle_id = result[0]
genesis 52 else:
genesis 53 return []
9991-improved-log... 54 results = self.cursor.execute("select handle_id,address,port,active_at from at \
9992-handle-edge-... 55 where handle_id=? order by updated_at desc",
9992-handle-edge-... 56 (handle_id,)).fetchall()
genesis 57 for result in results:
genesis 58 handle_id, address, port, updated_at = result
genesis 59 h = self.cursor.execute("select handle from handles where handle_id=?",
genesis 60 (handle_id,)).fetchone()[0]
genesis 61 at.append({"handle": h,
genesis 62 "address": "%s:%s" % (address, port),
9991-improved-log... 63 "active_at": updated_at})
genesis 64 return at
genesis 65
genesis 66
9988-hash-dedup 67 def is_duplicate_message(self, message_hash):
9988-hash-dedup 68 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
9988-hash-dedup 69 self.conn.commit()
9988-hash-dedup 70 result = self.cursor.execute("select hash from dedup_queue where hash=?",
9988-hash-dedup 71 (message_hash,)).fetchone()
9988-hash-dedup 72 if(result != None):
9988-hash-dedup 73 return True
9988-hash-dedup 74 else:
9988-hash-dedup 75 return False
9988-hash-dedup 76
9988-hash-dedup 77 def add_to_dedup_queue(self, message_hash):
9988-hash-dedup 78 self.cursor.execute("insert into dedup_queue(hash)\
9988-hash-dedup 79 values(?)",
9988-hash-dedup 80 (message_hash,))
9988-hash-dedup 81 self.conn.commit()
9988-hash-dedup 82
genesis 83 def get_last_message_hash(self, handle, peer_id=None):
genesis 84 if peer_id:
genesis 85 message_bytes = self.cursor.execute("select message_bytes from logs\
genesis 86 where handle=? and peer_id=?\
genesis 87 order by created_at desc limit 1",
genesis 88 (handle, peer_id)).fetchone()
genesis 89
genesis 90 else:
genesis 91 message_bytes = self.cursor.execute("select message_bytes from logs\
genesis 92 where handle=? and peer_id is null\
genesis 93 order by created_at desc limit 1",
genesis 94 (handle,)).fetchone()
genesis 95
genesis 96 if message_bytes:
genesis 97 return hashlib.sha256(message_bytes[0][:]).digest()
genesis 98 else:
genesis 99 return "0" * 32
genesis 100
genesis 101 def log(self, handle, message_bytes, peer_id=None):
genesis 102 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
genesis 103 values(?, ?, ?)",
genesis 104 (handle, peer_id, buffer(message_bytes)))
genesis 105
genesis 106 def import_at_and_wot(self, at_path):
genesis 107 wot = imp.load_source('wot', at_path)
genesis 108 for peer in wot.peers:
genesis 109 results = self.cursor.execute("select * from handles where handle=? limit 1",
genesis 110 (peer["name"],)).fetchall()
genesis 111 if len(results) == 0:
genesis 112 key = peer["key"]
genesis 113 port = peer["port"]
genesis 114 address = peer["address"]
genesis 115 self.cursor.execute("insert into wot(peer_id) values(null)")
genesis 116 peer_id = self.cursor.lastrowid
genesis 117 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
genesis 118 (peer_id, peer["name"]))
genesis 119 handle_id = self.cursor.lastrowid
9991-improved-log... 120 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
9991-improved-log... 121 (handle_id, peer["address"], peer["port"], None))
genesis 122 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
genesis 123 (peer_id, key))
genesis 124
genesis 125 self.conn.commit()
genesis 126
9991-improved-log... 127 def update_address_table(self, peer, set_active_at=True):
genesis 128 row = self.cursor.execute("select handle_id from handles where handle=?",
genesis 129 (peer["handle"],)).fetchone()
genesis 130 if row != None:
genesis 131 handle_id = row[0]
genesis 132 else:
genesis 133 return
9991-improved-log... 134
genesis 135 try:
genesis 136 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
genesis 137 (handle_id, peer["address"], peer["port"]))
genesis 138 except sqlite3.IntegrityError as ex:
genesis 139 self.cursor.execute("update at set updated_at = current_timestamp\
genesis 140 where handle_id=? and address=? and port=?",
genesis 141 (handle_id, peer["address"], peer["port"]))
9991-improved-log... 142 if set_active_at:
9991-improved-log... 143 self.cursor.execute("update at set active_at = current_timestamp\
9991-improved-log... 144 where handle_id=? and address=? and port=?",
9991-improved-log... 145 (handle_id, peer["address"], peer["port"]))
9991-improved-log... 146 self.conn.commit()
genesis 147
genesis 148 def add_peer(self, handle):
genesis 149 self.cursor.execute("insert into wot(peer_id) values(null)")
genesis 150 peer_id = self.cursor.lastrowid
genesis 151 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
genesis 152 (peer_id, handle))
genesis 153 self.conn.commit()
9991-improved-log... 154
genesis 155
genesis 156 def remove_peer(self, handle):
genesis 157 # get peer id
genesis 158
9992-handle-edge-... 159 result = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()
9992-handle-edge-... 160 if result == None:
9992-handle-edge-... 161 return
genesis 162 else:
9992-handle-edge-... 163 peer_id = result[0]
genesis 164 # get all aliases
genesis 165
genesis 166 handle_ids = self.get_handle_ids_for_peer(peer_id)
genesis 167 for handle_id in handle_ids:
genesis 168 # delete at entries for each alias
genesis 169
genesis 170 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
genesis 171
genesis 172 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
genesis 173
genesis 174 # delete all keys for peer id
genesis 175
genesis 176 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
genesis 177
genesis 178 # delete peer from wot
genesis 179
genesis 180 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
genesis 181 self.conn.commit()
genesis 182
genesis 183
genesis 184 def add_key(self, handle, key):
genesis 185 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
genesis 186 if peer_id != None:
genesis 187 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
genesis 188 self.conn.commit()
genesis 189
genesis 190 def remove_key(self, key):
genesis 191 self.cursor.execute("delete from keys where key=?", (key,))
genesis 192 self.conn.commit()
genesis 193
genesis 194 def get_handle_ids_for_peer(self, peer_id):
genesis 195 return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?",
genesis 196 (peer_id,)).fetchall()))
genesis 197
9989-show-wot-nicks 198 def get_peer_handles(self):
9989-show-wot-nicks 199 handles = list(chain.from_iterable(self.cursor.execute("select handle from handles").fetchall()))
9989-show-wot-nicks 200 return handles
9989-show-wot-nicks 201
genesis 202 def get_peers(self):
genesis 203 peers = []
genesis 204 handles = self.cursor.execute("select handle from handles").fetchall()
genesis 205
genesis 206 for handle in handles:
genesis 207 peer = self.get_peer_by_handle(handle[0])
9992-handle-edge-... 208 if not (self.is_duplicate(peers, peer)):
genesis 209 peers.append(peer)
genesis 210 return peers
genesis 211
genesis 212 def get_peer_by_handle(self, handle):
genesis 213 handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?",
genesis 214 (handle,)).fetchone()
genesis 215
genesis 216 if handle_info == None:
genesis 217 return None
genesis 218
genesis 219 address = self.cursor.execute("select address, port from at where handle_id=?\
genesis 220 order by updated_at desc limit 1",
genesis 221 (handle_info[0],)).fetchone()
genesis 222 handles = list(chain.from_iterable(self.cursor.execute("select handle from handles where peer_id=?",
genesis 223 (handle_info[1],)).fetchall()))
genesis 224 keys = list(chain.from_iterable(self.cursor.execute("select key from keys where peer_id=?\
genesis 225 order by used_at desc",
genesis 226 (handle_info[1],)).fetchall()))
genesis 227 return Peer(self.server, {
genesis 228 "handles": handles,
genesis 229 "peer_id": handle_info[1],
genesis 230 "address": address[0] if address else "",
genesis 231 "port": address[1] if address else "",
genesis 232 "keys": keys
genesis 233 })
genesis 234 def is_duplicate(self, peers, peer):
genesis 235 for existing_peer in peers:
genesis 236 if existing_peer.address == peer.address and existing_peer.port == peer.port:
genesis 237 return True
genesis 238 return False