diff --git a/CONTRIBUTING.rst b/CONTRIBUTING.rst index 72a0e78c..15da2871 100644 --- a/CONTRIBUTING.rst +++ b/CONTRIBUTING.rst @@ -159,7 +159,7 @@ When a PR is being reviewed, new changes might be needed: - If the change does not modify a previous change, create new commits and push. - If the change modifies a previous change and it's small, - `git commit fixup `_ + `git commit fixup `_ should be used. When it is agreed that the PR is ready, create a new branch named ``mybranch_02`` and run: diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py index cdf0b9b4..5862d938 100644 --- a/sbws/core/scanner.py +++ b/sbws/core/scanner.py @@ -551,8 +551,10 @@ def run_speedtest(args, conf): # Call only once to initialize http_headers settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'], str(controller.get_version())) - - rl = RelayList(args, conf, controller) + # To do not have to pass args and conf to RelayList, pass an extra + # argument with the data_period + measurements_period = conf.getint('general', 'data_period') + rl = RelayList(args, conf, controller, measurements_period, state) cb = CB(args, conf, controller, rl) rd = ResultDump(args, conf) rp = RelayPrioritizer(args, conf, rl, rd) diff --git a/sbws/globals.py b/sbws/globals.py index abd4791e..015e857c 100644 --- a/sbws/globals.py +++ b/sbws/globals.py @@ -84,6 +84,16 @@ # Tor already accept lines of any size, but leaving the limit anyway. BW_LINE_SIZE = 1022 +# RelayList, ResultDump, v3bwfile +# For how many seconds in the past the relays and measurements data is keep/ +# considered valid. +# This is currently set by default in config.default.ini as ``date_period``, +# and used in ResultDump and v3bwfile. +# In a future refactor, constants in config.default.ini should be moved here, +# or calculated in settings, so that there's no need to pass the configuration +# to all the functions. +MEASUREMENTS_PERIOD = 5 * 24 * 60 * 60 + # Metadata to send in every requests, so that data servers can know which # scanners are using them. # In Requests these keys are case insensitive. diff --git a/sbws/lib/relaylist.py b/sbws/lib/relaylist.py index b296c06d..880f88e2 100644 --- a/sbws/lib/relaylist.py +++ b/sbws/lib/relaylist.py @@ -1,16 +1,52 @@ +import copy +from datetime import datetime, timedelta + from stem.descriptor.router_status_entry import RouterStatusEntryV3 from stem.descriptor.server_descriptor import ServerDescriptor from stem import Flag, DescriptorUnavailable, ControllerError import random -import time import logging from threading import Lock +from ..globals import MEASUREMENTS_PERIOD + log = logging.getLogger(__name__) +def remove_old_consensus_timestamps( + consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD): + """ + Remove the consensus timestamps that are older than period for which + the measurements are keep from a list of consensus_timestamps. + + :param list consensus_timestamps: + :param int measurements_period: + :returns list: a new list of ``consensus_timestamps`` + """ + oldest_date = datetime.utcnow() - timedelta(measurements_period) + new_consensus_timestamps = \ + [t for t in consensus_timestamps if t >= oldest_date] + return new_consensus_timestamps + + +def valid_after_from_network_statuses(network_statuses): + """Obtain the consensus Valid-After datetime from the ``document`` + attribute of a ``stem.descriptor.RouterStatusEntryV3``. + + :param list network_statuses: + returns datetime: + """ + for ns in network_statuses: + document = getattr(ns, 'document', None) + if document: + valid_after = getattr(document, 'valid_after', None) + if valid_after: + return valid_after + return datetime.utcnow().replace(microsecond=0) + + class Relay: - def __init__(self, fp, cont, ns=None, desc=None): + def __init__(self, fp, cont, ns=None, desc=None, timestamp=None): ''' Given a relay fingerprint, fetch all the information about a relay that sbws currently needs and store it in this class. Acts as an abstraction @@ -18,6 +54,9 @@ def __init__(self, fp, cont, ns=None, desc=None): :param str fp: fingerprint of the relay. :param cont: active and valid stem Tor controller connection + + :param datatime timestamp: the timestamp of a consensus + (RouterStatusEntryV3) from which this relay has been obtained. ''' assert isinstance(fp, str) assert len(fp) == 40 @@ -38,6 +77,8 @@ def __init__(self, fp, cont, ns=None, desc=None): self._desc = cont.get_server_descriptor(fp, default=None) except (DescriptorUnavailable, ControllerError) as e: log.exception("Exception trying to get desc %s", e) + self._consensus_timestamps = [] + self._add_consensus_timestamp(timestamp) def _from_desc(self, attr): if not self._desc: @@ -107,6 +148,68 @@ def master_key_ed25519(self): return None return key.rstrip('=') + @property + def consensus_valid_after(self): + """Obtain the consensus Valid-After from the document of this relay + network status. + """ + network_status_document = self._from_ns('document') + if network_status_document: + return getattr(network_status_document, 'valid_after', None) + return None + + @property + def last_consensus_timestamp(self): + if len(self._consensus_timestamps) >= 1: + return self._consensus_timestamps[-1] + return None + + def _add_consensus_timestamp(self, timestamp=None): + """Add the consensus timestamp in which this relay is present. + """ + # It is possible to access to the relay's consensensus Valid-After + if self.consensus_valid_after is not None: + # The consensus timestamp list was initialized. + if self.last_consensus_timestamp is not None: + # Valid-After is more recent than the most recent stored + # consensus timestamp. + if self.consensus_valid_after > self.last_consensus_timestamp: + # Add Valid-After + self._consensus_timestamps.append( + self.consensus_valid_after + ) + # The consensus timestamp list was not initialized. + else: + # Add Valid-After + self._consensus_timestamps.append(self.consensus_valid_after) + # If there was already a list the timestamp arg is more recent than + # the most recent timestamp stored, + elif (self.last_consensus_timestamp is not None + and timestamp > self.last_consensus_timestamp): + # Add the arg timestamp. + self._consensus_timestamps.append(timestamp) + # In any other case + else: + # Add the current datetime + self._consensus_timestamps.append( + datetime.utcnow().replace(microsecond=0)) + + def _remove_old_consensus_timestamps( + self, measurements_period=MEASUREMENTS_PERIOD): + self._consensus_timestamps = \ + remove_old_consensus_timestamps( + copy.deepcopy(self._consensus_timestamps, measurements_period) + ) + + def update_consensus_timestamps(self, timestamp=None): + self._add_consensus_timestamp(timestamp) + self._remove_old_consensus_timestamps() + + @property + def relay_in_recent_consensus_count(self): + """Number of times the relay was in a conensus.""" + return len(self._consensus_timestamps) + def can_exit_to_port(self, port): """ Returns True if the relay has an exit policy and the policy accepts @@ -129,16 +232,40 @@ class RelayList: transparently in the background. Provides useful interfaces for getting only relays of a certain type. ''' - REFRESH_INTERVAL = 300 # seconds - def __init__(self, args, conf, controller): + def __init__(self, args, conf, controller, + measurements_period=MEASUREMENTS_PERIOD, state=None): self._controller = controller self.rng = random.SystemRandom() self._refresh_lock = Lock() + # To track all the consensus seen. + self._consensus_timestamps = [] + # Initialize so that there's no error trying to access to it. + # In future refactor, change to a dictionary, where the keys are + # the relays' fingerprint. + self._relays = [] + # The period of time for which the measurements are keep. + self._measurements_period = measurements_period + self._state = state self._refresh() def _need_refresh(self): - return time.time() >= self._last_refresh + self.REFRESH_INTERVAL + # New consensuses happen every hour. + return datetime.utcnow() >= \ + self.last_consensus_timestamp + timedelta(seconds=60*60) + + @property + def last_consensus_timestamp(self): + """Returns the datetime when the last consensus was obtained.""" + if (getattr(self, "_consensus_timestamps") + and self._consensus_timestamps): + return self._consensus_timestamps[-1] + # If the object was not created from __init__, it won't have + # consensus_timestamps attribute or it might be empty. + # In this case force new update. + # Anytime more than 1h in the past will be old. + self._consensus_timestamps = [] + return datetime.utcnow() - timedelta(seconds=60*61) @property def relays(self): @@ -197,19 +324,62 @@ def _relays_with_flag(self, flag): def _relays_without_flag(self, flag): return [r for r in self.relays if flag not in r.flags] + def _remove_old_consensus_timestamps(self): + self._consensus_timestamps = remove_old_consensus_timestamps( + copy.deepcopy(self._consensus_timestamps), + self._measurements_period + ) + def _init_relays(self): + """Returns a new list of relays that are in the current consensus. + And update the consensus timestamp list with the current one. + + """ c = self._controller - try: - relays = [Relay(ns.fingerprint, c, ns=ns) - for ns in c.get_network_statuses()] - except ControllerError as e: - log.exception("Exception trying to init relays %s", e) - return [] - return relays + # This will get router statuses from this Tor cache, might not be + # updated with the network. + # Change to stem.descriptor.remote in future refactor. + network_statuses = c.get_network_statuses() + new_relays_dict = dict([(r.fingerprint, r) for r in network_statuses]) + + # Find the timestamp of the last consensus. + timestamp = valid_after_from_network_statuses(network_statuses) + self._consensus_timestamps.append(timestamp) + self._remove_old_consensus_timestamps() + # Update the relays that were in the previous consensus with the + # new timestamp + new_relays = [] + relays = copy.deepcopy(self._relays) + for r in relays: + if r.fingerprint in new_relays_dict.keys(): + r.update_consensus_timestamps(timestamp) + new_relays_dict.pop(r.fingerprint) + new_relays.append(r) + + # Add the relays that were not in the previous consensus + # If there was an relay in some older previous consensus, + # it won't get stored, so its previous consensuses are lost, + # but probably this is fine for now to don't make it more complicated. + for fp, ns in new_relays_dict.items(): + r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp) + new_relays.append(r) + return new_relays def _refresh(self): + # Set a new list of relays. self._relays = self._init_relays() - self._last_refresh = time.time() + + log.info("Number of consensuses obtained in the last %s days: %s.", + int(self._measurements_period / 24 / 60 / 60), + self.recent_consensus_count) + # NOTE: blocking, writes to file! + if self._state is not None: + self._state['recent_consensus_count'] = self.recent_consensus_count + + @property + def recent_consensus_count(self): + """Number of times a new consensus was obtained.""" + return len(self._consensus_timestamps) def exits_not_bad_allowing_port(self, port): return [r for r in self.exits diff --git a/sbws/lib/resultdump.py b/sbws/lib/resultdump.py index 39aeda49..f9049251 100644 --- a/sbws/lib/resultdump.py +++ b/sbws/lib/resultdump.py @@ -204,7 +204,8 @@ class Relay: def __init__(self, fingerprint, nickname, address, master_key_ed25519, average_bandwidth=None, burst_bandwidth=None, observed_bandwidth=None, consensus_bandwidth=None, - consensus_bandwidth_is_unmeasured=None): + consensus_bandwidth_is_unmeasured=None, + relay_in_recent_consensus_count=None): self.fingerprint = fingerprint self.nickname = nickname self.address = address @@ -215,15 +216,20 @@ def __init__(self, fingerprint, nickname, address, master_key_ed25519, self.consensus_bandwidth = consensus_bandwidth self.consensus_bandwidth_is_unmeasured = \ consensus_bandwidth_is_unmeasured + # The number of times the relay was in a consensus. + self.relay_in_recent_consensus_count = \ + relay_in_recent_consensus_count - def __init__(self, relay, circ, dest_url, scanner_nick, t=None): + def __init__(self, relay, circ, dest_url, scanner_nick, t=None, + relay_in_recent_consensus_count=None): self._relay = Result.Relay(relay.fingerprint, relay.nickname, relay.address, relay.master_key_ed25519, relay.average_bandwidth, relay.burst_bandwidth, relay.observed_bandwidth, relay.consensus_bandwidth, - relay.consensus_bandwidth_is_unmeasured) + relay.consensus_bandwidth_is_unmeasured, + relay.relay_in_recent_consensus_count) self._circ = circ self._dest_url = dest_url self._scanner = scanner_nick @@ -269,6 +275,11 @@ def address(self): def master_key_ed25519(self): return self._relay.master_key_ed25519 + @property + def relay_in_recent_consensus_count(self): + """Number of times the relay was in a consensus.""" + return self._relay.relay_in_recent_consensus_count + @property def circ(self): return self._circ @@ -301,6 +312,8 @@ def to_dict(self): 'type': self.type, 'scanner': self.scanner, 'version': self.version, + 'relay_in_recent_consensus_count': + self.relay_in_recent_consensus_count, } @staticmethod @@ -368,7 +381,9 @@ def from_dict(d): return ResultError( Result.Relay( d['fingerprint'], d['nickname'], d['address'], - d['master_key_ed25519']), + d['master_key_ed25519'], + relay_in_recent_consensus_count= # noqa + d.get('relay_in_recent_consensus_count', None)), # noqa d['circ'], d['dest_url'], d['scanner'], msg=d['msg'], t=d['time']) @@ -409,7 +424,9 @@ def from_dict(d): return ResultErrorCircuit( Result.Relay( d['fingerprint'], d['nickname'], d['address'], - d['master_key_ed25519']), + d['master_key_ed25519'], + relay_in_recent_consensus_count= # noqa + d.get('relay_in_recent_consensus_count', None)), # noqa d['circ'], d['dest_url'], d['scanner'], msg=d['msg'], t=d['time']) @@ -432,7 +449,9 @@ def from_dict(d): return ResultErrorStream( Result.Relay( d['fingerprint'], d['nickname'], d['address'], - d['master_key_ed25519']), + d['master_key_ed25519'], + relay_in_recent_consensus_count= # noqa + d.get('relay_in_recent_consensus_count', None)), # noqa d['circ'], d['dest_url'], d['scanner'], msg=d['msg'], t=d['time']) @@ -468,7 +487,9 @@ def from_dict(d): return ResultErrorAuth( Result.Relay( d['fingerprint'], d['nickname'], d['address'], - d['master_key_ed25519']), + d['master_key_ed25519'], + relay_in_recent_consensus_count= # noqa + d.get('relay_in_recent_consensus_count', None)), # noqa d['circ'], d['dest_url'], d['scanner'], msg=d['msg'], t=d['time']) @@ -505,7 +526,9 @@ def from_dict(d): d['master_key_ed25519'], d['relay_average_bandwidth'], d.get('relay_burst_bandwidth'), d['relay_observed_bandwidth'], d.get('consensus_bandwidth'), - d.get('consensus_bandwidth_is_unmeasured')), + d.get('consensus_bandwidth_is_unmeasured'), + relay_in_recent_consensus_count= # noqa + d.get('relay_in_recent_consensus_count', None)), # noqa d['circ'], d['dest_url'], d['scanner'], t=d['time']) diff --git a/sbws/lib/v3bwfile.py b/sbws/lib/v3bwfile.py index d6c545a1..997e35cd 100644 --- a/sbws/lib/v3bwfile.py +++ b/sbws/lib/v3bwfile.py @@ -229,7 +229,9 @@ def from_results(cls, results, scanner_country=None, kwargs = dict() latest_bandwidth = cls.latest_bandwidth_from_results(results) earliest_bandwidth = cls.earliest_bandwidth_from_results(results) + # NOTE: Blocking, reads file generator_started = cls.generator_started_from_file(state_fpath) + recent_consensus_count = cls.consensus_count_from_file(state_fpath) timestamp = str(latest_bandwidth) kwargs['latest_bandwidth'] = unixts_to_isodt_str(latest_bandwidth) kwargs['earliest_bandwidth'] = unixts_to_isodt_str(earliest_bandwidth) @@ -240,6 +242,8 @@ def from_results(cls, results, scanner_country=None, kwargs['scanner_country'] = scanner_country if destinations_countries is not None: kwargs['destinations_countries'] = destinations_countries + if recent_consensus_count is not None: + kwargs['recent_consensus_count'] = str(recent_consensus_count) h = cls(timestamp, **kwargs) return h @@ -296,6 +300,14 @@ def generator_started_from_file(state_fpath): else: return None + @staticmethod + def consensus_count_from_file(state_fpath): + state = State(state_fpath) + if 'recent_consensus_count' in state: + return state['recent_consensus_count'] + else: + return None + @staticmethod def latest_bandwidth_from_results(results): return round(max([r.time for fp in results for r in results[fp]])) @@ -404,6 +416,12 @@ def from_results(cls, results, secs_recent=None, secs_away=None, kwargs['master_key_ed25519'] = results[0].master_key_ed25519 kwargs['time'] = cls.last_time_from_results(results) kwargs.update(cls.result_types_from_results(results)) + consensuses_count = \ + [r.relay_in_recent_consensus_count for r in results + if getattr(r, 'relay_in_recent_consensus_count', None)] + if consensuses_count: + consensus_count = max(consensuses_count) + kwargs['relay_in_recent_consensus_count'] = consensus_count success_results = [r for r in results if isinstance(r, ResultSuccess)] if not success_results: diff --git a/tests/integration/lib/test_relaylist.py b/tests/integration/lib/test_relaylist.py index a1a5efbc..b3d97c2e 100644 --- a/tests/integration/lib/test_relaylist.py +++ b/tests/integration/lib/test_relaylist.py @@ -15,3 +15,8 @@ def test_relay_properties(persistent_launch_tor): assert relay.address == '127.10.0.1' assert relay.master_key_ed25519 == \ 'wLglSEw9/DHfpNrlrqjVRSnGLVWfnm0vYxkryH4aT6Q' + + +def test_relay_list_last_consensus_timestamp(rl): + assert rl.last_consensus_timestamp == \ + rl._relays[0].last_consensus_timestamp diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py index ada1b9b9..fa1143da 100644 --- a/tests/unit/conftest.py +++ b/tests/unit/conftest.py @@ -71,7 +71,8 @@ RELAY1 = Result.Relay(FP1, NICK1, IP1, ED25519, average_bandwidth=AVG_BW, burst_bandwidth=BUR_BW, observed_bandwidth=OBS_BW, consensus_bandwidth=BW, - consensus_bandwidth_is_unmeasured=UNMEASURED) + consensus_bandwidth_is_unmeasured=UNMEASURED, + relay_in_recent_consensus_count=2) RELAY2 = Result.Relay(FP2, NICK2, IP2, ED25519) RESULT = Result(RELAY1, CIRC12, DEST_URL, SCANNER, t=TIME1) @@ -95,6 +96,7 @@ "relay_observed_bandwidth": OBS_BW, "consensus_bandwidth": BW, "consensus_bandwidth_is_unmeasured": UNMEASURED, + "relay_in_recent_consensus_count": 2, } BASE_RESULT_NO_RELAY_DICT = {