Source code for irisreader.preprocessing.image_cube_cropper

#!/usr/bin/env python

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from tqdm import tqdm
import irisreader as ir
from irisreader.preprocessing import image_cropper, CorruptImageException, NullImageException

class image_cube_cropper( BaseEstimator, TransformerMixin ):
    """
    Implements a transformer that can crop all images of an observed line
    (be it SJI or spectra) by applying image_cropper to every individual image
    and identifying corrupt images and outliers.

    Corrupt images can either be identified directly, when
    :py:class:`image_cropper` throws an error because more than 5% of the
    pixels of the border or of the overall image are negative, or they can
    appear as outliers in the bound data returned by :py:class:`image_cropper`:
    sometimes whole stripes of data are corrupt, resulting in a rectangular
    image with, for example, half the width and half the height of the valid
    image. We thus look for outliers in the data returned by
    :py:class:`image_cropper` and degrade them to corrupt images. Outliers are
    defined here as values that deviate more than 2% from the median bound
    (20 pixels on 1000 pixels); an illustrative sketch of this score follows
    the class definition below.

    Parameters
    ----------
    offset : integer
        Number of pixels that are removed as a safety border from all sides
        after the cropping.
    check_coverage : boolean
        Whether to check the coverage of the cropped image. It can happen that
        there are patches of negative values in images, either due to loss of
        data during transmission (typically a band or a large rectangular patch
        of negative data) or due to overall low data counts (missing data is
        no data). image_cropper labels an image as corrupt if >5% of its pixels
        are still negative after cropping. This might be problematic for lines
        with low data counts (and therefore many missing pixels), and the user
        is advised to disable the coverage check for such lines. A method that
        is able to distinguish missing data arising from transmission errors
        from missing data due to low data counts could be helpful here.
    """

    # constructor
    def __init__( self, offset=0, check_coverage=True ):
        self._xmin = self._xmax = self._ymin = self._ymax = 0
        self._data_cube_object = None
        self._offset = offset
        self._check_coverage = check_coverage
        self._corrupt_images = []
        self._null_images = []

    # function to crop a single image (in order to parallelize it later)
#    def _crop_image( self, step ):
#        cropper = image_cropper( offset=self._offset )
#        image_bounds = []
#        corrupt_images = []
#        null_images = []
#
#        try:
#            cropper.fit( self._data_cube_object.get_image_step( step ) )
#            image_bounds = cropper.get_bounds()
#
#        except CorruptImageException:
#            image_bounds = [0,0,0,0]
#            corrupt_images.append( step )
#
#        except NullImageException:
#            image_bounds = [0,0,0,0]
#            null_images.append( step )
#
#        return image_bounds, corrupt_images, null_images

    # fit method: find boundaries
    def fit( self, X, y=None ):
        """
        Determines the overall cropping bounds and the corrupt images for the
        supplied data cube.

        Parameters
        ----------
        X : irisreader.iris_data_cube (irisreader.sji_cube / irisreader.raster_cube)
            Data cube object that has to be cropped.

        Returns
        -------
        image_cube_cropper
            This object with fitted variables.
        """

        # make sure only iris_data_cube objects are taken as input
        if isinstance( X, ir.iris_data_cube ) or isinstance( X, ir.sji_cube ) or isinstance( X, ir.raster_cube ):
            self._data_cube_object = X
        else:
            raise ValueError("This is not a valid data cube object.")

        # make sure the observation is a sit-and-stare if it is a slit-jaw image observation
        if self._data_cube_object.type == 'sji' and self._data_cube_object.mode != "sit-and-stare":
            raise ValueError("Only sit-and-stare observations can be cropped as a cube!")

        # set up image cropper
        cropper = image_cropper( offset=self._offset, check_coverage=self._check_coverage )

        # get bounds on all images in the cube and store null and corrupt images
        image_bounds = []

        # create progress bar for crop if verbosity_level is >= 1
        step_range = range( self._data_cube_object.n_steps )
        if ir.config.verbosity_level >= 1:
            step_range = tqdm( step_range )

        for step in step_range:
            try:
                cropper.fit( self._data_cube_object.get_image_step( step ) )
                image_bounds.append( cropper.get_bounds() )

            except CorruptImageException:
                image_bounds.append( [0,0,0,0] )
                self._corrupt_images.append( step )

            except NullImageException:
                image_bounds.append( [0,0,0,0] )
                self._null_images.append( step )

#        pool = multiprocessing.Pool()
#        res = pool.map( self._crop_image, range( self._data_cube_object.n_steps ) )
#
#        return res

        image_bounds = np.vstack( image_bounds )

        # define outliers as values that deviate more than 2% from the
        # median bound (20 pixels on 1000 pixels)
        # TODO: how to put this onto a more rigorous footing?
        outlier_threshold = 0.02
        width, height = self._data_cube_object.get_image_step( 0 ).shape
        outlier_score = np.max( np.abs( image_bounds - np.median( image_bounds, axis=0 ) ) / np.array( [width, width, height, height] ), axis=1 )

        # add outliers to corrupt images and remove null images from corrupt images
        self._corrupt_images += np.where( outlier_score >= outlier_threshold )[0].tolist()
        self._corrupt_images = sorted( list( set( self._corrupt_images ) - set( self._null_images ) ) )
        self._null_images = sorted( self._null_images )

        # set boundaries as the value closest to the center of the image of all
        # the inlier images
        self._xmin = np.max( image_bounds[outlier_score < outlier_threshold, 0] )
        self._xmax = np.min( image_bounds[outlier_score < outlier_threshold, 1] )
        self._ymin = np.max( image_bounds[outlier_score < outlier_threshold, 2] )
        self._ymax = np.min( image_bounds[outlier_score < outlier_threshold, 3] )

        return self
    # function to return boundaries
    def get_bounds( self ):
        """
        Gets the overall bounds recovered by image_cube_cropper.

        Returns
        -------
        list
            List [ xmin, xmax, ymin, ymax ] for the given data cube.
        """
        if self._data_cube_object is None:
            raise ValueError("Please pass a data cube first using the fit method.")
        else:
            return [ self._xmin, self._xmax, self._ymin, self._ymax ]
    # function to return corrupt images
    def get_corrupt_images( self ):
        """
        Gets time-step indices of corrupt images in the data cube.

        Returns
        -------
        list
            Time-step indices of corrupt images in the data cube.
        """
        if self._data_cube_object is None:
            raise ValueError("Please pass a data cube first using the fit method.")
        else:
            return self._corrupt_images
    # function to return null images
    def get_null_images( self ):
        """
        Gets time-step indices of null images in the data cube (if any; usually
        data cubes are read with keep_null=False).

        Returns
        -------
        list
            Time-step indices of null images in the data cube.
        """
        if self._data_cube_object is None:
            raise ValueError("Please pass a data cube first using the fit method.")
        else:
            return self._null_images
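# Illustrative sketch (not part of the original module): how the outlier score
# computed in fit() flags corrupt images. The bounds values and the helper name
# _demo_outlier_score are hypothetical and only serve to make the 0.02 threshold
# (20 pixels on a 1000-pixel axis) concrete.
def _demo_outlier_score():
    # three per-image bounds [xmin, xmax, ymin, ymax]; the last row simulates a
    # corrupt stripe that covers only part of the detector
    bounds = np.array( [ [10, 990, 12, 988],
                         [11, 991, 13, 987],
                         [10, 500, 12, 988] ] )
    width = height = 1000
    # maximum relative deviation of each image's bounds from the median bounds
    score = np.max( np.abs( bounds - np.median( bounds, axis=0 ) ) / np.array( [width, width, height, height] ), axis=1 )
    # score is approximately [0.0, 0.001, 0.49]: only the last image exceeds 0.02
    return score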
# Test code
if __name__ == "__main__":

    import os
    from irisreader import observation, raster_cube

    obs = observation( "/home/chuwyler/Desktop/FITS/20140906_112339_3820259253/" )
    sji = obs.sji[0]
    sji.plot(0)

    cropper = image_cube_cropper()
    cropper.fit( sji )

    sji._set_bounds( cropper.get_bounds() )
    sji.n_steps
    sji._remove_steps( cropper.get_null_images() )
    sji.n_steps
    sji._remove_steps( cropper.get_corrupt_images() )
    sji.n_steps
    sji.plot(400)

    raster3_dir = "/home/chuwyler/Desktop/FITS/20150404_155958_3820104165"
    raster3_files = sorted( [ raster3_dir + "/" + file for file in os.listdir( raster3_dir ) if 'raster' in file ] )
    raster3 = raster_cube( raster3_files, line="Mg" )
    raster3.plot(0)

    cropper = image_cube_cropper()
    cropper.fit( raster3 )
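# Hedged usage sketch (not from the original code): once fit() has run, the
# bounds returned by get_bounds() can be applied to a single image array with
# plain slicing. The helper name _demo_apply_bounds and the assumption that the
# bounds index the first and second array axes in the order
# [xmin, xmax, ymin, ymax] are illustrative only.
def _demo_apply_bounds( image, bounds ):
    xmin, xmax, ymin, ymax = bounds
    return image[ xmin:xmax, ymin:ymax ]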