Skip to content
21 changes: 17 additions & 4 deletions config/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,31 @@ PLOT:
orig_chirp: *ch_sent # Chirp associated with receive data
sample_rate: *s_rate
sig_speed: 3e8 # Speed of signal through medium [m/s]
direct_start: &dir_start 20 # Start search for direct path @ this sample
internal_start: &int_start 20 # Start search for internal path @ this sample
echo_start: 10 # Start search for echo @ this # of samps. after direct path

# --------------------------- Noise Test -----------------------------#
NOISE:
rx_samps: *save_loc # Receive data to use
rx_samps: *save_loc # Receive data to use
orig_chirp: *ch_sent # Chirp associated with receive data
sample_rate: *s_rate
noise_std: 6 # Standard deviation for white noise generation
#coherent_sums: *coh_sums # Number of sums to eliminate noise.
direct_start: *dir_start # Sample at which to start search for direct path
num_pulses: *num_pulses
num_presums: *num_presums
internal_start: *int_start # Sample at which to start search for direct path
show_graphs: False # For debugging
describe: False # For debugging

# --------------------------- Windowing Test ------------------------#
WINDOW:
orig_chirp: *ch_sent # Generated chirp
window_type: "Hann" # Options: Check scipy.signal.windows. TODO: Implement this.
sample_rate: *s_rate
amp_one: 1 # Simulation: amplitude of 1st chirp
amp_two: 0.2 # Simulation: amplitude of 2nd chirp
delay: 0.2 # Simulation: time delay between chirps [ms]
noise_std: 0.025 # Simulation: magnitude of thermal noise
loopback: "archive/loopback.bin" # Simulation: ideal loopback location
show_graphs: True # For debugging
describe: True # For debugging

33 changes: 19 additions & 14 deletions postprocessing/noise_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# statement so that you can set the value of average_before_save via yaml)
# Assumes that SDR configuration is loopback.
import numpy as np
import math
import numpy.random as rand
import scipy.signal as sp
import matplotlib.pyplot as plt
Expand All @@ -19,31 +20,35 @@
rx_samps = noise_params["rx_samps"]
orig_chirp = noise_params["orig_chirp"]
noise_std = noise_params["noise_std"]
coh_sums = noise_params["coherent_sums"]
direct_start = noise_params["direct_start"]
pulses = noise_params["num_pulses"]
presums = noise_params["num_presums"]
internal_start = noise_params["internal_start"]
show_graphs = noise_params["show_graphs"]
describe = noise_params["describe"]

coh_sums = math.ceil(pulses/presums) # If I understand what main.cpp is doing

print("--- Loaded constants from config.yaml ---")

# Open received data
print("--- Opening data and determining direct path peak for first signal---")
print("--- Opening data and determining internal path peak for first signal---")
rx_sig = pr.extractSig(rx_samps)
#rx_sig = np.divide(rx_sig, presums)
n_rx_samps = int(np.shape(rx_sig)[0])
if (show_graphs): pr.plotSignal(rx_sig, 'Loopback Test', sample_rate)
if (show_graphs): pr.plotSigVsTime(rx_sig, 'Loopback Test', sample_rate)

# Read original chirp
tx_sig = pr.extractSig(orig_chirp)
n_tx_samps = int(np.shape(tx_sig)[0])
if (show_graphs): pr.plotSignal(tx_sig, 'Original Chirp', sample_rate)
if (show_graphs): pr.plotSigVsTime(tx_sig, 'Original Chirp', sample_rate)

# Read the first chirp in received data -- this is the "ideal" signal for SNR
samps_per = int(n_rx_samps / coh_sums)
rx_ideal = rx_sig[0:samps_per]
xcorr_ideal = np.abs(sp.correlate(rx_ideal, tx_sig, mode='valid', method='auto'))
dir_peak = pr.findDirectPath(xcorr_ideal, direct_start, describe)
internal_peak = pr.findInternalPath(xcorr_ideal, internal_start, describe)

