-
Notifications
You must be signed in to change notification settings - Fork 0
/
init_db.py
114 lines (86 loc) · 3.66 KB
/
init_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# -*- coding: utf-8 -*-
from ipaddress import ip_network, ip_address
import os
import logging
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine, Column, Integer, Float, String, ForeignKey
from sqlalchemy.orm import sessionmaker
from sqlalchemy.schema import Index
logger = logging.getLogger(__name__)
BASEDIR = os.path.abspath(os.path.dirname(__file__))
db_name = 'sqlite:///' + os.path.join(BASEDIR, 'roskomsvoboda.db')
engine = create_engine(db_name, echo=False)
Base = declarative_base()
Session = sessionmaker(bind=engine)
def get_bin_ip(address):
return str(bin(int(address.packed.hex(), 16)))
def get_bin_prefix(network):
return str(bin(int(network.network_address.packed.hex(), 16)))[:network.prefixlen]
class BlockedIpData(Base):
__tablename__ = 'blocked_ip'
__table_args__ = {'sqlite_autoincrement': True}
id = Column('id', Integer, primary_key=True)
include_time = Column('include_time', String)
exclude_time = Column('exclude_time', String)
decision_date = Column('decision_date', String)
decision_number = Column('decision_number', String)
org = Column('org', String)
ip = Column('ip', String)
ip_subnet = Column('ip_subnet', String)
ip_bin = Column('ip_bin', String)
ip_bin_index = Index("ip_bin_index", ip_bin)
org_index = Index("org_index", org)
lookup_index = Index("ip_org_date_index", ip_bin, org, decision_date)
def __init__(self, data):
self.include_time = data.get('include_time')
self.exclude_time = data.get('exclude_time')
self.decision_date = data.get('decision_date')
self.decision_number = data.get('decision_number')
self.org = data.get('org')
self.ip = data.get('ip')
self.ip_subnet = data.get('ip_subnet')
self.ip_bin = get_bin_ip(ip_address(self.ip)) if self.ip else get_bin_prefix(ip_network(self.ip_subnet))
def __repr__(self):
return "<BlockedIpData({0}, {1})>".format(self.id, self.ip_bin)
class BlockGeoData(Base):
__tablename__ = 'block_geo'
__table_args__ = {'sqlite_autoincrement': True}
id = Column('id', Integer, primary_key=True)
block_id = Column('block_id', Integer, ForeignKey(BlockedIpData.id))
longitude = Column('longitude', Float)
latitude = Column('latitude', Float)
def __init__(self, block_id, lon, lat):
self.block_id = block_id
self.longitude = lon
self.latitude = lat
class GeoPrefix(Base):
__tablename__ = 'geo_prefix'
__table_args__ = {'sqlite_autoincrement': True}
id = Column('id', Integer, primary_key=True)
geo_id = Column('geo_id', Integer, ForeignKey('block_geo.id'))
prefix = Column('prefix', String)
prefix_index = Index("prefix_index", prefix)
def __init__(self, geo_id, prefix):
self.geo_id = geo_id
self.prefix = prefix
class Stats(Base):
__tablename__ = 'stats'
__table_args__ = {'sqlite_autoincrement': True}
id = Column('id', Integer, primary_key=True)
date = Column('date', String)
org = Column('org', String)
blocked_number = Column('blocked_number', Integer)
unlocked_number = Column('unlocked_number', Integer)
def __init__(self, date, blocked_number, unlocked_number, org=None):
self.date = date
self.blocked_number = blocked_number
self.unlocked_number = unlocked_number
self.org = org
def init():
BlockedIpData.__table__.drop(engine, checkfirst=True)
BlockGeoData.__table__.drop(engine, checkfirst=True)
GeoPrefix.__table__.drop(engine, checkfirst=True)
Stats.__table__.drop(engine, checkfirst=True)
Base.metadata.create_all(engine)
if __name__ == '__main__':
init()