forked from twothreenine/krautkoopf_lieferscraping
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfoodsoft.py
145 lines (124 loc) · 5.87 KB
/
foodsoft.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Object-style API for connecting to a Foodsoft instance and
# working with it.
import logging
import requests
import os
import re
from bs4 import BeautifulSoup as bs
import urllib.request
# Set up root logging at DEBUG level as soon as this module is imported.
logging.basicConfig(level=logging.DEBUG)
def read_foodsoft_config():
    """Read the Foodsoft connection settings from environment variables.

    Environment variables read:
        LS_FOODSOFT_URL:  base URL of the Foodsoft instance.
        LS_FOODSOFT_USER: login name.
        LS_FOODSOFT_PASS: login password.

    The foodcoop name is the last path segment of the URL; a trailing slash
    is optional (".../coop" and ".../coop/" both yield "coop").

    Returns:
        A tuple ``(foodcoop, foodsoft_url, foodsoft_user, foodsoft_password)``.
        ``foodcoop`` is "unnamed foodcoop" when no name could be extracted,
        ``foodsoft_url`` is None when LS_FOODSOFT_URL is unset, and user and
        password are both None unless both variables are set.
    """
    foodcoop = "unnamed foodcoop"
    foodsoft_url = os.environ.get('LS_FOODSOFT_URL')
    if foodsoft_url is not None:
        # Strip trailing slashes first so the last segment is the coop name
        # regardless of whether the URL was written with a closing "/".
        trimmed = foodsoft_url.rstrip("/")
        name = trimmed.rsplit("/", 1)[-1] if "/" in trimmed else ""
        if name:
            foodcoop = name
        else:
            logging.error("Could not extract foodcoop name from url %s", foodsoft_url)

    foodsoft_user = os.environ.get('LS_FOODSOFT_USER')
    foodsoft_password = os.environ.get('LS_FOODSOFT_PASS')
    # Credentials are only meaningful as a pair: report neither if one is missing.
    if foodsoft_user is None or foodsoft_password is None:
        foodsoft_user = None
        foodsoft_password = None

    return foodcoop, foodsoft_url, foodsoft_user, foodsoft_password
