9987-embargoing 1 import time
9987-embargoing 2 import binascii
9987-embargoing 3 import logging
9987-embargoing 4 import os
9985-single-thread 5 from state import State
9985-single-thread 6 from infosec import MAX_BOUNCES
9985-single-thread 7 from infosec import STALE_PACKET
9985-single-thread 8 from infosec import DUPLICATE_PACKET
9985-single-thread 9 from infosec import MALFORMED_PACKET
9985-single-thread 10 from infosec import INVALID_SIGNATURE
9985-single-thread 11 from infosec import IGNORED
9985-single-thread 12 from infosec import Infosec
9987-embargoing 13 from commands import IGNORE
9985-single-thread 14 from message import Message
9987-embargoing 15 from commands import BROADCAST
9987-embargoing 16 from commands import DIRECT
9985-single-thread 17 from peer import Peer
9987-embargoing 18
9987-embargoing 19 RUBBISH_INTERVAL = 10
9985-single-thread 20 EMBARGO_INTERVAL = 1
9987-embargoing 21
9987-embargoing 22 class Station(object):
9987-embargoing 23 def __init__(self, options):
9987-embargoing 24 self.client = None
9987-embargoing 25 self.state = State.get_instance(options["socket"], options["db_path"])
9987-embargoing 26 if options.get("address_table_path") != None:
9987-embargoing 27 self.state.import_at_and_wot(options.get("address_table_path"))
9987-embargoing 28 self.infosec = Infosec(self.state)
9987-embargoing 29 self.embargo_queue = {}
9987-embargoing 30
9987-embargoing 31 def handle_udp_data(self, bytes_address_pair):
9987-embargoing 32 data = bytes_address_pair[0]
9987-embargoing 33 address = bytes_address_pair[1]
9987-embargoing 34 packet_info = (address[0],
9987-embargoing 35 address[1],
9987-embargoing 36 binascii.hexlify(data)[0:16])
9987-embargoing 37 logging.debug("[%s:%d] -> %s" % packet_info)
9987-embargoing 38 for peer in self.state.get_keyed_peers():
9987-embargoing 39 message = self.infosec.unpack(peer, data)
9987-embargoing 40 error_code = message.error_code
9987-embargoing 41 if(error_code == None):
9985-single-thread 42 logging.info("[%s:%d %s] -> %s %d %s" % (peer.address,
9985-single-thread 43 peer.port,
9985-single-thread 44 peer.handles[0],
9985-single-thread 45 message.body,
9985-single-thread 46 message.bounces,
9985-single-thread 47 message.message_hash))
9987-embargoing 48 self.conditionally_update_at(peer, message, address)
9987-embargoing 49
9987-embargoing 50
9987-embargoing 51 if message.command == DIRECT:
9987-embargoing 52 self.deliver(message)
9987-embargoing 53 return
9987-embargoing 54
9986-rebroadcast-... 55
9986-rebroadcast-... 56 else:
9987-embargoing 57 self.embargo(message)
9987-embargoing 58 return
9987-embargoing 59 elif error_code == STALE_PACKET:
9987-embargoing 60 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
9987-embargoing 61 return
9987-embargoing 62 elif error_code == DUPLICATE_PACKET:
9987-embargoing 63 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
9987-embargoing 64 return
9987-embargoing 65 elif error_code == MALFORMED_PACKET:
9987-embargoing 66 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
9987-embargoing 67 return
9987-embargoing 68 elif error_code == IGNORED:
9987-embargoing 69 self.conditionally_update_at(peer, message, address)
9987-embargoing 70 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
9987-embargoing 71 return
9987-embargoing 72 elif error_code == INVALID_SIGNATURE:
9987-embargoing 73 pass
9987-embargoing 74 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
9987-embargoing 75
9987-embargoing 76 def deliver(self, message):
9987-embargoing 77
9987-embargoing 78 self.state.add_to_dedup_queue(message.message_hash)
9987-embargoing 79
9987-embargoing 80
9987-embargoing 81 if self.client:
9987-embargoing 82 self.client.message_from_station(message)
9987-embargoing 83
9987-embargoing 84 def embargo(self, message):
9987-embargoing 85
9987-embargoing 86
9985-single-thread 87 if not message.message_hash in self.embargo_queue.keys():
9985-single-thread 88 self.embargo_queue[message.message_hash] = []
9985-single-thread 89 self.embargo_queue[message.message_hash].append(message)
9987-embargoing 90
9987-embargoing 91 def check_embargo_queue(self):
9987-embargoing 92
9985-single-thread 93 self.check_for_immediate_messages()
9985-single-thread 94 self.flush_hearsay_messages()
9987-embargoing 95
9987-embargoing 96 def check_for_immediate_messages(self):
9987-embargoing 97 for key in dict(self.embargo_queue).keys():
9987-embargoing 98 messages = self.embargo_queue[key]
9987-embargoing 99
9987-embargoing 100 for message in messages:
9987-embargoing 101
9987-embargoing 102
9987-embargoing 103
9987-embargoing 104 if message.speaker in message.peer.handles:
9987-embargoing 105
9987-embargoing 106
9987-embargoing 107
9987-embargoing 108 self.embargo_queue.pop(key, None)
9987-embargoing 109 self.deliver(message)
9987-embargoing 110 self.rebroadcast(message)
9987-embargoing 111 break
9987-embargoing 112
9987-embargoing 113
9987-embargoing 114 def flush_hearsay_messages(self):
9987-embargoing 115
9987-embargoing 116
9987-embargoing 117
9987-embargoing 118 for key in dict(self.embargo_queue).keys():
9987-embargoing 119
9987-embargoing 120
9987-embargoing 121 handles = []
9987-embargoing 122 messages = self.embargo_queue[key]
9987-embargoing 123 for message in messages:
9987-embargoing 124 handles.append(message.peer.handles[0])
9987-embargoing 125
9987-embargoing 126
9987-embargoing 127 message = sorted(messages, key=lambda m: m.bounces)[0]
9987-embargoing 128
9987-embargoing 129
9987-embargoing 130 self.embargo_queue.pop(key, None)
9987-embargoing 131
9987-embargoing 132
9987-embargoing 133 if len(messages) < 4:
9987-embargoing 134 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
9987-embargoing 135 else:
9987-embargoing 136 message.prefix = "%s[%d]" % (message.speaker, len(messages))
9987-embargoing 137
9987-embargoing 138
9987-embargoing 139 self.deliver(message)
9987-embargoing 140
9987-embargoing 141
9987-embargoing 142 self.rebroadcast(message)
9987-embargoing 143
9987-embargoing 144
9987-embargoing 145
9987-embargoing 146
9987-embargoing 147 def conditionally_update_at(self, peer, message, address):
9987-embargoing 148 if message.speaker in peer.handles:
9987-embargoing 149 self.state.update_at({
9987-embargoing 150 "handle": message.speaker,
9987-embargoing 151 "address": address[0],
9987-embargoing 152 "port": address[1]
9987-embargoing 153 })
9987-embargoing 154
9987-embargoing 155 def rebroadcast(self, message):
9987-embargoing 156 if message.bounces < MAX_BOUNCES:
9987-embargoing 157 message.command = BROADCAST
9987-embargoing 158 message.bounces = message.bounces + 1
9987-embargoing 159 self.infosec.message(message)
9987-embargoing 160 else:
9985-single-thread 161 logging.debug("message TTL expired: %s" % message.message_hash)
9987-embargoing 162
9987-embargoing 163
9987-embargoing 164 def send_rubbish(self):