Commit
Fixed the BaseProvider and Validated the files and commits
AbdullahiFatola committed Sep 15, 2023
2 parents 733b7ee + ccf3d6a commit e705ef4
Showing 13 changed files with 335 additions and 202 deletions.
4 changes: 2 additions & 2 deletions config/deer.json
@@ -58,7 +58,7 @@
],
"system_information": {
"email": "[email protected]",
"feed_contact_email": "mobidata-bww@nvbw.de",
"feed_contact_email": "mobidata-bw@nvbw.de",
"language": "de-DE",
"name": "deer",
"operator": "DEER GmbH",
@@ -83,4 +83,4 @@
}
},
"publication_base_url": "https://data.mfdz.de/mobidata-bw/deer"
}
}
5 changes: 2 additions & 3 deletions config/voi.json
@@ -3,12 +3,11 @@
"system_information": {
"email": "[email protected]",
"feed_contact_email": "[email protected]",
"language": "de-DE",
"language": "de",
"name": "Voi eScooter Karlsruhe",
"operator": "raumobil GmbH",
"phone_number": "+49 721 6607-245",
"system_id": "voi",
"timezone": "CET"
"timezone": "Europe/Berlin"
}
},
"publication_base_url": "https://data.mfdz.de/mobidata-bw/voi"
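The timezone switch from the abbreviation CET to the IANA name Europe/Berlin matches what GBFS expects for system_information.timezone (a TZ database name). A quick, stand-alone way to sanity-check such an identifier (a sketch, not part of this repository):

    from zoneinfo import ZoneInfo  # Python 3.9+

    # 'Europe/Berlin' is a TZ database name; unlike the bare offset label 'CET'
    # it also encodes daylight saving time transitions.
    print(ZoneInfo('Europe/Berlin'))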
14 changes: 1 addition & 13 deletions tests/gbfs/providers/raumobil_mock.py
@@ -11,20 +11,8 @@ class MockRaumobilAPI:
https://lsd.raumobil.net/
"""

def all_stations(self):
with open('tests/data/voi_locations.json') as location_file:
results = json.load(location_file)
features = results[0]['result']['u0ty']['features']
locations = [feature for feature in features if feature['geometry']['type'] == 'Point']

for location in locations:
yield location

def all_vehicles(self):
with open('tests/data/voi_vehicles.json') as vehicles_file:
results = json.load(vehicles_file)
features = results[0]['result']['u0ty']['features']
vehicles = [feature for feature in features if feature['properties']['featureType'] == 'Vehicle']

vehicles = json.load(vehicles_file)
for vehicle in vehicles:
yield vehicle
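With the mock reduced to a vehicles-only generator, a test can consume it directly. A minimal usage sketch (the module path and the 'properties' structure of the fixture entries are assumptions, not shown in this diff):

    from tests.gbfs.providers.raumobil_mock import MockRaumobilAPI

    mock_api = MockRaumobilAPI()
    # all_vehicles() yields one dict per entry in tests/data/voi_vehicles.json
    for vehicle in mock_api.all_vehicles():
        print(vehicle.get('properties', {}).get('featureType'))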
11 changes: 6 additions & 5 deletions x2gbfs/gbfs/base_provider.py
@@ -1,24 +1,25 @@
from abc import ABC, abstractmethod
from typing import Tuple, Union
from typing import Any, Dict, Generator, Optional, Tuple


class BaseProvider(ABC):
@abstractmethod
def load_stations(self, default_last_reported: int) -> Tuple[dict, dict]:
def load_stations(self, default_last_reported: int) -> Tuple[Optional[Dict], Optional[Dict]]:
"""
Retrieves stations from the providers API and converts them
into gbfs station infos and station status.
Returns dicts where the key is the station_id and values
are station_info/station_status.
For free-floating-only providers, this method does not need to be overridden.
Note: station status' vehicle availability is currently calculated
from the vehicles' station_id, in case it is defined by this
provider.
"""
pass
return None, None

@abstractmethod
def load_vehicles(self, default_last_reported: int) -> Tuple[dict, dict]:
def load_vehicles(self, default_last_reported: int) -> Tuple[Optional[Dict], Optional[Dict]]:
"""
Retrieves vehicles and vehicle types from provider's API and converts them
into gbfs vehicles, vehicle_types.
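Since load_stations and load_vehicles now return Optional dicts, a free-floating-only provider can simply return (None, None) for stations and only provide vehicles. A minimal sketch of such a subclass (class name, field values and the import path are illustrative; any further abstract methods hidden in the collapsed part of this diff would also need implementing):

    from typing import Dict, Optional, Tuple

    from x2gbfs.gbfs.base_provider import BaseProvider


    class FreeFloatingDemoProvider(BaseProvider):
        """Hypothetical provider with free-floating vehicles only (no stations)."""

        def load_stations(self, default_last_reported: int) -> Tuple[Optional[Dict], Optional[Dict]]:
            # No station_information / station_status for a free-floating-only system.
            return None, None

        def load_vehicles(self, default_last_reported: int) -> Tuple[Optional[Dict], Optional[Dict]]:
            vehicle_types = {
                'scooter': {'vehicle_type_id': 'scooter', 'form_factor': 'scooter', 'propulsion_type': 'electric'},
            }
            vehicles = {
                'demo-1': {
                    'bike_id': 'demo-1',
                    'vehicle_type_id': 'scooter',
                    'lat': 49.0069,
                    'lon': 8.4037,
                    'is_reserved': False,
                    'is_disabled': False,
                    'last_reported': default_last_reported,
                },
            }
            return vehicle_types, vehicles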
56 changes: 21 additions & 35 deletions x2gbfs/gbfs/gbfs_transformer.py
@@ -1,53 +1,39 @@
from collections import Counter
from datetime import datetime
from typing import Any, Dict
from typing import Any, Dict, List, Optional, Tuple

from .base_provider import BaseProvider
import logging
import sys

logger = logging.getLogger('x2gbfs.voi')

class GbfsTransformer:
def load_stations_and_vehicles(self, provider: BaseProvider):
def load_stations_and_vehicles(
self, provider: BaseProvider
) -> Tuple[Optional[List], Optional[List], Optional[List], Optional[List]]:
"""
Load stations and vehicles from provider, updates vehicle availabilities at stations
and returns gbfs collections station_infos, station_status, vehicle_types, vehicles.
Note: The value of return_type should be provided in vehicle_type feeds.
Note: If the return_type of the vehicle is free_floating (e.g scooter), stations
feeds (e.g. station_status) would be ignored.
Only vehicle feeds from provider will be loaded.
"""
Note that all these collections are conditionally required, and hence may be missing
(see e.g. https://github.com/MobilityData/gbfs/blob/v2.3/gbfs.md#files)
"""
default_last_reported = int(datetime.timestamp(datetime.now()))

station_infos_map, station_status_map = provider.load_stations(default_last_reported)
vehicle_types_map, vehicles_map = provider.load_vehicles(default_last_reported)
vehicle_return_type = list(vehicle_types_map.values())[0].get('return_type', {})

if not vehicle_return_type:
logger.warning('Exiting. Please provide the vehicle return_type for {} e.g. roundtrip'.format(vehicle_types_map.get('vehicle_type_id')))
sys.exit()
elif vehicle_return_type == 'roundtrip':
station_infos_map, station_status_map = provider.load_stations(default_last_reported)

if station_status_map and vehicles_map:
# if feed has stations and vehicles, we deduce vehicle_types_available
# information from vehicle.station_id information
self._update_stations_availability_status(station_status_map, vehicles_map)

return (
list(station_infos_map.values()),
list(station_status_map.values()),
list(vehicle_types_map.values()),
list(vehicles_map.values()),
)
elif vehicle_return_type == 'free_floating':
station_infos_map_values = {}
station_status_map_values = {}
return (
list(station_infos_map_values),
list(station_status_map_values),
list(vehicle_types_map.values()),
list(vehicles_map.values()),
)
return (
list(station_infos_map.values()) if station_infos_map else None,
list(station_status_map.values()) if station_status_map else None,
# Note: if vehicle_types are not provided, all vehicles are assumed to be non-motorized bikes (see https://github.com/MobilityData/gbfs/blob/v2.3/gbfs.md#files)
list(vehicle_types_map.values()) if vehicle_types_map else None,
list(vehicles_map.values()) if vehicles_map else None,
)

def _update_stations_availability_status(self, status_map: Dict[str, Dict], vehicles_map: Dict[str, Dict]) -> None:
"""
@@ -77,7 +63,7 @@ def _update_stations_availability_status(self, status_map: Dict[str, Dict], vehi
self._update_station_availability_status(vehicle_types_per_station[station_id], status_map[station_id])

def _update_station_availability_status(
self, vt_available: list[Dict[str, Any]], station_status: Dict[str, Any]
self, vt_available: List[Dict[str, Any]], station_status: Dict[str, Any]
) -> None:
"""
Sets station_status.vehicle_types_available and
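Because every collection returned by load_stations_and_vehicles is now Optional, callers have to handle missing station feeds explicitly. A rough usage sketch, reusing the hypothetical FreeFloatingDemoProvider from above (import path assumed from the file layout):

    from x2gbfs.gbfs.gbfs_transformer import GbfsTransformer

    transformer = GbfsTransformer()
    provider = FreeFloatingDemoProvider()  # hypothetical provider sketched above

    station_infos, station_status, vehicle_types, vehicles = transformer.load_stations_and_vehicles(provider)

    # Station feeds may legitimately be None for a free-floating-only system.
    if station_infos is None or station_status is None:
        print('no station feeds, publishing free_bike_status only')
    print(f'{len(vehicles or [])} vehicles, {len(vehicle_types or [])} vehicle types')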
34 changes: 19 additions & 15 deletions x2gbfs/gbfs/gbfs_writer.py
@@ -2,18 +2,14 @@
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional


class GbfsWriter:
def __init__(self, feed_language: str = 'en'):
self.feed_language = feed_language

def gbfs_data(self, feed_language: str, base_url: str, include_pricing_plans: bool = True) -> Dict:
feeds = ['system_information', 'station_information', 'station_status', 'free_bike_status', 'vehicle_types']
if include_pricing_plans:
feeds.append('system_pricing_plans')

def gbfs_data(self, feed_language: str, base_url: str, feeds: List[str]) -> Dict:
return {feed_language: {'feeds': [{'name': feed, 'url': f'{base_url}/{feed}.json'} for feed in feeds]}}

def write_gbfs_file(self, filename: str, data, timestamp: int, ttl: int = 60) -> None:
@@ -25,10 +21,10 @@ def write_gbfs_feed(
self,
config: Dict,
destFolder: str,
info: List[Dict],
status: List[Dict],
vehicle_types: List[Dict],
vehicles: List[Dict],
station_information: Optional[List[Dict]],
station_status: Optional[List[Dict]],
vehicle_types: Optional[List[Dict]],
vehicles: Optional[List[Dict]],
base_url: str,
) -> None:
base_url = base_url or config['publication_base_url']
@@ -38,11 +34,19 @@
Path(destFolder).mkdir(parents=True, exist_ok=True)

timestamp = int(datetime.timestamp(datetime.now()))
self.write_gbfs_file(destFolder + '/gbfs.json', self.gbfs_data(self.feed_language, base_url), timestamp)
self.write_gbfs_file(destFolder + '/station_information.json', {'stations': info}, timestamp)
self.write_gbfs_file(destFolder + '/station_status.json', {'stations': status}, timestamp)
self.write_gbfs_file(destFolder + '/free_bike_status.json', {'bikes': vehicles}, timestamp)
feeds = ['system_information']
self.write_gbfs_file(destFolder + '/system_information.json', system_information, timestamp)
self.write_gbfs_file(destFolder + '/vehicle_types.json', {'vehicle_types': vehicle_types}, timestamp)
if station_information and station_status:
feeds.extend(('station_information', 'station_status'))
self.write_gbfs_file(destFolder + '/station_information.json', {'stations': station_information}, timestamp)
self.write_gbfs_file(destFolder + '/station_status.json', {'stations': station_status}, timestamp)
if vehicles:
feeds.append('free_bike_status')
self.write_gbfs_file(destFolder + '/free_bike_status.json', {'bikes': vehicles}, timestamp)
if vehicle_types:
feeds.append('vehicle_types')
self.write_gbfs_file(destFolder + '/vehicle_types.json', {'vehicle_types': vehicle_types}, timestamp)
if pricing_plans:
feeds.append('system_pricing_plans')
self.write_gbfs_file(destFolder + '/system_pricing_plans.json', {'plans': pricing_plans}, timestamp)
self.write_gbfs_file(destFolder + '/gbfs.json', self.gbfs_data(self.feed_language, base_url, feeds), timestamp)
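gbfs_data now takes the dynamically assembled feeds list, so gbfs.json only advertises the files that were actually written. A small illustration of the resulting discovery structure (language and base URL are made up):

    from x2gbfs.gbfs.gbfs_writer import GbfsWriter

    writer = GbfsWriter(feed_language='de')
    feeds = ['system_information', 'free_bike_status', 'vehicle_types']
    discovery = writer.gbfs_data('de', 'https://example.org/gbfs/demo', feeds)
    # discovery == {'de': {'feeds': [
    #     {'name': 'system_information', 'url': 'https://example.org/gbfs/demo/system_information.json'},
    #     {'name': 'free_bike_status', 'url': 'https://example.org/gbfs/demo/free_bike_status.json'},
    #     {'name': 'vehicle_types', 'url': 'https://example.org/gbfs/demo/vehicle_types.json'},
    # ]}}
    print(discovery)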
Binary file added x2gbfs/providers/.voi.py.swp
Binary file not shown.
3 changes: 1 addition & 2 deletions x2gbfs/providers/__init__.py
@@ -1,5 +1,4 @@
from .deer import Deer
from .fleetster import FleetsterAPI
from .voi import Voi
from .raumobil import RaumobilAPI

from .voi import Voi
44 changes: 41 additions & 3 deletions x2gbfs/providers/fleetster.py
@@ -1,8 +1,13 @@
import json
import logging
from random import random
from time import sleep
from typing import Dict, Generator, Optional

import requests

logger = logging.getLogger(__name__)


class FleetsterAPI:
"""
@@ -12,6 +17,9 @@ class FleetsterAPI:
https://my.fleetster.net/swagger/
"""

#: Number of times a login is attempted before an error is thrown on 401 response
MAX_LOGIN_ATTEMPTS = 5

token: Optional[str] = None
api_url: Optional[str] = None
user: Optional[str] = None
@@ -44,8 +52,38 @@ def _login(self) -> str:
return self.token

def _get_with_authorization(self, url: str) -> Dict:
token = self._login()
response = requests.get(url, headers={'Authorization': token}, timeout=10)
response.raise_for_status()
"""
Gets the provided url and returns the response as (json) dict.
The request is performed with an authentication token, acquired before the request.
If the API responds with a 401, a new login is attempted up to
self.MAX_LOGIN_ATTEMPTS times.
"""
no_of_login_attempts = 0
while no_of_login_attempts < self.MAX_LOGIN_ATTEMPTS:
no_of_login_attempts += 1
token = self._login()
response = requests.get(url, headers={'Authorization': token}, timeout=10)
if response.status_code == 401:
# Authentication issues will cause a retry attempt.
# An authentication issue could be caused by a competing client requesting
# a session token with the same credentials, invalidating our token

# Give potentially competing clients some time to complete their requests:
# exponential back-off plus some randomised "jitter" to avoid a thundering herd
# (https://en.wikipedia.org/wiki/Thundering_herd_problem)
seconds_to_sleep = (
0.5 * (1 + random() / 10) * no_of_login_attempts ** 2 # noqa: S311 (no cryptographic purpose)
)
logger.warning(
f'Requested token {self.token} was invalid, waiting for {seconds_to_sleep} seconds before retry'
)
sleep(seconds_to_sleep)

# Reset authentication token, so it will be requested again
self.token = None

else:
break

response.raise_for_status()
return response.json()
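The retry delay grows quadratically with the attempt number, with up to 10% random jitter on top. A stand-alone sketch of the same formula, just to make the resulting wait times visible:

    from random import random

    MAX_LOGIN_ATTEMPTS = 5

    for attempt in range(1, MAX_LOGIN_ATTEMPTS + 1):
        # Same back-off as in _get_with_authorization: 0.5s * attempt^2, plus up to 10% jitter.
        seconds_to_sleep = 0.5 * (1 + random() / 10) * attempt ** 2
        print(f'attempt {attempt}: sleep ~{seconds_to_sleep:.2f}s')
    # Before jitter: 0.5s, 2s, 4.5s, 8s, 12.5s - about 27.5s if all five attempts fail.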
65 changes: 49 additions & 16 deletions x2gbfs/providers/raumobil.py
@@ -1,17 +1,30 @@
import base64
import json
import logging
from random import random
from time import sleep
from typing import Dict, Generator, Optional

import requests, base64
import requests

logger = logging.getLogger(__name__)


class RaumobilAPI:
"""
Returns locations and vehicles from raumobil API platform's endpoint.
Raumobil API/Platform is available here:
https://lsd.raumobil.net/
Note: The implementation of this provider is currently maintained by
Nahverkehrsgesellschaft Baden-Wuerttemberg mbH (NVBW), Germany.
"""

token: Optional[str] = None
#: Number of times a login is attempted before an error is thrown on 401 response
MAX_LOGIN_ATTEMPTS = 5

api_url: Optional[str] = None
user: Optional[str] = None
password: Optional[str] = None
@@ -21,28 +34,48 @@ def __init__(self, api_url: str, user: str, password: str) -> None:
self.user = user
self.password = password

def all_stations(self) -> Generator[Dict, None, None]:
results = self._get_with_authorization(f'{self.api_url}')
locations = results.get('result').get('u0ty').get('features')
for location in locations:
yield location

def all_vehicles(self) -> Generator[Dict, None, None]:
results = self._get_with_authorization(f'{self.api_url}')
features = results.get('result', {}).get('u0ty', {}).get('features', {})
vehicles = [feature for feature in features if feature['properties']['featureType'] == 'Vehicle']
for vehicle in vehicles:
yield vehicle

def _login(self) -> str:
user_auth = f'{self.user}:{self.password}'
user_auth_encoded = base64.b64encode(user_auth.encode()).decode()
def _get_with_authorization(self, url: str) -> Dict:
"""
Gets the provided url and returns the response as (json) dict.
return user_auth_encoded
The request is performed with an authentication credential/token, acquired before the request.
If the API responds with a 401, a new login is attempted up to
self.MAX_LOGIN_ATTEMPTS times.
"""
user_pass = f'{self.user}:{self.password}'

def _get_with_authorization(self, url: str) -> Dict:
user_auth = self._login()
response = requests.post(url, headers={'Authorization': 'Basic %s' % user_auth}, timeout=10)
response.raise_for_status()
no_of_login_attempts = 0
while no_of_login_attempts < self.MAX_LOGIN_ATTEMPTS:
no_of_login_attempts += 1
user_auth = base64.b64encode(user_pass.encode()).decode()
response = requests.post(url, headers={'Authorization': 'Basic %s' % user_auth}, timeout=10)
if response.status_code != 200:
# Authentication issues will cause a retry attempt.
# An authentication issue could be caused by a competing client requesting
# a session token with the same credentials, invalidating our token

# Give potentially competing clients some time to complete their requests:
# exponential back-off plus some randomised "jitter" to avoid a thundering herd
# (https://en.wikipedia.org/wiki/Thundering_herd_problem)
seconds_to_sleep = (
0.5 * (1 + random() / 10) * no_of_login_attempts ** 2 # noqa: S311 (no cryptographic purpose)
)
logger.warning(
f'Request unsuccessful with status {response.status_code}, waiting for {seconds_to_sleep} seconds before retry'
)
sleep(seconds_to_sleep)

# Reset authentication token, so it will be requested again
self.user_auth = None

else:
break

response.raise_for_status()
return response.json()
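Unlike the fleetster client, the raumobil client authenticates with HTTP Basic auth, rebuilding the base64-encoded user:password pair on each attempt. A minimal sketch of that header construction (credentials are placeholders):

    import base64

    user, password = 'demo-user', 'demo-pass'  # placeholders, not real credentials
    user_auth = base64.b64encode(f'{user}:{password}'.encode()).decode()
    headers = {'Authorization': 'Basic %s' % user_auth}
    print(headers['Authorization'])

    # Equivalent, letting requests build the header itself:
    # requests.post(url, auth=(user, password), timeout=10)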