9987-embargoing 1 import time
9987-embargoing 2 import threading
9987-embargoing 3 import binascii
9987-embargoing 4 import logging
9987-embargoing 5 import os
9987-embargoing 6 from lib.state import State
9987-embargoing 7 from lib.infosec import MAX_BOUNCES
9987-embargoing 8 from lib.infosec import STALE_PACKET
9987-embargoing 9 from lib.infosec import DUPLICATE_PACKET
9987-embargoing 10 from lib.infosec import MALFORMED_PACKET
9987-embargoing 11 from lib.infosec import INVALID_SIGNATURE
9987-embargoing 12 from lib.infosec import IGNORED
9987-embargoing 13 from lib.infosec import Infosec
9987-embargoing 14 from commands import IGNORE
9987-embargoing 15 from lib.message import Message
9987-embargoing 16 from commands import BROADCAST
9987-embargoing 17 from commands import DIRECT
9987-embargoing 18 from lib.peer import Peer
9987-embargoing 19
9987-embargoing 20 RUBBISH_INTERVAL = 10
9987-embargoing 21
9987-embargoing 22 class Station(object):
9987-embargoing 23 def __init__(self, options):
9987-embargoing 24 self.client = None
9987-embargoing 25 self.state = State.get_instance(options["socket"], options["db_path"])
9987-embargoing 26 if options.get("address_table_path") != None:
9987-embargoing 27 self.state.import_at_and_wot(options.get("address_table_path"))
9987-embargoing 28 self.infosec = Infosec(self.state)
9987-embargoing 29 self.embargo_queue = {}
9987-embargoing 30 self.embargo_queue_lock = threading.Lock()
9987-embargoing 31
9987-embargoing 32 def start_embargo_queue_checking(self):
9987-embargoing 33 threading.Thread(target=self.check_embargo_queue).start()
9987-embargoing 34
9987-embargoing 35 def start_rubbish(self):
9987-embargoing 36 pass
9987-embargoing 37 threading.Thread(target=self.send_rubbish).start()
9987-embargoing 38
9987-embargoing 39 def handle_udp_data(self, bytes_address_pair):
9987-embargoing 40 data = bytes_address_pair[0]
9987-embargoing 41 address = bytes_address_pair[1]
9987-embargoing 42 packet_info = (address[0],
9987-embargoing 43 address[1],
9987-embargoing 44 binascii.hexlify(data)[0:16])
9987-embargoing 45 logging.debug("[%s:%d] -> %s" % packet_info)
9987-embargoing 46 for peer in self.state.get_keyed_peers():
9987-embargoing 47 message = self.infosec.unpack(peer, data)
9987-embargoing 48 error_code = message.error_code
9987-embargoing 49 if(error_code == None):
9987-embargoing 50 logging.debug("%s(%s) -> %s bounces: %d" % (message.speaker, peer.handles[0], message.body, message.bounces))
9987-embargoing 51 self.conditionally_update_at(peer, message, address)
9987-embargoing 52
9987-embargoing 53
9987-embargoing 54 if message.command == DIRECT:
9987-embargoing 55 self.deliver(message)
9987-embargoing 56 return
9987-embargoing 57
9986-rebroadcast-... 58
9986-rebroadcast-... 59 else:
9987-embargoing 60 self.embargo(message)
9987-embargoing 61 return
9987-embargoing 62 elif error_code == STALE_PACKET:
9987-embargoing 63 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
9987-embargoing 64 return
9987-embargoing 65 elif error_code == DUPLICATE_PACKET:
9987-embargoing 66 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
9987-embargoing 67 return
9987-embargoing 68 elif error_code == MALFORMED_PACKET:
9987-embargoing 69 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
9987-embargoing 70 return
9987-embargoing 71 elif error_code == IGNORED:
9987-embargoing 72 self.conditionally_update_at(peer, message, address)
9987-embargoing 73 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
9987-embargoing 74 return
9987-embargoing 75 elif error_code == INVALID_SIGNATURE:
9987-embargoing 76 pass
9987-embargoing 77 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
9987-embargoing 78
9987-embargoing 79 def deliver(self, message):
9987-embargoing 80
9987-embargoing 81 self.state.add_to_dedup_queue(message.message_hash)
9987-embargoing 82
9987-embargoing 83
9987-embargoing 84 if self.client:
9987-embargoing 85 self.client.message_from_station(message)
9987-embargoing 86
9987-embargoing 87 def embargo(self, message):
9987-embargoing 88
9987-embargoing 89
9986-rebroadcast-... 90 with self.embargo_queue_lock:
9986-rebroadcast-... 91 if not message.message_hash in self.embargo_queue.keys():
9986-rebroadcast-... 92 self.embargo_queue[message.message_hash] = []
9986-rebroadcast-... 93 self.embargo_queue[message.message_hash].append(message)
9987-embargoing 94
9987-embargoing 95 def check_embargo_queue(self):
9987-embargoing 96
9986-rebroadcast-... 97 with self.embargo_queue_lock:
9986-rebroadcast-... 98 self.check_for_immediate_messages()
9986-rebroadcast-... 99 self.flush_hearsay_messages()
9987-embargoing 100
9986-rebroadcast-... 101
9987-embargoing 102
9987-embargoing 103
9987-embargoing 104 time.sleep(1)
9987-embargoing 105 threading.Thread(target=self.check_embargo_queue).start()
9987-embargoing 106
9987-embargoing 107 def check_for_immediate_messages(self):
9987-embargoing 108 for key in dict(self.embargo_queue).keys():
9987-embargoing 109 messages = self.embargo_queue[key]
9987-embargoing 110
9987-embargoing 111 for message in messages:
9987-embargoing 112
9987-embargoing 113
9987-embargoing 114
9987-embargoing 115 if message.speaker in message.peer.handles:
9987-embargoing 116
9987-embargoing 117
9987-embargoing 118
9987-embargoing 119 self.embargo_queue.pop(key, None)
9987-embargoing 120 self.deliver(message)
9987-embargoing 121 self.rebroadcast(message)
9987-embargoing 122 break
9987-embargoing 123
9987-embargoing 124
9987-embargoing 125 def flush_hearsay_messages(self):
9987-embargoing 126
9987-embargoing 127
9987-embargoing 128
9987-embargoing 129 for key in dict(self.embargo_queue).keys():
9987-embargoing 130
9987-embargoing 131
9987-embargoing 132 handles = []
9987-embargoing 133 messages = self.embargo_queue[key]
9987-embargoing 134 for message in messages:
9987-embargoing 135 handles.append(message.peer.handles[0])
9987-embargoing 136
9987-embargoing 137
9987-embargoing 138 message = sorted(messages, key=lambda m: m.bounces)[0]
9987-embargoing 139
9987-embargoing 140
9987-embargoing 141 self.embargo_queue.pop(key, None)
9987-embargoing 142
9987-embargoing 143
9987-embargoing 144 if len(messages) < 4:
9987-embargoing 145 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
9987-embargoing 146 else:
9987-embargoing 147 message.prefix = "%s[%d]" % (message.speaker, len(messages))
9987-embargoing 148
9987-embargoing 149
9987-embargoing 150 self.deliver(message)
9987-embargoing 151
9987-embargoing 152
9987-embargoing 153 self.rebroadcast(message)
9987-embargoing 154
9987-embargoing 155
9987-embargoing 156
9987-embargoing 157
9987-embargoing 158 def conditionally_update_at(self, peer, message, address):
9987-embargoing 159 if message.speaker in peer.handles:
9987-embargoing 160 self.state.update_at({
9987-embargoing 161 "handle": message.speaker,
9987-embargoing 162 "address": address[0],
9987-embargoing 163 "port": address[1]
9987-embargoing 164 })
9987-embargoing 165
9987-embargoing 166 def rebroadcast(self, message):
9987-embargoing 167 if message.bounces < MAX_BOUNCES:
9987-embargoing 168 message.command = BROADCAST
9987-embargoing 169 message.bounces = message.bounces + 1
9987-embargoing 170 self.infosec.message(message)
9987-embargoing 171 else:
9987-embargoing 172 logging.debug("[%s:%d] -> packet TTL expired: %s" % packet_info)
9987-embargoing 173
9987-embargoing 174
9987-embargoing 175 def send_rubbish(self):
9987-embargoing 176 logging.debug("sending rubbish...")
9986-rebroadcast-... 177 with self.embargo_queue_lock:
9987-embargoing 178 if self.client:
9987-embargoing 179 self.infosec.message(Message({
9987-embargoing 180 "speaker": self.client.nickname,
9987-embargoing 181 "command": IGNORE,
9987-embargoing 182 "bounces": 0,
9987-embargoing 183 "body": self.infosec.gen_rubbish_body()
9987-embargoing 184 }))
9987-embargoing 185 time.sleep(RUBBISH_INTERVAL)
9987-embargoing 186 threading.Thread(target=self.send_rubbish).start()