- 16E7971B6EAB7483A4060D5CAE5111DEC2F61618A2022620343EF7AA3FCEDEE87CC6499C9F9978215C315FDE958E70FA7810F50967E97DD299CD98842118C12D
+ 7F7198C51EB6B00321C1754F1675D907263BF600B8EF67B79641E4A763357FE73935B9F1534234D226F9564E2341AEAAFB50BFCFF5128BFE14404651B8A36EF2
blatta/lib/server.py
(1 . 4)(1 . 4)
745 VERSION = "9988"
746 VERSION = "9987"
747
748 import os
749 import select
(8 . 29)(8 . 18)
751 import tempfile
752 import time
753 import string
754 import binascii
755 import hashlib
756 import datetime
757 import sqlite3
758 from datetime import datetime
759 from funcs import *
760 from lib.client import Client
761 from lib.state import State
762 from lib.channel import Channel
763 from lib.infosec import PACKET_SIZE
764 from lib.infosec import MAX_BOUNCES
765 from lib.infosec import STALE_PACKET
766 from lib.infosec import DUPLICATE_PACKET
767 from lib.infosec import MALFORMED_PACKET
768 from lib.infosec import INVALID_SIGNATURE
769 from lib.infosec import IGNORED
770 from lib.infosec import Infosec
771 from lib.peer import Peer
772 from lib.station import Station
773 from lib.message import Message
774 from funcs import *
775 from commands import BROADCAST
776 from commands import DIRECT
777 from commands import IGNORE
778 from lib.infosec import PACKET_SIZE
779 import imp
780 import pprint
781 import logging
782
783 class Server(object):
784 def __init__(self, options):
(40 . 18)(29 . 14)
786 self.password = options.password
787 self.motdfile = options.motd
788 self.verbose = options.verbose
789 self.debug = options.debug
790 self.logdir = options.logdir
791 self.chroot = options.chroot
792 self.setuid = options.setuid
793 self.statedir = options.statedir
794 self.infosec = Infosec(self)
795 self.config_file_path = options.config_file_path
796 self.state = State(self, options.db_path)
797 self.pp = pprint.PrettyPrinter(indent=4)
798
799 if options.address_table_path != None:
800 self.state.import_at_and_wot(options.address_table_path)
801 self.db_path = options.db_path
802 self.address_table_path = options.address_table_path
803
804 if options.listen:
805 self.address = socket.gethostbyname(options.listen)
(61 . 8)(46 . 9)
807 self.name = socket.getfqdn(self.address)[:server_name_limit]
808
809 self.channels = {} # irc_lower(Channel name) --> Channel instance.
810 self.clients = {} # Socket --> Client instance..peers = ""
811 self.client = None
812 self.nicknames = {} # irc_lower(Nickname) --> Client instance.
813
814 if self.logdir:
815 create_directory(self.logdir)
816 if self.statedir:
(79 . 7)(65 . 7)
818 try:
819 pid = os.fork()
820 if pid > 0:
821 self.print_info("PID: %d" % pid)
822 logging.info("PID: %d" % pid)
823 sys.exit(0)
824 except OSError:
825 sys.exit(1)
(113 . 19)(99 . 6)
827 else:
828 return []
829
830 def print_info(self, msg):
831 if self.verbose:
832 print(msg)
833 sys.stdout.flush()
834
835 def print_debug(self, msg):
836 if self.debug:
837 print("%s %s" % (datetime.now(), msg))
838 sys.stdout.flush()
839
840 def print_error(self, msg):
841 sys.stderr.write("%s\n" % msg)
842
843 def client_changed_nickname(self, client, oldnickname):
844 if oldnickname:
845 del self.nicknames[irc_lower(oldnickname)]
(139 . 118)(112 . 26)
847 def remove_client(self, client, quitmsg):
848 client.message_related(":%s QUIT :%s" % (client.prefix, quitmsg))
849 for x in client.channels.values():
850 client.channel_log(x, "quit (%s)" % quitmsg, meta=True)
851 x.remove_client(client)
852 if client.nickname \
853 and irc_lower(client.nickname) in self.nicknames:
854 del self.nicknames[irc_lower(client.nickname)]
855 del self.clients[client.socket]
856 self.client = None
857
858 def remove_channel(self, channel):
859 del self.channels[irc_lower(channel.name)]
860
861 def handle_udp_data(self, bytes_address_pair):
862 data = bytes_address_pair[0]
863 address = bytes_address_pair[1]
864 packet_info = (address[0],
865 address[1],
866 binascii.hexlify(data)[0:16])
867 self.print_debug("[%s:%d] -> %s" % packet_info)
868 for peer in self.state.get_peers():
869 if peer.get_key() != None:
870 message = self.infosec.unpack(peer, data)
871 error_code = message.error_code
872 if(error_code == None):
873 self.print_debug("[%s] -> %s" % (peer.handles[0], message.body))
874
875 self.conditionally_update_address_table(peer, message, address)
876 # send the message to all clients
877 for c in self.clients:
878 if (self.clients[c].is_addressed_to_me(message.body)):
879 self.clients[c].message(message.body)
880 # send the message to all other peers if it should be propagated
881 if(message.command == BROADCAST) and message.bounces < MAX_BOUNCES:
882 self.rebroadcast(peer, message)
883 return
884 elif error_code == STALE_PACKET:
885 self.print_debug("[%s:%d] -> stale packet: %s" % packet_info)
886 return
887 elif error_code == DUPLICATE_PACKET:
888 self.print_debug("[%s:%d] -> duplicate packet: %s" % packet_info)
889 return
890 elif error_code == MALFORMED_PACKET:
891 self.print_debug("[%s:%d] -> malformed packet: %s" % packet_info)
892 return
893 elif error_code == IGNORED:
894 self.conditionally_update_address_table(peer, message, address)
895 self.print_debug("[%s:%d] -> ignoring packet: %s" % packet_info)
896 return
897 elif error_code == INVALID_SIGNATURE:
898 pass
899 self.print_debug("[%s:%d] -> martian packet: %s" % packet_info)
900
901 # we only update the address table if the speaker is same as peer
902
903 def conditionally_update_address_table(self, peer, message, address):
904 try:
905 idx = peer.handles.index(message.speaker)
906 except:
907 idx = None
908
909 if idx != None:
910 self.state.update_address_table({"handle": message.speaker,
911 "address": address[0],
912 "port": address[1]
913 })
914 def peer_message(self, message):
915 message.original = True
916 if message.command == DIRECT:
917 peer = self.state.get_peer_by_handle(message.handle)
918 message_bytes = self.infosec.get_message_bytes(message, peer)
919 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
920 self.state.add_to_dedup_queue(message_hash)
921
922 self.state.log(message.speaker, message_bytes, peer.peer_id)
923 if peer and (peer.get_key() != None):
924 peer.send(message)
925 else:
926 self.print_debug("Discarding message to unknown handle or handle with no key: %s" % message.handle)
927 else:
928 message.timestamp = int(time.time())
929 message_bytes = self.infosec.get_message_bytes(message)
930 if message.command != IGNORE:
931 self.state.log(message.speaker, message_bytes)
932 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
933 self.state.add_to_dedup_queue(message_hash)
934 for peer in self.state.get_peers():
935 if peer.get_key() != None:
936 peer.send(message)
937 else:
938 self.print_debug("Discarding message to handle with no key: %s" % message.handle)
939
940 def rebroadcast(self, source_peer, message):
941 message.original = False
942 for peer in self.state.get_peers():
943 if(peer.peer_id != source_peer.peer_id):
944 message.command = BROADCAST
945 message.bounces = message.bounces + 1
946 peer.send(message)
947
948
949 def sendrubbish(self):
950 for socket in self.clients:
951 self.peer_message(Message({
952 "speaker": self.clients[socket].nickname,
953 "command": IGNORE,
954 "bounces": 0,
955 "body": self.infosec.gen_rubbish_body()
956 }, self))
957
958 def start(self):
959 # Setup UDP first
960 self.udp_server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
961 self.udp_server_socket.bind((self.address, self.udp_port))
962 self.print_info("Listening for Pest packets on udp port %d." % self.udp_port)
963 self.station = Station({ "socket": self.udp_server_socket,
964 "db_path": self.db_path,
965 "address_table_path": self.address_table_path
966 })
967 self.station.start_embargo_queue_checking()
968 self.station.start_rubbish()
969 logging.info("Listening for Pest packets on udp port %d." % self.udp_port)
970
971 serversockets = []
972 for port in self.irc_ports:
(259 . 51)(140 . 55)
974 try:
975 s.bind((self.address, port))
976 except socket.error as e:
977 self.print_error("Could not bind port %s: %s." % (port, e))
978 logging.error("Could not bind port %s: %s." % (port, e))
979 sys.exit(1)
980 s.listen(5)
981 serversockets.append(s)
982 del s
983 self.print_info("Listening for IRC connections on port %d." % port)
984 logging.info("Listening for IRC connections on port %d." % port)
985 if self.chroot:
986 os.chdir(self.chroot)
987 os.chroot(self.chroot)
988 self.print_info("Changed root directory to %s" % self.chroot)
989 logging.info("Changed root directory to %s" % self.chroot)
990 if self.setuid:
991 os.setgid(self.setuid[1])
992 os.setuid(self.setuid[0])
993 self.print_info("Setting uid:gid to %s:%s"
994 logging.info("Setting uid:gid to %s:%s"
995 % (self.setuid[0], self.setuid[1]))
996 last_aliveness_check = time.time()
997 while True:
998 (inputready,outputready,exceptready) = select.select([self.udp_server_socket],[],[],0)
999 (iwtd, owtd, ewtd) = select.select(
1000 serversockets + [x.socket for x in self.clients.values()],
1001 [x.socket for x in self.clients.values()
1002 if x.write_queue_size() > 0],
1003 serversockets + ([self.client.socket] if self.client else []),
1004 [self.client.socket] if self.client and self.client.write_queue_size() > 0 else [],
1005 [],
1006 .2)
1007 for x in inputready:
1008 if x == self.udp_server_socket:
1009 bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE)
1010 self.handle_udp_data(bytes_address_pair)
1011 if x == self.udp_server_socket:
1012 bytes_address_pair = self.udp_server_socket.recvfrom(PACKET_SIZE)
1013 self.station.embargo_queue_lock.acquire()
1014 try:
1015 self.station.handle_udp_data(bytes_address_pair)
1016 except sqlite3.ProgrammingError as ex:
1017 logging.error("sqlite3 concurrency problem")
1018 self.station.embargo_queue_lock.release()
1019 for x in iwtd:
1020 if x in self.clients:
1021 self.clients[x].socket_readable_notification()
1022 if self.client != None:
1023 self.client.socket_readable_notification()
1024 else:
1025 (conn, addr) = x.accept()
1026 self.clients[conn] = Client(self, conn)
1027 self.print_info("Accepted connection from %s:%s." % (
1028 self.client = Client(self, conn)
1029 self.station.client = self.client
1030 logging.info("Accepted connection from %s:%s." % (
1031 addr[0], addr[1]))
1032 for x in owtd:
1033 if x in self.clients: # client may have been disconnected
1034 self.clients[x].socket_writable_notification()
1035 if self.client and x == self.client.socket: # client may have been disconnected
1036 self.client.socket_writable_notification()
1037 now = time.time()
1038 if last_aliveness_check + 10 < now:
1039 for client in self.clients.values():
1040 client.check_aliveness()
1041 last_aliveness_check = now
1042 self.sendrubbish() # Kludge to keep ephemeral port open when NATed
1043 if self.client:
1044 self.client.check_aliveness()
1045 last_aliveness_check = now
1046
1047 def create_directory(path):
1048 if not os.path.isdir(path):