# https://stackoverflow.com/questions/1896918/running-unittest-with-typical-test-directory-structure import time import unittest import logging from mock import Mock from lib.commands import DIRECT from lib.commands import GETDATA from lib.station import Station from lib.state import State from lib.message import Message from lib.getdata import GetData from lib.order_buffer import OrderBuffer from lib.long_buffer import LongBuffer from collections import namedtuple import helper class TestStation(unittest.TestCase): def setUp(self): helper.setup() logging.basicConfig(level=logging.DEBUG) self.station_socket = Mock() Options = namedtuple('Options', ['db_path', 'address_table_path', 'socket', 'irc_ports', 'udp_port', 'channel_name', 'password', 'motd', 'listen']) options = Options( None, None, self.station_socket, None, None, None, None, None, None ) self.station = Station(options) self.station.deliver = Mock() self.station.rebroadcast = Mock() self.station.rebroadcast.return_value = "foobar" self.bob_state = State(Mock(), None) self.station.state.set_knob('nick', 'alice') self.bob_state.set_knob('nick', 'bob') self.setupBob() self.setupAlice() def setupBob(self): self.station.state.add_peer('bob') self.station.state.add_key( 'bob', '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w==' ) self.station.state.update_at({ 'handle': 'bob', 'address': '127.0.0.1', 'port': 8889 }) def setupAlice(self): self.bob_state.add_peer('alice') self.bob_state.add_key( 'alice', '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w==' ) self.bob_state.update_at({ 'handle': 'alice', 'address': '127.0.0.1', 'port': 8888 }) def tearDown(self): pass def test_clean_getdata_requests_clears_expired_hashes(self): self.station.getdata_requests["abc"] = time.time() - 15 self.station.clean_getdata_requests() self.assertEqual(len(self.station.getdata_requests), 0) def test_clean_getdata_requests_retains_valid_hashes(self): self.station.getdata_requests["abc"] = time.time() self.station.clean_getdata_requests() self.assertEqual(len(self.station.getdata_requests), 1) def test_embargo_bounce_ordering(self): self.skipTest("the tested code has been re-implemented") peer1 = Mock() peer1.handles = ["a", "b"] peer2 = Mock() peer2.handles = ["c", "d"] low_bounce_message = Mock() low_bounce_message.peer = peer1 low_bounce_message.bounces = 1 low_bounce_message.message_hash = "messagehash" high_bounce_message = Mock() high_bounce_message.peer = peer2 high_bounce_message.bounces = 2 high_bounce_message.message_hash = "messagehash" self.station.short_buffer = { "messagehash": [ low_bounce_message, high_bounce_message ], } self.station.flush_hearsay_messages() self.station.deliver.assert_called_once_with(low_bounce_message) self.station.rebroadcast.assert_called_once_with(low_bounce_message) def test_embargo_queue_cleared(self): self.skipTest("the embargo queue is now th short buffer") peer = Mock() peer.handles = ["a", "b"] message = Mock() message.speaker = "c" message.peer = peer self.station.short_buffer = { "messagehash": [ message ], } self.assertEqual(len(self.station.short_buffer), 1) self.station.flush_hearsay_messages() self.assertEqual(len(self.station.short_buffer), 0) def test_simple_hearsay_prefix(self): self.skipTest("this code has moved") peer = Mock() peer.handles = ["a", "b"] message = Mock() message.speaker = "c" message.prefix = None message.peer = peer self.station.short_buffer = { "messagehash": [ message ], } self.station.flush_hearsay_messages() self.assertEqual(message.prefix, "c[a]") def test_in_wot_hearsay_prefix_under_four(self): self.skipTest("the embargo queue is now th short buffer") peer1 = Mock() peer1.handles = ["a", "b"] peer2 = Mock() peer2.handles = ["d", "e"] peer3 = Mock() peer3.handles = ["f", "g"] message_via_peer1 = Mock() message_via_peer1.speaker = "c" message_via_peer1.prefix = None message_via_peer1.peer = peer1 message_via_peer1.bounces = 1 message_via_peer2 = Mock() message_via_peer2.speaker = "c" message_via_peer2.prefix = None message_via_peer2.peer = peer2 message_via_peer2.bounces = 2 message_via_peer3 = Mock() message_via_peer3.speaker = "c" message_via_peer3.prefix = None message_via_peer3.peer = peer3 message_via_peer3.bounces = 1 self.station.short_buffer = { "messagehash": [ message_via_peer1, message_via_peer2, message_via_peer3 ], } self.station.flush_hearsay_messages() self.station.deliver.assert_called_once_with(message_via_peer1) self.assertEqual(message_via_peer1.prefix, "c[a|d|f]") def test_in_wot_hearsay_prefix_more_than_three(self): self.skipTest("the embargo queue is now th short buffer") peer1 = Mock() peer1.handles = ["a", "b"] peer2 = Mock() peer2.handles = ["d", "e"] peer3 = Mock() peer3.handles = ["f", "g"] peer4 = Mock() peer4.handles = ["f", "g"] message_via_peer1 = Mock() message_via_peer1.speaker = "c" message_via_peer1.prefix = None message_via_peer1.peer = peer1 message_via_peer1.bounces = 1 message_via_peer2 = Mock() message_via_peer2.speaker = "c" message_via_peer2.prefix = None message_via_peer2.peer = peer2 message_via_peer2.bounces = 2 message_via_peer3 = Mock() message_via_peer3.speaker = "c" message_via_peer3.prefix = None message_via_peer3.peer = peer3 message_via_peer3.bounces = 1 message_via_peer4 = Mock() message_via_peer4.speaker = "c" message_via_peer4.prefix = None message_via_peer4.peer = peer4 message_via_peer4.bounces = 1 self.station.short_buffer = { "messagehash": [ message_via_peer1, message_via_peer2, message_via_peer3, message_via_peer4 ], } self.station.flush_hearsay_messages() self.station.deliver.assert_called_once_with(message_via_peer1) self.assertEqual(message_via_peer1.prefix, "c[4]") # this test occasionally fails def test_receive_getdata_request_for_existing_direct_message(self): self.skipTest("intermittent failure") # 'send' bob a couple of messages m1 = Message({ 'command': DIRECT, 'handle': 'bob', 'speaker': 'alice', 'body': 'm1', 'bounces': 0 }, self.station.state) m1.send() m2 = Message({ 'command': DIRECT, 'handle': 'bob', 'speaker': 'alice', 'body': 'm2', 'bounces': 0, }, self.station.state) m2.send() # oops look's like bob didn't get the message # build GETDATA black packet to retreive m1 alice = self.bob_state.get_peer_by_handle('alice') bob = self.station.state.get_peer_by_handle('bob') gd_message = GetData(m2, self.bob_state) gd_message_bytes = gd_message.get_message_bytes(alice) gd_black_packet = Message.pack(bob, GETDATA, gd_message.bounces, gd_message_bytes) # call handle_udp_data with GETDATA packet self.station.handle_udp_data([gd_black_packet, ['127.0.0.1', 8889]]) # build up the retry black packet to verify that it was sent retry_black_packet = Message.pack(bob, DIRECT, 0, m1.get_message_bytes(bob)) # assert retry is called and sends the correct message sent_message_black_packet = self.station_socket.sendto.call_args[0][0] sent_message = Message.unpack(bob, sent_message_black_packet, LongBuffer(), OrderBuffer(), self.station.state) self.assertEqual(sent_message.body, 'm1')