if (show_graphs): pr.plotChirpVsTime(rx_ideal, "'Ideal' Signal", sample_rate)
if (show_graphs): pr.plotSigVsTime(rx_ideal, "'Ideal' Signal", sample_rate)

# Split received data into individual signals
print("--- Splitting rx signal into individual chirps ---")
Expand All @@ -63,11 +68,11 @@
snrs = np.zeros(coh_sums)
for sig in signals:

# Match filter signal & determine direct path peak to ensure alignment with ideal signal
# Match filter signal & determine internal path peak to ensure alignment with ideal signal
if (describe): print("\n--- Beginning Processing of Signal %d ---" % count)
xcorr_sig = np.abs(sp.correlate(sig, tx_sig, mode='valid', method='auto'))

aligned = (dir_peak == pr.findDirectPath(xcorr_sig, direct_start, describe))
aligned = (internal_peak == pr.findInternalPath(xcorr_sig, internal_start, describe))
if (describe): print("\tIs Signal %d aligned with the first? %r" % (count, aligned))
if (not aligned):
if describe: print("--- Skipping signal %d due to poor alignment with first signal ---", count)
Expand All @@ -92,7 +97,7 @@
rx_time[x] = x/sample_rate
rx_time *= 1e6

pr.plotChirpVsTime(rx_noise, "Individual Signal With Noise", sample_rate)
pr.plotSigVsTime(rx_noise, "Individual Signal With Noise", sample_rate)

xcorr_noise = np.abs(sp.correlate(rx_noise, tx_sig, mode='valid', method='auto'))
xcorr_noise_samps = np.shape(xcorr_noise)[0]
Expand Down Expand Up @@ -122,13 +127,13 @@
count += 1

print("\n--- Plotting total signal after %d coherent summations ---" % coh_sums)
pr.plotChirpVsTime(avg_total, "Total after %d summations" % coh_sums, sample_rate)
pr.plotSigVsTime(avg_total, "Total After %d Averaged Summations" % coh_sums, sample_rate)

print("--- Plotting match filter of total signal after %d coherent summations ---" % coh_sums)
xcorr_total = np.abs(sp.correlate(avg_total, tx_sig, mode='valid', method='auto'))
plt.figure()
plt.plot(xcorr_noise_time, 20*np.log10(xcorr_total)) # xcorr_noise and xcorr_total are the same shape
plt.title("Match-filter of %d coherent summations" % coh_sums)
plt.title("Match-filter of Total After %d Averaged Summations" % coh_sums)
plt.xlabel('Time (ms)')
plt.ylabel('Power [dB]')
plt.show()
Expand All @@ -137,8 +142,8 @@
plt.figure()
plt.plot(np.add(1, range(coh_sums)), snrs)
plt.xlabel("Number of sums")
plt.ylabel("Calculated SNR")
plt.title("Signal to Noise Ratio versus Number of Coherent Sums")
plt.ylabel("SNR")
plt.title("Signal to Noise Ratio vs. Number of Coherent Sums")
plt.show()


Expand Down
14 changes: 7 additions & 7 deletions postprocessing/plot_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
sample_rate = rx_params["sample_rate"] # Hertz
rx_samps = rx_params["rx_samps"] # Received data to analyze
orig_ch = rx_params["orig_chirp"] # Chirp associated with the received data
direct_start = rx_params["direct_start"]
internal_start = rx_params["internal_start"]
echo_start = rx_params["echo_start"]
sig_speed = rx_params["sig_speed"]

Expand All @@ -29,18 +29,18 @@
# Read and plot RX/TX
rx_sig = pr.extractSig(rx_samps)
print("--- Plotting real samples read from %s ---" % rx_samps)
pr.plotChirpVsTime(rx_sig, 'Received Samples', sample_rate)
pr.plotSigVsTime(rx_sig, 'Received Samples', sample_rate)

tx_sig = pr.extractSig(orig_ch)
print("--- Plotting transmited chirp, stored in %s ---" % orig_ch)
pr.plotChirpVsTime(tx_sig, 'Transmitted Chirp', sample_rate)
pr.plotSigVsTime(tx_sig, 'Transmitted Chirp', sample_rate)

