raw
9987-embargoing         1 import time
9987-embargoing 2 import binascii
9987-embargoing 3 import logging
9987-embargoing 4 import os
9985-single-thread 5 from state import State
9985-single-thread 6 from infosec import STALE_PACKET
9985-single-thread 7 from infosec import DUPLICATE_PACKET
9985-single-thread 8 from infosec import MALFORMED_PACKET
9985-single-thread 9 from infosec import INVALID_SIGNATURE
9985-single-thread 10 from infosec import IGNORED
9985-single-thread 11 from infosec import Infosec
9987-embargoing 12 from commands import IGNORE
9985-single-thread 13 from message import Message
9987-embargoing 14 from commands import BROADCAST
9987-embargoing 15 from commands import DIRECT
9985-single-thread 16 from peer import Peer
9987-embargoing 17
9987-embargoing 18 class Station(object):
9987-embargoing 19 def __init__(self, options):
9987-embargoing 20 self.client = None
9987-embargoing 21 self.state = State.get_instance(options["socket"], options["db_path"])
9987-embargoing 22 if options.get("address_table_path") != None:
9987-embargoing 23 self.state.import_at_and_wot(options.get("address_table_path"))
9987-embargoing 24 self.infosec = Infosec(self.state)
9987-embargoing 25 self.embargo_queue = {}
9987-embargoing 26
9987-embargoing 27 def handle_udp_data(self, bytes_address_pair):
9987-embargoing 28 data = bytes_address_pair[0]
9987-embargoing 29 address = bytes_address_pair[1]
9987-embargoing 30 packet_info = (address[0],
9987-embargoing 31 address[1],
9987-embargoing 32 binascii.hexlify(data)[0:16])
9987-embargoing 33 logging.debug("[%s:%d] -> %s" % packet_info)
9987-embargoing 34 for peer in self.state.get_keyed_peers():
9987-embargoing 35 message = self.infosec.unpack(peer, data)
9987-embargoing 36 error_code = message.error_code
9987-embargoing 37 if(error_code == None):
9985-single-thread 38 logging.info("[%s:%d %s] -> %s %d %s" % (peer.address,
9985-single-thread 39 peer.port,
9985-single-thread 40 peer.handles[0],
9985-single-thread 41 message.body,
9985-single-thread 42 message.bounces,
9985-single-thread 43 message.message_hash))
9987-embargoing 44 self.conditionally_update_at(peer, message, address)
9987-embargoing 45
9987-embargoing 46 # if this is a direct message, just deliver it and return
9987-embargoing 47 if message.command == DIRECT:
9987-embargoing 48 self.deliver(message)
9987-embargoing 49 return
9987-embargoing 50
9986-rebroadcast-... 51 # embargo to wait for immediate copy of message
9986-rebroadcast-... 52 else:
9987-embargoing 53 self.embargo(message)
9987-embargoing 54 return
9987-embargoing 55 elif error_code == STALE_PACKET:
9987-embargoing 56 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
9987-embargoing 57 return
9987-embargoing 58 elif error_code == DUPLICATE_PACKET:
9987-embargoing 59 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
9987-embargoing 60 return
9987-embargoing 61 elif error_code == MALFORMED_PACKET:
9987-embargoing 62 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
9987-embargoing 63 return
9987-embargoing 64 elif error_code == IGNORED:
9987-embargoing 65 self.conditionally_update_at(peer, message, address)
9987-embargoing 66 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
9987-embargoing 67 return
9987-embargoing 68 elif error_code == INVALID_SIGNATURE:
9987-embargoing 69 pass
9987-embargoing 70 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
9987-embargoing 71
9987-embargoing 72 def deliver(self, message):
9987-embargoing 73 # add to duplicate queue
9987-embargoing 74 self.state.add_to_dedup_queue(message.message_hash)
9987-embargoing 75
9987-embargoing 76 # send to the irc client
9987-embargoing 77 if self.client:
9987-embargoing 78 self.client.message_from_station(message)
9987-embargoing 79
9987-embargoing 80 def embargo(self, message):
9987-embargoing 81 # initialize the key/value to empty array if not in the hash
9987-embargoing 82 # append message to array
9985-single-thread 83 if not message.message_hash in self.embargo_queue.keys():
9985-single-thread 84 self.embargo_queue[message.message_hash] = []
9985-single-thread 85 self.embargo_queue[message.message_hash].append(message)
9987-embargoing 86
9987-embargoing 87 def check_embargo_queue(self):
9987-embargoing 88 # get a lock so other threads can't mess with the db or the queue
9985-single-thread 89 self.check_for_immediate_messages()
9985-single-thread 90 self.flush_hearsay_messages()
9987-embargoing 91
9987-embargoing 92 def check_for_immediate_messages(self):
9987-embargoing 93 for key in dict(self.embargo_queue).keys():
9987-embargoing 94 messages = self.embargo_queue[key]
9987-embargoing 95
9987-embargoing 96 for message in messages:
9987-embargoing 97
9987-embargoing 98 # if this is an immediate copy of the message
9987-embargoing 99
9987-embargoing 100 if message.speaker in message.peer.handles:
9987-embargoing 101
9987-embargoing 102 # clear the queue and deliver
9987-embargoing 103
9987-embargoing 104 self.embargo_queue.pop(key, None)
9987-embargoing 105 self.deliver(message)
9987-embargoing 106 self.rebroadcast(message)
9987-embargoing 107 break
9987-embargoing 108
9987-embargoing 109
9987-embargoing 110 def flush_hearsay_messages(self):
9987-embargoing 111 # if we made it this far either we haven't found any immediate messages
9987-embargoing 112 # or we sent them all so we must deliver the remaining hearsay messages
9987-embargoing 113 # with the appropriate labeling
9987-embargoing 114 for key in dict(self.embargo_queue).keys():
9987-embargoing 115
9987-embargoing 116 # collect the source handles
9987-embargoing 117 handles = []
9987-embargoing 118 messages = self.embargo_queue[key]
9987-embargoing 119 for message in messages:
9987-embargoing 120 handles.append(message.peer.handles[0])
9987-embargoing 121
9987-embargoing 122 # select the message with the lowest bounce count
9987-embargoing 123 message = sorted(messages, key=lambda m: m.bounces)[0]
9987-embargoing 124
9987-embargoing 125 # clear the queue
9987-embargoing 126 self.embargo_queue.pop(key, None)
9987-embargoing 127
9987-embargoing 128 # compute prefix
9987-embargoing 129 if len(messages) < 4:
9987-embargoing 130 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
9987-embargoing 131 else:
9987-embargoing 132 message.prefix = "%s[%d]" % (message.speaker, len(messages))
9987-embargoing 133
9987-embargoing 134 # deliver
9987-embargoing 135 self.deliver(message)
9987-embargoing 136
9987-embargoing 137 # send the message to all other peers if it should be propagated
9987-embargoing 138 self.rebroadcast(message)
9987-embargoing 139
9987-embargoing 140
9987-embargoing 141 # we only update the address table if the speaker is same as peer
9987-embargoing 142
9987-embargoing 143 def conditionally_update_at(self, peer, message, address):
9987-embargoing 144 if message.speaker in peer.handles:
9987-embargoing 145 self.state.update_at({
9987-embargoing 146 "handle": message.speaker,
9987-embargoing 147 "address": address[0],
9987-embargoing 148 "port": address[1]
9987-embargoing 149 })
9987-embargoing 150
9987-embargoing 151 def rebroadcast(self, message):
9983-knobs 152 if message.bounces < int(self.state.get_knob("max_bounces")):
9987-embargoing 153 message.command = BROADCAST
9987-embargoing 154 message.bounces = message.bounces + 1
9987-embargoing 155 self.infosec.message(message)
9987-embargoing 156 else:
9985-single-thread 157 logging.debug("message TTL expired: %s" % message.message_hash)
9987-embargoing 158
9987-embargoing 159
9987-embargoing 160 def send_rubbish(self):