tree checksum vpatch file split hunks
all signers:
antecedents: genesis 9990-keep-ephemeral-ports-open 9988-hash-dedup 9989-show-wot-nicks 9991-improved-logging
press order:
genesis | |
9992-handle-edge-cases-add-feedback | |
9991-improved-logging | |
9990-keep-ephemeral-ports-open | |
9989-show-wot-nicks | |
9988-hash-dedup | |
9987-embargoing |
patch:
(30 . 3)(30 . 8)
5 3. Use genkey to generate a key.
6 4. Add the key to the peer using the key command.
7 5. Add an address for the peer using the address command.
8
9 NOTES:
10
11 To run the unit tests, you'll need to run:
12 pip install mock
- 50ACEF42C77E18FB23FA8157201DDBA47717BB95AC85FE9D23393ABA847A39665373CC6C98FDBB08096F321081980F421C17ACD4490DE8CB56D38BEF637C0565(8 . 6)(8 . 7)- A35F64EE21532CC117FB36691ECE37E2F523CB01855FB4EB8268AC745519F27FD36ADDB324D56D4893FB7A84FE70302630E53BECF3571E250B3B578CA46ABCE3
17 import sys
18 import tempfile
19 import time
20 import logging
21 from lib.server import VERSION
22 from lib.server import Server
23 from lib.peer import Peer
(90 . 8)(91 . 11)
25 (options, args) = op.parse_args(argv[1:])
26 if options.channel_name is None:
27 options.channel_name = "#pest"
28 log_format = "%(levelname)s %(asctime)s: %(message)s"
29 if options.debug:
30 options.verbose = True
31 logging.basicConfig(level=logging.DEBUG, format=log_format, stream=sys.stdout)
32 else:
33 logging.basicConfig(level=logging.INFO, format=log_format, stream=sys.stdout)
34 if options.irc_ports is None:
35 options.irc_ports = "6697"
36 if options.udp_port is None:
(139 . 7)(143 . 7)
38 try:
39 server.start()
40 except KeyboardInterrupt:
41 server.print_error("Interrupted.")
42 logging.error("Interrupted.")
43
44
45 main(sys.argv)
(6 . 6)(6 . 8)- 2BEBEB6EE55F0941C567E114EE01352F18F96D02AC4E2F037B9CED7E71C219CC2ED51AC615D28E22244C5E07F136EC7CE1FB565C7217C62DE4708AEE4D8B05D5
50 import os
51 import base64
52 import traceback
53 import logging
54 from lib.state import State
55 from lib.message import Message
56 from lib.server import VERSION
57 from funcs import *
(22 . 6)(24 . 7)
59
60 def __init__(self, server, socket):
61 self.server = server
62 self.state = State.get_instance()
63 self.socket = socket
64 self.channels = {} # irc_lower(Channel name) --> Channel
65 self.nickname = None
(37 . 15)(40 . 11)
67 else:
68 self.__handle_command = self.__registration_handler
69
70 def is_addressed_to_me(self, message):
71 command = self.__parse_udp_message(message)
72 if command[0] == 'PRIVMSG':
73 if command[1][0][0] == '#' or command[1][0] == self.nickname:
74 return True
75 else:
76 return False
77 else:
78 return True
79 def message_from_station(self, msg):
80 targetname = self.server.channel_name if msg.command == BROADCAST else self.nickname
81 pest_prefix = msg.prefix if msg.prefix else msg.speaker
82 formatted_message = ":%s PRIVMSG %s :%s" % (pest_prefix, targetname, msg.body)
83 self.__writebuffer += formatted_message + "\r\n"
84
85 def get_prefix(self):
86 return "%s" % (self.nickname)
(68 . 30)(67 . 6)
88 def write_queue_size(self):
89 return len(self.__writebuffer)
90
91 def __parse_udp_message(self, message):
92 data = " ".join(message.split()[1:]) + "\r\n"
93 lines = self.__linesep_regexp.split(data)
94 lines = lines[:-1]
95 commands = []
96 for line in lines:
97 if not line:
98 # Empty line. Ignore.
99 continue
100 x = line.split(" ", 1)
101 command = x[0].upper()
102 if len(x) == 1:
103 arguments = []
104 else:
105 if len(x[1]) > 0 and x[1][0] == ":":
106 arguments = [x[1][1:]]
107 else:
108 y = string.split(x[1], " :", 1)
109 arguments = string.split(y[0])
110 if len(y) == 2:
111 arguments.append(y[1])
112 commands.append([command, arguments])
113 return commands[0]
114
115 def __parse_read_buffer(self):
116 lines = self.__linesep_regexp.split(self.__readbuffer)
117 self.__readbuffer = lines[-1]
(159 . 7)(134 . 6)
119 % self.nickname)
120 self.reply("004 %s :%s blatta-%s o o"
121 % (self.nickname, server.name, VERSION))
122 self.send_lusers()
123 self.send_motd()
124 self.__handle_command = self.__command_handler
125
(182 . 32)(156 . 20)
127 if arguments[0] == "0":
128 for (channelname, channel) in self.channels.items():
129 self.message_channel(channel, "PART", channelname, True)
130 self.channel_log(channel, "left", meta=True)
131 server.remove_member_from_channel(self, channelname)
132 self.channels = {}
133 return
134 channelnames = arguments[0].split(",")
135 if len(arguments) > 1:
136 keys = arguments[1].split(",")
137 else:
138 keys = []
139 keys.extend((len(channelnames) - len(keys)) * [None])
140 for (i, channelname) in enumerate(channelnames):
141 for channelname in channelnames:
142 if irc_lower(channelname) in self.channels:
143 continue
144 if not valid_channel_re.match(channelname):
145 self.reply_403(channelname)
146 continue
147 channel = server.get_channel(channelname)
148 if channel.key is not None and channel.key != keys[i]:
149 self.reply(
150 "475 %s %s :Cannot join channel (+k) - bad key"
151 % (self.nickname, channelname))
152 continue
153 channel.add_member(self)
154 self.channels[irc_lower(channelname)] = channel
155 self.message_channel(channel, "JOIN", channelname, True)
156 self.channel_log(channel, "joined", meta=True)
157 if channel.topic:
158 self.reply("332 %s %s :%s"
159 % (self.nickname, channel.name, channel.topic))
(218 . 7)(180 . 7)
161 % (self.nickname,
162 channelname,
163 " ".join(sorted(x
164 for x in self.server.state.get_peer_handles()))))
165 for x in self.state.get_peer_handles()))))
166 self.reply("366 %s %s :End of NAMES list"
167 % (self.nickname, channelname))
168
(238 . 7)(200 . 7)
170 self.reply("323 %s :End of LIST" % self.nickname)
171
172 def lusers_handler():
173 self.send_lusers()
174 pass
175
176 def mode_handler():
177 if len(arguments) < 1:
(268 . 8)(230 . 6)
179 self.message_channel(
180 channel, "MODE", "%s +k %s" % (channel.name, key),
181 True)
182 self.channel_log(
183 channel, "set channel key to %s" % key, meta=True)
184 else:
185 self.reply("442 %s :You're not on that channel"
186 % targetname)
(279 . 8)(239 . 6)
188 self.message_channel(
189 channel, "MODE", "%s -k" % channel.name,
190 True)
191 self.channel_log(
192 channel, "removed channel key", meta=True)
193 else:
194 self.reply("442 %s :You're not on that channel"
195 % targetname)
(313 . 9)(271 . 6)
197 self.reply("432 %s %s :Erroneous Nickname"
198 % (self.nickname, newnick))
199 else:
200 for x in self.channels.values():
201 self.channel_log(
202 x, "changed nickname to %s" % newnick, meta=True)
203 oldnickname = self.nickname
204 self.nickname = newnick
205 server.client_changed_nickname(self, oldnickname)
(340 . 16)(295 . 23)
207 channel = server.get_channel(targetname)
208 self.message_channel(
209 channel, command, "%s :%s" % (channel.name, message))
210 self.channel_log(channel, message)
211 # send the channel message to peers as well
212 self.server.station.infosec.message(
213 Message(
214 {
215 "speaker": self.nickname,
216 "command": BROADCAST,
217 "bounces": 0,
218 "body": message
219 }))
220 else:
221 formatted_message = ":%s %s %s :%s" % (self.prefix, command, targetname, message)
222 self.server.peer_message(Message({
223 self.server.station.infosec.message(Message({
224 "speaker": self.nickname,
225 "handle": targetname,
226 "body": formatted_message,
227 "body": message,
228 "bounces": 0,
229 "command": DIRECT
230 }, self.server))
231 }))
232 if(client):
233 client.message(formatted_message)
234
(372 . 7)(334 . 6)
236 self.message_channel(
237 channel, "PART", "%s :%s" % (channelname, partmsg),
238 True)
239 self.channel_log(channel, "left (%s)" % partmsg, meta=True)
240 del self.channels[irc_lower(channelname)]
241 server.remove_member_from_channel(self, channelname)
242
(405 . 8)(366 . 6)
244 self.message_channel(
245 channel, "TOPIC", "%s :%s" % (channelname, newtopic),
246 True)
247 self.channel_log(
248 channel, "set topic to %r" % newtopic, meta=True)
249 else:
250 if channel.topic:
251 self.reply("332 %s %s :%s"
(464 . 17)(423 . 21)
253 def wot_handler():
254 if len(arguments) < 1:
255 # Display the current WOT
256 peers = self.server.state.get_peers()
257 peers = self.state.get_peers()
258 if len(peers) > 0:
259 for peer in peers:
260 self.pest_reply("%s %s:%s" % (string.join(peer.handles, ","), peer.address, peer.port))
261 if peer.address and peer.port:
262 address = "%s:%s" % (peer.address, peer.port)
263 else:
264 address = "<address not configured>"
265 self.pest_reply("%s %s" % (string.join(peer.handles, ","), address))
266 else:
267 self.pest_reply("WOT is empty")
268 elif len(arguments) == 1:
269 # Display all WOT data concerning the peer identified by HANDLE,
270 # including all known keys, starting with the most recently used, for that peer.
271 handle = arguments[0]
272 peer = self.server.state.get_peer_by_handle(handle)
273 peer = self.state.get_peer_by_handle(handle)
274 if peer:
275 self.pest_reply("keys:")
276 for key in peer.keys:
(488 . 7)(451 . 7)
278 def peer_handler():
279 if len(arguments) == 1:
280 try:
281 self.server.state.add_peer(arguments[0])
282 self.state.add_peer(arguments[0])
283 self.pest_reply("added new peer %s" % arguments[0])
284 self.message(":%s JOIN %s" % (arguments[0], self.server.channel_name))
285 except:
(499 . 11)(462 . 11)
287 def unpeer_handler():
288 if len(arguments) == 1:
289 try:
290 self.server.state.remove_peer(arguments[0])
291 self.state.remove_peer(arguments[0])
292 self.pest_reply("removed peer %s" % arguments[0])
293 self.message(":%s PART %s" % (arguments[0], self.server.channel_name))
294 except Exception, e:
295 self.server.print_debug(e)
296 logging.debug(e)
297 self.pest_reply("Error attempting to remove peer")
298 else:
299 self.pest_reply("Usage: UNPEER <HANDLE>")
(518 . 7)(481 . 7)
301 handle = arguments[0]
302 key = arguments[1]
303 try:
304 self.server.state.add_key(handle, key)
305 self.state.add_key(handle, key)
306 self.pest_reply("added key: %s" % key)
307 except:
308 self.pest_reply("Error attempting to add key")
(528 . 23)(491 . 23)
310 self.pest_reply("Usage: UNKEY <KEY>")
311 else:
312 try:
313 self.server.state.remove_key(arguments[0])
314 self.state.remove_key(arguments[0])
315 self.pest_reply("removed key: %s" % arguments[0])
316 except Exception, e:
317 self.pest_reply("Error attempting to remove key")
318 self.server.print_debug(e)
319 logging.debug(e)
320
321 def at_handler():
322 if len(arguments) == 0:
323 at = self.server.state.get_at()
324 at = self.state.get_at()
325 elif len(arguments) == 1:
326 handle = arguments[0]
327 at = self.server.state.get_at(handle)
328 at = self.state.get_at(handle)
329 elif len(arguments) == 2:
330 try:
331 handle, address = arguments
332 address_ip, port = string.split(address, ":")
333 self.server.state.update_address_table({"handle": handle,
334 self.state.update_at({"handle": handle,
335 "address": address_ip,
336 "port": port},
337 False)
(552 . 7)(515 . 7)
339 except Exception as ex:
340 self.pest_reply("Error attempting to update address table")
341 stack = traceback.format_exc()
342 print(stack)
343 logger.debug(stack)
344 return
345 elif len(arguments) > 2:
346 self.pest_reply("Usage: AT [<HANDLE>] [<ADDRESS>]")
(599 . 12)(562 . 12)
348 except KeyError:
349 self.reply("421 %s %s :Unknown command" % (self.nickname, command))
350 stack = traceback.format_exc()
351 print(stack)
352 logger.debug(stack)
353
354 def socket_readable_notification(self):
355 try:
356 data = self.socket.recv(2 ** 10)
357 self.server.print_debug(
358 logging.debug(
359 "[%s:%d] -> %r" % (self.host, self.port, data))
360 quitmsg = "EOT"
361 except socket.error as x:
(621 . 7)(584 . 7)
363 def socket_writable_notification(self):
364 try:
365 sent = self.socket.send(self.__writebuffer)
366 self.server.print_debug(
367 logging.debug(
368 "[%s:%d] <- %r" % (
369 self.host, self.port, self.__writebuffer[:sent]))
370 self.__writebuffer = self.__writebuffer[sent:]
(630 . 7)(593 . 7)
372
373 def disconnect(self, quitmsg):
374 self.message("ERROR :%s" % quitmsg)
375 self.server.print_info(
376 logging.info(
377 "Disconnected connection from %s:%s (%s)." % (
378 self.host, self.port, quitmsg))
379 self.socket.close()
(654 . 31)(617 . 8)
381
382 def message_channel(self, channel, command, message, include_self=False):
383 line = ":%s %s %s" % (self.prefix, command, message)
384 for client in channel.members:
385 if client != self or include_self:
386 client.message(line)
387 # send the channel message to peers as well
388 self.server.peer_message(
389 Message(
390 {
391 "speaker": self.nickname,
392 "command": BROADCAST,
393 "bounces": 0,
394 "body": line
395 }, self.server))
396
397 def channel_log(self, channel, message, meta=False):
398 if not self.server.logdir:
399 return
400 if meta:
401 format = "[%s] * %s %s\n"
402 else:
403 format = "[%s] <%s> %s\n"
404 timestamp = datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S UTC")
405 logname = channel.name.replace("_", "__").replace("/", "_")
406 fp = open("%s/%s.log" % (self.server.logdir, logname), "a")
407 fp.write(format % (timestamp, self.nickname, message))
408 fp.close()
409 if include_self:
410 self.message(line)
411
412 def message_related(self, msg, include_self=False):
413 clients = set()
(691 . 10)(631 . 6)
415 for client in clients:
416 client.message(msg)
417
418 def send_lusers(self):
419 self.reply("251 %s :There are %d users and 0 services on 1 server"
420 % (self.nickname, len(self.server.clients)))
421
422 def send_motd(self):
423 server = self.server
424 motdlines = server.get_motd_lines()
(16 . 6)(16 . 7)- 0096D80E9D0C52787F1AD8C43D6B392C5E5434DFD04F62F41361365D858EA8F92B92F901CE4165E2B4FB4079B7AA9E857CF4296DEE3EB48759CF63120F3975C5
429 import random
430 import os
431 import pprint
432 import logging
433 pp = pprint.PrettyPrinter(indent=4)
434
435 PACKET_SIZE = 496
(34 . 32)(35 . 66)
437 IGNORED = 4
438
439 class Infosec(object):
440 def __init__(self, server=None):
441 self.server = server
442 def __init__(self, state=None):
443 self.state = state
444
445 def get_message_bytes(self, message, peer=None):
446 try:
447 timestamp = message.timestamp
448 except:
449 timestamp = None
450 command = message.command
451 speaker = self._pad(message.speaker, MAX_SPEAKER_SIZE)
452 def message(self, message):
453 # if we are not rebroadcasting we need to set the timestamp
454
455 # if we are rebroadcasting we need to use the original timestamp
456 if message.timestamp == None:
457 message.original = True
458 message.timestamp = int(time.time())
459 else:
460 message.original = False
461
462 target_peer = (self.state.get_peer_by_handle(message.handle)
463 if message.command == DIRECT
464 else None)
465
466 if target_peer and not target_peer.get_key():
467 logging.debug("No key for peer associated with %s" % message.handle)
468 return
469
470 if message.command == DIRECT and target_peer == None:
471 logging.debug("Aborting message: unknown handle: %s" % message.handle)
472 return
473
474 message_bytes = self.get_message_bytes(message, target_peer)
475 if message.command != IGNORE:
476 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
477 logging.debug("generated message_hash: %s" % message_hash)
478 self.state.add_to_dedup_queue(message_hash)
479 self.state.log(message.speaker, message_bytes, target_peer)
480
481 if message.command == DIRECT:
482 signed_packet_bytes = self.pack(target_peer, message, message_bytes)
483 target_peer.send(signed_packet_bytes)
484 elif message.command == BROADCAST or message.command == IGNORE:
485 for peer in self.state.get_keyed_peers():
486
487 # we don't want to send a broadcast back to the originator
488
489 if(timestamp == None):
490 int_ts = int(time.time())
491 if message.peer and (peer.peer_id == message.peer.peer_id):
492 next
493
494 signed_packet_bytes = self.pack(peer, message, message_bytes)
495 peer.send(signed_packet_bytes)
496 else:
497 int_ts = timestamp
498 pass
499
500 def get_message_bytes(self, message, peer=None):
501 timestamp = message.timestamp
502 command = message.command
503 speaker = self._pad(message.speaker, MAX_SPEAKER_SIZE)
504
505 # let's generate the self_chain value from the last message or set it to zero if
506 # there this is the first message
507
508 if message.original:
509 if command == DIRECT:
510 self_chain = self.server.state.get_last_message_hash(message.speaker, peer.peer_id)
511 self_chain = self.state.get_last_message_hash(message.speaker, peer.peer_id)
512 elif command == BROADCAST:
513 self_chain = self.server.state.get_last_message_hash(message.speaker)
514 self_chain = self.state.get_last_message_hash(message.speaker)
515 elif command == IGNORE:
516 self_chain = "\x00" * 32
517 net_chain = "\x00" * 32
(69 . 16)(104 . 19)
519
520 # pack message bytes
521
522 message_bytes = struct.pack(MESSAGE_PACKET_FORMAT, int_ts, self_chain, net_chain, speaker, message.body)
523 if message.command != IGNORE:
524 logging.debug("packing message bytes: %s" % message.body)
525 else:
526 logging.debug("packing rubbish message bytes: %s" % binascii.hexlify(message.body))
527
528 message_bytes = struct.pack(MESSAGE_PACKET_FORMAT, message.timestamp, self_chain, net_chain, speaker, message.body)
529 return message_bytes
530
531 def pack(self, peer, message):
532 def pack(self, peer, message, message_bytes):
533 key_bytes = base64.b64decode(peer.get_key())
534 signing_key = key_bytes[:32]
535 cipher_key = key_bytes[32:]
536
537 message_bytes = self.get_message_bytes(message, peer)
538
539 # pack packet bytes
540
541 nonce = self._generate_nonce(16)
(111 . 15)(149 . 15)
543 try:
544 black_packet_bytes, signature_bytes = struct.unpack(BLACK_PACKET_FORMAT, black_packet)
545 except:
546 self.server.print_error("Discarding malformed black packet from %s" % peer.get_key())
547 return Message({ "error_code": MALFORMED_PACKET }, self.server)
548 logging.error("Discarding malformed black packet from %s" % peer.get_key())
549 return Message({ "error_code": MALFORMED_PACKET })
550
551 # check signature
552
553 signature_check_bytes = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest()
554
555 if(signature_check_bytes != signature_bytes):
556 return Message({ "error_code": INVALID_SIGNATURE }, self.server)
557 return Message({ "error_code": INVALID_SIGNATURE })
558
559 # try to decrypt black packet
560
(130 . 10)(168 . 27)
562
563 nonce, bounces, version, command, message_bytes = struct.unpack(RED_PACKET_FORMAT, red_packet_bytes)
564
565 # compute message_hash
566
567 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
568
569 # unpack message
570
571 int_ts, self_chain, net_chain, speaker, message = struct.unpack(MESSAGE_PACKET_FORMAT, message_bytes)
572 speaker = speaker.strip()
573 int_ts, self_chain, net_chain, speaker, body = struct.unpack(MESSAGE_PACKET_FORMAT, message_bytes)
574
575 # remove padding from speaker
576
577 for index, byte in enumerate(speaker):
578 if byte == '\x00':
579 speaker = speaker[0:index]
580 break
581
582 # remove padding from body
583
584 for index, byte in enumerate(body):
585 if byte == '\x00':
586 body = body[0:index]
587 break
588
589 # nothing to be done for an IGNORE command
590
(143 . 39)(198 . 26)
592 # check timestamp
593
594 if(int_ts not in self._ts_range()):
595 return Message({ "error_code": STALE_PACKET }, self.server)
596
597 # check for duplicates
598
599 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
600 if(self.server.state.is_duplicate_message(message_hash)):
601 return Message({ "error_code": DUPLICATE_PACKET }, self.server)
602 else:
603 self.server.state.add_to_dedup_queue(message_hash)
604 return Message({ "error_code": STALE_PACKET })
605
606 # check self_chain
607
608 if command == DIRECT:
609 self_chain_check = self.server.state.get_last_message_hash(speaker, peer.peer_id)
610 self_chain_check = self.state.get_last_message_hash(speaker, peer.peer_id)
611 elif command == BROADCAST:
612 self_chain_check = self.server.state.get_last_message_hash(speaker)
613 self_chain_check = self.state.get_last_message_hash(speaker)
614
615 self_chain_valid = (self_chain_check == self_chain)
616
617 # log this message for use in the self_chain check
618
619 self.server.state.log(speaker, message_bytes, peer.peer_id if (command == DIRECT) else None)
620 self.state.log(speaker, message_bytes, peer if (command == DIRECT) else None)
621
622 # remove padding from message bytes
623
624 for index, byte in enumerate(message):
625 if binascii.hexlify(byte) == "00":
626 unpadded_message = message[0:index]
627 break
628 # build message object
629
630 return Message({
631 message = Message({
632 "peer": peer,
633 "body": unpadded_message.rstrip(),
634 "body": body.rstrip(),
635 "timestamp": int_ts,
636 "command": command,
637 "speaker": speaker,
(183 . 12)(225 . 19)
639 "self_chain": self_chain,
640 "net_chain": net_chain,
641 "self_chain_valid": self_chain_valid,
642 "error_code": None
643 },
644 self.server)
645 "message_hash": message_hash
646 })
647
648 # check for duplicates
649
650 if(self.state.is_duplicate_message(message_hash)):
651 message.error_code = DUPLICATE_PACKET
652 return message
653
654 return message
655
656 def _pad(self, text, size):
657 return text.ljust(size)
658 return text.ljust(size, "\x00")
659
660 def _ts_range(self):
661 current_ts = int(time.time())
(1 . 7)(1 . 7)- E763BB836EBA69AEDEBD4D4ADFDD8820E1A173F16E1FD493DDD16BF6D41718155C20EE38073570D2AEB0144EA281B6FBCC5DDDE48D5CA60CB01A0B08103DD1F5
666 class Message(object):
667 def __init__(self, message, server=None):
668 def __init__(self, message):
669 self.original = True
670 self.server = server
671 self.prefix = None
672 self.handle = message.get("handle")
673 self.peer = message.get("peer")
674 self.body = message.get("body")
(13 . 5)(13 . 4)
676 self.net_chain = message.get("net_chain")
677 self.self_chain_valid = message.get("self_chain_valid")
678 self.error_code = message.get("error_code")
679 if server:
680 self.state = server.state
681 self.message_hash = message.get("message_hash")
(1 . 20)(1 . 21)- 16E7971B6EAB7483A4060D5CAE5111DEC2F61618A2022620343EF7AA3FCEDEE87CC6499C9F9978215C315FDE958E70FA7810F50967E97DD299CD98842118C12D
686 import socket
687 from infosec import Infosec
688 from commands import IGNORE
689 from commands import DIRECT
690 from commands import BROADCAST
691
692 import sys
693 import binascii
694 import traceback
695 import logging
696
697 class Peer(object):
698 def __init__(self, server, peer_entry):
699 def __init__(self, socket, peer_entry):
700 self.handles = peer_entry["handles"]
701 self.keys = peer_entry["keys"]
702 self.peer_id = peer_entry["peer_id"]
703 self.server = server
704 self.address = peer_entry["address"]
705 self.port = peer_entry["port"]
706 self.socket = self.server.udp_server_socket
707 self.infosec = Infosec(server)
708 self.socket = socket
709
710 def get_key(self):
711 if len(self.keys) > 0:
(22 . 16)(23 . 16)
713 else:
714 return None
715
716 def send(self, msg):
717 try:
718 if msg.command != IGNORE:
719 self.server.print_debug("packing message: %s" % msg.body)
720 signed_packet_bytes = self.infosec.pack(self, msg)
721 self.socket.sendto(signed_packet_bytes, (self.address, self.port))
722 self.server.print_debug("[%s:%d] <- %s" % (self.address,
723 self.port,
724 binascii.hexlify(signed_packet_bytes)[0:16]))
725 def send(self, signed_packet_bytes):
726 if self.get_key() != None:
727 try:
728 self.socket.sendto(signed_packet_bytes, (self.address, self.port))
729 logging.debug("[%s:%d] <- %s" % (self.address,
730 self.port,
731 binascii.hexlify(signed_packet_bytes)[0:16]))
732
733 except Exception as ex:
734 stack = traceback.format_exc()
735 print(stack)
736 except Exception as ex:
737 stack = traceback.format_exc()
738 logging.debug(stack)
739 else:
740 logging.debug("Discarding message to unknown handle or handle with no key: %s" % message.handle)
(1 . 4)(1 . 4)- ACD5EAFFDBA356D5B2B2E0CE494E3BE8AED35CCF0B96F9605BFD73FD3F758286F1908D043274A5480AC02C2D270550E1B061B32C0856E521A4EABA2F9F6B29F3
745 VERSION = "9988"
746 VERSION = "9987"
747
748 import os
749 import select
(8 . 29)(8 . 18)
751 import tempfile
752 import time
753 import string
754 import binascii
755 import hashlib
756 import datetime
757 import sqlite3
758 from datetime import datetime
759 from funcs import *
760 from lib.client import Client
761 from lib.state import State
762 from lib.channel import Channel
763 from lib.infosec import PACKET_SIZE
764 from lib.infosec import MAX_BOUNCES
765 from lib.infosec import STALE_PACKET
766 from lib.infosec import DUPLICATE_PACKET
767 from lib.infosec import MALFORMED_PACKET
768 from lib.infosec import INVALID_SIGNATURE
769 from lib.infosec import IGNORED
770 from lib.infosec import Infosec
771 from lib.peer import Peer
772 from lib.station import Station
773 from lib.message import Message
774 from funcs import *
775 from commands import BROADCAST
776 from commands import DIRECT
777 from commands import IGNORE
778 from lib.infosec import PACKET_SIZE
779 import imp
780 import pprint
781 import logging
782
783 class Server(object):
784 def __init__(self, options):
(40 . 18)(29 . 14)
786 self.password = options.password
787 self.motdfile = options.motd
788 self.verbose = options.verbose
789 self.debug = options.debug
790 self.logdir = options.logdir
791 self.chroot = options.chroot
792 self.setuid = options.setuid
793 self.statedir = options.statedir
794 self.infosec = Infosec(self)
795 self.config_file_path = options.config_file_path
796 self.state = State(self, options.db_path)
797 self.pp = pprint.PrettyPrinter(indent=4)
798
799 if options.address_table_path != None:
800 self.state.import_at_and_wot(options.address_table_path)
801 self.db_path = options.db_path
802 self.address_table_path = options.address_table_path
803
804 if options.listen:
805 self.address = socket.gethostbyname(options.listen)
(61 . 8)(46 . 9)
807 self.name = socket.getfqdn(self.address)[:server_name_limit]
808
809 self.channels = {} # irc_lower(Channel name) --> Channel instance.
810 self.clients = {} # Socket --> Client instance..peers = ""
811 self.client = None
812 self.nicknames = {} # irc_lower(Nickname) --> Client instance.
813
814 if self.logdir:
815 create_directory(self.logdir)
816 if self.statedir:
(79 . 7)(65 . 7)
818 try:
819 pid = os.fork()
820 if pid > 0:
821 self.print_info("PID: %d" % pid)
822 logging.info("PID: %d" % pid)
823 sys.exit(0)
824 except OSError:
825 sys.exit(1)
(113 . 19)(99 . 6)
827 else:
828 return []
829
830 def print_info(self, msg):
831 if self.verbose:
832 print(msg)
833 sys.stdout.flush()
834
835 def print_debug(self, msg):
836 if self.debug:
837 print("%s %s" % (datetime.now(), msg))
838 sys.stdout.flush()
839
840 def print_error(self, msg):
841 sys.stderr.write("%s\n" % msg)
842
843 def client_changed_nickname(self, client, oldnickname):
844 if oldnickname:
845 del self.nicknames[irc_lower(oldnickname)]
(139 . 118)(112 . 26)
847 def remove_client(self, client, quitmsg):
848 client.message_related(":%s QUIT :%s" % (client.prefix, quitmsg))
849 for x in client.channels.values():
850 client.channel_log(x, "quit (%s)" % quitmsg, meta=True)
851 x.remove_client(client)
852 if client.nickname \
853 and irc_lower(client.nickname) in self.nicknames:
854 del self.nicknames[irc_lower(client.nickname)]
855 del self.clients[client.socket]
856 self.client = None
857
858 def remove_channel(self, channel):
859 del self.channels[irc_lower(channel.name)]
860
861 def handle_udp_data(self, bytes_address_pair):
862 data = bytes_address_pair[0]
863 address = bytes_address_pair[1]
864 packet_info = (address[0],
865 address[1],
866 binascii.hexlify(data)[0:16])
867 self.print_debug("[%s:%d] -> %s" % packet_info)
868 for peer in self.state.get_peers():
869 if peer.get_key() != None:
870 message = self.infosec.unpack(peer, data)
871 error_code = message.error_code
872 if(error_code == None):
873 self.print_debug("[%s] -> %s" % (peer.handles[0], message.body))
874
875 self.conditionally_update_address_table(peer, message, address)
876 # send the message to all clients
877 for c in self.clients:
878 if (self.clients[c].is_addressed_to_me(message.body)):
879 self.clients[c].message(message.body)
880 # send the message to all other peers if it should be propagated
881 if(message.command == BROADCAST) and message.bounces < MAX_BOUNCES:
882 self.rebroadcast(peer, message)
883 return
884 elif error_code == STALE_PACKET:
885 self.print_debug("[%s:%d] -> stale packet: %s" % packet_info)
886 return
887 elif error_code == DUPLICATE_PACKET:
888 self.print_debug("[%s:%d] -> duplicate packet: %s" % packet_info)
889 return
890 elif error_code == MALFORMED_PACKET:
891 self.print_debug("[%s:%d] -> malformed packet: %s" % packet_info)
892 return
893 elif error_code == IGNORED:
894 self.conditionally_update_address_table(peer, message, address)
895 self.print_debug("[%s:%d] -> ignoring packet: %s" % packet_info)
896 return
897 elif error_code == INVALID_SIGNATURE:
898 pass
899 self.print_debug("[%s:%d] -> martian packet: %s" % packet_info)
900
901 # we only update the address table if the speaker is same as peer
902
903 def conditionally_update_address_table(self, peer, message, address):
904 try:
905 idx = peer.handles.index(message.speaker)
906 except:
907 idx = None
908
909 if idx != None:
910 self.state.update_address_table({"handle": message.speaker,
911 "address": address[0],
912 "port": address[1]
913 })
914 def peer_message(self, message):
915 message.original = True
916 if message.command == DIRECT:
917 peer = self.state.get_peer_by_handle(message.handle)
918 message_bytes = self.infosec.get_message_bytes(message, peer)
919 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
920 self.state.add_to_dedup_queue(message_hash)
921
922 self.state.log(message.speaker, message_bytes, peer.peer_id)
923 if peer and (peer.get_key() != None):
924 peer.send(message)
925 else:
926 self.print_debug("Discarding message to unknown handle or handle with no key: %s" % message.handle)
927 else:
928 message.timestamp = int(time.time())
929 message_bytes = self.infosec.get_message_bytes(message)
930 if message.command != IGNORE:
931 self.state.log(message.speaker, message_bytes)
932 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
933 self.state.add_to_dedup_queue(message_hash)
934 for peer in self.state.get_peers():
935 if peer.get_key() != None:
936 peer.send(message)
937 else:
938 self.print_debug("Discarding message to handle with no key: %s" % message.handle)
939
940 def rebroadcast(self, source_peer, message):
941 message.original = False
942 for peer in self.state.get_peers():
943 if(peer.peer_id != source_peer.peer_id):
944 message.command = BROADCAST
945 message.bounces = message.bounces + 1
946 peer.send(message)
947
948
949 def sendrubbish(self):
950 for socket in self.clients:
951 self.peer_message(Message({
952 "speaker": self.clients[socket].nickname,
953 "command": IGNORE,
954 "bounces": 0,
955 "body": self.infosec.gen_rubbish_body()
956 }, self))
957
958 def start(self):
959 # Setup UDP first
960 self.udp_server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
961 self.udp_server_socket.bind((self.address, self.udp_port))
962 self.print_info("Listening for Pest packets on udp port %d." % self.udp_port)
963 self.station = Station({ "socket": self.udp_server_socket,
964 "db_path": self.db_path,
965 "address_table_path": self.address_table_path
966 })
967 self.station.start_embargo_queue_checking()
968 self.station.start_rubbish()
969 logging.info("Listening for Pest packets on udp port %d." % self.udp_port)
970
971 serversockets = []
972 for port in self.irc_ports:
(259 . 51)(140 . 55)
974 try:
975 s.bind((self.address, port))
976 except socket.error as e:
977 self.print_error("Could not bind port %s: %s." % (port, e))
978 logging.error("Could not bind port %s: %s." % (port, e))
979 sys.exit(1)
980 s.listen(5)
981 serversockets.append(s)
982 del s
983 self.print_info("Listening for IRC connections on port %d." % port)
984 logging.info("Listening for IRC connections on port %d." % port)
985 if self.chroot:
986 os.chdir(self.chroot)
987 os.chroot(self.chroot)
988 self.print_info("Changed root directory to %s" % self.chroot)
989 logging.info("Changed root directory to %s" % self.chroot)
990 if self.setuid:
991 os.setgid(self.setuid[1])
992 os.setuid(self.setuid[0])
993 self.print_info("Setting uid:gid to %s:%s"
994 logging.info("Setting uid:gid to %s:%s"
995 % (self.setuid[0], self.setuid[1]))
996 last_aliveness_check = time.time()
997 while True:
998 (inputready,outputready,exceptready) = select.select([self.udp_server_socket],[],[],0)
999 (iwtd, owtd, ewtd) = select.select(
1000 serversockets + [x.socket for x in self.clients.values()],
1001 [x.socket for x in self.clients.values()
1002 if x.write_queue_size() > 0],
1003 serversockets + ([self.client.socket] if self.client else []),
1004 [self.client.socket] if self.client and self.client.write_queue_size() > 0 else [],
1005 [],
1006 .2)
1007 for x in inputready:
1008 if x == self.udp_server_socket:
1009 bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE)
1010 self.handle_udp_data(bytes_address_pair)
1011 if x == self.udp_server_socket:
1012 bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE)
1013 self.station.embargo_queue_lock.acquire()
1014 try:
1015 self.station.handle_udp_data(bytes_address_pair)
1016 except sqlite3.ProgrammingError as ex:
1017 logging.error("sqlite3 concurrency problem")
1018 self.station.embargo_queue_lock.release()
1019 for x in iwtd:
1020 if x in self.clients:
1021 self.clients[x].socket_readable_notification()
1022 if self.client != None:
1023 self.client.socket_readable_notification()
1024 else:
1025 (conn, addr) = x.accept()
1026 self.clients[conn] = Client(self, conn)
1027 self.print_info("Accepted connection from %s:%s." % (
1028 self.client = Client(self, conn)
1029 self.station.client = self.client
1030 logging.info("Accepted connection from %s:%s." % (
1031 addr[0], addr[1]))
1032 for x in owtd:
1033 if x in self.clients: # client may have been disconnected
1034 self.clients[x].socket_writable_notification()
1035 if self.client and x == self.client.socket: # client may have been disconnected
1036 self.client.socket_writable_notification()
1037 now = time.time()
1038 if last_aliveness_check + 10 < now:
1039 for client in self.clients.values():
1040 client.check_aliveness()
1041 last_aliveness_check = now
1042 self.sendrubbish() # Kludge to keep ephemeral port open when NATed
1043 if self.client:
1044 self.client.check_aliveness()
1045 last_aliveness_check = now
1046
1047 def create_directory(path):
1048 if not os.path.isdir(path):
(2 . 42)(2 . 53)-
1053 import sqlite3
1054 import imp
1055 import hashlib
1056 import logging
1057 from itertools import chain
1058
1059 class State(object):
1060
1061 def __init__(self, server, db_path):
1062 self.server = server
1063 self.conn = sqlite3.connect(db_path)
1064 self.cursor = self.conn.cursor()
1065 self.cursor.execute("create table if not exists at(handle_id integer,\
1066 address text not null,\
1067 port integer not null,\
1068 active_at datetime default null,\
1069 updated_at datetime default current_timestamp,\
1070 unique(handle_id, address, port))")
1071
1072 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
1073
1074 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
1075 peer_id integer,\
1076 handle text,\
1077 unique(handle))")
1078
1079 self.cursor.execute("create table if not exists keys(peer_id intenger,\
1080 key text,\
1081 used_at datetime default current_timestamp,\
1082 unique(key))")
1083
1084 self.cursor.execute("create table if not exists logs(\
1085 handle text not null,\
1086 peer_id integer,\
1087 message_bytes blob not null,\
1088 created_at datetime default current_timestamp)")
1089
1090 self.cursor.execute("create table if not exists dedup_queue(\
1091 hash text not null,\
1092 created_at datetime default current_timestamp)")
1093 __instance = None
1094 @staticmethod
1095 def get_instance(socket=None, db_path=None):
1096 if State.__instance == None:
1097 State(socket, db_path)
1098 return State.__instance
1099
1100 def __init__(self, socket, db_path):
1101 if State.__instance != None:
1102 raise Exception("This class is a singleton")
1103 else:
1104 self.socket = socket
1105 self.conn = sqlite3.connect(db_path, check_same_thread=False)
1106 self.cursor = self.conn.cursor()
1107 self.cursor.execute("create table if not exists at(handle_id integer,\
1108 address text not null,\
1109 port integer not null,\
1110 active_at datetime default null,\
1111 updated_at datetime default current_timestamp,\
1112 unique(handle_id, address, port))")
1113
1114 self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
1115
1116 self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
1117 peer_id integer,\
1118 handle text,\
1119 unique(handle))")
1120
1121 self.cursor.execute("create table if not exists keys(peer_id intenger,\
1122 key text,\
1123 used_at datetime default current_timestamp,\
1124 unique(key))")
1125
1126 self.cursor.execute("create table if not exists logs(\
1127 handle text not null,\
1128 peer_id integer,\
1129 message_bytes blob not null,\
1130 created_at datetime default current_timestamp)")
1131
1132 self.cursor.execute("create table if not exists dedup_queue(\
1133 hash text not null,\
1134 created_at datetime default current_timestamp)")
1135 State.__instance = self
1136
1137 def get_at(self, handle=None):
1138 at = []
(60 . 7)(71 . 7)
1140 (handle_id,)).fetchone()[0]
1141 at.append({"handle": h,
1142 "address": "%s:%s" % (address, port),
1143 "active_at": updated_at})
1144 "active_at": updated_at if updated_at else "no packets received from this address"})
1145 return at
1146
1147
(69 . 6)(80 . 7)
1149 self.conn.commit()
1150 result = self.cursor.execute("select hash from dedup_queue where hash=?",
1151 (message_hash,)).fetchone()
1152 logging.debug("checking if %s is dupe" % message_hash)
1153 if(result != None):
1154 return True
1155 else:
(78 . 6)(90 . 7)
1157 self.cursor.execute("insert into dedup_queue(hash)\
1158 values(?)",
1159 (message_hash,))
1160 logging.debug("added %s to dedup" % message_hash)
1161 self.conn.commit()
1162
1163 def get_last_message_hash(self, handle, peer_id=None):
(96 . 9)(109 . 14)
1165 if message_bytes:
1166 return hashlib.sha256(message_bytes[0][:]).digest()
1167 else:
1168 return "0" * 32
1169 return "\x00" * 32
1170
1171 def log(self, handle, message_bytes, peer=None):
1172 if peer != None:
1173 peer_id = peer.peer_id
1174 else:
1175 peer_id = None
1176
1177 def log(self, handle, message_bytes, peer_id=None):
1178 self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
1179 values(?, ?, ?)",
1180 (handle, peer_id, buffer(message_bytes)))
(124 . 7)(142 . 7)
1182
1183 self.conn.commit()
1184
1185 def update_address_table(self, peer, set_active_at=True):
1186 def update_at(self, peer, set_active_at=True):
1187 row = self.cursor.execute("select handle_id from handles where handle=?",
1188 (peer["handle"],)).fetchone()
1189 if row != None:
(196 . 7)(214 . 7)
1191 (peer_id,)).fetchall()))
1192
1193 def get_peer_handles(self):
1194 handles = list(chain.from_iterable(self.cursor.execute("select handle from handles").fetchall()))
1195 handles = self.listify(self.cursor.execute("select handle from handles").fetchall())
1196 return handles
1197
1198 def get_peers(self):
(209 . 6)(227 . 20)
1200 peers.append(peer)
1201 return peers
1202
1203 def listify(self, results):
1204 return list(chain.from_iterable(results))
1205
1206 def get_keyed_peers(self):
1207 peer_ids = self.listify(self.cursor.execute("select peer_id from keys").fetchall())
1208 peers = []
1209 for peer_id in peer_ids:
1210 handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
1211 peer = self.get_peer_by_handle(handle)
1212 if not (self.is_duplicate(peers, peer)):
1213 peers.append(peer)
1214 return peers
1215
1216
1217 def get_peer_by_handle(self, handle):
1218 handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?",
1219 (handle,)).fetchone()
(219 . 18)(251 . 19)
1221 address = self.cursor.execute("select address, port from at where handle_id=?\
1222 order by updated_at desc limit 1",
1223 (handle_info[0],)).fetchone()
1224 handles = list(chain.from_iterable(self.cursor.execute("select handle from handles where peer_id=?",
1225 (handle_info[1],)).fetchall()))
1226 keys = list(chain.from_iterable(self.cursor.execute("select key from keys where peer_id=?\
1227 handles = self.listify(self.cursor.execute("select handle from handles where peer_id=?",
1228 (handle_info[1],)).fetchall())
1229 keys = self.listify(self.cursor.execute("select key from keys where peer_id=?\
1230 order by used_at desc",
1231 (handle_info[1],)).fetchall()))
1232 return Peer(self.server, {
1233 (handle_info[1],)).fetchall())
1234 return Peer(self.socket, {
1235 "handles": handles,
1236 "peer_id": handle_info[1],
1237 "address": address[0] if address else "",
1238 "port": address[1] if address else "",
1239 "keys": keys
1240 })
1241
1242 def is_duplicate(self, peers, peer):
1243 for existing_peer in peers:
1244 if existing_peer.address == peer.address and existing_peer.port == peer.port:
(0 . 0)(1 . 196)
1249 import time
1250 import threading
1251 import binascii
1252 import logging
1253 import os
1254 from lib.state import State
1255 from lib.infosec import MAX_BOUNCES
1256 from lib.infosec import STALE_PACKET
1257 from lib.infosec import DUPLICATE_PACKET
1258 from lib.infosec import MALFORMED_PACKET
1259 from lib.infosec import INVALID_SIGNATURE
1260 from lib.infosec import IGNORED
1261 from lib.infosec import Infosec
1262 from commands import IGNORE
1263 from lib.message import Message
1264 from commands import BROADCAST
1265 from commands import DIRECT
1266 from lib.peer import Peer
1267
1268 RUBBISH_INTERVAL = 10
1269
1270 class Station(object):
1271 def __init__(self, options):
1272 self.client = None
1273 self.state = State.get_instance(options["socket"], options["db_path"])
1274 if options.get("address_table_path") != None:
1275 self.state.import_at_and_wot(options.get("address_table_path"))
1276 self.infosec = Infosec(self.state)
1277 self.embargo_queue = {}
1278 self.embargo_queue_lock = threading.Lock()
1279
1280 def start_embargo_queue_checking(self):
1281 threading.Thread(target=self.check_embargo_queue).start()
1282
1283 def start_rubbish(self):
1284 pass
1285 threading.Thread(target=self.send_rubbish).start()
1286
1287 def handle_udp_data(self, bytes_address_pair):
1288 data = bytes_address_pair[0]
1289 address = bytes_address_pair[1]
1290 packet_info = (address[0],
1291 address[1],
1292 binascii.hexlify(data)[0:16])
1293 logging.debug("[%s:%d] -> %s" % packet_info)
1294 for peer in self.state.get_keyed_peers():
1295 message = self.infosec.unpack(peer, data)
1296 error_code = message.error_code
1297 if(error_code == None):
1298 logging.debug("%s(%s) -> %s bounces: %d" % (message.speaker, peer.handles[0], message.body, message.bounces))
1299 self.conditionally_update_at(peer, message, address)
1300
1301 # if this is a direct message, just deliver it and return
1302 if message.command == DIRECT:
1303 self.deliver(message)
1304 return
1305
1306 # if the speaker is in our wot, we need to check if the message is hearsay
1307 if message.speaker in self.state.get_peer_handles():
1308 self.embargo(message)
1309 return
1310
1311 else:
1312 # skip the embargo and deliver this message with appropriate simple hearsay labeling
1313 message.prefix = "%s[%s]" % (message.speaker, peer.handles[0])
1314 self.deliver(message)
1315 return
1316 elif error_code == STALE_PACKET:
1317 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
1318 return
1319 elif error_code == DUPLICATE_PACKET:
1320 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
1321 return
1322 elif error_code == MALFORMED_PACKET:
1323 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
1324 return
1325 elif error_code == IGNORED:
1326 self.conditionally_update_at(peer, message, address)
1327 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
1328 return
1329 elif error_code == INVALID_SIGNATURE:
1330 pass
1331 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
1332
1333 def deliver(self, message):
1334 # add to duplicate queue
1335 self.state.add_to_dedup_queue(message.message_hash)
1336
1337 # send to the irc client
1338 if self.client:
1339 self.client.message_from_station(message)
1340
1341 def embargo(self, message):
1342 # initialize the key/value to empty array if not in the hash
1343 # append message to array
1344 if not message.message_hash in self.embargo_queue.keys():
1345 self.embargo_queue[message.message_hash] = []
1346 self.embargo_queue[message.message_hash].append(message)
1347
1348 def check_embargo_queue(self):
1349 # get a lock so other threads can't mess with the db or the queue
1350 self.embargo_queue_lock.acquire()
1351 self.check_for_immediate_messages()
1352 self.flush_hearsay_messages()
1353
1354 # release the lock
1355 self.embargo_queue_lock.release()
1356
1357 # continue the thread loop after interval
1358 time.sleep(1)
1359 threading.Thread(target=self.check_embargo_queue).start()
1360
1361 def check_for_immediate_messages(self):
1362 for key in dict(self.embargo_queue).keys():
1363 messages = self.embargo_queue[key]
1364
1365 for message in messages:
1366
1367 # if this is an immediate copy of the message
1368
1369 if message.speaker in message.peer.handles:
1370
1371 # clear the queue and deliver
1372
1373 self.embargo_queue.pop(key, None)
1374 self.deliver(message)
1375 self.rebroadcast(message)
1376 break
1377
1378
1379 def flush_hearsay_messages(self):
1380 # if we made it this far either we haven't found any immediate messages
1381 # or we sent them all so we must deliver the remaining hearsay messages
1382 # with the appropriate labeling
1383 for key in dict(self.embargo_queue).keys():
1384
1385 # collect the source handles
1386 handles = []
1387 messages = self.embargo_queue[key]
1388 for message in messages:
1389 handles.append(message.peer.handles[0])
1390
1391 # select the message with the lowest bounce count
1392 message = sorted(messages, key=lambda m: m.bounces)[0]
1393
1394 # clear the queue
1395 self.embargo_queue.pop(key, None)
1396
1397 # compute prefix
1398 if len(messages) < 4:
1399 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
1400 else:
1401 message.prefix = "%s[%d]" % (message.speaker, len(messages))
1402
1403 # deliver
1404 self.deliver(message)
1405
1406 # send the message to all other peers if it should be propagated
1407 self.rebroadcast(message)
1408
1409
1410 # we only update the address table if the speaker is same as peer
1411
1412 def conditionally_update_at(self, peer, message, address):
1413 if message.speaker in peer.handles:
1414 self.state.update_at({
1415 "handle": message.speaker,
1416 "address": address[0],
1417 "port": address[1]
1418 })
1419
1420 def rebroadcast(self, message):
1421 if message.bounces < MAX_BOUNCES:
1422 message.command = BROADCAST
1423 message.bounces = message.bounces + 1
1424 self.infosec.message(message)
1425 else:
1426 logging.debug("[%s:%d] -> packet TTL expired: %s" % packet_info)
1427
1428
1429 def send_rubbish(self):
1430 logging.debug("sending rubbish...")
1431 self.embargo_queue_lock.acquire()
1432 try:
1433 if self.client:
1434 self.infosec.message(Message({
1435 "speaker": self.client.nickname,
1436 "command": IGNORE,
1437 "bounces": 0,
1438 "body": self.infosec.gen_rubbish_body()
1439 }))
1440 except:
1441 logging.error("Something went wrong attempting to send rubbish")
1442 self.embargo_queue_lock.release()
1443 time.sleep(RUBBISH_INTERVAL)
1444 threading.Thread(target=self.send_rubbish).start()
- 10233FA2A74D0F92F3215B417140A9481F1263CEB7CA4486CCA97D48E9C112A36A9B66CB4F2C99A553626DEA431D6D8AE6D22735BD2535B8BDE7EA964A1F0B21(1 . 6)(1 . 6)
1449 #!/bin/bash
1450
1451 # start 3 servers on different ports
1452 ./blatta --debug --channel-name \#aleth --irc-port 6668 --udp-port 7778 --db-path a.db --address-table-path test_net_configs/a.py > logs/a &
1453 ./blatta --debug --channel-name \#aleth --irc-port 9968 --udp-port 7778 --db-path a.db --address-table-path test_net_configs/a.py > logs/a &
1454 ./blatta --debug --channel-name \#aleth --irc-port 6669 --udp-port 7779 --db-path b.db --address-table-path test_net_configs/b.py > logs/b &
1455 ./blatta --debug --channel-name \#aleth --irc-port 6670 --udp-port 7780 --db-path c.db --address-table-path test_net_configs/c.py > logs/c &
- 3276661A7529957FB3D7AAC616F26BE8DE21D436AE5092C40662BC2FCA472A6BE7E460A2D8C76286A8D84BAC1E8A8BF94B98E086C9DF5A8A9389FF8C9EFEC8B9(4 . 9)(4 . 9)
1460 'name': 'awt_b',
1461 'port': 7779
1462 },
1463 { 'address': 'localhost',
1464 'key': 'lT8/fYe/rQdReyavsTrVqInnLFCaU38o2ZAn5+r8uoFSSWgJelafikFELR9t6SJHMpFQvLmlAbF14nL2PfOAyA==',
1465 'name': 'awt_c',
1466 'port': 7780
1467 }
1468 # { 'address': 'localhost',
1469 # 'key': 'lT8/fYe/rQdReyavsTrVqInnLFCaU38o2ZAn5+r8uoFSSWgJelafikFELR9t6SJHMpFQvLmlAbF14nL2PfOAyA==',
1470 # 'name': 'awt_c',
1471 # 'port': 7780
1472 # }
1473 ]
-(0 . 0)(1 . 1)
1478 # This file can't be empty otherwise diff won't see it.
-(0 . 0)(1 . 194)
1483 # https://stackoverflow.com/questions/1896918/running-unittest-with-typical-test-directory-structure
1484 import unittest
1485 import logging
1486 from mock import Mock
1487 from mock import patch
1488
1489 from lib.station import Station
1490
1491 class TestStation(unittest.TestCase):
1492 def setUp(self):
1493 logging.basicConfig(level=logging.DEBUG)
1494 options = {
1495 "clients": {"clientsocket": Mock()},
1496 "db_path": "tests/test.db",
1497 "socket": Mock()
1498 }
1499 self.station = Station(options)
1500 self.station.deliver = Mock()
1501 self.station.rebroadcast = Mock()
1502 self.station.rebroadcast.return_value = "foobar"
1503
1504 def tearDown(self):
1505 pass
1506
1507 def test_embargo_bounce_ordering(self):
1508 peer1 = Mock()
1509 peer1.handles = ["a", "b"]
1510 peer2 = Mock()
1511 peer2.handles = ["c", "d"]
1512 low_bounce_message = Mock()
1513 low_bounce_message.peer = peer1
1514 low_bounce_message.bounces = 1
1515 low_bounce_message.message_hash = "messagehash"
1516 high_bounce_message = Mock()
1517 high_bounce_message.peer = peer2
1518 high_bounce_message.bounces = 2
1519 high_bounce_message.message_hash = "messagehash"
1520 self.station.embargo_queue = {
1521 "messagehash": [
1522 low_bounce_message,
1523 high_bounce_message
1524 ],
1525 }
1526 self.station.flush_hearsay_messages()
1527 self.station.deliver.assert_called_once_with(low_bounce_message)
1528 self.station.rebroadcast.assert_called_once_with(low_bounce_message)
1529
1530 def test_immediate_message_delivered(self):
1531 peer = Mock()
1532 peer.handles = ["a", "b"]
1533 message = Mock()
1534 message.speaker = "a"
1535 message.peer = peer
1536 self.station.embargo_queue = {
1537 "messagehash": [
1538 message
1539 ],
1540 }
1541 self.station.check_for_immediate_messages()
1542 self.station.deliver.assert_called_once_with(message)
1543 self.station.rebroadcast.assert_called_once_with(message)
1544
1545 def test_hearsay_message_not_delivered(self):
1546 peer = Mock()
1547 peer.handles = ["a", "b"]
1548 message = Mock()
1549 message.speaker = "c"
1550 message.peer = peer
1551 self.station.embargo_queue = {
1552 "messagehash": [
1553 message
1554 ],
1555 }
1556 self.station.check_for_immediate_messages()
1557 self.station.deliver.assert_not_called()
1558
1559 def test_embargo_queue_cleared(self):
1560 peer = Mock()
1561 peer.handles = ["a", "b"]
1562 message = Mock()
1563 message.speaker = "c"
1564 message.peer = peer
1565 self.station.embargo_queue = {
1566 "messagehash": [
1567 message
1568 ],
1569 }
1570 self.assertEqual(len(self.station.embargo_queue), 1)
1571 self.station.flush_hearsay_messages()
1572 self.assertEqual(len(self.station.embargo_queue), 0)
1573
1574 def test_immediate_prefix(self):
1575 peer = Mock()
1576 peer.handles = ["a", "b"]
1577 message = Mock()
1578 message.speaker = "a"
1579 message.prefix = None
1580 message.peer = peer
1581 self.station.embargo_queue = {
1582 "messagehash": [
1583 message
1584 ],
1585 }
1586 self.station.check_for_immediate_messages()
1587 self.assertEqual(message.prefix, None)
1588
1589 def test_simple_hearsay_prefix(self):
1590 peer = Mock()
1591 peer.handles = ["a", "b"]
1592 message = Mock()
1593 message.speaker = "c"
1594 message.prefix = None
1595 message.peer = peer
1596 self.station.embargo_queue = {
1597 "messagehash": [
1598 message
1599 ],
1600 }
1601 self.station.flush_hearsay_messages()
1602 self.assertEqual(message.prefix, "c[a]")
1603
1604 def test_in_wot_hearsay_prefix_under_four(self):
1605 peer1 = Mock()
1606 peer1.handles = ["a", "b"]
1607 peer2 = Mock()
1608 peer2.handles = ["d", "e"]
1609 peer3 = Mock()
1610 peer3.handles = ["f", "g"]
1611 message_via_peer1 = Mock()
1612 message_via_peer1.speaker = "c"
1613 message_via_peer1.prefix = None
1614 message_via_peer1.peer = peer1
1615 message_via_peer1.bounces = 1
1616 message_via_peer2 = Mock()
1617 message_via_peer2.speaker = "c"
1618 message_via_peer2.prefix = None
1619 message_via_peer2.peer = peer2
1620 message_via_peer2.bounces = 2
1621 message_via_peer3 = Mock()
1622 message_via_peer3.speaker = "c"
1623 message_via_peer3.prefix = None
1624 message_via_peer3.peer = peer3
1625 message_via_peer3.bounces = 1
1626 self.station.embargo_queue = {
1627 "messagehash": [
1628 message_via_peer1,
1629 message_via_peer2,
1630 message_via_peer3
1631 ],
1632 }
1633 self.station.flush_hearsay_messages()
1634 self.station.deliver.assert_called_once_with(message_via_peer1)
1635 self.assertEqual(message_via_peer1.prefix, "c[a|d|f]")
1636
1637 def test_in_wot_hearsay_prefix_more_than_three(self):
1638 peer1 = Mock()
1639 peer1.handles = ["a", "b"]
1640 peer2 = Mock()
1641 peer2.handles = ["d", "e"]
1642 peer3 = Mock()
1643 peer3.handles = ["f", "g"]
1644 peer4 = Mock()
1645 peer4.handles = ["f", "g"]
1646 message_via_peer1 = Mock()
1647 message_via_peer1.speaker = "c"
1648 message_via_peer1.prefix = None
1649 message_via_peer1.peer = peer1
1650 message_via_peer1.bounces = 1
1651 message_via_peer2 = Mock()
1652 message_via_peer2.speaker = "c"
1653 message_via_peer2.prefix = None
1654 message_via_peer2.peer = peer2
1655 message_via_peer2.bounces = 2
1656 message_via_peer3 = Mock()
1657 message_via_peer3.speaker = "c"
1658 message_via_peer3.prefix = None
1659 message_via_peer3.peer = peer3
1660 message_via_peer3.bounces = 1
1661 message_via_peer4 = Mock()
1662 message_via_peer4.speaker = "c"
1663 message_via_peer4.prefix = None
1664 message_via_peer4.peer = peer4
1665 message_via_peer4.bounces = 1
1666 self.station.embargo_queue = {
1667 "messagehash": [
1668 message_via_peer1,
1669 message_via_peer2,
1670 message_via_peer3,
1671 message_via_peer4
1672 ],
1673 }
1674 self.station.flush_hearsay_messages()
1675 self.station.deliver.assert_called_once_with(message_via_peer1)
1676 self.assertEqual(message_via_peer1.prefix, "c[4]")