raw
9978-bugfixes           1 import binascii
9978-bugfixes 2 import logging
9982-getdata 3 import time
9982-getdata 4 from broadcast import Broadcast
9982-getdata 5 from direct import Direct
9982-getdata 6 from commands import BROADCAST
9982-getdata 7 from commands import DIRECT
9982-getdata 8
9982-getdata 9
9982-getdata 10 class OrderBuffer(object):
9982-getdata 11 def __init__(self, state):
9982-getdata 12 self.buffer = {}
9982-getdata 13 self.state = state
9982-getdata 14
9982-getdata 15 def add(self, message):
9982-getdata 16 ts = time.time()
9982-getdata 17 if message['command'] == BROADCAST:
9982-getdata 18 m = Broadcast(message, self.state)
9978-bugfixes 19 assert(m.message_hash == message['message_hash'])
9982-getdata 20 elif message['command'] == DIRECT:
9982-getdata 21 m = Direct(message, self.state)
9982-getdata 22 else:
9982-getdata 23 return
9982-getdata 24
9982-getdata 25 if self.buffer.get(ts) is None:
9982-getdata 26 self.buffer[ts] = [m]
9982-getdata 27 else:
9982-getdata 28 self.buffer[ts].append(m)
9982-getdata 29
9982-getdata 30 def expects(self, message_hash):
9982-getdata 31 for value in self.buffer.values():
9982-getdata 32 for message in value:
9982-getdata 33 if message_hash == message.self_chain:
9982-getdata 34 return True
9982-getdata 35 elif message_hash == message.net_chain:
9982-getdata 36 return True
9982-getdata 37 return False
9982-getdata 38
9982-getdata 39 def dequeue_and_order_mature_messages(self):
9982-getdata 40 current_time = time.time()
9982-getdata 41 sorted_messages = sorted(self.buffer.keys())
9982-getdata 42 mature_messages = []
9982-getdata 43 for timestamp in sorted_messages:
9982-getdata 44 if timestamp < current_time - int(self.state.get_knob('order_buffer_expiration_seconds')):
9978-bugfixes 45 for message in self.buffer[timestamp]:
9978-bugfixes 46 mature_messages.append(message)
9978-bugfixes 47 del self.buffer[timestamp]
9982-getdata 48 return sorted(mature_messages, key=lambda m: m.timestamp)