diff -uNr a/blatta/Makefile b/blatta/Makefile --- a/blatta/Makefile 431c84d2fa991b4c43090296e80765e1460288de0fdb2d81acf73a18977acc16164111f210af83488a80b92fa35d307ecac26e59c860bbc12cd59a5c2a3e4c00 +++ b/blatta/Makefile e349f34effdc0c0f717424902644c1643775b1f604e8356a0ffcaef980e2e15d6576836a081b87cb9fa1f6fb99ada553f687851b6ffcc0f83e1ba86c81727aeb @@ -1,4 +1,4 @@ -VERSION := 9979 +VERSION := 9978 DISTFILES = Makefile blatta README.txt lib migrations tests test_net_configs start_test_net.sh diff -uNr a/blatta/lib/client.py b/blatta/lib/client.py --- a/blatta/lib/client.py 4446f38df71cab018b132851c12e5df188949618983d81eaa098c41276cf5b8a96d9fedf55ee3d5ef73412b3d5f3690ca0ea890adcfd61867171a569d7c6e605 +++ b/blatta/lib/client.py 5b3d8b0a91f1e8716b600aee2966c7456798dcbeab55af77aac6d5f73f9bef501929c6ac426d973498c2d3c4bee65c47f93cea5742e32ca127bc308449ae707a @@ -533,6 +533,9 @@ def pest_reply(self, msg): self.message(":Pest NOTICE %s :%s" % (self.server.channel_name, msg)) + def pest_dm_reply(self, speaker, msg): + self.message(":%s NOTICE %s :%s" % (speaker, self.nickname, msg)) + def send_join(self, handle): self.message(":%s JOIN %s" % (handle, self.server.channel_name)) @@ -543,7 +546,7 @@ self.message(":%s AWAY :No recent messages" % (handle)) def send_back(self, handle): - self.message(":%s AWAY" % (handle)) + self.message(":%s AWAY :" % (handle)) def reply_403(self, channeyl): self.reply("403 %s %s :No such channel" % (self.nickname, channel)) diff -uNr a/blatta/lib/direct.py b/blatta/lib/direct.py --- a/blatta/lib/direct.py b2bfee16a02f0ad104fbe59958be80d27fc59c50695d25b4c008c28aba4438c00706c3181c473c7a74bd85f21a681059b694b3b792f7c234d3478b346307eb7c +++ b/blatta/lib/direct.py b6619f974d8efc25c60eb441be028ec57eb693efb6e97188590572213d8061145ede293212d0cbc9112ca5c5afe4d278d7f6d10b12938663bc2e6d7dd0ac6ee0 @@ -41,18 +41,16 @@ self.log_outgoing(target_peer) def retry(self, requesting_peer): - target_peer = self.state.get_peer_by_handle(self.handle) - - if target_peer == None: - logging.debug("Aborting message: unknown handle: %s" % self.handle) + if requesting_peer == None: + logging.debug("Aborting message: unknown peer: %s" % requesting_peer.handles[0]) return - if not target_peer.get_key(): - logging.debug("No key for peer associated with %s" % self.handle) + if not requesting_peer.get_key(): + logging.debug("No key for peer associated with %s" % requesting_peer.handles[0]) return # TODO: Figure out how to verify that the requester was the original intended recipient - signed_packet_bytes = self.pack(target_peer, self.command, self.bounces, self.message_bytes) - target_peer.send(signed_packet_bytes) - self.log_outgoing(target_peer) + signed_packet_bytes = self.pack(requesting_peer, self.command, self.bounces, self.message_bytes) + requesting_peer.send(signed_packet_bytes) + self.log_outgoing(requesting_peer) diff -uNr a/blatta/lib/long_buffer.py b/blatta/lib/long_buffer.py --- a/blatta/lib/long_buffer.py 9f5b2f1661fab5fcc0a29d99094099c7a40acaa3736475e277177ed97036db710ccc0c4a608be22759a2f0493cb28bb62574b055a83c71205998a2e0eb8721dd +++ b/blatta/lib/long_buffer.py 76fce797467fec82ac2d8908f6769b043c7fd14bae6e9acfb3b4d2660a61725cff6ce2e9a2ef21ea6f7a27f6a75d6cfb9d19013f058cfc32c5d86bc771ec86ed @@ -18,10 +18,10 @@ command, message_bytes = self.state.get_message(message_hash) if message_bytes: - if command == DIRECT: - return Direct({message_bytes: message_bytes}, self.state) - elif command == BROADCAST: - return Broadcast({'message_bytes': message_bytes}, self.state) + if command == DIRECT: + return Direct({'message_bytes': message_bytes}, self.state) + elif command == BROADCAST: + return Broadcast({'message_bytes': message_bytes}, self.state) def expunge_expired(self): for message in self.buffer.values(): diff -uNr a/blatta/lib/message.py b/blatta/lib/message.py --- a/blatta/lib/message.py 9d70f920b8f62bcbc311319dfe98461ebd73e787cc1b97005b2f807f76f4fe99fb45a6650a04a149a854c8c547e87c77d00afc39426a6756bcd5d7e5d8881221 +++ b/blatta/lib/message.py b74685019898622685bd7f422bf5d7157af5624eeda40c5b976a3530e1e189b26e7cc38e5fccd49c9a78acd268c0154dd87f1014a6708afbc0c184de61d7e053 @@ -54,11 +54,13 @@ self.state = state self.prefix = None self.fork_status = None + # target peer handle self.handle = message.get("handle") self.peer = message.get("peer") self.body = message.get("body") self.timestamp = message.get("timestamp") self.command = message.get("command") + # source peer handle self.speaker = message.get("speaker") self.bounces = message.get("bounces") self.self_chain = message.get("self_chain") @@ -164,7 +166,6 @@ return cls._evaluate_message(long_buffer, order_buffer, message) - @classmethod def _evaluate_message(cls, long_buffer, order_buffer, message): # if we're expecting this message as a GETDATA response, skip the timestamp check @@ -231,16 +232,10 @@ @classmethod def _evaluate_direct(cls, long_buffer, order_buffer, message): - # if this is the first direct message from a station, we needn't check for antecedents - if message['self_chain'] == EMPTY_CHAIN: - return message - # no need to check net_chain for a direct message if not (long_buffer.has(message['self_chain'])): - if not order_buffer.has(message['message_hash']): - order_buffer.add(message) - message['error_code'] = OUT_OF_ORDER_SELF - return message + message['error_code'] = OUT_OF_ORDER_SELF + return message return message @classmethod @@ -267,15 +262,6 @@ return int_ts, self_chain, net_chain, speaker, body @classmethod - def _unpack_last_valid_message(cls, last_message_info): - if last_message_info.get('message_bytes'): - return cls._unpack_message( - last_message_info['message_bytes'] - )[4] - else: - return "" - - @classmethod def _pad(cls, text, size): return text.ljust(size, "\x00") diff -uNr a/blatta/lib/order_buffer.py b/blatta/lib/order_buffer.py --- a/blatta/lib/order_buffer.py 388113d46790ba9828275d8f1b20cef6dca63976d43192efa8d4332b3360fa8cb93a50a026eb35a66ba129bea7e61fc6973491072f6b5385d4ec83c11ccd2dc7 +++ b/blatta/lib/order_buffer.py 204d40b495a9d2c5bd82bfa20aa738cdf3dd8a3f19f5d2ab1f0d5809ecb5e4c3b36a712d761566a8ca9dee93911754445c73ca2b0a976cb78a950f5a2981ed49 @@ -1,3 +1,5 @@ +import binascii +import logging import time from broadcast import Broadcast from direct import Direct @@ -14,6 +16,7 @@ ts = time.time() if message['command'] == BROADCAST: m = Broadcast(message, self.state) + assert(m.message_hash == message['message_hash']) elif message['command'] == DIRECT: m = Direct(message, self.state) else: @@ -33,25 +36,13 @@ return True return False - def has(self, message_hash): - for value in self.buffer.values(): - for message in value: - if message_hash == message.message_hash: - return True - return False - def dequeue_and_order_mature_messages(self): current_time = time.time() sorted_messages = sorted(self.buffer.keys()) mature_messages = [] for timestamp in sorted_messages: if timestamp < current_time - int(self.state.get_knob('order_buffer_expiration_seconds')): - if isinstance(self.buffer[timestamp], list): - if len(self.buffer[timestamp]) > 0: - for message in self.buffer[timestamp]: - mature_messages.append(message) - del self.buffer[timestamp] - else: - mature_messages.append(self.buffer[timestamp]) - del self.buffer[timestamp] + for message in self.buffer[timestamp]: + mature_messages.append(message) + del self.buffer[timestamp] return sorted(mature_messages, key=lambda m: m.timestamp) \ No newline at end of file diff -uNr a/blatta/lib/state.py b/blatta/lib/state.py --- a/blatta/lib/state.py ca409e06cb9491b00378c24bb0a189ed752c01a46b3d8406f403f7e7d378d4a62f91a1ada8eab3ffdf374e4a62ea3a5edf3edb48cd5500bbde30f3e8153c209f +++ b/blatta/lib/state.py 78ed8993826a75eab1b2d638f6a61bd6aed1727786f1687f909e41f95379f4556574f4b363b525c5a2d6987340d6e67fae17e9f6db92d9900f2430b428e45778 @@ -19,10 +19,11 @@ 'embargo_interval_seconds': 1, 'rubbish_interval_seconds': 10, 'nick': '', - 'order_buffer_check_seconds': 5 * 60, - 'order_buffer_expiration_seconds': 5 * 60, + 'order_buffer_check_seconds': 180, + 'order_buffer_expiration_seconds': 120, 'short_buffer_expiration_seconds': 1, 'short_buffer_check_interval_seconds': 1, + 'getdata_requests_expiration_seconds': 10, 'peer_offline_interval_seconds': 60, 'peer_away_interval_seconds': 10 * 60, 'presence_check_seconds': 5, @@ -183,6 +184,8 @@ h = cursor.execute("select handle from handles where handle_id=?", (handle_id,)).fetchone()[0] if updated_at_utc: + if '.' not in updated_at_utc: + updated_at_utc = updated_at_utc + '.0' dt_format = '%Y-%m-%d %H:%M:%S.%f' dt_utc = datetime.datetime.strptime(updated_at_utc, dt_format) dt_local = self.utc_to_local(dt_utc) @@ -383,7 +386,11 @@ def handle_is_online(self, handle): # last rubbish message from peer associated with handle is # sufficiently recent - at = self.get_at(handle)[0] + try: + at = self.get_at(handle)[0] + except IndexError: + return False + if at["active_at_unixtime"] > time.time() - int(self.get_knob("peer_offline_interval_seconds")): return True else: diff -uNr a/blatta/lib/station.py b/blatta/lib/station.py --- a/blatta/lib/station.py a82e5b4c782b1484c844fd7843441bf0220bd18a796963b836c23dbc00a0f4031677d7699ec86a6a4b892b194871869e3cf55f481a5d978559e235869a7c3417 +++ b/blatta/lib/station.py 79d6e16967eced23171091d8cefd19a20beca845a68698be54f75ef48f869c2dfd80d452ba3dc4ababbe76a5007381d309e627eec61f2d83afbc513b543880ab @@ -1,6 +1,6 @@ import time -VERSION = 9979 +VERSION = 9978 STATUS_ONLINE = 0 STATUS_AWAY = 1 @@ -36,6 +36,7 @@ self.short_buffer = ShortBuffer(self.state) self.long_buffer = LongBuffer(self.state) self.order_buffer = OrderBuffer(self.state) + self.getdata_requests = {} self.server = Server(cmd_line_options, self) self.handlers = { DIRECT: self.handle_direct, @@ -44,6 +45,7 @@ IGNORE: self.handle_ignore } self.presence = {} + self.last_delivered = 0 def start(self): self.server.start() @@ -106,7 +108,6 @@ def handle_direct(self, message): message.log_incoming(message.peer) self.deliver(message) - self.long_buffer.intern(message) self.conditionally_update_at(message, message.metadata["address"]) def handle_broadcast(self, message): @@ -119,7 +120,6 @@ if self.short_buffer.has(message.message_hash): self.short_buffer.drop(message.message_hash) self.deliver(message) - self.long_buffer.intern(message) self.state.update_net_chain(message.message_hash) self.rebroadcast(message) else: @@ -166,7 +166,6 @@ message.prefix = "%s[%d]" % (message.speaker, len(message_with_stats['closest_peers'])) self.deliver(message) - self.long_buffer.intern(message) self.state.update_net_chain(message.message_hash) message.reporting_peers = message_with_stats['reporting_peers'] self.rebroadcast(message) @@ -226,10 +225,18 @@ port = packet_info[1] logging.debug( "[%s:%d] -> message received out of order: %s" % (address, port, binascii.hexlify(message['message_hash']))) - if not self.order_buffer.has(message['message_hash']): - for chain in broken_chains: + for chain in broken_chains: + if not self.getdata_requests.get(message[chain]): GetData(message, chain, self.state).send() + self.getdata_requests[message[chain]] = time.time() self.order_buffer.add(message) + self.clean_getdata_requests() + + def clean_getdata_requests(self): + for message_hash in self.getdata_requests.keys(): + if (self.getdata_requests.get(message_hash) < + time.time() - self.state.get_knob('getdata_requests_expiration_seconds')): + del self.getdata_requests[message_hash] def deliver(self, message): # it's possible that these messages are from an order buffer @@ -238,18 +245,24 @@ if self.long_buffer.has(message.message_hash): return + # send to the irc client if self.client: # emit a replay warning if this message is a getdata response and older than the last # displayed message if message.get_data_response: - if message.timestamp < self.state.get_latest_message_timestamp(): + if message.timestamp < self.last_delivered: warning = time.strftime( "Replay: %Y-%m-%d %H:%M:%S:", time.localtime(message.timestamp) ) - self.client.pest_reply(warning) + if message.command == BROADCAST: + self.client.pest_reply(warning) + else: + self.client.pest_dm_reply(message.speaker, warning) self.client.message_from_station(message) + self.last_delivered = message.timestamp + self.long_buffer.intern(message) # we only update the address table if the speaker is same as peer def conditionally_update_at(self, message, address): diff -uNr a/blatta/tests/test_getdata.py b/blatta/tests/test_getdata.py --- a/blatta/tests/test_getdata.py b48397de933c7423e2e5c881f18c0e30ff858f31d3de6ef4c6074ea512afa85069c9fd0caf537c2cc41cc4ca8d69fdfe1ae1fd6f7af9211ccde47c8311a70776 +++ b/blatta/tests/test_getdata.py c50220a20c89e9c56b3083422835887c8c5346354f2a10a117e873f3287a3f44e6f9ace31e46045b149a0a310a77c86e4506e682ccd944d2b88c2c484336f10b @@ -1,5 +1,4 @@ import unittest -import logging import time from mock import Mock @@ -9,7 +8,6 @@ from lib.order_buffer import OrderBuffer from lib.state import State from lib.direct import Direct -from lib.broadcast import Broadcast import helper class TestGetData(unittest.TestCase): diff -uNr a/blatta/tests/test_order_buffer.py b/blatta/tests/test_order_buffer.py --- a/blatta/tests/test_order_buffer.py false +++ b/blatta/tests/test_order_buffer.py d4056f2557395e2d4f1367500f9a9462f7c518648b025c887c376243bff75e70e59b2f341aebe3064dba038c4f9e4c09e77f870b90cab8f3545db9a000be4d90 @@ -0,0 +1,32 @@ +import unittest + +import time +from mock import Mock +from lib.commands import BROADCAST +from lib.message import Message +from lib.getdata import GetData +from lib.long_buffer import LongBuffer +from lib.order_buffer import OrderBuffer +from lib.state import State +from lib.direct import Direct +import helper + +class TestOrderBuffer(unittest.TestCase): + def setUp(self): + helper.setup() + self.socket = Mock() + self.state = State(self.socket) + self.state.set_knob('nick', 'alice') + + def test_add(self): + m1 = { + 'command': BROADCAST, + 'body': "m1", + 'speaker': "bob", + 'handle': "alice", + 'self_chain': 'abc', + 'net_chain': 'def', + 'message_hash': '123' + } + ob = OrderBuffer(self.state) + ob.add(m1) diff -uNr a/blatta/tests/test_station.py b/blatta/tests/test_station.py --- a/blatta/tests/test_station.py f6eb911b7a54ec1ef368e1eed8202aa2e385086f3d5509c08eb8bb78d216ff2408b7409b5f4f368d46ea98475c4afb3a67a094ede9109278ad0cb7ea93459f4b +++ b/blatta/tests/test_station.py 90668d46c39a64258b943cf8ff176e84b0b98eb662b8e6193b717f6582bfeae981698754ff6f4aa7efda8f672ded907ee0680a5f3c175e3848718c712a200f2d @@ -1,4 +1,5 @@ # https://stackoverflow.com/questions/1896918/running-unittest-with-typical-test-directory-structure +import time import unittest import logging from mock import Mock @@ -75,6 +76,16 @@ def tearDown(self): pass + def test_clean_getdata_requests_clears_expired_hashes(self): + self.station.getdata_requests["abc"] = time.time() - 15 + self.station.clean_getdata_requests() + self.assertEqual(len(self.station.getdata_requests), 0) + + def test_clean_getdata_requests_retains_valid_hashes(self): + self.station.getdata_requests["abc"] = time.time() + self.station.clean_getdata_requests() + self.assertEqual(len(self.station.getdata_requests), 1) + def test_embargo_bounce_ordering(self): self.skipTest("the tested code has been re-implemented") peer1 = Mock()