-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgetRoutePackages.py
More file actions
242 lines (171 loc) · 7.2 KB
/
getRoutePackages.py
File metadata and controls
242 lines (171 loc) · 7.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
import json
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from utils import files, itineraries, packages, routes, utils
# Module-level cache keyed by itinerary id. Each value is a dict with two keys:
#   "pkgsAndStops"   -> the payload returned by
#                       itineraries.get_itinerary_packages_and_stops
#   "uniquePackages" -> a set filled from that payload's "loadedPackages"
# It is written by get_itinerary_packages_and_stops() and read by
# sync_itineraries_and_response().
ITINERARIES = {}
def get_itinerary_packages_and_stops(env, orgId, itinerary, timestamp):
    """Fetch one itinerary's packages/stops and record them in ITINERARIES.

    The fetched payload is stored under ``ITINERARIES[itinerary]`` together
    with a set of the entries found in its ``"loadedPackages"`` list.

    Args:
        env: Environment selector forwarded to the itineraries helper.
        orgId: Organisation id forwarded to the itineraries helper.
        itinerary: Id of the itinerary to fetch; also the cache key.
        timestamp: Value shown in the progress message only.
    """
    details = itineraries.get_itinerary_packages_and_stops(env, orgId, itinerary)
    # Register the entry first so it exists even if the reporting below fails.
    ITINERARIES[itinerary] = {"pkgsAndStops": details, "uniquePackages": set()}

    loaded = details["loadedPackages"]
    # TODO: investigate why this message is not printed every time.
    # NOTE(review): an exception raised earlier in this function would skip
    # the print; confirm whether that is what happens.
    # TODO: fetch switchboard to check package status, and report
    # len(details["deliveryStops"]) stops as well.
    print(f">> {itinerary} have {len(loaded)} packages ({timestamp})")

    ITINERARIES[itinerary]["uniquePackages"].update(loaded)
def _extract_itinerary_id(note):
    """Return the text between the first ``[`` and the next ``]`` in *note*.

    Returns ``""`` when the brackets are empty or when either bracket is
    missing, so a malformed note is treated as "no itinerary generated"
    instead of yielding a truncated copy of the note.
    """
    text = str(note)
    start = text.find("[")
    if start == -1:
        return ""
    end = text.find("]", start + 1)
    if end == -1:
        return ""
    # e.g. "[ fac1ab60-fd7a-4ff9-8798-e9167060295c ]" -> the bare id
    return text[start + 1 : end].strip()


def fetch_itineraries(events):
    """Collect itinerary ids and publishing timestamps from route events.

    Args:
        events: Re-iterable collection of event dicts, each with a
            ``"notes"`` and a ``"timestamp"`` key.

    Returns:
        A tuple ``(itinerary_ids, publishing_stamps)``:
        * ``itinerary_ids`` has one string per event whose notes contain
          "done. itinerary ids" (case-insensitive); the string is ``""``
          when the note carries no id.
        * ``publishing_stamps`` has the ``"timestamp"`` of every event whose
          notes contain "publishing to oegr" (case-insensitive).
        The two lists come from different events and may differ in length.
    """
    print(">> Fetching itineraries in Route Events")
    itinerary_ids = []
    publishing_stamps = []
    for event in events:
        note = event["notes"]
        lowered = str(note).lower()
        if "done. itinerary ids" in lowered:
            itinerary_ids.append(_extract_itinerary_id(note))
        if "publishing to oegr" in lowered:
            publishing_stamps.append(event["timestamp"])
    return itinerary_ids, publishing_stamps
def no_itinerary_generated(itinerary_ids):
    """Return True when every id is blank once stringified and stripped.

    An empty ``itinerary_ids`` also yields True.
    """
    has_real_id = any(str(candidate).strip() != "" for candidate in itinerary_ids)
    return not has_real_id
def get_routes(env, orgId):
    """Interactively collect route ids until the user declines to add more.

    The loop only ends once at least one route has been collected; declining
    before that simply asks again.

    Args:
        env: Environment selector forwarded to get_route_from_user.
        orgId: Organisation id forwarded to get_route_from_user.

    Returns:
        A list of the distinct route ids entered (built from a set, so the
        order is not guaranteed).
    """
    selected = set()
    while True:
        print(f"\n>> Routes Selected So Far: {' '.join(selected)}")
        wants_another = (
            utils.select_answer(">> Do you want to provide another route?") == "Y"
        )
        if wants_another:
            print(">> Type the new HUB's name (leave blank and hit enter if done)")
            selected.add(get_route_from_user(env, orgId))
            continue
        if selected:
            # The user declined and we already hold at least one route.
            break
    return list(selected)
