import time import binascii import logging import os from state import State from infosec import MAX_BOUNCES from infosec import STALE_PACKET from infosec import DUPLICATE_PACKET from infosec import MALFORMED_PACKET from infosec import INVALID_SIGNATURE from infosec import IGNORED from infosec import Infosec from commands import IGNORE from message import Message from commands import BROADCAST from commands import DIRECT from peer import Peer RUBBISH_INTERVAL = 10 EMBARGO_INTERVAL = 1 class Station(object): def __init__(self, options): self.client = None self.state = State.get_instance(options["socket"], options["db_path"]) if options.get("address_table_path") != None: self.state.import_at_and_wot(options.get("address_table_path")) self.infosec = Infosec(self.state) self.embargo_queue = {} def handle_udp_data(self, bytes_address_pair): data = bytes_address_pair[0] address = bytes_address_pair[1] packet_info = (address[0], address[1], binascii.hexlify(data)[0:16]) logging.debug("[%s:%d] -> %s" % packet_info) for peer in self.state.get_keyed_peers(): message = self.infosec.unpack(peer, data) error_code = message.error_code if(error_code == None): logging.info("[%s:%d %s] -> %s %d %s" % (peer.address, peer.port, peer.handles[0], message.body, message.bounces, message.message_hash)) self.conditionally_update_at(peer, message, address) # if this is a direct message, just deliver it and return if message.command == DIRECT: self.deliver(message) return # embargo to wait for immediate copy of message else: self.embargo(message) return elif error_code == STALE_PACKET: logging.debug("[%s:%d] -> stale packet: %s" % packet_info) return elif error_code == DUPLICATE_PACKET: logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info) return elif error_code == MALFORMED_PACKET: logging.debug("[%s:%d] -> malformed packet: %s" % packet_info) return elif error_code == IGNORED: self.conditionally_update_at(peer, message, address) logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info) return elif error_code == INVALID_SIGNATURE: pass logging.debug("[%s:%d] -> martian packet: %s" % packet_info) def deliver(self, message): # add to duplicate queue self.state.add_to_dedup_queue(message.message_hash) # send to the irc client if self.client: self.client.message_from_station(message) def embargo(self, message): # initialize the key/value to empty array if not in the hash # append message to array if not message.message_hash in self.embargo_queue.keys(): self.embargo_queue[message.message_hash] = [] self.embargo_queue[message.message_hash].append(message) def check_embargo_queue(self): # get a lock so other threads can't mess with the db or the queue self.check_for_immediate_messages() self.flush_hearsay_messages() def check_for_immediate_messages(self): for key in dict(self.embargo_queue).keys(): messages = self.embargo_queue[key] for message in messages: # if this is an immediate copy of the message if message.speaker in message.peer.handles: # clear the queue and deliver self.embargo_queue.pop(key, None) self.deliver(message) self.rebroadcast(message) break def flush_hearsay_messages(self): # if we made it this far either we haven't found any immediate messages # or we sent them all so we must deliver the remaining hearsay messages # with the appropriate labeling for key in dict(self.embargo_queue).keys(): # collect the source handles handles = [] messages = self.embargo_queue[key] for message in messages: handles.append(message.peer.handles[0]) # select the message with the lowest bounce count message = sorted(messages, key=lambda m: m.bounces)[0] # clear the queue self.embargo_queue.pop(key, None) # compute prefix if len(messages) < 4: message.prefix = "%s[%s]" % (message.speaker, "|".join(handles)) else: message.prefix = "%s[%d]" % (message.speaker, len(messages)) # deliver self.deliver(message) # send the message to all other peers if it should be propagated self.rebroadcast(message) # we only update the address table if the speaker is same as peer def conditionally_update_at(self, peer, message, address): if message.speaker in peer.handles: self.state.update_at({ "handle": message.speaker, "address": address[0], "port": address[1] }) def rebroadcast(self, message): if message.bounces < MAX_BOUNCES: message.command = BROADCAST message.bounces = message.bounces + 1 self.infosec.message(message) else: logging.debug("message TTL expired: %s" % message.message_hash) def send_rubbish(self):