- AF78FE5EF8EC6919AE03EE8DA359DEBB290963F184560C69C7005D483819040E1D34C5E00E40092F1A962BFBC60922741018FEE44A8CB5764FCE8B594D29934D+ 05F39E51685760A96A63D9D96565FF66C8D6A44A521787AF7E992EC7F0FD50B589DF8EE69FF81DB69823D98CBE223FB157047157F43C43F4CCC504498530F027blatta/lib/station.py(1 . 23)(1 . 23)
774 import time
775 import threading
776 import binascii
777 import logging
778 import os
779 from lib.state import State
780 from lib.infosec import MAX_BOUNCES
781 from lib.infosec import STALE_PACKET
782 from lib.infosec import DUPLICATE_PACKET
783 from lib.infosec import MALFORMED_PACKET
784 from lib.infosec import INVALID_SIGNATURE
785 from lib.infosec import IGNORED
786 from lib.infosec import Infosec
787 from state import State
788 from infosec import MAX_BOUNCES
789 from infosec import STALE_PACKET
790 from infosec import DUPLICATE_PACKET
791 from infosec import MALFORMED_PACKET
792 from infosec import INVALID_SIGNATURE
793 from infosec import IGNORED
794 from infosec import Infosec
795 from commands import IGNORE
796 from lib.message import Message
797 from message import Message
798 from commands import BROADCAST
799 from commands import DIRECT
800 from lib.peer import Peer
801 from peer import Peer
802
803 RUBBISH_INTERVAL = 10
804 EMBARGO_INTERVAL = 1
805
806 class Station(object):
807 def __init__(self, options):
(27 . 14)(27 . 6)
809 self.state.import_at_and_wot(options.get("address_table_path"))
810 self.infosec = Infosec(self.state)
811 self.embargo_queue = {}
812 self.embargo_queue_lock = threading.Lock()
813
814 def start_embargo_queue_checking(self):
815 threading.Thread(target=self.check_embargo_queue).start()
816
817 def start_rubbish(self):
818 pass
819 threading.Thread(target=self.send_rubbish).start()
820
821 def handle_udp_data(self, bytes_address_pair):
822 data = bytes_address_pair[0]
(47 . 7)(39 . 12)
824 message = self.infosec.unpack(peer, data)
825 error_code = message.error_code
826 if(error_code == None):
827 logging.debug("%s(%s) -> %s bounces: %d" % (message.speaker, peer.handles[0], message.body, message.bounces))
828 logging.info("[%s:%d %s] -> %s %d %s" % (peer.address,
829 peer.port,
830 peer.handles[0],
831 message.body,
832 message.bounces,
833 message.message_hash))
834 self.conditionally_update_at(peer, message, address)
835
836 # if this is a direct message, just deliver it and return
(87 . 22)(84 . 14)
838 def embargo(self, message):
839 # initialize the key/value to empty array if not in the hash
840 # append message to array
841 with self.embargo_queue_lock:
842 if not message.message_hash in self.embargo_queue.keys():
843 self.embargo_queue[message.message_hash] = []
844 self.embargo_queue[message.message_hash].append(message)
845 if not message.message_hash in self.embargo_queue.keys():
846 self.embargo_queue[message.message_hash] = []
847 self.embargo_queue[message.message_hash].append(message)
848
849 def check_embargo_queue(self):
850 # get a lock so other threads can't mess with the db or the queue
851 with self.embargo_queue_lock:
852 self.check_for_immediate_messages()
853 self.flush_hearsay_messages()
854
855 # release the lock
856
857 # continue the thread loop after interval
858 time.sleep(1)
859 threading.Thread(target=self.check_embargo_queue).start()
860 self.check_for_immediate_messages()
861 self.flush_hearsay_messages()
862
863 def check_for_immediate_messages(self):
864 for key in dict(self.embargo_queue).keys():
(169 . 18)(158 . 14)
866 message.bounces = message.bounces + 1
867 self.infosec.message(message)
868 else:
869 logging.debug("[%s:%d] -> packet TTL expired: %s" % packet_info)
870 logging.debug("message TTL expired: %s" % message.message_hash)
871
872
873 def send_rubbish(self):
874 logging.debug("sending rubbish...")
875 with self.embargo_queue_lock:
876 if self.client:
877 self.infosec.message(Message({
878 "speaker": self.client.nickname,
879 "command": IGNORE,
880 "bounces": 0,
881 "body": self.infosec.gen_rubbish_body()
882 }))
883 time.sleep(RUBBISH_INTERVAL)
884 threading.Thread(target=self.send_rubbish).start()
885 if self.client:
886 self.infosec.message(Message({
887 "speaker": self.client.nickname,
888 "command": IGNORE,
889 "bounces": 0,
890 "body": self.infosec.gen_rubbish_body()
891 }))