ULP-ActivitySupport – blob

You can use Git to clone the repository via the web URL. Download snapshot (zip)
Take mobility path width requirements into account when routing
[ULP-ActivitySupport] / simulator.py
1 #!/usr/bin/env python3\r
2 \r
3 import datetime\r
4 import math\r
5 import os\r
6 import threading\r
7 import time\r
8 \r
9 from PIL import Image\r
10 \r
11 from data import Position, Intent, User, SmartUrbanObject, Context\r
12 \r
13 \r
14 class ContextSimulator:\r
15     def __init__(self, context, routing_engine):\r
16         self.context = context\r
17         self.routing_engine = routing_engine\r
18         self.send_info_to_device = lambda suo_id, msg: None\r
19         self._frame_counter = 0\r
20         self.last_user_intent = {}\r
21         self.last_user_mobility = {}\r
22         self.loop_user_intents = {}\r
23         self.last_device_update = {}\r
24         self.mobility_speed = { # in m/s\r
25             'scooter': 3,\r
26             'wheelchair': 1.5,\r
27             'walker': 1.5,\r
28             'foot': 1,\r
29         }\r
30 \r
31     def set_messaging_callback(self, callback_function):\r
32         self.send_info_to_device = callback_function\r
33 \r
34     def tick(self):\r
35         for uid in self.context.user:\r
36             if uid not in self.loop_user_intents:\r
37                 self.loop_user_intents[uid] = False\r
38             user = self.context.user[uid]\r
39             if len(user.intent) < 1:\r
40                 self.routing_engine.clear_user_route(uid)\r
41                 continue\r
42             walking_speed = self.mobility_speed.get(user.mobility) or 1.0\r
43             step_length = walking_speed / 5 # simulations runs every 200ms\r
44             intent = user.intent[sorted(user.intent.keys())[0]]\r
45             if self.routing_engine.get_user_route(uid) is None or intent != self.last_user_intent.get(uid) or user.mobility != self.last_user_mobility.get(uid):\r
46                 goal = { 'x': intent.position.x, 'y': intent.position.y }\r
47                 self.routing_engine.set_user_route(uid, goal)\r
48                 self.last_user_intent[uid] = intent\r
49                 self.last_user_mobility[uid] = user.mobility\r
50             new_position = { 'x': user.position.x, 'y': user.position.y }\r
51             if hasattr(user, 'paused') and user.paused:\r
52                 continue\r
53             route = self.routing_engine.get_user_route(uid)\r
54             while step_length > 0 and route is not None and len(route) > 0:\r
55                 distance = self.routing_engine.get_coords_distance(new_position, route[0])\r
56                 if distance < float(route[0]['r']):\r
57                   del route[0]\r
58                   continue\r
59                 if distance < step_length:\r
60                     new_position = route[0]\r
61                     self.routing_engine.shift_user_route(uid)\r
62                     if self.routing_engine.get_user_route(uid) is None:\r
63                         break\r
64                     step_length -= distance\r
65                 else:\r
66                     direction = (float(route[0]['x']) - float(new_position['x']), float(route[0]['y']) - float(new_position['y']))\r
67                     scaling_factor = step_length / distance\r
68                     new_position = {\r
69                         'x': float(new_position['x']) + scaling_factor * direction[0],\r
70                         'y': float(new_position['y']) + scaling_factor * direction[1]\r
71                     }\r
72                     step_length = 0\r
73             new_pos = Position(x = float(new_position['x']), y = float(new_position['y']))\r
74             overall_direction = user.position.difference_to(new_pos)\r
75             user.position = new_pos\r
76             if overall_direction[1] == 0.0:\r
77                 angle = 90\r
78                 if overall_direction[0] < 0:\r
79                     angle = 270\r
80             else:\r
81                 angle = math.degrees(-1*math.atan(overall_direction[0] / overall_direction[1]))\r
82             if overall_direction[1] > 0:\r
83                 angle += 180\r
84             if angle < 0.0:\r
85                 angle += 360\r
86             user.set_rotation(angle)\r
87             distance_to_goal = user.position.distance(intent.position)\r
88             if distance_to_goal < 0.25:\r
89                 if self.loop_user_intents[uid] and not hasattr(user, 'original_intent'):\r
90                     user.intent[max(user.intent.keys())+1] = user.intent[sorted(user.intent.keys())[0]]\r
91                 del user.intent[sorted(user.intent.keys())[0]]\r
92                 self.routing_engine.clear_user_route(uid)\r
93             if uid not in self.last_device_update:\r
94                 self.last_device_update[uid] = {}\r
95             suos_on_route = self.routing_engine.get_suos_on_route(uid)\r
96             for suo_id in self.context.suo:\r
97                 last_update = self.last_device_update[uid].get(suo_id)\r
98                 if last_update is not None and datetime.datetime.utcnow() - last_update < datetime.timedelta(seconds = 1):\r
99                     continue\r
100                 suo = self.context.suo[suo_id]\r
101                 if suo.is_user_in_range(user):\r
102                     msg = {\r
103                         'user': uid,\r
104                         'activity': 'routing',\r
105                         'seconds_until': -1.0,\r
106                     }\r
107                     self.send_info_to_device(suo_id, msg)\r
108                     if not suo_id in self.last_device_update[uid]:\r
109                         self.last_device_update[uid][suo_id] = datetime.datetime.utcnow()\r
110                 else:\r
111                     if suo_id in suos_on_route:\r
112                         seconds_until = suos_on_route[suo_id] / walking_speed\r
113                         msg = {\r
114                             'user': uid,\r
115                             'activity': 'routing',\r
116                             'seconds_until': seconds_until,\r
117                         }\r
118                         self.send_info_to_device(suo_id, msg)\r
119                         if not suo_id in self.last_device_update[uid]:\r
120                             self.last_device_update[uid][suo_id] = datetime.datetime.utcnow()\r
121 \r
122     def loop_sync(self):\r
123         self.exit_loop = False\r
124         while not self.exit_loop:\r
125             self.tick()\r
126             time.sleep(200)\r
127 \r
128     def loop_async(self):\r
129         self.exit_loop = False\r
130         self.next_async_tick()\r
131 \r
132     def next_async_tick(self):\r
133         if not self.exit_loop:\r
134             self.tick()\r
135             bg_thread = threading.Timer(0.2, self.next_async_tick, ())\r
136             bg_thread.daemon = True\r
137             bg_thread.start()\r
138 \r