karabo.util package

Subpackages

Submodules

karabo.util.config_util module

karabo.util.dask module

Module for dask-related functionality.

class DaskHandler

Bases: DaskHandlerBasic

Public & dev API for dask associated functionality.

This is the public dask-api for Karabo, where you don’t have to worry which dask-handler of this module to use. You can do almost everything through this class. The only exception is adjusting the default settings on a SLURM system, which is done through DaskHandlerSlurm.

dask_client

The Dask client object. If None, a new client will be created.

Type:

distributed.client.Client | None

memory_limit

The memory limit per worker in GB. If None, the memory limit is set to the maximum available memory on the node (see the dask documentation for memory_limit).

Type:

float | None

n_threads_per_worker

The number of threads to use per worker. The default is None, which means that the number of threads equals the number of cores.

Type:

int | None

use_dask

Whether to use Dask or not. If None, then Karabo will decide whether to use dask or not for certain tasks.

Type:

bool | None

use_processes

Use processes instead of threads?

Threads:

  • Fast to initiate.

  • No need to transfer data to them.

  • Limited by the GIL, which allows only one thread to execute Python code at a time.

Processes:

  • Take time to set up.

  • Slow to transfer data to.

  • Each has its own GIL, so they don’t need to take turns executing code.

Type:

bool

classmethod get_dask_client() Client

Get (create if not exists) a dask-client.

Returns:

Dask-client.

classmethod parallelize_with_dask(iterate_function: Callable[[...], Any], iterable: Iterable[Any], *args: Any, **kwargs: Any) Any | Tuple[Any, ...] | List[Any]

Run a function over an iterable in parallel using dask, and gather the results.

args & kwargs will get passed to Delayed.

Parameters:
  • iterate_function – The function to be applied to each element of iterable. The function takes the current element of the iterable as its first argument, followed by any positional arguments, and then any keyword arguments.

  • iterable – The iterable over which the function will be applied. Each element of iterable will be passed to iterate_function.

Returns:

A tuple containing the results of iterate_function for each element in the iterable. The results are gathered using dask’s compute function.
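
The calling convention described above can be illustrated without a dask cluster. The sketch below is not Karabo's implementation: it swaps dask for the standard library's ThreadPoolExecutor, and the names parallelize_sketch and scale are hypothetical. It only shows that each element arrives as the first argument of iterate_function, followed by the shared positional and keyword arguments.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Tuple


def parallelize_sketch(
    iterate_function: Callable[..., Any],
    iterable: Iterable[Any],
    *args: Any,
    **kwargs: Any,
) -> Tuple[Any, ...]:
    """Hypothetical stand-in: apply iterate_function to every element in parallel."""
    with ThreadPoolExecutor() as pool:
        # Each element becomes the first argument; args/kwargs are shared by all calls.
        futures = [
            pool.submit(iterate_function, element, *args, **kwargs)
            for element in iterable
        ]
        # Collect the results in submission order, similar to a gather step.
        return tuple(future.result() for future in futures)


def scale(value: float, factor: float, offset: float = 0.0) -> float:
    """Hypothetical work function: element first, then shared arguments."""
    return value * factor + offset


results = parallelize_sketch(scale, [1, 2, 3], 10, offset=1)
print(results)  # (11, 21, 31)
```

Following the signature listed above, the corresponding Karabo call would presumably take the same arguments in the same order: the function, the iterable, then the shared args and kwargs.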

classmethod setup() None

Calls get_dask_client.

classmethod should_dask_be_used(override: bool | None = None) bool

Util function to decide whether dask should be used or not.

Parameters:

override – Override? Has highest priority.

Returns:

Decision whether dask should be used or not.
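
The only precedence rule stated here is that override wins. A minimal sketch of such a decision chain follows, assuming that the use_dask setting comes second and that some automatic decision is the fallback. The function name and the auto_decision parameter are hypothetical and stand in for whatever heuristic Karabo applies internally.

```python
from typing import Optional


def should_use_dask_sketch(
    override: Optional[bool] = None,
    use_dask: Optional[bool] = None,
    auto_decision: bool = False,
) -> bool:
    """Hypothetical precedence: override > use_dask setting > automatic decision."""
    if override is not None:
        return override
    if use_dask is not None:
        return use_dask
    # Assumption: placeholder for the decision Karabo makes when nothing is set.
    return auto_decision


print(should_use_dask_sketch(override=False, use_dask=True))  # False
print(should_use_dask_sketch(use_dask=True))  # True
print(should_use_dask_sketch(auto_decision=True))  # True
```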

class DaskHandlerBasic

Bases: object

Base-class for dask-handler functionality.

dask_client

The Dask client object. If None, a new client will be created.

Type:

distributed.client.Client | None

memory_limit

The memory limit per worker in GB. If None, the memory limit is set to the maximum available memory on the node (see the dask documentation for memory_limit).

Type:

float | None

n_threads_per_worker

The number of threads to use per worker. The default is None, which means that the number of threads equals the number of cores.

Type:

int | None

use_dask

Whether to use Dask or not. If None, then Karabo will decide whether to use dask or not for certain tasks.

Type:

bool | None

use_processes

Use processes instead of threads?

Threads:

  • Fast to initiate.

  • No need to transfer data to them.

  • Limited by the GIL, which allows only one thread to execute Python code at a time.

Processes:

  • Take time to set up.

  • Slow to transfer data to.

  • Each has its own GIL, so they don’t need to take turns executing code.

Type:

bool

dask_client: Client | None = None
classmethod get_dask_client() Client

Get (create if not exists) a dask-client.

Returns:

Dask-client.

memory_limit: float | None = None
n_threads_per_worker: int | None = None
classmethod parallelize_with_dask(iterate_function: Callable[[...], Any], iterable: Iterable[Any], *args: Any, **kwargs: Any) Any | Tuple[Any, ...] | List[Any]

Run a function over an iterable in parallel using dask, and gather the results.

args & kwargs will get passed to Delayed.

Parameters:
  • iterate_function – The function to be applied to each element of iterable. The function takes the current element of the iterable as its first argument, followed by any positional arguments, and then any keyword arguments.

  • iterable – The iterable over which the function will be applied. Each element of iterable will be passed to iterate_function.

Returns:

A tuple containing the results of iterate_function for each element in the iterable. The results are gathered using dask’s compute function.

classmethod setup() None

Calls get_dask_client.

classmethod should_dask_be_used(override: bool | None = None) bool

Util function to decide whether dask should be used or not.

Parameters:

override – Override? Has highest priority.

Returns:

Decision whether dask should be used or not.

use_dask: bool | None = None
use_processes: bool = False
class DaskHandlerSlurm

Bases: DaskHandlerBasic

Dask-handler for slurm-based jobs.

use_workers_or_nannies

Whether to use workers or nannies (default). This could lead to more processing (see documentation for dask usage in Karabo).

Type:

Literal['workers', 'nannies']

n_workers_scheduler_node

The number of workers to start on the scheduler node.

Type:

int

timeout

Timeout in seconds for the dask-scheduler to wait for all the workers to connect.

Type:

int

classmethod get_dask_client() Client

Get (create if not exists) a dask-client for a SLURM environment.

Returns:

Dask-client.

classmethod get_node_id() int

Gets the current node-id.

Returns:

Node-id.

classmethod get_node_name() str

Gets the current node-name.

Returns:

Node-name.

classmethod get_number_of_nodes() int

Gets the number of nodes of the slurm-job.

Returns:

Number of nodes.

classmethod is_first_node() bool

Util function to check if the current node is the first node.

Returns:

Check-result.

classmethod is_on_slurm_cluster() bool

Util function to check if code is running in a slurm-job.

Returns:

Check-result.
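
The node helpers above can be pictured as thin wrappers around the environment variables that Slurm exports into a job. The sketch below is an assumption about how such checks could look, not Karabo's code: SLURM_JOB_ID, SLURM_NODEID, SLURM_JOB_NUM_NODES and SLURMD_NODENAME are real Slurm variables, but whether Karabo reads exactly these, and whether "first node" means node-id 0, is not stated in this section. The function names are hypothetical.

```python
import os
from typing import Mapping


def is_on_slurm_cluster_sketch(env: Mapping[str, str] = os.environ) -> bool:
    # Assumption: a set SLURM_JOB_ID means the code runs inside a Slurm job.
    return "SLURM_JOB_ID" in env


def get_node_id_sketch(env: Mapping[str, str] = os.environ) -> int:
    # SLURM_NODEID is the relative index of the current node in the allocation.
    return int(env["SLURM_NODEID"])


def get_node_name_sketch(env: Mapping[str, str] = os.environ) -> str:
    return env["SLURMD_NODENAME"]


def get_number_of_nodes_sketch(env: Mapping[str, str] = os.environ) -> int:
    return int(env["SLURM_JOB_NUM_NODES"])


def is_first_node_sketch(env: Mapping[str, str] = os.environ) -> bool:
    # Assumption: the first node is the one with relative node-id 0.
    return get_node_id_sketch(env) == 0


# A made-up environment, so the sketch also runs outside of a Slurm job.
fake_env = {
    "SLURM_JOB_ID": "123456",
    "SLURM_NODEID": "0",
    "SLURM_JOB_NUM_NODES": "4",
    "SLURMD_NODENAME": "node001",
}
print(is_on_slurm_cluster_sketch(fake_env), is_first_node_sketch(fake_env))
print(get_node_name_sketch(fake_env), get_number_of_nodes_sketch(fake_env))
```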

n_workers_scheduler_node: int = 1
classmethod should_dask_be_used(override: bool | None = None) bool

Util function to decide whether dask should be used or not.

This implementation differs slightly from the base class: on SLURM systems, additional checks are taken into consideration.

Parameters:

override – Override? Has highest priority.

Returns:

Decision whether dask should be used or not.

timeout: int = 60
use_workers_or_nannies: Literal['workers', 'nannies'] = 'nannies'

karabo.util.data_util module

Gauss(x: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], x0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], y0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], a: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], sigma: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]]) int64 | float64 | ndarray[Any, dtype[int64 | float64]]
Voigt(x: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], x0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], y0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], a: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], sigma: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], gamma: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]]) int64 | float64 | ndarray[Any, dtype[int64 | float64]]
calculate_chunk_size_from_max_chunk_size_in_memory(max_chunk_memory_size: str, data_array: DataArray | List[DataArray]) int
calculate_required_number_of_chunks(max_chunk_size_in_memory: str, data_array: List[DataArray]) int
extract_chars_from_string(string: str) str
extract_digit_from_string(string: str) int
full_getter(self: object) Dict[str, Any]
full_setter(self: object, state: Dict[str, Any]) None
get_module_absolute_path() str
get_module_path_of_module(module: ModuleType) str
get_spectral_sky_data(ra: ndarray[Any, dtype[float64]], dec: ndarray[Any, dtype[float64]], freq0: ndarray[Any, dtype[float64]], nfreq: int) ndarray[Any, dtype[float64]]
input_wrapper(msg: str, ret: str = 'y') str

Wrapper around standard input that returns ret instead of prompting during unit tests, since a test would otherwise stall while waiting for input. The environment variable ‘SKIP_INPUT’ or ‘UNIT_TEST’ must be set to an arbitrary value for ret to be returned.

Parameters:
  • msg – input message

  • ret – return value if ‘SKIP_INPUT’ or ‘UNIT_TEST’ is set, default=’y’
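
The described behaviour can be sketched in a few lines. This is a hypothetical re-implementation for illustration, not the code shipped in karabo.util.data_util. It assumes that only the presence of the variable matters, because any value is accepted.

```python
import os


def input_wrapper_sketch(msg: str, ret: str = "y") -> str:
    """Hypothetical sketch: skip the prompt when a skip variable is set."""
    # Any value counts, so only the presence of the variable is checked.
    if "SKIP_INPUT" in os.environ or "UNIT_TEST" in os.environ:
        return ret
    return input(msg)


# Simulate a unit-test environment so that no prompt is shown.
os.environ["SKIP_INPUT"] = "1"
answer = input_wrapper_sketch("Overwrite existing file? [y/n] ")
print(answer)  # y
```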

parse_size(size_str: str) int
read_CSV_to_ndarray(file: str) ndarray[Any, dtype[float64]]
resample_spectral_lines(npoints: int, dfreq: ndarray[Any, dtype[float64]], spec_line: ndarray[Any, dtype[float64]]) Tuple[ndarray[Any, dtype[float64]], ndarray[Any, dtype[float64]]]

karabo.util.file_handler module

class FileHandler

Bases: object

Utility file-handler for unspecified directories.

Provides cache-management functionality. FileHandler.root_stm (short-term-memory-dir) and FileHandler.root_ltm (long-term-memory-dir) are static root-directories in which the respective cache-dir is located. In case someone wants to extract something specific from the cache, the path is usually printed blue & bold in stdout.

Honors ‘TMPDIR’ & ‘TMP’ and ‘SCRATCH’ env-var(s) for STM-disk-cache where ‘TMPDIR’ = ‘TMP’ > ‘SCRATCH’ > /tmp.

Honors ‘XDG_CACHE_HOME’ env-var(s) for LTM-disk-cache where ‘XDG_CACHE_HOME’ > $HOME/.cache > /tmp.

Note: Setting env-vars has only an effect if they’re set before importing Karabo. Run-time adjustments must be done directly on root_stm and root_ltm!
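
The precedence chains above can be read as a simple first-match lookup. The following sketch makes that explicit. It is not Karabo's implementation, and the function names are hypothetical. Since ‘TMPDIR’ and ‘TMP’ are stated to have equal priority, checking ‘TMPDIR’ first is an arbitrary choice of this sketch.

```python
from typing import Mapping


def resolve_stm_root(env: Mapping[str, str]) -> str:
    """Hypothetical lookup of the STM root: TMPDIR = TMP > SCRATCH > /tmp."""
    # Assumption: TMPDIR is tried before TMP, although both rank equally.
    for name in ("TMPDIR", "TMP", "SCRATCH"):
        value = env.get(name)
        if value:
            return value
    return "/tmp"


def resolve_ltm_root(env: Mapping[str, str]) -> str:
    """Hypothetical lookup of the LTM root: XDG_CACHE_HOME > $HOME/.cache > /tmp."""
    if env.get("XDG_CACHE_HOME"):
        return env["XDG_CACHE_HOME"]
    if env.get("HOME"):
        return env["HOME"].rstrip("/") + "/.cache"
    return "/tmp"


print(resolve_stm_root({"SCRATCH": "/scratch/alice"}))  # /scratch/alice
print(resolve_ltm_root({"HOME": "/home/alice"}))  # /home/alice/.cache
```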

The root STM and LTM must be unique per user (seeded rnd chars+digits) to avoid conflicting dir-names on any computer with any root-directory.

LTM-root
└── karabo-LTM-<user>-<10 rnd chars+digits>
    ├── <prefix><10 rnd chars+digits>
    |   ├── <subdir>
    |   └── <file>
    └── <prefix><10 rnd chars+digits>
        ├── <subdir>
        └── <file>

STM-root
└── karabo-STM-<user>-<10 rnd chars+digits>
    ├── <prefix><10 rnd chars+digits>
    |   ├── <subdir>
    |   └── <file>
    └── <prefix><10 rnd chars+digits>
        ├── <subdir>
        └── <file>

FileHandler can be used the same way as tempfile.TemporaryDirectory using with.
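
For reference, this is the standard-library pattern that the comparison refers to. How a FileHandler instance is constructed and what its with-block yields is not documented in this section, so the sketch deliberately uses only tempfile.TemporaryDirectory.

```python
import os
import tempfile

with tempfile.TemporaryDirectory(prefix="demo-") as tmp_dir:
    # Inside the block the directory exists and can be filled freely.
    file_path = os.path.join(tmp_dir, "result.txt")
    with open(file_path, "w") as fh:
        fh.write("intermediate data")
    existed_inside = os.path.isdir(tmp_dir)

# Leaving the block removes the directory and everything in it.
removed_after = not os.path.exists(tmp_dir)
print(existed_inside, removed_after)  # True True
```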

classmethod clean(term: Literal['long', 'short'] = 'short') None

Removes the entire directory specified by term.

We strongly suggest NOT using this function in a workflow. This function removes the entire karabo-disk-cache. So if there’s another karabo-process running in parallel, you could mess with its disk-cache as well.

Parameters:

term – “long” or “short” term memory

clean_instance() None

Cleans instance-bound tmp-dirs of self.tmps from disk.

classmethod empty_dir(dir_path: Path | str) None

Deletes all contents of dir_path, but not the directory itself.

This function assumes that all files and directories are owned by the function-user.

Parameters:

dir_path – Directory to empty.
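
A minimal sketch of "delete the contents, keep the directory", using only the standard library. The name empty_dir_sketch is hypothetical and the actual Karabo implementation may differ, for example in how it treats symlinks.

```python
import shutil
import tempfile
from pathlib import Path


def empty_dir_sketch(dir_path) -> None:
    """Hypothetical sketch: remove everything inside dir_path, keep dir_path."""
    for entry in Path(dir_path).iterdir():
        if entry.is_dir() and not entry.is_symlink():
            shutil.rmtree(entry)  # real sub-directory: remove recursively
        else:
            entry.unlink()  # regular file or symlink


root = Path(tempfile.mkdtemp())
(root / "sub").mkdir()
(root / "sub" / "a.txt").write_text("a")
(root / "b.txt").write_text("b")

empty_dir_sketch(root)
still_there = root.is_dir()
remaining = list(root.iterdir())
print(still_there, remaining)  # True []
root.rmdir()  # clean up the demo directory itself
```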

get_tmp_dir(prefix: str | None = None, term: Literal['short'] = 'short', purpose: str | None = None, unique: object = None, mkdir: bool = True, seed: str | int | float | bytes | None = None) str
get_tmp_dir(prefix: str, term: Literal['long'], purpose: str | None = None, unique: object = None, mkdir: bool = True, seed: str | int | float | bytes | None = None) str

Gets a tmp-dir path.

This is the go-to function to get a tmp-dir in the appropriate directory.

Parameters:
  • prefix – Dir-name prefix for STM (optional) and dir-name for LTM (required).

  • term – “short” for STM or “long” for LTM.

  • purpose – Creates a verbose print-msg with its purpose if set.

  • unique – If an object which has attributes is provided, then you get the same tmp-dir for the unique instance.

  • mkdir – Make-dir directly?

  • seed – Seed for the rnd chars+digits of an STM sub-dir, so that different processes can locate the same directory. Shouldn’t be used for LTM sub-dirs, unless you know what you’re doing. LTM sub-dirs are already seeded with prefix. However, if they are seeded for some reason, the seed is then something like prefix + seed, which leads to different LTM sub-dirs.

Returns:

tmp-dir path

classmethod is_dir_empty(dirname: Path | str) bool

Checks if dirname is empty assuming dirname exists.

Parameters:

dirname – Directory to check.

Raises:

NotADirectoryError – If dirname is not an existing directory.

Returns:

True if dir is empty, else False

classmethod ltm() str

LTM (long-term-memory) path.

classmethod remove_empty_dirs(term: Literal['long', 'short'] = 'short') None

Removes empty directories in the chosen cache-dir.

Parameters:

term – “long” or “short” term memory

root_ltm: str = '/home/runner/.cache'
root_stm: str = '/tmp'
classmethod stm() str

STM (short-term-memory) path.

assert_valid_ending(path: str | Path, ending: str) None

Utility function to check if the ending of path is ending.

Parameters:
  • path – Path to check.

  • ending – Ending match.

Raises:

ValueError – When the ending of path doesn’t match ending.

getsize(inode: str | Path) int

Gets the total size of a file or directory in number of bytes.

Parameters:

inode – Directory or file to get size from. Can take a while for a large dir.

Returns:

Number of bytes of inode.
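
One way to compute such a total is to walk the directory tree and add up the file sizes, which also explains why large directories take a while. The sketch below is an assumption about the approach, not Karabo's code. It counts regular file contents only and skips symlinks, and the name getsize_sketch is hypothetical.

```python
import os
import tempfile
from pathlib import Path


def getsize_sketch(inode) -> int:
    """Hypothetical sketch: total size in bytes of a file or directory tree."""
    path = Path(inode)
    if path.is_file():
        return path.stat().st_size
    total = 0
    for dirpath, _dirnames, filenames in os.walk(path):
        for name in filenames:
            file_path = os.path.join(dirpath, name)
            # Assumption of this sketch: symlinks are not counted.
            if not os.path.islink(file_path):
                total += os.path.getsize(file_path)
    return total


demo = Path(tempfile.mkdtemp())
(demo / "a.bin").write_bytes(b"12345")
(demo / "sub").mkdir()
(demo / "sub" / "b.bin").write_bytes(b"1234567")
print(getsize_sketch(demo))  # 12
print(getsize_sketch(demo / "a.bin"))  # 5
```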

write_dir(dir: TDirPathType, *, overwrite: bool = False) Generator[TDirPathType, None, None]

Enables transactional creating and writing into dir.

Assumes that dir is empty, meaning NOT partially filled with anything.

This function is NOT thread-safe!

Parameters:
  • dir – Directory to create & write.

  • overwrite – Allow overwrite? Be aware that it will replace the entire dir if it already exists. So be careful to provide the correct dir.

Yields:

Directory to fill safely.
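
A common way to get transactional directory writes is to fill a staging directory and rename it to the final name only on success. The sketch below illustrates that idea under stated assumptions. It is not Karabo's implementation, the name write_dir_sketch is hypothetical, and like the documented function it is not thread-safe.

```python
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator


@contextmanager
def write_dir_sketch(target, *, overwrite: bool = False) -> Iterator[Path]:
    """Hypothetical sketch: yield a staging dir, publish it only on success."""
    target = Path(target)
    if target.exists() and not overwrite:
        raise FileExistsError(f"{target} already exists")
    target.parent.mkdir(parents=True, exist_ok=True)
    # Stage next to the target so the final rename stays on one filesystem.
    staging = Path(tempfile.mkdtemp(prefix=target.name + "-", dir=target.parent))
    try:
        yield staging
    except BaseException:
        # Failure inside the with-block: discard the partial output.
        shutil.rmtree(staging, ignore_errors=True)
        raise
    if target.exists():
        shutil.rmtree(target)  # only reachable with overwrite=True
    staging.rename(target)


base = Path(tempfile.mkdtemp())
final_dir = base / "products"

with write_dir_sketch(final_dir) as work_dir:
    (work_dir / "image.txt").write_text("done")

try:
    with write_dir_sketch(base / "broken") as work_dir:
        (work_dir / "partial.txt").write_text("half")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

print((final_dir / "image.txt").read_text())  # done
print((base / "broken").exists())  # False
```

The second block shows the point of the pattern: after a failure, neither the target nor a half-written staging directory is left behind.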

karabo.util.gpu_util module

get_gpu_memory() int

Retrieves the available GPU memory in MiB by invoking nvidia-smi.

Returns:

Available GPU memory in MiB.

Return type:

int

Raises:

RuntimeError – If unexpected output is encountered when running nvidia-smi.
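
The sketch below shows one way to query nvidia-smi and to fail loudly on unexpected output. It is an assumption, not Karabo's code: the section does not say which nvidia-smi query is used, so the sketch treats the free memory of the first GPU as "available". The function names are hypothetical, and only the parser is exercised here, so no GPU is needed to run the example.

```python
import subprocess


def parse_gpu_memory_mib(output: str) -> int:
    """Hypothetical parser: first line of the query output as MiB."""
    lines = output.strip().splitlines()
    first_line = lines[0].strip() if lines else ""
    if not first_line.isdigit():
        raise RuntimeError(f"Unexpected nvidia-smi output: {output!r}")
    return int(first_line)


def get_gpu_memory_sketch() -> int:
    """Hypothetical sketch: ask nvidia-smi for the free memory in MiB."""
    # Assumption: memory.free of the first GPU is what counts as available.
    result = subprocess.run(
        ["nvidia-smi", "--query-gpu=memory.free", "--format=csv,noheader,nounits"],
        capture_output=True,
        text=True,
        check=True,
    )
    return parse_gpu_memory_mib(result.stdout)


# With the nounits option, nvidia-smi prints one plain number per GPU.
print(parse_gpu_memory_mib("15109\n"))  # 15109
```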

is_cuda_available() bool

Checks if a CUDA-compatible GPU is available on the system by invoking nvidia-smi.

Returns:

True if a CUDA-compatible GPU is found, otherwise False.

Return type:

bool

karabo.util.hdf5_util module

convert_healpix_2_radec(arr: ndarray[Any, dtype[Any]]) Tuple[float64, float64, int]

Convert array from healpix to 2-D array of RADEC.

Parameters:

arr – Array in healpix format.

Returns:

RADEC in degrees.

get_healpix_image(hdffile: Any) Any

Get index maps, maps and frequency from HDF5 file

get_vis_from_hdf5(hdffile: Any) Any

Get index maps, maps and frequency from HDF5 file

h5_diter(g: Dict[str, Dataset | Group], prefix: str = '') Generator[Tuple[str, Dataset], Any, Any]

Get the data elements from the HDF5 datasets and groups.

Input: HDF5 file

Output: Data elements and their paths

print_hd5_object_and_keys(hdffile: Any) Tuple[File, KeysViewHDF5]

Read HDF5 file.

Returns:

HDF object, relevant keys

karabo.util.helpers module

Module for helper utils which don’t belong to any other module.

get_rnd_str(k: int, seed: str | int | float | bytes | None = None) str

Creates a random ascii+digits string with length=`k`.

Most tmp-file tools are using a string-length of 10.

Parameters:
  • k – Length of random string.

  • seed – Seed.

Returns:

Random generated string.
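
A seeded random string of this kind can be sketched with the standard library. This is not Karabo's implementation. The name get_rnd_str_sketch is hypothetical, and the alphabet (ASCII letters of both cases plus digits) is an assumption about what "ascii+digits" covers.

```python
import random
import string
from typing import Optional, Union


def get_rnd_str_sketch(
    k: int, seed: Optional[Union[str, int, float, bytes]] = None
) -> str:
    """Hypothetical sketch: random string of length k, reproducible via seed."""
    # A dedicated Random instance leaves the global random state untouched.
    rng = random.Random(seed)
    # Assumption: "ascii+digits" means a-z, A-Z and 0-9.
    alphabet = string.ascii_letters + string.digits
    return "".join(rng.choices(alphabet, k=k))


first = get_rnd_str_sketch(10, seed="my-object")
second = get_rnd_str_sketch(10, seed="my-object")
print(len(first), first == second)  # 10 True
```

The same seed always yields the same string, which is what allows several processes to arrive at one and the same directory name.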

karabo.util.jupyter module

isNotebook() bool

karabo.util.math_util module

cartesian_to_ll(x: bool | int | float, y: bool | int | float, z: int = 0) Tuple[float, float]
get_poisson_disk_sky(min_size: Tuple[bool | int | float, bool | int | float], max_size: Tuple[bool | int | float, bool | int | float], flux_min: bool | int | float, flux_max: bool | int | float, r: int = 10) ndarray[Any, dtype[float64]]
long_lat_to_cartesian(lat: bool | int | integer | float | floating, lon: bool | int | integer | float | floating) ndarray[Any, dtype[float64]]
poisson_disc_samples(width: bool | int | float, height: bool | int | float, r: int, k: int = 5, ord: None | float | Literal['fro', 'nuc'] = None) List[Tuple[float, float]]

karabo.util.plotting_util module

class Font

Bases: object

BLUE = '\x1b[94m'
BOLD = '\x1b[1m'
CYAN = '\x1b[96m'
DARKCYAN = '\x1b[36m'
END = '\x1b[0m'
GREEN = '\x1b[92m'
PURPLE = '\x1b[95m'
RED = '\x1b[91m'
UNDERLINE = '\x1b[4m'
YELLOW = '\x1b[93m'
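
The Font attributes are ANSI escape sequences: a style is switched on by printing its code and switched off again with END. The short sketch below copies three of the values listed above into local constants, so it runs without Karabo. The path is a made-up example.

```python
# Values copied from the Font attributes listed above.
BLUE = "\x1b[94m"
BOLD = "\x1b[1m"
END = "\x1b[0m"

path = "/tmp/karabo-STM-demo"  # hypothetical path, for illustration only
message = f"{BOLD}{BLUE}{path}{END}"
print(message)  # shown blue and bold in terminals that support ANSI codes
```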
get_slices(wcs: WCS) List[str]

karabo.util.rascil_util module

filter_data_dir_warning_message() None

Avoid unnecessary RASCIL warning that confuses users.

Avoids the following RASCIL warning: “The RASCIL data directory is not available - continuing but any simulations will fail”. It pops up because we don’t download the RASCIL data directory. To the best of our knowledge (as of 31.07.2024), we don’t need the data directory. We can therefore ignore this warning and avoid unnecessarily alerting users with it.

karabo.util.survey module

This module creates the respective survey-files for Karabo.

create_MALS_survey_as_fits(directory: Path | str, version: int = 3, check_for_updates: bool = False, verbose: bool = True) str

Creates MALS (https://mals.iucaa.in/) survey as a .fits.gz file.

It takes care of downloading the file from ‘https://mals.iucaa.in/catalogue/catalogue_malsdr1v{0}_all.csv’ and converts it into a .fits.gz file. Downloading the .csv catalogue may take a while, because it is about 3.6 GB in size and the download speed depends on some uncontrollable factors. However, if you already have the .csv catalogue, just put it into directory to take it from the disk-cache. All file-products (.csv & .fits.gz) are saved in and loaded from directory.

This is just a utility function and is not meant to be embedded in any library-code.

In case the .fits.gz file already exists, it just returns the file-path without downloading or creating any file.
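
The caching behaviour described above boils down to three cases: reuse the .fits.gz file, convert an existing .csv, or download first. The sketch below only plans the next step and performs no download or conversion. It is not Karabo's code: the helper name is hypothetical, and the assumption that the cached files are named after the last part of the URL is made purely for illustration.

```python
import tempfile
from pathlib import Path
from typing import Tuple

URL_TEMPLATE = "https://mals.iucaa.in/catalogue/catalogue_malsdr1v{0}_all.csv"


def plan_mals_steps(directory, version: int = 3) -> Tuple[str, str]:
    """Hypothetical helper: return the catalogue URL and the next required step."""
    url = URL_TEMPLATE.format(version)
    # Assumption: cached files are named after the last part of the URL.
    csv_path = Path(directory) / url.rsplit("/", 1)[-1]
    fits_path = csv_path.with_suffix(".fits.gz")
    if fits_path.exists():
        return url, "reuse"  # nothing to download or convert
    if csv_path.exists():
        return url, "convert"  # catalogue is already in the disk-cache
    return url, "download"


cache_dir = Path(tempfile.mkdtemp())
url, first_step = plan_mals_steps(cache_dir)
(cache_dir / "catalogue_malsdr1v3_all.csv").write_text("placeholder")
_, second_step = plan_mals_steps(cache_dir)
print(url)
print(first_step, second_step)  # download convert
```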

Parameters:
  • directory – Directory to save and load the according catalogue files.

  • version – Survey version.

  • check_for_updates – Also check for new updates?

  • verbose – Verbose?

Returns:

.fits.gz file-path.

karabo.util.testing module

class ChangeWorkingDir

Bases: object

Temporarily changes the working dir for test-discovery.

run_tests(pytest_args: str | None = None) None

Launches pytest.

Parameters:

pytest_args – pytest cli-args, e.g. “-k test_my_favorite”

Module contents