-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathphasing.py
More file actions
233 lines (186 loc) · 8.3 KB
/
phasing.py
File metadata and controls
233 lines (186 loc) · 8.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python3
# Copyright 2021 Daniel Estevez <daniel@destevez.net>
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
from astropy.time import Time, TimeDelta
from astropy.coordinates import SkyCoord, AltAz, ICRS, ITRS,\
CartesianRepresentation
import astropy.units as u
import astropy.constants as const
import numpy as np
def compute_uvw(ts, source, ant_coordinates, ref_coordinates):
"""Computes UVW antenna coordinates with respect to reference
Args:
ts: array of Times to compute the coordinates
source: source SkyCoord
ant_coordinates: antenna ECEF coordinates.
This is indexed as (antenna_number, xyz)
ref_coordinates: phasing reference centre coordinates.
This is indexed as (xyz)
Returns:
The UVW coordinates in metres of each of the baselines formed
between each of the antennas and the phasing reference. This
is indexed as (time, antenna_number, uvw)
"""
baselines_itrs = ant_coordinates - ref_coordinates
# Calculation of vector orthogonal to line-of-sight
# and pointing due north.
north_radec = [source.ra.deg, source.dec.deg + 90]
if north_radec[1] > 90:
north_radec[1] = 180 - north_radec[1]
north_radec[0] = 180 + north_radec[0]
north = SkyCoord(ra=north_radec[0]*u.deg, dec=north_radec[1]*u.deg)
source_itrs = source.transform_to(ITRS(obstime=Time(ts))).cartesian
north_itrs = north.transform_to(ITRS(obstime=Time(ts))).cartesian
east_itrs = north_itrs.cross(source_itrs)
ww = baselines_itrs @ source_itrs.xyz.value
vv = baselines_itrs @ north_itrs.xyz.value
uu = baselines_itrs @ east_itrs.xyz.value
uvw = np.stack((uu.T, vv.T, ww.T), axis=-1)
return uvw
def compute_uvw_altaz(ts, source, ant_coordinates, ref_coordinates):
"""Computes UVW antenna coordinates with respect to reference
for and altaz source
Args:
ts: Time to compute the coordinates
source: source SkyCoord
ant_coordinates: antenna ECEF coordinates.
This is indexed as (antenna_number, xyz)
ref_coordinates: phasing reference centre coordinates.
This is indexed as (xyz)
Returns:
The UVW coordinates in metres of each of the baselines formed
between each of the antennas and the phasing reference. This
is indexed as (time, antenna_number, uvw)
"""
baselines_itrs = ant_coordinates - ref_coordinates
# Calculation of vector orthogonal to line-of-sight
# and pointing due north.
loc = source.location
# Vector pointing to north pole
N = SkyCoord(ra=0*u.deg, dec=90*u.deg, location = loc,
obstime=ts).transform_to(ITRS(obstime=ts))
N = N.cartesian.xyz.value
# This is the source vector
V = source.transform_to(ITRS(obstime=ts)).cartesian.xyz.value
k = N - np.dot(V, N)*V
# Normalise
nn = np.sqrt((k**2).sum())
k /= nn
source_itrs = source.transform_to(ITRS(obstime=Time(ts))).cartesian
north_itrs = SkyCoord(ITRS(k)).cartesian
east_itrs = north_itrs.cross(source_itrs)
ww = baselines_itrs @ source_itrs.xyz.value
vv = baselines_itrs @ north_itrs.xyz.value
uu = baselines_itrs @ east_itrs.xyz.value
uvw = np.stack((uu.T, vv.T, ww.T), axis=-1)
return uvw
def compute_antenna_gainphase(uvw, delays, freq, n_ch, ch_bw, const_phase):
"""Computes the gain phase correction for each antenna
Args:
uvw: UVW coordinates of the antenna with respect to the reference
in metres, indexed as (time, antenna, uvw).
delays: cable delays of each antenna in seconds, indexed as
(time, antenna). These should be the residual delays in case
any delay correction has been applied in the receiver.
freq: sky frequency corresponding to the centre of the passband in Hz
n_ch: number of frequency channels
ch_bw: bandwidth or sample rate of each channel in Hz
phase: phase of each antenna, in radians
(antenna)
Returns:
The gain phase correction (as a complex number of modulus one)
corresponding to each time and antenna, indexed as
(time, antenna, channel)
"""
w_seconds = uvw[..., 2] / const.c.value
w_cycles = w_seconds * freq
phase = np.exp(-1j*2*np.pi*w_cycles)[..., np.newaxis]
delays = (delays - w_seconds)[..., np.newaxis]
ch_idx = np.arange(-n_ch//2, n_ch//2)
phase_slope = np.exp(1j*2*np.pi*delays*ch_idx*ch_bw)
const_phase = np.array(const_phase)[np.newaxis, ..., np.newaxis]
return phase * phase_slope * np.exp(1j*const_phase)
def compute_baseline_gainamplitude(w, delays, n_ch, ch_bw):
"""Computes the gain amplitude correction of a single baseline
The (non-closing) gain amplitude correction compensates correlation
losses due to non-overlapping samples in the correlation caused by
uncorrected delays.
Args:
w: W coordinates of the baseline in metres, indexed as
by time.
delays: cable delays of the baseline, defined as the delay of
antenna 1 minus the delay of antenna 2, indexed by time
n_ch: number of frequency channels
ch_bw: bandwidth or sample rate of each channel in Hz
Returns:
The gain amplitude correction corresponding to each time and
baseline, indexed by time
"""
w_seconds = w / const.c.value
delays = delays - w_seconds
amplitude_loss = np.abs(delays * ch_bw)
return 1 / (1 - amplitude_loss)
def apply_phasing(ts, corrs, source, freq, ch_bw, ant_coordinates,
delays, ref_coordinates=None):
"""Applies in-place the phasing to the correlation matrix
The correlation matrix is modified in place. The UVW coordinates are
returned.
Args:
ts: array of Times to compute the coordinates
source: source SkyCoord
corrs: correlation matrix, indexed as
(time, channel, baseline, polarization)
freq: sky frequency corresponding to the centre of the passband in Hz
ch_bw: bandwidth or sample rate of each channel in Hz
ant_coordinates: antenna ECEF coordinates.
This is indexed as (antenna_number, xyz)
delays: cable delays of each antenna in seconds, either indexed as
(time, antenna) or (antenna) if they are time-invariant. These
should be the residual delays in case any delay correction has
been applied in the receiver.
ref_coordinates: phasing reference centre coordinates.
This is indexed as (xyz). By default, the average of the antenna
coordinates is taken.
Returns:
The UVW coordinates in metres of each of the baselines. This
is indexed as (time, baseline, uvw)
"""
if delays.ndim == 1:
delays = np.repeat(delays[np.newaxis, :], ts.size, axis=0)
if ref_coordinates is None:
ref_coordinates = np.average(ant_coordinates, axis=0)
uvw = compute_uvw(ts, source, ant_coordinates, ref_coordinates)
uvw_baseline = np.zeros((ts.size, corrs.shape[2], 3))
n_ch = corrs.shape[1]
gainphase = compute_antenna_gainphase(
uvw, delays, freq, n_ch, ch_bw)
baselines = [(j, k) for j in range(ant_coordinates.shape[0])
for k in range(j+1)]
for j, b in enumerate(baselines):
if b[0] == b[1]:
# No need to do anything for autocorrelations.
# The corresponding uvw_baseline is already initialized
# to zero.
continue
corrs[:, :, j, :] *= gainphase[:, b[0], :][..., np.newaxis]
corrs[:, :, j, :] *= np.conjugate(
gainphase[:, b[1], :][..., np.newaxis])
uvw_baseline[:, j, :] = uvw[:, b[0]] - uvw[:, b[1]]
gainamp = compute_baseline_gainamplitude(
uvw_baseline[:, j, 2], delays[:, b[0]] - delays[:, b[1]],
n_ch, ch_bw)
corrs[:, :, j, :] *= gainamp[:, np.newaxis, np.newaxis]
return uvw_baseline
def midpoint_timestamps(t0, n, T):
"""Returns the timestamps corresponding to the midpoint of each interval
Args:
t0: initial timestamp (Time)
n: total number of timestamps (int)
T: interval duration (seconds)
Returns:
An array with the Time's t0 + T/2, t0 + 3T/2, ...
"""
T = TimeDelta(T, format='sec')
return t0 + T/2 + np.arange(n) * T