- F53458362F6D7C18A066F42DBA0D1F8370DEF5BE9DD1A89A972082F3D387E34881CC879688AD66D1F5FE0D5930132FFCC176D32FA34487C0F5226394C17E6169+ 24C203C6741B7EB17BF86C2A1046D5BE50F4FDECC7E9B4E5316E2F9101BEDEE9B84CE01A666EDF98F6545574571358E5B2EBC19B12C941F006BC1182DC2992F2blatta/lib/station.py(1 . 28)(1 . 47)
2516 import time
2517 VERSION = 9982
2518 
2519 import binascii
2520 import logging
2521 import os
2522 
2523 from lib.broadcast import Broadcast
2524 from lib.direct import Direct
2525 from state import State
2526 from infosec import STALE_PACKET 
2527 from infosec import DUPLICATE_PACKET 
2528 from infosec import MALFORMED_PACKET 
2529 from infosec import INVALID_SIGNATURE 
2530 from infosec import IGNORED 
2531 from infosec import Infosec
2532 from commands import IGNORE 
2533 from getdata import GetData
2534 from message import STALE_PACKET, OUT_OF_ORDER_NET, OUT_OF_ORDER_SELF, OUT_OF_ORDER_BOTH
2535 from message import DUPLICATE_PACKET 
2536 from message import MALFORMED_PACKET 
2537 from message import INVALID_SIGNATURE 
2538 from message import UNSUPPORTED_VERSION
2539 from message import Message
2540 from commands import BROADCAST
2541 from commands import DIRECT 
2542 from peer import Peer
2543 from ignore import Ignore
2544 from server import Server
2545 from long_buffer import LongBuffer
2546 from order_buffer import OrderBuffer
2547 from short_buffer import ShortBuffer
2548 from commands import BROADCAST, DIRECT, GETDATA, IGNORE
2549 
2550 
2551 class Station(object):
2552     def __init__(self, options):
2553     def __init__(self, cmd_line_options):
2554         self.client = None
2555         self.state = State.get_instance(options["socket"], options["db_path"])
2556         if options.get("address_table_path") != None:
2557             self.state.import_at_and_wot(options.get("address_table_path"))
2558         self.infosec = Infosec(self.state)
2559         self.embargo_queue = {} 
2560         self.socket = None
2561         self.state = State(self, cmd_line_options.db_path)
2562         if cmd_line_options.address_table_path is not None:
2563             self.state.import_at_and_wot(cmd_line_options.address_table_path)
2564         self.short_buffer = ShortBuffer(self.state)
2565         self.long_buffer = LongBuffer(self.state)
2566         self.order_buffer = OrderBuffer(self.state)
2567         self.server = Server(cmd_line_options, self)
2568         self.handlers = {
2569             DIRECT: self.handle_direct,
2570             BROADCAST: self.handle_broadcast,
2571             GETDATA: self.handle_getdata,
2572             IGNORE: self.handle_ignore
2573         }
2574 
2575     def start(self):
2576         self.server.start()
2577 
2578     def handle_udp_data(self, bytes_address_pair):
2579         data = bytes_address_pair[0]
(30 . 118)(49 . 173)
2581         packet_info = (address[0],
2582                        address[1], 
2583                        binascii.hexlify(data)[0:16])
2584         logging.debug("[%s:%d] -> %s" % packet_info)
2585         for peer in self.state.get_keyed_peers():
2586             message = self.infosec.unpack(peer, data)
2587             error_code = message.error_code
2588             if(error_code == None):
2589                 logging.info("[%s:%d %s] -> %s %d %s" % (peer.address,
2590                     peer.port,
2591                     peer.handles[0],
2592                     message.body,
2593                     message.bounces,
2594                     message.message_hash))
2595                 self.conditionally_update_at(peer, message, address)
2596 
2597                 # if this is a direct message, just deliver it and return
2598                 if message.command == DIRECT:
2599                     self.deliver(message)
2600                     return
2601 
2602                 # embargo to wait for immediate copy of message 
2603                 else:
2604                     self.embargo(message)
2605                     return
2606             elif error_code == STALE_PACKET:
2607                 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
2608                 return 
2609             elif error_code == DUPLICATE_PACKET:
2610                 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
2611                 return 
2612             elif error_code == MALFORMED_PACKET:
2613                 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
2614                 return 
2615             elif error_code == IGNORED:
2616                 self.conditionally_update_at(peer, message, address)
2617                 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
2618                 return 
2619             elif error_code == INVALID_SIGNATURE:
2620                 pass 
2621         logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
2622     
2623     def deliver(self, message):
2624         # add to duplicate queue
2625         self.state.add_to_dedup_queue(message.message_hash)
2626 
2627         # send to the irc client
2628         if self.client:
2629             self.client.message_from_station(message)
2630 
2631     def embargo(self, message):
2632         # initialize the key/value to empty array if not in the hash
2633         # append message to array
2634         if not message.message_hash in self.embargo_queue.keys():
2635             self.embargo_queue[message.message_hash] = []
2636         self.embargo_queue[message.message_hash].append(message)
2637 
2638     def check_embargo_queue(self):
2639         # get a lock so other threads can't mess with the db or the queue
2640         self.check_for_immediate_messages()
2641         self.flush_hearsay_messages()
2642 
2643     def check_for_immediate_messages(self):
2644         for key in dict(self.embargo_queue).keys():
2645             messages = self.embargo_queue[key]
2646 
2647             for message in messages:
2648 
2649                 # if this is an immediate copy of the message
2650 
2651                 if message.speaker in message.peer.handles:
2652 
2653                     # clear the queue and deliver
2654 
2655                     self.embargo_queue.pop(key, None)
2656                     self.deliver(message)
2657                     self.rebroadcast(message)           
2658                     break 
2659         metadata = {
2660             'address': address,
2661             'packet_info': packet_info,
2662         }
2663 
2664         for peer in self.state.get_keyed_peers():
2665             message = Message.unpack(peer,
2666                                      data,
2667                                      self.long_buffer,
2668                                      self.order_buffer,
2669                                      metadata,
2670                                      self.state)
2671 
2672             if message.get('error_code') is INVALID_SIGNATURE:
2673                 continue
2674 
2675             if message.get('error_code') in [None,
2676                                              UNSUPPORTED_VERSION,
2677                                              STALE_PACKET,
2678                                              OUT_OF_ORDER_BOTH,
2679                                              OUT_OF_ORDER_SELF,
2680                                              OUT_OF_ORDER_NET,
2681                                              DUPLICATE_PACKET,
2682                                              MALFORMED_PACKET]:
2683                 break
2684 
2685         if message.get('error_code') is None:
2686             if message['command'] == DIRECT:
2687                 self.handle_message(Direct(message, self.state))
2688             elif message['command'] == BROADCAST:
2689                 self.handle_message(Broadcast(message, self.state))
2690             elif message['command'] == GETDATA:
2691                 # This is a little weird.  We don't want to instantiate a GetData
2692                 # object here because that would switch around body and self_chain,
2693                 # so let's just instantiate a generic Message and make sure to handle
2694                 # it as a GetData message.
2695                 self.handle_message(Message(message, self.state))
2696             elif message['command'] == IGNORE:
2697                 self.handle_message(Ignore(message, self.state))
2698         else:
2699             self.report_error(message['error_code'], message)
2700 
2701     def flush_hearsay_messages(self):
2702         # if we made it this far either we haven't found any immediate messages
2703         # or we sent them all so we must deliver the remaining hearsay messages
2704         # with the appropriate labeling
2705         for key in dict(self.embargo_queue).keys():
2706 
2707             # collect the source handles
2708             handles = [] 
2709             messages = self.embargo_queue[key]
2710             for message in messages:
2711                 handles.append(message.peer.handles[0])
2712 
2713             # select the message with the lowest bounce count 
2714             message = sorted(messages, key=lambda m: m.bounces)[0]
2715 
2716             # clear the queue
2717             self.embargo_queue.pop(key, None)
2718 
2719     def handle_message(self, message):
2720         try:
2721             self.handlers[message.command](message)
2722         except KeyError:
2723             logging.error("Unknown command, ignoring")
2724 
2725     def handle_direct(self, message):
2726         message.log_incoming(message.peer)
2727         self.deliver(message)
2728         self.long_buffer.intern(message)
2729         self.conditionally_update_at(message, message.metadata["address"])
2730 
2731     def handle_broadcast(self, message):
2732         # it's possible we'll log dupes coming out of the order buffer here
2733         message.log_incoming(message.peer)
2734 
2735         # check if this is an immediate message
2736         if message.speaker in message.peer.handles:
2737             # remove message from short buffer if it was received as hearsay
2738             if self.short_buffer.has(message.message_hash):
2739                 self.short_buffer.drop(message.message_hash)
2740             self.deliver(message)
2741             self.long_buffer.intern(message)
2742             self.state.update_net_chain(message.message_hash)
2743             self.rebroadcast(message)
2744         else:
2745             # embargo to wait for immediate copy of message
2746             self.short_buffer.embargo(message)
2747         self.conditionally_update_at(message, message.metadata["address"])
2748 
2749     def handle_getdata(self, message):
2750         message.log_incoming_getdata(message.peer)
2751 
2752         # check for the requested message
2753         archived_message = self.long_buffer.exhume(message.body)
2754 
2755         # resend it if it exists
2756         if archived_message:
2757             archived_message.retry(message.peer)
2758 
2759     def handle_ignore(self, message):
2760         self.conditionally_update_at(message, message.metadata['address'])
2761         packet_info = message.metadata["packet_info"]
2762         address = packet_info[0]
2763         port = packet_info[1]
2764         packet_sample = packet_info[2]
2765         if os.environ.get('LOG_RUBBISH'):
2766             logging.debug("[%s:%d] -> ignoring packet: %s" % (address, port, packet_sample))
2767         return
2768 
2769     def check_order_buffer(self):
2770         messages = self.order_buffer.dequeue_and_order_mature_messages()
2771         for message in messages:
2772             self.handle_message(message)
2773 
2774     def check_short_buffer(self):
2775         messages = self.short_buffer.flush()
2776         for message_with_stats in messages:
2777             message = message_with_stats['message']
2778             # compute prefix
2779             if len(messages) < 4:
2780             if len(message_with_stats['closest_peers']) < 4:
2781                 handles = []
2782                 for peer in message_with_stats['closest_peers']:
2783                     handles.append(peer.handles[0])
2784                 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
2785             else:
2786                 message.prefix = "%s[%d]" % (message.speaker, len(messages))
2787                 message.prefix = "%s[%d]" % (message.speaker, len(message_with_stats['closest_peers']))
2788 
2789             # deliver
2790             self.deliver(message)
2791             self.long_buffer.intern(message)
2792             self.state.update_net_chain(message.message_hash)
2793             message.reporting_peers = message_with_stats['reporting_peers']
2794             self.rebroadcast(message)
2795 
2796     def report_error(self, error_code, message):
2797         packet_info = message['metadata']["packet_info"]
2798         address = packet_info[0]
2799         port = packet_info[1]
2800         packet_sample = packet_info[2]
2801         if error_code == STALE_PACKET:
2802             logging.debug("[%s:%d] -> stale packet: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2803         elif error_code == DUPLICATE_PACKET:
2804             logging.debug("[%s:%d] -> duplicate packet: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2805         elif error_code == MALFORMED_PACKET:
2806             logging.debug("[%s:%d] -> malformed packet: %s" % (address, port, packet_sample))
2807         elif error_code == INVALID_SIGNATURE:
2808             logging.debug("[%s:%d] -> invalid packet signature: %s" % (address, port, packet_sample))
2809         elif error_code == UNSUPPORTED_VERSION:
2810             logging.debug("[%s:%d] -> pest version not supported: %s" % (address, port, packet_sample))
2811         elif error_code == OUT_OF_ORDER_NET:
2812             self.add_message_to_order_buffer_and_send_getdata(message, ['net_chain'])
2813         elif error_code == OUT_OF_ORDER_SELF:
2814             self.add_message_to_order_buffer_and_send_getdata(message, ['self_chain'])
2815         elif error_code == OUT_OF_ORDER_BOTH:
2816             self.add_message_to_order_buffer_and_send_getdata(message, ['self_chain', 'net_chain'])
2817 
2818     def add_message_to_order_buffer_and_send_getdata(self, message, broken_chains):
2819         packet_info = message['metadata']["packet_info"]
2820         address = packet_info[0]
2821         port = packet_info[1]
2822         logging.debug(
2823             "[%s:%d] -> message received out of order: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2824         if not self.order_buffer.has(message['message_hash']):
2825             for chain in broken_chains:
2826                 GetData(message, chain, self.state).send()
2827         self.order_buffer.add(message)
2828 
2829             # send the message to all other peers if it should be propagated
2830             self.rebroadcast(message)           
2831     def deliver(self, message):
2832         # it's possible that these messages are from an order buffer
2833         # dump and their immediate copies may already have been broadcast
2834         # or vice versa so we need to check the long buffer
2835         if self.long_buffer.has(message.message_hash):
2836             return
2837 
2838         # set a timestamp warning if the message is older than the last displayed message.
2839         message.set_warning()
2840 
2841     # we only update the address table if the speaker is same as peer
2842         # send to the irc client
2843         if self.client:
2844             self.client.message_from_station(message)
2845 
2846     def conditionally_update_at(self, peer, message, address):
2847         if message.speaker in peer.handles:
2848     # we only update the address table if the speaker is same as peer
2849     def conditionally_update_at(self, message, address):
2850         if message.speaker in message.peer.handles:
2851             self.state.update_at({
2852                 "handle": message.speaker,
2853                 "address": address[0],
(149 . 19)(223 . 15)
2855             })
2856 
2857     def rebroadcast(self, message):
2858         if message.bounces < int(self.state.get_knob("max_bounces")):
2859             message.command = BROADCAST
2860             message.bounces = message.bounces + 1
2861             self.infosec.message(message) 
2862         else:
2863             logging.debug("message TTL expired: %s" % message.message_hash)
2864 
2865         if not message.get_data_response:
2866             if message.bounces < int(self.state.get_knob("max_bounces")):
2867                 message.bounces = message.bounces + 1
2868                 message.forward()
2869             else:
2870                 logging.debug("message TTL expired: %s" % message.message_hash)
2871 
2872     def send_rubbish(self):
2873         if self.client:
2874             self.infosec.message(Message({
2875             Ignore({
2876                 "speaker": self.client.nickname,
2877                 "command": IGNORE,
2878                 "bounces": 0,
2879                 "body": self.infosec.gen_rubbish_body()
2880             }))
2881             }, self.state).send()