# Correlate the two chirps to determine time difference
print("--- Match filtering received chirp with transmitted chirp ---")
xcorr_sig = sp.correlate(rx_sig, tx_sig, mode='valid', method='auto')
# as finddirectpath is written right now, it must be called before taking log of the signal
# because if not, negative log values could have a greater absolute value than positive log values.
dir_peak = pr.findDirectPath(xcorr_sig, direct_start, True)
internal_peak = pr.findInternalPath(xcorr_sig, internal_start, True)
xcorr_sig = 20 * np.log10(np.absolute(xcorr_sig))

print("--- Plotting result of match filter ---")
Expand All @@ -57,13 +57,13 @@
plt.grid()

plt.figure()
plt.plot(range(-10,60), xcorr_sig[dir_peak-10:dir_peak+60])
plt.plot(range(-10,60), xcorr_sig[internal_peak-10:internal_peak+60])
plt.title("Output of Match Filter: Peaks")
plt.xlabel('Sample')
plt.ylabel('Power [dB]')
plt.grid()

[echo_samp, echo_dist] = pr.findEcho(xcorr_sig, sample_rate, dir_peak, echo_start, sig_speed, True)
echo_samp = pr.findEcho(xcorr_sig, sample_rate, internal_peak, echo_start, sig_speed, True)

sys.stdout.flush()
plt.show()
plt.show()
149 changes: 127 additions & 22 deletions postprocessing/processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def extractSig (filename):
# signal - an array of samples to plot (complex)
# title - the title of the signal (eg. TX chirp)
# sample_rate - the sampling rate of the signal
def plotChirpVsTime (signal, title, sample_rate):
def plotSigVsTime (signal, title, sample_rate):
"Plot a transmitted or received signal versus time."
n_rx_samps = np.shape(signal)[0]
rx_time = np.zeros(n_rx_samps)
Expand All @@ -31,64 +31,66 @@ def plotChirpVsTime (signal, title, sample_rate):

fig, axs = plt.subplots(2)
fig.suptitle(title)
axs[0].set(xlabel='Time (ms)', ylabel='Real Voltage')
axs[0].set(xlabel='Time (ms)', ylabel='Real Voltage [V]')
axs[0].plot(rx_time, np.real(signal))
axs[1].set(xlabel='Time (ms)', ylabel='Imaginary Voltage')
axs[1].set(xlabel='Time (ms)', ylabel='Imaginary Voltage [V]')
axs[1].plot(rx_time, np.imag(signal))
return

# This function plots a TX or RX complex chirp in an Voltage vs. Sample graph.
# -----
# signal - an array of samples to plot (complex)
# title - the title of the signal (eg. TX chirp)
def plotChirpVsSample (signal, title):
def plotSigVsSample (signal, title):
"Plot a transmitted or received signal versus sample."
fig, axs = plt.subplots(2)
fig.suptitle(title)
axs[0].set(xlabel='Sample', ylabel='Real Voltage')
axs[0].set(xlabel='Sample', ylabel='Real Voltage [V]')
axs[0].plot(np.real(signal))
axs[1].set(xlabel='Sample', ylabel='Imaginary Voltage')
axs[1].set(xlabel='Sample', ylabel='Imaginary Voltage [V]')
axs[1].plot(np.imag(signal))
return

# This function returns the sample number of the direct path peak
# in a match-filtered rx signal.
# -----
# correl_sig - the match-filtered signal to examine (complex)
# direct_start - start search for direct path peak at this sample (sample)
# internal_start - start search for internal path peak at this sample (sample)
# describe - whether to include a text description as code executes
def findDirectPath (correl_sig, direct_start, describe=True):
def findInternalPath (correl_sig, internal_start, describe=True):
"Find the direct path peak of a cross-correlated signal."

