forked from SensibilityTestbed/sensorlib
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsensorlib.r2py
More file actions
118 lines (84 loc) · 3.01 KB
/
sensorlib.r2py
File metadata and controls
118 lines (84 loc) · 3.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
"""
sensorlib.repy -- This is the core of the sensor library.
Check out SL4A's *Facade's into Android sensors,
http://www.mithril.com.au/android/doc/index.html
Albert Rafetseder, Datenkugerl e.U., Austria
Yanyan Zhuang, UBC, Canada
Change log:
v 0.0.1, 20140101 1200 AR
Refactored off test_phone.repy, which itself was based on an older
rendition of test_sensors.repy
v 0.0.2, 20140215 1400 YZ
Refactored the core part out from demo code sensor_demo_script.r2py
v 0.0.3, 20140304 1400 YZ
Removed sensor dict, facade and methodlists. Users need to start/stop
certain sensing function by themselves, but this gives
them more flexibility.
"""
# constants and global variables
AP_PORT = 45678
mycontext['id'] = 0
class SensorException(RepyException):
"""This exception gets raised when reading a sensor results in an error.
"""
pass
def usage():
log("""
Something went wrong in your code..
""")
# we need a better log than above
def send_rpc_request(socket_object, method_name, parameter_list):
# Sanitize the parameters, None becomes an empty list
parameter_list = parameter_list or []
json_request_string = '{"params": ' + str(parameter_list) + ', "id": ' + \
str(mycontext['id']) + ', "method": "' + \
str(method_name) + '"}\n'
mycontext['id'] += 1
bytes_sent = 0
while bytes_sent<len(json_request_string):
sendstring = json_request_string[bytes_sent:]
bytes_sent += socket_object.send(sendstring)
#log(sendstring)
def get_rpc_response(socket_object, timeout=10):
received_string = ""
time_started = getruntime()
while getruntime() < time_started+timeout:
try:
chunk = socket_object.recv(1024)
received_string += chunk
except SocketWouldBlockError:
if received_string != "":
break
pass
except SocketClosedRemote:
pass
return received_string
def get_connectionport():
mycontext['ap_port'] = AP_PORT
return mycontext['ap_port']
def getconnection(ap_port):
ports = list(getresources()[0]['connport'])
for port in ports:
port = int(port)
try:
sl4a_socket = openconnection("127.0.0.1", ap_port, "127.0.0.1", port, 5)
log("Using port", port, "to get sensor data.\n")
return sl4a_socket
except (RepyArgumentError, AddressBindingError, ResourceForbiddenError, DuplicateTupleError, AlreadyListeningError, CleanupInProgressError, ConnectionRefusedError, TimeoutError, InternetConnectivityError), e:
log("Connection error: ", repr(e), '\n')
except Exception as e:
log("Using port", port, "had a problem:", repr(e), '\n')
continue
log("No ports available.\n")
exitall()
def request_data(sl4a_socket, method, args):
send_rpc_request(sl4a_socket, method, args)
response = get_rpc_response(sl4a_socket)
parsed = json_parse(str(response))
error = parsed['error']
if error != None:
log("Something went wrong!\n")
raise SensorException(error)
else:
result = parsed['result']
return result