- 9EFA52C31BB08E8F81821B1832F5729D9E3F52CD44CBD9C10ADCC809FD4383129ACE4D7C3615EB1E9B068E13E045DEE2898AAAC1F1EAACA81AD65B5ECE9FF751+ EB3F3EC3FA23E3F350B870585FDBE7CCB548B4B4A266699F943CD97148D4130495D98399F73FA7E8466C61655EA5BCFEE005D1EF07C8FAE66EE91640B9BF0444blatta/lib/message.py(1 . 7)(1 . 59)
1481 import hashlib
1482
1483 from serpent import Serpent
1484 from serpent import serpent_cbc_encrypt
1485 from serpent import serpent_cbc_decrypt
1486 from commands import BROADCAST
1487 from commands import DIRECT
1488 from commands import GETDATA
1489 from commands import IGNORE
1490 from commands import COMMAND_LABELS
1491 import base64
1492 import binascii
1493 import time
1494 import struct
1495 import hmac
1496 import random
1497 import os
1498 import logging
1499
1500 PEST_VERSION = 0xFC
1501 PACKET_SIZE = 496
1502 MAX_SPEAKER_SIZE = 32
1503 TS_ACCEPTABLE_SKEW = 60 * 15
1504 BLACK_PACKET_FORMAT = "<448s48s"
1505 RED_PACKET_FORMAT = "<16sBBxB428s"
1506 RED_PACKET_LENGTH_WITH_PADDING = 448
1507 MESSAGE_PACKET_FORMAT = "<q32s32s32s324s"
1508 GETDATA_MESSAGE_PACKET_FORMAT = "<q32s32s32s32s292s"
1509 MAX_MESSAGE_LENGTH = 428
1510 TEXT_PAYLOAD_SIZE = 324
1511
1512 # error codes
1513 STALE_PACKET = 0
1514 DUPLICATE_PACKET = 1
1515 MALFORMED_PACKET = 2
1516 INVALID_SIGNATURE = 3
1517 UNSUPPORTED_VERSION = 4
1518 OUT_OF_ORDER_SELF = 5
1519 OUT_OF_ORDER_NET = 6
1520 OUT_OF_ORDER_BOTH = 7
1521
1522 # logging formats
1523 OUTGOING_MESSAGE_LOGGING_FORMAT = "[%s:%d %s] <- %s %s %s %s"
1524 INCOMING_MESSAGE_LOGGING_FORMAT = "[%s:%d %s] -> %s %s %d %s"
1525
1526 # fork status values
1527 FORKED = 0
1528 NOT_FORKED = 1
1529 FIRST_MESSAGE = 2
1530 EMPTY_CHAIN = "\x00" * 32
1531
1532 class Message(object):
1533 def __init__(self, message):
1534 self.original = True
1535 def __init__(self, message, state=None):
1536 self.state = state
1537 self.prefix = None
1538 self.fork_status = None
1539 self.handle = message.get("handle")
1540 self.peer = message.get("peer")
1541 self.body = message.get("body")
(14 . 4)(66 . 325)
1543 self.self_chain_valid = message.get("self_chain_valid")
1544 self.error_code = message.get("error_code")
1545 self.message_hash = message.get("message_hash")
1546 self.message_bytes = None
1547 self.message_bytes = message.get("message_bytes")
1548 self.long_buffer = message.get("long_buffer")
1549 self.get_data_response = message.get("get_data_response")
1550 self.metadata = message.get('metadata')
1551 self.original = message.get('original')
1552 self.reporting_peers = message.get('reporting_peers', [])
1553 self.warning = message.get('warning')
1554 self.order_buffer = message.get('order_buffer')
1555 self.error_code = message.get('error_code')
1556
1557 @classmethod
1558 def pack(cls, peer, command, bounces, message_bytes):
1559 key_bytes = base64.b64decode(peer.get_key())
1560 signing_key = key_bytes[:32]
1561 cipher_key = key_bytes[32:]
1562
1563 # pack packet bytes
1564 nonce = cls._generate_nonce(16)
1565 version = PEST_VERSION
1566 red_packet_bytes = struct.pack(RED_PACKET_FORMAT,
1567 nonce,
1568 bounces,
1569 version,
1570 command,
1571 cls._pad(message_bytes, MAX_MESSAGE_LENGTH))
1572
1573 # encrypt packet
1574 serpent = Serpent(cipher_key)
1575 black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes)
1576
1577 # sign packet
1578 signature_bytes = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest()
1579
1580 # pack the signed black packet
1581 signed_packet_bytes = struct.pack(BLACK_PACKET_FORMAT, black_packet_bytes, signature_bytes)
1582
1583 return signed_packet_bytes
1584
1585 @classmethod
1586 def unpack(cls, peer, black_packet, long_buffer, order_buffer, metadata, state=None):
1587 # unpack the black packet
1588 for key in peer.keys:
1589 key_bytes = base64.b64decode(key)
1590 signing_key = key_bytes[:32]
1591 cipher_key = key_bytes[32:]
1592
1593 try:
1594 black_packet_bytes, signature_bytes = struct.unpack(BLACK_PACKET_FORMAT,
1595 black_packet)
1596 except:
1597 logging.error("Discarding malformed black packet from %s" % peer.get_key())
1598 return {
1599 "error_code": MALFORMED_PACKET,
1600 "metadata": metadata
1601 }
1602
1603 # check signature
1604 signature_check_bytes = hmac.new(signing_key,
1605 black_packet_bytes,
1606 hashlib.sha384).digest()
1607
1608 if(signature_check_bytes != signature_bytes):
1609 continue
1610
1611 # try to decrypt black packet
1612 Serpent(cipher_key)
1613 red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes)
1614
1615 # unpack red packet
1616 try:
1617 nonce, bounces, version, command, message_bytes = struct.unpack(
1618 RED_PACKET_FORMAT,
1619 red_packet_bytes
1620 )
1621
1622 # if red_packet_bytes was never set, no matching key for the sig was found
1623 # this is expected to happen often as only one peer's key should match for each
1624 # message we receive
1625 except NameError, ex:
1626 return {
1627 "error_code": INVALID_SIGNATURE,
1628 "metadata": metadata
1629 }
1630
1631 if version < PEST_VERSION:
1632 return { "error_code": UNSUPPORTED_VERSION }
1633
1634 # compute message_hash
1635 message_hash = cls.gen_hash(message_bytes)
1636 message = cls._build_message_dict(command,
1637 peer,
1638 bounces,
1639 message_hash,
1640 message_bytes,
1641 metadata)
1642
1643 return cls._evaluate_message(long_buffer, order_buffer, message)
1644
1645
1646 @classmethod
1647 def _evaluate_message(cls, long_buffer, order_buffer, message):
1648 # if we're expecting this message as a GETDATA response, skip the timestamp check
1649 if not order_buffer.expects(message['message_hash']):
1650 if not cls.in_time_window(message['timestamp']):
1651 message['error_code'] = STALE_PACKET
1652 return message
1653 else:
1654 # we need to mark this as a get data response in order not to rebroadcast it
1655 # TODO how to handle out of order unrequested but not stale messages subsequent to a message in the order buffer?
1656 message['get_data_response'] = True
1657
1658 if long_buffer.has(message['message_hash']):
1659 message['error_code'] = DUPLICATE_PACKET
1660 return message
1661
1662 if DIRECT is message['command']:
1663 return cls._evaluate_direct(long_buffer, order_buffer, message)
1664 elif BROADCAST is message['command']:
1665 return cls._evaluate_broadcast(long_buffer, order_buffer, message)
1666 else:
1667 return message
1668
1669 @classmethod
1670 def _build_message_dict(cls, command, peer, bounces, message_hash, message_bytes, metadata):
1671 # unpack message
1672 if GETDATA == command:
1673 int_ts, self_chain, net_chain, speaker, body = cls._unpack_getdata_message(message_bytes)
1674 else:
1675 int_ts, self_chain, net_chain, speaker, body = cls._unpack_message(message_bytes)
1676
1677 message = {
1678 "peer": peer,
1679 "body": body,
1680 "timestamp": int_ts,
1681 "command": command,
1682 "speaker": speaker,
1683 "bounces": bounces,
1684 "self_chain": self_chain,
1685 "net_chain": net_chain,
1686 "message_hash": message_hash,
1687 "message_bytes": message_bytes,
1688 "metadata": metadata
1689 }
1690 return message
1691
1692 @classmethod
1693 def _evaluate_broadcast(cls, long_buffer, order_buffer, message):
1694 if (not long_buffer.has(message['self_chain']) and
1695 not long_buffer.has(message['net_chain'])):
1696 if message['self_chain'] != message['net_chain']:
1697 message['error_code'] = OUT_OF_ORDER_BOTH
1698 return message
1699 else:
1700 message['error_code'] = OUT_OF_ORDER_SELF
1701 return message
1702 elif not long_buffer.has(message['net_chain']):
1703 message['error_code'] = OUT_OF_ORDER_NET
1704 return message
1705 elif not (long_buffer.has(message['self_chain'])):
1706 message['error_code'] = OUT_OF_ORDER_SELF
1707 return message
1708 return message
1709
1710 @classmethod
1711 def _evaluate_direct(cls, long_buffer, order_buffer, message):
1712 # if this is the first direct message from a station, we needn't check for antecedents
1713 if message['self_chain'] == EMPTY_CHAIN:
1714 return message
1715
1716 # no need to check net_chain for a direct message
1717 if not (long_buffer.has(message['self_chain'])):
1718 if not order_buffer.has(message['message_hash']):
1719 order_buffer.add(message)
1720 message['error_code'] = OUT_OF_ORDER_SELF
1721 return message
1722 return message
1723
1724 @classmethod
1725 def _unpack_message(cls, message_bytes):
1726 int_ts, self_chain, net_chain, speaker, body = struct.unpack(MESSAGE_PACKET_FORMAT, message_bytes)
1727
1728 # remove padding from speaker
1729 for index, byte in enumerate(speaker):
1730 if byte == '\x00':
1731 speaker = speaker[0:index]
1732 break
1733
1734 # remove padding from body
1735 for index, byte in enumerate(body):
1736 if byte == '\x00':
1737 body = body[0:index]
1738 break
1739
1740 return int_ts, self_chain, net_chain, speaker, body
1741
1742 @classmethod
1743 def _unpack_getdata_message(cls, message_bytes):
1744 int_ts, self_chain, net_chain, speaker, body, padding = struct.unpack(GETDATA_MESSAGE_PACKET_FORMAT, message_bytes)
1745 return int_ts, self_chain, net_chain, speaker, body
1746
1747 @classmethod
1748 def _unpack_last_valid_message(cls, last_message_info):
1749 if last_message_info.get('message_bytes'):
1750 return cls._unpack_message(
1751 last_message_info['message_bytes']
1752 )[4]
1753 else:
1754 return "<no prior message received>"
1755
1756 @classmethod
1757 def _pad(cls, text, size):
1758 return text.ljust(size, "\x00")
1759
1760 @classmethod
1761 def _ts_range(cls):
1762 current_ts = int(time.time())
1763 return range(current_ts - TS_ACCEPTABLE_SKEW, current_ts + TS_ACCEPTABLE_SKEW)
1764
1765 @classmethod
1766 def _generate_nonce(cls, length=8):
1767 """Generate pseudorandom number."""
1768 return ''.join([str(random.randint(0, 9)) for i in range(length)])
1769
1770 @classmethod
1771 def gen_rubbish_body(cls):
1772 return os.urandom(MAX_MESSAGE_LENGTH)
1773
1774 @classmethod
1775 def gen_hash(cls, message_bytes):
1776 return hashlib.sha256(message_bytes).digest()
1777
1778 def set_warning(self):
1779 if self.timestamp < self.state.get_latest_message_timestamp():
1780 self.warning = time.strftime("%Y-%m-%d %H:%M:%S: ", time.localtime(self.timestamp))
1781
1782 def get_message_bytes(self, peer=None):
1783 command = self.command
1784 speaker = Message._pad(self.speaker, MAX_SPEAKER_SIZE)
1785
1786 # let's generate the self_chain value from the last message or set it to zero if
1787 # this is the first message
1788 if command == DIRECT:
1789 self_chain = self.state.get_handle_self_chain(peer.handles[0])
1790 net_chain = EMPTY_CHAIN
1791 elif command == BROADCAST:
1792 self_chain = self.state.get_broadcast_self_chain()
1793 net_chain = self.state.get_net_chain()
1794 elif command == IGNORE:
1795 self_chain = net_chain = EMPTY_CHAIN
1796 elif command == GETDATA:
1797 self_chain = net_chain = EMPTY_CHAIN
1798
1799 self.self_chain = self_chain
1800 self.net_chain = net_chain
1801
1802 message_bytes = struct.pack(MESSAGE_PACKET_FORMAT,
1803 self.timestamp,
1804 self_chain,
1805 net_chain,
1806 speaker.encode('ascii'),
1807 self.body)
1808 return message_bytes
1809
1810 def compute_message_hash(self):
1811 if self.message_hash is None:
1812 if self.message_bytes is not None:
1813 self.message_hash = Message.gen_hash(self.message_bytes)
1814 return self.message_hash
1815 else:
1816 return None
1817 else:
1818 return self.message_hash
1819
1820 def log_outgoing(self, peer):
1821 logging.info(OUTGOING_MESSAGE_LOGGING_FORMAT % (peer.address,
1822 peer.port,
1823 peer.handles[0],
1824 COMMAND_LABELS[self.command],
1825 self.body,
1826 self.bounces,
1827 binascii.hexlify(self.compute_message_hash())))
1828
1829 def log_rubbish(self, peer):
1830 logging.info(OUTGOING_MESSAGE_LOGGING_FORMAT % (peer.address,
1831 peer.port,
1832 peer.handles[0],
1833 COMMAND_LABELS[self.command],
1834 "<rubbish>",
1835 self.bounces,
1836 binascii.hexlify(self.message_hash)))
1837
1838 def log_incoming(self, peer):
1839 try:
1840 logging.info(INCOMING_MESSAGE_LOGGING_FORMAT % (peer.address,
1841 peer.port,
1842 peer.handles[0],
1843 COMMAND_LABELS[self.command],
1844 self.body,
1845 self.bounces,
1846 binascii.hexlify(self.message_hash)))
1847 except Exception, ex:
1848 logging.info("unable to log incoming message")
1849
1850 def log_incoming_getdata(self, peer):
1851 try:
1852 logging.info(INCOMING_MESSAGE_LOGGING_FORMAT % (peer.address,
1853 peer.port,
1854 peer.handles[0],
1855 COMMAND_LABELS[self.command],
1856 binascii.hexlify(self.body),
1857 self.bounces,
1858 binascii.hexlify(self.message_hash)))
1859 except Exception, ex:
1860 logging.info("unable to log incoming message")
1861 def retry(self, requesting_peer):
1862 logging.debug("Can't retry a message that isn't DIRECT or BROADCAST")
1863
1864 return
1865
1866 @classmethod
1867 def in_time_window(cls, timestamp):
1868 return timestamp in cls._ts_range()