# Source code for irisreader.utils.download

#!/usr/bin/env python3

# import libraries
import re, os
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm
import gzip, shutil, tarfile
import math
import pandas as pd
import irisreader as ir

# Function to parse directory listing
def parse_url_content( url ):
    """
    Parse an Apache-style HTML directory listing into a pandas DataFrame.

    Parameters
    ----------
    url : str
        URL of the remote directory listing.

    Returns
    -------
    pd.DataFrame
        One row per gzip-compressed file with columns 'file', 'modified'
        and 'size' (empty DataFrame if no .gz files are listed).
    """
    page = requests.get( url ).text
    soup = BeautifulSoup( page, 'html.parser' )
    rows = soup.find_all( "tr" )
    ret = []

    # skip the three header rows at the top and the separator row at the
    # bottom of the Apache listing table
    for row in rows[3:-1]:
        cols = row.find_all( "td" )

        # guard against malformed rows: the original unguarded indexing
        # raised IndexError on rows with fewer cells or without a link
        if len( cols ) < 4:
            continue
        links = cols[1].find_all( 'a', href=True )
        if not links:
            continue

        filename = links[0]['href']
        # only gzip-compressed data files are of interest
        if filename.endswith( '.gz' ):
            ret.append( {'file': filename, 'modified': cols[2].get_text(), 'size': cols[3].get_text()} )

    return pd.DataFrame( ret )

# Function to download a single file
def download_file( url, path ):
    """
    Download a single file into `path`, showing a progress bar.

    The download is skipped if the uncompressed counterpart, an already
    extracted raster, or a complete copy of the file is present in `path`.

    Parameters
    ----------
    url : str
        URL of the file to download.
    path : str
        Local directory to store the downloaded file in.

    Returns
    -------
    bool
        True if the file was downloaded completely or was already present.

    Raises
    ------
    Exception
        If the number of bytes written differs from the announced size.
    """
    r = requests.get( url, stream=True )

    # total size in bytes (0 if the server announces no content-length)
    total_size = int( r.headers.get( 'content-length', 0 ) )
    block_size = 1024
    wrote = 0

    filename = os.path.join( path, os.path.basename( url ) )

    # skip download if the uncompressed file (strip '.gz'), an extracted
    # raster ('.fits.gz' -> '_t000_r00000.fits') or a complete download
    # of identical size already exists
    if os.path.exists( filename[:-3] ) or os.path.exists( filename[:-7] + "_t000_r00000.fits" ) or (os.path.exists( filename ) and os.path.getsize( filename ) == total_size ):
        print( os.path.basename( url ) + ": File already exists" )
        return True
    else:
        print( "\nDownloading " + os.path.basename( url ) )

    with open( filename, 'wb' ) as f:
        # ceil of a *true* division: the original math.ceil(total_size//block_size)
        # was a no-op (// already floors) and dropped the final partial block
        # from the progress bar total
        for data in tqdm( r.iter_content( block_size ), total=math.ceil( total_size / block_size ), unit='KB', unit_scale=True ):
            wrote += len( data )
            f.write( data )

    if total_size != 0 and wrote != total_size:
        # (the original had an unreachable 'return False' after this raise)
        raise Exception( "Download error - something went wrong" )

    return True
    
# Function to extract all files in a path
def extract_all( path ):
    """
    Extract all .gz and .tar archives in a directory, removing the archives.

    Gzip files are decompressed in place (dropping the '.gz' suffix), then
    any tar files are unpacked into the same directory.

    Parameters
    ----------
    path : str
        Directory whose archives should be extracted.
    """
    # extract all gzip files
    gz_files = [file for file in os.listdir( path ) if file.endswith( '.gz' )]
    if len( gz_files ) > 0:
        print( "extracting files.." )

    for f in gz_files:
        gz_path = os.path.join( path, f )
        # output filename = archive name without the trailing '.gz'
        with gzip.open( gz_path, 'rb' ) as gzip_file, open( os.path.join( path, f[:-3] ), 'wb' ) as extracted_file:
            shutil.copyfileobj( gzip_file, extracted_file )
        os.remove( gz_path )

    # extract tar files if necessary
    tar_files = [file for file in os.listdir( path ) if file.endswith( '.tar' )]
    for f in tar_files:
        tar_path = os.path.join( path, f )
        # NOTE(review): extractall on an untrusted archive can write outside
        # 'path' (pre-3.12 tarfile offers no 'filter' safeguard); archives
        # here come from configured IRIS data mirrors
        with tarfile.open( tar_path ) as tar:
            tar.extractall( path=path )
        os.remove( tar_path )
       
