"""
Classes for spectral analysis.
"""
import datetime
from copy import copy, deepcopy
from math import floor
from random import randint
from distutils.version import LooseVersion
from typing import Union, List
import numpy as np
from skimage import filters
from matplotlib import pyplot as plt
from matplotlib.colorbar import Colorbar
from matplotlib.figure import Figure
from matplotlib.ticker import FuncFormatter, IndexLocator, MaxNLocator
from numpy import ma
from scipy import ndimage
from sortedcontainers import SortedList
import ruptures as rpt
from sunpy import __version__
from sunpy.time import parse_time
from scipy import interpolate
from radiospectra.spectrum import Spectrum
from radiospectra.util import ConditionalDispatch, Parent, common_base, get_day, merge, to_signed
__all__ = ['Spectrogram', 'LinearTimeSpectrogram']
SUNPY_LT_1 = LooseVersion(__version__) < LooseVersion('1.0')
# 1080 because that usually is the maximum vertical pixel count on modern
# screens nowadays (2012).
DEFAULT_YRES = 1080
# This should not be necessary, as observations do not take more than a day
# but it is used for completeness' and extendibility's sake.
# XXX: Leap second?
SECONDS_PER_DAY = 86400
# Used for COPY_PROPERTIES
REFERENCE = 0
COPY = 1
DEEPCOPY = 2
def figure(*args, **kwargs):
"""
Returns a new SpectroFigure, a figure extended with features useful for
analysis of spectrograms.
Compare pyplot.figure.
"""
kw = {
'FigureClass': SpectroFigure,
}
kw.update(kwargs)
return plt.figure(*args, **kw)
def _min_delt(arr):
deltas = (arr[:-1] - arr[1:])
# Multiple values at the same frequency are just thrown away
# in the process of linearizaion
return deltas[deltas != 0].min()
def _list_formatter(lst, fun=None):
"""
Returns a function that takes x, pos and returns fun(lst[x]) if fun is not
None, else lst[x] or "" if x is out of range.
"""
def _fun(x, pos):
x = int(x)
if x >= len(lst) or x < 0:
return ""
elem = lst[x]
if fun is None:
return elem
return fun(elem)
return _fun
def _union(sets):
"""
Returns a union of sets.
"""
union = set()
for s in sets:
union |= s
return union
class _LinearView(object):
"""
Helper class for frequency channel linearization.
Attributes
----------
arr : Spectrogram
Spectrogram to linearize.
delt : float
Delta between frequency channels in linearized spectrogram. Defaults to
(minimum delta / 2.) because of the Shannon sampling theorem.
"""
def __init__(self, arr, delt=None):
self.arr = arr
if delt is None:
# Nyquist–Shannon sampling theorem
delt = _min_delt(arr.freq_axis) / 2.
self.delt = delt
midpoints = (self.arr.freq_axis[:-1] + self.arr.freq_axis[1:]) / 2
self.midpoints = np.concatenate([midpoints, arr.freq_axis[-1:]])
self.max_mp_delt = np.min(self.midpoints[1:] - self.midpoints[:-1])
self.freq_axis = np.arange(
self.arr.freq_axis[0], self.arr.freq_axis[-1], -self.delt
)
self.time_axis = self.arr.time_axis
self.shape = (len(self), arr.data.shape[1])
def __len__(self):
return int(1 + (self.arr.freq_axis[0] - self.arr.freq_axis[-1]) /
self.delt)
def _find(self, arr, item):
if item < 0:
item = item % len(self)
if item >= len(self):
raise IndexError
freq_offset = item * self.delt
freq = self.arr.freq_axis[0] - freq_offset
# The idea is that when we take the biggest delta in the mid points,
# we do not have to search anything that is between the beginning and
# the first item that can possibly be that frequency.
min_mid = int(max(0, (freq - self.midpoints[0]) // self.max_mp_delt))
for n, mid in enumerate(self.midpoints[min_mid:]):
if mid <= freq:
return arr[min_mid + n]
return arr[min_mid + n]
def __getitem__(self, item):
return self._find(self.arr, item)
def get_freq(self, item):
return self._find(self.arr.freq_axis, item)
def make_mask(self, max_dist):
mask = np.zeros(self.shape, dtype=np.bool)
for n, item in enumerate(range(len(self))):
freq = self.arr.freq_axis[0] - item * self.delt
if abs(self.get_freq(item) - freq) > max_dist:
mask[n, :] = True
return mask
class SpectroFigure(Figure):
def _init(self, data, freqs):
self.data = data
self.freqs = freqs
def ginput_to_time(self, inp):
return [
self.data.start + datetime.timedelta(seconds=secs)
for secs in self.ginput_to_time_secs(inp)
]
def ginput_to_time_secs(self, inp):
return np.array([float(self.data.time_axis[x]) for x, y in inp])
def ginput_to_time_offset(self, inp):
v = self.ginput_to_time_secs(inp)
return v - v.min()
def ginput_to_freq(self, inp):
return np.array([self.freqs[y] for x, y in inp])
def time_freq(self, points=0):
inp = self.ginput(points)
min_ = self.ginput_to_time_secs(inp).min()
start = self.data.start + datetime.timedelta(seconds=min_)
return TimeFreq(
start, self.ginput_to_time_offset(inp), self.ginput_to_freq(inp)
)
class TimeFreq(object):
"""
Class to use for plotting frequency vs time.
Attributes
----------
start : `datetime.datetime`
Start time of the plot.
time : `~numpy.ndarray`
Time of the data points as offset from start in seconds.
freq : `~numpy.ndarray`
Frequency of the data points in MHz.
"""
def __init__(self, start, time, freq):
self.start = start
self.time = time
self.freq = freq
def plot(self, time_fmt="%H:%M:%S", **kwargs):
"""
Plot the spectrum.
Parameters
----------
time_fmt : str
The time format in a `~datetime.datetime` compatible format
**kwargs : dict
Any additional plot arguments that should be used
when plotting.
Returns
-------
fig : `~matplotlib.Figure`
A plot figure.
"""
figure = plt.gcf()
axes = figure.add_subplot(111)
axes.plot(self.time, self.freq, **kwargs)
xa = axes.get_xaxis()
xa.set_major_formatter(
FuncFormatter(
lambda x, pos: (
self.start + datetime.timedelta(seconds=x)
).strftime(time_fmt)
)
)
axes.set_xlabel("Time [UT]")
axes.set_ylabel("Frequency [MHz]")
xa = axes.get_xaxis()
for tl in xa.get_ticklabels():
tl.set_fontsize(10)
tl.set_rotation(30)
figure.add_axes(axes)
figure.subplots_adjust(bottom=0.2)
figure.subplots_adjust(left=0.2)
return figure
def peek(self, *args, **kwargs):
"""
Plot spectrum onto current axes.
:param \*args: dict.
:param \**kwargs: dict, Any additional plot arguments that should be used when plotting.
:returns: A plot figure.
:rtype: matplotlib.Figure
"""
plt.figure()
ret = self.plot(*args, **kwargs)
plt.show()
return ret
[docs]class Spectrogram(Parent):
"""
Spectrogram Class.
.. warning:: This module is under development! Use at your own risk.
data : `~numpy.ndarray`
two-dimensional array of the image data of the spectrogram.
time_axis : `~numpy.ndarray`
one-dimensional array containing the offset from the start
for each column of data.
freq_axis : `~numpy.ndarray`
one-dimensional array containing information about the
frequencies each row of the image corresponds to.
start : `~datetime.datetime`
starting time of the measurement
end : `~datetime.datetime`
end time of the measurement
t_init : int
offset from the start of the day the measurement began. If None
gets automatically set from start.
t_label : str
label for the time axis
f_label : str
label for the frequency axis
content : str
header for the image
instruments : str array
instruments that recorded the data, may be more than one if
it was constructed using combine_frequencies or join_many.
"""
# Contrary to what pylint may think, this is not an old-style class.
# pylint: disable=E1002,W0142,R0902
# This needs to list all attributes that need to be
# copied to maintain the object and how to handle them.
COPY_PROPERTIES = [
('time_axis', COPY),
('freq_axis', COPY),
('instruments', COPY),
('start', REFERENCE),
('end', REFERENCE),
('t_label', REFERENCE),
('f_label', REFERENCE),
('content', REFERENCE),
('t_init', REFERENCE),
]
_create = ConditionalDispatch.from_existing(Parent._create)
@property
def shape(self):
return self.data.shape
@property
def dtype(self):
return self.data.dtype
def _get_params(self):
"""
Implementation detail.
"""
return {
name: getattr(self, name) for name, _ in self.COPY_PROPERTIES
}
[docs] def interpolate2d(spec, overwrite=True):
"""
Interpolate the input data and it returns a new Spectrogram.
:param spec: The original Spectrogram.
:param bool overwrite: if function interpolated_spec has been called directly, there will be a possibility to overwrite it's current spectrogram data.
:returns: New Spectrogram with the interpolated data.
"""
# the deepcopy of the original spec
spec_copy = deepcopy(spec)
# subtraction the background with "constbacksub", "elimwrongchannels"
spec_sub = spec_copy.subtract_bg("constbacksub", "elimwrongchannels")
# Interpolation
time_x = spec_sub.time_axis
freq_y = spec_sub.freq_axis
data_z = spec_sub.data
inter_f = interpolate.interp2d(time_x, freq_y, data_z)
# the Frequency before the Interpolation
ynew = spec.freq_axis
znew = inter_f(time_x, ynew)
# If overwrite= True => it will overwrite the new values into the Spec_sub.
if overwrite:
spec_sub.time_axis = time_x
spec_sub.freq_axis = ynew
spec_sub.data = znew[::-1]
# Update the FITS Header history => .header.set('Card_name', 'The content')
spec_sub.header.set(
'HISTORY', ': The Interpolate data after using the constbacksub, elimwrongchannels.')
return spec_sub
def _slice(self, y_range, x_range):
"""
Return new spectrogram reduced to the values passed as slices.
Implementation detail.
"""
data = self.data[y_range, x_range]
params = self._get_params()
soffset = 0 if x_range.start is None else x_range.start
soffset = int(soffset)
eoffset = self.shape[1] if x_range.stop is None else x_range.stop # pylint: disable=E1101
eoffset -= 1
eoffset = int(eoffset)
params.update({
'time_axis': self.time_axis[
x_range.start:x_range.stop:x_range.step
] - self.time_axis[soffset],
'freq_axis': self.freq_axis[
y_range.start:y_range.stop:y_range.step],
'start': self.start + datetime.timedelta(
seconds=self.time_axis[soffset]),
'end': self.start + datetime.timedelta(
seconds=self.time_axis[eoffset]),
't_init': self.t_init + self.time_axis[soffset],
})
return self.__class__(data, **params)
def _with_data(self, data):
new = copy(self)
new.data = data
return new
def __init__(self, data, time_axis, freq_axis, start, end, t_init=None,
t_label="Time", f_label="Frequency", content="",
instruments=None):
# Because of how object creation works, there is no avoiding
# unused arguments in this case.
self.data = data
if t_init is None:
diff = start - get_day(start)
t_init = diff.seconds
if instruments is None:
instruments = set()
self.start = start
self.end = end
self.t_label = t_label
self.f_label = f_label
self.t_init = t_init
self.time_axis = time_axis
self.freq_axis = freq_axis
self.rfi_freq_axis = np.array([])
self.content = content
self.instruments = instruments
[docs] def peek(self, *args, **kwargs):
"""
Plot spectrum onto current axes.
:param \*args: dict.
:param \**kwargs: dict, any additional plot arguments that should be used when plotting.
:returns: A plot figure.
:rtype: matplotlib.Figure
"""
figure()
ret = self.plot(*args, **kwargs)
plt.show()
return ret
[docs] def plot(self, figure=None, overlays=[], colorbar=True, vmin=None,
vmax=None, linear=True, showz=True, yres=DEFAULT_YRES,
max_dist=None, **matplotlib_args):
"""
Plot spectrogram onto figure.
:param matplotlib.Figure figure: Figure to plot the spectrogram on. If None, new Figure is created.
:param list overlays: List of overlays (functions that receive figure and axes and return new ones) to be applied after drawing.
:param bool colorbar: Flag that determines whether or not to draw a colorbar. If existing figure is passed, it is attempted to overdraw old colorbar.
:param float vmin: Clip intensities lower than vmin before drawing.
:param float vmax: Clip intensities higher than vmax before drawing.
:param bool linear: If set to True, "stretch" image to make frequency axis linear.
:param bool showz: If set to True, the value of the pixel that is hovered with the mouse is shown in the bottom right corner.
:param int or None yres: To be used in combination with linear=True. If None, sample the image with half the minimum frequency delta. Else, sample theimage to be at most yres pixels in vertical dimension. Defaultsto 1080 because that's a common screen size.
:param float or None max_dist: If not None, mask elements that are further than max_dist away from actual data points (ie, frequencies that actually have data from the receiver and are not just nearest-neighbour interpolated).
"""
# [] as default argument is okay here because it is only read.
# pylint: disable=W0102,R0914
if linear:
delt = yres
if delt is not None:
delt = max(
(self.freq_axis[0] - self.freq_axis[-1]) / (yres - 1),
_min_delt(self.freq_axis) / 2.
)
delt = float(delt)
data = _LinearView(self.clip_values(vmin, vmax), delt)
freqs = np.arange(
self.freq_axis[0], self.freq_axis[-1], -data.delt
)
else:
data = np.array(self.clip_values(vmin, vmax))
freqs = self.freq_axis
figure = plt.gcf()
if figure.axes:
axes = figure.axes[0]
else:
axes = figure.add_subplot(111)
params = {
'origin': 'lower',
'aspect': 'auto',
}
params.update(matplotlib_args)
if linear and max_dist is not None:
toplot = ma.masked_array(data, mask=data.make_mask(max_dist))
else:
toplot = data
im = axes.imshow(toplot, **params)
xa = axes.get_xaxis()
ya = axes.get_yaxis()
xa.set_major_formatter(
FuncFormatter(self.time_formatter)
)
if linear:
# Start with a number that is divisible by 5.
init = (self.freq_axis[0] % 5) / data.delt
nticks = 15.
# Calculate MHz difference between major ticks.
dist = (self.freq_axis[0] - self.freq_axis[-1]) / nticks
# Round to next multiple of 10, at least ten.
dist = max(round(dist, -1), 10)
# One pixel in image space is data.delt MHz, thus we can convert
# our distance between the major ticks into image space by dividing
# it by data.delt.
ya.set_major_locator(
IndexLocator(
dist / data.delt, init
)
)
ya.set_minor_locator(
IndexLocator(
dist / data.delt / 10, init
)
)
def freq_fmt(x, pos):
# This is necessary because matplotlib somehow tries to get
# the mid-point of the row, which we do not need here.
x = x + 0.5
return self.format_freq(self.freq_axis[0] - x * data.delt)
else:
freq_fmt = _list_formatter(freqs, self.format_freq)
ya.set_major_locator(MaxNLocator(integer=True, steps=[1, 5, 10]))
ya.set_major_formatter(
FuncFormatter(freq_fmt)
)
axes.set_xlabel(self.t_label)
axes.set_ylabel(self.f_label)
# figure.suptitle(self.content)
figure.suptitle(
' '.join([
get_day(self.start).strftime("%d %b %Y"),
'Radio flux density',
'(' + ', '.join(self.instruments) + ')',
])
)
for tl in xa.get_ticklabels():
tl.set_fontsize(10)
tl.set_rotation(30)
figure.add_axes(axes)
figure.subplots_adjust(bottom=0.2)
figure.subplots_adjust(left=0.2)
if showz:
axes.format_coord = self._mk_format_coord(
data, figure.gca().format_coord)
if colorbar:
if len(figure.axes) > 1:
Colorbar(figure.axes[1], im).set_label("Intensity")
else:
figure.colorbar(im).set_label("Intensity")
for overlay in overlays:
figure, axes = overlay(figure, axes)
for ax in figure.axes:
ax.autoscale()
if isinstance(figure, SpectroFigure):
figure._init(self, freqs)
return axes
def __getitem__(self, key):
only_y = not isinstance(key, tuple)
if only_y:
return self.data[int(key)]
elif isinstance(key[0], slice) and isinstance(key[1], slice):
return self._slice(key[0], key[1])
elif isinstance(key[1], slice):
# return Spectrum( # XXX: Right class
# super(Spectrogram, self).__getitem__(key),
# self.time_axis[key[1].start:key[1].stop:key[1].step]
# )
return np.array(self.data[key])
elif isinstance(key[0], slice):
return Spectrum(
self.data[key],
self.freq_axis[key[0].start:key[0].stop:key[0].step]
)
return self.data[int(key)]
[docs] def clip_freq(self, vmin=None, vmax=None):
"""
Return a new spectrogram only consisting of frequencies in the interval
[vmin, vmax].
:param float vmin: All frequencies in the result are greater or equal to this.
:param float vmax: All frequencies in the result are smaller or equal to this.
"""
left = 0
if vmax is not None:
while self.freq_axis[left] > vmax:
left += 1
right = len(self.freq_axis) - 1
if vmin is not None:
while self.freq_axis[right] < vmin:
right -= 1
return self[left:right + 1, :]
[docs] def auto_find_background(self, amount=0.05):
"""
Automatically find the background. This is done by first subtracting
the average value in each channel and then finding those times which
have the lowest standard deviation.
:param float amount: The percent amount (out of 1) of lowest standard deviation to consider.
"""
# pylint: disable=E1101,E1103
data = self.data.astype(to_signed(self.dtype))
# Subtract average value from every frequency channel.
tmp = (data - np.average(self.data, 1).reshape(self.shape[0], 1))
# Get standard deviation at every point of time.
# Need to convert because otherwise this class's __getitem__
# is used which assumes two-dimensionality.
sdevs = np.asarray(np.std(tmp, 0))
# Get indices of values with lowest standard deviation.
cand = sorted(list(range(self.shape[1])), key=lambda y: sdevs[y])
# Only consider the best 5 %.
return cand[:max(1, int(amount * len(cand)))]
[docs] def auto_const_bg(self):
"""
Automatically determine background.
"""
realcand = self.auto_find_background()
bg = np.average(self.data[:, realcand], 1)
return bg.reshape(self.shape[0], 1)
[docs] def subtract_bg(self, *args, **kwargs):
"""
Perform background subtraction, with the opportunity to choose between different procedures
and the ability to remove the radio frequency interference (RFI).
:param \*args: List of desired methods that should be called.
:param \**kwargs: List of arguments that should be passed to a subfunction.
"""
spec = copy(self)
for arg in args:
if arg == "default":
# default background subtraction of radiospectra
spec = spec._with_data(spec.data - spec.auto_const_bg())
elif arg == "constbacksub":
spec = spec.constbacksub(overwrite=False)
elif arg == "subtract_bg_sliding_window":
_sbg, _bg, _min_sdevs, _cps = spec.subtract_bg_sliding_window(
**kwargs)
spec = _sbg
elif arg == "glid_back_sub":
spec = spec.glid_back_sub(overwrite=False)
elif arg == "elimwrongchannels":
spec = spec.elimwrongchannels(overwrite=False)
if len(args) == 0:
# default background subtraction of radiospectra
spec = spec._with_data(spec.data - spec.auto_const_bg())
return spec
[docs] def subtract_bg_sliding_window(self, amount: float = 0.05, window_width: int = 0, affected_width: int = 0,
change_points: Union[bool, List[int]] = False):
"""
Performs background subtraction with a sliding window. Change points where significant jumps in
value are observed or expected can be specified or automatically estimated. If change points are present
each one will split the spectrogram and each resulting part will have its background removed independently.
:param float amount: The percent amount (out of 1) of lowest standard deviation to consider.
:param int window_width: The percent amount (out of 1) of lowest standard deviation to consider.
:param int affected_width: The width of the section where the background calculated by the window_width gets subtracted. It is centered in the sliding window and is also the step size.
:param list of int or bool change_points: If a list of ints is provided it will use these values as change points. If a bool isprovided it will estimate the change points if true or won't include any change points if false.
"""
_data = self.data.copy()
_og_image_height = _data.shape[0]
_og_image_width = _data.shape[1]
_bg = np.zeros([_og_image_height, _og_image_width])
_min_sdevs = np.zeros([_og_image_height, _og_image_width])
_out = _data.copy()
if isinstance(change_points, bool):
if change_points:
_cps = self.estimate_change_points()
else:
_cps = []
else:
_cps = change_points
if len(_cps) == 0:
_images = [(0, _og_image_width)]
else:
_images = []
_temp = 0
_cps = sorted(_cps)
for _cp in _cps:
_images.append((_temp, _cp))
_temp = _cp
_images.append((_temp, _og_image_width))
for (_img_start, _img_end) in _images:
_cwp = _img_start
_img_width = _img_end - _img_start
_img_data = _data[:, _img_start:_img_end]
_window_height = _og_image_height
_window_width = _img_width if (
window_width == 0 or window_width > _img_width) else window_width
_affected_height = _og_image_height
_affected_width = _img_width if (affected_width == 0 or affected_width > _img_width) else (
affected_width if affected_width <= _window_width else _window_width)
_data_minus_avg = (
_img_data - np.average(_img_data, 1).reshape(_img_data.shape[0], 1))
_sdevs = [(index, std)
for (index, std) in enumerate(np.std(_data_minus_avg, 0))]
_half = max((_window_width - _affected_width) // 2, 0)
_division_fix = _half + \
_half != max(_img_width - _affected_width, 0)
_max_amount = max(1, int(amount * _img_width))
# calc initial set of used columns
_window_sdevs = [sdev for sdev in _sdevs[:_half]]
_sorted_sdevs = sorted(_window_sdevs, key=lambda y: y[1])
_bg_used_sdevs = SortedList(_sorted_sdevs, key=lambda y: y[1])
while _cwp <= _img_end:
_affected_left = _cwp
_affected_right = min(
_affected_left + _affected_width, _img_end)
_window_left = max(
_affected_left - _half if _division_fix else _affected_left - _half, _img_start)
_window_right = min(_affected_right + _half, _img_end)
for sdev in _sdevs[max(_window_left - _affected_width, 0) - _img_start:_window_left - _img_start]:
_bg_used_sdevs.discard(sdev)
if _window_right <= _img_end:
_bg_used_sdevs.update(
_sdevs[_window_right - _affected_width - _img_start:_window_right - _img_start])
# calc current background
_current_background = np.average(
_img_data[:, [sdev[0] for sdev in _bg_used_sdevs[:_max_amount]]], 1)
for sdev in _bg_used_sdevs[:_max_amount]:
_min_sdevs[:, sdev[0] + _img_start] += 1
_bg[:, _affected_left:_affected_right] = np.repeat(_current_background.reshape(_bg.shape[0], 1),
(_affected_right - _affected_left), axis=1)
_cwp += _affected_width
_sbg = np.ma.subtract(_out, _bg)
return self._with_data(_sbg), self._with_data(_bg), self._with_data(_min_sdevs), _cps
[docs] def estimate_change_points(self, window_width=100, max_length_single_segment=20000, segment_width=10000):
"""
Estimates the change points of the spectrogram and returns the indices.
If the spectrogram is too big it will get segmented.
These segments will overlap by 2*window_width to not miss change points where the spectrogram is segmented.
:param int window_width: width of the sliding window.
:param int max_length_single_segment: max width of data array. If bigger it will get segmented because of memory reasons.
:param int segment_width: width of the segments if the CallistoSpectrogram gets segmented.
"""
avgs = np.average(self.data, axis=0)
penalty = np.log(len(avgs)) * 8 * np.std(avgs) ** 2
changepoints = set()
if len(avgs) > max_length_single_segment:
num = len(avgs) // (segment_width - window_width * 2)
segments = [(x, x + segment_width)
for x in np.multiply(range(0, num), (segment_width - window_width * 2))]
segments.append((len(avgs) - segment_width, len(avgs)))
else:
segments = [(0, len(avgs))]
if (len(segments)) > 3:
m = 'mahalanobis'
else:
m = 'rbf'
for start, end in segments:
mod = rpt.Window(model=m, width=window_width).fit(avgs[start:end])
res = np.array(mod.predict(pen=penalty)[:-1]) + start
changepoints.update(res)
return sorted(changepoints)
[docs] def constbacksub(self, overwrite=True):
"""
Background subtraction method where the average and the standard deviation of each row
will be calculated and subtracted from the image.
:param bool overwrite: If function constbacksub has been called directly, there will be a possibility to overwrite it's current spectrogram data.
"""
im = self.data.copy()
nx = im.shape[0]
ny = im.shape[1]
average_arr = np.average(im, axis=1)
for i in range(nx):
im[i, :] = im[i, :] - average_arr[i]
sdev_arr = np.zeros(ny)
for i in range(ny):
sdev_arr[i] = np.std(im[:, i])
zist = np.argsort(sdev_arr)
nPart = int(ny * 0.05)
zist = zist[0: nPart + 1]
bkgArr = im[:, zist]
background = np.average(bkgArr, axis=1)
for j in range(ny):
im[:, j] = im[:, j] - background
if overwrite:
self.data = im
return self._with_data(im)
[docs] def elimwrongchannels(self, overwrite=True):
"""
Removing the RFI (radio frequency interference) from the spectrogram.
:param bool overwrite: If function elimwrongchannels has been called directly, there will be a possibility to overwrite it's current spectrogram data.
"""
im = self.data.copy()
freq_axis = self.freq_axis.copy()
rfi_freq_axis = self.rfi_freq_axis.copy()
ny = len(im.data[:, 0])
# optimize std part
std = np.ndarray((ny,), dtype=np.double)
for i in range(ny):
# Calculate std for each row and ignore nan's
std[i] = np.nanstd(im[i, :].data)
# Byte scale
std = ((std - np.nanmin(std)) * 255) / \
(np.nanmax(std) - np.nanmin(std))
std[std > 255] = 255
np.nan_to_num(std, copy=False)
std = std.astype(int)
mean_sigma = np.average(std)
positions = std < 5 * mean_sigma
zist = std[positions]
eliminated_channels = len(std) - len(zist)
print(str(eliminated_channels) + " channels eliminated")
rfi_freq_axis = self.update_rfi_header(
freq_axis, rfi_freq_axis, positions)
if np.sum(zist) != -1 and eliminated_channels != 0:
im = im[positions, :]
freq_axis = freq_axis[positions]
print("Eliminating sharp jumps between channels ...")
y_profile = np.average(
(filters.roberts(im.astype(float)).astype(float)), 1)
masked_data = np.ma.masked_array(y_profile, np.isnan(y_profile))
y_profile = masked_data - np.ma.average(masked_data)
mean_sigma = np.std(y_profile)
positions = np.abs(y_profile) < 2 * mean_sigma
zist = y_profile[positions]
eliminated_channels = len(y_profile) - len(zist)
print(str(eliminated_channels) + " channels eliminated")
rfi_freq_axis = self.update_rfi_header(
freq_axis, rfi_freq_axis, positions)
if np.sum(zist) != -1 and eliminated_channels != 0:
im = im[positions, :]
freq_axis = freq_axis[positions]
if np.sum(zist) == -1:
print("Sorry, all channels are bad ...")
im = 0
if overwrite:
self.data = im
self.freq_axis = freq_axis
self.rfi_freq_axis = rfi_freq_axis
spec = self._with_data(im)
spec.freq_axis = freq_axis
spec.rfi_freq_axis = rfi_freq_axis
return spec
[docs] def glid_back_sub(self, window_width=0, weighted=False, overwrite=True):
"""
A gliding background subtraction method, where the sum weighted from the
coefficients of the evenly spaces values of each row will be subtracted from the spectrogram.
:param int window_width: The width of the sliding window that is used to calculate the background.
:param bool weighted: If true, the coefficients of each rows will be considered dependent on the image width.
:param bool overwrite: If function glid_back_sub has been called directly, there will be a possibility to overwrite it's current spectrogram data.
"""
image = self.data.copy()
nx = len(image[0, :])
ny = len(image[:, 0])
if window_width == 0:
w_len_half = int(image.shape[1] / 2)
else:
w_len_half = int(window_width / 2)
w_len = w_len_half * 2
backgr = np.zeros((ny, nx), dtype=np.float)
# i is of type float!
i = 0
if weighted:
coeffs = w_len_half - np.arange(0, w_len_half, dtype=np.float)
sum = np.sum(coeffs)
while i < w_len_half:
img_data = image[:, 0:w_len_half + i].flatten()
min_array_size = min(coeffs.size, img_data.size)
coeffs_sum = np.multiply(
img_data[0:min_array_size], coeffs[0:min_array_size])
backgr[:, i] = np.sum(coeffs_sum) / sum
img_data = image[:, nx - i - 1:nx].flatten()
flipped_coeffs = coeffs[::-1]
min_array_size = min(flipped_coeffs.size, img_data.size)
coeffs_sum = np.multiply(
img_data[0:min_array_size], flipped_coeffs[0:min_array_size])
backgr[:, nx - i - 1] = np.sum(coeffs_sum) / sum
i += 1
coeffs = np.insert(coeffs, 0, w_len_half - i)
sum = sum + coeffs[i]
while i < nx - w_len_half:
img_data = image[:, i - w_len_half:i + w_len_half].flatten()
min_array_size = min(coeffs.size, img_data.size)
coeffs_sum = np.multiply(
img_data[0:min_array_size], coeffs[0:min_array_size])
backgr[:, i] = np.sum(coeffs_sum) / sum
i += 1
else:
while i < w_len_half + 1:
backgr[:, i] = np.average(
image[:, 0:min(i + w_len_half, nx)] + 1, 1)
i += 1
i = w_len_half + 1
while i < nx - w_len_half:
backgr[:, i] = backgr[:, i - 1] + \
(image[:, i + w_len_half] -
image[:, i - 1 - w_len_half]) / w_len
i += 1
while i <= nx - 1:
backgr[:, i] = np.average(image[:, i:nx], 1)
i += 1
if overwrite:
self.data = image
return self._with_data(image - backgr)
[docs] def randomized_auto_const_bg(self, amount):
"""
Automatically determine background. Only consider a randomly chosen
subset of the image.
:param int amount: Size of random sample that is considered for calculation of the background.
"""
cols = [randint(0, self.shape[1] - 1) for _ in range(amount)]
# pylint: disable=E1101,E1103
data = self.data.astype(to_signed(self.dtype))
# Subtract average value from every frequency channel.
tmp = (data - np.average(self.data, 1).reshape(self.shape[0], 1))
# Get standard deviation at every point of time.
# Need to convert because otherwise this class's __getitem__
# is used which assumes two-dimensionality.
tmp = tmp[:, cols]
sdevs = np.asarray(np.std(tmp, 0))
# Get indices of values with lowest standard deviation.
cand = sorted(list(range(amount)), key=lambda y: sdevs[y])
# Only consider the best 5 %.
realcand = cand[:max(1, int(0.05 * len(cand)))]
# Average the best 5 %
bg = np.average(self[:, [cols[r] for r in realcand]], 1)
return bg.reshape(self.shape[0], 1)
[docs] def randomized_subtract_bg(self, amount):
"""
Perform randomized constant background subtraction. Does not produce
the same result every time it is run.
:param int amount: Size of random sample that is considered for calculation of the background.
"""
return self._with_data(self.data - self.randomized_auto_const_bg(amount))
[docs] def clip_values(self, vmin=None, vmax=None, out=None):
"""
Clip intensities to be in the interval [vmin, vmax].
Any values greater than the maximum will be assigned the maximum,
any values lower than the minimum will be assigned the minimum.
If either is left out or None, do not clip at that side of the interval.
:param int or float min: New minimum value for intensities.
:param int or float max: New maximum value for intensities.
"""
# pylint: disable=E1101
if vmin is None:
vmin = int(self.data.min())
if vmax is None:
vmax = int(self.data.max())
return self._with_data(self.data.clip(vmin, vmax, out))
[docs] def rescale(self, vmin=0, vmax=1, dtype=np.dtype('float32')):
"""
Rescale intensities to [vmin, vmax]. Note that vmin ≠ vmax and
spectrogram.min() ≠ spectrogram.max().
:param float or int vmin: New minimum value in the resulting spectrogram.
:param float or int vmax: New maximum value in the resulting spectrogram.
:param ``numpy.dtype`` dtype: Data-type of the resulting spectrogram.
"""
if vmax == vmin:
raise ValueError("Maximum and minimum must be different.")
if self.data.max() == self.data.min():
raise ValueError("Spectrogram needs to contain distinct values.")
data = self.data.astype(dtype) # pylint: disable=E1101
return self._with_data(
vmin + (vmax - vmin) * (data - self.data.min()) / # pylint: disable=E1101
(self.data.max() - self.data.min()) # pylint: disable=E1101
)
[docs] def interpolate(self, frequency):
"""
Linearly interpolate intensity at unknown frequency using linear
interpolation of its two neighbours.
:param float or int frequency: Unknown frequency for which to linearly interpolate the intensities. freq_axis[0] >= frequency >= self_freq_axis[-1]
"""
lfreq, lvalue = None, None
for freq, value in zip(self.freq_axis, self.data[:, :]):
if freq < frequency:
break
lfreq, lvalue = freq, value
else:
raise ValueError("Frequency not in interpolation range")
if lfreq is None:
raise ValueError("Frequency not in interpolation range")
diff = frequency - freq # pylint: disable=W0631
ldiff = lfreq - frequency
return (ldiff * value + diff * lvalue) / (diff + ldiff) # pylint: disable=W0631
[docs] def linearize_freqs(self, delta_freq=None):
"""
Rebin frequencies so that the frequency axis is linear.
:param float delta_freq: Difference between consecutive values on the new frequency axis. Defaults to half of smallest delta in current frequency axis. Compare Nyquist-Shannon sampling theorem.
"""
if delta_freq is None:
# Nyquist–Shannon sampling theorem
delta_freq = _min_delt(self.freq_axis) / 2.
nsize = int((self.freq_axis.max() - self.freq_axis.min()) /
delta_freq + 1)
new = np.zeros((int(nsize), self.shape[1]), dtype=self.data.dtype)
freqs = self.freq_axis - self.freq_axis.max()
freqs = freqs / delta_freq
midpoints = np.round((freqs[:-1] + freqs[1:]) / 2)
fillto = np.concatenate(
[midpoints - 1, np.round([freqs[-1]]) - 1]
)
fillfrom = np.concatenate(
[np.round([freqs[0]]), midpoints - 1]
)
fillto = np.abs(fillto)
fillfrom = np.abs(fillfrom)
for row, from_, to_ in zip(self, fillfrom, fillto):
new[int(from_): int(to_)] = row
vrs = self._get_params()
vrs.update({
'freq_axis': np.linspace(
self.freq_axis.max(), self.freq_axis.min(), nsize
)
})
return self.__class__(new, **vrs)
[docs] def freq_overlap(self, other):
"""
Get frequency range present in both spectrograms. Returns (min, max)
tuple.
:param Spectrogram other: Other spectrogram with which to look for frequency overlap.
"""
lower = max(self.freq_axis[-1], other.freq_axis[-1])
upper = min(self.freq_axis[0], other.freq_axis[0])
if lower > upper:
raise ValueError("No overlap.")
return lower, upper
[docs] def time_to_x(self, time):
"""
Return x-coordinate in spectrogram that corresponds to the passed
`~datetime.datetime` value.
:param time: ``sunpy.time.parse_time`` compatible str `datetime.datetime` to find the x coordinate for.
"""
diff = time - self.start
diff_s = SECONDS_PER_DAY * diff.days + diff.seconds
if self.time_axis[-1] < diff_s < 0:
raise ValueError("Out of bounds")
for n, elem in enumerate(self.time_axis):
if diff_s < elem:
return n - 1
# The last element is the searched one.
return n
[docs] def at_freq(self, freq):
return self[np.nonzero(self.freq_axis == freq)[0], :]
@staticmethod
def _mk_format_coord(spec, fmt_coord):
def format_coord(x, y):
shape = list(map(int, spec.shape))
xint, yint = int(x), int(y)
if 0 <= xint < shape[1] and 0 <= yint < shape[0]:
pixel = spec[yint][xint]
else:
pixel = ""
return '{!s} z={!s}'.format(fmt_coord(x, y), pixel)
return format_coord
[docs]class LinearTimeSpectrogram(Spectrogram):
"""
Spectrogram evenly sampled in time.
:param float t_delt: difference between the items on the time axis.
"""
# pylint: disable=E1002
COPY_PROPERTIES = Spectrogram.COPY_PROPERTIES + [
('t_delt', REFERENCE),
]
def __init__(self, data, time_axis, freq_axis, start, end,
t_init=None, t_delt=None, t_label="Time", f_label="Frequency",
content="", instruments=None):
if t_delt is None:
t_delt = _min_delt(freq_axis)
super(LinearTimeSpectrogram, self).__init__(
data, time_axis, freq_axis, start, end, t_init, t_label, f_label,
content, instruments
)
self.t_delt = t_delt
[docs] @staticmethod
def make_array(shape, dtype=np.dtype('float32')):
"""
Function to create an array with shape and dtype.
:param tuple shape: shape of the array to create.
:param ``numpy.dtype`` dtype: data-type of the array to create.
"""
return np.zeros(shape, dtype=dtype)
[docs] @staticmethod
def memmap(filename):
"""
Return function that takes shape and dtype and returns a memory mapped
array.
:param str filename: File to store the memory mapped array in.
"""
return (
lambda shape, dtype=np.dtype('float32'): np.memmap(
filename, mode="write", shape=shape, dtype=dtype
)
)
[docs] def resample_time(self, new_delt):
"""
Rescale image so that the difference in time between pixels is new_delt
seconds.
:param float new_delt: New delta between consecutive values.
"""
if self.t_delt == new_delt:
return self
factor = self.t_delt / float(new_delt)
# The last data-point does not change!
new_size = floor((self.shape[1] - 1) *
factor + 1) # pylint: disable=E1101
data = ndimage.zoom(
self.data, (1, new_size / self.shape[1])) # pylint: disable=E1101
params = self._get_params()
params.update({
'time_axis': np.linspace(
self.time_axis[0],
self.time_axis[int((new_size - 1) * new_delt / self.t_delt)],
new_size
),
't_delt': new_delt,
})
return self.__class__(data, **params)
JOIN_REPEAT = object()
[docs] @classmethod
def join_many(cls, specs, mk_arr=None, nonlinear=False,
maxgap=0, fill=JOIN_REPEAT):
"""
Produce new Spectrogram that contains spectrograms joined together in
time.
:param list specs: List of spectrograms to join together in time.
:param bool nonlinear: If True, leave out gaps between spectrograms. Else, fill them with the value specified in fill.
:param float, int or None maxgap: Largest gap to allow in second. If None, allow gap of arbitrary size.
:param float or int fill: Value to fill missing values (assuming nonlinear=False) with. Can be LinearTimeSpectrogram.JOIN_REPEAT to repeat the values for the time just before the gap.
:param function mk_array: Function that is called to create the resulting array. Can be set to LinearTimeSpectrogram.memap(filename) to create a memory mapped
result array.
"""
# XXX: Only load header and load contents of files
# on demand.
mask = None
if mk_arr is None:
mk_arr = cls.make_array
specs = sorted(specs, key=lambda x: x.start)
freqs = specs[0].freq_axis
if not all(np.array_equal(freqs, sp.freq_axis) for sp in specs):
raise ValueError("Frequency channels do not match.")
# Smallest time-delta becomes the common time-delta.
min_delt = min(sp.t_delt for sp in specs)
dtype_ = max(sp.dtype for sp in specs)
specs = [sp.resample_time(min_delt) for sp in specs]
size = sum(sp.shape[1] for sp in specs)
data = specs[0]
start_day = data.start
xs = []
last = data
for elem in specs[1:]:
e_init = (
SECONDS_PER_DAY * (
get_day(elem.start) - get_day(start_day)
).days + elem.t_init
)
x = int((e_init - last.t_init) / min_delt)
xs.append(x)
diff = last.shape[1] - x
if maxgap is not None and -diff > maxgap / min_delt:
raise ValueError("Too large gap.")
# If we leave out undefined values, we do not want to
# add values here if x > t_res.
if nonlinear:
size -= max(0, diff)
else:
size -= diff
last = elem
# The non existing element after the last one starts after
# the last one. Needed to keep implementation below sane.
xs.append(specs[-1].shape[1])
# We do that here so the user can pass a memory mapped
# array if they'd like to.
arr = mk_arr((data.shape[0], size), dtype_)
time_axis = np.zeros((size,))
sx = 0
# Amount of pixels left out due to non-linearity. Needs to be
# considered for correct time axes.
sd = 0
for x, elem in zip(xs, specs):
diff = x - elem.shape[1]
e_time_axis = elem.time_axis
elem = elem.data
if x > elem.shape[1]:
if nonlinear:
x = elem.shape[1]
else:
# If we want to stay linear, fill up the missing
# pixels with placeholder zeros.
filler = np.zeros((data.shape[0], diff))
if fill is cls.JOIN_REPEAT:
filler[:, :] = elem[:, -1, np.newaxis]
else:
filler[:] = fill
minimum = e_time_axis[-1]
e_time_axis = np.concatenate([
e_time_axis,
np.linspace(
minimum + min_delt,
minimum + diff * min_delt,
diff
)
])
elem = np.concatenate([elem, filler], 1)
arr[:, sx:sx + x] = elem[:, :x]
if diff > 0:
if mask is None:
mask = np.zeros((data.shape[0], size), dtype=np.uint8)
mask[:, sx + x - diff:sx + x] = 1
time_axis[sx:sx + x] = e_time_axis[:x] + data.t_delt * (sx + sd)
if nonlinear:
sd += max(0, diff)
sx += x
params = {
'time_axis': time_axis,
'freq_axis': data.freq_axis,
'start': data.start,
'end': specs[-1].end,
't_delt': data.t_delt,
't_init': data.t_init,
't_label': data.t_label,
'f_label': data.f_label,
'content': data.content,
'instruments': _union(spec.instruments for spec in specs),
}
if mask is not None:
arr = ma.array(arr, mask=mask)
if nonlinear:
del params['t_delt']
return Spectrogram(arr, **params)
return common_base(specs)(arr, **params)
[docs] def time_to_x(self, time):
"""
Return x-coordinate in spectrogram that corresponds to the passed
datetime value.
:param time: ``sunpy.time.parse_time`` compatible str ``datetime.datetime`` to find the x coordinate for.
"""
# This is impossible for frequencies because that mapping
# is not injective.
if SUNPY_LT_1:
time = parse_time(time)
else:
time = parse_time(time).datetime
diff = time - self.start
diff_s = SECONDS_PER_DAY * diff.days + diff.seconds
result = diff_s // self.t_delt
if 0 <= result <= self.shape[1]: # pylint: disable=E1101
return result
raise ValueError("Out of range.")
[docs] @staticmethod
def intersect_time(specs):
"""
Return slice of spectrograms that is present in all of the ones passed.
:param list specs: List of spectrograms of which to find the time intersections.
"""
delt = min(sp.t_delt for sp in specs)
start = max(sp.t_init for sp in specs)
# XXX: Could do without resampling by using
# sp.t_init below, not sure if good idea.
specs = [sp.resample_time(delt) for sp in specs]
cut = [sp[:, int((start - sp.t_init) / delt):] for sp in specs]
length = min(sp.shape[1] for sp in cut)
return [sp[:, :length] for sp in cut]
[docs] @classmethod
def combine_frequencies(cls, specs):
"""
Return new spectrogram that contains frequencies from all the
spectrograms in spec. Only returns time intersection of all of them.
:param list spec: List of spectrograms of which to combine the frequencies into one.
"""
if not specs:
raise ValueError("Need at least one spectrogram.")
specs = cls.intersect_time(specs)
one = specs[0]
dtype_ = max(sp.dtype for sp in specs)
fsize = sum(sp.shape[0] for sp in specs)
new = np.zeros((fsize, one.shape[1]), dtype=dtype_)
freq_axis = np.zeros((fsize,))
for n, (data, row) in enumerate(merge(
[
[(sp, n) for n in range(sp.shape[0])] for sp in specs
],
key=lambda x: x[0].freq_axis[x[1]]
)):
new[n, :] = data[row, :]
freq_axis[n] = data.freq_axis[row]
params = {
'time_axis': one.time_axis, # Should be equal
'freq_axis': freq_axis,
'start': one.start,
'end': one.end,
't_delt': one.t_delt,
't_init': one.t_init,
't_label': one.t_label,
'f_label': one.f_label,
'content': one.content,
'instruments': _union(spec.instruments for spec in specs)
}
return common_base(specs)(new, **params)
[docs] def check_linearity(self, err=None, err_factor=None):
"""
Check linearity of time axis. If err is given, tolerate absolute
derivation from average delta up to err. If err_factor is given,tolerate up to err_factor * average_delta. If both are given,
TypeError is raised. Default to err=0.
:param float err: Absolute difference each delta is allowed to diverge from the average. Cannot be used in combination with err_factor.
:param float err_factor: Relative difference each delta is allowed to diverge from the average, i.e. err_factor * average. Cannot be used in combination with err.
"""
deltas = self.time_axis[:-1] - self.time_axis[1:]
avg = np.average(deltas)
if err is None and err_factor is None:
err = 0
elif err is None:
err = abs(err_factor * avg)
elif err_factor is not None:
raise TypeError("Only supply err or err_factor, not both")
return (abs(deltas - avg) <= err).all()
[docs] def in_interval(self, start=None, end=None):
"""
Return part of spectrogram that lies in [start, end).
:param None or ``datetime.datetime`` or ``sunpy.time.parse_time`` start: compatible string or time string Start time of the part of the spectrogram that is returned. If the measurement only spans over one day, a colon separated string representing the time can be passed.
:param None or ``datetime.datetime`` or ``sunpy.time.parse_time`` end: compatible string or time string See start.
"""
if start is not None:
try:
if SUNPY_LT_1:
start = parse_time(start)
else:
start = parse_time(start).datetime
except ValueError:
# XXX: We could do better than that.
if get_day(self.start) != get_day(self.end):
raise TypeError(
"Time ambiguous because data spans over more than one day"
)
start = datetime.datetime(
self.start.year, self.start.month, self.start.day,
*list(map(int, start.split(":")))
)
start = self.time_to_x(start)
if end is not None:
try:
if SUNPY_LT_1:
end = parse_time(end)
else:
end = parse_time(end).datetime
except ValueError:
if get_day(self.start) != get_day(self.end):
raise TypeError(
"Time ambiguous because data spans over more than one day"
)
end = datetime.datetime(
self.start.year, self.start.month, self.start.day,
*list(map(int, end.split(":")))
)
end = self.time_to_x(end)
if start:
start = int(start)
if end:
end = int(end)
return self[:, start:end]