class FSConnector:
    """Session-based client for a Foodsoft instance.

    Constructing the object logs in immediately; afterwards the instance keeps
    the HTTP session in ``_session`` and uses it for further requests.
    """

    # Host header used when no host can be derived from the given URL.
    _FALLBACK_HOST = 'app.foodcoops.at'

    def __init__(self, url: str, user: str, password: str):
        """Store the connection settings and log in.

        Args:
            url: Base URL of the Foodsoft instance; a trailing "/" is added
                when missing.
            user: Login name, sent as the ``nick`` form field.
            password: Login password.

        Raises:
            ConnectionError: If the login GET or POST does not return 200.
        """
        # Local import keeps this class self-contained within the module.
        from urllib.parse import urlparse

        self._session = None
        if not url.endswith("/"):
            url += "/"
        self._url = url
        self._url_login_request = url + 'login'
        self._url_login_post = url + 'sessions'
        # The Host header must name the server that is actually contacted.
        # Drop any "user:pass@" prefix; fall back to the historic default
        # when the URL carries no host part.
        host = urlparse(url).netloc.rpartition('@')[2] or self._FALLBACK_HOST
        self._default_header = {
            'Host': host,
            'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:69.0) Gecko/20100101 Firefox/69.0',
            'Content-Type': 'application/x-www-form-urlencoded',
            'Upgrade-Insecure-Requests': '1',
        }
        self._login_data = {
            "utf8": "✓",
            'commit': 'Anmelden',
        }
        self.login(user, password)

    def _get(self, url, header, data=None):
        """Send a GET request through the current session.

        Args:
            url: Absolute URL to fetch.
            header: Dict of HTTP headers to send.
            data: Optional query parameters; nothing extra is sent when None.

        Returns:
            The response object.

        Raises:
            ConnectionError: If the status code is not 200. The session is
                closed before raising.
        """
        # Both branches assign ``response``; previously a non-None ``data``
        # skipped the request and crashed on the status check below.
        if data is None:
            response = self._session.get(url, headers=header)
        else:
            response = self._session.get(url, headers=header, params=data)
        # TODO: I think we should handle errors instead of automatically
        # closing the session & raising an error (also applies to _post)
        if response.status_code != 200:
            self._session.close()
            logging.error('ERROR ' + str(response.status_code) + ' during GET ' + url)
            raise ConnectionError('Cannot get: ' + url)
        return response

    def _get_auth_token(self, request_content):
        """Extract the ``authenticity_token`` form value from an HTML page.

        Args:
            request_content: Raw HTML of the page, or None.

        Returns:
            The token string, or '' (after logging an error) when the content
            is None or contains no usable ``authenticity_token`` field.
        """
        if request_content is None:
            logging.error('ERROR failed to fetch authenticity_token')
            return ''
        token_field = bs(request_content, 'html.parser').find(attrs={'name': 'authenticity_token'})
        token = token_field.get('value') if token_field is not None else None
        if token is None:
            # Same outcome as for missing content instead of a TypeError/KeyError.
            logging.error('ERROR failed to fetch authenticity_token')
            return ''
        return token

    def _post(self, url, header, data, request):
        """Send a form POST, adding the authenticity token of a prior page.

        Args:
            url: Absolute URL to post to.
            header: Dict of HTTP headers to send.
            data: Form fields; this dict is not modified.
            request: Response of the GET that rendered the form; its content
                supplies the authenticity token and its cookies are re-sent.

        Returns:
            The response object.

        Raises:
            ConnectionError: If the status code is not 200.
        """
        # Work on a copy so the caller's dict does not pick up the token.
        payload = dict(data)
        payload['authenticity_token'] = self._get_auth_token(request.content)
        response = self._session.post(url, headers=header, data=payload, cookies=request.cookies)
        # NOTE(review): the original comment here mentioned 302 - presumably
        # the raw login answer is a redirect that the session follows; confirm.
        if response.status_code != 200:
            logging.error('Error ' + str(response.status_code) + ' during POST ' + url)
            raise ConnectionError('Error cannot post to ' + url)
        return response

    def login(self, user, password):
        """Open a new session and submit the login form.

        Args:
            user: Login name.
            password: Login password.

        Raises:
            ConnectionError: If fetching or submitting the login form fails.
        """
        self._user = user
        self._login_data['nick'] = user
        self._login_data['password'] = password

        # Copy: the Referer below belongs to the login request only and must
        # not end up in the headers shared by all later requests.
        login_header = dict(self._default_header)

        self._session = requests.Session()
        request = self._get(self._url_login_request, login_header)
        login_header['Referer'] = self._url_login_request

        self._post(self._url_login_post, login_header, self._login_data, request)
        # TODO: check if the login was really successful or not (due to false
        # login data), for example by checking status codes?
        # If not, set self._session back to None, or store logged-in status
        # in a boolean variable
        logging.debug('%s logged in successfully to %s', user, self._url)

    def logout(self):
        """Close the session, if there is one."""
        # _session is None before login and after a failed add_user_data().
        if self._session is not None:
            self._session.close()

    def add_user_data(self, first_name=True, last_name=True, nick=False, workgroups=False, ordergroup=False):
        """
        Adds the requested data of the logged-in user to the FSConnector object:
        .first_name
        .last_name
        .nick (None if nicknames are disabled in the Foodsoft instance)
        .workgroups : IDs of work groups which the user is member of, for example [1, 3, 11] or []
        .ordergroup : ID of the user's ordergroup

        If the profile page does not contain the first-name field, an error is
        logged, the session is closed and set to None, and no attribute is set.
        """
        userdata_url = f"{self._url}home/profile"
        parsed_html = bs(self._get(userdata_url, self._default_header).content, 'html.parser')
        body = parsed_html.body
        first_name_field = body.find(id="user_first_name") if body is not None else None

        if first_name_field is None:
            logging.error('Could not find user profile data at ' + userdata_url)
            self._session.close()
            self._session = None
            return

        if first_name:
            self.first_name = first_name_field.get("value")
        if last_name:
            self.last_name = body.find(id="user_last_name").get("value")
        if nick:
            nick_tag = body.find(id="user_nick")
            self.nick = nick_tag.get("value") if nick_tag else None
        if workgroups:
            # The ID is taken from whatever follows the last "=" of each link.
            wg_links = body.select("[rel='nofollow']")
            self.workgroups = [int(link["href"].split("=")[-1]) for link in wg_links]
        if ordergroup:
            self.ordergroup = None
            # The first link whose href contains "invites" carries the ID.
            for link in body.find_all("a"):
                href = link.get("href")
                if href and "invites" in href:
                    self.ordergroup = int(href.split("=")[-1])
                    break

    def get_articles_CSV(self, supplier_id):
        """Download a supplier's article list.

        Args:
            supplier_id: ID of the supplier, inserted into the URL.

        Returns:
            The content of ``suppliers/<supplier_id>/articles.csv`` decoded as
            UTF-8 text.

        Raises:
            ConnectionError: If the request does not return 200.
        """
        supplier_url = f"{self._url}suppliers/{supplier_id}/articles.csv"
        response = self._get(supplier_url, self._default_header)
        return response.content.decode('utf-8')