import time import binascii import logging import os from state import State from infosec import STALE_PACKET from infosec import DUPLICATE_PACKET from infosec import MALFORMED_PACKET from infosec import INVALID_SIGNATURE from infosec import IGNORED from infosec import Infosec from commands import IGNORE from message import Message from commands import BROADCAST from commands import DIRECT from peer import Peer class Station(object): def __init__(self, options): self.client = None self.state = State.get_instance(options["socket"], options["db_path"]) if options.get("address_table_path") != None: self.state.import_at_and_wot(options.get("address_table_path")) self.infosec = Infosec(self.state) self.embargo_queue = {} def handle_udp_data(self, bytes_address_pair): data = bytes_address_pair[0] address = bytes_address_pair[1] packet_info = (address[0], address[1], binascii.hexlify(data)[0:16]) logging.debug("[%s:%d] -> %s" % packet_info) for peer in self.state.get_keyed_peers(): message = self.infosec.unpack(peer, data) error_code = message.error_code if(error_code == None): logging.info("[%s:%d %s] -> %s %d %s" % (peer.address, peer.port, peer.handles[0], message.body, message.bounces, message.message_hash)) self.conditionally_update_at(peer, message, address) # if this is a direct message, just deliver it and return if message.command == DIRECT: self.deliver(message) return # embargo to wait for immediate copy of message else: self.embargo(message) return elif error_code == STALE_PACKET: logging.debug("[%s:%d] -> stale packet: %s" % packet_info) return elif error_code == DUPLICATE_PACKET: logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info) return elif error_code == MALFORMED_PACKET: logging.debug("[%s:%d] -> malformed packet: %s" % packet_info) return elif error_code == IGNORED: self.conditionally_update_at(peer, message, address) logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info) return elif error_code == INVALID_SIGNATURE: pass logging.debug("[%s:%d] -> martian packet: %s" % packet_info) def deliver(self, message): # add to duplicate queue self.state.add_to_dedup_queue(message.message_hash) # send to the irc client if self.client: self.client.message_from_station(message) def embargo(self, message): # initialize the key/value to empty array if not in the hash # append message to array if not message.message_hash in self.embargo_queue.keys(): self.embargo_queue[message.message_hash] = [] self.embargo_queue[message.message_hash].append(message) def check_embargo_queue(self): # get a lock so other threads can't mess with the db or the queue self.check_for_immediate_messages() self.flush_hearsay_messages() def check_for_immediate_messages(self): for key in dict(self.embargo_queue).keys(): messages = self.embargo_queue[key] for message in messages: # if this is an immediate copy of the message if message.speaker in message.peer.handles: # clear the queue and deliver self.embargo_queue.pop(key, None) self.deliver(message) self.rebroadcast(message) break def flush_hearsay_messages(self): # if we made it this far either we haven't found any immediate messages # or we sent them all so we must deliver the remaining hearsay messages # with the appropriate labeling for key in dict(self.embargo_queue).keys(): # collect the source handles handles = [] messages = self.embargo_queue[key] for message in messages: handles.append(message.peer.handles[0]) # select the message with the lowest bounce count message = sorted(messages, key=lambda m: m.bounces)[0] # clear the queue self.embargo_queue.pop(key, None) # compute prefix if len(messages) < 4: message.prefix = "%s[%s]" % (message.speaker, "|".join(handles)) else: message.prefix = "%s[%d]" % (message.speaker, len(messages)) # deliver self.deliver(message) # send the message to all other peers if it should be propagated self.rebroadcast(message) # we only update the address table if the speaker is same as peer def conditionally_update_at(self, peer, message, address): if message.speaker in peer.handles: self.state.update_at({ "handle": message.speaker, "address": address[0], "port": address[1] }) def rebroadcast(self, message): if message.bounces < int(self.state.get_knob("max_bounces")): message.command = BROADCAST message.bounces = message.bounces + 1 self.infosec.message(message) else: logging.debug("message TTL expired: %s" % message.message_hash) def send_rubbish(self):