tree checksum vpatch file split hunks
all signers:
antecedents: 9987-embargoing
press order:
genesis | |
9992-handle-edge-cases-add-feedback | |
9991-improved-logging | |
9990-keep-ephemeral-ports-open | |
9989-show-wot-nicks | |
9988-hash-dedup | |
9987-embargoing | |
9986-rebroadcast-simple-hearsay-and-more |
patch:
(515 . 7)(515 . 7)- 2F8E9DF6CF92A779900080F585A1B9873218D949BE48299950C2B05A64FD5A3ABD8AB927F5C5D4C5D51E7C3DEF2BAB13A6FFE532907E681883730D60BD75B0E5
5 except Exception as ex:
6 self.pest_reply("Error attempting to update address table")
7 stack = traceback.format_exc()
8 logger.debug(stack)
9 logging.debug(stack)
10 return
11 elif len(arguments) > 2:
12 self.pest_reply("Usage: AT [<HANDLE>] [<ADDRESS>]")
(562 . 7)(562 . 7)
14 except KeyError:
15 self.reply("421 %s %s :Unknown command" % (self.nickname, command))
16 stack = traceback.format_exc()
17 logger.debug(stack)
18 logging.debug(stack)
19
20 def socket_readable_notification(self):
21 try:
(40 . 7)(40 . 6)- 0CF5CD14C7E157CF47CF2E4F0F9FD076E89A817A71B389AA8748A8472437E62A2AC35FA5F3B3AE9BE39A2F15A926E5920571267BDF39B9ED6B4E0E4CBF9C2970
26
27 def message(self, message):
28 # if we are not rebroadcasting we need to set the timestamp
29
30 if message.timestamp == None:
31 message.original = True
32 message.timestamp = int(time.time())
(59 . 7)(58 . 11)
34 logging.debug("Aborting message: unknown handle: %s" % message.handle)
35 return
36
37 message_bytes = self.get_message_bytes(message, target_peer)
38 if message.message_bytes == None:
39 message_bytes = self.get_message_bytes(message, target_peer)
40 else:
41 message_bytes = message.message_bytes
42
43 if message.command != IGNORE:
44 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
45 logging.debug("generated message_hash: %s" % message_hash)
(70 . 12)(73 . 16)
47 signed_packet_bytes = self.pack(target_peer, message, message_bytes)
48 target_peer.send(signed_packet_bytes)
49 elif message.command == BROADCAST or message.command == IGNORE:
50 # sanity check
51 if message.message_bytes and message_bytes != message_bytes:
52 logging.debug("aborting send: message modified by station!")
53 return
54
55 for peer in self.state.get_keyed_peers():
56
57 # we don't want to send a broadcast back to the originator
58
59 if message.peer and (peer.peer_id == message.peer.peer_id):
60 next
61 continue
62
63 signed_packet_bytes = self.pack(peer, message, message_bytes)
64 peer.send(signed_packet_bytes)
(225 . 7)(232 . 8)
66 "self_chain": self_chain,
67 "net_chain": net_chain,
68 "self_chain_valid": self_chain_valid,
69 "message_hash": message_hash
70 "message_hash": message_hash,
71 "message_bytes": message_bytes
72 })
73
74 # check for duplicates
(14 . 3)(14 . 4)
79 self.self_chain_valid = message.get("self_chain_valid")
80 self.error_code = message.get("error_code")
81 self.message_hash = message.get("message_hash")
82 self.message_bytes = None
- C96DA174AE6CEB0489ED2B50872C02E481FA7D14D09A747B387BCF7EC45F1EC1DAC5326A3370951F20C8995C85FCC7A8B91BC333CEDAF78B11345A3897C8F95D(24 . 15)(24 . 15)
87 return None
88
89 def send(self, signed_packet_bytes):
90 if self.get_key() != None:
91 if self.get_key() != None and self.address != None and self.port != None:
92 try:
93 self.socket.sendto(signed_packet_bytes, (self.address, self.port))
94 logging.debug("[%s:%d] <- %s" % (self.address,
95 self.port,
96 int(self.port),
97 binascii.hexlify(signed_packet_bytes)[0:16]))
98
99 except Exception as ex:
100 stack = traceback.format_exc()
101 logging.debug(stack)
102 else:
103 logging.debug("Discarding message to unknown handle or handle with no key: %s" % message.handle)
104 logging.debug("Discarding message to unknown handle or handle with no key: %s" % self.handles[0])
- 7F7198C51EB6B00321C1754F1675D907263BF600B8EF67B79641E4A763357FE73935B9F1534234D226F9564E2341AEAAFB50BFCFF5128BFE14404651B8A36EF2(1 . 4)(1 . 4)- 4F78202D4744A3284C00C4AAC9C055F4ABAE95EEA1C51C4ACD519A9723A990D4D1FC336254140F75B7D75995A781935A2A6250DD19C7E7610B6643365A47938F
109 VERSION = "9987"
110 VERSION = "9986"
111
112 import os
113 import select
(166 . 12)(166 . 7)
115 for x in inputready:
116 if x == self.udp_server_socket:
117 bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE)
118 self.station.embargo_queue_lock.acquire()
119 try:
120 self.station.handle_udp_data(bytes_address_pair)
121 except sqlite3.ProgrammingError as ex:
122 logging.error("sqlite3 concurrency problem")
123 self.station.embargo_queue_lock.release()
124 self.station.handle_udp_data(bytes_address_pair)
125 for x in iwtd:
126 if self.client != None:
127 self.client.socket_readable_notification()
(3 . 6)(3 . 7)- 9E41FDD532E857CEC8E4D3407560D8570B8E6B7B713739E6D81622B0A6ABCBE5A74A9E1CE70F192BE2CE5F5C2D0B2374E433BCB7A1D83E322C24460ADF92723A
132 import imp
133 import hashlib
134 import logging
135 import threading
136 from itertools import chain
137
138 class State(object):
(17 . 37)(18 . 39)
140 if State.__instance != None:
141 raise Exception("This class is a singleton")
142 else:
143 self.socket = socket
144 self.conn = sqlite3.connect(db_path, check_same_thread=False)
145 self.cursor = self.conn.cursor()
146 self.cursor.execute("create table if not exists at(handle_id integer,\
147 address text not null,\
148 port integer not null,\
149 active_at datetime default null,\
150 updated_at datetime default current_timestamp,\
151 unique(handle_id, address, port))")
152
153 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
154
155 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
156 peer_id integer,\
157 handle text,\
158 unique(handle))")
159
160 self.cursor.execute("create table if not exists keys(peer_id intenger,\
161 key text,\
162 used_at datetime default current_timestamp,\
163 unique(key))")
164
165 self.cursor.execute("create table if not exists logs(\
166 handle text not null,\
167 peer_id integer,\
168 message_bytes blob not null,\
169 created_at datetime default current_timestamp)")
170
171 self.cursor.execute("create table if not exists dedup_queue(\
172 hash text not null,\
173 created_at datetime default current_timestamp)")
174 self.write_lock = threading.Lock()
175 with self.write_lock:
176 self.socket = socket
177 self.conn = sqlite3.connect(db_path, check_same_thread=False)
178 self.cursor = self.conn.cursor()
179 self.cursor.execute("create table if not exists at(handle_id integer,\
180 address text not null,\
181 port integer not null,\
182 active_at datetime default null,\
183 updated_at datetime default current_timestamp,\
184 unique(handle_id, address, port))")
185
186 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
187
188 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
189 peer_id integer,\
190 handle text,\
191 unique(handle))")
192
193 self.cursor.execute("create table if not exists keys(peer_id intenger,\
194 key text,\
195 used_at datetime default current_timestamp,\
196 unique(key))")
197
198 self.cursor.execute("create table if not exists logs(\
199 handle text not null,\
200 peer_id integer,\
201 message_bytes blob not null,\
202 created_at datetime default current_timestamp)")
203
204 self.cursor.execute("create table if not exists dedup_queue(\
205 hash text not null,\
206 created_at datetime default current_timestamp)")
207 State.__instance = self
208
209 def get_at(self, handle=None):
(76 . 22)(79 . 24)
211
212
213 def is_duplicate_message(self, message_hash):
214 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
215 self.conn.commit()
216 result = self.cursor.execute("select hash from dedup_queue where hash=?",
217 (message_hash,)).fetchone()
218 logging.debug("checking if %s is dupe" % message_hash)
219 if(result != None):
220 return True
221 else:
222 return False
223 with self.write_lock:
224 self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
225 self.conn.commit()
226 result = self.cursor.execute("select hash from dedup_queue where hash=?",
227 (message_hash,)).fetchone()
228 logging.debug("checking if %s is dupe" % message_hash)
229 if(result != None):
230 return True
231 else:
232 return False
233
234 def add_to_dedup_queue(self, message_hash):
235 self.cursor.execute("insert into dedup_queue(hash)\
236 values(?)",
237 (message_hash,))
238 logging.debug("added %s to dedup" % message_hash)
239 self.conn.commit()
240 with self.write_lock:
241 self.cursor.execute("insert into dedup_queue(hash)\
242 values(?)",
243 (message_hash,))
244 logging.debug("added %s to dedup" % message_hash)
245 self.conn.commit()
246
247 def get_last_message_hash(self, handle, peer_id=None):
248 if peer_id:
(112 . 102)(117 . 109)
250 return "\x00" * 32
251
252 def log(self, handle, message_bytes, peer=None):
253 if peer != None:
254 peer_id = peer.peer_id
255 else:
256 peer_id = None
257 with self.write_lock:
258 if peer != None:
259 peer_id = peer.peer_id
260 else:
261 peer_id = None
262
263 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
264 values(?, ?, ?)",
265 (handle, peer_id, buffer(message_bytes)))
266 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
267 values(?, ?, ?)",
268 (handle, peer_id, buffer(message_bytes)))
269
270 def import_at_and_wot(self, at_path):
271 wot = imp.load_source('wot', at_path)
272 for peer in wot.peers:
273 results = self.cursor.execute("select * from handles where handle=? limit 1",
274 (peer["name"],)).fetchall()
275 if len(results) == 0:
276 key = peer["key"]
277 port = peer["port"]
278 address = peer["address"]
279 self.cursor.execute("insert into wot(peer_id) values(null)")
280 peer_id = self.cursor.lastrowid
281 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
282 (peer_id, peer["name"]))
283 handle_id = self.cursor.lastrowid
284 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
285 (handle_id, peer["address"], peer["port"], None))
286 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
287 (peer_id, key))
288 with self.write_lock:
289 wot = imp.load_source('wot', at_path)
290 for peer in wot.peers:
291 results = self.cursor.execute("select * from handles where handle=? limit 1",
292 (peer["name"],)).fetchall()
293 if len(results) == 0:
294 key = peer["key"]
295 port = peer["port"]
296 address = peer["address"]
297 self.cursor.execute("insert into wot(peer_id) values(null)")
298 peer_id = self.cursor.lastrowid
299 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
300 (peer_id, peer["name"]))
301 handle_id = self.cursor.lastrowid
302 self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
303 (handle_id, peer["address"], peer["port"], None))
304 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
305 (peer_id, key))
306
307 self.conn.commit()
308 self.conn.commit()
309
310 def update_at(self, peer, set_active_at=True):
311 row = self.cursor.execute("select handle_id from handles where handle=?",
312 (peer["handle"],)).fetchone()
313 if row != None:
314 handle_id = row[0]
315 else:
316 return
317
318 try:
319 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
320 (handle_id, peer["address"], peer["port"]))
321 except sqlite3.IntegrityError as ex:
322 self.cursor.execute("update at set updated_at = current_timestamp\
323 where handle_id=? and address=? and port=?",
324 (handle_id, peer["address"], peer["port"]))
325 if set_active_at:
326 self.cursor.execute("update at set active_at = current_timestamp\
327 where handle_id=? and address=? and port=?",
328 (handle_id, peer["address"], peer["port"]))
329 self.conn.commit()
330 with self.write_lock:
331 row = self.cursor.execute("select handle_id from handles where handle=?",
332 (peer["handle"],)).fetchone()
333 if row != None:
334 handle_id = row[0]
335 else:
336 return
337
338 try:
339 self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
340 (handle_id, peer["address"], peer["port"]))
341 except sqlite3.IntegrityError as ex:
342 self.cursor.execute("update at set updated_at = current_timestamp\
343 where handle_id=? and address=? and port=?",
344 (handle_id, peer["address"], peer["port"]))
345 if set_active_at:
346 self.cursor.execute("update at set active_at = current_timestamp\
347 where handle_id=? and address=? and port=?",
348 (handle_id, peer["address"], peer["port"]))
349 self.conn.commit()
350
351 def add_peer(self, handle):
352 self.cursor.execute("insert into wot(peer_id) values(null)")
353 peer_id = self.cursor.lastrowid
354 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
355 (peer_id, handle))
356 self.conn.commit()
357 with self.write_lock:
358 self.cursor.execute("insert into wot(peer_id) values(null)")
359 peer_id = self.cursor.lastrowid
360 self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
361 (peer_id, handle))
362 self.conn.commit()
363
364
365 def remove_peer(self, handle):
366 # get peer id
367
368 result = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()
369 if result == None:
370 return
371 else:
372 peer_id = result[0]
373 # get all aliases
374 with self.write_lock:
375 # get peer id
376 result = self.cursor.execute("select peer_id from handles where handle=?",
377 (handle,)).fetchone()
378 if result == None:
379 return
380 else:
381 peer_id = result[0]
382 # get all aliases
383
384 handle_ids = self.get_handle_ids_for_peer(peer_id)
385 for handle_id in handle_ids:
386 # delete at entries for each alias
387 handle_ids = self.get_handle_ids_for_peer(peer_id)
388 for handle_id in handle_ids:
389 # delete at entries for each alias
390
391 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
392 self.cursor.execute("delete from at where handle_id=?", (handle_id,))
393
394 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
395 self.cursor.execute("delete from handles where peer_id=?", (peer_id,))
396
397 # delete all keys for peer id
398 # delete all keys for peer id
399
400 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
401
402 # delete peer from wot
403
404 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
405 self.conn.commit()
406 self.cursor.execute("delete from keys where peer_id=?", (handle_id,))
407
408 # delete peer from wot
409
410 self.cursor.execute("delete from wot where peer_id=?", (peer_id,))
411 self.conn.commit()
412
413
414 def add_key(self, handle, key):
415 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
416 if peer_id != None:
417 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
418 self.conn.commit()
419 with self.write_lock:
420 peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0]
421 if peer_id != None:
422 self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key))
423 self.conn.commit()
424
425 def remove_key(self, key):
426 self.cursor.execute("delete from keys where key=?", (key,))
427 self.conn.commit()
428 with self.write_lock:
429 self.cursor.execute("delete from keys where key=?", (key,))
430 self.conn.commit()
431
432 def get_handle_ids_for_peer(self, peer_id):
433 return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?",
(236 . 8)(248 . 11)
435 for peer_id in peer_ids:
436 handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
437 peer = self.get_peer_by_handle(handle)
438 if not (self.is_duplicate(peers, peer)):
439 peers.append(peer)
440 if self.is_duplicate(peers, peer):
441 continue
442 if peer.address == None or peer.port == None:
443 continue
444 peers.append(peer)
445 return peers
446
447
(259 . 8)(274 . 8)
449 return Peer(self.socket, {
450 "handles": handles,
451 "peer_id": handle_info[1],
452 "address": address[0] if address else "",
453 "port": address[1] if address else "",
454 "address": address[0] if address else None,
455 "port": address[1] if address else None,
456 "keys": keys
457 })
458
(55 . 16)(55 . 10)
463 self.deliver(message)
464 return
465
466 # if the speaker is in our wot, we need to check if the message is hearsay
467 if message.speaker in self.state.get_peer_handles():
468 # embargo to wait for immediate copy of message
469 else:
470 self.embargo(message)
471 return
472
473 else:
474 # skip the embargo and deliver this message with appropriate simple hearsay labeling
475 message.prefix = "%s[%s]" % (message.speaker, peer.handles[0])
476 self.deliver(message)
477 return
478 elif error_code == STALE_PACKET:
479 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
480 return
(93 . 18)(87 . 18)
482 def embargo(self, message):
483 # initialize the key/value to empty array if not in the hash
484 # append message to array
485 if not message.message_hash in self.embargo_queue.keys():
486 self.embargo_queue[message.message_hash] = []
487 self.embargo_queue[message.message_hash].append(message)
488 with self.embargo_queue_lock:
489 if not message.message_hash in self.embargo_queue.keys():
490 self.embargo_queue[message.message_hash] = []
491 self.embargo_queue[message.message_hash].append(message)
492
493 def check_embargo_queue(self):
494 # get a lock so other threads can't mess with the db or the queue
495 self.embargo_queue_lock.acquire()
496 self.check_for_immediate_messages()
497 self.flush_hearsay_messages()
498 with self.embargo_queue_lock:
499 self.check_for_immediate_messages()
500 self.flush_hearsay_messages()
501
502 # release the lock
503 self.embargo_queue_lock.release()
504 # release the lock
505
506 # continue the thread loop after interval
507 time.sleep(1)
(180 . 8)(174 . 7)
509
510 def send_rubbish(self):
511 logging.debug("sending rubbish...")
512 self.embargo_queue_lock.acquire()
513 try:
514 with self.embargo_queue_lock:
515 if self.client:
516 self.infosec.message(Message({
517 "speaker": self.client.nickname,
(189 . 8)(182 . 5)
519 "bounces": 0,
520 "body": self.infosec.gen_rubbish_body()
521 }))
522 except:
523 logging.error("Something went wrong attempting to send rubbish")
524 self.embargo_queue_lock.release()
525 time.sleep(RUBBISH_INTERVAL)
526 threading.Thread(target=self.send_rubbish).start()