raw
9987-embargoing         1 import time
9987-embargoing 2 import threading
9987-embargoing 3 import binascii
9987-embargoing 4 import logging
9987-embargoing 5 import os
9987-embargoing 6 from lib.state import State
9987-embargoing 7 from lib.infosec import MAX_BOUNCES
9987-embargoing 8 from lib.infosec import STALE_PACKET
9987-embargoing 9 from lib.infosec import DUPLICATE_PACKET
9987-embargoing 10 from lib.infosec import MALFORMED_PACKET
9987-embargoing 11 from lib.infosec import INVALID_SIGNATURE
9987-embargoing 12 from lib.infosec import IGNORED
9987-embargoing 13 from lib.infosec import Infosec
9987-embargoing 14 from commands import IGNORE
9987-embargoing 15 from lib.message import Message
9987-embargoing 16 from commands import BROADCAST
9987-embargoing 17 from commands import DIRECT
9987-embargoing 18 from lib.peer import Peer
9987-embargoing 19
9987-embargoing 20 RUBBISH_INTERVAL = 10
9987-embargoing 21
9987-embargoing 22 class Station(object):
9987-embargoing 23 def __init__(self, options):
9987-embargoing 24 self.client = None
9987-embargoing 25 self.state = State.get_instance(options["socket"], options["db_path"])
9987-embargoing 26 if options.get("address_table_path") != None:
9987-embargoing 27 self.state.import_at_and_wot(options.get("address_table_path"))
9987-embargoing 28 self.infosec = Infosec(self.state)
9987-embargoing 29 self.embargo_queue = {}
9987-embargoing 30 self.embargo_queue_lock = threading.Lock()
9987-embargoing 31
9987-embargoing 32 def start_embargo_queue_checking(self):
9987-embargoing 33 threading.Thread(target=self.check_embargo_queue).start()
9987-embargoing 34
9987-embargoing 35 def start_rubbish(self):
9987-embargoing 36 pass
9987-embargoing 37 threading.Thread(target=self.send_rubbish).start()
9987-embargoing 38
9987-embargoing 39 def handle_udp_data(self, bytes_address_pair):
9987-embargoing 40 data = bytes_address_pair[0]
9987-embargoing 41 address = bytes_address_pair[1]
9987-embargoing 42 packet_info = (address[0],
9987-embargoing 43 address[1],
9987-embargoing 44 binascii.hexlify(data)[0:16])
9987-embargoing 45 logging.debug("[%s:%d] -> %s" % packet_info)
9987-embargoing 46 for peer in self.state.get_keyed_peers():
9987-embargoing 47 message = self.infosec.unpack(peer, data)
9987-embargoing 48 error_code = message.error_code
9987-embargoing 49 if(error_code == None):
9987-embargoing 50 logging.debug("%s(%s) -> %s bounces: %d" % (message.speaker, peer.handles[0], message.body, message.bounces))
9987-embargoing 51 self.conditionally_update_at(peer, message, address)
9987-embargoing 52
9987-embargoing 53 # if this is a direct message, just deliver it and return
9987-embargoing 54 if message.command == DIRECT:
9987-embargoing 55 self.deliver(message)
9987-embargoing 56 return
9987-embargoing 57
9987-embargoing 58 # if the speaker is in our wot, we need to check if the message is hearsay
9987-embargoing 59 if message.speaker in self.state.get_peer_handles():
9987-embargoing 60 self.embargo(message)
9987-embargoing 61 return
9987-embargoing 62
9987-embargoing 63 else:
9987-embargoing 64 # skip the embargo and deliver this message with appropriate simple hearsay labeling
9987-embargoing 65 message.prefix = "%s[%s]" % (message.speaker, peer.handles[0])
9987-embargoing 66 self.deliver(message)
9987-embargoing 67 return
9987-embargoing 68 elif error_code == STALE_PACKET:
9987-embargoing 69 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
9987-embargoing 70 return
9987-embargoing 71 elif error_code == DUPLICATE_PACKET:
9987-embargoing 72 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
9987-embargoing 73 return
9987-embargoing 74 elif error_code == MALFORMED_PACKET:
9987-embargoing 75 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
9987-embargoing 76 return
9987-embargoing 77 elif error_code == IGNORED:
9987-embargoing 78 self.conditionally_update_at(peer, message, address)
9987-embargoing 79 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
9987-embargoing 80 return
9987-embargoing 81 elif error_code == INVALID_SIGNATURE:
9987-embargoing 82 pass
9987-embargoing 83 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
9987-embargoing 84
9987-embargoing 85 def deliver(self, message):
9987-embargoing 86 # add to duplicate queue
9987-embargoing 87 self.state.add_to_dedup_queue(message.message_hash)
9987-embargoing 88
9987-embargoing 89 # send to the irc client
9987-embargoing 90 if self.client:
9987-embargoing 91 self.client.message_from_station(message)
9987-embargoing 92
9987-embargoing 93 def embargo(self, message):
9987-embargoing 94 # initialize the key/value to empty array if not in the hash
9987-embargoing 95 # append message to array
9987-embargoing 96 if not message.message_hash in self.embargo_queue.keys():
9987-embargoing 97 self.embargo_queue[message.message_hash] = []
9987-embargoing 98 self.embargo_queue[message.message_hash].append(message)
9987-embargoing 99
9987-embargoing 100 def check_embargo_queue(self):
9987-embargoing 101 # get a lock so other threads can't mess with the db or the queue
9987-embargoing 102 self.embargo_queue_lock.acquire()
9987-embargoing 103 self.check_for_immediate_messages()
9987-embargoing 104 self.flush_hearsay_messages()
9987-embargoing 105
9987-embargoing 106 # release the lock
9987-embargoing 107 self.embargo_queue_lock.release()
9987-embargoing 108
9987-embargoing 109 # continue the thread loop after interval
9987-embargoing 110 time.sleep(1)
9987-embargoing 111 threading.Thread(target=self.check_embargo_queue).start()
9987-embargoing 112
9987-embargoing 113 def check_for_immediate_messages(self):
9987-embargoing 114 for key in dict(self.embargo_queue).keys():
9987-embargoing 115 messages = self.embargo_queue[key]
9987-embargoing 116
9987-embargoing 117 for message in messages:
9987-embargoing 118
9987-embargoing 119 # if this is an immediate copy of the message
9987-embargoing 120
9987-embargoing 121 if message.speaker in message.peer.handles:
9987-embargoing 122
9987-embargoing 123 # clear the queue and deliver
9987-embargoing 124
9987-embargoing 125 self.embargo_queue.pop(key, None)
9987-embargoing 126 self.deliver(message)
9987-embargoing 127 self.rebroadcast(message)
9987-embargoing 128 break
9987-embargoing 129
9987-embargoing 130
9987-embargoing 131 def flush_hearsay_messages(self):
9987-embargoing 132 # if we made it this far either we haven't found any immediate messages
9987-embargoing 133 # or we sent them all so we must deliver the remaining hearsay messages
9987-embargoing 134 # with the appropriate labeling
9987-embargoing 135 for key in dict(self.embargo_queue).keys():
9987-embargoing 136
9987-embargoing 137 # collect the source handles
9987-embargoing 138 handles = []
9987-embargoing 139 messages = self.embargo_queue[key]
9987-embargoing 140 for message in messages:
9987-embargoing 141 handles.append(message.peer.handles[0])
9987-embargoing 142
9987-embargoing 143 # select the message with the lowest bounce count
9987-embargoing 144 message = sorted(messages, key=lambda m: m.bounces)[0]
9987-embargoing 145
9987-embargoing 146 # clear the queue
9987-embargoing 147 self.embargo_queue.pop(key, None)
9987-embargoing 148
9987-embargoing 149 # compute prefix
9987-embargoing 150 if len(messages) < 4:
9987-embargoing 151 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
9987-embargoing 152 else:
9987-embargoing 153 message.prefix = "%s[%d]" % (message.speaker, len(messages))
9987-embargoing 154
9987-embargoing 155 # deliver
9987-embargoing 156 self.deliver(message)
9987-embargoing 157
9987-embargoing 158 # send the message to all other peers if it should be propagated
9987-embargoing 159 self.rebroadcast(message)
9987-embargoing 160
9987-embargoing 161
9987-embargoing 162 # we only update the address table if the speaker is same as peer
9987-embargoing 163
9987-embargoing 164 def conditionally_update_at(self, peer, message, address):
9987-embargoing 165 if message.speaker in peer.handles:
9987-embargoing 166 self.state.update_at({
9987-embargoing 167 "handle": message.speaker,
9987-embargoing 168 "address": address[0],
9987-embargoing 169 "port": address[1]
9987-embargoing 170 })
9987-embargoing 171
9987-embargoing 172 def rebroadcast(self, message):
9987-embargoing 173 if message.bounces < MAX_BOUNCES:
9987-embargoing 174 message.command = BROADCAST
9987-embargoing 175 message.bounces = message.bounces + 1
9987-embargoing 176 self.infosec.message(message)
9987-embargoing 177 else:
9987-embargoing 178 logging.debug("[%s:%d] -> packet TTL expired: %s" % packet_info)
9987-embargoing 179
9987-embargoing 180
9987-embargoing 181 def send_rubbish(self):
9987-embargoing 182 logging.debug("sending rubbish...")
9987-embargoing 183 self.embargo_queue_lock.acquire()
9987-embargoing 184 try:
9987-embargoing 185 if self.client:
9987-embargoing 186 self.infosec.message(Message({
9987-embargoing 187 "speaker": self.client.nickname,
9987-embargoing 188 "command": IGNORE,
9987-embargoing 189 "bounces": 0,
9987-embargoing 190 "body": self.infosec.gen_rubbish_body()
9987-embargoing 191 }))
9987-embargoing 192 except:
9987-embargoing 193 logging.error("Something went wrong attempting to send rubbish")
9987-embargoing 194 self.embargo_queue_lock.release()
9987-embargoing 195 time.sleep(RUBBISH_INTERVAL)
9987-embargoing 196 threading.Thread(target=self.send_rubbish).start()