tree checksum vpatch file split hunks

all signers:

antecedents: 9984-unbork-at-command 9986-rebroadcast-simple-hearsay-and-more 9983-knobs 9991-improved-logging genesis 9985-single-thread 9987-embargoing

press order:

genesis
9992-handle-edge-cases-add-feedback
9991-improved-logging
9990-keep-ephemeral-ports-open
9989-show-wot-nicks
9988-hash-dedup
9987-embargoing
9986-rebroadcast-simple-hearsay-and-more
9985-single-thread
9984-unbork-at-command
9983-knobs
9982-getdata

patch:

- 7EE0913AE7ADDD7E419CCDE7A0F4E7C2348029AD00E1D6994DA98430875358B91032DED251EE34C7917C81308BE84E9042DA9D54780B440AEC23065A53C498FF
+ 3DF1A18DC139FA71D9CE6B666778B01F9CFBB91C1A21A59CBD398914599A2F3D94EE6D453875B27F574261D5DA4921928BA9ADB0071996B764EB03ED5E838701
blatta/README.txt
(7 . 8)(7 . 8)
5
6 Notably missing:
7
8 - Pest-specific warning/informational output for incoming/outgoing messages
9 - GetData message support
10 - Address Cast
11 - Prod
12 - Key Offer message support
13 - Key Slice message support
14
(21 . 6)(21 . 7)
16 GENKEY
17 KEY
18 UNKEY
19 KNOB
20
21 GETTING STARTED
22
- 0DCE4472982646FFA031D3C88C98DCDC52C94627A4E046DA8D403A501A118E74D9893604B3DAAB04AF8677711A32CCAC2D12C3A765A64BD8E36A250E0F8986BB
+ C50E1B42DBE007ED9D7DF82747BB7CE0C122966AF972A0973C9F03C96F85E7EDC24642CB770935CF61EB037924C7B06A2A295E500081A4BCBF0FF1B9E2318605
blatta/blatta
(2 . 17)(2 . 9)
27
28 import os
29 import re
30 import select
31 import socket
32 import string
33 import sys
34 import tempfile
35 import time
36 import logging
37 from lib.server import VERSION
38 from lib.server import Server
39 from lib.peer import Peer
40 from datetime import datetime
41 from lib.station import Station, VERSION
42 from optparse import OptionParser
43
44
(27 . 29)(19 . 14)
46 "-b", "--db-path",
47 help="Specify path to settings database file")
48 op.add_option(
49 "-c", "--config-file-path",
50 metavar="X",
51 help="load the configfile from X")
52 op.add_option(
53 "-n", "--channel-name",
54 metavar="X",
55 help="specify the channel name for this Pest network")
56 op.add_option(
57 "-d", "--daemon",
58 action="store_true",
59 help="fork and become a daemon")
60 op.add_option(
61 "--log-level",
62 help="specify priority level for logging: info or debug")
63 op.add_option(
64 "--listen",
65 metavar="X",
66 help="listen on specific IP address X")
67 op.add_option(
68 "--logdir",
69 metavar="X",
70 help="store channel log in directory X")
71 op.add_option(
72 "--motd",
73 metavar="X",
74 help="display file X as message of the day")
(67 . 10)(44 . 6)
76 metavar="X",
77 help="listen for UDP packets on X;"
78 " default: 7778")
79 op.add_option(
80 "--statedir",
81 metavar="X",
82 help="save persistent channel state (topic) in directory X")
83 if os.name == "posix":
84 op.add_option(
85 "--chroot",
(87 . 10)(60 . 7)
87 if options.channel_name is None:
88 options.channel_name = "#pest"
89 log_format = "%(levelname)s %(asctime)s: %(message)s"
90 if options.log_level == 'debug':
91 logging.basicConfig(level=logging.DEBUG, format=log_format, stream=sys.stdout)
92 else:
93 logging.basicConfig(level=logging.INFO, format=log_format, stream=sys.stdout)
94 logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"), format=log_format, stream=sys.stdout)
95 if options.irc_ports is None:
96 options.irc_ports = "6697"
97 if options.udp_port is None:
(99 . 8)(69 . 6)
99 options.udp_port = int(options.udp_port)
100 if options.db_path is None:
101 options.db_path = "blatta.db"
102 if options.config_file_path is None:
103 options.config_file_path = "config.py"
104 if options.chroot:
105 if os.getuid() != 0:
106 op.error("Must be root to use --chroot")
(132 . 11)(100 . 9)
108 except ValueError:
109 op.error("bad port: %r" % port)
110 options.irc_ports = irc_ports
111 server = Server(options)
112 if options.daemon:
113 server.daemonize()
114 station = Station(options)
115 try:
116 server.start()
117 station.start()
118 except KeyboardInterrupt:
119 logging.error("Interrupted.")
120
- 79E611C4EC3B9DCBFC23C09140F4F2DB23923B97BEC5427FB28E3886DDAC5F074DA2A80C379837A011FC834D41DD2273F9D9A85635983DDDE769F5924F0AB74F
+
blatta/config.py.example
(1 . 10)(0 . 0)
125 peers = [
126 {
127 "name":"schellenberg",
128 # Secrets must be precisely 64 bytes.
129 "local_secret":"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
130 "remote_secret":"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
131 "address":"10.0.0.1",
132 "port":7778
133 }
134 ]
-
+ B73F1A4AFA94CAA17167B0316CE1AD893C33E46F3E5114230B808BABC21E4C1F9A3327576E890114DEBE2F48E2FBC4A16533A02E824D1080CDC80268A39F30BF
blatta/lib/broadcast.py
(0 . 0)(1 . 56)
139 import logging
140 import time
141 import hashlib
142 import binascii
143 from message import Message
144 from message import BROADCAST
145
146
147 class Broadcast(Message):
148 def __init__(self, message, state):
149 message['command'] = BROADCAST
150 message['bounces'] = 0
151 super(Broadcast, self).__init__(message, state)
152
153 def send(self):
154 if not self.speaker:
155 logging.error("aborting message send due speaker not being set")
156 return
157
158 # if we are not rebroadcasting we need to set the timestamp
159 self.timestamp = int(time.time())
160
161 target_peer = None
162
163 self.message_bytes = self.get_message_bytes(target_peer)
164 self.message_hash = hashlib.sha256(self.message_bytes).digest()
165
166 self.long_buffer.intern(self)
167
168 self.state.update_broadcast_self_chain(self.message_hash)
169 self.state.update_net_chain(self.message_hash)
170 for peer in self.state.get_keyed_peers(exclude_addressless=True):
171 signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes)
172 peer.send(signed_packet_bytes)
173 self.log_outgoing(peer)
174
175 # we already have message bytes here since this message came from the long buffer
176 def retry(self, requesting_peer):
177 signed_packet_bytes = self.pack(requesting_peer, self.command, self.bounces, self.message_bytes)
178 requesting_peer.send(signed_packet_bytes)
179 self.log_outgoing(requesting_peer)
180
181 def forward(self):
182 if not self.speaker:
183 logging.error("aborting message send due speaker not being set")
184 return
185
186 reporting_peer_ids = map(lambda p: p.peer_id, self.reporting_peers)
187 for peer in self.state.get_keyed_peers(exclude_addressless=True, exclude_ids=reporting_peer_ids):
188 # we don't want to send a broadcast back to the originator
189 if self.peer and (peer.peer_id == self.peer.peer_id):
190 continue
191
192 signed_packet_bytes = Message.pack(peer, self.command, self.bounces, self.message_bytes)
193 peer.send(signed_packet_bytes)
194 self.log_outgoing(peer)
-
+ 97EEBB27E6EC219B66D69A66FFE37962046B5F76A2A30A11AAD259DE4985789E61837290C9F327E22D2D04E7894DF0AEC767D67AAB4E86F7C81DF22EB162AD91
blatta/lib/caribou.py
(0 . 0)(1 . 271)
199 """
200 Caribou is a simple SQLite database migrations library, built
201 to manage the evoluton of client side databases over multiple releases
202 of an application.
203 """
204
205 from __future__ import with_statement
206
207 __author__ = 'clutchski@gmail.com'
208
209 import contextlib
210 import datetime
211 import glob
212 import imp
213 import os.path
214 import sqlite3
215 import traceback
216
217 # statics
218
219 VERSION_TABLE = 'migration_version'
220 UTC_LENGTH = 14
221
222 # errors
223
224 class Error(Exception):
225 """ Base class for all Caribou errors. """
226 pass
227
228 class InvalidMigrationError(Error):
229 """ Thrown when a client migration contains an error. """
230 pass
231
232 class InvalidNameError(Error):
233 """ Thrown when a client migration has an invalid filename. """
234
235 def __init__(self, filename):
236 msg = 'Migration filenames must start with a UTC timestamp. ' \
237 'The following file has an invalid name: %s' % filename
238 super(InvalidNameError, self).__init__(msg)
239
240 # code
241
242 @contextlib.contextmanager
243 def execute(conn, sql, params=None):
244 params = [] if params is None else params
245 cursor = conn.execute(sql, params)
246 try:
247 yield cursor
248 finally:
249 cursor.close()
250
251 @contextlib.contextmanager
252 def transaction(conn):
253 try:
254 yield
255 conn.commit()
256 except:
257 conn.rollback()
258 msg = "Error in transaction: %s" % traceback.format_exc()
259 raise Error(msg)
260
261 def has_method(an_object, method_name):
262 return hasattr(an_object, method_name) and \
263 callable(getattr(an_object, method_name))
264
265 def is_directory(path):
266 return os.path.exists(path) and os.path.isdir(path)
267
268 class Migration(object):
269 """ This class represents a migration version. """
270
271 def __init__(self, path):
272 self.path = path
273 self.filename = os.path.basename(path)
274 self.module_name, _ = os.path.splitext(self.filename)
275 self.get_version() # will assert the filename is valid
276 self.name = self.module_name[UTC_LENGTH:]
277 while self.name.startswith('_'):
278 self.name = self.name[1:]
279 try:
280 self.module = imp.load_source(self.module_name, path)
281 except:
282 msg = "Invalid migration %s: %s" % (path, traceback.format_exc())
283 raise InvalidMigrationError(msg)
284 # assert the migration has the needed methods
285 missing = [m for m in ['upgrade', 'downgrade']
286 if not has_method(self.module, m)]
287 if missing:
288 msg = 'Migration %s is missing required methods: %s.' % (
289 self.path, ', '.join(missing))
290 raise InvalidMigrationError(msg)
291
292 def get_version(self):
293 if len(self.filename) < UTC_LENGTH:
294 raise InvalidNameError(self.filename)
295 timestamp = self.filename[:UTC_LENGTH]
296 #FIXME: is this test sufficient?
297 if not timestamp.isdigit():
298 raise InvalidNameError(self.filename)
299 return timestamp
300
301 def upgrade(self, conn):
302 self.module.upgrade(conn)
303
304 def downgrade(self, conn):
305 self.module.downgrade(conn)
306
307 def __repr__(self):
308 return 'Migration(%s)' % self.filename
309
310 class Database(object):
311
312 def __init__(self, db_url):
313 self.db_url = db_url
314 self.conn = sqlite3.connect(db_url)
315
316 def close(self):
317 self.conn.close()
318
319 def is_version_controlled(self):
320 sql = """select *
321 from sqlite_master
322 where type = 'table'
323 and name = :1"""
324 with execute(self.conn, sql, [VERSION_TABLE]) as cursor:
325 return bool(cursor.fetchall())
326
327 def upgrade(self, migrations, target_version=None):
328 if target_version:
329 _assert_migration_exists(migrations, target_version)
330
331 migrations.sort(key=lambda x: x.get_version())
332 database_version = self.get_version()
333
334 for migration in migrations:
335 current_version = migration.get_version()
336 if current_version <= database_version:
337 continue
338 if target_version and current_version > target_version:
339 break
340 migration.upgrade(self.conn)
341 new_version = migration.get_version()
342 self.update_version(new_version)
343
344 def downgrade(self, migrations, target_version):
345 if target_version not in (0, '0'):
346 _assert_migration_exists(migrations, target_version)
347
348 migrations.sort(key=lambda x: x.get_version(), reverse=True)
349 database_version = self.get_version()
350
351 for i, migration in enumerate(migrations):
352 current_version = migration.get_version()
353 if current_version > database_version:
354 continue
355 if current_version <= target_version:
356 break
357 migration.downgrade(self.conn)
358 next_version = 0
359 # if an earlier migration exists, set the db version to
360 # its version number
361 if i < len(migrations) - 1:
362 next_migration = migrations[i + 1]
363 next_version = next_migration.get_version()
364 self.update_version(next_version)
365
366 def get_version(self):
367 """ Return the database's version, or None if it is not under version
368 control.
369 """
370 if not self.is_version_controlled():
371 return None
372 sql = 'select version from %s' % VERSION_TABLE
373 with execute(self.conn, sql) as cursor:
374 result = cursor.fetchall()
375 return result[0][0] if result else 0
376
377 def update_version(self, version):
378 sql = 'update %s set version = :1' % VERSION_TABLE
379 with transaction(self.conn):
380 self.conn.execute(sql, [version])
381
382 def initialize_version_control(self):
383 sql = """ create table if not exists %s
384 ( version text ) """ % VERSION_TABLE
385 with transaction(self.conn):
386 self.conn.execute(sql)
387 self.conn.execute('insert into %s values (0)' % VERSION_TABLE)
388
389 def __repr__(self):
390 return 'Database("%s")' % self.db_url
391
392 def _assert_migration_exists(migrations, version):
393 if version not in (m.get_version() for m in migrations):
394 raise Error('No migration with version %s exists.' % version)
395
396 def load_migrations(directory):
397 """ Return the migrations contained in the given directory. """
398 if not is_directory(directory):
399 msg = "%s is not a directory." % directory
400 raise Error(msg)
401 wildcard = os.path.join(directory, '*.py')
402 migration_files = glob.glob(wildcard)
403 return [Migration(f) for f in migration_files]
404
405 def upgrade(db_url, migration_dir, version=None):
406 """ Upgrade the given database with the migrations contained in the
407 migrations directory. If a version is not specified, upgrade
408 to the most recent version.
409 """
410 with contextlib.closing(Database(db_url)) as db:
411 db = Database(db_url)
412 if not db.is_version_controlled():
413 db.initialize_version_control()
414 migrations = load_migrations(migration_dir)
415 db.upgrade(migrations, version)
416
417 def downgrade(db_url, migration_dir, version):
418 """ Downgrade the database to the given version with the migrations
419 contained in the given migration directory.
420 """
421 with contextlib.closing(Database(db_url)) as db:
422 if not db.is_version_controlled():
423 msg = "The database %s is not version controlled." % (db_url)
424 raise Error(msg)
425 migrations = load_migrations(migration_dir)
426 db.downgrade(migrations, version)
427
428 def get_version(db_url):
429 """ Return the migration version of the given database. """
430 with contextlib.closing(Database(db_url)) as db:
431 return db.get_version()
432
433 def create_migration(name, directory=None):
434 """ Create a migration with the given name. If no directory is specified,
435 the current working directory will be used.
436 """
437 directory = directory if directory else '.'
438 if not is_directory(directory):
439 msg = '%s is not a directory.' % directory
440 raise Error(msg)
441
442 now = datetime.datetime.now()
443 version = now.strftime("%Y%m%d%H%M%S")
444
445 contents = MIGRATION_TEMPLATE % {'name':name, 'version':version}
446
447 name = name.replace(' ', '_')
448 filename = "%s_%s.py" % (version, name)
449 path = os.path.join(directory, filename)
450 with open(path, 'w') as migration_file:
451 migration_file.write(contents)
452 return path
453
454 MIGRATION_TEMPLATE = """\
455 \"\"\"
456 This module contains a Caribou migration.
457
458 Migration Name: %(name)s
459 Migration Version: %(version)s
460 \"\"\"
461
462 def upgrade(connection):
463 # add your upgrade step here
464 pass
465
466 def downgrade(connection):
467 # add your downgrade step here
468 pass
469 """
- 9E92E286E45F8D293DF0EF2126DBCD90E95FD853434D94FA37E55705D2DC7517CF768ACAD7B54029A666926E2E489541EBBE618334600B8166EE8898FD01D094
+ 9C0C2889E379F46AD66B27BE027EA4C37355442DEF1D39B79EADBBBEB437D3729D847F3F66D8F829410BD48A334F7468F772FF8B4C21835EE5999DDF0BB15760
blatta/lib/channel.py
(5 . 58)(5 . 11)
474 self.server = server
475 self.name = name
476 self.members = set()
477 self._topic = ""
478 self._key = None
479 if self.server.statedir:
480 self._state_path = "%s/%s" % (
481 self.server.statedir,
482 name.replace("_", "__").replace("/", "_"))
483 self._read_state()
484 else:
485 self._state_path = None
486
487 def add_member(self, client):
488 self.members.add(client)
489
490 def get_topic(self):
491 return self._topic
492
493 def set_topic(self, value):
494 self._topic = value
495 self._write_state()
496
497 topic = property(get_topic, set_topic)
498
499 def get_key(self):
500 return self._key
501
502 def set_key(self, value):
503 self._key = value
504 self._write_state()
505
506 key = property(get_key, set_key)
507
508 def remove_client(self, client):
509 self.members.discard(client)
510 if not self.members:
511 self.server.remove_channel(self)
512
513 def _read_state(self):
514 if not (self._state_path and os.path.exists(self._state_path)):
515 return
516 data = {}
517 exec(open(self._state_path), {}, data)
518 self._topic = data.get("topic", "")
519 self._key = data.get("key")
520
521 def _write_state(self):
522 if not self._state_path:
523 return
524 (fd, path) = tempfile.mkstemp(dir=os.path.dirname(self._state_path))
525 fp = os.fdopen(fd, "w")
526 fp.write("topic = %r\n" % self.topic)
527 fp.write("key = %r\n" % self.key)
528 fp.close()
529 os.rename(path, self._state_path)
530
531
- 259E95AF9FD927FA7A5733E3485ED3F1D4C11CE76DA8D093320878914F48149E06A3D9800F39BD978BCC910D8586B8FA0671AA38EDCBBF432FC5ED14259624FE
+ CE97A842DF4D7C328976CA4DCD0E99E9FF0C5E56FBA68ECB094055089B81A35C303B492FBAB33116FD5BE2213D75976259F7A947E7E4E538BA273C7D42698125
blatta/lib/client.py
(1 . 19)(1 . 17)
536 import socket
537 import time
538 import sys
539 import re
540 import string
541 import os
542 import base64
543 import traceback
544 import logging
545 from state import State
546 from state import KNOBS
547 from message import Message
548 from server import VERSION
549 import datetime
550 from message import Message, PEST_VERSION
551 from broadcast import Broadcast
552 from direct import Direct
553 from station import VERSION
554 from funcs import *
555 from commands import BROADCAST
556 from commands import DIRECT
557 from commands import BROADCAST, DIRECT
558
559 class Client(object):
560 __linesep_regexp = re.compile(r"\r?\n")
(25 . 7)(23 . 7)
562
563 def __init__(self, server, socket):
564 self.server = server
565 self.state = State.get_instance()
566 self.state = None
567 self.socket = socket
568 self.channels = {} # irc_lower(Channel name) --> Channel
569 self.nickname = None
(44 . 7)(42 . 12)
571 def message_from_station(self, msg):
572 targetname = self.server.channel_name if msg.command == BROADCAST else self.nickname
573 pest_prefix = msg.prefix if msg.prefix else msg.speaker
574 formatted_message = ":%s PRIVMSG %s :%s" % (pest_prefix, targetname, msg.body)
575 formatted_message = ":%s PRIVMSG %s :%s%s" % (
576 pest_prefix,
577 targetname,
578 msg.warning if msg.warning else "",
579 msg.body
580 )
581 self.__writebuffer += formatted_message + "\r\n"
582
583 def get_prefix(self):
(76 . 20)(79 . 24)
585 if not line:
586 # Empty line. Ignore.
587 continue
588 x = line.split(" ", 1)
589 command = x[0].upper()
590 if len(x) == 1:
591 arguments = []
592 else:
593 if len(x[1]) > 0 and x[1][0] == ":":
594 arguments = [x[1][1:]]
595 else:
596 y = string.split(x[1], " :", 1)
597 arguments = string.split(y[0])
598 if len(y) == 2:
599 arguments.append(y[1])
600 command, arguments = self.__parse_command_arguments(line)
601 self.__handle_command(command, arguments)
602
603 def __parse_command_arguments(self, line):
604 x = line.split(" ", 1)
605 command = x[0].upper()
606 if len(x) == 1:
607 arguments = []
608 else:
609 if len(x[1]) > 0 and x[1][0] == ":":
610 arguments = [x[1][1:]]
611 else:
612 y = string.split(x[1], " :", 1)
613 arguments = string.split(y[0])
614 if len(y) == 2:
615 arguments.append(y[1])
616 return command, arguments
617
618 def __pass_handler(self, command, arguments):
619 server = self.server
620 if command == "PASS":
(117 . 6)(124 . 7)
622 self.reply("432 * %s :Erroneous nickname" % nick)
623 else:
624 self.nickname = nick
625 self.state.set_knob("nick", nick)
626 server.client_changed_nickname(self, None)
627 elif command == "USER":
628 if len(arguments) < 4:
(128 . 11)(136 . 11)
630 self.disconnect("Client quit")
631 return
632 if self.nickname and self.user:
633 self.reply("001 %s :Hi, welcome to Pest" % self.nickname)
634 self.reply("002 %s :Your host is %s, running version blatta-%s"
635 % (self.nickname, server.name, VERSION))
636 self.reply("003 %s :This server was created sometime"
637 % self.nickname)
638 self.reply("001 %s :Hi, welcome to PestNet" % self.nickname)
639 self.reply("002 %s :Your host is %s, running Blatta %d and Pest 0x%X"
640 % (self.nickname, server.name, VERSION, PEST_VERSION))
641 self.reply("003 %s :This server was created %s"
642 % (self.nickname, datetime.datetime.now()))
643 self.reply("004 %s :%s blatta-%s o o"
644 % (self.nickname, server.name, VERSION))
645 self.send_motd()
(171 . 12)(179 . 6)
647 channel.add_member(self)
648 self.channels[irc_lower(channelname)] = channel
649 self.message_channel(channel, "JOIN", channelname, True)
650 if channel.topic:
651 self.reply("332 %s %s :%s"
652 % (self.nickname, channel.name, channel.topic))
653 else:
654 self.reply("331 %s %s :No topic is set"
655 % (self.nickname, channel.name))
656 self.reply("353 %s = %s :%s"
657 % (self.nickname,
658 channelname,
(185 . 73)(187 . 13)
660 self.reply("366 %s %s :End of NAMES list" % (self.nickname, channelname))
661
662 def list_handler():
663 if len(arguments) < 1:
664 channels = server.channels.values()
665 else:
666 channels = []
667 for channelname in arguments[0].split(","):
668 if server.has_channel(channelname):
669 channels.append(server.get_channel(channelname))
670 channels.sort(key=lambda x: x.name)
671 for channel in channels:
672 self.reply("322 %s %s %d :%s"
673 % (self.nickname, channel.name,
674 len(channel.members), channel.topic))
675 self.reply("323 %s :End of LIST" % self.nickname)
676 pass
677
678 def lusers_handler():
679 pass
680 pass
681
682 def mode_handler():
683 if len(arguments) < 1:
684 self.reply_461("MODE")
685 return
686 targetname = arguments[0]
687 if server.has_channel(targetname):
688 channel = server.get_channel(targetname)
689 if len(arguments) < 2:
690 if channel.key:
691 modes = "+k"
692 if irc_lower(channel.name) in self.channels:
693 modes += " %s" % channel.key
694 else:
695 modes = "+"
696 self.reply("324 %s %s %s"
697 % (self.nickname, targetname, modes))
698 return
699 flag = arguments[1]
700 if flag == "+k":
701 if len(arguments) < 3:
702 self.reply_461("MODE")
703 return
704 key = arguments[2]
705 if irc_lower(channel.name) in self.channels:
706 channel.key = key
707 self.message_channel(
708 channel, "MODE", "%s +k %s" % (channel.name, key),
709 True)
710 else:
711 self.reply("442 %s :You're not on that channel"
712 % targetname)
713 elif flag == "-k":
714 if irc_lower(channel.name) in self.channels:
715 channel.key = None
716 self.message_channel(
717 channel, "MODE", "%s -k" % channel.name,
718 True)
719 else:
720 self.reply("442 %s :You're not on that channel"
721 % targetname)
722 else:
723 self.reply("472 %s %s :Unknown MODE flag"
724 % (self.nickname, flag))
725 elif targetname == self.nickname:
726 if len(arguments) == 1:
727 self.reply("221 %s +" % self.nickname)
728 else:
729 self.reply("501 %s :Unknown MODE flag" % self.nickname)
730 else:
731 self.reply_403(targetname)
732 pass
733
734 def motd_handler():
735 self.send_motd()
(278 . 6)(220 . 7)
737 ":%s!%s@%s NICK %s"
738 % (oldnickname, self.user, self.host, self.nickname),
739 True)
740 self.state.set_knob('nick', self.nickname)
741
742 def notice_and_privmsg_handler():
743 if len(arguments) == 0:
(290 . 27)(233 . 31)
745 targetname = arguments[0]
746 message = arguments[1]
747
748 # check for pest commands before handling this as a message
749 if message[0] is "%":
750 pest_command, pest_arguments = self.__parse_command_arguments(message[1:])
751 self.__handle_command(pest_command, pest_arguments)
752 return
753
754 if server.has_channel(targetname):
755 channel = server.get_channel(targetname)
756 self.message_channel(
757 channel, command, "%s :%s" % (channel.name, message))
758 # send the channel message to peers as well
759 self.server.station.infosec.message(
760 Message(
761 {
762 "speaker": self.nickname,
763 "command": BROADCAST,
764 "bounces": 0,
765 "body": message
766 }))
767 Broadcast(
768 {
769 "speaker": self.nickname,
770 "body": message,
771 "long_buffer": self.server.station.long_buffer
772 },
773 self.state).send()
774 else:
775 self.server.station.infosec.message(Message({
776 Direct({
777 "speaker": self.nickname,
778 "handle": targetname,
779 "body": message,
780 "bounces": 0,
781 "command": DIRECT
782 }))
783 "long_buffer": self.server.station.long_buffer
784 }, self.state).send()
785
786 def part_handler():
787 if len(arguments) < 1:
(351 . 71)(298 . 16)
789 self.disconnect(quitmsg)
790
791 def topic_handler():
792 if len(arguments) < 1:
793 self.reply_461("TOPIC")
794 return
795 channelname = arguments[0]
796 channel = self.channels.get(irc_lower(channelname))
797 if channel:
798 if len(arguments) > 1:
799 newtopic = arguments[1]
800 channel.topic = newtopic
801 self.message_channel(
802 channel, "TOPIC", "%s :%s" % (channelname, newtopic),
803 True)
804 else:
805 if channel.topic:
806 self.reply("332 %s %s :%s"
807 % (self.nickname, channel.name,
808 channel.topic))
809 else:
810 self.reply("331 %s %s :No topic is set"
811 % (self.nickname, channel.name))
812 else:
813 self.reply("442 %s :You're not on that channel" % channelname)
814 pass
815
816 def wallops_handler():
817 if len(arguments) < 1:
818 self.reply_461(command)
819 message = arguments[0]
820 for client in server.clients.values():
821 client.message(":%s NOTICE %s :Global notice: %s"
822 % (self.prefix, client.nickname, message))
823 pass
824
825 def who_handler():
826 if len(arguments) < 1:
827 return
828 targetname = arguments[0]
829 if server.has_channel(targetname):
830 channel = server.get_channel(targetname)
831 for member in channel.members:
832 self.reply("352 %s %s %s %s %s %s H :0 %s"
833 % (self.nickname, targetname, member.user,
834 member.host, server.name, member.nickname,
835 member.realname))
836 self.reply("315 %s %s :End of WHO list"
837 % (self.nickname, targetname))
838 pass
839
840 def whois_handler():
841 if len(arguments) < 1:
842 return
843 username = arguments[0]
844 user = server.get_client(username)
845 if user:
846 self.reply("311 %s %s %s %s * :%s"
847 % (self.nickname, user.nickname, user.user,
848 user.host, user.realname))
849 self.reply("312 %s %s %s :%s"
850 % (self.nickname, user.nickname, server.name,
851 server.name))
852 self.reply("319 %s %s :%s"
853 % (self.nickname, user.nickname,
854 " ".join(user.channels)))
855 self.reply("318 %s %s :End of WHOIS list"
856 % (self.nickname, user.nickname))
857 else:
858 self.reply("401 %s %s :No such nick"
859 % (self.nickname, username))
860 pass
861
862 def wot_handler():
863 if len(arguments) < 1:
(427 . 7)(319 . 8)
865 address = "%s:%s" % (peer.address, peer.port)
866 else:
867 address = "<address not configured>"
868 self.pest_reply("%s %s" % (string.join(peer.handles, ","), address))
869 self.pest_reply("%s %s" % (string.join(peer.handles, ","),
870 address))
871 else:
872 self.pest_reply("WOT is empty")
873 elif len(arguments) == 1:
(540 . 7)(433 . 7)
875 self.pest_reply("no knobs configured")
876 elif len(arguments) == 1:
877 knob_value = self.state.get_knob(arguments[0])
878 if knob:
879 if knob_value:
880 self.pest_reply("%s %s" % (arguments[0], knob_value))
881 else:
882 self.pest_reply("no such knob")
(549 . 6)(442 . 18)
884 self.pest_reply("set %s to %s" % (arguments[0], arguments[1]))
885 else:
886 self.pest_reply("Usage: KNOB [<NAME>] [<VALUE>]")
887
888 def resolve_handler():
889 if len(arguments) == 1:
890 handle = arguments[0]
891 peer = self.state.get_peer_by_handle(handle)
892 if peer:
893 self.state.resolve(handle)
894 self.pest_reply("resolved %s" % handle)
895 else:
896 self.pest_reply("peer with handle %s not found" % handle)
897 else:
898 self.pest_reply("Usage: RESOLVE <HANDLE>")
899
900 handler_table = {
901 "AWAY": away_handler,
(557 . 6)(462 . 7)
903 "ISON": ison_handler,
904 "JOIN": join_handler,
905 "KEY": key_handler,
906 "KNOB": knob_handler,
907 "LIST": list_handler,
908 "LUSERS": lusers_handler,
909 "MODE": mode_handler,
(569 . 6)(475 . 7)
911 "PONG": pong_handler,
912 "PRIVMSG": notice_and_privmsg_handler,
913 "QUIT": quit_handler,
914 "RESOLVE": resolve_handler,
915 "TOPIC": topic_handler,
916 "UNKEY": unkey_handler,
917 "UNPEER": unpeer_handler,
(576 . 7)(483 . 6)
919 "WHO": who_handler,
920 "WHOIS": whois_handler,
921 "WOT": wot_handler,
922 "KNOB": knob_handler
923 }
924 server = self.server
925 valid_channel_re = self.__valid_channelname_regexp
(590 . 8)(496 . 9)
927 def socket_readable_notification(self):
928 try:
929 data = self.socket.recv(2 ** 10)
930 logging.debug(
931 "[%s:%d] -> %r" % (self.host, self.port, data))
932 if os.environ.get("LOG_CLIENT_MESSAGES"):
933 logging.debug(
934 "[%s:%d] -> %r" % (self.host, self.port, data))
935 quitmsg = "EOT"
936 except socket.error as x:
937 data = ""
(607 . 9)(514 . 10)
939 def socket_writable_notification(self):
940 try:
941 sent = self.socket.send(self.__writebuffer)
942 logging.debug(
943 "[%s:%d] <- %r" % (
944 self.host, self.port, self.__writebuffer[:sent]))
945 if os.environ.get("LOG_CLIENT_MESSAGES"):
946 logging.debug(
947 "[%s:%d] <- %r" % (
948 self.host, self.port, self.__writebuffer[:sent]))
949 self.__writebuffer = self.__writebuffer[sent:]
950 except socket.error as x:
951 self.disconnect(x)
(629 . 7)(537 . 7)
953 self.message(":%s %s" % (self.server.name, msg))
954
955 def pest_reply(self, msg):
956 self.message("NOTICE %s :%s" % (self.nickname, msg))
957 self.message(":Pest NOTICE %s :%s" % (self.nickname, msg))
958
959 def reply_403(self, channel):
960 self.reply("403 %s %s :No such channel" % (self.nickname, channel))
- E4FA49D77AE8627D1B7E68B131DD1E66D03C2FD30904691B1D0179BDF99D2972D8F0A635D4140B6D1E18A4ECBE5C22EDDAB3505B4CAF29673136A372BA4EDD0B
+ B3884AE3A2F5C1AB5A94171F98BED58421C997DE79EDA026218341BB26D41CA45CAFB938303F8425A57F3FA6A177BA34B0ADE6DFCBF466B443B82E14B4C0C421
blatta/lib/commands.py
(7 . 4)(7 . 11)
965
966 BROADCAST = 0x00
967 DIRECT = 0x01
968 GETDATA = 0x03
969 IGNORE = 0xFF
970 COMMAND_LABELS = {
971 BROADCAST: "BROADCAST",
972 DIRECT: "DIRECT",
973 GETDATA: "GETDATA",
974 IGNORE: "IGNORE"
975 }
-
+ B2BFEE16A02F0AD104FBE59958BE80D27FC59C50695D25B4C008C28ABA4438C00706C3181C473C7A74BD85F21A681059B694B3B792F7C234D3478B346307EB7C
blatta/lib/direct.py
(0 . 0)(1 . 58)
981 import logging
982 import hashlib
983 import time
984 import binascii
985 from message import Message
986 from message import DIRECT
987
988
989 class Direct(Message):
990 def __init__(self, message, state):
991 message['command'] = DIRECT
992 message['bounces'] = 0
993 super(Direct, self).__init__(message, state)
994
995 def send(self):
996 if not self.speaker:
997 logging.error("aborting message send due speaker not being set")
998 return
999
1000 self.timestamp = int(time.time())
1001 target_peer = self.state.get_peer_by_handle(self.handle)
1002 if target_peer and not target_peer.get_key():
1003 logging.debug("No key for peer associated with %s" % self.handle)
1004 return
1005
1006 if target_peer == None:
1007 logging.debug("Aborting message: unknown handle: %s" % self.handle)
1008 return
1009
1010 self.message_bytes = self.get_message_bytes(target_peer)
1011 self.message_hash = hashlib.sha256(self.message_bytes).digest()
1012
1013 logging.debug("generated message_hash: %s" % binascii.hexlify(self.message_hash))
1014
1015 self.peer = target_peer
1016 self.long_buffer.intern(self)
1017
1018 signed_packet_bytes = self.pack(target_peer, self.command, self.bounces, self.message_bytes)
1019 self.state.update_handle_self_chain(target_peer.handles[0], self.message_hash)
1020 target_peer.send(signed_packet_bytes)
1021 self.log_outgoing(target_peer)
1022
1023 def retry(self, requesting_peer):
1024 target_peer = self.state.get_peer_by_handle(self.handle)
1025
1026 if target_peer == None:
1027 logging.debug("Aborting message: unknown handle: %s" % self.handle)
1028 return
1029
1030 if not target_peer.get_key():
1031 logging.debug("No key for peer associated with %s" % self.handle)
1032 return
1033
1034 # TODO: Figure out how to verify that the requester was the original intended recipient
1035 signed_packet_bytes = self.pack(target_peer, self.command, self.bounces, self.message_bytes)
1036 target_peer.send(signed_packet_bytes)
1037 self.log_outgoing(target_peer)
1038
-
+ 6BB6808C15C7BFB7C89860C4565FD28E2ABDA8AB75D1C6928E4EBEBF37CEC6F104A73C595FB4DFD34515E47D77D8EEDCAF5DEC8E7CBB353858281C17C2229CBF
blatta/lib/getdata.py
(0 . 0)(1 . 66)
1043 import time
1044 import binascii
1045 import hashlib
1046 import logging
1047 from message import Message
1048 from message import OUTGOING_MESSAGE_LOGGING_FORMAT
1049 from commands import GETDATA
1050 from commands import DIRECT
1051 from commands import BROADCAST
1052 from commands import COMMAND_LABELS
1053
1054 class GetData(Message):
1055 def __init__(self, original, broken_chain, state=None):
1056 message = {
1057 'command': GETDATA,
1058 'body': original[broken_chain],
1059 'timestamp': int(time.time()),
1060 'speaker': state.get_knob('nick'),
1061 'bounces': 0,
1062 'original': original
1063 }
1064 super(GetData, self).__init__(message, state)
1065
1066 def send(self):
1067 target_peer = (self.state.get_peer_by_handle(self.original['speaker'])
1068 if self.original['command'] == DIRECT
1069 else None)
1070
1071 if self.original['command'] == DIRECT and target_peer == None:
1072 logging.debug("Aborting message: unknown handle: %s" % self.handle)
1073 return
1074
1075 if target_peer and not target_peer.get_key():
1076 logging.debug("No key for peer associated with %s" % self.handle)
1077 return
1078
1079 if self.state.get_knob('nick') is None:
1080 logging.error("unable to pack message due to null speaker value")
1081 return
1082
1083 self.message_bytes = self.get_message_bytes(target_peer)
1084 self.message_hash = hashlib.sha256(self.message_bytes).digest()
1085
1086
1087 if self.original['command'] == DIRECT:
1088 signed_packet_bytes = self.pack(target_peer,
1089 self.command,
1090 self.bounces,
1091 self.message_bytes)
1092 target_peer.send(signed_packet_bytes)
1093 self.log_outgoing(target_peer)
1094
1095 elif self.original['command'] == BROADCAST:
1096 for peer in self.state.get_keyed_peers(exclude_addressless=True):
1097 signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes)
1098 peer.send(signed_packet_bytes)
1099 self.log_outgoing(peer)
1100
1101 def log_outgoing(self, peer):
1102 logging.info(OUTGOING_MESSAGE_LOGGING_FORMAT % (peer.address,
1103 peer.port,
1104 peer.handles[0],
1105 COMMAND_LABELS[self.command],
1106 binascii.hexlify(self.body),
1107 self.bounces,
1108 binascii.hexlify(self.message_hash)))
-
+ 3279A34976D252B3CBEC3C118A63F7606A6C7B25B02591F511ED4AAA0856CB7943AB611596F31CB4978215AA1998981F26245898677E6839861C7E4F35A12414
blatta/lib/ignore.py
(0 . 0)(1 . 30)
1113 import logging
1114 import time
1115 import hashlib
1116 import os
1117 from message import Message
1118 from message import IGNORE
1119
1120
1121 class Ignore(Message):
1122 def __init__(self, message, state):
1123 message['command'] = IGNORE
1124 message['bounces'] = 0
1125 message['body'] = self.gen_rubbish_body()
1126 super(Ignore, self).__init__(message, state)
1127
1128 def send(self):
1129 if not self.speaker:
1130 logging.error("aborting message send due speaker not being set")
1131 return
1132
1133 # if we are not rebroadcasting we need to set the timestamp
1134 self.timestamp = int(time.time())
1135 self.message_bytes = self.get_message_bytes()
1136 self.message_hash = hashlib.sha256(self.message_bytes).digest()
1137
1138 for peer in self.state.get_keyed_peers(exclude_addressless=True):
1139 signed_packet_bytes = self.pack(peer, self.command, self.bounces, self.message_bytes)
1140 peer.send(signed_packet_bytes)
1141 if os.environ.get('LOG_RUBBISH'):
1142 self.log_rubbish(peer)
- 2A3C3DF167D4BA0F838A3E9DDDDCCBE6D924C3608F742C68E56E107F1F2FC68E4569CD141C992A87565B958815DDE0DB6998179B1398562D6D4CF5963D1C980C
+
blatta/lib/infosec.py
(1 . 276)(0 . 0)
1147 import hashlib
1148 import serpent
1149 from serpent import Serpent
1150 from serpent import serpent_cbc_encrypt
1151 from serpent import serpent_cbc_decrypt
1152 from commands import BROADCAST
1153 from commands import DIRECT
1154 from commands import IGNORE
1155 from message import Message
1156 import base64
1157 import binascii
1158 import time
1159 import struct
1160 import sys
1161 import hmac
1162 import random
1163 import os
1164 import pprint
1165 import logging
1166 pp = pprint.PrettyPrinter(indent=4)
1167
1168 PACKET_SIZE = 496
1169 MAX_SPEAKER_SIZE = 32
1170 TS_ACCEPTABLE_SKEW = 60 * 15
1171 BLACK_PACKET_FORMAT = "<448s48s"
1172 RED_PACKET_FORMAT = "<16sBBxB428s"
1173 RED_PACKET_LENGTH_WITH_PADDING = 448
1174 MESSAGE_PACKET_FORMAT = "<q32s32s32s324s"
1175 MAX_MESSAGE_LENGTH = 428
1176 STALE_PACKET = 0
1177 DUPLICATE_PACKET = 1
1178 MALFORMED_PACKET = 2
1179 INVALID_SIGNATURE = 3
1180 IGNORED = 4
1181 MESSAGE_LOGGING_FORMAT = "[%s:%d %s] <- %s %s %s"
1182
1183 class Infosec(object):
1184 def __init__(self, state=None):
1185 self.state = state
1186
1187 def message(self, message):
1188 if not message.speaker:
1189 logging.error("aborting message send due speaker not being set")
1190 return
1191
1192 # if we are not rebroadcasting we need to set the timestamp
1193 if message.timestamp == None:
1194 message.original = True
1195 message.timestamp = int(time.time())
1196 else:
1197 message.original = False
1198
1199 target_peer = (self.state.get_peer_by_handle(message.handle)
1200 if message.command == DIRECT
1201 else None)
1202
1203 if target_peer and not target_peer.get_key():
1204 logging.debug("No key for peer associated with %s" % message.handle)
1205 return
1206
1207 if message.command == DIRECT and target_peer == None:
1208 logging.debug("Aborting message: unknown handle: %s" % message.handle)
1209 return
1210
1211 if message.message_bytes == None:
1212 message_bytes = self.get_message_bytes(message, target_peer)
1213 else:
1214 message_bytes = message.message_bytes
1215
1216 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
1217
1218 if message.command != IGNORE:
1219 logging.debug("generated message_hash: %s" % message_hash)
1220 self.state.add_to_dedup_queue(message_hash)
1221 self.state.log(message.speaker, message_bytes, target_peer)
1222
1223 if message.command == DIRECT:
1224 signed_packet_bytes = self.pack(target_peer, message, message_bytes)
1225 target_peer.send(signed_packet_bytes)
1226 logging.info(MESSAGE_LOGGING_FORMAT % (target_peer.address,
1227 target_peer.port,
1228 target_peer.handles[0],
1229 message.body,
1230 message.bounces,
1231 message_hash))
1232
1233 elif message.command == BROADCAST or message.command == IGNORE:
1234 # sanity check
1235 if message.message_bytes and message_bytes != message_bytes:
1236 logging.error("aborting send: message modified by station!")
1237 return
1238
1239 for peer in self.state.get_keyed_peers(exclude_addressless=True):
1240
1241 # we don't want to send a broadcast back to the originator
1242 if message.peer and (peer.peer_id == message.peer.peer_id):
1243 continue
1244
1245 signed_packet_bytes = self.pack(peer, message, message_bytes)
1246 peer.send(signed_packet_bytes)
1247 if message.command != IGNORE:
1248 logging.info(MESSAGE_LOGGING_FORMAT % (peer.address,
1249 peer.port,
1250 peer.handles[0],
1251 message.body,
1252 message.bounces,
1253 message_hash))
1254
1255 def get_message_bytes(self, message, peer=None):
1256 timestamp = message.timestamp
1257 command = message.command
1258 speaker = self._pad(message.speaker, MAX_SPEAKER_SIZE)
1259
1260 # let's generate the self_chain value from the last message or set it to zero if
1261 # there this is the first message
1262
1263 if message.original:
1264 if command == DIRECT:
1265 self_chain = self.state.get_last_message_hash(message.speaker, peer.peer_id)
1266 elif command == BROADCAST:
1267 self_chain = self.state.get_last_message_hash(message.speaker)
1268 elif command == IGNORE:
1269 self_chain = "\x00" * 32
1270 net_chain = "\x00" * 32
1271 else:
1272 self_chain = message.self_chain
1273 net_chain = message.net_chain
1274
1275 # pack message bytes
1276
1277 if message.command != IGNORE:
1278 logging.debug("packing message bytes: %s" % message.body)
1279 else:
1280 logging.debug("packing rubbish message bytes: %s" % binascii.hexlify(message.body)[0:8])
1281
1282 message_bytes = struct.pack(MESSAGE_PACKET_FORMAT, message.timestamp, self_chain, net_chain, speaker, message.body)
1283 return message_bytes
1284
1285 def pack(self, peer, message, message_bytes):
1286 key_bytes = base64.b64decode(peer.get_key())
1287 signing_key = key_bytes[:32]
1288 cipher_key = key_bytes[32:]
1289
1290 # pack packet bytes
1291
1292 nonce = self._generate_nonce(16)
1293 bounces = message.bounces
1294 version = 0xfe
1295 red_packet_bytes = struct.pack(RED_PACKET_FORMAT, nonce, bounces, version, message.command, self._pad(message_bytes, MAX_MESSAGE_LENGTH))
1296
1297 # encrypt packet
1298
1299 serpent = Serpent(cipher_key)
1300 black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes)
1301
1302 # sign packet
1303
1304 signature_bytes = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest()
1305
1306 # pack the signed black packet
1307
1308 signed_packet_bytes = struct.pack(BLACK_PACKET_FORMAT, black_packet_bytes, signature_bytes)
1309
1310 return signed_packet_bytes
1311
1312 def unpack(self, peer, black_packet):
1313 # unpack the black packet
1314
1315 key_bytes = base64.b64decode(peer.get_key())
1316 signing_key = key_bytes[:32]
1317 cipher_key = key_bytes[32:]
1318
1319 try:
1320 black_packet_bytes, signature_bytes = struct.unpack(BLACK_PACKET_FORMAT, black_packet)
1321 except:
1322 logging.error("Discarding malformed black packet from %s" % peer.get_key())
1323 return Message({ "error_code": MALFORMED_PACKET })
1324
1325 # check signature
1326
1327 signature_check_bytes = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest()
1328
1329 if(signature_check_bytes != signature_bytes):
1330 return Message({ "error_code": INVALID_SIGNATURE })
1331
1332 # try to decrypt black packet
1333
1334 serpent = Serpent(cipher_key)
1335 red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes)
1336
1337 # unpack red packet
1338
1339 nonce, bounces, version, command, message_bytes = struct.unpack(RED_PACKET_FORMAT, red_packet_bytes)
1340
1341 # compute message_hash
1342
1343 message_hash = binascii.hexlify(hashlib.sha256(message_bytes).digest())
1344
1345 # unpack message
1346
1347 int_ts, self_chain, net_chain, speaker, body = struct.unpack(MESSAGE_PACKET_FORMAT, message_bytes)
1348
1349 # remove padding from speaker
1350
1351 for index, byte in enumerate(speaker):
1352 if byte == '\x00':
1353 speaker = speaker[0:index]
1354 break
1355
1356 # remove padding from body
1357
1358 for index, byte in enumerate(body):
1359 if byte == '\x00':
1360 body = body[0:index]
1361 break
1362
1363 # nothing to be done for an IGNORE command
1364
1365 if command == IGNORE:
1366 return Message({"speaker": speaker, "error_code": IGNORED})
1367
1368 # check timestamp
1369
1370 if(int_ts not in self._ts_range()):
1371 return Message({ "error_code": STALE_PACKET })
1372
1373 # check self_chain
1374
1375 if command == DIRECT:
1376 self_chain_check = self.state.get_last_message_hash(speaker, peer.peer_id)
1377 elif command == BROADCAST:
1378 self_chain_check = self.state.get_last_message_hash(speaker)
1379
1380 self_chain_valid = (self_chain_check == self_chain)
1381
1382 # log this message for use in the self_chain check
1383
1384 self.state.log(speaker, message_bytes, peer if (command == DIRECT) else None)
1385
1386 # build message object
1387
1388 message = Message({
1389 "peer": peer,
1390 "body": body.rstrip(),
1391 "timestamp": int_ts,
1392 "command": command,
1393 "speaker": speaker,
1394 "bounces": bounces,
1395 "self_chain": self_chain,
1396 "net_chain": net_chain,
1397 "self_chain_valid": self_chain_valid,
1398 "message_hash": message_hash,
1399 "message_bytes": message_bytes
1400 })
1401
1402 # check for duplicates
1403
1404 if(self.state.is_duplicate_message(message_hash)):
1405 message.error_code = DUPLICATE_PACKET
1406 return message
1407
1408 return message
1409
1410 def _pad(self, text, size):
1411 return text.ljust(size, "\x00")
1412
1413 def _ts_range(self):
1414 current_ts = int(time.time())
1415 return range(current_ts - TS_ACCEPTABLE_SKEW, current_ts + TS_ACCEPTABLE_SKEW)
1416
1417 def _generate_nonce(self, length=8):
1418 """Generate pseudorandom number."""
1419 return ''.join([str(random.randint(0, 9)) for i in range(length)])
1420
1421 def gen_rubbish_body(self):
1422 return os.urandom(MAX_MESSAGE_LENGTH)
-
+ 9F5B2F1661FAB5FCC0A29D99094099C7A40ACAA3736475E277177ED97036DB710CCC0C4A608BE22759A2F0493CB28BB62574B055A83C71205998A2E0EB8721DD
blatta/lib/long_buffer.py
(0 . 0)(1 . 49)
1427 import time
1428
1429 from lib.broadcast import Broadcast
1430 from lib.commands import BROADCAST, DIRECT
1431 from lib.direct import Direct
1432 from lib.message import EMPTY_CHAIN
1433
1434
1435 class LongBuffer(object):
1436 def __init__(self, state):
1437 self.state = state
1438 self.buffer = {}
1439
1440 def exhume(self, message_hash):
1441 message = self.buffer.get(message_hash)
1442 if message:
1443 return message
1444
1445 command, message_bytes = self.state.get_message(message_hash)
1446 if message_bytes:
1447 if command == DIRECT:
1448 return Direct({message_bytes: message_bytes}, self.state)
1449 elif command == BROADCAST:
1450 return Broadcast({'message_bytes': message_bytes}, self.state)
1451
1452 def expunge_expired(self):
1453 for message in self.buffer.values():
1454 if message.timestamp < time.time() - 3600:
1455 del self.buffer[message.message_hash]
1456
1457 def intern(self, message):
1458 # somehow we are trying to intern a message we've already got
1459 if self.has(message.message_hash):
1460 return
1461
1462 self.expunge_expired()
1463 self.buffer[message.message_hash] = message
1464 self.state.log_message(message)
1465
1466 def has(self, message_hash):
1467 if EMPTY_CHAIN == message_hash:
1468 return True
1469
1470 if self.buffer.get(message_hash) is None:
1471 if self.state.log_has_message(message_hash):
1472 return True
1473 return False
1474 else:
1475 return True
- 9EFA52C31BB08E8F81821B1832F5729D9E3F52CD44CBD9C10ADCC809FD4383129ACE4D7C3615EB1E9B068E13E045DEE2898AAAC1F1EAACA81AD65B5ECE9FF751
+ EB3F3EC3FA23E3F350B870585FDBE7CCB548B4B4A266699F943CD97148D4130495D98399F73FA7E8466C61655EA5BCFEE005D1EF07C8FAE66EE91640B9BF0444
blatta/lib/message.py
(1 . 7)(1 . 59)
1481 import hashlib
1482
1483 from serpent import Serpent
1484 from serpent import serpent_cbc_encrypt
1485 from serpent import serpent_cbc_decrypt
1486 from commands import BROADCAST
1487 from commands import DIRECT
1488 from commands import GETDATA
1489 from commands import IGNORE
1490 from commands import COMMAND_LABELS
1491 import base64
1492 import binascii
1493 import time
1494 import struct
1495 import hmac
1496 import random
1497 import os
1498 import logging
1499
1500 PEST_VERSION = 0xFC
1501 PACKET_SIZE = 496
1502 MAX_SPEAKER_SIZE = 32
1503 TS_ACCEPTABLE_SKEW = 60 * 15
1504 BLACK_PACKET_FORMAT = "<448s48s"
1505 RED_PACKET_FORMAT = "<16sBBxB428s"
1506 RED_PACKET_LENGTH_WITH_PADDING = 448
1507 MESSAGE_PACKET_FORMAT = "<q32s32s32s324s"
1508 GETDATA_MESSAGE_PACKET_FORMAT = "<q32s32s32s32s292s"
1509 MAX_MESSAGE_LENGTH = 428
1510 TEXT_PAYLOAD_SIZE = 324
1511
1512 # error codes
1513 STALE_PACKET = 0
1514 DUPLICATE_PACKET = 1
1515 MALFORMED_PACKET = 2
1516 INVALID_SIGNATURE = 3
1517 UNSUPPORTED_VERSION = 4
1518 OUT_OF_ORDER_SELF = 5
1519 OUT_OF_ORDER_NET = 6
1520 OUT_OF_ORDER_BOTH = 7
1521
1522 # logging formats
1523 OUTGOING_MESSAGE_LOGGING_FORMAT = "[%s:%d %s] <- %s %s %s %s"
1524 INCOMING_MESSAGE_LOGGING_FORMAT = "[%s:%d %s] -> %s %s %d %s"
1525
1526 # fork status values
1527 FORKED = 0
1528 NOT_FORKED = 1
1529 FIRST_MESSAGE = 2
1530 EMPTY_CHAIN = "\x00" * 32
1531
1532 class Message(object):
1533 def __init__(self, message):
1534 self.original = True
1535 def __init__(self, message, state=None):
1536 self.state = state
1537 self.prefix = None
1538 self.fork_status = None
1539 self.handle = message.get("handle")
1540 self.peer = message.get("peer")
1541 self.body = message.get("body")
(14 . 4)(66 . 325)
1543 self.self_chain_valid = message.get("self_chain_valid")
1544 self.error_code = message.get("error_code")
1545 self.message_hash = message.get("message_hash")
1546 self.message_bytes = None
1547 self.message_bytes = message.get("message_bytes")
1548 self.long_buffer = message.get("long_buffer")
1549 self.get_data_response = message.get("get_data_response")
1550 self.metadata = message.get('metadata')
1551 self.original = message.get('original')
1552 self.reporting_peers = message.get('reporting_peers', [])
1553 self.warning = message.get('warning')
1554 self.order_buffer = message.get('order_buffer')
1555 self.error_code = message.get('error_code')
1556
1557 @classmethod
1558 def pack(cls, peer, command, bounces, message_bytes):
1559 key_bytes = base64.b64decode(peer.get_key())
1560 signing_key = key_bytes[:32]
1561 cipher_key = key_bytes[32:]
1562
1563 # pack packet bytes
1564 nonce = cls._generate_nonce(16)
1565 version = PEST_VERSION
1566 red_packet_bytes = struct.pack(RED_PACKET_FORMAT,
1567 nonce,
1568 bounces,
1569 version,
1570 command,
1571 cls._pad(message_bytes, MAX_MESSAGE_LENGTH))
1572
1573 # encrypt packet
1574 serpent = Serpent(cipher_key)
1575 black_packet_bytes = serpent_cbc_encrypt(cipher_key, red_packet_bytes)
1576
1577 # sign packet
1578 signature_bytes = hmac.new(signing_key, black_packet_bytes, hashlib.sha384).digest()
1579
1580 # pack the signed black packet
1581 signed_packet_bytes = struct.pack(BLACK_PACKET_FORMAT, black_packet_bytes, signature_bytes)
1582
1583 return signed_packet_bytes
1584
1585 @classmethod
1586 def unpack(cls, peer, black_packet, long_buffer, order_buffer, metadata, state=None):
1587 # unpack the black packet
1588 for key in peer.keys:
1589 key_bytes = base64.b64decode(key)
1590 signing_key = key_bytes[:32]
1591 cipher_key = key_bytes[32:]
1592
1593 try:
1594 black_packet_bytes, signature_bytes = struct.unpack(BLACK_PACKET_FORMAT,
1595 black_packet)
1596 except:
1597 logging.error("Discarding malformed black packet from %s" % peer.get_key())
1598 return {
1599 "error_code": MALFORMED_PACKET,
1600 "metadata": metadata
1601 }
1602
1603 # check signature
1604 signature_check_bytes = hmac.new(signing_key,
1605 black_packet_bytes,
1606 hashlib.sha384).digest()
1607
1608 if(signature_check_bytes != signature_bytes):
1609 continue
1610
1611 # try to decrypt black packet
1612 Serpent(cipher_key)
1613 red_packet_bytes = serpent_cbc_decrypt(cipher_key, black_packet_bytes)
1614
1615 # unpack red packet
1616 try:
1617 nonce, bounces, version, command, message_bytes = struct.unpack(
1618 RED_PACKET_FORMAT,
1619 red_packet_bytes
1620 )
1621
1622 # if red_packet_bytes was never set, no matching key for the sig was found
1623 # this is expected to happen often as only one peer's key should match for each
1624 # message we receive
1625 except NameError, ex:
1626 return {
1627 "error_code": INVALID_SIGNATURE,
1628 "metadata": metadata
1629 }
1630
1631 if version < PEST_VERSION:
1632 return { "error_code": UNSUPPORTED_VERSION }
1633
1634 # compute message_hash
1635 message_hash = cls.gen_hash(message_bytes)
1636 message = cls._build_message_dict(command,
1637 peer,
1638 bounces,
1639 message_hash,
1640 message_bytes,
1641 metadata)
1642
1643 return cls._evaluate_message(long_buffer, order_buffer, message)
1644
1645
1646 @classmethod
1647 def _evaluate_message(cls, long_buffer, order_buffer, message):
1648 # if we're expecting this message as a GETDATA response, skip the timestamp check
1649 if not order_buffer.expects(message['message_hash']):
1650 if not cls.in_time_window(message['timestamp']):
1651 message['error_code'] = STALE_PACKET
1652 return message
1653 else:
1654 # we need to mark this as a get data response in order not to rebroadcast it
1655 # TODO how to handle out of order unrequested but not stale messages subsequent to a message in the order buffer?
1656 message['get_data_response'] = True
1657
1658 if long_buffer.has(message['message_hash']):
1659 message['error_code'] = DUPLICATE_PACKET
1660 return message
1661
1662 if DIRECT is message['command']:
1663 return cls._evaluate_direct(long_buffer, order_buffer, message)
1664 elif BROADCAST is message['command']:
1665 return cls._evaluate_broadcast(long_buffer, order_buffer, message)
1666 else:
1667 return message
1668
1669 @classmethod
1670 def _build_message_dict(cls, command, peer, bounces, message_hash, message_bytes, metadata):
1671 # unpack message
1672 if GETDATA == command:
1673 int_ts, self_chain, net_chain, speaker, body = cls._unpack_getdata_message(message_bytes)
1674 else:
1675 int_ts, self_chain, net_chain, speaker, body = cls._unpack_message(message_bytes)
1676
1677 message = {
1678 "peer": peer,
1679 "body": body,
1680 "timestamp": int_ts,
1681 "command": command,
1682 "speaker": speaker,
1683 "bounces": bounces,
1684 "self_chain": self_chain,
1685 "net_chain": net_chain,
1686 "message_hash": message_hash,
1687 "message_bytes": message_bytes,
1688 "metadata": metadata
1689 }
1690 return message
1691
1692 @classmethod
1693 def _evaluate_broadcast(cls, long_buffer, order_buffer, message):
1694 if (not long_buffer.has(message['self_chain']) and
1695 not long_buffer.has(message['net_chain'])):
1696 if message['self_chain'] != message['net_chain']:
1697 message['error_code'] = OUT_OF_ORDER_BOTH
1698 return message
1699 else:
1700 message['error_code'] = OUT_OF_ORDER_SELF
1701 return message
1702 elif not long_buffer.has(message['net_chain']):
1703 message['error_code'] = OUT_OF_ORDER_NET
1704 return message
1705 elif not (long_buffer.has(message['self_chain'])):
1706 message['error_code'] = OUT_OF_ORDER_SELF
1707 return message
1708 return message
1709
1710 @classmethod
1711 def _evaluate_direct(cls, long_buffer, order_buffer, message):
1712 # if this is the first direct message from a station, we needn't check for antecedents
1713 if message['self_chain'] == EMPTY_CHAIN:
1714 return message
1715
1716 # no need to check net_chain for a direct message
1717 if not (long_buffer.has(message['self_chain'])):
1718 if not order_buffer.has(message['message_hash']):
1719 order_buffer.add(message)
1720 message['error_code'] = OUT_OF_ORDER_SELF
1721 return message
1722 return message
1723
1724 @classmethod
1725 def _unpack_message(cls, message_bytes):
1726 int_ts, self_chain, net_chain, speaker, body = struct.unpack(MESSAGE_PACKET_FORMAT, message_bytes)
1727
1728 # remove padding from speaker
1729 for index, byte in enumerate(speaker):
1730 if byte == '\x00':
1731 speaker = speaker[0:index]
1732 break
1733
1734 # remove padding from body
1735 for index, byte in enumerate(body):
1736 if byte == '\x00':
1737 body = body[0:index]
1738 break
1739
1740 return int_ts, self_chain, net_chain, speaker, body
1741
1742 @classmethod
1743 def _unpack_getdata_message(cls, message_bytes):
1744 int_ts, self_chain, net_chain, speaker, body, padding = struct.unpack(GETDATA_MESSAGE_PACKET_FORMAT, message_bytes)
1745 return int_ts, self_chain, net_chain, speaker, body
1746
1747 @classmethod
1748 def _unpack_last_valid_message(cls, last_message_info):
1749 if last_message_info.get('message_bytes'):
1750 return cls._unpack_message(
1751 last_message_info['message_bytes']
1752 )[4]
1753 else:
1754 return "<no prior message received>"
1755
1756 @classmethod
1757 def _pad(cls, text, size):
1758 return text.ljust(size, "\x00")
1759
1760 @classmethod
1761 def _ts_range(cls):
1762 current_ts = int(time.time())
1763 return range(current_ts - TS_ACCEPTABLE_SKEW, current_ts + TS_ACCEPTABLE_SKEW)
1764
1765 @classmethod
1766 def _generate_nonce(cls, length=8):
1767 """Generate pseudorandom number."""
1768 return ''.join([str(random.randint(0, 9)) for i in range(length)])
1769
1770 @classmethod
1771 def gen_rubbish_body(cls):
1772 return os.urandom(MAX_MESSAGE_LENGTH)
1773
1774 @classmethod
1775 def gen_hash(cls, message_bytes):
1776 return hashlib.sha256(message_bytes).digest()
1777
1778 def set_warning(self):
1779 if self.timestamp < self.state.get_latest_message_timestamp():
1780 self.warning = time.strftime("%Y-%m-%d %H:%M:%S: ", time.localtime(self.timestamp))
1781
1782 def get_message_bytes(self, peer=None):
1783 command = self.command
1784 speaker = Message._pad(self.speaker, MAX_SPEAKER_SIZE)
1785
1786 # let's generate the self_chain value from the last message or set it to zero if
1787 # this is the first message
1788 if command == DIRECT:
1789 self_chain = self.state.get_handle_self_chain(peer.handles[0])
1790 net_chain = EMPTY_CHAIN
1791 elif command == BROADCAST:
1792 self_chain = self.state.get_broadcast_self_chain()
1793 net_chain = self.state.get_net_chain()
1794 elif command == IGNORE:
1795 self_chain = net_chain = EMPTY_CHAIN
1796 elif command == GETDATA:
1797 self_chain = net_chain = EMPTY_CHAIN
1798
1799 self.self_chain = self_chain
1800 self.net_chain = net_chain
1801
1802 message_bytes = struct.pack(MESSAGE_PACKET_FORMAT,
1803 self.timestamp,
1804 self_chain,
1805 net_chain,
1806 speaker.encode('ascii'),
1807 self.body)
1808 return message_bytes
1809
1810 def compute_message_hash(self):
1811 if self.message_hash is None:
1812 if self.message_bytes is not None:
1813 self.message_hash = Message.gen_hash(self.message_bytes)
1814 return self.message_hash
1815 else:
1816 return None
1817 else:
1818 return self.message_hash
1819
1820 def log_outgoing(self, peer):
1821 logging.info(OUTGOING_MESSAGE_LOGGING_FORMAT % (peer.address,
1822 peer.port,
1823 peer.handles[0],
1824 COMMAND_LABELS[self.command],
1825 self.body,
1826 self.bounces,
1827 binascii.hexlify(self.compute_message_hash())))
1828
1829 def log_rubbish(self, peer):
1830 logging.info(OUTGOING_MESSAGE_LOGGING_FORMAT % (peer.address,
1831 peer.port,
1832 peer.handles[0],
1833 COMMAND_LABELS[self.command],
1834 "<rubbish>",
1835 self.bounces,
1836 binascii.hexlify(self.message_hash)))
1837
1838 def log_incoming(self, peer):
1839 try:
1840 logging.info(INCOMING_MESSAGE_LOGGING_FORMAT % (peer.address,
1841 peer.port,
1842 peer.handles[0],
1843 COMMAND_LABELS[self.command],
1844 self.body,
1845 self.bounces,
1846 binascii.hexlify(self.message_hash)))
1847 except Exception, ex:
1848 logging.info("unable to log incoming message")
1849
1850 def log_incoming_getdata(self, peer):
1851 try:
1852 logging.info(INCOMING_MESSAGE_LOGGING_FORMAT % (peer.address,
1853 peer.port,
1854 peer.handles[0],
1855 COMMAND_LABELS[self.command],
1856 binascii.hexlify(self.body),
1857 self.bounces,
1858 binascii.hexlify(self.message_hash)))
1859 except Exception, ex:
1860 logging.info("unable to log incoming message")
1861 def retry(self, requesting_peer):
1862 logging.debug("Can't retry a message that isn't DIRECT or BROADCAST")
1863
1864 return
1865
1866 @classmethod
1867 def in_time_window(cls, timestamp):
1868 return timestamp in cls._ts_range()
-
+ 388113D46790BA9828275D8F1B20CEF6DCA63976D43192EFA8D4332B3360FA8CB93A50A026EB35A66BA129BEA7E61FC6973491072F6B5385D4EC83C11CCD2DC7
blatta/lib/order_buffer.py
(0 . 0)(1 . 57)
1873 import time
1874 from broadcast import Broadcast
1875 from direct import Direct
1876 from commands import BROADCAST
1877 from commands import DIRECT
1878
1879
1880 class OrderBuffer(object):
1881 def __init__(self, state):
1882 self.buffer = {}
1883 self.state = state
1884
1885 def add(self, message):
1886 ts = time.time()
1887 if message['command'] == BROADCAST:
1888 m = Broadcast(message, self.state)
1889 elif message['command'] == DIRECT:
1890 m = Direct(message, self.state)
1891 else:
1892 return
1893
1894 if self.buffer.get(ts) is None:
1895 self.buffer[ts] = [m]
1896 else:
1897 self.buffer[ts].append(m)
1898
1899 def expects(self, message_hash):
1900 for value in self.buffer.values():
1901 for message in value:
1902 if message_hash == message.self_chain:
1903 return True
1904 elif message_hash == message.net_chain:
1905 return True
1906 return False
1907
1908 def has(self, message_hash):
1909 for value in self.buffer.values():
1910 for message in value:
1911 if message_hash == message.message_hash:
1912 return True
1913 return False
1914
1915 def dequeue_and_order_mature_messages(self):
1916 current_time = time.time()
1917 sorted_messages = sorted(self.buffer.keys())
1918 mature_messages = []
1919 for timestamp in sorted_messages:
1920 if timestamp < current_time - int(self.state.get_knob('order_buffer_expiration_seconds')):
1921 if isinstance(self.buffer[timestamp], list):
1922 if len(self.buffer[timestamp]) > 0:
1923 for message in self.buffer[timestamp]:
1924 mature_messages.append(message)
1925 del self.buffer[timestamp]
1926 else:
1927 mature_messages.append(self.buffer[timestamp])
1928 del self.buffer[timestamp]
1929 return sorted(mature_messages, key=lambda m: m.timestamp)
- 01648A4C6129C725EBFAF5A84166BBC34CF2080DF4842887C872524819B87E13D524B0B8351F75F5ECC4F7A638CCE26D5DA72C4967C5785EBF1A956DB5906B38
+ B37B49C4C4371DC8D5ECD15D8B07CAC76DE8F70A51D3A590891E89CFA5C2E2C8F3A883257976C4F77040831199211FF3FC038EC159A3AD477FE2A3970CC8B7CC
blatta/lib/peer.py
(16 . 6)(16 . 7)
1935 self.address = peer_entry["address"]
1936 self.port = peer_entry["port"]
1937 self.socket = socket
1938 self.forked = peer_entry.get("forked")
1939
1940 def get_key(self):
1941 if len(self.keys) > 0:
(27 . 9)(28 . 6)
1943 if self.get_key() != None and self.address != None and self.port != None:
1944 try:
1945 self.socket.sendto(signed_packet_bytes, (self.address, self.port))
1946 logging.debug("[%s:%d] <- %s" % (self.address,
1947 int(self.port),
1948 binascii.hexlify(signed_packet_bytes)[0:16]))
1949
1950 except Exception as ex:
1951 stack = traceback.format_exc()
- 2003B827BA1E77B0E67A9A00BDF37ED146DDAF00BB0EC9EB72AF3AEFCED8541F948FA2B8A7770975165F96029E489A6E609E188700F5655C697E05271419D9EF
+ 225CCF41F5E87FFF8D9837F1EAF03D4806FA62F9A2A1608F4642660DFA9BC863F10860FD02879A42283CE862C9C0C0B034FD2B0158A4506FB15FA4FFD33FFFC9
blatta/lib/server.py
(1 . 37)(1 . 22)
1956 VERSION = "9983"
1957
1958 import os
1959 import select
1960 import socket
1961 import sys
1962 import tempfile
1963 import time
1964 import string
1965 import datetime
1966 import sqlite3
1967 from datetime import datetime
1968 from funcs import *
1969 from client import Client
1970 from channel import Channel
1971 from station import Station
1972 from message import Message
1973 from infosec import PACKET_SIZE
1974 import imp
1975 from message import PACKET_SIZE
1976 import pprint
1977 import logging
1978
1979 class Server(object):
1980 def __init__(self, options):
1981 def __init__(self, options, station):
1982 self.station = station
1983 self.irc_ports = options.irc_ports
1984 self.udp_port = options.udp_port
1985 self.channel_name = options.channel_name
1986 self.password = options.password
1987 self.motdfile = options.motd
1988 self.logdir = options.logdir
1989 self.chroot = options.chroot
1990 self.setuid = options.setuid
1991 self.statedir = options.statedir
1992 self.config_file_path = options.config_file_path
1993 self.pp = pprint.PrettyPrinter(indent=4)
1994 self.db_path = options.db_path
1995 self.address_table_path = options.address_table_path
(48 . 33)(33 . 6)
1997 self.client = None
1998 self.nicknames = {} # irc_lower(Nickname) --> Client instance.
1999
2000 if self.logdir:
2001 create_directory(self.logdir)
2002 if self.statedir:
2003 create_directory(self.statedir)
2004
2005 def daemonize(self):
2006 try:
2007 pid = os.fork()
2008 if pid > 0:
2009 sys.exit(0)
2010 except OSError:
2011 sys.exit(1)
2012 os.setsid()
2013 try:
2014 pid = os.fork()
2015 if pid > 0:
2016 logging.info("PID: %d" % pid)
2017 sys.exit(0)
2018 except OSError:
2019 sys.exit(1)
2020 os.chdir("/")
2021 os.umask(0)
2022 dev_null = open("/dev/null", "r+")
2023 os.dup2(dev_null.fileno(), sys.stdout.fileno())
2024 os.dup2(dev_null.fileno(), sys.stderr.fileno())
2025 os.dup2(dev_null.fileno(), sys.stdin.fileno())
2026
2027 def get_client(self, nickname):
2028 return self.nicknames.get(irc_lower(nickname))
2029
(124 . 10)(82 . 7)
2031 # Setup UDP first
2032 self.udp_server_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
2033 self.udp_server_socket.bind((self.udp_address, self.udp_port))
2034 self.station = Station({ "socket": self.udp_server_socket,
2035 "db_path": self.db_path,
2036 "address_table_path": self.address_table_path
2037 })
2038 self.station.socket = self.udp_server_socket
2039 logging.info("Listening for Pest packets on udp port %d." % self.udp_port)
2040
2041 serversockets = []
(143 . 20)(98 . 13)
2043 serversockets.append(s)
2044 del s
2045 logging.info("Listening for IRC connections on port %d." % port)
2046 if self.chroot:
2047 os.chdir(self.chroot)
2048 os.chroot(self.chroot)
2049 logging.info("Changed root directory to %s" % self.chroot)
2050 if self.setuid:
2051 os.setgid(self.setuid[1])
2052 os.setuid(self.setuid[0])
2053 logging.info("Setting uid:gid to %s:%s"
2054 % (self.setuid[0], self.setuid[1]))
2055
2056 # event loop setup
2057 last_aliveness_check = time.time()
2058 last_embargo_queue_check = time.time()
2059 last_short_buffer_check = time.time()
2060 last_rubbish_dispatch = time.time()
2061 last_order_buffer_check = time.time()
2062
2063 while True:
2064 # we don't want to be listening for client connections if there's already a client connected
2065 if self.client == None:
(176 . 6)(124 . 8)
2067 (conn, addr) = x.accept()
2068 self.client = Client(self, conn)
2069 self.station.client = self.client
2070 self.client.state = self.station.state
2071 self.client.long_buffer = self.station.long_buffer
2072 logging.info("Accepted connection from %s:%s." % (
2073 addr[0], addr[1]))
2074 except socket.error as e:
(199 . 15)(149 . 20)
2076 last_aliveness_check = now
2077
2078 # clear embargo queue if enough time has elapsed
2079 if last_embargo_queue_check + int(self.station.state.get_knob('embargo_interval')) < now:
2080 self.station.check_embargo_queue()
2081 last_embargo_queue_check = now
2082 if last_short_buffer_check + int(self.station.state.get_knob('short_buffer_check_interval_seconds')) < now:
2083 self.station.check_short_buffer()
2084 last_short_buffer_check = now
2085
2086 # spray rubbish
2087 if last_rubbish_dispatch + int(self.station.state.get_knob('rubbish_interval')) < now:
2088 if last_rubbish_dispatch + int(self.station.state.get_knob('rubbish_interval_seconds')) < now:
2089 self.station.send_rubbish()
2090 last_rubbish_dispatch = now
2091
2092 # check order buffer
2093 if last_order_buffer_check + int(self.station.state.get_knob('order_buffer_check_seconds')) < now:
2094 self.station.check_order_buffer()
2095 last_order_buffer_check = now
2096
2097 def create_directory(path):
2098 if not os.path.isdir(path):
2099 os.makedirs(path)
-
+ 6AE2E0B4022E647056CD53DF1B08F99F9DBDCEDA1F3EB8CF5A371BFA6F59A4A5CF90DCD9FB594C2D67466E202A684AE5E3BEB3528AE4CB61D4C198A3E493E315
blatta/lib/short_buffer.py
(0 . 0)(1 . 45)
2104 import time
2105
2106 class ShortBuffer(object):
2107 def __init__(self, state):
2108 self.state = state
2109 self.buffer = {}
2110
2111 def embargo(self, message):
2112 if message.message_hash not in self.buffer.keys():
2113 self.buffer[message.message_hash] = {
2114 'received': time.time(),
2115 'message': message,
2116 'low_bounce_count': message.bounces,
2117 'closest_peers': [message.peer],
2118 'reporting_peers': [message.peer]
2119 }
2120 else:
2121 embargoed_message = self.buffer[message.message_hash]
2122 if message.bounces < embargoed_message['low_bounce_count']:
2123 embargoed_message['low_bounce_count'] = message.bounces
2124 embargoed_message['closest_peers'] = [message.peer]
2125 elif message.bounces == embargoed_message['low_bounce_count']:
2126 embargoed_message['closest_peers'].append(message.peer)
2127 else:
2128 # not interested in the message because the bounce count
2129 # is higher than what we've already got and we just want the
2130 # list of peers closest to the originator
2131 pass
2132 embargoed_message['reporting_peers'].append(message.peer)
2133
2134 def flush(self):
2135 current_time = time.time()
2136 messages = []
2137 for message_with_stats in self.buffer.values():
2138 if (message_with_stats['received'] <
2139 (current_time - int(self.state.get_knob('short_buffer_expiration_seconds')))):
2140 messages.append(message_with_stats)
2141 del self.buffer[message_with_stats['message'].message_hash]
2142 return sorted(messages, key=lambda m: m['message'].timestamp)
2143
2144 def has(self, message_hash):
2145 return self.buffer.get(message_hash)
2146
2147 def drop(self, message_hash):
2148 del self.buffer[message_hash]
- 302F32CC4525B593ED2B8E61AE3610003F5811F59778E1085B3E6CB28DD87646D77758A48B7BB7643D41D7602524B2CB35F2D7C9D96FC1D3E4F5136CD5A1A6F8
+ 770B11A71459E33F5F4D2C72CF55DCB79B18C711F3041D8D23375A1419224D309A50C4F10C466527DC7EE910DBDA120ABE877D68E8981C7F51CA4A401BF54775
blatta/lib/state.py
(1 . 65)(1 . 119)
2154 from peer import Peer
2155 from message import EMPTY_CHAIN
2156 import sqlite3
2157 import imp
2158 import hashlib
2159 import binascii
2160 import logging
2161 import datetime
2162 import caribou
2163
2164 from itertools import chain
2165
2166 KNOBS=({'max_bounces': 3,
2167 'embargo_interval': 1,
2168 'rubbish_interval': 10})
2169 'embargo_interval_seconds': 1,
2170 'rubbish_interval_seconds': 10,
2171 'nick': '',
2172 'order_buffer_check_seconds': 5 * 60,
2173 'order_buffer_expiration_seconds': 5 * 60,
2174 'short_buffer_expiration_seconds': 1,
2175 'short_buffer_check_interval_seconds': 1})
2176
2177 class State(object):
2178 __instance = None
2179 @staticmethod
2180 def get_instance(socket=None, db_path=None):
2181 if State.__instance == None:
2182 State(socket, db_path)
2183 return State.__instance
2184
2185 def __init__(self, socket, db_path):
2186 if State.__instance != None:
2187 raise Exception("This class is a singleton")
2188 else:
2189 self.socket = socket
2190 self.conn = sqlite3.connect(db_path, check_same_thread=False)
2191 cursor = self.cursor()
2192 cursor.execute("create table if not exists at(handle_id integer,\
2193 address text not null,\
2194 port integer not null,\
2195 updated_at datetime default null,\
2196 unique(handle_id, address, port))")
2197
2198 cursor.execute("create table if not exists wot(peer_id integer primary key)")
2199
2200 cursor.execute("create table if not exists handles(handle_id integer primary key,\
2201 peer_id integer,\
2202 handle text,\
2203 unique(handle))")
2204
2205 cursor.execute("create table if not exists keys(peer_id intenger,\
2206 key text,\
2207 used_at datetime default current_timestamp,\
2208 unique(key))")
2209
2210 cursor.execute("create table if not exists logs(\
2211 handle text not null,\
2212 peer_id integer,\
2213 message_bytes blob not null,\
2214 created_at datetime default current_timestamp)")
2215
2216 cursor.execute("create table if not exists dedup_queue(\
2217 hash text not null,\
2218 created_at datetime default current_timestamp)")
2219 cursor.execute("create table if not exists knobs(\
2220 name text not null,\
2221 value text not null)")
2222 State.__instance = self
2223 def __init__(self, station, db_path=None):
2224 self.station = station
2225 if db_path:
2226 self.conn = sqlite3.connect(db_path)
2227 else:
2228 self.conn = sqlite3.connect("file::memory:")
2229
2230 cursor = self.cursor()
2231 cursor.execute("create table if not exists handle_self_chain(id integer primary key autoincrement,\
2232 handle string not null,\
2233 message_hash blob not null)")
2234
2235 cursor.execute("create table if not exists broadcast_self_chain(id integer primary key autoincrement,\
2236 message_hash blob not null)")
2237
2238 cursor.execute("create table if not exists net_chain(id integer primary key autoincrement,\
2239 message_hash blob not null)")
2240
2241 cursor.execute("create table if not exists at(handle_id integer,\
2242 address text not null,\
2243 port integer not null,\
2244 updated_at datetime default null,\
2245 unique(handle_id, address, port))")
2246
2247 cursor.execute("create table if not exists wot(peer_id integer primary key autoincrement)")
2248
2249 cursor.execute("create table if not exists handles(handle_id integer primary key,\
2250 peer_id integer,\
2251 handle text,\
2252 unique(handle))")
2253
2254 cursor.execute("create table if not exists keys(peer_id intenger,\
2255 key text,\
2256 used_at datetime default current_timestamp,\
2257 unique(key))")
2258
2259 cursor.execute("create table if not exists log(\
2260 message_bytes blob not null,\
2261 message_hash text not null, \
2262 command integer not null, \
2263 timestamp datetime not null, \
2264 created_at datetime default current_timestamp)")
2265
2266 cursor.execute("create table if not exists knobs(\
2267 name text not null,\
2268 value text not null)")
2269
2270 # migrate the db if necessary
2271 if db_path:
2272 caribou.upgrade(db_path, "migrations")
2273
2274 self.conn.commit()
2275
2276 def cursor(self):
2277 return self.conn.cursor()
2278
2279 def update_handle_self_chain(self, handle, message_hash):
2280 cursor = self.cursor()
2281 cursor.execute("insert into handle_self_chain(handle, message_hash) values(?, ?)", (handle, buffer(message_hash)))
2282 self.conn.commit()
2283
2284 def get_handle_self_chain(self, handle):
2285 cursor = self.cursor()
2286 results = cursor.execute("select message_hash from handle_self_chain where handle=?\
2287 order by id desc limit 1", (handle,)).fetchone()
2288 if results is not None:
2289 return results[0][:]
2290 else:
2291 return EMPTY_CHAIN
2292
2293 def update_broadcast_self_chain(self, message_hash):
2294 cursor = self.cursor()
2295 cursor.execute("insert into broadcast_self_chain(message_hash) values(?)", (buffer(message_hash),))
2296 self.conn.commit()
2297
2298 def get_broadcast_self_chain(self):
2299 cursor = self.cursor()
2300 results = cursor.execute("select message_hash from broadcast_self_chain order by id desc limit 1").fetchone()
2301 if results is not None:
2302 return results[0][:]
2303 else:
2304 return EMPTY_CHAIN
2305
2306 def update_net_chain(self, message_hash):
2307 self.cursor().execute("insert into net_chain(message_hash) values(?)", (buffer(message_hash),))
2308 self.conn.commit()
2309
2310 def get_net_chain(self):
2311 cursor = self.cursor()
2312 results = cursor.execute("select message_hash from net_chain order by id desc limit 1").fetchone()
2313 if results is not None:
2314 return results[0][:]
2315 else:
2316 return EMPTY_CHAIN
2317
2318 def get_knobs(self):
2319 cursor = self.cursor()
2320 results = cursor.execute("select name, value from knobs order by name asc").fetchall()
(88 . 7)(142 . 15)
2322 cursor.execute("update knobs set value=? where name=?", (knob_value, knob_name,))
2323 else:
2324 cursor.execute("insert into knobs(name, value) values(?, ?)", (knob_name, knob_value,))
2325
2326
2327 self.conn.commit()
2328
2329 def get_latest_message_timestamp(self):
2330 cursor = self.cursor()
2331 result = cursor.execute("select timestamp from log order by timestamp desc limit 1").fetchone()
2332 if result:
2333 return result[0]
2334
2335 def get_at(self, handle=None):
2336 cursor = self.cursor()
2337 at = []
(114 . 57)(176 . 6)
2339 "active_at": updated_at if updated_at else "no packets received from this address"})
2340 return at
2341
2342
2343 def is_duplicate_message(self, message_hash):
2344 cursor = self.cursor()
2345 cursor.execute("delete from dedup_queue where created_at < datetime(current_timestamp, '-1 hour')")
2346 self.conn.commit()
2347 result = cursor.execute("select hash from dedup_queue where hash=?",
2348 (message_hash,)).fetchone()
2349 logging.debug("checking if %s is dupe" % message_hash)
2350 if(result != None):
2351 return True
2352 else:
2353 return False
2354
2355 def add_to_dedup_queue(self, message_hash):
2356 cursor = self.cursor()
2357 cursor.execute("insert into dedup_queue(hash)\
2358 values(?)",
2359 (message_hash,))
2360 logging.debug("added %s to dedup" % message_hash)
2361 self.conn.commit()
2362
2363 def get_last_message_hash(self, handle, peer_id=None):
2364 cursor = self.cursor()
2365 if peer_id:
2366 message_bytes = cursor.execute("select message_bytes from logs\
2367 where handle=? and peer_id=?\
2368 order by created_at desc limit 1",
2369 (handle, peer_id)).fetchone()
2370
2371 else:
2372 message_bytes = cursor.execute("select message_bytes from logs\
2373 where handle=? and peer_id is null\
2374 order by created_at desc limit 1",
2375 (handle,)).fetchone()
2376
2377 if message_bytes:
2378 return hashlib.sha256(message_bytes[0][:]).digest()
2379 else:
2380 return "\x00" * 32
2381
2382 def log(self, handle, message_bytes, peer=None):
2383 cursor = self.cursor()
2384 if peer != None:
2385 peer_id = peer.peer_id
2386 else:
2387 peer_id = None
2388
2389 cursor.execute("insert into logs(handle, peer_id, message_bytes)\
2390 values(?, ?, ?)",
2391 (handle, peer_id, buffer(message_bytes)))
2392
2393 def import_at_and_wot(self, at_path):
2394 cursor = self.cursor()
2395 wot = imp.load_source('wot', at_path)
(184 . 7)(195 . 6)
2397 (handle_id, peer["address"], peer["port"], None))
2398 cursor.execute("insert into keys(peer_id, key) values(?, ?)",
2399 (peer_id, key))
2400
2401 self.conn.commit()
2402
2403 def update_at(self, peer, set_active_at=True):
(212 . 24)(222 . 18)
2405 peer['address'],
2406 peer['port']))
2407
2408 # otherwise update the existing entry if it differs
2409 # otherwise just update the existing entry
2410 else:
2411 try:
2412 if (at_entry[1] != peer['address'] or
2413 at_entry[2] != peer['port']):
2414 cursor.execute("update at set updated_at = ?,\
2415 address = ?,\
2416 port = ?\
2417 where handle_id=?",
2418 (timestamp,
2419 peer["address"],
2420 peer["port"],
2421 handle_id))
2422
2423 logging.debug("updated at entry for %s: %s:%d" % (
2424 peer['handle'],
2425 peer['address'],
2426 peer['port']))
2427 cursor.execute("update at set updated_at = ?,\
2428 address = ?,\
2429 port = ?\
2430 where handle_id=?",
2431 (timestamp,
2432 peer["address"],
2433 peer["port"],
2434 handle_id))
2435
2436 except sqlite3.IntegrityError:
2437 cursor.execute("delete from at where handle_id=?", (handle_id,))
2438
(310 . 10)(314 . 38)
2440
2441 def listify(self, results):
2442 return list(chain.from_iterable(results))
2443
2444 def get_keyed_peers(self, exclude_addressless=False):
2445
2446 def log_has_message(self, message_hash):
2447 cursor = self.cursor()
2448 result = cursor.execute("select exists(select 1 from log where message_hash=?)\
2449 limit 1", (binascii.hexlify(message_hash),)).fetchone()
2450 return result[0]
2451
2452 def log_message(self, message):
2453 cursor = self.cursor()
2454 message_hash_hex_string = binascii.hexlify(message.message_hash)
2455 cursor.execute("insert into log(message_hash, message_bytes, command, timestamp) values(?, ?, ?, ?)",
2456 (message_hash_hex_string,
2457 buffer(message.message_bytes),
2458 message.command,
2459 message.timestamp))
2460 self.conn.commit()
2461
2462 def get_message(self, message_hash):
2463 cursor = self.cursor()
2464 message_hash_hex_string = binascii.hexlify(message_hash)
2465 result = cursor.execute("select command, message_bytes from log where message_hash=? limit 1",
2466 (message_hash_hex_string,)).fetchone()
2467 if result:
2468 return result[0], result[1][:]
2469
2470 return None, None
2471
2472 def get_keyed_peers(self, exclude_addressless=False, exclude_ids=[]):
2473 cursor = self.cursor()
2474 peer_ids = self.listify(cursor.execute("select peer_id from keys").fetchall())
2475 peer_ids = self.listify(cursor.execute("select peer_id from keys\
2476 where peer_id not in (%s) order by random()" % ','.join('?'*len(exclude_ids)),
2477 exclude_ids).fetchall())
2478 peers = []
2479 for peer_id in peer_ids:
2480 handle = cursor.execute("select handle from handles where peer_id=?", (peer_id,)).fetchone()[0]
(334 . 15)(366 . 16)
2482 if handle_info == None:
2483 return None
2484
2485 peer_id = handle_info[1]
2486 address = cursor.execute("select address, port from at where handle_id=?\
2487 order by updated_at desc limit 1",
2488 (handle_info[0],)).fetchone()
2489 handles = self.listify(cursor.execute("select handle from handles where peer_id=?",
2490 (handle_info[1],)).fetchall())
2491 (peer_id,)).fetchall())
2492 keys = self.listify(cursor.execute("select key from keys where peer_id=?\
2493 order by used_at desc",
2494 (handle_info[1],)).fetchall())
2495 return Peer(self.socket, {
2496 order by random()",
2497 (peer_id,)).fetchall())
2498 return Peer(self.station.socket, {
2499 "handles": handles,
2500 "peer_id": handle_info[1],
2501 "address": address[0] if address else None,
(352 . 6)(385 . 8)
2503
2504 def is_duplicate(self, peers, peer):
2505 for existing_peer in peers:
2506 if existing_peer.address == peer.address and existing_peer.port == peer.port:
2507 if (not existing_peer.address is None
2508 and existing_peer.address == peer.address
2509 and existing_peer.port == peer.port):
2510 return True
2511 return False
- F53458362F6D7C18A066F42DBA0D1F8370DEF5BE9DD1A89A972082F3D387E34881CC879688AD66D1F5FE0D5930132FFCC176D32FA34487C0F5226394C17E6169
+ 24C203C6741B7EB17BF86C2A1046D5BE50F4FDECC7E9B4E5316E2F9101BEDEE9B84CE01A666EDF98F6545574571358E5B2EBC19B12C941F006BC1182DC2992F2
blatta/lib/station.py
(1 . 28)(1 . 47)
2516 import time
2517 VERSION = 9982
2518
2519 import binascii
2520 import logging
2521 import os
2522
2523 from lib.broadcast import Broadcast
2524 from lib.direct import Direct
2525 from state import State
2526 from infosec import STALE_PACKET
2527 from infosec import DUPLICATE_PACKET
2528 from infosec import MALFORMED_PACKET
2529 from infosec import INVALID_SIGNATURE
2530 from infosec import IGNORED
2531 from infosec import Infosec
2532 from commands import IGNORE
2533 from getdata import GetData
2534 from message import STALE_PACKET, OUT_OF_ORDER_NET, OUT_OF_ORDER_SELF, OUT_OF_ORDER_BOTH
2535 from message import DUPLICATE_PACKET
2536 from message import MALFORMED_PACKET
2537 from message import INVALID_SIGNATURE
2538 from message import UNSUPPORTED_VERSION
2539 from message import Message
2540 from commands import BROADCAST
2541 from commands import DIRECT
2542 from peer import Peer
2543 from ignore import Ignore
2544 from server import Server
2545 from long_buffer import LongBuffer
2546 from order_buffer import OrderBuffer
2547 from short_buffer import ShortBuffer
2548 from commands import BROADCAST, DIRECT, GETDATA, IGNORE
2549
2550
2551 class Station(object):
2552 def __init__(self, options):
2553 def __init__(self, cmd_line_options):
2554 self.client = None
2555 self.state = State.get_instance(options["socket"], options["db_path"])
2556 if options.get("address_table_path") != None:
2557 self.state.import_at_and_wot(options.get("address_table_path"))
2558 self.infosec = Infosec(self.state)
2559 self.embargo_queue = {}
2560 self.socket = None
2561 self.state = State(self, cmd_line_options.db_path)
2562 if cmd_line_options.address_table_path is not None:
2563 self.state.import_at_and_wot(cmd_line_options.address_table_path)
2564 self.short_buffer = ShortBuffer(self.state)
2565 self.long_buffer = LongBuffer(self.state)
2566 self.order_buffer = OrderBuffer(self.state)
2567 self.server = Server(cmd_line_options, self)
2568 self.handlers = {
2569 DIRECT: self.handle_direct,
2570 BROADCAST: self.handle_broadcast,
2571 GETDATA: self.handle_getdata,
2572 IGNORE: self.handle_ignore
2573 }
2574
2575 def start(self):
2576 self.server.start()
2577
2578 def handle_udp_data(self, bytes_address_pair):
2579 data = bytes_address_pair[0]
(30 . 118)(49 . 173)
2581 packet_info = (address[0],
2582 address[1],
2583 binascii.hexlify(data)[0:16])
2584 logging.debug("[%s:%d] -> %s" % packet_info)
2585 for peer in self.state.get_keyed_peers():
2586 message = self.infosec.unpack(peer, data)
2587 error_code = message.error_code
2588 if(error_code == None):
2589 logging.info("[%s:%d %s] -> %s %d %s" % (peer.address,
2590 peer.port,
2591 peer.handles[0],
2592 message.body,
2593 message.bounces,
2594 message.message_hash))
2595 self.conditionally_update_at(peer, message, address)
2596
2597 # if this is a direct message, just deliver it and return
2598 if message.command == DIRECT:
2599 self.deliver(message)
2600 return
2601
2602 # embargo to wait for immediate copy of message
2603 else:
2604 self.embargo(message)
2605 return
2606 elif error_code == STALE_PACKET:
2607 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
2608 return
2609 elif error_code == DUPLICATE_PACKET:
2610 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
2611 return
2612 elif error_code == MALFORMED_PACKET:
2613 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
2614 return
2615 elif error_code == IGNORED:
2616 self.conditionally_update_at(peer, message, address)
2617 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
2618 return
2619 elif error_code == INVALID_SIGNATURE:
2620 pass
2621 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
2622
2623 def deliver(self, message):
2624 # add to duplicate queue
2625 self.state.add_to_dedup_queue(message.message_hash)
2626
2627 # send to the irc client
2628 if self.client:
2629 self.client.message_from_station(message)
2630
2631 def embargo(self, message):
2632 # initialize the key/value to empty array if not in the hash
2633 # append message to array
2634 if not message.message_hash in self.embargo_queue.keys():
2635 self.embargo_queue[message.message_hash] = []
2636 self.embargo_queue[message.message_hash].append(message)
2637
2638 def check_embargo_queue(self):
2639 # get a lock so other threads can't mess with the db or the queue
2640 self.check_for_immediate_messages()
2641 self.flush_hearsay_messages()
2642
2643 def check_for_immediate_messages(self):
2644 for key in dict(self.embargo_queue).keys():
2645 messages = self.embargo_queue[key]
2646
2647 for message in messages:
2648
2649 # if this is an immediate copy of the message
2650
2651 if message.speaker in message.peer.handles:
2652
2653 # clear the queue and deliver
2654
2655 self.embargo_queue.pop(key, None)
2656 self.deliver(message)
2657 self.rebroadcast(message)
2658 break
2659 metadata = {
2660 'address': address,
2661 'packet_info': packet_info,
2662 }
2663
2664 for peer in self.state.get_keyed_peers():
2665 message = Message.unpack(peer,
2666 data,
2667 self.long_buffer,
2668 self.order_buffer,
2669 metadata,
2670 self.state)
2671
2672 if message.get('error_code') is INVALID_SIGNATURE:
2673 continue
2674
2675 if message.get('error_code') in [None,
2676 UNSUPPORTED_VERSION,
2677 STALE_PACKET,
2678 OUT_OF_ORDER_BOTH,
2679 OUT_OF_ORDER_SELF,
2680 OUT_OF_ORDER_NET,
2681 DUPLICATE_PACKET,
2682 MALFORMED_PACKET]:
2683 break
2684
2685 if message.get('error_code') is None:
2686 if message['command'] == DIRECT:
2687 self.handle_message(Direct(message, self.state))
2688 elif message['command'] == BROADCAST:
2689 self.handle_message(Broadcast(message, self.state))
2690 elif message['command'] == GETDATA:
2691 # This is a little weird. We don't want to instantiate a GetData
2692 # object here because that would switch around body and self_chain,
2693 # so let's just instantiate a generic Message and make sure to handle
2694 # it as a GetData message.
2695 self.handle_message(Message(message, self.state))
2696 elif message['command'] == IGNORE:
2697 self.handle_message(Ignore(message, self.state))
2698 else:
2699 self.report_error(message['error_code'], message)
2700
2701 def flush_hearsay_messages(self):
2702 # if we made it this far either we haven't found any immediate messages
2703 # or we sent them all so we must deliver the remaining hearsay messages
2704 # with the appropriate labeling
2705 for key in dict(self.embargo_queue).keys():
2706
2707 # collect the source handles
2708 handles = []
2709 messages = self.embargo_queue[key]
2710 for message in messages:
2711 handles.append(message.peer.handles[0])
2712
2713 # select the message with the lowest bounce count
2714 message = sorted(messages, key=lambda m: m.bounces)[0]
2715
2716 # clear the queue
2717 self.embargo_queue.pop(key, None)
2718
2719 def handle_message(self, message):
2720 try:
2721 self.handlers[message.command](message)
2722 except KeyError:
2723 logging.error("Unknown command, ignoring")
2724
2725 def handle_direct(self, message):
2726 message.log_incoming(message.peer)
2727 self.deliver(message)
2728 self.long_buffer.intern(message)
2729 self.conditionally_update_at(message, message.metadata["address"])
2730
2731 def handle_broadcast(self, message):
2732 # it's possible we'll log dupes coming out of the order buffer here
2733 message.log_incoming(message.peer)
2734
2735 # check if this is an immediate message
2736 if message.speaker in message.peer.handles:
2737 # remove message from short buffer if it was received as hearsay
2738 if self.short_buffer.has(message.message_hash):
2739 self.short_buffer.drop(message.message_hash)
2740 self.deliver(message)
2741 self.long_buffer.intern(message)
2742 self.state.update_net_chain(message.message_hash)
2743 self.rebroadcast(message)
2744 else:
2745 # embargo to wait for immediate copy of message
2746 self.short_buffer.embargo(message)
2747 self.conditionally_update_at(message, message.metadata["address"])
2748
2749 def handle_getdata(self, message):
2750 message.log_incoming_getdata(message.peer)
2751
2752 # check for the requested message
2753 archived_message = self.long_buffer.exhume(message.body)
2754
2755 # resend it if it exists
2756 if archived_message:
2757 archived_message.retry(message.peer)
2758
2759 def handle_ignore(self, message):
2760 self.conditionally_update_at(message, message.metadata['address'])
2761 packet_info = message.metadata["packet_info"]
2762 address = packet_info[0]
2763 port = packet_info[1]
2764 packet_sample = packet_info[2]
2765 if os.environ.get('LOG_RUBBISH'):
2766 logging.debug("[%s:%d] -> ignoring packet: %s" % (address, port, packet_sample))
2767 return
2768
2769 def check_order_buffer(self):
2770 messages = self.order_buffer.dequeue_and_order_mature_messages()
2771 for message in messages:
2772 self.handle_message(message)
2773
2774 def check_short_buffer(self):
2775 messages = self.short_buffer.flush()
2776 for message_with_stats in messages:
2777 message = message_with_stats['message']
2778 # compute prefix
2779 if len(messages) < 4:
2780 if len(message_with_stats['closest_peers']) < 4:
2781 handles = []
2782 for peer in message_with_stats['closest_peers']:
2783 handles.append(peer.handles[0])
2784 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
2785 else:
2786 message.prefix = "%s[%d]" % (message.speaker, len(messages))
2787 message.prefix = "%s[%d]" % (message.speaker, len(message_with_stats['closest_peers']))
2788
2789 # deliver
2790 self.deliver(message)
2791 self.long_buffer.intern(message)
2792 self.state.update_net_chain(message.message_hash)
2793 message.reporting_peers = message_with_stats['reporting_peers']
2794 self.rebroadcast(message)
2795
2796 def report_error(self, error_code, message):
2797 packet_info = message['metadata']["packet_info"]
2798 address = packet_info[0]
2799 port = packet_info[1]
2800 packet_sample = packet_info[2]
2801 if error_code == STALE_PACKET:
2802 logging.debug("[%s:%d] -> stale packet: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2803 elif error_code == DUPLICATE_PACKET:
2804 logging.debug("[%s:%d] -> duplicate packet: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2805 elif error_code == MALFORMED_PACKET:
2806 logging.debug("[%s:%d] -> malformed packet: %s" % (address, port, packet_sample))
2807 elif error_code == INVALID_SIGNATURE:
2808 logging.debug("[%s:%d] -> invalid packet signature: %s" % (address, port, packet_sample))
2809 elif error_code == UNSUPPORTED_VERSION:
2810 logging.debug("[%s:%d] -> pest version not supported: %s" % (address, port, packet_sample))
2811 elif error_code == OUT_OF_ORDER_NET:
2812 self.add_message_to_order_buffer_and_send_getdata(message, ['net_chain'])
2813 elif error_code == OUT_OF_ORDER_SELF:
2814 self.add_message_to_order_buffer_and_send_getdata(message, ['self_chain'])
2815 elif error_code == OUT_OF_ORDER_BOTH:
2816 self.add_message_to_order_buffer_and_send_getdata(message, ['self_chain', 'net_chain'])
2817
2818 def add_message_to_order_buffer_and_send_getdata(self, message, broken_chains):
2819 packet_info = message['metadata']["packet_info"]
2820 address = packet_info[0]
2821 port = packet_info[1]
2822 logging.debug(
2823 "[%s:%d] -> message received out of order: %s" % (address, port, binascii.hexlify(message['message_hash'])))
2824 if not self.order_buffer.has(message['message_hash']):
2825 for chain in broken_chains:
2826 GetData(message, chain, self.state).send()
2827 self.order_buffer.add(message)
2828
2829 # send the message to all other peers if it should be propagated
2830 self.rebroadcast(message)
2831 def deliver(self, message):
2832 # it's possible that these messages are from an order buffer
2833 # dump and their immediate copies may already have been broadcast
2834 # or vice versa so we need to check the long buffer
2835 if self.long_buffer.has(message.message_hash):
2836 return
2837
2838 # set a timestamp warning if the message is older than the last displayed message.
2839 message.set_warning()
2840
2841 # we only update the address table if the speaker is same as peer
2842 # send to the irc client
2843 if self.client:
2844 self.client.message_from_station(message)
2845
2846 def conditionally_update_at(self, peer, message, address):
2847 if message.speaker in peer.handles:
2848 # we only update the address table if the speaker is same as peer
2849 def conditionally_update_at(self, message, address):
2850 if message.speaker in message.peer.handles:
2851 self.state.update_at({
2852 "handle": message.speaker,
2853 "address": address[0],
(149 . 19)(223 . 15)
2855 })
2856
2857 def rebroadcast(self, message):
2858 if message.bounces < int(self.state.get_knob("max_bounces")):
2859 message.command = BROADCAST
2860 message.bounces = message.bounces + 1
2861 self.infosec.message(message)
2862 else:
2863 logging.debug("message TTL expired: %s" % message.message_hash)
2864
2865 if not message.get_data_response:
2866 if message.bounces < int(self.state.get_knob("max_bounces")):
2867 message.bounces = message.bounces + 1
2868 message.forward()
2869 else:
2870 logging.debug("message TTL expired: %s" % message.message_hash)
2871
2872 def send_rubbish(self):
2873 if self.client:
2874 self.infosec.message(Message({
2875 Ignore({
2876 "speaker": self.client.nickname,
2877 "command": IGNORE,
2878 "bounces": 0,
2879 "body": self.infosec.gen_rubbish_body()
2880 }))
2881 }, self.state).send()
-
+ 24CB89EE415217D9D19CA443F9A25C918FD5D0ED9C30FAC6051A6886AF0AB5F05310A6E64F4D24C6B69AA5DDA096E99FADA0FB1A18FAE16F68BD36E725BDDA9A
blatta/migrations/20220106130042_use_ids.py
(0 . 0)(1 . 17)
2886 """
2887 This module contains a Caribou migration.
2888
2889 Migration Name: use_ids
2890 Migration Version: 20220106130042
2891 """
2892 import hashlib
2893
2894 def upgrade(conn):
2895 # alter dedup_queue hash type from text to blog
2896 conn.execute("drop table if exists dedup_queue")
2897
2898 # add the unique id and message_hash columns to logs
2899 conn.execute("drop table if exists logs")
2900
2901 def downgrade(conn):
2902 pass
- C617E723EF3A1360C343CA635D0E4AA7F77AED515557644FCE41DAB0319057193EE75CFF2258F69486037A025353672C6E81418D8CD9DC0C1FA6C64B66DA8ED9
+
blatta/scripts/gen_key_pair.py
(1 . 63)(0 . 0)
2907 #! /usr/bin/env python
2908 # This is a TOY. Do not fire in anger.
2909
2910 import os
2911 import sys
2912 import base64
2913 import pprint
2914 from optparse import OptionParser
2915
2916
2917 def main(argv):
2918 pp = pprint.PrettyPrinter(indent=4)
2919 op = OptionParser(
2920 description="gen_key_pair generates pseudo-random 64 byte key pairs for use in testing alcuin")
2921 op.add_option(
2922 "-r", "--remote-name",
2923 help="Name of remote station to include with key pair")
2924 op.add_option(
2925 "-l", "--local-name",
2926 help="Name of local station to include with key pair")
2927 op.add_option(
2928 "-p", "--remote-port",
2929 help="Remote station port number to include with key pair")
2930 op.add_option(
2931 "-q", "--local-port",
2932 help="Local station port number to include with key pair")
2933 op.add_option(
2934 "-a", "--remote-address",
2935 help="Remote station IP address to include with key pair")
2936 op.add_option(
2937 "-b", "--local-address",
2938 help="Local station IP address to include with key pair")
2939
2940 (options, args) = op.parse_args(argv[1:])
2941
2942 if options.local_port is None:
2943 options.local_port = 7778
2944 if options.remote_port is None:
2945 options.remote_port = 7778
2946 if options.local_address is None:
2947 options.local_address = ""
2948 if options.remote_address is None:
2949 options.remote_address = ""
2950 key = generate_key()
2951 my_config = {
2952 "name": options.local_name,
2953 "key": key,
2954 "address": options.local_address,
2955 "port": options.local_port
2956 }
2957 their_config = {
2958 "name": options.remote_name,
2959 "key": key,
2960 "address": options.remote_address,
2961 "port": options.remote_port
2962 }
2963 pp.pprint(my_config)
2964 pp.pprint(their_config)
2965
2966 def generate_key():
2967 return base64.b64encode(os.urandom(64))
2968
2969 main(sys.argv)
- BB6A2CA2267F79B30C0F393A552EC5CF79BD1B248A71093E72631955FA1F8D637AE3E0B2D5BC6FCA90A9615347DF64D79BBB390173A4F5B494C192545421E7A9
+ AFCBBCF791E1F0E90125A8FFDA4A018FB172FAE6C074BF5DD046371662BEFEA5CA980E28BB23DE5A5C3F1A6B0283B628605BDEA22FC5C425736B89FB12219442
blatta/start_test_net.sh
(2 . 5)(2 . 5)
2974
2975 # start 3 servers on different ports
2976 ./blatta --log-level debug --channel-name \#aleth --irc-port 9968 --udp-port 7778 --db-path a.db --address-table-path test_net_configs/a.py > logs/a &
2977 # ./blatta --log-level info --channel-name \#aleth --irc-port 6669 --udp-port 7779 --db-path b.db --address-table-path test_net_configs/b.py > logs/b &
2978 ./blatta --log-level debug --channel-name \#aleth --irc-port 6669 --udp-port 7779 --db-path b.db --address-table-path test_net_configs/b.py > logs/b &
2979 ./blatta --log-level debug --channel-name \#aleth --irc-port 6670 --udp-port 7780 --db-path c.db --address-table-path test_net_configs/c.py > logs/c &
- 27BFACB1A2F3D5C0C9947045E0DBF61D2822C0DA84BE7B9589B261D79FD3B9B2A845D354FC0310AAE2841D5DC0B1D8FF22960D33D779DFBC6E0680BD33424D27
+ 215072B9B4CB54E788224F8E27B7FDA063F1CB9DBA302E7AFF276828AABF2DDB0EA5A802AF6165BE85F49AFAB36C4855059E40463372AEC8B396DC5FDD2D6A5A
blatta/test_net_configs/a.py
(1 . 12)(1 . 26)
2984 peers = [
2985 { 'address': 'localhost',
2986 {
2987 'address': 'localhost',
2988 'key': '58bc4NyvMjasIXvsOvPxugaMpFS6tme+xJleOEwVn4iv2IuLUNAfHrkFCeL/Q4m/13Q5gfZxDbVEOtjQe+zW6Q==',
2989 'name': 'awt_b',
2990 'port': 7779
2991 },
2992 # { 'address': 'localhost',
2993 # 'key': 'lT8/fYe/rQdReyavsTrVqInnLFCaU38o2ZAn5+r8uoFSSWgJelafikFELR9t6SJHMpFQvLmlAbF14nL2PfOAyA==',
2994 # 'name': 'awt_c',
2995 # 'port': 7780
2996 # }
2997 {
2998 'address': 'localhost',
2999 'key': 'oVIZ+U9F1b0YI9QdLVt2If/qLxoHG/2NCmgXq7HyaYASNn3zQeXTR/4Tz8z9MB6gOkwu+5+LH8L+MsyyQ0nhdA==',
3000 'name': 'awt_c',
3001 'port': 7780
3002 },
3003 {
3004 'address': 'localhost',
3005 'key': 'WEEnf6QWATZaVjKCkgsgYUD4uJyiYIqsQQagl/Hc35Hd/HOWSaw79YJ7uXyw9G1/XoJD0BMxMCJ6HEJ0jupL1A==',
3006 'name': 'awt_d',
3007 'port': 7781
3008 },
3009 {
3010 'address': 'localhost',
3011 'key': 'S3KJlcOLAlsy1bFJp71/woKsAF48SRPX5fcxWyxVmgsHlJeuVwq7hvQK6qKuNfIDnTUO/T9V0b75ugF0mAcQsg==',
3012 'name': 'awt_e',
3013 'port': 7782
3014 }
3015 ]
- 8869517FFCED618BB8197669AA4124F29E450CA8F2E6451DB8C7CB62D09F6EE4B6724DBD0A511851BCCA1D392A4AB6C6A5A1FC1FEB3F61E6F637A33BECD6E251
+ 44002664205935959EA93351A57366A1F91252AAD95FE5523919754220E4AEB61621D2D568F31B1364F04BED1453E1BC78B0C5C3C8D074AA2EF10D489554B212
blatta/test_net_configs/b.py
(8 . 5)(8 . 10)
3020 'key': '8ugkh+G1NC45DhPPtvPCI/78+fvV8K3v2XaQXvLGpJzeXy2IEA5ZnIo3PGU30+25JxAr0KV+InoqBa0VpY+zCA==',
3021 'name': 'awt_c',
3022 'port': 7780
3023 }
3024 },
3025 { 'address': 'localhost',
3026 'key': '+H7mJLhUvecaE+mcV1AOKzppSWHyPTpN8Sv8+Kr1usr9haxYGC8NSjs7LaXBdtuYceUAkl+TDJ6zJnqmVQUy8w==',
3027 'name': 'awt_d',
3028 'port': 7781
3029 }
3030 ]
- 12569FC74A9742F8D4CE31DE62C222D1F0010AFCBE863ED471EB95384CD06C6E86605BCABC04A9D6E1C53106FA46655AB9E6CAE2EC935C64BE5156186D7488C3
+ 1FBE9273E73AC3A8D9EC188FD3796C9CE35B68B232485B27B55B328FDE071E0DC221CF5A318446AAA76A52245F7A57F403B68EF1E984CA62D43756E90CEE4F5D
blatta/test_net_configs/c.py
(5 . 7)(5 . 7)
3035 'key': '8ugkh+G1NC45DhPPtvPCI/78+fvV8K3v2XaQXvLGpJzeXy2IEA5ZnIo3PGU30+25JxAr0KV+InoqBa0VpY+zCA=='
3036 },
3037 { 'address': 'localhost',
3038 'key': 'lT8/fYe/rQdReyavsTrVqInnLFCaU38o2ZAn5+r8uoFSSWgJelafikFELR9t6SJHMpFQvLmlAbF14nL2PfOAyA==',
3039 'key': 'oVIZ+U9F1b0YI9QdLVt2If/qLxoHG/2NCmgXq7HyaYASNn3zQeXTR/4Tz8z9MB6gOkwu+5+LH8L+MsyyQ0nhdA==',
3040 'name': 'awt_a',
3041 'port': 7778
3042 }
-
+ 3F950DF14EE015DDBE13D7982DDC1D5FD99B08E959B30C7BCC9478D7DBBF24019CECA6AC892C004C422AF2EDF5404439345DB9F10FE53097AE0AB3160CD10F3F
blatta/test_net_configs/d.py
(0 . 0)(1 . 14)
3047 peers = [
3048 {
3049 'address': 'localhost',
3050 'name': 'awt_b',
3051 'port': 7779,
3052 'key': '+H7mJLhUvecaE+mcV1AOKzppSWHyPTpN8Sv8+Kr1usr9haxYGC8NSjs7LaXBdtuYceUAkl+TDJ6zJnqmVQUy8w=='
3053 },
3054 {
3055 'address': 'localhost',
3056 'name': 'awt_a',
3057 'port': 7778,
3058 'key': 'WEEnf6QWATZaVjKCkgsgYUD4uJyiYIqsQQagl/Hc35Hd/HOWSaw79YJ7uXyw9G1/XoJD0BMxMCJ6HEJ0jupL1A=='
3059 }
3060 ]
-
+ DBF72C79C7768C7C268AEBE48687FCEC4EBB25FC900A6F317F54C81B5879064A5BE52C3AF6D8FC68AB823638D21BE742949E2B06B09D4AA14F3281D7D3F5B7C0
blatta/test_net_configs/e.py
(0 . 0)(1 . 8)
3065 peers = [
3066 {
3067 'address': 'localhost',
3068 'name': 'awt_a',
3069 'port': 7778,
3070 'key': 'S3KJlcOLAlsy1bFJp71/woKsAF48SRPX5fcxWyxVmgsHlJeuVwq7hvQK6qKuNfIDnTUO/T9V0b75ugF0mAcQsg=='
3071 }
3072 ]
-
+ 636EB6ADDE55156D0F8951B85CB7C3F58BE4BA90A8E616821794F8280FC7BEC33934C1A6CCA12B707207A083791B81DD74B4BE9AB5CE4E041B809205C90389E0
blatta/tests/helper.py
(0 . 0)(1 . 8)
3077 import logging
3078 import os
3079 import sys
3080
3081
3082 def setup():
3083 log_format = "%(levelname)s %(asctime)s: %(message)s"
3084 logging.basicConfig(level=os.environ.get("LOGLEVEL", "INFO"), format=log_format, stream=sys.stdout)
-
+ 29EA5E81951450A33616BE0D9609CA1391F9A92A45378ABA6E6DA21058D4AD588FEF569CFF2B4AEE6B98BE416AEC7B6CE01BB09716A19975189BDB2A1281C04F
blatta/tests/test_broadcast.py
(0 . 0)(1 . 69)
3089 import unittest
3090 import helper
3091 from mock import Mock
3092 from lib.state import State
3093 from lib.message import Message
3094 from lib.broadcast import Broadcast
3095 from lib.long_buffer import LongBuffer
3096 from lib.order_buffer import OrderBuffer
3097
3098 class TestMessage(unittest.TestCase):
3099 def setUp(self):
3100 helper.setup()
3101 self.alice_socket = Mock()
3102 self.alice_state = State(self.alice_socket)
3103 self.bob_socket = Mock()
3104 self.bob_state = State(self.bob_socket)
3105 self.setupAlice()
3106 self.setupBob()
3107
3108 def setupAlice(self):
3109 self.bob_state.add_peer('alice')
3110 self.bob_state.add_key(
3111 'alice',
3112 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3113 )
3114 self.bob_state.update_at({
3115 'handle': 'alice',
3116 'address': '127.0.0.1',
3117 'port': 8888
3118 })
3119
3120 def setupBob(self):
3121 self.alice_state.add_peer('bob')
3122 self.alice_state.add_key(
3123 'bob',
3124 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3125 )
3126 self.alice_state.update_at({
3127 'handle': 'bob',
3128 'address': '127.0.0.1',
3129 'port': 8889
3130 })
3131
3132 def tearDown(self):
3133 self.alice_state.remove_peer('bob')
3134 self.bob_state.remove_peer('alice')
3135
3136 def test_broadcast_message(self):
3137 message = Broadcast({
3138 'handle': 'bob',
3139 'speaker': 'alice',
3140 'body': 'm1',
3141 'long_buffer': LongBuffer(self.alice_state)
3142 }, self.alice_state)
3143 message.send()
3144 bob = self.alice_state.get_peer_by_handle('bob')
3145 message_bytes = message.get_message_bytes(bob)
3146 black_packet = Message.pack(bob, message.command, message.bounces, message_bytes)
3147 self.bob_socket.sendto.called_once_with(black_packet, (bob.address, bob.port))
3148
3149 # now bob must unpack the black packet
3150 alice = self.bob_state.get_peer_by_handle('alice')
3151 received_message = Message.unpack(alice,
3152 black_packet,
3153 LongBuffer(self.bob_state),
3154 OrderBuffer(self.bob_state),
3155 {})
3156 self.assertEqual(message.body, received_message['body'])
3157
-
+ B5E72914E0E830B2EBAB8B1052667ED3A36F72ACD69312F87516894510ACA9EBF0A629F502B02B960E7AA4BC383AF801575B9EA0FD7860C26B9029B9AF2D0540
blatta/tests/test_direct.py
(0 . 0)(1 . 73)
3162 import unittest
3163
3164 import time
3165
3166 import helper
3167 from mock import Mock
3168
3169 from lib.state import State
3170 from lib.message import Message
3171 from lib.direct import Direct
3172 from lib.long_buffer import LongBuffer
3173 from lib.order_buffer import OrderBuffer
3174
3175 class TestMessage(unittest.TestCase):
3176 def setUp(self):
3177 helper.setup()
3178 self.alice_socket = Mock()
3179 self.alice_state = State(self.alice_socket)
3180 self.bob_socket = Mock()
3181 self.bob_state = State(self.bob_socket)
3182 self.setupAlice()
3183 self.setupBob()
3184
3185 def setupAlice(self):
3186 self.bob_state.add_peer('alice')
3187 self.bob_state.add_key(
3188 'alice',
3189 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3190 )
3191 self.bob_state.update_at({
3192 'handle': 'alice',
3193 'address': '127.0.0.1',
3194 'port': 8888
3195 })
3196
3197 def setupBob(self):
3198 self.alice_state.add_peer('bob')
3199 self.alice_state.add_key(
3200 'bob',
3201 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3202 )
3203 self.alice_state.update_at({
3204 'handle': 'bob',
3205 'address': '127.0.0.1',
3206 'port': 8889
3207 })
3208
3209 def tearDown(self):
3210 self.alice_state.remove_peer('bob')
3211 self.bob_state.remove_peer('alice')
3212
3213 def test_direct_message(self):
3214 message = Direct({
3215 'handle': 'bob',
3216 'speaker': 'alice',
3217 'body': 'm1',
3218 'long_buffer': LongBuffer(self.alice_state),
3219 'timestamp': int(time.time())
3220 }, self.alice_state)
3221 bob = self.alice_state.get_peer_by_handle('bob')
3222 message.message_bytes = message.get_message_bytes(bob)
3223 message.send()
3224 black_packet = Message.pack(bob, message.command, message.bounces, message.message_bytes)
3225 self.bob_socket.sendto.called_once_with(black_packet, (bob.address, bob.port))
3226
3227 # now bob must unpack the black packet
3228 alice = self.bob_state.get_peer_by_handle('alice')
3229 received_message = Message.unpack(alice,
3230 black_packet,
3231 LongBuffer(self.bob_state),
3232 OrderBuffer(self.bob_state),
3233 {})
3234 self.assertEqual(message.body, received_message['body'])
-
+ B48397DE933C7423E2E5C881F18C0E30FF858F31D3DE6EF4C6074EA512AFA85069C9FD0CAF537C2CC41CC4CA8D69FDFE1AE1FD6F7AF9211CCDE47C8311A70776
blatta/tests/test_getdata.py
(0 . 0)(1 . 118)
3240 import unittest
3241 import logging
3242
3243 import time
3244 from mock import Mock
3245 from lib.message import Message
3246 from lib.getdata import GetData
3247 from lib.long_buffer import LongBuffer
3248 from lib.order_buffer import OrderBuffer
3249 from lib.state import State
3250 from lib.direct import Direct
3251 from lib.broadcast import Broadcast
3252 import helper
3253
3254 class TestGetData(unittest.TestCase):
3255 def setUp(self):
3256 helper.setup()
3257 self.alice_socket = Mock()
3258 self.alice_state = State(self.alice_socket)
3259 self.alice_state.set_knob('nick', 'alice')
3260 self.setupBob()
3261 self.bob_socket = Mock()
3262 self.bob_state = State(self.bob_socket)
3263 self.setupAlice()
3264
3265 def setupBob(self):
3266 self.alice_state.add_peer('bob')
3267 self.alice_state.add_key(
3268 'bob',
3269 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3270 )
3271 self.alice_state.update_at({
3272 'handle': 'bob',
3273 'address': '127.0.0.1',
3274 'port': 8889
3275 })
3276
3277 def setupAlice(self):
3278 self.bob_state.add_peer('alice')
3279 self.bob_state.add_key(
3280 'alice',
3281 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3282 )
3283 self.bob_state.update_at({
3284 'handle': 'alice',
3285 'address': '127.0.0.1',
3286 'port': 8888
3287 })
3288
3289 def test_send(self):
3290 long_buffer = LongBuffer(self.bob_state)
3291 m1 = Direct({
3292 'handle': 'alice',
3293 'speaker': 'bob',
3294 'body': 'm1',
3295 'timestamp': int(time.time()),
3296 'long_buffer': long_buffer
3297 }, self.bob_state)
3298 m2 = Direct({
3299 'handle': 'alice',
3300 'speaker': 'bob',
3301 'body': 'm2',
3302 'timestamp': int(time.time()),
3303 'long_buffer': long_buffer
3304 }, self.bob_state)
3305 m3 = Direct({
3306 'handle': 'alice',
3307 'speaker': 'bob',
3308 'body': 'm3',
3309 'timestamp': int(time.time()),
3310 'long_buffer': long_buffer
3311 }, self.bob_state)
3312 # we need to send these messages to get them into the log
3313 alice = self.bob_state.get_peer_by_handle('alice')
3314 m1.message_bytes = m1.get_message_bytes(alice)
3315 m1.send()
3316 m2.message_bytes = m2.get_message_bytes(alice)
3317 m2.send()
3318 m3.message_bytes = m3.get_message_bytes(alice)
3319 m3.send()
3320
3321 # now let's compile the black packet so alice can
3322 # unpack it and get a message we can pass to GetData()
3323 m1_message_bytes = m1.get_message_bytes(alice)
3324 m1_black_packet = Message.pack(alice, m1.command, m1.bounces, m1_message_bytes)
3325
3326 # we use m3 because if we used m2 there would be no break,
3327 # and if we used m1 it would be considered the first message
3328 m3_message_bytes = m3.get_message_bytes(alice)
3329 # TODO: something strange going on here with the message bytes causing the logger to barf
3330 m3_black_packet = Message.pack(alice, m3.command, m3.bounces, m3_message_bytes)
3331
3332
3333 # we need bob's peer object to know what key to use to decrypt
3334 bob = self.alice_state.get_peer_by_handle('bob')
3335 m1_received = Message.unpack(bob,
3336 m1_black_packet,
3337 LongBuffer(self.alice_state),
3338 OrderBuffer(self.alice_state),
3339 {},
3340 self.alice_state)
3341
3342 m3_received = Message.unpack(bob,
3343 m3_black_packet,
3344 LongBuffer(self.alice_state),
3345 OrderBuffer(self.alice_state),
3346 {},
3347 self.alice_state)
3348
3349 gd_message = GetData(m3_received, 'self_chain', self.alice_state)
3350 gd_message.send()
3351
3352 # rebuild the black packet so we can compare with what was actually sent
3353 gd_black_packet = Message.pack(bob,
3354 gd_message.command,
3355 gd_message.bounces,
3356 gd_message.get_message_bytes(bob))
3357 self.alice_socket.sendto.called_once_with(gd_black_packet, (bob.address, bob.port))
- 991E1FF9817A01D4320ABDF30E09C890B5454C726E313A488DD6BEDC9E8E663019A63CAF9FC16979251F653BEDA05ECECA4A806A7E3C299E6847A8B6FE11A6E4
+ F6EB911B7A54EC1EF368E1EED8202AA2E385086F3D5509C08EB8BB78D216FF2408B7409B5F4F368D46EA98475C4AFB3A67A094EDE9109278AD0CB7EA93459F4B
blatta/tests/test_station.py
(2 . 27)(2 . 81)
3363 import unittest
3364 import logging
3365 from mock import Mock
3366 from mock import patch
3367
3368 from lib.commands import DIRECT
3369 from lib.commands import GETDATA
3370 from lib.station import Station
3371 from lib.state import State
3372 from lib.message import Message
3373 from lib.getdata import GetData
3374 from lib.order_buffer import OrderBuffer
3375 from lib.long_buffer import LongBuffer
3376 from collections import namedtuple
3377 import helper
3378
3379 class TestStation(unittest.TestCase):
3380 def setUp(self):
3381 helper.setup()
3382 logging.basicConfig(level=logging.DEBUG)
3383 options = {
3384 "clients": {"clientsocket": Mock()},
3385 "db_path": "tests/test.db",
3386 "socket": Mock()
3387 }
3388 self.station_socket = Mock()
3389 Options = namedtuple('Options', ['db_path',
3390 'address_table_path',
3391 'socket',
3392 'irc_ports',
3393 'udp_port',
3394 'channel_name',
3395 'password',
3396 'motd',
3397 'listen'])
3398 options = Options(
3399 None,
3400 None,
3401 self.station_socket,
3402 None,
3403 None,
3404 None,
3405 None,
3406 None,
3407 None
3408 )
3409 self.station = Station(options)
3410 self.station.deliver = Mock()
3411 self.station.rebroadcast = Mock()
3412 self.station.rebroadcast.return_value = "foobar"
3413 self.bob_state = State(Mock(), None)
3414 self.station.state.set_knob('nick', 'alice')
3415 self.bob_state.set_knob('nick', 'bob')
3416 self.setupBob()
3417 self.setupAlice()
3418
3419 def setupBob(self):
3420 self.station.state.add_peer('bob')
3421 self.station.state.add_key(
3422 'bob',
3423 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3424 )
3425 self.station.state.update_at({
3426 'handle': 'bob',
3427 'address': '127.0.0.1',
3428 'port': 8889
3429 })
3430
3431 def setupAlice(self):
3432 self.bob_state.add_peer('alice')
3433 self.bob_state.add_key(
3434 'alice',
3435 '9h6wYndVjt8QpnIZOYb7KD2tYKCKw4rjlYg4LM1ODx1Qkr3qA0IuKNukkwKhQ4UP9ypMlhyPHa7AGD7NO7Ws5w=='
3436 )
3437 self.bob_state.update_at({
3438 'handle': 'alice',
3439 'address': '127.0.0.1',
3440 'port': 8888
3441 })
3442
3443 def tearDown(self):
3444 pass
3445
3446 def test_embargo_bounce_ordering(self):
3447 self.skipTest("the tested code has been re-implemented")
3448 peer1 = Mock()
3449 peer1.handles = ["a", "b"]
3450 peer2 = Mock()
(35 . 7)(89 . 7)
3452 high_bounce_message.peer = peer2
3453 high_bounce_message.bounces = 2
3454 high_bounce_message.message_hash = "messagehash"
3455 self.station.embargo_queue = {
3456 self.station.short_buffer = {
3457 "messagehash": [
3458 low_bounce_message,
3459 high_bounce_message
(45 . 73)(99 . 31)
3461 self.station.deliver.assert_called_once_with(low_bounce_message)
3462 self.station.rebroadcast.assert_called_once_with(low_bounce_message)
3463
3464 def test_immediate_message_delivered(self):
3465 peer = Mock()
3466 peer.handles = ["a", "b"]
3467 message = Mock()
3468 message.speaker = "a"
3469 message.peer = peer
3470 self.station.embargo_queue = {
3471 "messagehash": [
3472 message
3473 ],
3474 }
3475 self.station.check_for_immediate_messages()
3476 self.station.deliver.assert_called_once_with(message)
3477 self.station.rebroadcast.assert_called_once_with(message)
3478
3479 def test_hearsay_message_not_delivered(self):
3480 peer = Mock()
3481 peer.handles = ["a", "b"]
3482 message = Mock()
3483 message.speaker = "c"
3484 message.peer = peer
3485 self.station.embargo_queue = {
3486 "messagehash": [
3487 message
3488 ],
3489 }
3490 self.station.check_for_immediate_messages()
3491 self.station.deliver.assert_not_called()
3492
3493 def test_embargo_queue_cleared(self):
3494 self.skipTest("the embargo queue is now th short buffer")
3495 peer = Mock()
3496 peer.handles = ["a", "b"]
3497 message = Mock()
3498 message.speaker = "c"
3499 message.peer = peer
3500 self.station.embargo_queue = {
3501 self.station.short_buffer = {
3502 "messagehash": [
3503 message
3504 ],
3505 }
3506 self.assertEqual(len(self.station.embargo_queue), 1)
3507 self.assertEqual(len(self.station.short_buffer), 1)
3508 self.station.flush_hearsay_messages()
3509 self.assertEqual(len(self.station.embargo_queue), 0)
3510
3511 def test_immediate_prefix(self):
3512 peer = Mock()
3513 peer.handles = ["a", "b"]
3514 message = Mock()
3515 message.speaker = "a"
3516 message.prefix = None
3517 message.peer = peer
3518 self.station.embargo_queue = {
3519 "messagehash": [
3520 message
3521 ],
3522 }
3523 self.station.check_for_immediate_messages()
3524 self.assertEqual(message.prefix, None)
3525 self.assertEqual(len(self.station.short_buffer), 0)
3526
3527 def test_simple_hearsay_prefix(self):
3528 self.skipTest("this code has moved")
3529 peer = Mock()
3530 peer.handles = ["a", "b"]
3531 message = Mock()
3532 message.speaker = "c"
3533 message.prefix = None
3534 message.peer = peer
3535 self.station.embargo_queue = {
3536 self.station.short_buffer = {
3537 "messagehash": [
3538 message
3539 ],
(120 . 6)(132 . 7)
3541 self.assertEqual(message.prefix, "c[a]")
3542
3543 def test_in_wot_hearsay_prefix_under_four(self):
3544 self.skipTest("the embargo queue is now th short buffer")
3545 peer1 = Mock()
3546 peer1.handles = ["a", "b"]
3547 peer2 = Mock()
(141 . 7)(154 . 7)
3549 message_via_peer3.prefix = None
3550 message_via_peer3.peer = peer3
3551 message_via_peer3.bounces = 1
3552 self.station.embargo_queue = {
3553 self.station.short_buffer = {
3554 "messagehash": [
3555 message_via_peer1,
3556 message_via_peer2,
(153 . 6)(166 . 7)
3558 self.assertEqual(message_via_peer1.prefix, "c[a|d|f]")
3559
3560 def test_in_wot_hearsay_prefix_more_than_three(self):
3561 self.skipTest("the embargo queue is now th short buffer")
3562 peer1 = Mock()
3563 peer1.handles = ["a", "b"]
3564 peer2 = Mock()
(181 . 7)(195 . 7)
3566 message_via_peer4.prefix = None
3567 message_via_peer4.peer = peer4
3568 message_via_peer4.bounces = 1
3569 self.station.embargo_queue = {
3570 self.station.short_buffer = {
3571 "messagehash": [
3572 message_via_peer1,
3573 message_via_peer2,
(192 . 3)(206 . 47)
3575 self.station.flush_hearsay_messages()
3576 self.station.deliver.assert_called_once_with(message_via_peer1)
3577 self.assertEqual(message_via_peer1.prefix, "c[4]")
3578
3579 # this test occasionally fails
3580 def test_receive_getdata_request_for_existing_direct_message(self):
3581 self.skipTest("intermittent failure")
3582 # 'send' bob a couple of messages
3583 m1 = Message({
3584 'command': DIRECT,
3585 'handle': 'bob',
3586 'speaker': 'alice',
3587 'body': 'm1',
3588 'bounces': 0
3589 }, self.station.state)
3590
3591 m1.send()
3592
3593 m2 = Message({
3594 'command': DIRECT,
3595 'handle': 'bob',
3596 'speaker': 'alice',
3597 'body': 'm2',
3598 'bounces': 0,
3599 }, self.station.state)
3600
3601 m2.send()
3602
3603 # oops look's like bob didn't get the message
3604
3605 # build GETDATA black packet to retreive m1
3606 alice = self.bob_state.get_peer_by_handle('alice')
3607 bob = self.station.state.get_peer_by_handle('bob')
3608 gd_message = GetData(m2, self.bob_state)
3609 gd_message_bytes = gd_message.get_message_bytes(alice)
3610 gd_black_packet = Message.pack(bob, GETDATA, gd_message.bounces, gd_message_bytes)
3611
3612 # call handle_udp_data with GETDATA packet
3613 self.station.handle_udp_data([gd_black_packet, ['127.0.0.1', 8889]])
3614
3615 # build up the retry black packet to verify that it was sent
3616 retry_black_packet = Message.pack(bob, DIRECT, 0, m1.get_message_bytes(bob))
3617
3618 # assert retry is called and sends the correct message
3619 sent_message_black_packet = self.station_socket.sendto.call_args[0][0]
3620 sent_message = Message.unpack(bob, sent_message_black_packet, LongBuffer(), OrderBuffer(), self.station.state)
3621 self.assertEqual(sent_message.body, 'm1')