-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcidr_api.py
65 lines (50 loc) · 1.85 KB
/
cidr_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#
# IP Address Search Rest API
#
# python -m uvicorn cidr_api:app --reload --host 0.0.0.0
#
# ex.) curl -v http://localhost:8000/search?ipv4=192.168.3.2
#
#
from fastapi import FastAPI, Request, Security, Depends, HTTPException
from fastapi.security.api_key import APIKeyHeader
from databases import Database
import os
import ipaddress
from pydantic import IPvAnyAddress
import cidr_ipattr
async def check_api(request: Request,
api_key: str = Security(APIKeyHeader(name='x-api-key', auto_error=False))):
API_KEY = 'apitest'
if (not api_key or api_key != API_KEY):
raise HTTPException(status_code=401, detail='Invalid or missing API Key')
app = FastAPI(dependencies=[Depends(check_api)])
dbpath = os.path.join(os.environ.get("HOME"), "database.cidr")
database = Database("sqlite:///" + dbpath)
@app.on_event("startup")
async def startup():
await database.connect()
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.get("/")
def read_root():
return {"Hello": "World"}
@app.get("/search")
async def read_ipgeo(ipv4: IPvAnyAddress):
attr = cidr_ipattr.IpAttribute(4)
param = attr.bin_addr(ipaddress.ip_address(ipv4))
query = """
select cidr, country, provider, asn, city
from
(select cidr, country from ipaddr_v4 where addr like :param_like and addr like substr(:param,1,prefixlen) || '%'),
(select provider, asn from asn_v4 where addr like :param_like and addr like substr(:param,1,prefixlen) || '%'),
(select city from city_v4 where addr like :param_like and addr like substr(:param,1,prefixlen) || '%')
"""
ipgeo = await database.fetch_one(
query=query, values={"param": param, "param_like": param[: attr.matches] + "%"}
)
if not ipgeo:
raise HTTPException(status_code=404, detail="Ip not found")
return ipgeo