-
Notifications
You must be signed in to change notification settings - Fork 98
Need help with Vehicle routing problem python #1112
Replies: 1 comment · 3 replies
-
|
Beta Was this translation helpful? Give feedback.
All reactions
-
My constraints are:
My customer vehicle class:
In case where there are two vehicles V1 covers two locations and V2 covers one location there V2 have 0 distance and duration |
Beta Was this translation helpful? Give feedback.
All reactions
-
I am unable to reproduce; in particular, with the following domain classes: from timefold.solver import SolverStatus
from timefold.solver.score import HardSoftScore
from timefold.solver.domain import *
from datetime import datetime, timedelta
from typing import Annotated, Optional
from pydantic import Field, computed_field, BeforeValidator
from .json_serialization import *
class Location(JsonDomainBase):
latitude: float
longitude: float
distance_matrix: dict[int, int] = Field(default_factory=dict)
duration_matrix: dict[int, int] = Field(default_factory=dict)
def driving_time_to(self, other: 'Location') -> int:
return self.duration_matrix[id(other)]
def driving_distance_to(self, other: 'Location') -> int:
return self.distance_matrix[id(other)]
def __str__(self):
return f'[{self.latitude}, {self.longitude}]'
def __repr__(self):
return f'Location({self.latitude}, {self.longitude})'
def location_validator(location):
if isinstance(location, Location):
return location
elif isinstance(location, Depot):
return location.location
else:
return Location(longitude=location[0], latitude=location[1])
LocationValidator = BeforeValidator(lambda location: location_validator(location))
class Depot(JsonDomainBase):
location: Annotated[Location, LocationSerializer, LocationValidator]
@planning_entity
class Customer(JsonDomainBase):
id: Annotated[str, PlanningId]
name: Optional[str] = None
location: Annotated[Location, LocationSerializer, LocationValidator]
ready_time: datetime
due_time: datetime
service_duration: Annotated[timedelta, DurationSerializer]
demand: int
isExtra: Optional[bool] = False
booking_date: Optional[datetime] = None
days: Optional[int] = 1
vehicle: Annotated[Optional['Vehicle'], InverseRelationShadowVariable(
source_variable_name='customers'), IdSerializer, VehicleValidator, Field(default=None)]
previous_customer: Annotated[Optional['Customer'], PreviousElementShadowVariable(
source_variable_name='customers'), IdSerializer, VisitValidator, Field(default=None)]
next_customer: Annotated[Optional['Customer'], NextElementShadowVariable(
source_variable_name='customers'), IdSerializer, VisitValidator, Field(default=None)]
arrival_time: Annotated[
Optional[datetime],
CascadingUpdateShadowVariable(target_method_name='update_arrival_time'),
Field(default=None)]
def update_arrival_time(self):
if self.vehicle is None or (self.previous_customer is not None and self.previous_customer.arrival_time is None):
self.arrival_time = None
elif self.previous_customer is None:
self.arrival_time = (self.vehicle.departure_time +
timedelta(seconds=self.vehicle.depot.location.driving_time_to(self.location)))
else:
self.arrival_time = (self.previous_customer.departure_time() +
timedelta(seconds=self.previous_customer.location.driving_time_to(self.location)))
@computed_field
@property
def departureTime(self) -> Optional[datetime]:
return self.departure_time()
@computed_field
@property
def startServiceTime(self) -> Optional[datetime]:
if self.arrival_time is None:
return None
return max(self.ready_time, self.arrival_time)
@computed_field
@property
def drivingTimeSecondsFromPreviousStandstill(self) -> Optional[int]:
if self.vehicle is None:
return None
return self.driving_time_seconds_from_previous_standstill_or_none()
def departure_time(self) -> datetime | None:
if self.arrival_time is None:
return None
return self.arrival_time + self.service_duration
def is_service_finished_after_due_time(self) -> bool:
return self.arrival_time is not None and self.departure_time() > self.due_time
def get_service_finished_delay_in_minutes(self) -> int:
if self.arrival_time is None:
return 0
return -((self.departure_time() - self.due_time) // timedelta(minutes=-1))
def driving_time_seconds_from_previous_standstill_or_none(self) -> int:
if self.vehicle is None:
raise ValueError("This method must not be called when the shadow variables are not initialized yet.")
if self.previous_customer is None:
return self.vehicle.depot.location.driving_time_to(self.location)
else:
return self.previous_customer.location.driving_time_to(self.location)
def get_location(self):
return [self.location.longitude, self.location.latitude]
def __str__(self):
return self.id
def __repr__(self):
return f'Customer({self.id})'
@planning_entity
class Vehicle(JsonDomainBase):
id: Annotated[str, PlanningId]
capacity: int
mileage: Optional[int] = 1
departure_time: datetime
depot: Depot
customers: Annotated[
list[Customer], PlanningListVariable, IdListSerializer, VisitValidator, Field(default_factory=list)]
@computed_field
@property
def arrival_time(self) -> datetime:
if len(self.customers) == 0:
return self.departure_time
last_customer = self.customers[-1]
return (last_customer.departureTime +
timedelta(seconds=last_customer.location.driving_time_to(self.depot.location)))
@computed_field
@property
def total_demand(self) -> int:
return self.calculate_total_demand()
@computed_field
@property
def total_driving_time_seconds(self) -> int:
return self.calculate_total_driving_time_seconds()
@computed_field
@property
def total_driving_distance_meters(self) -> int:
return self.calculate_total_driving_distance_meters()
def calculate_total_demand(self) -> int:
total_demand = 0
for visit in self.customers:
total_demand += visit.demand
return total_demand
def calculate_total_driving_time_seconds(self) -> int:
if len(self.customers) == 0:
return 0
total_driving_time_seconds = 0
previous_location = self.depot.location
for visit in self.customers:
total_driving_time_seconds += previous_location.driving_time_to(visit.location)
previous_location = visit.location
total_driving_time_seconds += previous_location.driving_time_to(self.depot.location)
return total_driving_time_seconds
def calculate_total_driving_distance_meters(self) -> int:
if len(self.customers) == 0:
return 0
total_driving_distance_meters = 0
previous_location = self.depot.location
for visit in self.customers:
total_driving_distance_meters += previous_location.driving_distance_to(visit.location)
previous_location = visit.location
total_driving_distance_meters += previous_location.driving_distance_to(self.depot.location)
return total_driving_distance_meters
def __str__(self):
return self.id
def __repr__(self):
return f'Vehicle({self.id}), customers={self.customers}, total_driving_time_seconds={self.total_driving_time_seconds}, total_driving_distance={self.total_driving_distance_meters}'
class DurationDistanceResponse(JsonDomainBase):
...
class DurationDistanceService(JsonDomainBase):
def update_locations_with_durations(self, locations: list[Location], response):
for l1 in locations:
for l2 in locations:
l1.duration_matrix[id(l2)] = 2
def update_locations_with_distance(self, locations: list[Location], response):
for l1 in locations:
for l2 in locations:
l1.distance_matrix[id(l2)] = 1
@planning_solution
class VehicleRoutePlan(JsonDomainBase):
name: str
depots: list[Depot]
southWestCorner: Annotated[Location, LocationSerializer, LocationValidator]
north_east_corner: Annotated[Location, LocationSerializer, LocationValidator]
vehicles: Annotated[list[Vehicle], PlanningEntityCollectionProperty]
customers: Annotated[list[Customer], PlanningEntityCollectionProperty, ValueRangeProvider]
score: Annotated[Optional[HardSoftScore], PlanningScore, ScoreSerializer, ScoreValidator, Field(default=None)]
solver_status: Annotated[Optional[SolverStatus], Field(default=None)]
duration_response: Optional[DurationDistanceResponse] = None
duration_service: Optional[DurationDistanceService] = None
start_date_time: Optional[datetime] = None
end_date_time: Optional[datetime] = None
def __init__(self, /, **data: Any):
super().__init__(**data)
if self.customers:
locations = [vehicle.depot.location for vehicle in self.vehicles] + [customer.location for customer in
self.customers]
print(locations)
self.duration_service = DurationDistanceService()
self.duration_service.update_locations_with_durations(locations, self.duration_response)
self.duration_service.update_locations_with_distance(locations, self.duration_response) Constraints: from timefold.solver.score import ConstraintFactory, HardSoftScore, constraint_provider
from .domain import *
VEHICLE_CAPACITY = "vehicleCapacity"
MINIMIZE_TRAVEL_TIME = "minimizeTravelTime"
SERVICE_FINISHED_AFTER_MAX_END_TIME = "serviceFinishedAfterMaxEndTime"
@constraint_provider
def define_constraints(factory: ConstraintFactory):
return [
# Hard constraints
vehicle_capacity(factory),
service_finished_after_max_end_time(factory),
# Soft constraints
minimize_travel_time(factory),
minimize_travel_distance(factory)
]
##############################################
# Hard constraints
##############################################
def vehicle_capacity(factory: ConstraintFactory):
return (factory.for_each(Vehicle)
# force one vehicle to have 1 and the other 2 on a dataset with
# 3 visits
.filter(lambda vehicle: not (1 <= len(vehicle.customers) < 3))
.penalize(HardSoftScore.ONE_HARD)
.as_constraint(VEHICLE_CAPACITY)
)
def service_finished_after_max_end_time(factory: ConstraintFactory):
return (factory.for_each(Customer)
.filter(lambda visit: visit.is_service_finished_after_due_time())
.penalize(HardSoftScore.ONE_HARD,
lambda visit: visit.get_service_finished_delay_in_minutes())
.as_constraint(SERVICE_FINISHED_AFTER_MAX_END_TIME)
)
##############################################
# Soft constraints
##############################################
def minimize_travel_time(factory: ConstraintFactory):
return (
factory.for_each(Vehicle)
.penalize(HardSoftScore.ONE_SOFT,
lambda vehicle: vehicle.calculate_total_driving_time_seconds())
.as_constraint(MINIMIZE_TRAVEL_TIME)
)
def minimize_travel_distance(factory: ConstraintFactory):
return (
factory.for_each(Vehicle)
.penalize(HardSoftScore.ONE_SOFT,
lambda vehicle: vehicle.calculate_total_driving_distance_meters())
.as_constraint('Minimize travel distance')
) JSON serialization from timefold.solver.score import HardSoftScore
from typing import Any
from datetime import timedelta
from pydantic import BaseModel, ConfigDict, PlainSerializer, BeforeValidator, ValidationInfo
from pydantic.alias_generators import to_camel
class JsonDomainBase(BaseModel):
model_config = ConfigDict(
alias_generator=to_camel,
populate_by_name=True,
from_attributes=True,
)
def make_id_item_validator(key: str):
def validator(v: Any, info: ValidationInfo) -> Any:
if v is None:
return None
if not isinstance(v, str) or not info.context:
return v
return info.context.get(key)[v]
return BeforeValidator(validator)
def make_id_list_item_validator(key: str):
def validator(v: Any, info: ValidationInfo) -> Any:
if v is None:
return None
if isinstance(v, (list, tuple)):
out = []
for item in v:
if not isinstance(v, str) or not info.context:
return v
out.append(info.context.get(key)[item])
return out
return v
return BeforeValidator(validator)
LocationSerializer = PlainSerializer(lambda location: [
location.latitude,
location.longitude,
], return_type=list[float])
ScoreSerializer = PlainSerializer(lambda score: str(score), return_type=str)
IdSerializer = PlainSerializer(lambda item: item.id if item is not None else None, return_type=str | None)
IdListSerializer = PlainSerializer(lambda items: [item.id for item in items], return_type=list)
DurationSerializer = PlainSerializer(lambda duration: duration // timedelta(seconds=1), return_type=int)
VisitListValidator = make_id_list_item_validator('customers')
VisitValidator = make_id_item_validator('customers')
VehicleValidator = make_id_item_validator('vehicles')
def validate_score(v: Any, info: ValidationInfo) -> Any:
if isinstance(v, HardSoftScore) or v is None:
return v
if isinstance(v, str):
return HardSoftScore.parse(v)
raise ValueError('"score" should be a string')
ScoreValidator = BeforeValidator(validate_score) and demo data: @dataclass
class _DemoDataProperties:
seed: int
visit_count: int
vehicle_count: int
vehicle_start_time: time
min_demand: int
max_demand: int
min_vehicle_capacity: int
max_vehicle_capacity: int
south_west_corner: Location
north_east_corner: Location
def __post_init__(self):
if self.min_demand < 1:
raise ValueError(f"minDemand ({self.min_demand}) must be greater than zero.")
if self.max_demand < 1:
raise ValueError(f"maxDemand ({self.max_demand}) must be greater than zero.")
if self.min_demand >= self.max_demand:
raise ValueError(f"maxDemand ({self.max_demand}) must be greater than minDemand ({self.min_demand}).")
if self.min_vehicle_capacity < 1:
raise ValueError(f"Number of minVehicleCapacity ({self.min_vehicle_capacity}) must be greater than zero.")
if self.max_vehicle_capacity < 1:
raise ValueError(f"Number of maxVehicleCapacity ({self.max_vehicle_capacity}) must be greater than zero.")
if self.min_vehicle_capacity >= self.max_vehicle_capacity:
raise ValueError(f"maxVehicleCapacity ({self.max_vehicle_capacity}) must be greater than "
f"minVehicleCapacity ({self.min_vehicle_capacity}).")
if self.visit_count < 1:
raise ValueError(f"Number of visitCount ({self.visit_count}) must be greater than zero.")
if self.vehicle_count < 1:
raise ValueError(f"Number of vehicleCount ({self.vehicle_count}) must be greater than zero.")
if self.north_east_corner.latitude <= self.south_west_corner.latitude:
raise ValueError(f"northEastCorner.getLatitude ({self.north_east_corner.latitude}) must be greater than "
f"southWestCorner.getLatitude({self.south_west_corner.latitude}).")
if self.north_east_corner.longitude <= self.south_west_corner.longitude:
raise ValueError(f"northEastCorner.getLongitude ({self.north_east_corner.longitude}) must be greater than "
f"southWestCorner.getLongitude({self.south_west_corner.longitude}).")
class DemoData(Enum):
PHILADELPHIA = _DemoDataProperties(0, 3, 2, time(7, 30),
1, 2, 15, 30,
Location(latitude=39.7656099067391,
longitude=-76.83782328143754),
Location(latitude=40.77636644354855,
longitude=-74.9300739430771))
def doubles(random: Random, start: float, end: float) -> Generator[float, None, None]:
while True:
yield random.uniform(start, end)
def ints(random: Random, start: int, end: int) -> Generator[int, None, None]:
while True:
yield random.randrange(start, end)
T = TypeVar('T')
def values(random: Random, sequence: Sequence[T]) -> Generator[T, None, None]:
start = 0
end = len(sequence) - 1
while True:
yield sequence[random.randint(start, end)]
def generate_names(random: Random) -> Generator[str, None, None]:
while True:
yield f'{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}'
def generate_demo_data(demo_data_enum: DemoData) -> VehicleRoutePlan:
name = "demo"
demo_data = demo_data_enum.value
random = Random(demo_data.seed)
latitudes = doubles(random, demo_data.south_west_corner.latitude, demo_data.north_east_corner.latitude)
longitudes = doubles(random, demo_data.south_west_corner.longitude, demo_data.north_east_corner.longitude)
demands = ints(random, demo_data.min_demand, demo_data.max_demand + 1)
service_durations = values(random, SERVICE_DURATION_MINUTES)
vehicle_capacities = ints(random, demo_data.min_vehicle_capacity,
demo_data.max_vehicle_capacity + 1)
vehicles = [Vehicle(id=str(i),
capacity=next(vehicle_capacities),
depot=Depot(location=Location(
latitude=next(latitudes),
longitude=next(longitudes))),
departure_time=datetime.combine(
date.today() + timedelta(days=1), demo_data.vehicle_start_time)
)
for i in range(demo_data.vehicle_count)]
names = generate_names(random)
visits = [
Customer(
id=str(i),
name=next(names),
location=Location(latitude=next(latitudes), longitude=next(longitudes)),
demand=next(demands),
ready_time=datetime.combine(date.today() + timedelta(days=1),
MORNING_WINDOW_START
if (morning_window := random.random() > 0.5)
else AFTERNOON_WINDOW_START),
due_time=datetime.combine(date.today() + timedelta(days=1),
MORNING_WINDOW_END
if morning_window
else AFTERNOON_WINDOW_END),
service_duration=timedelta(minutes=next(service_durations)),
) for i in range(demo_data.visit_count)
]
return VehicleRoutePlan(name=name,
depots=[vehicle.depot for vehicle in vehicles],
southWestCorner=demo_data.south_west_corner,
north_east_corner=demo_data.north_east_corner,
vehicles=vehicles,
customers=visits)
def tomorrow_at(local_time: time) -> datetime:
return datetime.combine(date.today(), local_time) The solver generates a solution where one vehicle has two visits, and the other has one visit, with correct total distance and duration:
My guess there is a problem with DurationDistanceService where it assigns 0 travel time to/from a depot. |
Beta Was this translation helpful? Give feedback.
All reactions
-
Thanks for your help let me check with DurationDistanceService, thank you so much |
Beta Was this translation helpful? Give feedback.
-
I am using python quickstart as a reference to create VRP in python but using distance and duration matrix in RoutePlan class for getting distance and durations, sometimes totaltraveleddistance and totaltraveledduration is 0 for the last vehicle, not able to resolve need help.
My vehicle route plan class code:
Beta Was this translation helpful? Give feedback.
All reactions