def get_route_from_user(env, orgId):
    """Ask the user for a route, either by id or by hub/date/route name.

    If the user already has a route id it is read directly (whitespace
    stripped, like every other prompt here). Otherwise the hub, date and
    route name are requested and resolved through ``routes.find_route``.
    The prompts repeat until a route is found. An unparsable date now also
    triggers a retry instead of aborting the script with ``ValueError``.

    Args:
        env: Environment selector forwarded to ``routes.find_route``.
        orgId: Organisation id forwarded to ``routes.find_route``.

    Returns:
        The route id typed by the user or taken from the found route's
        ``"routeId"`` field.
    """
    answer = utils.select_answer("> Do you have a routeId? ")
    routeId = None
    if answer == "Y":
        routeId = input("> Input the route ID: ").strip()
    else:
        while routeId is None:
            hubName = input("Type the hub (only numbers)\n> ").strip()
            raw_date = input("Type in the date (yyyy-mm-dd)\n> ").strip()
            try:
                # Round-trip through datetime to validate and normalise the date.
                cpt = datetime.strptime(raw_date, "%Y-%m-%d").strftime("%Y-%m-%d")
            except ValueError:
                print(">> Invalid date (expected yyyy-mm-dd), please try again\n")
                print("--------------------------------")
                continue
            routeName = input("Type the route name\n> ").strip()
            route = routes.find_route(env, orgId, routeName, hubName, cpt)
            if route is None:
                print(">> Route not Found, please try again\n")
                print("--------------------------------")
            else:
                routeId = route["routeId"]
                print(f">> Route found: {routeId}\n")
    return routeId
def print_itineraries(itinerary_ids):
    """Print how many itineraries were found, then each id as ``[ id ]``."""
    total = len(itinerary_ids)
    bracketed = " ".join(f"[ {itinerary} ]" for itinerary in itinerary_ids)
    print(f">> Found {total} itineraries:")
    print(f">> {bracketed}")
def sync_itineraries_and_response(response, itinerary_ids):
    """Merge cached itinerary data from ITINERARIES into *response*.

    For every id that has an entry in ITINERARIES, its ``"pkgsAndStops"``
    payload is appended to ``response["itineraries"]`` and its packages are
    added to ``response["uniquePackages"]``. Ids with no cached entry are
    skipped rather than raising ``KeyError``; this covers blank ids, which
    are never fetched, and ids whose fetch failed.

    Args:
        response: Dict with an ``"itineraries"`` list and a
            ``"uniquePackages"`` set; it is mutated in place.
        itinerary_ids: Iterable of itinerary ids to merge.

    Returns:
        The same *response* object. ``"uniquePackages"`` stays a set, so the
        caller must convert it before JSON serialisation.
    """
    for itinerary in itinerary_ids:
        entry = ITINERARIES.get(itinerary)
        if entry is None:
            # Nothing was cached for this id; there is nothing to merge.
            continue
        response["itineraries"].append(entry.get("pkgsAndStops"))
        response["uniquePackages"].update(entry["uniquePackages"])
    return response
def process_itineraries(env, orgId, itinerary_ids, timestamps, routeId):
    """Fetch every non-blank itinerary in parallel and merge the results.

    Each non-blank id is submitted to a thread pool running
    get_itinerary_packages_and_stops. Once all workers are done, failures
    are reported (the original discarded the futures, so any worker
    exception vanished silently) and the cached data is merged via
    sync_itineraries_and_response.

    Args:
        env: Environment selector forwarded to the worker.
        orgId: Organisation id forwarded to the worker.
        itinerary_ids: Sequence of itinerary ids; blank ids are reported
            and skipped.
        timestamps: Sequence matched to *itinerary_ids* by position. It may
            be shorter, in which case the missing entries are ``None``.
        routeId: Route id stored in the response.

    Returns:
        Dict with ``"routeId"``, ``"itineraries"`` (list of payloads) and
        ``"uniquePackages"`` (set).
    """
    response = {"routeId": routeId, "itineraries": [], "uniquePackages": set()}
    submitted = []
    # get all the data for each of them concurrently / in parallel
    with ThreadPoolExecutor() as pool:
        for index, itinerary in enumerate(itinerary_ids):
            # The stamps come from different events than the ids, so there
            # may be fewer of them; never index past the end.
            timestamp = timestamps[index] if index < len(timestamps) else None
            if str(itinerary).strip() != "":
                future = pool.submit(
                    get_itinerary_packages_and_stops, env, orgId, itinerary, timestamp
                )
                submitted.append((itinerary, future))
            else:
                print(f">> [] have 0 packages and 0 stops ({timestamp})")
    # Leaving the with-block waits for every worker, so no explicit shutdown.

    for itinerary, future in submitted:
        error = future.exception()
        if error is not None:
            # Best effort: report the failure and continue with the rest.
            print(f">> Failed to fetch itinerary '{itinerary}': {error!r}")

    # Only merge ids that actually have cached data (skips blank/failed ones).
    loaded_ids = [itinerary for itinerary in itinerary_ids if itinerary in ITINERARIES]
    response = sync_itineraries_and_response(response, loaded_ids)
    print(
        f">> {len(response['uniquePackages'])} unique packages found across itineraries"
    )
    return response