# Function to download an observation
def download( obs_identifier, target_directory, type='all', uncompress=True, open_obs=True, mirror=None ):
    """
    Downloads a given IRIS observation.

    Parameters
    ----------
    obs_identifier : str
        Observation identifier in the form yyyymmdd_hhmmss_oooooooooo,
        e.g. 20140323_052543_3860263227
    target_directory : str
        Path to store downloaded observation to (defaults to home directory)
    type : str
        Type of data to download:
        'all': all data
        'sji': only SJI files
        'raster': only raster files
    uncompress : bool
        Uncompress files after download? (automatically set to True if
        open_obs is True)
    open_obs : bool
        Immediately open observation and return observation object?
        Otherwise a boolean indicating download success is returned
    mirror : str
        Mirror to be used:
        'lmsal': LMSAL (http://www.lmsal.com/solarsoft/irisa/data/level2_compressed/)
        'uio': University of Oslo (http://sdc.uio.no/vol/fits/iris/level2/)

    Returns
    -------
    An open observation handle or a boolean indicating download success.
    """

    # if user desires to open observation then uncompress anyway
    if open_obs:
        uncompress = True

    # set mirror url
    if mirror is None:
        mirror = ir.config.default_mirror
    if not mirror in ir.config.mirrors.keys():
        raise ValueError( "The mirror you specified does not exist!\nAvailable mirrors: ", ir.config.mirrors.keys() )
    else:
        download_url = ir.config.mirrors[ mirror ]

    # extract year, month and day from obs identifier
    # (raw string avoids invalid-escape warnings for \d)
    m = re.search( r'([\d]{4})([\d]{2})([\d]{2})_([\d]{6})_([\d]{10})', obs_identifier )
    if m:
        year, month, day, time, obsid = m.group(1), m.group(2), m.group(3), m.group(4), m.group(5)
    else:
        raise ValueError( "Please pass an obs identifier in the form of yyyymmdd_hhmmss_oooooooooo." )

    # create directory url
    obs_url = download_url + year + "/" + month + "/" + day + "/" + obs_identifier

    # get directory listing and filter for SJI or raster if necessary
    listing = parse_url_content( obs_url + "/" )
    if len( listing ) == 0:
        raise Exception( "Something went wrong with getting the observation directory content! Please check whether your data mirror path is correct:\n {}\n Directory Listing: {}".format( ir.config.mirrors[ir.config.default_mirror], listing ) )
    listing_sji = listing[[('_SJI_' in filename and filename[-2:] == 'gz') for filename in listing['file']]]
    listing_raster = listing[[('_raster' in filename and filename[-2:] == 'gz') for filename in listing['file']]]

    if type == 'sji':
        listing = listing_sji
    elif type == 'raster':
        listing = listing_raster
    else:
        # DataFrame.append was removed in pandas 2.0 - use pd.concat instead
        listing = pd.concat( [listing_sji, listing_raster], ignore_index=True )

    # create directory (including missing parents) if necessary
    local_path = target_directory + "/" + obs_identifier
    if not os.path.exists( local_path ):
        os.makedirs( local_path )

    # download files
    download_status = True
    for filename in listing['file']:
        url = obs_url + "/" + filename
        ret = download_file( url, path=local_path )
        download_status = ret and download_status

    # uncompress if desired
    if uncompress:
        extract_all( local_path )

    # return observation object if desired - otherwise return download status
    # (local import avoids a circular import with the irisreader package)
    if open_obs:
        from irisreader import observation
        return observation( local_path )

    # bug fix: the original fell off the end and returned None when
    # open_obs=False, contradicting the documented boolean return
    return download_status