- F53458362F6D7C18A066F42DBA0D1F8370DEF5BE9DD1A89A972082F3D387E34881CC879688AD66D1F5FE0D5930132FFCC176D32FA34487C0F5226394C17E6169+ 24C203C6741B7EB17BF86C2A1046D5BE50F4FDECC7E9B4E5316E2F9101BEDEE9B84CE01A666EDF98F6545574571358E5B2EBC19B12C941F006BC1182DC2992F2blatta/lib/station.py(1 . 28)(1 . 47)
2516 import time
2517 VERSION = 9982
2518
2519 import binascii
2520 import logging
2521 import os
2522
2523 from lib.broadcast import Broadcast
2524 from lib.direct import Direct
2525 from state import State
2526 from infosec import STALE_PACKET
2527 from infosec import DUPLICATE_PACKET
2528 from infosec import MALFORMED_PACKET
2529 from infosec import INVALID_SIGNATURE
2530 from infosec import IGNORED
2531 from infosec import Infosec
2532 from commands import IGNORE
2533 from getdata import GetData
2534 from message import STALE_PACKET, OUT_OF_ORDER_NET, OUT_OF_ORDER_SELF, OUT_OF_ORDER_BOTH
2535 from message import DUPLICATE_PACKET
2536 from message import MALFORMED_PACKET
2537 from message import INVALID_SIGNATURE
2538 from message import UNSUPPORTED_VERSION
2539 from message import Message
2540 from commands import BROADCAST
2541 from commands import DIRECT
2542 from peer import Peer
2543 from ignore import Ignore
2544 from server import Server
2545 from long_buffer import LongBuffer
2546 from order_buffer import OrderBuffer
2547 from short_buffer import ShortBuffer
2548 from commands import BROADCAST, DIRECT, GETDATA, IGNORE
2549
2550
2551 class Station(object):
2552 def __init__(self, options):
2553 def __init__(self, cmd_line_options):
2554 self.client = None
2555 self.state = State.get_instance(options["socket"], options["db_path"])
2556 if options.get("address_table_path") != None:
2557 self.state.import_at_and_wot(options.get("address_table_path"))
2558 self.infosec = Infosec(self.state)
2559 self.embargo_queue = {}
2560 self.socket = None
2561 self.state = State(self, cmd_line_options.db_path)
2562 if cmd_line_options.address_table_path is not None:
2563 self.state.import_at_and_wot(cmd_line_options.address_table_path)
2564 self.short_buffer = ShortBuffer(self.state)
2565 self.long_buffer = LongBuffer(self.state)
2566 self.order_buffer = OrderBuffer(self.state)
2567 self.server = Server(cmd_line_options, self)
2568 self.handlers = {
2569 DIRECT: self.handle_direct,
2570 BROADCAST: self.handle_broadcast,
2571 GETDATA: self.handle_getdata,
2572 IGNORE: self.handle_ignore
2573 }
2574
2575 def start(self):
2576 self.server.start()
2577
2578 def handle_udp_data(self, bytes_address_pair):
2579 data = bytes_address_pair[0]
(30 . 118)(49 . 173)
2581 packet_info = (address[0],
2582 address[1],
2583 binascii.hexlify(data)[0:16])
2584 logging.debug("[%s:%d] -> %s" % packet_info)
2585 for peer in self.state.get_keyed_peers():
2586 message = self.infosec.unpack(peer, data)
2587 error_code = message.error_code
2588 if(error_code == None):
2589 logging.info("[%s:%d %s] -> %s %d %s" % (peer.address,
2590 peer.port,
2591 peer.handles[0],
2592 message.body,
2593 message.bounces,
2594 message.message_hash))
2595 self.conditionally_update_at(peer, message, address)
2596
2597 # if this is a direct message, just deliver it and return
2598 if message.command == DIRECT:
2599 self.deliver(message)
2600 return
2601
2602 # embargo to wait for immediate copy of message
2603 else:
2604 self.embargo(message)
2605 return
2606 elif error_code == STALE_PACKET:
2607 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
2608 return
2609 elif error_code == DUPLICATE_PACKET:
2610 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
2611 return
2612 elif error_code == MALFORMED_PACKET:
2613 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
2614 return
2615 elif error_code == IGNORED:
2616 self.conditionally_update_at(peer, message, address)
2617 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
2618 return
2619 elif error_code == INVALID_SIGNATURE:
2620 pass
2621 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
2622
2623 def deliver(self, message):
2624 # add to duplicate queue
2625 self.state.add_to_dedup_queue(message.message_hash)
2626
2627 # send to the irc client
2628 if self.client:
2629 self.client.message_from_station(message)
2630
2631 def embargo(self, message):
2632 # initialize the key/value to empty array if not in the hash
2633 # append message to array
2634 if not message.message_hash in self.embargo_queue.keys():
2635 self.embargo_queue[message.message_hash] = []
2636 self.embargo_queue[message.message_hash].append(message)
2637
2638 def check_embargo_queue(self):
2639 # get a lock so other threads can't mess with the db or the queue
2640 self.check_for_immediate_messages()
2641 self.flush_hearsay_messages()
2642
2643 def check_for_immediate_messages(self):
2644 for key in dict(self.embargo_queue).keys():
2645 messages = self.embargo_queue[key]
2646
2647 for message in messages:
2648
2649 # if this is an immediate copy of the message
2650
2651 if message.speaker in message.peer.handles:
2652
2653 # clear the queue and deliver
2654
2655 self.embargo_queue.pop(key, None)
2656 self.deliver(message)
2657 self.rebroadcast(message)
2658 break
2659 metadata = {
2660 'address': address,
2661 'packet_info': packet_info,
2662 }
2663
2664 for peer in self.state.get_keyed_peers():
2665 message = Message.unpack(peer,
2666 data,
2667 self.long_buffer,
2668 self.order_buffer,
2669 metadata,
2670 self.state)
2671
2672 if message.get('error_code') is INVALID_SIGNATURE:
2673 continue
2674
2675 if message.get('error_code') in [None,
2676 UNSUPPORTED_VERSION,
2677 STALE_PACKET,
2678 OUT_OF_ORDER_BOTH,
2679 OUT_OF_ORDER_SELF,
2680 OUT_OF_ORDER_NET,
2681 DUPLICATE_PACKET,
2682 MALFORMED_PACKET]:
2683 break
2684
2685 if message.get('error_code') is None:
2686 if message['command'] == DIRECT:
2687 self.handle_message(Direct(message, self.state))
2688 elif message['command'] == BROADCAST:
2689 self.handle_message(Broadcast(message, self.state))
2690 elif message['command'] == GETDATA:
2691 # This is a little weird. We don't want to instantiate a GetData
2692 # object here because that would switch around body and self_chain,
2693 # so let's just instantiate a generic Message and make sure to handle
2694 # it as a GetData message.
2695 self.handle_message(Message(message, self.state))
2696 elif message['command'] == IGNORE:
2697 self.handle_message(Ignore(message, self.state))
2698 else:
2699 self.report_error(message['error_code'], message)
2700
2701 def flush_hearsay_messages(self):
2702 # if we made it this far either we haven't found any immediate messages
2703 # or we sent them all so we must deliver the remaining hearsay messages
2704 # with the appropriate labeling
2705 for key in dict(self.embargo_queue).keys():
2706
2707 # collect the source handles
2708 handles = []
2709 messages = self.embargo_queue[key]
2710 for message in messages:
2711 handles.append(message.peer.handles[0])
2712
2713 # select the message with the lowest bounce count
2714 message = sorted(messages, key=lambda m: m.bounces)[0]
2715
2716 # clear the queue
2717 self.embargo_queue.pop(key, None)
2718
2719 def handle_message(self, message):
2720 try:
2721 self.handlers[message.command](message)
2722 except KeyError:
2723 logging.error("Unknown command, ignoring")
2724
2725 def handle_direct(self, message):
2726 message.log_incoming(message.peer)
2727 self.deliver(message)
2728 self.long_buffer.intern(message)
2729 self.conditionally_update_at(message, message.metadata["address"])
2730
2731 def handle_broadcast(self, message):
2732 # it's possible we'll log dupes coming out of the order buffer here
2733 message.log_incoming(message.peer)
2734
2735 # check if this is an immediate message
2736 if message.speaker in message.peer.handles:
2737 # remove message from short buffer if it was received as hearsay
2738 if self.short_buffer.has(message.message_hash):
2739 self.short_buffer.drop(message.message_hash)
2740 self.deliver(message)
2741 self.long_buffer.intern(message)
2742 self.state.update_net_chain(message.message_hash)
2743 self.rebroadcast(message)
2744 else:
2745 # embargo to wait for immediate copy of message
2746 self.short_buffer.embargo(message)
2747 self.conditionally_update_at(message, message.metadata["address"])
2748
2749 def handle_getdata(self, message):
2750 message.log_incoming_getdata(message.peer)
2751
2752 # check for the requested message
2753 archived_message = self.long_buffer.exhume(message.body)
2754
2755 # resend it if it exists
2756 if archived_message:
2757 archived_message.retry(message.peer)
2758
2759 def handle_ignore(self, message):
2760 self.conditionally_update_at(message, message.metadata['address'])
2761 packet_info = message.metadata["packet_info"]
2762 address = packet_info[0]
2763 port = packet_info[1]
2764 packet_sample = packet_info[2]
2765 if os.environ.get('LOG_RUBBISH'):
2766 logging.debug("[%s:%d] -> ignoring packet: %s" % (address, port, packet_sample))
2767 return
2768
2769 def check_order_buffer(self):
2770 messages = self.order_buffer.dequeue_and_order_mature_messages()
2771 for message in messages:
2772 self.handle_message(message)
2773
2774 def check_short_buffer(self):
2775 messages = self.short_buffer.flush()
2776 for message_with_stats in messages:
2777 message = message_with_stats['message']
2778 # compute prefix
2779 if len(messages) < 4:
2780 if len(message_with_stats['closest_peers']) < 4:
2781 handles = []
2782 for peer in message_with_stats['closest_peers']:
2783 handles.append(peer.handles[0])
2784 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
2785 else:
2786 message.prefix = "%s[%d]" % (message.speaker, len(messages))
2787 message.prefix = "%s[%d]" % (message.speaker, len(message_with_stats['closest_peers']))
2788
2789 # deliver
2790 self.deliver(message)
2791 self.long_buffer.intern(message)
2792 self.state.update_net_chain(message.message_hash)
2793 message.reporting_peers = message_with_stats['reporting_peers']
2794 self.rebroadcast(message)
2795
2796 def report_error(self, error_code, message):
2797 packet_info = message['metadata']["packet_info"]
2798 address = packet_info[0]
2799 port = packet_info[1]
2800 packet_sample = packet_info[2]
2801 if error_code == STALE_PACKET:
2802 logging.debug("[%s:%d] -> stale packet: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2803 elif error_code == DUPLICATE_PACKET:
2804 logging.debug("[%s:%d] -> duplicate packet: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2805 elif error_code == MALFORMED_PACKET:
2806 logging.debug("[%s:%d] -> malformed packet: %s" % (address, port, packet_sample))
2807 elif error_code == INVALID_SIGNATURE:
2808 logging.debug("[%s:%d] -> invalid packet signature: %s" % (address, port, packet_sample))
2809 elif error_code == UNSUPPORTED_VERSION:
2810 logging.debug("[%s:%d] -> pest version not supported: %s" % (address, port, packet_sample))
2811 elif error_code == OUT_OF_ORDER_NET:
2812 self.add_message_to_order_buffer_and_send_getdata(message, ['net_chain'])
2813 elif error_code == OUT_OF_ORDER_SELF:
2814 self.add_message_to_order_buffer_and_send_getdata(message, ['self_chain'])
2815 elif error_code == OUT_OF_ORDER_BOTH:
2816 self.add_message_to_order_buffer_and_send_getdata(message, ['self_chain', 'net_chain'])
2817
2818 def add_message_to_order_buffer_and_send_getdata(self, message, broken_chains):
2819 packet_info = message['metadata']["packet_info"]
2820 address = packet_info[0]
2821 port = packet_info[1]
2822 logging.debug(
2823 "[%s:%d] -> message received out of order: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2824 if not self.order_buffer.has(message['message_hash']):
2825 for chain in broken_chains:
2826 GetData(message, chain, self.state).send()
2827 self.order_buffer.add(message)
2828
2829 # send the message to all other peers if it should be propagated
2830 self.rebroadcast(message)
2831 def deliver(self, message):
2832 # it's possible that these messages are from an order buffer
2833 # dump and their immediate copies may already have been broadcast
2834 # or vice versa so we need to check the long buffer
2835 if self.long_buffer.has(message.message_hash):
2836 return
2837
2838 # set a timestamp warning if the message is older than the last displayed message.
2839 message.set_warning()
2840
2841 # we only update the address table if the speaker is same as peer
2842 # send to the irc client
2843 if self.client:
2844 self.client.message_from_station(message)
2845
2846 def conditionally_update_at(self, peer, message, address):
2847 if message.speaker in peer.handles:
2848 # we only update the address table if the speaker is same as peer
2849 def conditionally_update_at(self, message, address):
2850 if message.speaker in message.peer.handles:
2851 self.state.update_at({
2852 "handle": message.speaker,
2853 "address": address[0],
(149 . 19)(223 . 15)
2855 })
2856
2857 def rebroadcast(self, message):
2858 if message.bounces < int(self.state.get_knob("max_bounces")):
2859 message.command = BROADCAST
2860 message.bounces = message.bounces + 1
2861 self.infosec.message(message)
2862 else:
2863 logging.debug("message TTL expired: %s" % message.message_hash)
2864
2865 if not message.get_data_response:
2866 if message.bounces < int(self.state.get_knob("max_bounces")):
2867 message.bounces = message.bounces + 1
2868 message.forward()
2869 else:
2870 logging.debug("message TTL expired: %s" % message.message_hash)
2871
2872 def send_rubbish(self):
2873 if self.client:
2874 self.infosec.message(Message({
2875 Ignore({
2876 "speaker": self.client.nickname,
2877 "command": IGNORE,
2878 "bounces": 0,
2879 "body": self.infosec.gen_rubbish_body()
2880 }))
2881 }, self.state).send()