import time import threading import binascii import logging import os from lib.state import State from lib.infosec import MAX_BOUNCES from lib.infosec import STALE_PACKET from lib.infosec import DUPLICATE_PACKET from lib.infosec import MALFORMED_PACKET from lib.infosec import INVALID_SIGNATURE from lib.infosec import IGNORED from lib.infosec import Infosec from commands import IGNORE from lib.message import Message from commands import BROADCAST from commands import DIRECT from lib.peer import Peer RUBBISH_INTERVAL = 10 class Station(object): def __init__(self, options): self.client = None self.state = State.get_instance(options["socket"], options["db_path"]) if options.get("address_table_path") != None: self.state.import_at_and_wot(options.get("address_table_path")) self.infosec = Infosec(self.state) self.embargo_queue = {} self.embargo_queue_lock = threading.Lock() def start_embargo_queue_checking(self): threading.Thread(target=self.check_embargo_queue).start() def start_rubbish(self): pass threading.Thread(target=self.send_rubbish).start() def handle_udp_data(self, bytes_address_pair): data = bytes_address_pair[0] address = bytes_address_pair[1] packet_info = (address[0], address[1], binascii.hexlify(data)[0:16]) logging.debug("[%s:%d] -> %s" % packet_info) for peer in self.state.get_keyed_peers(): message = self.infosec.unpack(peer, data) error_code = message.error_code if(error_code == None): logging.debug("%s(%s) -> %s bounces: %d" % (message.speaker, peer.handles[0], message.body, message.bounces)) self.conditionally_update_at(peer, message, address) # if this is a direct message, just deliver it and return if message.command == DIRECT: self.deliver(message) return # if the speaker is in our wot, we need to check if the message is hearsay if message.speaker in self.state.get_peer_handles(): self.embargo(message) return else: # skip the embargo and deliver this message with appropriate simple hearsay labeling message.prefix = "%s[%s]" % (message.speaker, peer.handles[0]) self.deliver(message) return elif error_code == STALE_PACKET: logging.debug("[%s:%d] -> stale packet: %s" % packet_info) return elif error_code == DUPLICATE_PACKET: logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info) return elif error_code == MALFORMED_PACKET: logging.debug("[%s:%d] -> malformed packet: %s" % packet_info) return elif error_code == IGNORED: self.conditionally_update_at(peer, message, address) logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info) return elif error_code == INVALID_SIGNATURE: pass logging.debug("[%s:%d] -> martian packet: %s" % packet_info) def deliver(self, message): # add to duplicate queue self.state.add_to_dedup_queue(message.message_hash) # send to the irc client if self.client: self.client.message_from_station(message) def embargo(self, message): # initialize the key/value to empty array if not in the hash # append message to array if not message.message_hash in self.embargo_queue.keys(): self.embargo_queue[message.message_hash] = [] self.embargo_queue[message.message_hash].append(message) def check_embargo_queue(self): # get a lock so other threads can't mess with the db or the queue self.embargo_queue_lock.acquire() self.check_for_immediate_messages() self.flush_hearsay_messages() # release the lock self.embargo_queue_lock.release() # continue the thread loop after interval time.sleep(1) threading.Thread(target=self.check_embargo_queue).start() def check_for_immediate_messages(self): for key in dict(self.embargo_queue).keys(): messages = self.embargo_queue[key] for message in messages: # if this is an immediate copy of the message if message.speaker in message.peer.handles: # clear the queue and deliver self.embargo_queue.pop(key, None) self.deliver(message) self.rebroadcast(message) break def flush_hearsay_messages(self): # if we made it this far either we haven't found any immediate messages # or we sent them all so we must deliver the remaining hearsay messages # with the appropriate labeling for key in dict(self.embargo_queue).keys(): # collect the source handles handles = [] messages = self.embargo_queue[key] for message in messages: handles.append(message.peer.handles[0]) # select the message with the lowest bounce count message = sorted(messages, key=lambda m: m.bounces)[0] # clear the queue self.embargo_queue.pop(key, None) # compute prefix if len(messages) < 4: message.prefix = "%s[%s]" % (message.speaker, "|".join(handles)) else: message.prefix = "%s[%d]" % (message.speaker, len(messages)) # deliver self.deliver(message) # send the message to all other peers if it should be propagated self.rebroadcast(message) # we only update the address table if the speaker is same as peer def conditionally_update_at(self, peer, message, address): if message.speaker in peer.handles: self.state.update_at({ "handle": message.speaker, "address": address[0], "port": address[1] }) def rebroadcast(self, message): if message.bounces < MAX_BOUNCES: message.command = BROADCAST message.bounces = message.bounces + 1 self.infosec.message(message) else: logging.debug("[%s:%d] -> packet TTL expired: %s" % packet_info) def send_rubbish(self): logging.debug("sending rubbish...") self.embargo_queue_lock.acquire() try: if self.client: self.infosec.message(Message({ "speaker": self.client.nickname, "command": IGNORE, "bounces": 0, "body": self.infosec.gen_rubbish_body() })) except: logging.error("Something went wrong attempting to send rubbish") self.embargo_queue_lock.release() time.sleep(RUBBISH_INTERVAL) threading.Thread(target=self.send_rubbish).start()