-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprepare_config_for_drs.py
244 lines (208 loc) · 9.32 KB
/
prepare_config_for_drs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
"""
Prepare MATSim config files for use with the drs (dynamic ride sharing) module.
Adjust the integrated XML snippets to your liking before running the script.
"""
import argparse
import logging
from lxml import etree
from pathlib import Path
# XML template for the top-level "drs" module; add_drs_module() inserts it
# when the config has no such module yet.
DRS_MODULE = """
<module name="drs">
<!-- main drs config, leave empty to use defaults -->
<param name="maxMatchingDistanceMeters" value="500" />
</module>"""
# Scoring "modeParams" parameterset for mode drsDriver; appended by
# _adjust_scoring() when no drsDriver modeParams exist.
SCORING_MODEPARAMS_DRIVER = """
<parameterset type="modeParams">
<param name="constant" value="0.0"/>
<param name="dailyMonetaryConstant" value="0.0"/>
<param name="marginalUtilityOfDistance_util_m" value="0.0"/>
<param name="marginalUtilityOfTraveling_util_hr" value="-6.0"/>
<param name="mode" value="drsDriver"/>
<param name="monetaryDistanceRate" value="0.0"/>
</parameterset>
"""
# Scoring "modeParams" parameterset for mode drsRider; appended by
# _adjust_scoring() when no drsRider modeParams exist.
SCORING_MODEPARAMS_RIDER = """
<parameterset type="modeParams">
<param name="constant" value="0.0"/>
<param name="dailyMonetaryConstant" value="0.0"/>
<param name="marginalUtilityOfDistance_util_m" value="0.0"/>
<param name="marginalUtilityOfTraveling_util_hr" value="-6.0"/>
<param name="mode" value="drsRider"/>
<param name="monetaryDistanceRate" value="0.0"/>
</parameterset>
"""
# Scoring "activityParams" parameterset for the "drsDriver interaction"
# activity type (not scored); appended by _adjust_scoring().
SCORING_ACTIVITYPARAMS_DRIVER = """
<parameterset type="activityParams">
<param name="activityType" value="drsDriver interaction"/>
<param name="scoringThisActivityAtAll" value="false"/>
<param name="typicalDuration" value="undefined"/>
</parameterset>
"""
# Teleportation settings wrapped in a "routing" module element. Only the
# child parametersets are used: adjust_routing() appends each of them to the
# routing module of the config, the wrapper element itself is never inserted.
ROUTING_TELEPORTATION = """
<module name="routing" >
<parameterset type="teleportedModeParameters">
<param name="beelineDistanceFactor" value="1.3" />
<param name="mode" value="bike" />
<param name="teleportedModeSpeed" value="4.166666666666667" />
</parameterset>
<parameterset type="teleportedModeParameters">
<param name="beelineDistanceFactor" value="1.3" />
<param name="mode" value="walk" />
<param name="teleportedModeSpeed" value="0.8333333333333333" />
</parameterset>
<parameterset type="teleportedModeParameters">
<param name="beelineDistanceFactor" value="1.3" />
<param name="mode" value="non_network_walk" />
<param name="teleportedModeSpeed" value="0.8333333333333333" />
</parameterset>
<parameterset type="teleportedModeParameters">
<param name="beelineDistanceFactor" value="1.3" />
<param name="mode" value="ride" />
<param name="teleportedModeFreespeedFactor" value="1.0" />
</parameterset>
<parameterset type="teleportedModeParameters">
<param name="beelineDistanceFactor" value="1.3" />
<param name="mode" value="pt" />
<param name="teleportedModeFreespeedFactor" value="2.0" />
</parameterset>
<parameterset type="teleportedModeParameters">
<!-- NOTE: drsRider routes are internally calculated with the drsDriver routing module.
Unfortunately we still need to provide a teleportation config for drsRider
because it may be used by the ReRoute strategy (even if the result of the rerouting does not make sense
and will be replaced by the results of our matching algorithm immediately.)
To specify a teleportation config for drsRider we need to also explicitly set the
default modes, because they are deleted as soon as a single mode is configured here -->
<param name="beelineDistanceFactor" value="1.3" />
<param name="mode" value="drsRider" />
<param name="teleportedModeFreespeedFactor" value="2.0" />
</parameterset>
</module>
"""
# Console log layout: level and message only. The datefmt passed below has no
# visible effect as long as LOG_FORMAT contains no %(asctime)s field.
LOG_FORMAT = "%(levelname)s | %(message)s"
logging.basicConfig(
    level=logging.INFO,
    format=LOG_FORMAT,
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

# Shared parser for the config file and all XML templates: whitespace-only
# text is dropped (pretty printing on write needs that), comments are kept.
CLEAN_PARSER = etree.XMLParser(
    remove_comments=False,
    remove_blank_text=True,
)
def add_drs_module(tree: etree.ElementTree) -> None:
    """Insert the ``DRS_MODULE`` template as first child of the config root.

    The tree is left untouched when it already contains a module named
    ``drs``.

    Args:
        tree: parsed config document, modified in place.
    """
    existing = tree.find("module[@name='drs']")
    if existing is None:
        logger.info("drs: add module config")
        root = tree.getroot()
        root.insert(0, etree.fromstring(DRS_MODULE, parser=CLEAN_PARSER))
    else:
        logger.info("drs: module config already present")
def adjust_qsim(tree: etree.ElementTree) -> None:
    """Ensure ``drsDriver`` is listed in the ``mainMode`` param of the qsim module.

    The param value is treated as a comma separated list of mode names.
    ``drsDriver`` is appended unless one of the entries already equals it.
    If the config has no ``qsim`` module or no ``mainMode`` param, a warning
    is logged and the tree is left unchanged.

    Args:
        tree: parsed config document, modified in place.
    """
    main_mode = tree.find("module[@name='qsim']/param[@name='mainMode']")
    if main_mode is None:
        logger.warning("qsim.mainMode: param not found, can not add drsDriver")
        return

    value = main_mode.get("value") or ""
    # compare whole entries, a plain substring test would also accept
    # e.g. "drsDriverFoo"
    modes = [m.strip() for m in value.split(",") if m.strip()]
    if "drsDriver" in modes:
        logger.info("qsim.mainMode: drsDriver already present")
        return

    logger.info("qsim.mainMode: add drsDriver")
    # keep the existing value untouched, but avoid a leading comma if it is empty
    main_mode.set("value", f"{value},drsDriver" if modes else "drsDriver")
def adjust_replanning(tree: etree.ElementTree) -> None:
    """Report how many SubtourModeChoice strategies the replanning module uses.

    Nothing is modified. Checking the mode list used by SubtourModeChoice is
    not implemented yet, which is announced with a warning on every call.

    Args:
        tree: parsed config document (read only).
    """
    # name and value must be attributes of the SAME param element; two
    # separate param[...] tests would also match a strategysettings set that
    # has 'SubtourModeChoice' as the value of an unrelated param
    smc = tree.xpath(
        "module[@name='replanning']/"
        "parameterset[@type='strategysettings']"
        "[param[@name='strategyName' and @value='SubtourModeChoice']]"
    )
    logger.info(f"replanning: using {len(smc)} SubtourModeChoice strategies")
    logger.warning(
        "TODO implement checking if drsDriver and drsRider are present to the subtourmodechoice config group"
    )
def adjust_routing(tree: etree.ElementTree) -> None:
    """Prepare the ``routing`` module for drs.

    Steps, all applied in place:

    1. append ``drsDriver`` to the ``networkModes`` param (if not listed yet),
    2. set ``clearDefaultTeleportedModeParams`` to ``true`` (the param is
       created if missing),
    3. replace all existing ``teleportedModeParameters`` parametersets with
       the ones from ``ROUTING_TELEPORTATION``.

    If the config has no ``routing`` module a warning is logged and nothing
    is changed; a missing ``networkModes`` param only skips step 1.

    Args:
        tree: parsed config document, modified in place.
    """
    routing = tree.find("module[@name='routing']")
    if routing is None:
        logger.warning("routing: module not found, skipping routing adjustments")
        return

    modes = routing.find("param[@name='networkModes']")
    if modes is None:
        logger.warning("routing.networkModes: param not found, can not add drsDriver")
    else:
        value = modes.get("value") or ""
        # compare whole entries of the comma separated list, not substrings
        current = [m.strip() for m in value.split(",") if m.strip()]
        if "drsDriver" in current:
            logger.info("routing.networkModes: drsDriver already present")
        else:
            logger.info("routing.networkModes: add drsDriver")
            modes.set("value", f"{value},drsDriver" if current else "drsDriver")

    logger.info("routing.clearDefaultTeleportedModeParams: set true")
    clear = routing.find("param[@name='clearDefaultTeleportedModeParams']")
    # explicit None test: an element without children is falsy, so a plain
    # truth test would treat an existing param as missing and duplicate it
    if clear is not None:
        clear.set("value", "true")
    else:
        clear = etree.Element(
            "param", name="clearDefaultTeleportedModeParams", value="true"
        )
        routing.append(clear)

    # '@type' selects on the attribute; without the '@' the predicate looks
    # for a child element named 'type' and never matches
    teleport_params = routing.findall("parameterset[@type='teleportedModeParameters']")
    if teleport_params:
        logger.info(
            f"routing.teleportedModeParameters: remove {len(teleport_params)} existing ones"
        )
        for p in teleport_params:
            routing.remove(p)

    new_teleport_params = etree.fromstring(ROUTING_TELEPORTATION, parser=CLEAN_PARSER)
    logger.info(
        f"routing.teleportedModeParameters: add {len(new_teleport_params)} from template"
    )
    # appending moves a child out of the template element, so iterate over a
    # snapshot of the children instead of the element itself
    for p in list(new_teleport_params):
        routing.append(p)
def adjust_scoring(tree: etree.ElementTree) -> None:
    """Add the drs scoring parameters to the ``scoring`` module.

    ``_adjust_scoring`` is applied to the module element itself (config
    without subpopulations) and to each of its ``scoringParameters``
    parametersets (config with subpopulations). If the config has no
    ``scoring`` module a warning is logged and nothing is changed.

    Args:
        tree: parsed config document, modified in place.
    """
    scoring = tree.find("module[@name='scoring']")
    if scoring is None:
        logger.warning("scoring: module not found, skipping scoring adjustments")
        return

    logger.info("scoring: adjust general config (no subpops)..")
    _adjust_scoring(scoring)

    # or in case of subpopulations - for each scoringParameters
    for p in scoring.findall("parameterset[@type='scoringParameters']"):
        # the subpopulation param is only needed for the log message, so a
        # parameterset without it must not abort the run
        subpop_param = p.find("param[@name='subpopulation']")
        subpop = subpop_param.get("value") if subpop_param is not None else None
        logger.info(f"scoring: adjust subpopulation {subpop}..")
        _adjust_scoring(p)
def _adjust_scoring(scoring: etree.Element) -> None:
    """Add drs mode and activity parameters to one scoring parameter container.

    Containers without any ``modeParams`` parameterset are skipped. Otherwise:

    * a warning is logged if the ``car`` mode has a non-zero
      ``dailyMonetaryConstant``,
    * ``modeParams`` for ``drsDriver`` and ``drsRider`` as well as
      ``activityParams`` for ``drsDriver interaction`` are appended from the
      templates, each only if not present yet.

    Args:
        scoring: the ``scoring`` module element or one of its
            ``scoringParameters`` parametersets, modified in place.
    """
    mode_params = scoring.findall("parameterset[@type='modeParams']")
    if not mode_params:
        logger.info("-> skip")
        return

    for p in mode_params:
        mode = p.find("param[@name='mode']")
        daily_cost = p.find("param[@name='dailyMonetaryConstant']")
        if mode is None or daily_cost is None:
            continue
        daily_cost = daily_cost.get("value")
        if mode.get("value") == "car" and float(daily_cost) != 0:
            logger.warning(
                f"scoring for car has dailyMonetaryConstant of {daily_cost}. "
                "It should be 0 to avoid double dailyMonetaryCost in case a plan has both car and drsDriver modes. "
                "Use drs.carAndDrsDailyMonetaryConstant instead"
            )

    driver_mode_q = (
        "parameterset[@type='modeParams']/param[@name='mode'][@value='drsDriver']"
    )
    if scoring.find(driver_mode_q) is None:
        logger.info("-> add drsDriver modeParams")
        params = etree.fromstring(SCORING_MODEPARAMS_DRIVER, parser=CLEAN_PARSER)
        scoring.append(params)

    rider_mode_q = (
        "parameterset[@type='modeParams']/param[@name='mode'][@value='drsRider']"
    )
    if scoring.find(rider_mode_q) is None:
        logger.info("-> add drsRider modeParams")
        params = etree.fromstring(SCORING_MODEPARAMS_RIDER, parser=CLEAN_PARSER)
        scoring.append(params)

    # the activity type is stored as <param name="activityType" value="..."/>,
    # there is no attribute called 'activityType' to select on
    activity_q = (
        "parameterset[@type='activityParams']"
        "/param[@name='activityType'][@value='drsDriver interaction']"
    )
    if scoring.find(activity_q) is None:
        logger.info("-> add drsDriver activityParams")
        params = etree.fromstring(SCORING_ACTIVITYPARAMS_DRIVER, parser=CLEAN_PARSER)
        scoring.append(params)
if __name__ == "__main__":
    # Command line entry point: read the config given as in_file, apply all
    # drs adjustments and write the result to out_file.
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("in_file", type=Path)
    parser.add_argument("out_file", type=Path)
    args = vars(parser.parse_args())

    # Pull the paths into plain names: reusing the enclosing quote type inside
    # an f-string expression is a SyntaxError before Python 3.12.
    in_file = args["in_file"]
    out_file = args["out_file"]

    logger.info(f"reading config from {in_file}")
    # paths are handed over as str so this does not depend on the lxml
    # version's support for path-like objects
    tree = etree.parse(str(in_file), CLEAN_PARSER)

    add_drs_module(tree)
    adjust_qsim(tree)
    adjust_replanning(tree)
    adjust_routing(tree)
    adjust_scoring(tree)

    tree.write(
        str(out_file),
        encoding=tree.docinfo.encoding,
        pretty_print=True,
        xml_declaration=True,
    )
    logger.info(f"adjusted config written to {out_file}")