diff -uNr a/blatta/blatta b/blatta/blatta
--- a/blatta/blatta 87fb4a6177c042b76e8c0e34dc153bc2f574e78b90521124a44cf41e02f57aaf13ba15be758f3a67056f53064c8928f3da47dbc0cffb88019ae5456b6f06a9bf
+++ b/blatta/blatta 0dce4472982646ffa031d3c88c98dcdc52c94627a4e046da8d403a501a118e74d9893604b3daab04af8677711a32ccac2d12c3a765a64bd8e36a250e0f8986bb
@@ -39,9 +39,8 @@
         action="store_true",
         help="fork and become a daemon")
     op.add_option(
-        "--debug",
-        action="store_true",
-        help="print debug messages to stdout")
+        "--log-level",
+        help="specify priority level for logging: info or debug")
     op.add_option(
         "--listen",
         metavar="X",
@@ -71,11 +70,7 @@
     op.add_option(
         "--statedir",
         metavar="X",
-        help="save persistent channel state (topic, key) in directory X")
-    op.add_option(
-        "--verbose",
-        action="store_true",
-        help="be verbose (print some progress messages to stdout)")
+        help="save persistent channel state (topic) in directory X")
     if os.name == "posix":
         op.add_option(
             "--chroot",
@@ -92,7 +87,7 @@
     if options.channel_name is None:
         options.channel_name = "#pest"
     log_format = "%(levelname)s %(asctime)s: %(message)s"
-    if options.debug:
+    if options.log_level == 'debug':
         logging.basicConfig(level=logging.DEBUG, format=log_format, stream=sys.stdout)
     else:
         logging.basicConfig(level=logging.INFO, format=log_format, stream=sys.stdout)
@@ -145,5 +140,4 @@
     except KeyboardInterrupt:
         logging.error("Interrupted.")
-

 main(sys.argv)
diff -uNr a/blatta/lib/client.py b/blatta/lib/client.py
--- a/blatta/lib/client.py 481b05a7a3816e07a04df8c5f0b541f666f4f5975119dfe5609fdd4ecb1a07deefb03e2ed3d804c9f916ad8d6d05d4140f2f479f1064db4b3ef6ec4e75db2130
+++ b/blatta/lib/client.py eac411466552678c425d22b90318ad903bcad61213f50ddc38a35dd9db76c771414f6e63e4df35282fef40c3c0b12177b8856dc41be4887307ab871080fc9e3b
@@ -7,12 +7,12 @@
 import base64
 import traceback
 import logging
-from lib.state import State
-from lib.message import Message
-from lib.server import VERSION
+from state import State
+from message import Message
+from server import VERSION
 from funcs import *
-from lib.commands import BROADCAST
-from lib.commands import DIRECT
+from commands import BROADCAST
+from commands import DIRECT

 class Client(object):
     __linesep_regexp = re.compile(r"\r?\n")
@@ -181,8 +181,7 @@
                         channelname,
                         " ".join(sorted(x for x in self.state.get_peer_handles()))))
-            self.reply("366 %s %s :End of NAMES list"
-                       % (self.nickname, channelname))
+            self.reply("366 %s %s :End of NAMES list" % (self.nickname, channelname))

         def list_handler():
             if len(arguments) < 1:
@@ -289,7 +288,6 @@
                 return
             targetname = arguments[0]
             message = arguments[1]
-            client = server.get_client(targetname)

             if server.has_channel(targetname):
                 channel = server.get_channel(targetname)
@@ -312,8 +310,6 @@
                         "bounces": 0,
                         "command": DIRECT
                     }))
-            if(client):
-                client.message(formatted_message)

         def part_handler():
             if len(arguments) < 1:
@@ -560,9 +556,9 @@
         try:
             handler_table[command]()
         except KeyError:
-                self.reply("421 %s %s :Unknown command" % (self.nickname, command))
-                stack = traceback.format_exc()
-                logging.debug(stack)
+            self.reply("421 %s %s :Unknown command" % (self.nickname, command))
+            stack = traceback.format_exc()
+            logging.debug(stack)

     def socket_readable_notification(self):
         try:
diff -uNr a/blatta/lib/infosec.py b/blatta/lib/infosec.py
--- a/blatta/lib/infosec.py a7f781100f32373df224f187126fcc44dcfb4b4a8bb53e60e1acafdcff0a9cfd71d47f40e50c6c6f3f7c92a5eb4821b8eec22ecac01a84aa1e7f67204a9a220e
+++ b/blatta/lib/infosec.py f6b26aa5d1367c471975ff5e943315e7136ab40d21c7a329e030a2cfffc06b4e8c3c143844560db05b9233087617d571b0d460f0c4ff63e61afc58d79fd4a986
@@ -1,12 +1,12 @@
 import hashlib
-import lib.serpent
-from lib.serpent import Serpent
-from lib.serpent import serpent_cbc_encrypt
-from lib.serpent import serpent_cbc_decrypt
+import serpent
+from serpent import Serpent
+from serpent import serpent_cbc_encrypt
+from serpent import serpent_cbc_decrypt
 from commands import BROADCAST
 from commands import DIRECT
 from commands import IGNORE
-from lib.message import Message
+from message import Message
 import base64
 import binascii
 import time
@@ -33,12 +33,17 @@
 MALFORMED_PACKET = 2
 INVALID_SIGNATURE = 3
 IGNORED = 4
+MESSAGE_LOGGING_FORMAT = "[%s:%d %s] <- %s %s %s"

 class Infosec(object):
     def __init__(self, state=None):
         self.state = state

     def message(self, message):
+        if not message.speaker:
+            logging.error("aborting message send due to speaker not being set")
+            return
+
         # if we are not rebroadcasting we need to set the timestamp
         if message.timestamp == None:
             message.original = True
@@ -63,8 +68,9 @@
         else:
             message_bytes = message.message_bytes

+        message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
+
         if message.command != IGNORE:
-            message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
             logging.debug("generated message_hash: %s" % message_hash)
             self.state.add_to_dedup_queue(message_hash)
             self.state.log(message.speaker, message_bytes, target_peer)
@@ -72,10 +78,17 @@
         if message.command == DIRECT:
             signed_packet_bytes = self.pack(target_peer, message, message_bytes)
             target_peer.send(signed_packet_bytes)
+            logging.info(MESSAGE_LOGGING_FORMAT % (target_peer.address,
+                                                   target_peer.port,
+                                                   target_peer.handles[0],
+                                                   message.body,
+                                                   message.bounces,
+                                                   message_hash))
+
         elif message.command == BROADCAST or message.command == IGNORE:
             # sanity check
             if message.message_bytes and message_bytes != message_bytes:
-                logging.debug("aborting send: message modified by station!")
+                logging.error("aborting send: message modified by station!")
                 return

             for peer in self.state.get_keyed_peers():
@@ -86,8 +99,13 @@
                 signed_packet_bytes = self.pack(peer, message, message_bytes)
                 peer.send(signed_packet_bytes)

-        else:
-            pass
+                if message.command != IGNORE:
+                    logging.info(MESSAGE_LOGGING_FORMAT % (peer.address,
+                                                           peer.port,
+                                                           peer.handles[0],
+                                                           message.body,
+                                                           message.bounces,
+                                                           message_hash))

     def get_message_bytes(self, message, peer=None):
         timestamp = message.timestamp
diff -uNr a/blatta/lib/server.py b/blatta/lib/server.py
--- a/blatta/lib/server.py 543b50fa952dea77a3a28ac4166697877605d330dd65b74b50f2e27393f7aa0c3cc08585515661194d65f23f208818a32d2dff7c3c81385a9473803b4fee61e6
+++ b/blatta/lib/server.py 81527bdb79d3f7c5f42234e6355a530d86fcdc3974a0461782dcd4fe5fc245f8cdca423f4f033cf207224544525ec222a647f8bf56e70bf5a0f04fbb73120d89
@@ -1,10 +1,9 @@
-VERSION = "9986"
+VERSION = "9985"

 import os
 import select
 import socket
 import sys
-import sys
 import tempfile
 import time
 import string
@@ -12,11 +11,13 @@
 import sqlite3
 from datetime import datetime
 from funcs import *
-from lib.client import Client
-from lib.channel import Channel
-from lib.station import Station
-from lib.message import Message
-from lib.infosec import PACKET_SIZE
+from client import Client
+from channel import Channel
+from station import Station
+from station import EMBARGO_INTERVAL
+from station import RUBBISH_INTERVAL
+from message import Message
+from infosec import PACKET_SIZE
 import imp
 import pprint
 import logging
@@ -28,7 +29,6 @@
         self.channel_name = options.channel_name
         self.password = options.password
         self.motdfile = options.motd
-        self.verbose = options.verbose
         self.logdir = options.logdir
         self.chroot = options.chroot
         self.setuid = options.setuid
@@ -37,13 +37,14 @@
         self.pp = pprint.PrettyPrinter(indent=4)
         self.db_path = options.db_path
         self.address_table_path = options.address_table_path
+        self.irc_server_address = "127.0.0.1"
         if options.listen:
-            self.address = socket.gethostbyname(options.listen)
+            self.udp_address = socket.gethostbyname(options.listen)
         else:
-            self.address = ""
+            self.udp_address = ""

         server_name_limit = 63 # From the RFC.
-        self.name = socket.getfqdn(self.address)[:server_name_limit]
+        self.name = socket.getfqdn(self.udp_address)[:server_name_limit]

         self.channels = {} # irc_lower(Channel name) --> Channel instance.
         self.client = None
@@ -124,13 +125,11 @@
     def start(self):
         # Setup UDP first
         self.udp_server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
-        self.udp_server_socket.bind((self.address, self.udp_port))
+        self.udp_server_socket.bind((self.udp_address, self.udp_port))
         self.station = Station({
            "socket": self.udp_server_socket,
            "db_path": self.db_path,
            "address_table_path": self.address_table_path
         })
-        self.station.start_embargo_queue_checking()
-        self.station.start_rubbish()
         logging.info("Listening for Pest packets on udp port %d." % self.udp_port)
         serversockets = []
@@ -138,7 +137,7 @@
             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
             s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
             try:
-                s.bind((self.address, port))
+                s.bind((self.irc_server_address, port))
             except socket.error as e:
                 logging.error("Could not bind port %s: %s." % (port, e))
                 sys.exit(1)
@@ -155,36 +154,62 @@
             os.setuid(self.setuid[0])
             logging.info("Setting uid:gid to %s:%s" % (self.setuid[0], self.setuid[1]))

+
+        # event loop setup
         last_aliveness_check = time.time()
+        last_embargo_queue_check = time.time()
+        last_rubbish_dispatch = time.time()
         while True:
-            (inputready,outputready,exceptready) = select.select([self.udp_server_socket],[],[],0)
-            (iwtd, owtd, ewtd) = select.select(
-                serversockets + ([self.client.socket] if self.client else []),
-                [self.client.socket] if self.client and self.client.write_queue_size() > 0 else [],
-                [],
-                .2)
-            for x in inputready:
-                if x == self.udp_server_socket:
-                    bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE)
-                    self.station.handle_udp_data(bytes_address_pair)
+            # we don't want to be listening for client connections if there's already a client connected
+            if self.client == None:
+                input_sockets = serversockets
+            else:
+                input_sockets = [self.client.socket]
+            output_sockets = ([self.client.socket]
+                              if self.client and self.client.write_queue_size() > 0 else [])
+
+            # handle tcp socket events
+            (iwtd, owtd, ewtd) = select.select(input_sockets, output_sockets, [], .2)
             for x in iwtd:
                 if self.client != None:
                     self.client.socket_readable_notification()
                 else:
-                    (conn, addr) = x.accept()
-                    self.client = Client(self, conn)
-                    self.station.client = self.client
-                    logging.info("Accepted connection from %s:%s." % (
-                        addr[0], addr[1]))
+                    try:
+                        (conn, addr) = x.accept()
+                        self.client = Client(self, conn)
+                        self.station.client = self.client
+                        logging.info("Accepted connection from %s:%s." % (
+                            addr[0], addr[1]))
+                    except socket.error as e:
+                        logging.error("Failed to accept new client connection: %s" % e)
             for x in owtd:
                 if self.client and x == self.client.socket: # client may have been disconnected
                     self.client.socket_writable_notification()
+
+            # handle udp socket events
+            (inputready,outputready,exceptready) = select.select([self.udp_server_socket],[],[],0)
+            for x in inputready:
+                if x == self.udp_server_socket:
+                    bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE)
+                    self.station.handle_udp_data(bytes_address_pair)
+
+            # ping pong
             now = time.time()
             if last_aliveness_check + 10 < now:
-               if self.client:
-                   self.client.check_aliveness()
+                if self.client:
+                    self.client.check_aliveness()
                 last_aliveness_check = now
+            # clear embargo queue if enough time has elapsed
+            if last_embargo_queue_check + EMBARGO_INTERVAL < now:
+                self.station.check_embargo_queue()
+                last_embargo_queue_check = now
+
+            # spray rubbish
+            if last_rubbish_dispatch + RUBBISH_INTERVAL < now:
+                self.station.send_rubbish()
+                last_rubbish_dispatch = now
+

 def create_directory(path):
     if not os.path.isdir(path):
         os.makedirs(path)
diff -uNr a/blatta/lib/state.py b/blatta/lib/state.py
--- a/blatta/lib/state.py 8ceb7b9145164136be138992805eaf4b9565362e11a87af8d28e0f172b45400fe324cd0a24f4221dbd8e694972b03931c7594d0e9f7bbd064f2c86cd0ed25dd0
+++ b/blatta/lib/state.py 034e0af7be67265b08fa39744e1ae5637f0b6bf28021748b964cbc1dcb5147f95b6b94ee94160946c54198669f4dad4f1c98552ade5b16bb78f327c20958777f
@@ -1,9 +1,8 @@
-from lib.peer import Peer
+from peer import Peer
 import sqlite3
 import imp
 import hashlib
 import logging
-import threading
 from itertools import chain

 class State(object):
@@ -18,59 +17,61 @@
         if State.__instance != None:
             raise Exception("This class is a singleton")
         else:
-            self.write_lock = threading.Lock()
-            with self.write_lock:
-                self.socket = socket
-                self.conn = sqlite3.connect(db_path, check_same_thread=False)
-                self.cursor = self.conn.cursor()
-                self.cursor.execute("create table if not exists at(handle_id integer,\
-                                     address text not null,\
-                                     port integer not null,\
-                                     active_at datetime default null,\
-                                     updated_at datetime default current_timestamp,\
-                                     unique(handle_id, address, port))")
-
-                self.cursor.execute("create table if not exists wot(peer_id integer primary key)")
-
-                self.cursor.execute("create table if not exists handles(handle_id integer primary key,\
-                                     peer_id integer,\
-                                     handle text,\
-                                     unique(handle))")
-
-                self.cursor.execute("create table if not exists keys(peer_id intenger,\
-                                     key text,\
-                                     used_at datetime default current_timestamp,\
-                                     unique(key))")
-
-                self.cursor.execute("create table if not exists logs(\
-                                     handle text not null,\
-                                     peer_id integer,\
-                                     message_bytes blob not null,\
-                                     created_at datetime default current_timestamp)")
-
-                self.cursor.execute("create table if not exists dedup_queue(\
-                                     hash text not null,\
-                                     created_at datetime default current_timestamp)")
+            self.socket = socket
+            self.conn = sqlite3.connect(db_path, check_same_thread=False)
+            cursor = self.cursor()
+            cursor.execute("create table if not exists at(handle_id integer,\
+                            address text not null,\
+                            port integer not null,\
+                            active_at datetime default null,\
+                            updated_at datetime default current_timestamp,\
+                            unique(handle_id, address, port))")
+
+            cursor.execute("create table if not exists wot(peer_id integer primary key)")
+
+            cursor.execute("create table if not exists handles(handle_id integer primary key,\
+                            peer_id integer,\
+                            handle text,\
+                            unique(handle))")
+
+            cursor.execute("create table if not exists keys(peer_id intenger,\
+                            key text,\
+                            used_at datetime default current_timestamp,\
+                            unique(key))")
+
+            cursor.execute("create table if not exists logs(\
+                            handle text not null,\
+                            peer_id integer,\
+                            message_bytes blob not null,\
+                            created_at datetime default current_timestamp)")
+
+            cursor.execute("create table if not exists dedup_queue(\
+                            hash text not null,\
+                            created_at datetime default current_timestamp)")
             State.__instance = self

+    def cursor(self):
+        return self.conn.cursor()
+
     def get_at(self, handle=None):
+        cursor = self.cursor()
         at = []
         if handle == None:
-            results = self.cursor.execute("select handle_id,address,port,active_at from at\
+            results = cursor.execute("select handle_id,address,port,active_at from at\
                                            order by updated_at desc").fetchall()
         else:
-            result = self.cursor.execute("select handle_id from handles where handle=?",
+            result = cursor.execute("select handle_id from handles where handle=?",
                                      (handle,)).fetchone()
             if None != result:
                 handle_id = result[0]
             else:
                 return []
-            results = self.cursor.execute("select handle_id,address,port,active_at from at \
+            results = cursor.execute("select handle_id,address,port,active_at from at \
                                       where handle_id=? order by updated_at desc",
                                       (handle_id,)).fetchall()
         for result in results:
             handle_id, address, port, updated_at = result
-            h = self.cursor.execute("select handle from handles where handle_id=?",
+            h = cursor.execute("select handle from handles where handle_id=?",
                                (handle_id,)).fetchone()[0]
             at.append({"handle": h,
                        "address": "%s:%s" % (address, port),
@@ -79,34 +80,35 @@

     def is_duplicate_message(self, message_hash):
-        with self.write_lock:
-            self.cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
-            self.conn.commit()
-            result = self.cursor.execute("select hash from dedup_queue where hash=?",
-                                          (message_hash,)).fetchone()
-            logging.debug("checking if %s is dupe" % message_hash)
-            if(result != None):
-                return True
-            else:
-                return False
+        cursor = self.cursor()
+        cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
+        self.conn.commit()
+        result = cursor.execute("select hash from dedup_queue where hash=?",
+                                (message_hash,)).fetchone()
+        logging.debug("checking if %s is dupe" % message_hash)
+        if(result != None):
+            return True
+        else:
+            return False

     def add_to_dedup_queue(self, message_hash):
-        with self.write_lock:
-            self.cursor.execute("insert into dedup_queue(hash)\
-                                 values(?)",
-                                 (message_hash,))
-            logging.debug("added %s to dedup" % message_hash)
-            self.conn.commit()
+        cursor = self.cursor()
+        cursor.execute("insert into dedup_queue(hash)\
+                        values(?)",
+                        (message_hash,))
+        logging.debug("added %s to dedup" % message_hash)
+        self.conn.commit()

     def get_last_message_hash(self, handle, peer_id=None):
+        cursor = self.cursor()
         if peer_id:
-            message_bytes = self.cursor.execute("select message_bytes from logs\
+            message_bytes = cursor.execute("select message_bytes from logs\
                                            where handle=? and peer_id=?\
                                            order by created_at desc limit 1",
                                            (handle, peer_id)).fetchone()
         else:
-            message_bytes = self.cursor.execute("select message_bytes from logs\
+            message_bytes = cursor.execute("select message_bytes from logs\
                                            where handle=? and peer_id is null\
                                            order by created_at desc limit 1",
                                            (handle,)).fetchone()
@@ -117,121 +119,125 @@
             return "\x00" * 32

     def log(self, handle, message_bytes, peer=None):
-        with self.write_lock:
-            if peer != None:
-                peer_id = peer.peer_id
-            else:
-                peer_id = None
+        cursor = self.cursor()
+        if peer != None:
+            peer_id = peer.peer_id
+        else:
+            peer_id = None

-            self.cursor.execute("insert into logs(handle, peer_id, message_bytes)\
-                                 values(?, ?, ?)",
-                                 (handle, peer_id, buffer(message_bytes)))
+        cursor.execute("insert into logs(handle, peer_id, message_bytes)\
+                        values(?, ?, ?)",
+                        (handle, peer_id, buffer(message_bytes)))

     def import_at_and_wot(self, at_path):
-        with self.write_lock:
-            wot = imp.load_source('wot', at_path)
-            for peer in wot.peers:
-                results = self.cursor.execute("select * from handles where handle=? limit 1",
-                                               (peer["name"],)).fetchall()
-                if len(results) == 0:
-                    key = peer["key"]
-                    port = peer["port"]
-                    address = peer["address"]
-                    self.cursor.execute("insert into wot(peer_id) values(null)")
-                    peer_id = self.cursor.lastrowid
-                    self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
-                                        (peer_id, peer["name"]))
-                    handle_id = self.cursor.lastrowid
-                    self.cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
-                                        (handle_id, peer["address"], peer["port"], None))
-                    self.cursor.execute("insert into keys(peer_id, key) values(?, ?)",
-                                        (peer_id, key))
+        cursor = self.cursor()
+        wot = imp.load_source('wot', at_path)
+        for peer in wot.peers:
+            results = cursor.execute("select * from handles where handle=? limit 1",
+                                     (peer["name"],)).fetchall()
+            if len(results) == 0:
+                key = peer["key"]
+                port = peer["port"]
+                address = peer["address"]
+                cursor.execute("insert into wot(peer_id) values(null)")
+                peer_id = cursor.lastrowid
+                cursor.execute("insert into handles(peer_id, handle) values(?, ?)",
+                               (peer_id, peer["name"]))
+                handle_id = cursor.lastrowid
+                cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)",
+                               (handle_id, peer["address"], peer["port"], None))
+                cursor.execute("insert into keys(peer_id, key) values(?, ?)",
+                               (peer_id, key))

-            self.conn.commit()
+        self.conn.commit()

     def update_at(self, peer, set_active_at=True):
-        with self.write_lock:
-            row = self.cursor.execute("select handle_id from handles where handle=?",
-                                       (peer["handle"],)).fetchone()
-            if row != None:
-                handle_id = row[0]
-            else:
-                return
-
-            try:
-                self.cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
-                                    (handle_id, peer["address"], peer["port"]))
-            except sqlite3.IntegrityError as ex:
-                self.cursor.execute("update at set updated_at = current_timestamp\
-                                     where handle_id=? and address=? and port=?",
-                                     (handle_id, peer["address"], peer["port"]))
-            if set_active_at:
-                self.cursor.execute("update at set active_at = current_timestamp\
-                                     where handle_id=? and address=? and port=?",
-                                     (handle_id, peer["address"], peer["port"]))
-            self.conn.commit()
+        cursor = self.cursor()
+        row = cursor.execute("select handle_id from handles where handle=?",
+                             (peer["handle"],)).fetchone()
+        if row != None:
+            handle_id = row[0]
+        else:
+            return
+
+        try:
+            cursor.execute("insert into at(handle_id, address, port) values(?, ?, ?)",
+                           (handle_id, peer["address"], peer["port"]))
+        except sqlite3.IntegrityError as ex:
+            cursor.execute("update at set updated_at = current_timestamp\
+                            where handle_id=? and address=? and port=?",
and port=?", + (handle_id, peer["address"], peer["port"])) + if set_active_at: + cursor.execute("update at set active_at = current_timestamp\ + where handle_id=? and address=? and port=?", + (handle_id, peer["address"], peer["port"])) + self.conn.commit() def add_peer(self, handle): - with self.write_lock: - self.cursor.execute("insert into wot(peer_id) values(null)") - peer_id = self.cursor.lastrowid - self.cursor.execute("insert into handles(peer_id, handle) values(?, ?)", - (peer_id, handle)) - self.conn.commit() + cursor = self.cursor() + cursor.execute("insert into wot(peer_id) values(null)") + peer_id = self.cursor.lastrowid + cursor.execute("insert into handles(peer_id, handle) values(?, ?)", + (peer_id, handle)) + self.conn.commit() def remove_peer(self, handle): - with self.write_lock: - # get peer id - result = self.cursor.execute("select peer_id from handles where handle=?", - (handle,)).fetchone() - if result == None: - return - else: - peer_id = result[0] - # get all aliases + cursor = self.cursor() + + # get peer id + result = cursor.execute("select peer_id from handles where handle=?", + (handle,)).fetchone() + if result == None: + return + else: + peer_id = result[0] + # get all aliases - handle_ids = self.get_handle_ids_for_peer(peer_id) - for handle_id in handle_ids: - # delete at entries for each alias + handle_ids = self.get_handle_ids_for_peer(peer_id) + for handle_id in handle_ids: + # delete at entries for each alias - self.cursor.execute("delete from at where handle_id=?", (handle_id,)) + cursor.execute("delete from at where handle_id=?", (handle_id,)) - self.cursor.execute("delete from handles where peer_id=?", (peer_id,)) + cursor.execute("delete from handles where peer_id=?", (peer_id,)) - # delete all keys for peer id + # delete all keys for peer id - self.cursor.execute("delete from keys where peer_id=?", (handle_id,)) - - # delete peer from wot - - self.cursor.execute("delete from wot where peer_id=?", (peer_id,)) - self.conn.commit() + cursor.execute("delete from keys where peer_id=?", (handle_id,)) + + # delete peer from wot + + cursor.execute("delete from wot where peer_id=?", (peer_id,)) + self.conn.commit() def add_key(self, handle, key): - with self.write_lock: - peer_id = self.cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] - if peer_id != None: - self.cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) - self.conn.commit() + cursor = self.cursor() + peer_id = cursor.execute("select peer_id from handles where handle=?", (handle,)).fetchone()[0] + if peer_id != None: + cursor.execute("insert into keys(peer_id, key) values(?, ?)", (peer_id, key)) + self.conn.commit() def remove_key(self, key): - with self.write_lock: - self.cursor.execute("delete from keys where key=?", (key,)) - self.conn.commit() + cursor = self.cursor() + cursor.execute("delete from keys where key=?", (key,)) + self.conn.commit() def get_handle_ids_for_peer(self, peer_id): - return list(chain.from_iterable(self.cursor.execute("select handle_id from handles where peer_id=?", + cursor = self.cursor() + return list(chain.from_iterable(cursor.execute("select handle_id from handles where peer_id=?", (peer_id,)).fetchall())) def get_peer_handles(self): - handles = self.listify(self.cursor.execute("select handle from handles").fetchall()) + cursor = self.cursor() + handles = self.listify(cursor.execute("select handle from handles").fetchall()) return handles def get_peers(self): + cursor = self.cursor() peers = [] - handles = 
self.cursor.execute("select handle from handles").fetchall() + handles = cursor.execute("select handle from handles").fetchall() for handle in handles: peer = self.get_peer_by_handle(handle[0]) @@ -243,10 +249,11 @@ return list(chain.from_iterable(results)) def get_keyed_peers(self): - peer_ids = self.listify(self.cursor.execute("select peer_id from keys").fetchall()) + cursor = self.cursor() + peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall()) peers = [] for peer_id in peer_ids: - handle = self.cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] + handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0] peer = self.get_peer_by_handle(handle) if self.is_duplicate(peers, peer): continue @@ -257,18 +264,19 @@ def get_peer_by_handle(self, handle): - handle_info = self.cursor.execute("select handle_id, peer_id from handles where handle=?", + cursor = self.cursor() + handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?", (handle,)).fetchone() if handle_info == None: return None - address = self.cursor.execute("select address, port from at where handle_id=?\ + address = cursor.execute("select address, port from at where handle_id=?\ order by updated_at desc limit 1", (handle_info[0],)).fetchone() - handles = self.listify(self.cursor.execute("select handle from handles where peer_id=?", + handles = self.listify(cursor.execute("select handle from handles where peer_id=?", (handle_info[1],)).fetchall()) - keys = self.listify(self.cursor.execute("select key from keys where peer_id=?\ + keys = self.listify(cursor.execute("select key from keys where peer_id=?\ order by used_at desc", (handle_info[1],)).fetchall()) return Peer(self.socket, { diff -uNr a/blatta/lib/station.py b/blatta/lib/station.py --- a/blatta/lib/station.py af78fe5ef8ec6919ae03ee8da359debb290963f184560c69c7005d483819040e1d34c5e00e40092f1a962bfbc60922741018fee44a8cb5764fce8b594d29934d +++ b/blatta/lib/station.py 05f39e51685760a96a63d9d96565ff66c8d6a44a521787af7e992ec7f0fd50b589df8ee69ff81db69823d98cbe223fb157047157f43c43f4ccc504498530f027 @@ -1,23 +1,23 @@ import time -import threading import binascii import logging import os -from lib.state import State -from lib.infosec import MAX_BOUNCES -from lib.infosec import STALE_PACKET -from lib.infosec import DUPLICATE_PACKET -from lib.infosec import MALFORMED_PACKET -from lib.infosec import INVALID_SIGNATURE -from lib.infosec import IGNORED -from lib.infosec import Infosec +from state import State +from infosec import MAX_BOUNCES +from infosec import STALE_PACKET +from infosec import DUPLICATE_PACKET +from infosec import MALFORMED_PACKET +from infosec import INVALID_SIGNATURE +from infosec import IGNORED +from infosec import Infosec from commands import IGNORE -from lib.message import Message +from message import Message from commands import BROADCAST from commands import DIRECT -from lib.peer import Peer +from peer import Peer RUBBISH_INTERVAL = 10 +EMBARGO_INTERVAL = 1 class Station(object): def __init__(self, options): @@ -27,14 +27,6 @@ self.state.import_at_and_wot(options.get("address_table_path")) self.infosec = Infosec(self.state) self.embargo_queue = {} - self.embargo_queue_lock = threading.Lock() - - def start_embargo_queue_checking(self): - threading.Thread(target=self.check_embargo_queue).start() - - def start_rubbish(self): - pass - threading.Thread(target=self.send_rubbish).start() def handle_udp_data(self, bytes_address_pair): data = 
@@ -47,7 +39,12 @@
             message = self.infosec.unpack(peer, data)
             error_code = message.error_code
             if(error_code == None):
-                logging.debug("%s(%s) -> %s bounces: %d" % (message.speaker, peer.handles[0], message.body, message.bounces))
+                logging.info("[%s:%d %s] -> %s %d %s" % (peer.address,
+                                                         peer.port,
+                                                         peer.handles[0],
+                                                         message.body,
+                                                         message.bounces,
+                                                         message.message_hash))
                 self.conditionally_update_at(peer, message, address)

                 # if this is a direct message, just deliver it and return
@@ -87,22 +84,14 @@
     def embargo(self, message):
         # initialize the key/value to empty array if not in the hash
         # append message to array
-        with self.embargo_queue_lock:
-            if not message.message_hash in self.embargo_queue.keys():
-                self.embargo_queue[message.message_hash] = []
-            self.embargo_queue[message.message_hash].append(message)
+        if not message.message_hash in self.embargo_queue.keys():
+            self.embargo_queue[message.message_hash] = []
+        self.embargo_queue[message.message_hash].append(message)

     def check_embargo_queue(self):
         # get a lock so other threads can't mess with the db or the queue
-        with self.embargo_queue_lock:
-            self.check_for_immediate_messages()
-            self.flush_hearsay_messages()
-
-            # release the lock
-
-        # continue the thread loop after interval
-        time.sleep(1)
-        threading.Thread(target=self.check_embargo_queue).start()
+        self.check_for_immediate_messages()
+        self.flush_hearsay_messages()

     def check_for_immediate_messages(self):
         for key in dict(self.embargo_queue).keys():
@@ -169,18 +158,14 @@
                 message.bounces = message.bounces + 1
                 self.infosec.message(message)
             else:
-                logging.debug("[%s:%d] -> packet TTL expired: %s" % packet_info)
+                logging.debug("message TTL expired: %s" % message.message_hash)

     def send_rubbish(self):
-        logging.debug("sending rubbish...")
-        with self.embargo_queue_lock:
-            if self.client:
-                self.infosec.message(Message({
-                    "speaker": self.client.nickname,
-                    "command": IGNORE,
-                    "bounces": 0,
-                    "body": self.infosec.gen_rubbish_body()
-                }))
-        time.sleep(RUBBISH_INTERVAL)
-        threading.Thread(target=self.send_rubbish).start()
+        if self.client:
+            self.infosec.message(Message({
+                "speaker": self.client.nickname,
+                "command": IGNORE,
+                "bounces": 0,
+                "body": self.infosec.gen_rubbish_body()
+            }))
diff -uNr a/blatta/start_test_net.sh b/blatta/start_test_net.sh
--- a/blatta/start_test_net.sh 24a5c19318989da9f79790107499e2ebda16bc5389b739e4e3ae686c3ff024317517203b9c5c3324ae1a391a63f94939e22c8de730e758ecbc6afee4f54e108d
+++ b/blatta/start_test_net.sh 2d26b4e88714111204fdffb51aa603bc94f517781589411bd72be924abe9fc4ddf6f344ebf3dec070580c4211b0fd533b3c8a45617f8c07a685166a33ae3118d
@@ -1,6 +1,6 @@
 #!/bin/bash
 # start 3 servers on different ports
-./blatta --debug --channel-name \#aleth --irc-port 9968 --udp-port 7778 --db-path a.db --address-table-path test_net_configs/a.py > logs/a &
-./blatta --debug --channel-name \#aleth --irc-port 6669 --udp-port 7779 --db-path b.db --address-table-path test_net_configs/b.py > logs/b &
-./blatta --debug --channel-name \#aleth --irc-port 6670 --udp-port 7780 --db-path c.db --address-table-path test_net_configs/c.py > logs/c &
+./blatta --log-level info --channel-name \#aleth --irc-port 9968 --udp-port 7778 --db-path a.db --address-table-path test_net_configs/a.py > logs/a &
+./blatta --log-level info --channel-name \#aleth --irc-port 6669 --udp-port 7779 --db-path b.db --address-table-path test_net_configs/b.py > logs/b &
+./blatta --log-level debug --channel-name \#aleth --irc-port 6670 --udp-port 7780 --db-path c.db --address-table-path test_net_configs/c.py > logs/c &
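
The server.py hunks above replace the background threads started by start_embargo_queue_checking() and start_rubbish() with interval checks driven from the single select() loop (EMBARGO_INTERVAL and RUBBISH_INTERVAL from station.py), which is also what lets the write_lock in state.py and the embargo_queue_lock in station.py be dropped. A minimal sketch of that single-threaded timer pattern, separate from the blatta sources, follows; the bind address, the PACKET_SIZE value, and the handler bodies are illustrative placeholders, not blatta's actual values.

    # Sketch only: periodic work driven from the select() loop, no threads.
    import select
    import socket
    import time

    EMBARGO_INTERVAL = 1    # seconds, as defined in station.py
    RUBBISH_INTERVAL = 10   # seconds, as defined in station.py
    PACKET_SIZE = 4096      # placeholder; blatta imports the real value from infosec.py

    def handle_packet(data, addr):     # placeholder standing in for station.handle_udp_data()
        print("got %d bytes from %s" % (len(data), addr))

    def check_embargo_queue():         # placeholder standing in for station.check_embargo_queue()
        pass

    def send_rubbish():                # placeholder standing in for station.send_rubbish()
        pass

    udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_sock.bind(("127.0.0.1", 0))    # ephemeral port, just for the sketch

    last_embargo_check = time.time()
    last_rubbish = time.time()

    while True:
        # Block for at most 0.2s so the interval checks below stay responsive.
        readable, _, _ = select.select([udp_sock], [], [], 0.2)
        for sock in readable:
            handle_packet(*sock.recvfrom(PACKET_SIZE))

        now = time.time()
        if last_embargo_check + EMBARGO_INTERVAL < now:
            check_embargo_queue()
            last_embargo_check = now
        if last_rubbish + RUBBISH_INTERVAL < now:
            send_rubbish()
            last_rubbish = now

In this arrangement timer drift is bounded by the 0.2s select timeout, and because all queue and sqlite access happens on one thread, no locking is required.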