diff -uNr a/blatta/lib/client.py b/blatta/lib/client.py --- a/blatta/lib/client.py 98e1e99d7ea8fe523728ca8f5661b244f3769a09a44cfa8c40236fe5cb32bb2ee267c7ee2e2028d1e455b5fc3bae9efb53e91a457033133c4bb25162ebcf8e5e +++ b/blatta/lib/client.py 481b05a7a3816e07a04df8c5f0b541f666f4f5975119dfe5609fdd4ecb1a07deefb03e2ed3d804c9f916ad8d6d05d4140f2f479f1064db4b3ef6ec4e75db2130 @@ -515,7 +515,7 @@ except Exception as ex: self.pest_reply("Error attempting to update address table") stack = traceback.format_exc() - logger.debug(stack) + logging.debug(stack) return elif len(arguments) > 2: self.pest_reply("Usage: AT [] [
]") @@ -562,7 +562,7 @@ except KeyError: self.reply("421 %s %s :Unknown command" % (self.nickname, command)) stack = traceback.format_exc() - logger.debug(stack) + logging.debug(stack) def socket_readable_notification(self): try: diff -uNr a/blatta/lib/infosec.py b/blatta/lib/infosec.py --- a/blatta/lib/infosec.py 2f8e9df6cf92a779900080f585a1b9873218d949be48299950c2b05a64fd5a3abd8ab927f5c5d4c5d51e7c3def2bab13a6ffe532907e681883730d60bd75b0e5 +++ b/blatta/lib/infosec.py a7f781100f32373df224f187126fcc44dcfb4b4a8bb53e60e1acafdcff0a9cfd71d47f40e50c6c6f3f7c92a5eb4821b8eec22ecac01a84aa1e7f67204a9a220e @@ -40,7 +40,6 @@ def message(self, message): # if we are not rebroadcasting we need to set the timestamp - if message.timestamp == None: message.original = True message.timestamp = int(time.time()) @@ -59,7 +58,11 @@ logging.debug("Aborting message: unknown handle: %s" % message.handle) return - message_bytes = self.get_message_bytes(message, target_peer) + if message.message_bytes == None: + message_bytes = self.get_message_bytes(message, target_peer) + else: + message_bytes = message.message_bytes + if message.command != IGNORE: message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest()) logging.debug("generated message_hash: %s" % message_hash) @@ -70,12 +73,16 @@ signed_packet_bytes = self.pack(target_peer, message, message_bytes) target_peer.send(signed_packet_bytes) elif message.command == BROADCAST or message.command == IGNORE: + # sanity check + if message.message_bytes and message_bytes != message_bytes: + logging.debug("aborting send: message modified by station!") + return + for peer in self.state.get_keyed_peers(): # we don't want to send a broadcast back to the originator - if message.peer and (peer.peer_id == message.peer.peer_id): - next + continue signed_packet_bytes = self.pack(peer, message, message_bytes) peer.send(signed_packet_bytes) @@ -225,7 +232,8 @@ "self_chain": self_chain, "net_chain": net_chain, "self_chain_valid": self_chain_valid, - "message_hash": message_hash + "message_hash": message_hash, + "message_bytes": message_bytes }) # check for duplicates diff -uNr a/blatta/lib/message.py b/blatta/lib/message.py --- a/blatta/lib/message.py 0cf5cd14c7e157cf47cf2e4f0f9fd076e89a817a71b389aa8748a8472437e62a2ac35fa5f3b3ae9be39a2f15a926e5920571267bdf39b9ed6b4e0e4cbf9c2970 +++ b/blatta/lib/message.py 9efa52c31bb08e8f81821b1832f5729d9e3f52cd44cbd9c10adcc809fd4383129ace4d7c3615eb1e9b068e13e045dee2898aaac1f1eaaca81ad65b5ece9ff751 @@ -14,3 +14,4 @@ self.self_chain_valid = message.get("self_chain_valid") self.error_code = message.get("error_code") self.message_hash = message.get("message_hash") + self.message_bytes = None diff -uNr a/blatta/lib/peer.py b/blatta/lib/peer.py --- a/blatta/lib/peer.py c96da174ae6ceb0489ed2b50872c02e481fa7d14d09a747b387bcf7ec45f1ec1dac5326a3370951f20c8995c85fcc7a8b91bc333cedaf78b11345a3897c8f95d +++ b/blatta/lib/peer.py 01648a4c6129c725ebfaf5a84166bbc34cf2080df4842887c872524819b87e13d524b0b8351f75f5ecc4f7a638cce26d5da72c4967c5785ebf1a956db5906b38 @@ -24,15 +24,15 @@ return None def send(self, signed_packet_bytes): - if self.get_key() != None: + if self.get_key() != None and self.address != None and self.port != None: try: self.socket.sendto(signed_packet_bytes, (self.address, self.port)) logging.debug("[%s:%d] <- %s" % (self.address, - self.port, + int(self.port), binascii.hexlify(signed_packet_bytes)[0:16])) except Exception as ex: stack = traceback.format_exc() logging.debug(stack) else: - logging.debug("Discarding message to unknown handle or handle with no key: %s" % message.handle) + logging.debug("Discarding message to unknown handle or handle with no key: %s" % self.handles[0]) diff -uNr a/blatta/lib/server.py b/blatta/lib/server.py --- a/blatta/lib/server.py 7f7198c51eb6b00321c1754f1675d907263bf600b8ef67b79641e4a763357fe73935b9f1534234d226f9564e2341aeaafb50bfcff5128bfe14404651b8a36ef2 +++ b/blatta/lib/server.py 543b50fa952dea77a3a28ac4166697877605d330dd65b74b50f2e27393f7aa0c3cc08585515661194d65f23f208818a32d2dff7c3c81385a9473803b4fee61e6 @@ -1,4 +1,4 @@ -VERSION = "9987" +VERSION = "9986" import os import select @@ -166,12 +166,7 @@ for x in inputready: if x == self.udp_server_socket: bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE) - self.station.embargo_queue_lock.acquire() - try: - self.station.handle_udp_data(bytes_address_pair) - except sqlite3.ProgrammingError as ex: - logging.error("sqlite3 concurrency problem") - self.station.embargo_queue_lock.release() + self.station.handle_udp_data(bytes_address_pair) for x in iwtd: if self.client != None: self.client.socket_readable_notification() diff -uNr a/blatta/lib/state.py b/blatta/lib/state.py --- a/blatta/lib/state.py 4f78202d4744a3284c00c4aac9c055f4abae95eea1c51c4acd519a9723a990d4d1fc336254140f75b7d75995a781935a2a6250dd19c7e7610b6643365a47938f +++ b/blatta/lib/state.py 8ceb7b9145164136be138992805eaf4b9565362e11a87af8d28e0f172b45400fe324cd0a24f4221dbd8e694972b03931c7594d0e9f7bbd064f2c86cd0ed25dd0 @@ -3,6 +3,7 @@ import imp import hashlib import logging +import threading from itertools import chain class State(object): @@ -17,37 +18,39 @@ if State.__instance != None: raise Exception("This class is a singleton") else: - self.socket = socket - self.conn = sqlite3.connect(db_path, check_same_thread=False) - self.cursor = self.conn.cursor() - self.cursor.execute("create table if not exists at(handle_id integer,\ - address text not null,\ - port integer not null,\ - active_at datetime default null,\ - updated_at datetime default current_timestamp,\ - unique(handle_id, address, port))") - - self.cursor.execute("create table if not exists wot(peer_id integer primary key)") - - self.cursor.execute("create table if not exists handles(handle_id integer primary key,\ - peer_id integer,\ - handle text,\ - unique(handle))") - - self.cursor.execute("create table if not exists keys(peer_id intenger,\ - key text,\ - used_at datetime default current_timestamp,\ - unique(key))") - - self.cursor.execute("create table if not exists logs(\ - handle text not null,\ - peer_id integer,\ - message_bytes blob not null,\ - created_at datetime default current_timestamp)") - - self.cursor.execute("create table if not exists dedup_queue(\ - hash text not null,\ - created_at datetime default current_timestamp)") + self.write_lock = threading.Lock() + with self.write_lock: + self.socket = socket + self.conn = sqlite3.connect(db_path, check_same_thread=False) + self.cursor = self.conn.cursor() + self.cursor.execute("create table if not exists at(handle_id integer,\ + address text not null,\ + port integer not null,\ + active_at datetime default null,\ + updated_at datetime default current_timestamp,\ + unique(handle_id, address, port))") + + self.cursor.execute("create table if not exists wot(peer_id integer primary key)") + + self.cursor.execute("create table if not exists handles(handle_id integer primary key,\ + peer_id integer,\ + handle text,\ + unique(handle))") + + self.cursor.execute("create table if not exists keys(peer_id intenger,\ + key text,\ + used_at datetime default current_timestamp,\ + unique(key))") + + self.cursor.execute("create table if not exists logs(\ + handle text not null,\ + peer_id integer,\ + message_bytes blob not null,\ + created_at datetime default current_timestamp)") + + self.cursor.execute("create table if not exists dedup_queue(\ + hash text not null,\ + created_at datetime default current_timestamp)") State.__instance = self def get_at(self, handle=None): @@ -76,22 +79,24 @@ def is_duplicate_message(self, message_hash): - self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')") - self.conn.commit() - result = self.cursor.execute("select hash from dedup_queue where hash=?", - (message_hash,)).fetchone() - logging.debug("checking if %s is dupe" % message_hash) - if(result != None): - return True - else: - return False + with self.write_lock: + self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')") + self.conn.commit() + result = self.cursor.execute("select hash from dedup_queue where hash=?", + (message_hash,)).fetchone() + logging.debug("checking if %s is dupe" % message_hash) + if(result != None): + return True + else: + return False def add_to_dedup_queue(self, message_hash): - self.cursor.execute("insert into dedup_queue(hash)\ - values(?)", - (message_hash,)) - logging.debug("added %s to dedup" % message_hash) - self.conn.commit() + with self.write_lock: + self.cursor.execute("insert into dedup_queue(hash)\ + values(?)", + (message_hash,)) + logging.debug("added %s to dedup" % message_hash) + self.conn.commit() def get_last_message_hash(self, handle, peer_id=None): if peer_id: @@ -112,102 +117,109 @@ return "\x00" * 32 def log(self, handle, message_bytes, peer=None): - if peer != None: - peer_id = peer.peer_id - else: - peer_id = None + with self.write_lock: + if peer != None: + peer_id = peer.peer_id + else: + peer_id = None - self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\ - values(?, ?, ?)", - (handle, peer_id, buffer(message_bytes))) + self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\ + values(?, ?, ?)", + (handle, peer_id, buffer(message_bytes))) def import_at_and_wot(self, at_path): - wot = imp.load_source('wot', at_path) - for peer in wot.peers: - results = self.cursor.execute("select * from handles where handle=? limit 1", - (peer["name"],)).fetchall() - if len(results) == 0: - key = peer["key"] - port = peer["port"] - address = peer["address"] - self.cursor.execute("insert into wot(peer_id) values(null)") - peer_id = self.cursor.lastrowid - self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", - (peer_id, peer["name"])) - handle_id = self.cursor.lastrowid - self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", - (handle_id, peer["address"], peer["port"], None)) - self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", - (peer_id, key)) + with self.write_lock: + wot = imp.load_source('wot', at_path) + for peer in wot.peers: + results = self.cursor.execute("select * from handles where handle=? limit 1", + (peer["name"],)).fetchall() + if len(results) == 0: + key = peer["key"] + port = peer["port"] + address = peer["address"] + self.cursor.execute("insert into wot(peer_id) values(null)") + peer_id = self.cursor.lastrowid + self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", + (peer_id, peer["name"])) + handle_id = self.cursor.lastrowid + self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", + (handle_id, peer["address"], peer["port"], None)) + self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", + (peer_id, key)) - self.conn.commit() + self.conn.commit() def update_at(self, peer, set_active_at=True): - row = self.cursor.execute("select handle_id from handles where handle=?", - (peer["handle"],)).fetchone() - if row != None: - handle_id = row[0] - else: - return - - try: - self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)", - (handle_id, peer["address"], peer["port"])) - except sqlite3.IntegrityError as ex: - self.cursor.execute("update at set updated_at = current_timestamp\ - where handle_id=? and address=? and port=?", - (handle_id, peer["address"], peer["port"])) - if set_active_at: - self.cursor.execute("update at set active_at = current_timestamp\ - where handle_id=? and address=? and port=?", - (handle_id, peer["address"], peer["port"])) - self.conn.commit() + with self.write_lock: + row = self.cursor.execute("select handle_id from handles where handle=?", + (peer["handle"],)).fetchone() + if row != None: + handle_id = row[0] + else: + return + + try: + self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)", + (handle_id, peer["address"], peer["port"])) + except sqlite3.IntegrityError as ex: + self.cursor.execute("update at set updated_at = current_timestamp\ + where handle_id=? and address=? and port=?", + (handle_id, peer["address"], peer["port"])) + if set_active_at: + self.cursor.execute("update at set active_at = current_timestamp\ + where handle_id=? and address=? and port=?", + (handle_id, peer["address"], peer["port"])) + self.conn.commit() def add_peer(self, handle): - self.cursor.execute("insert into wot(peer_id) values(null)") - peer_id = self.cursor.lastrowid - self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", - (peer_id, handle)) - self.conn.commit() + with self.write_lock: + self.cursor.execute("insert into wot(peer_id) values(null)") + peer_id = self.cursor.lastrowid + self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", + (peer_id, handle)) + self.conn.commit() def remove_peer(self, handle): - # get peer id - - result = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone() - if result == None: - return - else: - peer_id = result[0] - # get all aliases + with self.write_lock: + # get peer id + result = self.cursor.execute("select peer_id from handles where handle=?", + (handle,)).fetchone() + if result == None: + return + else: + peer_id = result[0] + # get all aliases - handle_ids = self.get_handle_ids_for_peer(peer_id) - for handle_id in handle_ids: - # delete at entries for each alias + handle_ids = self.get_handle_ids_for_peer(peer_id) + for handle_id in handle_ids: + # delete at entries for each alias - self.cursor.execute("delete from at where handle_id=?", (handle_id,)) + self.cursor.execute("delete from at where handle_id=?", (handle_id,)) - self.cursor.execute("delete from handles where peer_id=?", (peer_id,)) + self.cursor.execute("delete from handles where peer_id=?", (peer_id,)) - # delete all keys for peer id + # delete all keys for peer id - self.cursor.execute("delete from keys where peer_id=?", (handle_id,)) - - # delete peer from wot - - self.cursor.execute("delete from wot where peer_id=?", (peer_id,)) - self.conn.commit() + self.cursor.execute("delete from keys where peer_id=?", (handle_id,)) + + # delete peer from wot + + self.cursor.execute("delete from wot where peer_id=?", (peer_id,)) + self.conn.commit() def add_key(self, handle, key): - peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] - if peer_id != None: - self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) - self.conn.commit() + with self.write_lock: + peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] + if peer_id != None: + self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) + self.conn.commit() def remove_key(self, key): - self.cursor.execute("delete from keys where key=?", (key,)) - self.conn.commit() + with self.write_lock: + self.cursor.execute("delete from keys where key=?", (key,)) + self.conn.commit() def get_handle_ids_for_peer(self, peer_id): return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?", @@ -236,8 +248,11 @@ for peer_id in peer_ids: handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] peer = self.get_peer_by_handle(handle) - if not (self.is_duplicate(peers, peer)): - peers.append(peer) + if self.is_duplicate(peers, peer): + continue + if peer.address == None or peer.port == None: + continue + peers.append(peer) return peers @@ -259,8 +274,8 @@ return Peer(self.socket, { "handles": handles, "peer_id": handle_info[1], - "address": address[0] if address else "", - "port": address[1] if address else "", + "address": address[0] if address else None, + "port": address[1] if address else None, "keys": keys }) diff -uNr a/blatta/lib/station.py b/blatta/lib/station.py --- a/blatta/lib/station.py 9e41fdd532e857cec8e4d3407560d8570b8e6b7b713739e6d81622b0a6abcbe5a74a9e1ce70f192be2ce5f5c2d0b2374e433bcb7a1d83e322c24460adf92723a +++ b/blatta/lib/station.py af78fe5ef8ec6919ae03ee8da359debb290963f184560c69c7005d483819040e1d34c5e00e40092f1a962bfbc60922741018fee44a8cb5764fce8b594d29934d @@ -55,16 +55,10 @@ self.deliver(message) return - # if the speaker is in our wot, we need to check if the message is hearsay - if message.speaker in self.state.get_peer_handles(): + # embargo to wait for immediate copy of message + else: self.embargo(message) return - - else: - # skip the embargo and deliver this message with appropriate simple hearsay labeling - message.prefix = "%s[%s]" % (message.speaker, peer.handles[0]) - self.deliver(message) - return elif error_code == STALE_PACKET: logging.debug("[%s:%d] -> stale packet: %s" % packet_info) return @@ -93,18 +87,18 @@ def embargo(self, message): # initialize the key/value to empty array if not in the hash # append message to array - if not message.message_hash in self.embargo_queue.keys(): - self.embargo_queue[message.message_hash] = [] - self.embargo_queue[message.message_hash].append(message) + with self.embargo_queue_lock: + if not message.message_hash in self.embargo_queue.keys(): + self.embargo_queue[message.message_hash] = [] + self.embargo_queue[message.message_hash].append(message) def check_embargo_queue(self): # get a lock so other threads can't mess with the db or the queue - self.embargo_queue_lock.acquire() - self.check_for_immediate_messages() - self.flush_hearsay_messages() + with self.embargo_queue_lock: + self.check_for_immediate_messages() + self.flush_hearsay_messages() - # release the lock - self.embargo_queue_lock.release() + # release the lock # continue the thread loop after interval time.sleep(1) @@ -180,8 +174,7 @@ def send_rubbish(self): logging.debug("sending rubbish...") - self.embargo_queue_lock.acquire() - try: + with self.embargo_queue_lock: if self.client: self.infosec.message(Message({ "speaker": self.client.nickname, @@ -189,8 +182,5 @@ "bounces": 0, "body": self.infosec.gen_rubbish_body() })) - except: - logging.error("Something went wrong attempting to send rubbish") - self.embargo_queue_lock.release() time.sleep(RUBBISH_INTERVAL) threading.Thread(target=self.send_rubbish).start()