Skip to content

Commit

Permalink
Merge branch 'master' into services/oilmarket
Browse files Browse the repository at this point in the history
  • Loading branch information
pomo-mondreganto authored Dec 6, 2023
2 parents 3a12c04 + c161c80 commit 61aebe6
Show file tree
Hide file tree
Showing 92 changed files with 102,165 additions and 0 deletions.
1 change: 1 addition & 0 deletions check.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@

ALLOWED_CHECKER_PATTERNS = [
"import requests",
"requests/",
"requests.exceptions",
"s: requests.Session",
"sess: requests.Session",
Expand Down
199 changes: 199 additions & 0 deletions checkers/explorers/checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#!/usr/bin/env python3
import random
import re
import string
import sys

import requests
from checklib import *
from checklib import status

import explorers_lib
import gpxhelper


class Checker(BaseChecker):
vulns: int = 1
timeout: int = 15
uses_attack_data: bool = True

req_ua_agents = ['python-requests/2.{}.0'.format(x) for x in range(15, 28)]

def __init__(self, *args, **kwargs):
super(Checker, self).__init__(*args, **kwargs)
self.gpx_helper = gpxhelper.TrackHelper()
self.lib = explorers_lib.ExplorersLib(self)
self.id_regexp = re.compile(r'^[0-9A-Za-z]{1,40}$')

def session_with_req_ua(self):
sess = get_initialized_session()
if random.randint(0, 1) == 1:
sess.headers['User-Agent'] = random.choice(self.req_ua_agents)
return sess

def random_route_name(self):
return 'Expedition #{}'.format(random.randint(1, 10000))

def random_description(self, name: str):
choices = ['how was it', 'report', 'detailed report', 'what we found', 'confidential']
if random.randint(0, 1) == 1:
return name + ':' + rnd_string(20)
return '{}: {}'.format(name, random.choice(choices))

def validate_rid(self, mid):
self.assert_eq(bool(self.id_regexp.fullmatch(mid)), True, 'Invalid id format')

def action(self, action, *args, **kwargs):
try:
super(Checker, self).action(action, *args, **kwargs)
except requests.exceptions.ConnectionError:
self.cquit(Status.DOWN, 'Connection error', 'Got requests connection error')

def check(self):
session = self.session_with_req_ua()
username, password = rnd_username(), rnd_password()

self.lib.signup(session, username, password)
self.lib.signin(session, username, password)

random_name = self.random_route_name()
random_description = self.random_description(random_name)
route = self.lib.create_route(session, random_name, random_description)

route_id = route.get('id')

self.validate_rid(route_id)
self.assert_eq(route.get('title'), random_name, 'Failed to create route')
self.assert_eq(route.get('description'), random_description, 'Failed to create route')

route_list = self.lib.get_route_list(session)
self.assert_eq(len(route_list), 1, 'Failed to get route list')
self.assert_eq(route_list[0].get('title'), random_name, 'Failed to get route list')
self.assert_eq(route_list[0].get('description'), random_description, 'Failed to get route list')

wpts = [
gpxhelper.WaypointParams(
name='Oil found',
description='Oil found at {}m'.format(random.randint(100, 1000) / 10)
),
gpxhelper.WaypointParams(
name='Oil not found',
description='No oil was found',
)
]
if random.randint(0, 1) == 1:
wpts.append(gpxhelper.WaypointParams(
name='Oil found',
description='Oil found at {}m'.format(random.randint(100, 1000) / 10)
))

gpx_file = self.gpx_helper.random_gpx_from_ds(username, wpts)

updated_gpx = self.lib.upload_gpx(session, route_id, gpx_file)
got_waypoints = updated_gpx.get('waypoints', [])
got_tracks = updated_gpx.get('track_points', [])

self.assert_eq(set(x.name for x in wpts), set(x.get('name') for x in got_waypoints), 'Failed to upload gpx')
self.assert_eq(set(x.description for x in wpts), set(x.get('desc') for x in got_waypoints),
'Failed to upload gpx')

got_waypoints[-1]['name'] = rnd_string(31, alphabet=string.ascii_uppercase + string.digits)
got_waypoints[-1]['desc'] = rnd_string(31, alphabet=string.ascii_uppercase + string.digits)
new_description = rnd_string(31, alphabet=string.ascii_uppercase + string.digits)

self.lib.update_route(session, route_id, new_description, got_tracks, got_waypoints)
updated_gpx = self.lib.get_route(session, route_id)

self.assert_eq(updated_gpx.get('description'), new_description, 'Failed to update route')
self.assert_eq(set(x.get('name') for x in got_waypoints), set(x.get('name') for x in updated_gpx['waypoints']),
'Failed to update route')
self.assert_eq(set(x.get('desc') for x in got_waypoints),
set(x.get('desc') for x in updated_gpx['waypoints']),
'Failed to update route')

u2, p2 = rnd_username(), rnd_password()
s2 = self.session_with_req_ua()
self.lib.signup(s2, u2, p2)

gpx_by_token = self.lib.get_route(s2, route_id, updated_gpx.get('share_token'))
self.assert_eq(gpx_by_token.get('title'), random_name, 'Failed to get route by token')
self.assert_eq(gpx_by_token.get('description'), new_description, 'Failed to get route by token')
self.assert_eq(set(x.get('name') for x in got_waypoints), set(x.get('name') for x in gpx_by_token['waypoints']),
'Failed to get route by token')
self.assert_eq(set(x.get('desc') for x in got_waypoints),
set(x.get('desc') for x in gpx_by_token['waypoints']),
'Failed to get route by token')

self.cquit(Status.OK)

def put(self, flag_id: str, flag: str, vuln: str):
sess = self.session_with_req_ua()
u = rnd_username()
p = rnd_password()

self.lib.signup(sess, u, p)
self.lib.signin(sess, u, p)

name = self.random_route_name()
description = flag

route = self.lib.create_route(sess, name, description)
route_id = route.get('id')

self.validate_rid(route_id)
self.assert_eq(route.get('title'), name, 'Failed to create route')
self.assert_eq(route.get('description'), description, 'Failed to create route')

wpts = [
gpxhelper.WaypointParams(
name='Oil found',
description='Oil found at {}m'.format(random.randint(100, 1000) / 10)
),
]

gpx_file = self.gpx_helper.random_gpx(creator=u, waypoints=wpts, num_points=random.randint(5, 15))
self.lib.upload_gpx(sess, route_id, gpx_file)

updated_gpx = self.lib.get_route(sess, route_id)

self.assert_eq(flag, updated_gpx.get('description'), 'Failed to upload gpx')
self.assert_eq(name, updated_gpx.get('title'), 'Failed to upload gpx')
self.assert_eq(set(x.name for x in wpts), set(x.get('name') for x in updated_gpx['waypoints']),
'Failed to upload gpx')
self.assert_eq(set(x.description for x in wpts), set(x.get('desc') for x in updated_gpx['waypoints']),
'Failed to upload gpx')

self.cquit(Status.OK, route_id, f"{u}:{p}:{route_id}")

def get(self, flag_id: str, flag: str, vuln: str):
u, p, route_id = flag_id.split(':')
sess = self.session_with_req_ua()
self.lib.signin(sess, u, p, status=Status.CORRUPT)

route = self.lib.get_route(sess, route_id, status=Status.CORRUPT)
self.assert_eq(route.get('description'), flag, 'Failed to get route', status=Status.CORRUPT)

route_list = self.lib.get_route_list(sess, status=Status.CORRUPT)
self.assert_in(flag, [x.get('description') for x in route_list], 'Failed to get route list',
status=Status.CORRUPT)

u, p = rnd_username(), rnd_password()
new_sess = self.session_with_req_ua()
self.lib.signup(new_sess, u, p)

new_sess = self.session_with_req_ua()
self.lib.signin(new_sess, u, p)

route = self.lib.get_route(new_sess, route_id, token=route.get('share_token'), status=Status.CORRUPT)
self.assert_eq(route.get('description'), flag, 'Failed to get route by token', status=Status.CORRUPT)

self.cquit(Status.OK)


if __name__ == '__main__':
c = Checker(sys.argv[2])

try:
c.action(sys.argv[1], *sys.argv[3:])
except c.get_check_finished_exception() as e:
cquit(status.Status(c.status), c.public, c.private)
88 changes: 88 additions & 0 deletions checkers/explorers/explorers_lib.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from typing import Optional

import checklib
from checklib import BaseChecker
import requests

PORT = 8000


class ExplorersLib:
@property
def api_url(self):
return f'http://{self.host}:{self.port}/api'

def __init__(self, checker: BaseChecker, port=PORT, host=None):
self.c = checker
self.port = port
self.host = host or self.c.host

def signup(self, session: requests.Session, username: str, password: str):
resp = session.post(f'{self.api_url}/signup', json={
'username': username,
'password': password
})
self.c.assert_eq(resp.status_code, 200, 'Failed to signup')
resp_json = self.c.get_json(resp, 'Failed to signup: invalid JSON')
return resp_json

def signin(self, session: requests.Session, username: str, password: str,
status: checklib.Status = checklib.Status.MUMBLE):
resp = session.post(f'{self.api_url}/signin', json={
'username': username,
'password': password
})
self.c.assert_eq(resp.status_code, 200, 'Failed to signin', status=status)
resp_json = self.c.get_json(resp, 'Failed to signin: invalid JSON')
return resp_json

def create_route(self, session: requests.Session, title: str, description: str):
resp = session.post(f'{self.api_url}/route/create', json={
'title': title,
'description': description
})
self.c.assert_eq(resp.status_code, 200, 'Failed to create route')
resp_json = self.c.get_json(resp, 'Failed to create route: invalid JSON')
self.c.assert_eq(type(resp_json), dict, 'Failed to create route: invalid JSON')
return resp_json

def get_route_list(self, session: requests.Session, status: checklib.Status = checklib.Status.MUMBLE):
resp = session.get(f'{self.api_url}/route')
self.c.assert_eq(resp.status_code, 200, 'Failed to get route list', status=status)
resp_json = self.c.get_json(resp, 'Failed to get route list: invalid JSON')
self.c.assert_eq(type(resp_json), list, 'Failed to get route list: invalid JSON')
return resp_json

def get_route(self, session: requests.Session, route_id: str, token: Optional[str] = None,
status: checklib.Status = checklib.Status.MUMBLE):
params = {}
if token:
params['token'] = token
resp = session.get(f'{self.api_url}/route/{route_id}', params=params)
self.c.assert_eq(resp.status_code, 200, 'Failed to get route', status=status)
resp_json = self.c.get_json(resp, 'Failed to get route: invalid JSON')
self.c.assert_eq(type(resp_json), dict, 'Failed to get route: invalid JSON')
return resp_json

def update_route(self, session: requests.Session, route_id: str, description: str, points=list[dict],
waypoints=list[dict]):
pld = {
'description': description,
'track_points': points,
'waypoints': waypoints
}
resp = session.post(f'{self.api_url}/route/{route_id}/update', json=pld)

self.c.assert_eq(resp.status_code, 200, 'Failed to update route')
resp_json = self.c.get_json(resp, 'Failed to update route: invalid JSON')
self.c.assert_eq(type(resp_json), dict, 'Failed to update route: invalid JSON')
return resp_json

def upload_gpx(self, session: requests.Session, route_id: str, file):
resp = session.post(f'{self.api_url}/route/{route_id}/upload', files={
'file': file
})
self.c.assert_eq(resp.status_code, 200, 'Failed to upload gpx')
resp_json = self.c.get_json(resp, 'Failed to upload gpx: invalid JSON')
self.c.assert_eq(type(resp_json), dict, 'Failed to upload gpx: invalid JSON')
return resp_json
Loading

0 comments on commit 61aebe6

Please sign in to comment.