if (describe): print("--- Searching for direct path peak ---")
if (describe): print("\tStarting search for direct path peak at sample %d." % direct_start)
relevant_dir = correl_sig[direct_start:]
dir_peak = np.argmax(np.absolute(relevant_dir)) + direct_start
if (describe): print("\tThe direct path peak was found at sample %d with value %f." % (dir_peak, np.abs(correl_sig[dir_peak])))
return dir_peak
if (describe): print("--- Searching for internal path peak ---")
if (describe): print("\tStarting search for internal path peak at sample %d." % internal_start)
relevant_intrnl = correl_sig[internal_start:]
intrnl_peak = np.argmax(np.absolute(relevant_intrnl)) + internal_start
if (describe): print("\tThe internal path peak was found at sample %d with value %f." % (intrnl_peak, np.abs(correl_sig[intrnl_peak])))
return intrnl_peak

# This function returns strongest echo peak in a match-filtered rx signal
# and the estimated distance to the reflector. Searching is conducted by
# specifying the number of samples past the direct path peak to begin search.
# -----
# correl_sig - the match-filtered signal to examine
# sample_rate - the sampling rate of the signal (s/s)
# dir_peak - the location of the direct path peak (sample)
# itrnl_peak - the location of the internal path peak (sample)
# echo_start - start search for echo peak this number of samples past dir_peak
# sig_speed - the speed of the signal through the medium (eg. 3e8) (m/s)
# describe - whether to include a text description as code executes
def findEcho (correl_sig, sample_rate, dir_peak, echo_start, sig_speed, describe=True):
def findEcho (correl_sig, sample_rate, itrnl_peak, echo_start, sig_speed, describe=True):
"Find strongest echo in a correlated signal starting 'echo_start' samples past the direct path peak."
if (describe): print("--- Searching for echo peak based on sample ---")
if (describe): print("\tStarting search for echo peak at sample %d." % (dir_peak + echo_start))
relevant_ech_s = correl_sig[(dir_peak + echo_start):]
echo_peak = np.argmax(relevant_ech_s) + dir_peak + echo_start
if (describe): print("\tStarting search for echo peak at sample %d." % (itrnl_peak + echo_start))
relevant_ech_s = correl_sig[(itrnl_peak + echo_start):]
echo_peak = np.argmax(relevant_ech_s) + itrnl_peak + echo_start
if (describe): print("\tThe strongest echo peak was found at sample %d with value %f." % (echo_peak, correl_sig[echo_peak]))

# Now calculate echo distance (assuming direct path occurs at time 0)
echo_dist = ((echo_peak - dir_peak) / sample_rate) * sig_speed
if (describe): print("\tThe signal echoed off a surface %f meters away. \n" % (echo_dist / 2))
return [echo_peak, echo_dist]
# echo_dist = ((echo_peak - dir_peak) / sample_rate) * sig_speed
#if (describe): print("\tThe signal echoed off a surface %f meters away. \n" % (echo_dist / 2))
#return [echo_peak, echo_dist]

return echo_peak


# This function calculates the SNR of a signal knowing what the signal should
Expand All @@ -111,3 +113,106 @@ def getSNR (ideal, actual):

snr = avg_signal_pwr / avg_noise_pwr
return snr

def testWindowSimple(test_sig, tx_sig, sample_rate, window, window_type = "No"):
n_tx_samps = np.shape(tx_sig)[0]
island = np.concatenate((np.zeros(n_tx_samps - 2), tx_sig, np.zeros(n_tx_samps - 2)))

wind_message = "%s Window Applied" % window_type

plotSigVsTime(island, 'Zero-padded Original Chirp, %s' % wind_message, sample_rate)

# Cross correlation with no window applied
xcorr_raw = sp.correlate(island, tx_sig, mode='valid', method='auto')
xcorr_sig = 20 * np.log10(np.absolute(xcorr_raw))

xcorr_sig_samps = np.shape(xcorr_sig)[0]
xcorr_sig_time = np.zeros(xcorr_sig_samps)
for x in range (xcorr_sig_samps):
xcorr_sig_time[x] = x * 1e6 /sample_rate

plt.figure()
plt.plot(xcorr_sig_time, xcorr_sig)
plt.title("Cross Correlation of Generated Chirp with Itself, %s" % wind_message)
plt.xlabel('Time (ms)')
plt.ylabel('Power [dB]')
plt.show()


