-
+ 9E41FDD532E857CEC8E4D3407560D8570B8E6B7B713739E6D81622B0A6ABCBE5A74A9E1CE70F192BE2CE5F5C2D0B2374E433BCB7A1D83E322C24460ADF92723A
blatta/lib/station.py
(0 . 0)(1 . 196)
1249 import time
1250 import threading
1251 import binascii
1252 import logging
1253 import os
1254 from lib.state import State
1255 from lib.infosec import MAX_BOUNCES
1256 from lib.infosec import STALE_PACKET
1257 from lib.infosec import DUPLICATE_PACKET
1258 from lib.infosec import MALFORMED_PACKET
1259 from lib.infosec import INVALID_SIGNATURE
1260 from lib.infosec import IGNORED
1261 from lib.infosec import Infosec
1262 from commands import IGNORE
1263 from lib.message import Message
1264 from commands import BROADCAST
1265 from commands import DIRECT
1266 from lib.peer import Peer
1267
1268 RUBBISH_INTERVAL = 10
1269
1270 class Station(object):
1271 def __init__(self, options):
1272 self.client = None
1273 self.state = State.get_instance(options["socket"], options["db_path"])
1274 if options.get("address_table_path") != None:
1275 self.state.import_at_and_wot(options.get("address_table_path"))
1276 self.infosec = Infosec(self.state)
1277 self.embargo_queue = {}
1278 self.embargo_queue_lock = threading.Lock()
1279
1280 def start_embargo_queue_checking(self):
1281 threading.Thread(target=self.check_embargo_queue).start()
1282
1283 def start_rubbish(self):
1284 pass
1285 threading.Thread(target=self.send_rubbish).start()
1286
1287 def handle_udp_data(self, bytes_address_pair):
1288 data = bytes_address_pair[0]
1289 address = bytes_address_pair[1]
1290 packet_info = (address[0],
1291 address[1],
1292 binascii.hexlify(data)[0:16])
1293 logging.debug("[%s:%d] -> %s" % packet_info)
1294 for peer in self.state.get_keyed_peers():
1295 message = self.infosec.unpack(peer, data)
1296 error_code = message.error_code
1297 if(error_code == None):
1298 logging.debug("%s(%s) -> %s bounces: %d" % (message.speaker, peer.handles[0], message.body, message.bounces))
1299 self.conditionally_update_at(peer, message, address)
1300
1301 # if this is a direct message, just deliver it and return
1302 if message.command == DIRECT:
1303 self.deliver(message)
1304 return
1305
1306 # if the speaker is in our wot, we need to check if the message is hearsay
1307 if message.speaker in self.state.get_peer_handles():
1308 self.embargo(message)
1309 return
1310
1311 else:
1312 # skip the embargo and deliver this message with appropriate simple hearsay labeling
1313 message.prefix = "%s[%s]" % (message.speaker, peer.handles[0])
1314 self.deliver(message)
1315 return
1316 elif error_code == STALE_PACKET:
1317 logging.debug("[%s:%d] -> stale packet: %s" % packet_info)
1318 return
1319 elif error_code == DUPLICATE_PACKET:
1320 logging.debug("[%s:%d] -> duplicate packet: %s" % packet_info)
1321 return
1322 elif error_code == MALFORMED_PACKET:
1323 logging.debug("[%s:%d] -> malformed packet: %s" % packet_info)
1324 return
1325 elif error_code == IGNORED:
1326 self.conditionally_update_at(peer, message, address)
1327 logging.debug("[%s:%d] -> ignoring packet: %s" % packet_info)
1328 return
1329 elif error_code == INVALID_SIGNATURE:
1330 pass
1331 logging.debug("[%s:%d] -> martian packet: %s" % packet_info)
1332
1333 def deliver(self, message):
1334 # add to duplicate queue
1335 self.state.add_to_dedup_queue(message.message_hash)
1336
1337 # send to the irc client
1338 if self.client:
1339 self.client.message_from_station(message)
1340
1341 def embargo(self, message):
1342 # initialize the key/value to empty array if not in the hash
1343 # append message to array
1344 if not message.message_hash in self.embargo_queue.keys():
1345 self.embargo_queue[message.message_hash] = []
1346 self.embargo_queue[message.message_hash].append(message)
1347
1348 def check_embargo_queue(self):
1349 # get a lock so other threads can't mess with the db or the queue
1350 self.embargo_queue_lock.acquire()
1351 self.check_for_immediate_messages()
1352 self.flush_hearsay_messages()
1353
1354 # release the lock
1355 self.embargo_queue_lock.release()
1356
1357 # continue the thread loop after interval
1358 time.sleep(1)
1359 threading.Thread(target=self.check_embargo_queue).start()
1360
1361 def check_for_immediate_messages(self):
1362 for key in dict(self.embargo_queue).keys():
1363 messages = self.embargo_queue[key]
1364
1365 for message in messages:
1366
1367 # if this is an immediate copy of the message
1368
1369 if message.speaker in message.peer.handles:
1370
1371 # clear the queue and deliver
1372
1373 self.embargo_queue.pop(key, None)
1374 self.deliver(message)
1375 self.rebroadcast(message)
1376 break
1377
1378
1379 def flush_hearsay_messages(self):
1380 # if we made it this far either we haven't found any immediate messages
1381 # or we sent them all so we must deliver the remaining hearsay messages
1382 # with the appropriate labeling
1383 for key in dict(self.embargo_queue).keys():
1384
1385 # collect the source handles
1386 handles = []
1387 messages = self.embargo_queue[key]
1388 for message in messages:
1389 handles.append(message.peer.handles[0])
1390
1391 # select the message with the lowest bounce count
1392 message = sorted(messages, key=lambda m: m.bounces)[0]
1393
1394 # clear the queue
1395 self.embargo_queue.pop(key, None)
1396
1397 # compute prefix
1398 if len(messages) < 4:
1399 message.prefix = "%s[%s]" % (message.speaker, "|".join(handles))
1400 else:
1401 message.prefix = "%s[%d]" % (message.speaker, len(messages))
1402
1403 # deliver
1404 self.deliver(message)
1405
1406 # send the message to all other peers if it should be propagated
1407 self.rebroadcast(message)
1408
1409
1410 # we only update the address table if the speaker is same as peer
1411
1412 def conditionally_update_at(self, peer, message, address):
1413 if message.speaker in peer.handles:
1414 self.state.update_at({
1415 "handle": message.speaker,
1416 "address": address[0],
1417 "port": address[1]
1418 })
1419
1420 def rebroadcast(self, message):
1421 if message.bounces < MAX_BOUNCES:
1422 message.command = BROADCAST
1423 message.bounces = message.bounces + 1
1424 self.infosec.message(message)
1425 else:
1426 logging.debug("[%s:%d] -> packet TTL expired: %s" % packet_info)
1427
1428
1429 def send_rubbish(self):
1430 logging.debug("sending rubbish...")
1431 self.embargo_queue_lock.acquire()
1432 try:
1433 if self.client:
1434 self.infosec.message(Message({
1435 "speaker": self.client.nickname,
1436 "command": IGNORE,
1437 "bounces": 0,
1438 "body": self.infosec.gen_rubbish_body()
1439 }))
1440 except:
1441 logging.error("Something went wrong attempting to send rubbish")
1442 self.embargo_queue_lock.release()
1443 time.sleep(RUBBISH_INTERVAL)
1444 threading.Thread(target=self.send_rubbish).start()