import urllib
import datetime
from collections import defaultdict
import time
import os
import ntpath
from distutils.version import LooseVersion
import numpy as np
from bs4 import BeautifulSoup
from scipy.ndimage import gaussian_filter1d
from astropy.io import fits
from astropy.nddata.ccddata import CCDData
from scipy.optimize import leastsq
from sunpy import __version__
from sunpy.time import parse_time
from sunpy.util.net import download_file
from radiospectra.spectrogram import REFERENCE, LinearTimeSpectrogram, _union
from radiospectra.util import ConditionalDispatch, minimal_pairs, run_cls
__all__ = ['CallistoSpectrogram']
SUNPY_LT_1 = LooseVersion(__version__) < LooseVersion('1.0')
TIME_STR = "%Y%m%d%H%M%S"
DEFAULT_URL = 'http://soleil.i4ds.ch/solarradio/data/2002-20yy_Callisto/'
_DAY = datetime.timedelta(days=1)
DATA_SIZE = datetime.timedelta(seconds=15 * 60)
def parse_filename(href):
name = href.split('.')[0]
try:
inst, date, time, no = name.rsplit('_')
dstart = datetime.datetime.strptime(date + time, TIME_STR)
except ValueError:
# If the split fails, the file name does not match out
# format,so we skip it and continue to the next
# iteration of the loop.
return None
return inst, no, dstart
PARSERS = [
# Everything starts with ""
("", parse_filename)
]
def query(start, end, instruments=None, url=DEFAULT_URL):
"""
Get URLs for callisto data from instruments between start and end.
:param start: sunpy.time.parse_time compatible.
:param end: sunpy.time.parse_time compatible.
:param sequence: Sequence of instruments whose data is requested.
:param str url: Base URL for the request.
"""
day = datetime.datetime(start.year, start.month, start.day)
while day <= end:
directory = url + day.strftime('%Y/%m/%d/')
opn = urllib.request.urlopen(directory)
try:
soup = BeautifulSoup(opn, 'lxml')
for link in soup.find_all("a"):
href = link.get("href")
for prefix, parser in PARSERS:
if href.startswith(prefix):
break
result = parser(href)
if result is None:
continue
inst, no, dstart = result
if (instruments is not None and
inst not in instruments and
(inst, int(no)) not in instruments):
continue
dend = dstart + DATA_SIZE
if dend > start and dstart < end:
yield directory + href
finally:
opn.close()
day += _DAY
def download(urls, directory):
"""
Download files from urls into directory.
:param list of str urls: urls of the files to retrieve.
:param str: directory to save them in.
"""
return [download_file(url, directory) for url in urls]
def _parse_header_time(date, time):
"""
Returns `~datetime.datetime` object from date and time fields of header.
"""
if time is not None:
date = date + 'T' + time
if SUNPY_LT_1:
time = parse_time(date)
else:
time = parse_time(date).datetime
return time
[docs]class CallistoSpectrogram(LinearTimeSpectrogram):
"""
Class used for dynamic spectra coming from the Callisto network.
Attributes
----------
:param header: `~astropy.io.fits.Header`, main header of the FITS file.
:param axes_header: `~astropy.io.fits.Header`, header for the axes table.
:param bool swapped: flag that specifies whether originally in the file the x-axis was frequency.
"""
# XXX: Determine those from the data.
SIGMA_SUM = 75
SIGMA_DELTA_SUM = 20
_create = ConditionalDispatch.from_existing(LinearTimeSpectrogram._create)
create = classmethod(_create.wrapper())
# Contrary to what pylint may think, this is not an old-style class.
# pylint: disable=E1002,W0142,R0902
# This needs to list all attributes that need to be
# copied to maintain the object and how to handle them.
COPY_PROPERTIES = LinearTimeSpectrogram.COPY_PROPERTIES + [
('header', REFERENCE),
('swapped', REFERENCE),
('axes_header', REFERENCE)
]
# List of instruments retrieved in July 2012 from
# http://soleil.i4ds.ch/solarradio/data/2002-20yy_Callisto/
INSTRUMENTS = {
'ALASKA', 'ALMATY', 'BIR', 'DARO', 'HB9SCT', 'HUMAIN',
'HURBANOVO', 'KASI', 'KENYA', 'KRIM', 'MALAYSIA', 'MRT1',
'MRT2', 'OOTY', 'OSRA', 'SWMC', 'TRIEST', 'UNAM'
}
[docs] def save(self, filepath=None):
"""
Save modified spectrogram back to filepath.
:param str filepath: path to save the spectrogram to.
"""
if filepath is None:
filepath = self.filename
main_header = self.get_header()
data = CCDData(data=self.data, header=main_header, unit='Sun')
# XXX: Update axes header.
data.header.append(card=('BZERO', 0, 'scaling offset'))
data.header.append(card=('BSCALE', 1, 'scaling factor'))
data.header['NAXIS1'] = (
data.header['NAXIS1'], 'length of data axis 1')
data.header['NAXIS2'] = (
data.header['NAXIS2'], 'length of data axis 2')
freq_col = fits.Column(
name="FREQUENCY",
format=f"{len(self.freq_axis)}D8.3",
array=np.reshape(np.array(self.freq_axis),
(1, len(self.freq_axis)))
)
time_col = fits.Column(
name="TIME",
format=f"{len(self.time_axis)}D8.3",
array=np.reshape(np.array(self.time_axis),
(1, len(self.time_axis)))
)
cols = fits.ColDefs([time_col, freq_col])
table = fits.BinTableHDU.from_columns(
cols, header=self.axes_header, name='AXES')
table.header['TTYPE1'] = (
table.header['TTYPE1'], 'label for field 1')
table.header['TFORM1'] = (
table.header['TFORM1'], 'data format of field: 8-byte DOUBLE')
table.header['TTYPE2'] = (
table.header['TTYPE2'], 'label for field 2')
table.header['TFORM2'] = (
table.header['TFORM2'], 'data format of field: 8-byte DOUBLE')
table.header['TSCAL1'] = 1
table.header['TZERO1'] = 0
table.header['TSCAL2'] = 1
table.header['TZERO2'] = 0
hdulist = data.to_hdu()
hdulist.insert(1, table)
if self.rfi_freq_axis.size != 0:
# Putting information, which frequencies channels were removed into the header
freq_col = fits.Column(
name="RFI_FREQUENCY",
format=f"{len(self.rfi_freq_axis)}D8.3",
array=np.reshape(np.array(self.rfi_freq_axis),
(1, len(self.rfi_freq_axis)))
)
rfi_freq_col = fits.ColDefs([freq_col])
rfi_table = fits.BinTableHDU.from_columns(
rfi_freq_col, header=self.axes_header, name='RFI_FREQ')
hdulist.insert(2, rfi_table)
if not os.path.exists(filepath):
hdulist.writeto(filepath)
return filepath
else:
i = 0
split = filepath.split('.')
new_filepath = split[0] + f' ({i})' + \
f'{"." if len(split[1:]) > 0 else ""}' + '.'.join(split[1:])
while os.path.exists(new_filepath):
i += 1
new_filepath = split[0] + f' ({i})' + \
f'{"." if len(split[1:]) > 0 else ""}' + \
'.'.join(split[1:])
hdulist.writeto(new_filepath)
return new_filepath
[docs] @classmethod
def read(cls, filename: str, **kwargs):
"""
Reads in FITS file and return a new CallistoSpectrogram.
Any unknown (i.e. any except filename) keyword arguments get
passed to fits.open.
:param str filename: path of the file to read.
"""
fl = fits.open(filename, **kwargs)
data = np.ma.array(fl[0].data, dtype=cls.ARRAY_TYPE,
mask=np.full(fl[0].data.shape, False))
data.mask[np.where(np.isnan(data.data))] = True
header = fl[0].header
# not every fits file does provide axes in their header file
axes = None
if len(fl) != 1:
axes = fl[1]
modified = False
# simple patch for TIME-END issue
if header['TIME-END'][6:] == '60':
header['TIME-END'] = (header['TIME-END'][:6] +
'59', header.comments['TIME-END'])
modified = True
if header['TIME-END'][:2] == '24':
header['TIME-END'] = ('00' + header['TIME-END']
[2:], header.comments['TIME-END'])
modified = True
# since there are fits files where both cases of the '60' and '24' changes occurs,
# this makes sure only one time '[modified]' is written in the comments section
if modified:
header['TIME-END'] = (header['TIME-END'],
header.comments['TIME-END'] + ' [modified]')
start = _parse_header_time(
header['DATE-OBS'], header.get('TIME-OBS', header.get('TIME$_OBS'))
)
end = _parse_header_time(
header['DATE-END'], header.get('TIME-END', header.get('TIME$_END'))
)
swapped = "time" not in header["CTYPE1"].lower()
# Swap dimensions so x-axis is always time.
if swapped:
t_delt = header["CDELT2"]
t_init = header["CRVAL2"] - t_delt * header["CRPIX2"]
t_label = header["CTYPE2"]
f_delt = header["CDELT1"]
f_init = header["CRVAL1"] - t_delt * header["CRPIX1"]
f_label = header["CTYPE1"]
data = data.transpose()
else:
t_delt = header["CDELT1"]
t_init = header["CRVAL1"] - t_delt * header["CRPIX1"]
t_label = header["CTYPE1"]
f_delt = header["CDELT2"]
f_init = header["CRVAL2"] - t_delt * header["CRPIX2"]
f_label = header["CTYPE2"]
# Table may contain the axes data. If it does, the other way of doing
# it might be very wrong.
tm = None
fq = None
if axes is not None:
try:
# It's not my fault. Neither supports __contains__ nor .get
tm = axes.data['TIME']
except KeyError:
tm = None
try:
fq = axes.data['FREQUENCY']
except KeyError:
fq = None
if tm is not None:
# Fix dimensions (whyever they are (1, x) in the first place)
time_axis = np.squeeze(tm)
else:
# Otherwise, assume it's linear.
time_axis = np.linspace(
0, data.shape[1] - 1, num=data.shape[1]) * t_delt # pylint: disable=E1101
if fq is not None:
freq_axis = np.squeeze(fq)
else:
freq_axis = np.linspace(
0, data.shape[0] - 1, num=data.shape[0]) * f_delt + f_init # pylint: disable=E1101
# Remove duplicate entries on the borders
left = 1
while freq_axis[left] == freq_axis[0]:
left += 1
right = data.shape[0] - 1
while freq_axis[right] == freq_axis[-1]:
right -= 1
c_left = left - 1
c_right = right + 2
if c_left > 0:
data.data[:c_left, :] = cls.MISSING_VALUE
data.mask[:c_left, :] = True
if c_right < (len(freq_axis) - 1):
data.data[c_right:, :] = cls.MISSING_VALUE
data.mask[c_right:, :] = True
# mask the zigzag pattern
zigzag = [*range(255), *range(255)]
for row_index in range(data.shape[0]):
if np.array_equal(data[row_index, :len(zigzag)], zigzag):
data.data[row_index] = cls.MISSING_VALUE
data.mask[row_index] = True
content = header["CONTENT"]
instruments = {header["INSTRUME"]}
axes_header = None
if axes:
axes_header = axes.header
fl.close()
return cls(
data, time_axis, freq_axis, start, end, t_init, t_delt,
t_label, f_label, content, instruments,
header, axes_header, swapped, filename
)
def __init__(self, data, time_axis, freq_axis, start, end,
t_init=None, t_delt=None, t_label="Time", f_label="Frequency",
content="", instruments=None, header=None, axes_header=None,
swapped=False, filename=None):
# Because of how object creation works, there is no avoiding
# unused arguments in this case.
# pylint: disable=W0613
if not isinstance(data, np.ma.MaskedArray):
data = np.ma.array(data, dtype=self.ARRAY_TYPE, mask=False)
super(CallistoSpectrogram, self).__init__(
data, time_axis, freq_axis, start, end,
t_init, t_delt, t_label, f_label,
content, instruments
)
self.header = header
self.axes_header = axes_header
self.swapped = swapped
if filename:
head, tail = ntpath.split(filename)
self.filename = tail or ntpath.basename(head)
else:
self.filename = None
[docs] @classmethod
def is_datasource_for(cls, header):
"""
Check if class supports data from the given FITS file.
:param header: `~astropy.io.fits.Header` main header of the FITS file
"""
return header.get('instrume', '').strip() in cls.INSTRUMENTS
[docs] def remove_border(self):
"""
Remove duplicate entries on the borders.
"""
left = 0
while self.freq_axis[left] == self.freq_axis[0]:
left += 1
right = self.shape[0] - 1
while self.freq_axis[right] == self.freq_axis[-1]:
right -= 1
return self[left - 1:right + 2, :]
[docs] @classmethod
def read_many(cls, filenames, sort_by=None):
"""
Returns a list of CallistoSpectrogram objects read from filenames.
:param list of str filenames: list of paths to read from.
:param str sort_by: optional attribute of the resulting objects to sort from, e.g. start to sort by starting time.
"""
objs = list(map(cls.read, filenames))
if sort_by is not None:
objs.sort(key=lambda x: getattr(x, sort_by))
return objs
[docs] @classmethod
def from_range(cls, instrument, start, end):
"""
Automatically download data from instrument between start and end and
join it together.
:param str instrument: instrument to retrieve the data from.
:param `~sunpy.time.parse_time` start: compatible start of the measurement.
:param `~sunpy.time.parse_time` end: compatible end of the measurement.
"""
if SUNPY_LT_1:
start = parse_time(start)
end = parse_time(end)
else:
start = parse_time(start).datetime
end = parse_time(end).datetime
urls = query(start, end, [instrument])
data = list(map(cls.from_url, urls))
freq_buckets = defaultdict(list)
for elem in data:
freq_buckets[tuple(elem.freq_axis)].append(elem)
try:
return cls.combine_frequencies(
[cls.new_join_many(elem) for elem in freq_buckets.values()]
)
except ValueError:
raise ValueError("No data found.")
def _overlap(self, other):
"""
Find frequency and time overlap of two spectrograms.
"""
one, two = self.intersect_time([self, other])
ovl = one.freq_overlap(two)
return one.clip_freq(*ovl), two.clip_freq(*ovl)
@staticmethod
def _to_minimize(a, b):
"""
Function to be minimized for matching to frequency channels.
"""
def _fun(p):
if p[0] <= 0.2 or abs(p[1]) >= a.max():
return float("inf")
return a - (p[0] * b + p[1])
return _fun
def _homogenize_params(self, other, maxdiff=1):
"""
Return triple with a tuple of indices (in self and other,
respectively), factors and constants at these frequencies.
:param `radiospectra.CallistoSpectrogram` other: Spectrogram to be homogenized with the current one.
:param float maxdiff: Threshold for which frequencies are considered equal.
"""
pairs_indices = [
(x, y) for x, y, d in minimal_pairs(self.freq_axis, other.freq_axis)
if d <= maxdiff
]
pairs_data = [
(self[n_one, :], other[n_two, :]) for n_one, n_two in pairs_indices
]
# XXX: Maybe unnecessary.
pairs_data_gaussian = [
(gaussian_filter1d(a, 15), gaussian_filter1d(b, 15))
for a, b in pairs_data
]
# If we used integer arithmetic, we would accept more invalid
# values.
pairs_data_gaussian64 = np.float64(pairs_data_gaussian)
least = [
leastsq(self._to_minimize(a, b), [1, 0])[0]
for a, b in pairs_data_gaussian64
]
factors = [x for x, y in least]
constants = [y for x, y in least]
return pairs_indices, factors, constants
[docs] def homogenize(self, other, maxdiff=1):
"""
Return overlapping part of self and other as (self, other) tuple.
Homogenize intensities so that the images can be used with
combine_frequencies. Note that this works best when most of the picture
is signal, so use :meth:`in_interval` to select the subset of your
image before applying this method.
:param `radiospectra.CallistoSpectrogram` other: Spectrogram to be homogenized with the current one.
:param float maxdiff: Threshold for which frequencies are considered equal.
"""
one, two = self._overlap(other)
pairs_indices, factors, constants = one._homogenize_params(
two, maxdiff
)
# XXX: Maybe (xd.freq_axis[x] + yd.freq_axis[y]) / 2.
pairs_freqs = [one.freq_axis[x] for x, y in pairs_indices]
# XXX: Extrapolation does not work this way.
# XXX: Improve.
f1 = np.polyfit(pairs_freqs, factors, 3)
f2 = np.polyfit(pairs_freqs, constants, 3)
return (one,
two * np.polyval(f1, two.freq_axis)[:, np.newaxis] +
np.polyval(f2, two.freq_axis)[:, np.newaxis])
[docs] def extend(self, minutes=15, **kwargs):
"""
Requests subsequent files from the server. If minutes is negative, retrieve preceding files.
"""
if len(self.instruments) != 1:
raise ValueError
instrument = next(iter(self.instruments))
if minutes > 0:
data = CallistoSpectrogram.load_from_range(
instrument,
self.end, self.end + datetime.timedelta(minutes=minutes)
)
elif minutes < 0:
data = CallistoSpectrogram.load_from_range(
instrument,
self.start - datetime.timedelta(minutes=-minutes), self.start
)
else:
return self
filtered_data = list()
for current_data in data:
if current_data.header["PWM_VAL"] == self.header["PWM_VAL"]:
filtered_data.append(current_data)
return CallistoSpectrogram.new_join_many([self] + filtered_data, **kwargs)
[docs] @classmethod
def from_url(cls, url):
"""
Returns CallistoSpectrogram read from URL.
:param str url: URL to retrieve the data from
:returns: newSpectrogram `radiospectra.CallistoSpectrogram`
"""
return cls.read(url)
[docs] @classmethod
def load_from_range(cls, instrument, start, end, **kwargs):
"""
Automatically download data from instrument between start and
end.
:param str instrument: instrument to retrieve the data from.
:param `~sunpy.time.parse_time` start: compatible start of the measurement.
:param `~sunpy.time.parse_time` end: compatible end of the measurement.
"""
kw = {
'maxgap': None,
'fill': cls.JOIN_REPEAT,
}
kw.update(kwargs)
if SUNPY_LT_1:
start = parse_time(start)
end = parse_time(end)
else:
start = parse_time(start).datetime
end = parse_time(end).datetime
urls = query(start, end, [instrument])
specs = list(map(cls.from_url, urls))
return specs
[docs] @classmethod
def new_join_many(cls, specs , polarisations=False):
"""
Produce new Spectrogram that contains spectrograms
joined together in time and frequency.
:param list specs: List of CallistoSpectrogram's to join together in time.
:param bool: TODO
"""
# checks
if not specs:
raise ValueError("Need at least one spectrogram.")
if len(specs) == 1:
return specs[0]
# sorts specs
specs = sorted(specs, key=lambda x: x.start)
# initiates combine_polarisations or gives separated spectrograms back
if polarisations:
sorting_dict = cls.detect_and_combine_polarisations(specs)
else:
pwm_val_list = set(
map(lambda x: "{}_{}".format(x.header['PWM_VAL'], x.filename.split('.')[0].split('_')[-1]), specs))
sorting_dict = dict(map(lambda x: (x, []), pwm_val_list))
for spec in specs:
sorting_dict["{}_{}".format(spec.header['PWM_VAL'], spec.filename.split('.')[0].split('_')[-1])].append(
spec)
map(lambda x: x.start, list(sorting_dict.values()))
if len(specs) == 1:
return specs[0]
joined_spec_list = []
for PWM, sorted_specs in sorting_dict.items():
if not isinstance(sorted_specs[0], CallistoSpectrogram):
raise ValueError("Can only combine CallistoSpectrogram's.")
instr = sorted_specs[0].header['INSTRUME']
delta = sorted_specs[0].header['CDELT1']
first_time_point = sorted_specs[0].start + \
datetime.timedelta(seconds=sorted_specs[0].time_axis[0])
last_time_point = sorted_specs[0].start + \
datetime.timedelta(seconds=sorted_specs[0].time_axis[-1])
for spec in sorted_specs[1:]:
if not isinstance(spec, CallistoSpectrogram):
raise ValueError("Can only combine CallistoSpectrogram's.")
if spec.header['INSTRUME'] != instr:
raise ValueError(
"Can only combine spectrogram's from the same instrument.")
if spec.header['CDELT1'] != delta:
raise ValueError(
"Can only combine spectrogram's with the same time delta (CDELT1).")
cur_start = spec.start + \
datetime.timedelta(seconds=spec.time_axis[0])
cur_end = spec.start + \
datetime.timedelta(seconds=spec.time_axis[-1])
if cur_start < first_time_point:
first_time_point = cur_start
if cur_end > last_time_point:
last_time_point = cur_end
new_header = sorted_specs[0].get_header()
new_axes_header = sorted_specs[0].axes_header
borderless_specs = [sp.mark_border() for sp in sorted_specs]
new_freq_axis = np.array(
sorted(_union(set(sp.freq_axis) for sp in borderless_specs), key=lambda y: -y))
new_time_axis = np.arange(
0, (last_time_point - first_time_point).total_seconds() + 0.00001, delta)
for spec in sorted_specs:
curr_start = spec.start + \
datetime.timedelta(seconds=spec.time_axis[0])
diff = (curr_start - first_time_point).total_seconds()
diff_index = int(diff / delta)
new_time_axis[diff_index:diff_index +
spec.time_axis.shape[0]] = (spec.time_axis + diff)
nan_arr = np.empty((len(new_freq_axis), len(
new_time_axis)), dtype=cls.ARRAY_TYPE)
nan_arr[:] = cls.MISSING_VALUE
new_data = np.ma.array(nan_arr, mask=True)
# fill new data array
for sp in borderless_specs:
if np.array_equal(new_freq_axis, sp.freq_axis):
c_start_time = (
(sp.start + datetime.timedelta(seconds=sp.time_axis[0])) - first_time_point).total_seconds()
temp_pos_time = np.where(new_time_axis == c_start_time)
if len(temp_pos_time[0]) == 1:
new_pos_time = temp_pos_time[0][0]
new_data[:, new_pos_time:new_pos_time +
sp.shape[1]] = sp.data[:, :]
new_data.mask[:, new_pos_time:new_pos_time +
sp.shape[1]] = sp.data.mask[:, :]
else:
for pos_freq in range(sp.shape[0]):
c_freq = sp.freq_axis[pos_freq]
new_pos_freq = np.where(new_freq_axis == c_freq)[0][0]
c_start_time = ((sp.start + datetime.timedelta(
seconds=sp.time_axis[0])) - first_time_point).total_seconds()
temp_pos_time = np.where(new_time_axis == c_start_time)
if len(temp_pos_time[0]) == 1:
new_pos_time = temp_pos_time[0][0]
new_data[new_pos_freq, new_pos_time:new_pos_time +
sp.shape[1]] = sp.data[pos_freq, :]
new_data.mask[new_pos_freq, new_pos_time:new_pos_time + sp.shape[1]] = sp.data.mask[
pos_freq, :]
ftp_time = first_time_point.time()
second_of_day = ftp_time.hour * 3600 + ftp_time.minute * 60 + ftp_time.second
params = {
'time_axis': new_time_axis,
'freq_axis': new_freq_axis,
'start': first_time_point,
'end': last_time_point,
't_delt': delta,
't_init': second_of_day,
't_label': sorted_specs[0].t_label,
'f_label': sorted_specs[0].f_label,
'content': sorted_specs[0].content,
'instruments': _union(spec.instruments for spec in sorted_specs),
'filename': f'JOINED_{sorted_specs[0].filename}'
}
new_header['DATE-OBS'] = min([x.get_header()['DATE-OBS']
for x in sorted_specs])
new_header['TIME-OBS'] = min([x.get_header()['TIME-OBS']
for x in sorted_specs])
new_header['DATE-END'] = max([x.get_header()['DATE-END']
for x in sorted_specs])
new_header['TIME-END'] = max([x.get_header()['TIME-END']
for x in sorted_specs])
new_header['DATAMIN'] = min(
[x.get_header()['DATAMIN'] for x in sorted_specs])
new_header['DATAMAX'] = max(
[x.get_header()['DATAMAX'] for x in sorted_specs])
new_header['CRVAL1'] = second_of_day
new_header['CRVAL2'] = len(new_freq_axis)
new_axes_header['NAXIS1'] = int(
new_axes_header['BITPIX']) * (len(new_time_axis) + len(new_freq_axis))
new_axes_header['TFORM1'] = str(len(new_time_axis)) + "D8.3"
new_axes_header['TFORM2'] = str(len(new_freq_axis)) + "D8.3"
joined_spec = CallistoSpectrogram(
new_data, header=new_header, axes_header=new_axes_header, **params)
joined_spec.adjust_header()
joined_spec_list.append(joined_spec)
if len(joined_spec_list) == 1:
return joined_spec_list[0]
else:
return joined_spec_list
[docs] @classmethod
def detect_and_combine_polarisations(cls, specs):
"""Takes a list of spectrogram and evaluates and processes all polarisations.
:param specs: A list of spectrograms.
:returns: A dictionary of all spectrograms sorted by there PWM_VAL and lists for all combined polarisations.
"""
pwm_val_list = set(map(lambda x: x.header['PWM_VAL'], specs))
sorted_work_dict = dict(map(lambda x: (x, []), pwm_val_list))
pwm_val_list = set(
map(lambda x: "{}_{}".format(x.header['PWM_VAL'], x.filename.split('.')[0].split('_')[-1]), specs))
sorting_dict = dict(map(lambda x: (x, []), pwm_val_list))
pol_val_list = set(map(lambda x: "{}_{}".format(
x.header['PWM_VAL'], "Pol"), specs))
sorting_dict.update(map(lambda x: (x, []), pol_val_list))
for spec in specs:
sorting_dict["{}_{}".format(spec.header['PWM_VAL'], spec.filename.split('.')[0].split('_')[-1])].append(
spec)
sorted_work_dict[spec.header['PWM_VAL']].append(spec)
map(lambda x: x.start, list(sorting_dict.values()))
for PWM, specs_list in sorted_work_dict.items():
for index, spec1 in enumerate(specs_list[:-1]):
spec2 = specs_list[index + 1]
delta1 = float(spec1.header['CDELT1'])
delta2 = float(spec1.header['CDELT1'])
if abs(delta1 - delta2) > 0.000001:
continue
if spec1.header['INSTRUME'] != spec2.header['INSTRUME']:
continue
if spec1.shape != spec2.shape:
continue
if abs((spec1.start - spec2.start).total_seconds()) > delta1:
continue
if not np.array_equal(spec1.freq_axis, spec2.freq_axis):
continue
if not np.array_equal(spec1.time_axis, spec2.time_axis):
continue
merged_spec = CallistoSpectrogram.combine_polarisation(
specs[index], specs[index + 1])
sorting_dict["{}_{}".format(
merged_spec.header['PWM_VAL'], "Pol")].append(merged_spec)
del_list = []
for PWM, specs_list in sorting_dict.items():
if not specs_list:
del_list.append(PWM)
for val in del_list:
del sorting_dict[val]
return sorting_dict
[docs] @classmethod
def combine_polarisation(cls, spec1, spec2):
"""
Compares and combines two spectrograms that are polarisations of the same event
:param spec1: CallistoSpectrogram, The first polarized spectrogram.
:param spec2: CallistoSpectrogram, The second polarized spectrogram.
"""
def pythagoras_combine(a, b):
return (a ** 2 + b ** 2) ** 0.5
# checks
delta1 = float(spec1.header['CDELT1'])
delta2 = float(spec1.header['CDELT1'])
if abs(delta1 - delta2) > 0.000001:
raise ValueError('CDELT1 of spectrograms are not the same')
if spec1.header['INSTRUME'] != spec2.header['INSTRUME']:
raise ValueError('Instruments of spectrograms are not the same')
if spec1.shape != spec2.shape:
raise ValueError('Shapes of spectrograms not the same')
if abs((spec1.start - spec2.start).total_seconds()) > delta1:
raise ValueError(
'Start times of spectrograms are too far from each other')
if not np.array_equal(spec1.freq_axis, spec2.freq_axis):
raise ValueError('Frequency axes of spectrograms are not the same')
if not np.array_equal(spec1.time_axis, spec2.time_axis):
raise ValueError('Time axes of spectrograms are not the same')
merge_funk = np.vectorize(
pythagoras_combine, excluded=[cls.MISSING_VALUE])
merged_matrix = merge_funk(spec1.data, spec2.data)
x = np.logical_or(spec1.data.mask, spec2.data.mask)
merged_matrix = np.ma.array(merged_matrix, mask=x)
merged_spec = spec1._with_data(merged_matrix)
merged_spec.header["DATAMIN"] = merged_matrix.min()
merged_spec.header["DATAMAX"] = merged_matrix.max()
return merged_spec
[docs] def remove_single_freq_rfi(self, threshold=17, row_window_height=3):
"""
Detects rfi in single frequencies and masks the data
:param Num threshold: Minimal Intensity difference between the row and the mean of the surrounding rows to be flagged as rfi.
:param int row_window_height: The amount of rows before and after should be considered for the surrounding rows.
"""
rfi_mask = np.full(self.shape, False)
for row_index in range(self.shape[0]):
aff_row = self.data[row_index]
rows_before = self.data[max(
row_index - row_window_height, 0):row_index]
rows_after = self.data[row_index +
1:min(row_index + 1 + row_window_height, self.shape[0])]
window_rows = np.concatenate([rows_before, rows_after], axis=0)
window_mean = np.mean(window_rows, axis=0)
diff = abs(aff_row - window_mean)
rfi_mask[row_index, np.where(diff >= threshold)] = True
new_data = self.data.copy()
new_data.mask[np.where(rfi_mask)] = True
new_data.data[np.where(rfi_mask)] = self.MISSING_VALUE
return self._with_data(new_data)
[docs] def mark_border(self):
"""
Mark duplicate entries on the borders.
"""
left = 1
while self.freq_axis[left] == self.freq_axis[0]:
left += 1
right = self.data.shape[0] - 1
while self.freq_axis[right] == self.freq_axis[-1]:
right -= 1
c_left = left - 1
c_right = right + 2
if c_left > 0:
self.data.data[:c_left, :] = self.MISSING_VALUE
self.data.mask[:c_left, :] = True
if c_right < (len(self.freq_axis) - 1):
self.data.data[c_right:, :] = self.MISSING_VALUE
self.data.mask[c_right:, :] = True
return self
CallistoSpectrogram._create.add(
run_cls('from_range'),
lambda cls, instrument, start, end: True,
check=False
)
try:
CallistoSpectrogram.create.__func__.__doc__ = (
""" Create CallistoSpectrogram from given input dispatching to the
appropriate from_* function.
Possible signatures: """ + CallistoSpectrogram._create.generate_docs())
except AttributeError:
CallistoSpectrogram.create.__func__.__doc__ = (
""" Create CallistoSpectrogram from given input dispatching to the
appropriate from_* function.
Possible signatures: """ + CallistoSpectrogram._create.generate_docs())