def simulateRealistic(tx_sig, dir_peak, n_lb_samps, noise_std, sample_rate, amp_one, amp_two, delay, window, window_type = "No", ):

wind_message = "%s Window Applied" % window_type

print("--- Generating two scaled, shifted loopback signals ---")
print("\tSimulating loopback signal without noise")
n_tx_samps = np.shape(tx_sig)[0]
if (window):
loop_part = np.concatenate((np.zeros(dir_peak + 1), np.multiply(np.real(tx_sig), sp.windows.hann(n_tx_samps)), np.zeros(n_lb_samps - dir_peak - 1 - n_tx_samps)))
else:
loop_part = np.concatenate((np.zeros(dir_peak + 1), np.real(tx_sig), np.zeros(n_lb_samps - dir_peak - 1 - n_tx_samps)))

plotSigVsTime(loop_part, "Real / Imag. Part of Simulated Loopback", sample_rate)

loop_sim = np.zeros(n_lb_samps, dtype=np.csingle)
for x in range(n_lb_samps):
loop_sim[x] = np.csingle(complex(loop_part[x], loop_part[x]))

white_noise = np.zeros(n_lb_samps, dtype=np.csingle)
for x in range(n_lb_samps):
noise_r = np.random.normal(0, noise_std)
noise_i = np.random.normal(0, noise_std)
white_noise[x] = np.csingle(complex(np.float32(noise_r), np.float32(noise_i)))

loop_one = np.add((loop_sim * amp_one), white_noise)

for x in range(n_lb_samps):
noise_r = np.random.normal(0, noise_std)
noise_i = np.random.normal(0, noise_std)
white_noise[x] = np.csingle(complex(np.float32(noise_r), np.float32(noise_i)))

samp_del = int((delay * 1e-6) * sample_rate)
print("\tSecond loopback signal shifted by %d samples compared to first." % samp_del)
loop_two = np.roll(np.add((loop_sim * amp_two), white_noise), samp_del)
simulated = loop_one + loop_two

plotSigVsTime(loop_one, "Simulated Direct Path", sample_rate)
plotSigVsTime(loop_two, "Simulated Echo Path", sample_rate)
plotSigVsTime(simulated, "Simulated Receive Signal, %s" % wind_message, sample_rate)

return simulated

def simulateCentered(tx_sig, noise_std, sample_rate, amp_one, amp_two, delay, window, window_type = "No"):

wind_message = "%s Window Applied" % window_type

print("--- Generating two scaled, shifted loopback signals ---")
print("\tSimulating loopback signal without noise")
n_tx_samps = np.shape(tx_sig)[0]
if (window):
loop_sig = np.multiply(np.real(tx_sig), sp.windows.hann(n_tx_samps))
else:
loop_sig = np.real(tx_sig)

plotSigVsTime(loop_sig, "Real / Imag. Part of Simulated Loopback", sample_rate)

n_loop_samps = 4*n_tx_samps
loop_part = np.zeros(n_loop_samps)
half_samp_del = int(((delay * 1e-6) * sample_rate)/2)

for x in range(n_tx_samps):
loop_part[int(n_loop_samps/2) - half_samp_del - int(n_tx_samps/2) + x] += amp_one * loop_sig[x]
loop_part[int(n_loop_samps/2) + half_samp_del - int(n_tx_samps/2) + x] += amp_two * loop_sig[x]

white_noise = np.zeros(n_loop_samps, dtype=np.csingle)
for x in range(n_loop_samps):
noise_r = np.random.normal(0, noise_std)
noise_i = np.random.normal(0, noise_std)
white_noise[x] = np.csingle(complex(np.float32(noise_r), np.float32(noise_i)))

simulated = np.zeros(n_loop_samps, dtype=np.csingle)
for x in range(n_loop_samps):
simulated[x] = np.csingle(complex(loop_part[x], loop_part[x])) + white_noise[x]

plotSigVsTime(simulated, "Simulated Receive Signal, %s" % wind_message, sample_rate)

return simulated
Loading