diff -uNr a/blatta/Makefile b/blatta/Makefile --- a/blatta/Makefile 58abf7c81c8f6a960d5a083ee890b0b7c4d3f8834ecef3d4523522e50af04fd4893eed6aa16afdcfc491fea6d583543914efe8c24f60a143a2cb028360187492 +++ b/blatta/Makefile 431c84d2fa991b4c43090296e80765e1460288de0fdb2d81acf73a18977acc16164111f210af83488a80b92fa35d307ecac26e59c860bbc12cd59a5c2a3e4c00 @@ -1,4 +1,4 @@ -VERSION := 9980 +VERSION := 9979 DISTFILES = Makefile blatta README.txt lib migrations tests test_net_configs start_test_net.sh diff -uNr a/blatta/lib/client.py b/blatta/lib/client.py --- a/blatta/lib/client.py ae28c098e89d25444fbd5fd93b6031d40ce4cef4ace438a887a6837fafde32137c304766a672b58898e36b75f5e8e0b784f58a0803994539d2b8c41cefe187d1 +++ b/blatta/lib/client.py 4446f38df71cab018b132851c12e5df188949618983d81eaa098c41276cf5b8a96d9fedf55ee3d5ef73412b3d5f3690ca0ea890adcfd61867171a569d7c6e605 @@ -178,11 +178,6 @@ channel.add_member(self) self.channels[irc_lower(channelname)] = channel self.message_channel(channel, "JOIN", channelname, True) - self.reply("353 %s = %s :%s" - % (self.nickname, - channelname, - " ".join(sorted(x - for x in self.state.get_peer_handles())))) self.reply("366 %s %s :End of NAMES list" % (self.nickname, channelname)) def list_handler(): @@ -538,7 +533,19 @@ def pest_reply(self, msg): self.message(":Pest NOTICE %s :%s" % (self.server.channel_name, msg)) - def reply_403(self, channel): + def send_join(self, handle): + self.message(":%s JOIN %s" % (handle, self.server.channel_name)) + + def send_part(self, handle): + self.message(":%s PART %s" % (handle, self.server.channel_name)) + + def send_away(self, handle): + self.message(":%s AWAY :No recent messages" % (handle)) + + def send_back(self, handle): + self.message(":%s AWAY" % (handle)) + + def reply_403(self, channeyl): self.reply("403 %s %s :No such channel" % (self.nickname, channel)) def reply_461(self, command): diff -uNr a/blatta/lib/server.py b/blatta/lib/server.py --- a/blatta/lib/server.py 225ccf41f5e87fff8d9837f1eaf03d4806fa62f9a2a1608f4642660dfa9bc863f10860fd02879a42283ce862c9c0c0b034fd2b0158a4506fb15fa4ffd33fffc9 +++ b/blatta/lib/server.py 4aec359e99db8141cfde356af8447e0cfda4572052407791fa5eaeb913b371254d9e9e77cc59628d4b732a61ef653b198deb2b056444d74045a0fe4015a26d77 @@ -104,6 +104,7 @@ last_short_buffer_check = time.time() last_rubbish_dispatch = time.time() last_order_buffer_check = time.time() + last_presence_check = time.time() while True: # we don't want to be listening for client connections if there's already a client connected @@ -163,6 +164,11 @@ self.station.check_order_buffer() last_order_buffer_check = now + # check presence + if last_presence_check + int(self.station.state.get_knob('presence_check_seconds')) < now: + self.station.report_presence() + last_presence_check = now + def create_directory(path): if not os.path.isdir(path): os.makedirs(path) diff -uNr a/blatta/lib/state.py b/blatta/lib/state.py --- a/blatta/lib/state.py 770b11a71459e33f5f4d2c72cf55dcb79b18c711f3041d8d23375a1419224d309a50c4f10c466527dc7ee910dbda120abe877d68e8981c7f51ca4a401bf54775 +++ b/blatta/lib/state.py ca409e06cb9491b00378c24bb0a189ed752c01a46b3d8406f403f7e7d378d4a62f91a1ada8eab3ffdf374e4a62ea3a5edf3edb48cd5500bbde30f3e8153c209f @@ -1,22 +1,33 @@ +import calendar + from peer import Peer from message import EMPTY_CHAIN +from message import Message import sqlite3 import imp import binascii import logging import datetime +import time import caribou from itertools import chain -KNOBS=({'max_bounces': 3, +KNOBS=( + { + 'max_bounces': 3, 'embargo_interval_seconds': 1, 'rubbish_interval_seconds': 10, 'nick': '', 'order_buffer_check_seconds': 5 * 60, 'order_buffer_expiration_seconds': 5 * 60, 'short_buffer_expiration_seconds': 1, - 'short_buffer_check_interval_seconds': 1}) + 'short_buffer_check_interval_seconds': 1, + 'peer_offline_interval_seconds': 60, + 'peer_away_interval_seconds': 10 * 60, + 'presence_check_seconds': 5, + } +) class State(object): def __init__(self, station, db_path=None): @@ -155,7 +166,7 @@ cursor = self.cursor() at = [] if handle == None: - results = cursor.execute("select handle_id, address, port, updated_at from at\ + results = cursor.execute("select handle_id, address, port, updated_at, strftime('%s', updated_at) from at\ order by updated_at desc").fetchall() else: result = cursor.execute("select handle_id from handles where handle=?", @@ -164,16 +175,27 @@ handle_id = result[0] else: return [] - results = cursor.execute("select handle_id, address, port, updated_at from at \ + results = cursor.execute("select handle_id, address, port, updated_at, strftime('%s', updated_at) from at \ where handle_id=? order by updated_at desc", (handle_id,)).fetchall() for result in results: - handle_id, address, port, updated_at = result + handle_id, address, port, updated_at_utc, updated_at_unixtime = result h = cursor.execute("select handle from handles where handle_id=?", (handle_id,)).fetchone()[0] - at.append({"handle": h, - "address": "%s:%s" % (address, port), - "active_at": updated_at if updated_at else "no packets received from this address"}) + if updated_at_utc: + dt_format = '%Y-%m-%d %H:%M:%S.%f' + dt_utc = datetime.datetime.strptime(updated_at_utc, dt_format) + dt_local = self.utc_to_local(dt_utc) + updated_at = datetime.datetime.strftime(dt_local, dt_format) + else: + updated_at = "no packets received from this address" + + at.append({ + "handle": h, + "address": "%s:%s" % (address, port), + "active_at": updated_at, + "active_at_unixtime": int(updated_at_unixtime) if updated_at_unixtime else 0 + }) return at def import_at_and_wot(self, at_path): @@ -210,7 +232,7 @@ (handle_id,)).fetchone() # if there are no AT entries for this handle, insert one - timestamp = datetime.datetime.now() if set_active_at else None + timestamp = datetime.datetime.utcnow() if set_active_at else None if at_entry == None: cursor.execute("insert into at(handle_id, address, port, updated_at) values(?, ?, ?, ?)", (handle_id, @@ -357,7 +379,37 @@ peers.append(peer) return peers - + + def handle_is_online(self, handle): + # last rubbish message from peer associated with handle is + # sufficiently recent + at = self.get_at(handle)[0] + if at["active_at_unixtime"] > time.time() - int(self.get_knob("peer_offline_interval_seconds")): + return True + else: + return False + + def utc_to_local(self, utc_dt): + # get integer timestamp to avoid precision lost + timestamp = calendar.timegm(utc_dt.timetuple()) + local_dt = datetime.datetime.fromtimestamp(timestamp) + assert utc_dt.resolution >= datetime.timedelta(microseconds=1) + return local_dt.replace(microsecond=utc_dt.microsecond) + + def handle_is_away(self, handle): + # last broadcast or dm is sufficiently old + cursor = self.cursor() + away_interval_seconds = int(self.get_knob("peer_away_interval_seconds")) + dt = datetime.datetime.utcfromtimestamp( + time.time() - away_interval_seconds + ) + raw_messages = cursor.execute("select message_bytes from log where created_at > ?", (dt,)).fetchall() + for message_bytes in raw_messages: + int_ts, self_chain, net_chain, speaker, body = Message._unpack_message(message_bytes[0][:]) + if speaker == handle: + return False + return True + def get_peer_by_handle(self, handle): cursor = self.cursor() handle_info = cursor.execute("select handle_id, peer_id from handles where handle=?", diff -uNr a/blatta/lib/station.py b/blatta/lib/station.py --- a/blatta/lib/station.py b715a5e7a750d2bb552b5792e87aa5741f36b521f3c68d17b312d49d9872c842b1f1e4c87db6e733ef3836b59d91cdc66b9217c908fb3a74dd5dedd485eda4a1 +++ b/blatta/lib/station.py a82e5b4c782b1484c844fd7843441bf0220bd18a796963b836c23dbc00a0f4031677d7699ec86a6a4b892b194871869e3cf55f481a5d978559e235869a7c3417 @@ -1,6 +1,8 @@ import time -VERSION = 9980 +VERSION = 9979 +STATUS_ONLINE = 0 +STATUS_AWAY = 1 import binascii import logging @@ -41,6 +43,7 @@ GETDATA: self.handle_getdata, IGNORE: self.handle_ignore } + self.presence = {} def start(self): self.server.start() @@ -168,6 +171,33 @@ message.reporting_peers = message_with_stats['reporting_peers'] self.rebroadcast(message) + def report_presence(self): + # if handle isn't in the presence dict, check if rubbish received and send /join if so + for handle in self.state.get_peer_handles(): + if self.state.handle_is_online(handle): + if self.presence.get(handle) is None: + if self.client: + self.presence[handle] = STATUS_ONLINE + self.client.send_join(handle) + else: + if self.presence.get(handle): + if self.client: + del self.presence[handle] + self.client.send_part(handle) + + # if handle IS in the presence dict, check last message received from handle and send /away + for handle in self.presence.keys(): + if self.presence[handle] is not STATUS_AWAY: + if self.state.handle_is_away(handle): + if self.client: + self.presence[handle] = STATUS_AWAY + self.client.send_away(handle) + # need to check if the handle is back + else: + if not self.state.handle_is_away(handle): + self.presence[handle] = STATUS_ONLINE + self.client.send_back(handle) + def report_error(self, error_code, message): packet_info = message['metadata']["packet_info"] address = packet_info[0]