diff --git a/TMGToolbox/src/XTMF_internal/space_time_travel_assignment.py b/TMGToolbox/src/XTMF_internal/space_time_travel_assignment.py index 2c4dbb5..61e2c0a 100644 --- a/TMGToolbox/src/XTMF_internal/space_time_travel_assignment.py +++ b/TMGToolbox/src/XTMF_internal/space_time_travel_assignment.py @@ -126,7 +126,7 @@ def __call__( RunTitle, TrafficClasses, ): - print("starting...") + print("Starting STTA...") # ---1 Set up Scenario Scenario = _m.Modeller().emmebank.scenario(ScenarioNumber) if Scenario is None: @@ -150,7 +150,6 @@ def __call__( tc["TollWeightList"] = [float(x) for x in tc["TollWeightList"].split(",")] try: - print("Starting assignment.") self._execute( Scenario, IntervalLengthList, @@ -171,7 +170,7 @@ def __call__( RunTitle, Parameters, ) - print("Assignment complete.") + print("STTA complete.") except Exception as e: raise Exception(_util.formatReverseStack()) @@ -212,12 +211,12 @@ def _execute( IntervalLengthList, tc["DemandMatrixNumber"], "demand_matrix", - [("cost_matrix", tc["CostMatrixNumber"]), ("time_matrix", tc["TimeMatrixNumber"]), ("toll_matrix", tc["TollMatrixNumber"])], + [("cost_matrix", tc["CostMatrixNumber"]), ("time_matrix", tc["TimeMatrixNumber"])], ) allMatrixDictsList.append(all_matrix_dict) # load all time dependent output matrices self._load_input_matrices(allMatrixDictsList, "demand_matrix") - self._load_output_matrices(allMatrixDictsList, ["cost_matrix", "time_matrix", "toll_matrix"]) + self._load_output_matrices(allMatrixDictsList, ["cost_matrix", "time_matrix"]) with _m.logbook_trace( name="%s (%s v%s)" % (self.RunTitle, self.__class__.__name__, self.version), @@ -237,7 +236,6 @@ def _execute( # initialize output matrices self._init_output_matrices(allMatrixDictsList, temporaryMatrixList, outputMatrixName="cost_matrix", description="") self._init_output_matrices(allMatrixDictsList, temporaryMatrixList, outputMatrixName="time_matrix", description="") - self._init_output_matrices(allMatrixDictsList, temporaryMatrixList, outputMatrixName="toll_matrix", description="") self._init_analysis_matrices(Parameters, len(IntervalLengthList)) with self.temporaryAttributeManager(Scenario) as tempAttributeList: timeDependentVolumeAttributeLists = [] @@ -267,40 +265,39 @@ def _execute( self._tracker.completeSubtask() # Assign traffic to road network per time period with _trace("Running Road Assignments."): - completed_path_analysis = False - if completed_path_analysis is False: - modeList = self._loadModeList(Parameters) - stta_spec = self._get_primary_STTA_spec( - allMatrixDictsList, - modeList, - volumeAttributeLists, - costAttributeLists, - IntervalLengthList, - StartTime, - ExtraTimeInterval, - NumberOfExtraTimeIntervals, - InnerIterations, - OuterIterations, - CoarseRGap, - FineRGap, - CoarseBRGap, - FineBRGap, - NormalizedGap, - Parameters, - PerformanceFlag, - linkComponentAttributeList, - CreateLinkComponentAttribute, - LinkComponentAttribute, - StartIndex, - timeResultAttributeList, - ) - report = self._tracker.runTool(trafficAssignmentTool, stta_spec, scenario=Scenario) + modeList = self._loadModeList(Parameters) + stta_spec = self._get_primary_STTA_spec( + allMatrixDictsList, + modeList, + volumeAttributeLists, + costAttributeLists, + IntervalLengthList, + StartTime, + ExtraTimeInterval, + NumberOfExtraTimeIntervals, + InnerIterations, + OuterIterations, + CoarseRGap, + FineRGap, + CoarseBRGap, + FineBRGap, + NormalizedGap, + Parameters, + PerformanceFlag, + linkComponentAttributeList, + CreateLinkComponentAttribute, + LinkComponentAttribute, + StartIndex, + timeResultAttributeList, + ) + report = self._tracker.runTool(trafficAssignmentTool, stta_spec, scenario=Scenario) checked = self._load_stopping_criteria(report) number = checked[0] stopping_criterion = checked[1] value = checked[2] print("Primary assignment complete at %s iterations." % number) print("Stopping criterion was %s with a value of %s." % (stopping_criterion, value)) + self.correct_cost_matrices(Parameters, len(IntervalLengthList)) def _load_atts(self, Scenario, run_title, max_outer_iterations, max_inner_iterations, traffic_classes, modeller_namespace): time_matrix_ids = ["mf" + str(mtx["TimeMatrixNumber"]) for mtx in traffic_classes] @@ -420,7 +417,7 @@ def _init_input_matrices(self, allMatrixDictsList, temporaryMatrixList, inputMat for matrix_list in allMatrixDictsList: for i, mtx in enumerate(matrix_list[inputMatrixName]): if mtx == None: - mtx = _util.initializeMatrix(matrix_type="FULL") + mtx = _util.initializeMatrix(matrix_type="FULL", default=0.00000001) temporaryMatrixList.append(mtx) matrix_list[inputMatrixName][i] = mtx @@ -519,7 +516,7 @@ def _process_traffic_attribute(self, Scenario, traffic_attrib_id, attribute_type def _calculate_applied_toll_factor(self, Parameters): applied_toll_factor_list = [] for tc in Parameters["TrafficClasses"]: - if len(tc["TollWeightList"]) != 0: + if len(tc["TollWeightList"]) > 0: try: toll_weight_list = [60 / weight for weight in tc["TollWeightList"]] applied_toll_factor_list.append(toll_weight_list) @@ -596,6 +593,7 @@ def _get_primary_STTA_spec( number_of_processors = multiprocessing.cpu_count() else: number_of_processors = max(multiprocessing.cpu_count() - 1, 1) + # Generic Spec for STTA STTA_spec = { "type": "SPACE_TIME_TRAFFIC_ASSIGNMENT", @@ -634,10 +632,9 @@ def convert_bound(bound_str): return None if bound_str == "None" else float(bound_str) def append_path_analyes(stta_class, parameters, i): - tc = Parameters['TrafficClasses'] + tc = Parameters['TrafficClasses'][i] if not('PathAnalyses' in tc): return - path = tc['PathAnalyses'] analysis = { "link_component": path['AttributeId'], @@ -691,7 +688,7 @@ def append_traversal_analysis(stta_class, parameters, i): "demand": matrix_dict["demand_matrix"][0].id, "generalized_cost": { "link_costs": costAttributeLists[i][0].id, - "od_fixed_cost": None, + "od_fixed_cost": matrix_dict["cost_matrix"][0].id, }, "results": { "link_volumes": volumeAttributeLists[i][0], @@ -732,6 +729,27 @@ def _load_stopping_criteria(self, report): else: value = "undefined" return number, stopping_criterion, value + + def correct_cost_matrices(self, parameters, number_of_intervals): + for tc in parameters['TrafficClasses']: + first_cost = int(tc['CostMatrixNumber']) + # If this isn't temporary then we need to adjust these matrices + if first_cost <= 0: + continue + + for i in range(number_of_intervals): + matrix_number = first_cost + i + perception = 60/tc['TollWeightList'][i] + spec = { + "expression": "mf%s/%f" % (matrix_number, perception), + "result": "mf%s" % matrix_number, + "constraint": {"by_value": None, "by_zone": None}, + "aggregation": {"origins": None, "destinations": None}, + "type": "MATRIX_CALCULATION", + } + matrixCalcTool(spec) + + return @contextmanager def temporaryMatricesManager(self):