forked from JeremyGrosser/noaaport
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwiredown.py
More file actions
executable file
·211 lines (181 loc) · 7.05 KB
/
wiredown.py
File metadata and controls
executable file
·211 lines (181 loc) · 7.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
#!/awips2/python/bin/python
import logging
import socket
import os.path
import os
import sys
import shutil
import nbs
import time
# For edex notifications
from ufpy.qpidingest import *
warnprintdir = "/awips2/local/spool/warnprint"
noticeprintdir = "/awips2/local/spool/noticeprint"
class Streamer(object):
def __init__(self, hosts):
self.hosts = hosts
self.log = logging.getLogger('nbs')
self.log.setLevel(logging.DEBUG)
self.files = {}
self.count = 0
self.basedir = "/awips2/data_store"
def stream(self, host):
self.log.info('Connecting to %s:%i' % host)
self.sock = socket.socket()
self.sock.connect(host)
return nbs.Connection(self.sock)
def reliable_stream(self):
i = 0
while True:
try:
for packet in self.stream(self.hosts[i]):
yield packet
except Exception, e:
self.log.error('Stream error %r: %s' % (self.hosts[i], str(e)))
i = (i + 1) % len(self.hosts)
def handle_incoming(self, filename, content):
self.count += 1
del self.files[filename]
self.log.debug(filename)
ccblen = 2 * (((ord(content[0]) & 63) << 8) + ord(content[1]))
hdr = content[ccblen:].splitlines()
# initialize these to gibberish just in case
# processing the header fails
(wmo,site,ts) = ('NPHX99', 'UNKN', time.strftime("%d%H%M"))
pil = "NONPIL"
try:
(wmo,site,ts) = hdr[0].split(' ')[0:3]
pil = hdr[2]
except:
print "WTF\n\tHDR 1: %s\n\tHDR 2: %s\n\tHDR 3: %s\n\tHDR 4: %s\n\tHDR 5: %s\n" % (hdr[0], hdr[1], hdr[2], hdr[3], hdr[4])
if pil == '@pil':
# KNCF is currently dropping in @pil as the pilcode
pil = 'MONMSG'
if wmo[0:2] == 'NT':
outdir = "{BASEDIR}/tstmsg/{SITE}".format(BASEDIR=self.basedir, SITE=site)
outfile = "{WMO}_{SITE}_{DATE}_{WALLTIME}".format(WMO=wmo, SITE=site, DATE=ts, WALLTIME=time.strftime("%Y%m%d%H%M%S"))
elif pil == 'MONMSG':
outdir = "{BASEDIR}/tstmsg/{SITE}".format(BASEDIR=self.basedir, SITE=site)
outfile = "{WMO}_{SITE}_{DATE}_{WALLTIME}".format(WMO=wmo, SITE=site, DATE=ts, WALLTIME=time.strftime("%Y%m%d%H%M%S"))
else:
outdir = "{BASEDIR}/nwws/{DATE}/{HR}/{SITE}".format(BASEDIR=self.basedir, DATE=time.strftime("%Y%m%d"), HR=time.strftime("%H"), SITE=site)
outfile = "{WMO}_{SITE}_{PIL}_{DATE}.txt".format(WMO=wmo, SITE=site, PIL=pil.replace(' ', '_'), DATE=ts)
odof = "%s/%s" % (outdir,outfile)
if not os.path.exists(outdir):
os.makedirs(outdir)
fd = file(odof, 'w')
fd.write(content[ccblen:])
fd.flush()
fd.close()
npcode = "%s /p%s" % (hdr[0], pil)
self.log.debug("%s (%s)" % (odof, npcode))
z.sendmessage(odof, npcode)
if wmo[0] == 'W':
do_warn(wmo, site, pil, content[ccblen:])
if wmo[1] == 'N':
do_notice(wmo, site, pil, content[ccblen:])
def run(self):
for packet in self.reliable_stream():
if not packet.filename in self.files:
self.files[packet.filename] = nbs.FileAssembler(
packet.filename, self.handle_incoming)
assembler = self.files[packet.filename]
assembler.add_part(packet)
#self.log.debug(repr(packet))
"""
The idea with the do_{warn,canwarn,notice} functions is that they would be used
for immediate and special handling of warnings and notices. Presently, I think
I'll have them print out. In the future, we may do something crazy involving
blinkenlights and buzzers.
Since the WMO codes are highly regular, it is much easier to determine if a
product is actually important. First letter = 'W' is a warning. First letter
is 'N', it's a notice. Second letter of notice is 'W'? Well, that's a notice
about a warning, which is generally a cancellation.
See https://www.wmo.int/pages/prog/www/ois/Operational_Information/Publications/WMO_386/AHLsymbols/TableB1.html
for all the goodies
Where WMO codes break down and the PIL takes over is when we see things
like localized forecasts. KJAN will produce numerous reports for a
whole slew of products for various cities and they'll have the same WMO
code. For example, JAN issues CLI products for several cities under
the same WMO code:
fxatext=# select distinct cccid, nnnid, site, wmoid, xxxid, bbbid, nnnid || xxxid as pil from stdtextproducts where nnnid = 'CLI' and site = 'KJAN' and xxxid != 'JAN' ;
cccid | nnnid | site | wmoid | xxxid | bbbid | pil
-------+-------+------+--------+-------+-------+--------
JAN | CLI | KJAN | CDUS44 | GLH | | CLIGLH
JAN | CLI | KJAN | CDUS44 | HBG | | CLIHBG
JAN | CLI | KJAN | CDUS44 | MEI | | CLIMEI
JAN | CLI | KJAN | CDUS44 | TVR | | CLITVR
JAN | CLI | KJAN | CDUS44 | GWO | | CLIGWO
(5 rows)
"""
def do_tstmsg(wmo,site,pil,content):
"""
Special handling for test messages
"""
print "* * * %s sent a test message * * *" % site
return True
def do_warn(wmo,site,pil,content):
"""
Writes a warning to the warning print spool
"""
# Special handling for test messages
if wmo[0:2] == 'NT':
do_tstmsg(wmo,site,pil,content)
return True
elif pil == 'MONMSG':
do_tstmsg(wmo,site,pil,content)
return True
if wmo[0:2] == 'NW':
print "* * * WARNING CANCELLATION * * *"
else:
print "* * * W A R N I N G R E C E I V E D * * *"
print content
ts = time.strftime("%Y%m%d%H%M%S")
fp = open("%s/%s_%s_%s.txt" % (warnprintdir,wmo,pil.strip(),ts), 'w')
fp.write(content)
fp.flush()
fp.close()
return True
def do_canwarn(wmo,site,pil,content):
"""
Writes a warning cancellation to the warning print spool.
"""
do_warn(wmo,site,pil,content)
def do_notice(wmo,site,pil,content):
"""
Writes a notice to the notice print spool, unless it is
a warning cancellation, in which case we call do_canwarn
"""
# Special handling for test messages
if wmo[0:2] == 'NT':
do_tstmsg(wmo,site,pil,content)
return True
elif pil == 'MONMSG':
do_tstmsg(wmo,site,pil,content)
return True
if wmo[1] == 'W':
# Special handling for warning cancellations
do_canwarn(wmo,site,pil,content)
return True
print "* * * NOTICE RECEIVED * * *"
print content
ts = time.strftime("%Y%m%d%H%M%S")
fp = open("%s/%s_%s_%s.txt" % (noticeprintdir,wmo,pil.strip(),ts), 'w')
fp.write(content)
fp.flush()
fp.close()
return True
def main():
(host,port) = ('w.nbsp.inoaaport.net', 2210)
s = Streamer([(host, port)])
s.run()
if __name__ == '__main__':
try:
os.makedirs(warnprintdir)
os.makedirs(noticeprintdir)
except:
# If you can't write to /awips2, you're probably going to have a bad time
None
os.environ['TZ'] = 'GMT'
z = IngestViaQPID()
main()