Skip to content

Commit

Permalink
Adding suburban_junctions.py and a command process suburban-junctions…
Browse files Browse the repository at this point in the history
… to load db table from added file in prev commit. Removed updating suburban_junctions table from accidents.
  • Loading branch information
ziv17 committed Jan 12, 2024
1 parent 94f07f8 commit 5a3e5a8
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 68 deletions.
67 changes: 0 additions & 67 deletions anyway/parsers/cbs/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@
ProviderCode,
VehicleDamage,
Streets,
SuburbanJunction,
AccidentMarkerView,
InvolvedView,
InvolvedMarkerView,
Expand Down Expand Up @@ -570,7 +569,6 @@ def import_accidents(provider_code, accidents, streets, roads, non_urban_interse
accidents_result = []
for _, accident in accidents.iterrows():
marker = create_marker(provider_code, accident, streets, roads, non_urban_intersection)
add_suburban_junction_from_marker(marker)
add_road_junction_km_from_marker(marker)
accidents_result.append(marker)
db.session.bulk_insert_mappings(AccidentMarker, accidents_result)
Expand Down Expand Up @@ -795,8 +793,6 @@ def import_streets_into_db():

yishuv_street_dict: Dict[Tuple[int, int], str] = {}
yishuv_name_dict: Dict[Tuple[int, str], int] = {}
suburban_junctions_dict: Dict[int, dict] = {}
SUBURBAN_JUNCTION = "suburban_junction"
# (road, junction) -> km
road_junction_km_dict: Dict[Tuple[int, int], int] = {}

Expand Down Expand Up @@ -846,67 +842,6 @@ def add_street_remove_name_duplicates(street: Dict[str, Any]):
yishuv_name_dict[k] = street["street"]


def import_suburban_junctions_into_db():
items = [{"non_urban_intersection": k,
NON_URBAN_INTERSECTION_HEBREW: fix_name_len(v[NON_URBAN_INTERSECTION_HEBREW]),
ROADS: v[ROADS]} for
k, v in suburban_junctions_dict.items()]
logging.debug(
f"Writing to db: {len(items)} suburban junctions"
)
db.session.query(SuburbanJunction).delete()
db.session.bulk_insert_mappings(SuburbanJunction, items)
db.session.commit()
logging.debug(f"Done.")


def fix_name_len(name: str) -> str:
if not isinstance(name, str):
return name
if len(name) > SuburbanJunction.MAX_NAME_LEN:
logging.error(f"Suburban_junction name too long ({len(name)}>"
f"{SuburbanJunction.MAX_NAME_LEN}):{name}.")
return name[: SuburbanJunction.MAX_NAME_LEN]

def load_existing_suburban_junctions():
junctions: List[SuburbanJunction] = db.session.query(SuburbanJunction).all()
for j in junctions:
add_suburban_junction(j)
logging.debug(f"Loaded suburban junctions: {len(suburban_junctions_dict)}.")


def add_suburban_junction(added: SuburbanJunction):
if added.non_urban_intersection in suburban_junctions_dict:
existing_junction = suburban_junctions_dict[added.non_urban_intersection]
added_heb = added.non_urban_intersection_hebrew
if existing_junction[NON_URBAN_INTERSECTION_HEBREW] != added_heb and added_heb is not None:
logging.error(
f"Duplicate non-urban intersection name: {added.non_urban_intersection}: existing:"
f"{existing_junction[NON_URBAN_INTERSECTION_HEBREW]}, added: {added_heb}"
)
existing_junction[NON_URBAN_INTERSECTION_HEBREW] = added_heb
existing_junction[ROADS].update(set(added.roads))
else:
suburban_junctions_dict[added.non_urban_intersection] = {
NON_URBAN_INTERSECTION_HEBREW: added.non_urban_intersection_hebrew,
ROADS: set(added.roads),
}


def add_suburban_junction_from_marker(marker: dict):
intersection = marker[NON_URBAN_INTERSECTION]
if intersection is not None:
j = SuburbanJunction()
j.non_urban_intersection = intersection
j.non_urban_intersection_hebrew = marker[NON_URBAN_INTERSECTION_HEBREW]
roads = set()
for k in ["road1", "road2"]:
if marker[k] is not None:
roads.add(marker[k])
j.roads = roads
add_suburban_junction(j)


def load_existing_road_junction_km_data():
rows: List[RoadJunctionKM] = db.session.query(RoadJunctionKM).all()
tmp = {(r.road, r.non_urban_intersection): r.km for r in rows}
Expand Down Expand Up @@ -1220,7 +1155,6 @@ def get_file_type_and_year(file_path):
def main(batch_size, source, load_start_year=None):
try:
load_existing_streets()
load_existing_suburban_junctions()
load_existing_road_junction_km_data()
total = 0
started = datetime.now()
Expand Down Expand Up @@ -1278,7 +1212,6 @@ def main(batch_size, source, load_start_year=None):
add_to_streets(streets)

import_streets_into_db()
import_suburban_junctions_into_db()
import_road_junction_km_into_db()

fill_db_geo_data()
Expand Down
101 changes: 101 additions & 0 deletions anyway/parsers/suburban_junctions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
import sys
from typing import Dict, List
import logging
from openpyxl import load_workbook
from anyway.app_and_db import db
from anyway.models import SuburbanJunction


SUBURBAN_JUNCTION = "suburban_junction"
ACCIDENTS = "accidents"
CITIES = "cities"
STREETS = "streets"
ROADS = "roads"
URBAN_INTERSECTION = "urban_intersection"
NON_URBAN_INTERSECTION = "non_urban_intersection"
NON_URBAN_INTERSECTION_HEBREW = "non_urban_intersection_hebrew"
DICTIONARY = "dictionary"
INVOLVED = "involved"
VEHICLES = "vehicles"
suburban_junctions_dict: Dict[int, dict] = {}


def parse(filename):
read_from_file(filename)
import_suburban_junctions_into_db()


def read_from_file(filename: str) -> dict:
for j in _iter_rows(filename):
add_suburban_junction(j)


def _iter_rows(filename):
workbook = load_workbook(filename, read_only=True)
sheet = workbook["מילון צמתים לא עירוניים"]
rows = sheet.rows
first_row = next(rows)
headers = ["ZOMET", "SUG_DEREH", "REHOV1_KVISH1", "REHOV2_KVISH2", "KM", "IKS", "IGREK", "IDF",
"SHEM_ZOMET", "SUG_ZOMET", "KVISH_RASHI", "KM_RASHI", "SHNAT_ZOMET_SGIRA", "MAHOZ",
"NAFA", "EZOR_TIVI", "METROPOLIN", "MAAMAD_MINIZIPALI", "EZOR_STAT"
]
assert [cell.value for cell in first_row] == headers, "File does not have expected headers"
for row in rows:
# In order to ignore empty lines
if not row[0].value:
continue
roads = set()
roads.add(row[2].value)
if row[3].value:
roads.add(row[3].value)
j = SuburbanJunction()
j.non_urban_intersection = row[0].value
j.non_urban_intersection_hebrew = row[8].value
j.roads = roads
yield j


def import_suburban_junctions_into_db():
items = [{"non_urban_intersection": k,
NON_URBAN_INTERSECTION_HEBREW: fix_name_len(v[NON_URBAN_INTERSECTION_HEBREW]),
ROADS: v[ROADS]} for
k, v in suburban_junctions_dict.items()]
logging.debug(
f"Writing to db: {len(items)} suburban junctions"
)
db.session.query(SuburbanJunction).delete()
db.session.bulk_insert_mappings(SuburbanJunction, items)
db.session.commit()
logging.debug(f"Done.")


def fix_name_len(name: str) -> str:
if not isinstance(name, str):
return name
if len(name) > SuburbanJunction.MAX_NAME_LEN:
logging.error(f"Suburban_junction name too long ({len(name)}>"
f"{SuburbanJunction.MAX_NAME_LEN}):{name}.")
return name[: SuburbanJunction.MAX_NAME_LEN]


def add_suburban_junction(added: SuburbanJunction):
if added.non_urban_intersection in suburban_junctions_dict:
existing_junction = suburban_junctions_dict[added.non_urban_intersection]
added_heb = added.non_urban_intersection_hebrew
if existing_junction[NON_URBAN_INTERSECTION_HEBREW] != added_heb and added_heb is not None:
logging.error(
f"Duplicate non-urban intersection name: {added.non_urban_intersection}: existing:"
f"{existing_junction[NON_URBAN_INTERSECTION_HEBREW]}, added: {added_heb}"
)
existing_junction[NON_URBAN_INTERSECTION_HEBREW] = added_heb
existing_junction[ROADS].update(set(added.roads))
else:
suburban_junctions_dict[added.non_urban_intersection] = {
NON_URBAN_INTERSECTION_HEBREW: added.non_urban_intersection_hebrew,
ROADS: set(added.roads),
}


if __name__ == '__main__':
parse(sys.argv[1])
9 changes: 8 additions & 1 deletion main.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,14 @@ def road_segments(filename):
return parse(filename)


@process.command()
@click.argument("filename", type=str, default="static/data/suburban_junctions/suburban_junctions.xlsx")
def suburban_junctions(filename):
from anyway.parsers.suburban_junctions import parse

return parse(filename)


@process.command()
@click.argument("filepath", type=str, default="static/data/schools/schools.csv")
@click.option("--batch_size", type=int, default=5000)
Expand Down Expand Up @@ -528,4 +536,3 @@ def trigger_dag(id):

if __name__ == "__main__":
cli(sys.argv[1:]) # pylint: disable=too-many-function-args

0 comments on commit 5a3e5a8

Please sign in to comment.