def get_pkgs_from_alamo(env, routeId):
    """Collect the package ids listed in a route's stop details.

    Args:
        env: Environment selector forwarded to ``routes.get_stop_details``.
        routeId: Route whose stop details are fetched.

    Returns:
        A set of every non-None ``"packageId"`` found under the stops'
        ``"stopPackages"``. The set is empty, and the response's
        ``"message"`` is printed, when the response has no
        ``"routeStopDetail"``.
    """
    details = routes.get_stop_details(env, routeId)
    print(">> Searching for packages with Alamo's Stop Details")

    if details.get("routeStopDetail") is None:
        print(details.get("message"))
        return set()

    candidate_ids = (
        package.get("packageId")
        for stop in details["routeStopDetail"]["stops"]
        for package in stop["stopPackages"]
    )
    # Packages without an id are ignored.
    pids = {package_id for package_id in candidate_ids if package_id is not None}

    print(f">> Found {len(pids)} packages\n")
    return pids
def main(env, orgId, rts=None) -> dict[str, list]:
    """Gather the unique package ids of each route from three sources.

    For every route the ids are merged from the sortation packages, the
    Alamo stop details and the itineraries referenced in the route events.

    Args:
        env: Environment selector forwarded to every helper.
        orgId: Organisation id forwarded to the helpers that need it.
        rts: Optional iterable of route ids. When omitted, or when the user
            rejects the provided ones, the routes are asked for
            interactively.

    Returns:
        Mapping of route id to a list of its unique package ids.
    """
    if rts is not None:
        print(">> These routes were provided:")
        print("\n".join(rts))
        if utils.select_answer("Do you wish to continue with them?") == "N":
            rts = get_routes(env, orgId)
    else:
        rts = get_routes(env, orgId)

    final_response = {}
    for routeId in rts:
        print("{:=<50}".format(""))  # a separator line of 50 `=` characters
        print(f">> Route ID: {routeId}")

        # Source 1: sortation (packages may not have reached an itinerary yet).
        sortation_pkgs = packages.get_route_packages_sortation(env, orgId, routeId)
        pids = {pkg["packageID"] for pkg in sortation_pkgs}

        # Source 2: route stop details.
        pids.update(get_pkgs_from_alamo(env, routeId))

        # Source 3: itineraries mentioned in the route events.
        events = routes.get_route_events(env, routeId)
        if events is not None:
            itinerary_ids, timestamps = fetch_itineraries(events)
            print(f">> Found {len(itinerary_ids)} itineraries")
            # Only worth processing if at least one itinerary was generated.
            if not no_itinerary_generated(itinerary_ids):
                response = process_itineraries(
                    env, orgId, itinerary_ids, timestamps, routeId
                )
                pids.update(response["uniquePackages"])

        # Sets are not JSON compatible, so store a list instead.
        unique_ids = list(pids)
        final_response[routeId] = unique_ids
        print(f">> Total of {len(unique_ids)} unique packages")
        print()
    return final_response
if __name__ == "__main__":
    # Script entry point: pick environment and organisation, collect the
    # packages per route, then save the result as JSON.
    env = utils.select_env()
    orgId = utils.select_org(env)
    response = main(env, orgId)
    print(">> Results saved in the file below")
    payload = json.dumps(response, indent=2)
    files.save_json_to_file(payload, "ROUTE_PKGS")