karabo.util package
Subpackages
Submodules
karabo.util.config_util module
karabo.util.dask module
Module for dask-related functionality.
- class DaskHandler
Bases:
DaskHandlerBasic
Public & dev API for dask-associated functionality.
This is the public dask-API for Karabo, so you don’t have to worry about which dask-handler of this module to use. You can do almost everything through this class. The only exception is adjusting the default settings on a SLURM system, which is done through DaskHandlerSlurm.
- dask_client
The Dask client object. If None, a new client will be created.
- Type:
distributed.client.Client | None
- memory_limit
The memory limit per worker in GB. If None, the memory limit is set to the maximum available memory on the node (see the dask documentation for memory_limit).
- Type:
float | None
- n_threads_per_worker
The number of threads to use per worker. Default is None, which means that the number of threads will be equal to the number of cores.
- Type:
int | None
- use_dask
Whether to use Dask or not. If None, then Karabo will decide whether to use dask or not for certain tasks.
- Type:
bool | None
- use_processes
Use processes instead of threads?
- Threads:
Fast to initiate.
No need to transfer data to them.
Limited by the GIL, which allows only one thread to execute Python code at a time.
- Processes:
Take time to set up.
Slow to transfer data to.
Each has its own GIL, so they don’t need to take turns executing Python code.
- Type:
bool
- classmethod get_dask_client() Client
Get (create if not exists) a dask-client.
- Returns:
Dask-client.
- classmethod parallelize_with_dask(iterate_function: Callable[[...], Any], iterable: Iterable[Any], *args: Any, **kwargs: Any) Any | Tuple[Any, ...] | List[Any]
Run a function over an iterable in parallel using dask, and gather the results.
args & kwargs will get passed to Delayed.
- Parameters:
iterate_function – The function to be applied to each element of iterable. The function takes the current element of the iterable as its first argument, followed by any positional arguments, and then any keyword arguments.
iterable – The iterable over which the function will be applied. Each element of iterable will be passed to iterate_function.
- Returns:
A tuple containing the results of iterate_function for each element in the iterable. The results are gathered using dask’s compute function.
- classmethod setup() None
Calls get_dask_client.
- classmethod should_dask_be_used(override: bool | None = None) bool
Util function to decide whether dask should be used or not.
- Parameters:
override – Optional override of the decision; it has the highest priority.
- Returns:
Decision whether dask should be used or not.
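The calling convention of parallelize_with_dask (each element is passed first, followed by *args and then **kwargs, and the results come back in input order) can be tried without Dask. The sketch below is a hypothetical stand-in built on the standard library’s thread pool; it is not Karabo’s implementation, which wraps the calls in Dask Delayed objects and gathers them with dask’s compute. parallelize_sketch and scale_and_shift are illustrative names.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, List


def parallelize_sketch(
    iterate_function: Callable[..., Any],
    iterable: Iterable[Any],
    *args: Any,
    **kwargs: Any,
) -> List[Any]:
    """Hypothetical stand-in: call iterate_function(element, *args, **kwargs) per element."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(iterate_function, item, *args, **kwargs) for item in iterable]
        # Collect in submission order, analogous to one compute() over all tasks.
        return [future.result() for future in futures]


def scale_and_shift(x: float, factor: float, offset: float = 0.0) -> float:
    return x * factor + offset


results = parallelize_sketch(scale_and_shift, [1.0, 2.0, 3.0], 10.0, offset=0.5)
print(results)  # [10.5, 20.5, 30.5]
```

With Karabo installed, the corresponding call following the documented signature would be DaskHandler.parallelize_with_dask(scale_and_shift, [1.0, 2.0, 3.0], 10.0, offset=0.5).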
- class DaskHandlerBasic
Bases:
object
Base-class for dask-handler functionality.
- dask_client
The Dask client object. If None, a new client will be created.
- Type:
distributed.client.Client | None
- memory_limit
The memory limit per worker in GB. If None, the memory limit is set to the maximum available memory on the node (see the dask documentation for memory_limit).
- Type:
float | None
- n_threads_per_worker
The number of threads to use per worker. Default is None, which means that the number of threads will be equal to the number of cores.
- Type:
int | None
- use_dask
Whether to use Dask or not. If None, then Karabo will decide whether to use dask or not for certain tasks.
- Type:
bool | None
- use_processes
Use processes instead of threads?
- Threads:
Fast to initiate.
No need to transfer data to them.
Limited by the GIL, which allows only one thread to execute Python code at a time.
- Processes:
Take time to set up.
Slow to transfer data to.
Each has its own GIL, so they don’t need to take turns executing Python code.
- Type:
bool
- dask_client: Client | None = None
- classmethod get_dask_client() Client
Get (create if not exists) a dask-client.
- Returns:
Dask-client.
- memory_limit: float | None = None
- n_threads_per_worker: int | None = None
- classmethod parallelize_with_dask(iterate_function: Callable[[...], Any], iterable: Iterable[Any], *args: Any, **kwargs: Any) Any | Tuple[Any, ...] | List[Any]
Run a function over an iterable in parallel using dask, and gather the results.
args & kwargs will get passed to Delayed.
- Parameters:
iterate_function – The function to be applied to each element of iterable. The function takes the current element of the iterable as its first argument, followed by any positional arguments, and then any keyword arguments.
iterable – The iterable over which the function will be applied. Each element of iterable will be passed to iterate_function.
- Returns:
A tuple containing the results of iterate_function for each element in the iterable. The results are gathered using dask’s compute function.
- classmethod setup() None
Calls get_dask_client.
- classmethod should_dask_be_used(override: bool | None = None) bool
Util function to decide whether dask should be used or not.
- Parameters:
override – Optional override of the decision; it has the highest priority.
- Returns:
Decision whether dask should be used or not.
- use_dask: bool | None = None
- use_processes: bool = False
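The precedence described for should_dask_be_used and use_dask (an explicit override wins; otherwise the use_dask class attribute; if that is None, Karabo decides) can be written as a small pure function. This is a hypothetical sketch: the fallback argument is a placeholder for Karabo’s own decision logic, which is not documented here.

```python
from typing import Optional


def should_dask_be_used_sketch(
    override: Optional[bool],
    use_dask: Optional[bool],
    fallback: bool,
) -> bool:
    """Hypothetical precedence rule: override > use_dask > Karabo's own decision."""
    if override is not None:
        return override  # highest priority
    if use_dask is not None:
        return use_dask  # explicit class-level setting
    return fallback  # placeholder for "Karabo will decide"


print(should_dask_be_used_sketch(None, None, fallback=False))  # False
print(should_dask_be_used_sketch(None, True, fallback=False))  # True
print(should_dask_be_used_sketch(False, True, fallback=True))  # False
```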
- class DaskHandlerSlurm
Bases:
DaskHandlerBasic
Dask-handler for slurm-based jobs.
- use_workers_or_nannies
Whether to use workers or nannies (default). This could lead to more processing (see documentation for dask usage in Karabo).
- Type:
Literal['workers', 'nannies']
- n_workers_scheduler_node
The number of workers to start on the scheduler node.
- Type:
int
- timeout
Timeout in seconds for the dask-scheduler to wait for all the workers to connect.
- Type:
int
- classmethod get_dask_client() Client
Get (create if not exists) a dask-client for a SLURM environment.
- Returns:
Dask-client.
- classmethod get_node_id() int
Gets the current node-id.
- Returns:
Node-id.
- classmethod get_node_name() str
Gets the current node-name.
- Returns:
Node-name.
- classmethod get_number_of_nodes() int
Gets the number of nodes of the slurm-job.
- Returns:
Number of nodes.
- classmethod is_first_node() bool
Util function to check if the current node is the first node.
- Returns:
Check-result.
- classmethod is_on_slurm_cluster() bool
Util function to check if code is running in a slurm-job.
- Returns:
Check-result.
- n_workers_scheduler_node: int = 1
- classmethod should_dask_be_used(override: bool | None = None) bool
Util function to decide whether dask should be used or not.
This implementation differs slightly from the base class: on SLURM systems, additional checks are taken into consideration.
- Parameters:
override – Optional override of the decision; it has the highest priority.
- Returns:
Decision whether dask should be used or not.
- timeout: int = 60
- use_workers_or_nannies: Literal['workers', 'nannies'] = 'nannies'
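Inside a job, SLURM exposes node information through environment variables. The sketch below assumes the node helpers are derived from the standard SLURM variables SLURM_JOB_ID, SLURM_JOB_NUM_NODES, SLURM_NODEID and SLURMD_NODENAME; Karabo’s actual classmethods may read different variables. These are hypothetical free-function stand-ins, and the environment is passed in as a mapping so the logic can be tried outside a cluster.

```python
import os
from typing import Mapping, Optional


def is_on_slurm_cluster(env: Optional[Mapping[str, str]] = None) -> bool:
    env = os.environ if env is None else env
    # SLURM sets SLURM_JOB_ID for every process running inside a job.
    return "SLURM_JOB_ID" in env


def get_number_of_nodes(env: Optional[Mapping[str, str]] = None) -> int:
    env = os.environ if env is None else env
    # Assumed default outside a job: a single node.
    return int(env.get("SLURM_JOB_NUM_NODES", "1"))


def get_node_id(env: Optional[Mapping[str, str]] = None) -> int:
    env = os.environ if env is None else env
    # Relative (0-based) index of this node within the allocation.
    return int(env.get("SLURM_NODEID", "0"))


def get_node_name(env: Optional[Mapping[str, str]] = None) -> str:
    env = os.environ if env is None else env
    return env.get("SLURMD_NODENAME", "localhost")  # assumed fallback name


def is_first_node(env: Optional[Mapping[str, str]] = None) -> bool:
    return get_node_id(env) == 0


job_env = {
    "SLURM_JOB_ID": "123456",
    "SLURM_JOB_NUM_NODES": "4",
    "SLURM_NODEID": "2",
    "SLURMD_NODENAME": "nid001234",
}
print(is_on_slurm_cluster(job_env), get_number_of_nodes(job_env), is_first_node(job_env))
# True 4 False
```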
karabo.util.data_util module
- Gauss(x: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], x0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], y0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], a: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], sigma: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]]) int64 | float64 | ndarray[Any, dtype[int64 | float64]]
- Voigt(x: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], x0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], y0: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], a: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], sigma: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]], gamma: bool | int | integer | float | floating | ndarray[Any, dtype[int64 | float64]]) int64 | float64 | ndarray[Any, dtype[int64 | float64]]
- calculate_chunk_size_from_max_chunk_size_in_memory(max_chunk_memory_size: str, data_array: DataArray | List[DataArray]) int
- calculate_required_number_of_chunks(max_chunk_size_in_memory: str, data_array: List[DataArray]) int
- extract_chars_from_string(string: str) str
- extract_digit_from_string(string: str) int
- full_getter(self: object) Dict[str, Any]
- full_setter(self: object, state: Dict[str, Any]) None
- get_module_absolute_path() str
- get_module_path_of_module(module: ModuleType) str
- get_spectral_sky_data(ra: ndarray[Any, dtype[float64]], dec: ndarray[Any, dtype[float64]], freq0: ndarray[Any, dtype[float64]], nfreq: int) ndarray[Any, dtype[float64]]
- input_wrapper(msg: str, ret: str = 'y') str
Wrapper around the standard input() that returns ret during unit tests, since a test would otherwise block waiting for input. Set the environment variable ‘SKIP_INPUT’ or ‘UNIT_TEST’ to any value to make it return ret.
- Parameters:
msg – input message
ret – Return value if ‘SKIP_INPUT’ or ‘UNIT_TEST’ is set, default ‘y’.
- parse_size(size_str: str) int
- read_CSV_to_ndarray(file: str) ndarray[Any, dtype[float64]]
- resample_spectral_lines(npoints: int, dfreq: ndarray[Any, dtype[float64]], spec_line: ndarray[Any, dtype[float64]]) Tuple[ndarray[Any, dtype[float64]], ndarray[Any, dtype[float64]]]
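The behaviour documented for input_wrapper fits in a few lines. The sketch below is a hypothetical re-implementation, not Karabo’s code; the extra env parameter is an assumption added only so the check can be tested without touching the real environment.

```python
import os
from typing import Mapping, Optional


def input_wrapper_sketch(
    msg: str, ret: str = "y", env: Optional[Mapping[str, str]] = None
) -> str:
    """Return ret without prompting when SKIP_INPUT or UNIT_TEST is set."""
    env = os.environ if env is None else env
    if "SKIP_INPUT" in env or "UNIT_TEST" in env:
        return ret  # non-interactive: never block a test run
    return input(msg)


print(input_wrapper_sketch("Continue? [y/N] ", env={"UNIT_TEST": "1"}))  # y
```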
karabo.util.file_handler module
- class FileHandler
Bases:
object
Utility file-handler for unspecified directories.
Provides cache-management functionality. FileHandler.root_stm (short-term-memory-dir) and FileHandler.root_ltm (long-term-memory-dir) are static root-directories in which the corresponding cache-dir is located. If you want to extract something specific from the cache, look for its path, which is usually printed in blue and bold to stdout.
Honors the ‘TMPDIR’, ‘TMP’ and ‘SCRATCH’ env-vars for the STM disk-cache, with priority ‘TMPDIR’ = ‘TMP’ > ‘SCRATCH’ > /tmp. Honors the ‘XDG_CACHE_HOME’ env-var for the LTM disk-cache, with priority ‘XDG_CACHE_HOME’ > $HOME/.cache > /tmp. Note: Setting env-vars only has an effect if they’re set before importing Karabo. Run-time adjustments must be made directly on root_stm and root_ltm!
The root STM and LTM must be unique per user (seeded rnd chars+digits) to avoid conflicting dir-names on any computer with any root-directory.
LTM-root
└── karabo-LTM-<user>-<10 rnd chars+digits>
    ├── <prefix><10 rnd chars+digits>
    |   ├── <subdir>
    |   └── <file>
    └── <prefix><10 rnd chars+digits>
        ├── <subdir>
        └── <file>
STM-root
└── karabo-STM-<user>-<10 rnd chars+digits>
    ├── <prefix><10 rnd chars+digits>
    |   ├── <subdir>
    |   └── <file>
    └── <prefix><10 rnd chars+digits>
        ├── <subdir>
        └── <file>
FileHandler can be used in a with-statement, the same way as tempfile.TemporaryDirectory.
- classmethod clean(term: Literal['long', 'short'] = 'short') None
Removes the entire directory specified by term.
We strongly suggest NOT using this function in a workflow. This function removes the entire karabo disk-cache, so if another karabo process is running in parallel, you could mess with its disk-cache as well.
- Parameters:
term – “long” or “short” term memory
- clean_instance() None
Cleans instance-bound tmp-dirs of self.tmps from disk.
- classmethod empty_dir(dir_path: Path | str) None
Deletes all contents of dir_path, but not the directory itself.
This function assumes that all files and directories are owned by the user calling it.
- Parameters:
dir_path – Directory to empty.
- get_tmp_dir(prefix: str | None = None, term: Literal['short'] = 'short', purpose: str | None = None, unique: object = None, mkdir: bool = True, seed: str | int | float | bytes | None = None) str
- get_tmp_dir(prefix: str, term: Literal['long'], purpose: str | None = None, unique: object = None, mkdir: bool = True, seed: str | int | float | bytes | None = None) str
Gets a tmp-dir path.
This is the go-to function to get a tmp-dir in the corresponding cache-directory.
- Parameters:
prefix – Dir-name prefix for STM (optional) and dir-name for LTM (required).
term – “short” for STM or “long” for LTM.
purpose – If set, creates a verbose print-msg stating the dir’s purpose.
unique – If an object which has attributes is provided, then you get the same tmp-dir for the unique instance.
mkdir – Make-dir directly?
seed – Seed for the random chars+digits of an STM sub-dir, e.g. so that different processes can relocate the same sub-dir. Shouldn’t be used for LTM sub-dirs unless you know what you’re doing: LTM sub-dirs are already seeded with prefix. If they are seeded anyway, the effective seed becomes something like prefix + seed, which leads to a different LTM sub-dir.
- Returns:
tmp-dir path
- classmethod is_dir_empty(dirname: Path | str) bool
Checks if dirname is empty assuming dirname exists.
- Parameters:
dirname – Directory to check.
- Raises:
NotADirectoryError – If dirname is not an existing directory.
- Returns:
True if dir is empty, else False
- classmethod ltm() str
LTM (long-term-memory) path.
- classmethod remove_empty_dirs(term: Literal['long', 'short'] = 'short') None
Removes empty directories in the chosen cache-dir.
- Parameters:
term – “long” or “short” term memory
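- root_ltm: str = '/home/runner/.cache'
- root_stm: str = '/tmp'
The defaults shown for root_ltm and root_stm are resolved from env-vars when Karabo is imported, so they differ between machines.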
- classmethod stm() str
STM (short-term-memory) path.
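The env-var precedence described for FileHandler can be expressed as two small resolver functions. This sketch covers only the documented order; resolve_stm_root and resolve_ltm_root are hypothetical names, and since the documentation gives ‘TMPDIR’ and ‘TMP’ equal priority, checking TMPDIR before TMP is an assumption.

```python
import os
from typing import Mapping, Optional


def resolve_stm_root(env: Optional[Mapping[str, str]] = None) -> str:
    """STM root: TMPDIR / TMP > SCRATCH > /tmp (TMPDIR before TMP is assumed)."""
    env = os.environ if env is None else env
    for var in ("TMPDIR", "TMP", "SCRATCH"):
        value = env.get(var)
        if value:
            return value
    return "/tmp"


def resolve_ltm_root(env: Optional[Mapping[str, str]] = None) -> str:
    """LTM root: XDG_CACHE_HOME > $HOME/.cache > /tmp."""
    env = os.environ if env is None else env
    if env.get("XDG_CACHE_HOME"):
        return env["XDG_CACHE_HOME"]
    if env.get("HOME"):
        return os.path.join(env["HOME"], ".cache")
    return "/tmp"


print(resolve_stm_root({"SCRATCH": "/scratch/alice"}))  # /scratch/alice
print(resolve_ltm_root({"HOME": "/home/alice"}))
```

Because such a resolution happens once at import time, later changes to the env-vars have no effect; as stated above, run-time adjustments must be assigned to FileHandler.root_stm and FileHandler.root_ltm directly.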
- assert_valid_ending(path: str | Path, ending: str) None
Utility function to check if the ending of path is ending.
- Parameters:
path – Path to check.
ending – Ending match.
- Raises:
ValueError – When the ending of path doesn’t match ending.
- getsize(inode: str | Path) int
Gets the total size of a file or directory in number of bytes.
- Parameters:
inode – Directory or file to get size from. Can take a while for a large dir.
- Returns:
Number of bytes of inode.
- write_dir(dir: TDirPathType, *, overwrite: bool = False) Generator[TDirPathType, None, None]
Enables transactional creating and writing into dir.
Assumes that dir is empty, meaning NOT partially filled with anything.
This function is NOT thread-safe!
- Parameters:
dir – Directory to create & write.
overwrite – Allow overwrite? Be aware that it will replace the entire dir if it already exists. So be careful to provide the correct dir.
- Yields:
Directory to fill safely.
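write_dir is documented as transactional: the target directory should only appear once writing has finished. One common way to get that behaviour is to fill a temporary sibling directory and rename it into place on success. The sketch below uses that technique as an assumption; write_dir_sketch is a hypothetical name, it yields the staging directory rather than the final path, and, like the original, it is not thread-safe.

```python
import os
import shutil
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Union


@contextmanager
def write_dir_sketch(target: Union[str, Path], *, overwrite: bool = False) -> Iterator[Path]:
    """Hypothetical transactional writer: contents appear at target only on success."""
    target = Path(target)
    if target.exists() and not overwrite:
        raise FileExistsError(f"{target} already exists and overwrite=False")
    target.parent.mkdir(parents=True, exist_ok=True)
    # Stage next to the target so the final rename stays on the same filesystem.
    staging = Path(tempfile.mkdtemp(prefix=f".{target.name}-", dir=target.parent))
    try:
        yield staging
    except BaseException:
        shutil.rmtree(staging, ignore_errors=True)  # discard partial writes
        raise
    if target.exists():
        # overwrite=True: replace the entire existing dir (this step is not atomic).
        shutil.rmtree(target)
    os.replace(staging, target)  # publish the finished directory


with tempfile.TemporaryDirectory() as base:
    out = Path(base) / "products"
    with write_dir_sketch(out) as staging:
        (staging / "result.txt").write_text("done")
    print(sorted(p.name for p in out.iterdir()))  # ['result.txt']
```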
karabo.util.gpu_util module
- get_gpu_memory() int
Retrieves the available GPU memory in MiB by invoking nvidia-smi.
- Returns:
Available GPU memory in MiB.
- Return type:
int
- Raises:
RuntimeError – If unexpected output is encountered when running nvidia-smi.
- is_cuda_available() bool
Checks if a CUDA-compatible GPU is available on the system by invoking nvidia-smi.
- Returns:
True if a CUDA-compatible GPU is found, otherwise False.
- Return type:
bool
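A check like is_cuda_available can be sketched with the standard library: look up nvidia-smi on the PATH and ask it to list the GPUs with `nvidia-smi -L`. This is a hypothetical sketch of the idea, not Karabo’s implementation, whose exact nvidia-smi invocation is not documented here. On a machine without NVIDIA tooling it simply returns False.

```python
import shutil
import subprocess


def is_cuda_available_sketch() -> bool:
    """Return True if nvidia-smi exists and reports at least one GPU."""
    if shutil.which("nvidia-smi") is None:
        return False  # no NVIDIA driver tooling on this machine
    try:
        # `nvidia-smi -L` prints one line per GPU, e.g. "GPU 0: ...".
        result = subprocess.run(
            ["nvidia-smi", "-L"], capture_output=True, text=True, timeout=10
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return result.returncode == 0 and "GPU" in result.stdout


print(is_cuda_available_sketch())
```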
karabo.util.hdf5_util module
- convert_healpix_2_radec(arr: ndarray[Any, dtype[Any]]) Tuple[float64, float64, int]
Convert array from healpix to 2-D array of RADEC.
- Parameters:
arr – Input healpix array.
- Returns:
RADEC in degrees.
- get_healpix_image(hdffile: Any) Any
Get index maps, maps and frequency from HDF5 file
- get_vis_from_hdf5(hdffile: Any) Any
Get index maps, maps and frequency from HDF5 file
- h5_diter(g: Dict[str, Dataset | Group], prefix: str = '') Generator[Tuple[str, Dataset], Any, Any]
Get the data elements from the hdf5 datasets and groups.
- Parameters:
g – HDF5 file (or group) to iterate over.
- Yields:
Path and item of each data element.
- print_hd5_object_and_keys(hdffile: Any) Tuple[File, KeysViewHDF5]
Read HDF5 file.
- Returns:
HDF object, relevant keys.
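h5_diter walks an HDF5 file recursively and yields every dataset together with its path. To keep the idea runnable without h5py, the sketch below uses nested dicts as stand-ins for h5py Groups and treats any other value as a Dataset; diter_sketch and the "/"-joined path format are assumptions. With real files you would check isinstance(item, h5py.Group) instead, or use h5py’s built-in Group.visititems.

```python
from typing import Any, Dict, Iterator, Tuple


def diter_sketch(group: Dict[str, Any], prefix: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (path, leaf) pairs, descending into nested dicts (the "groups")."""
    for key, item in group.items():
        path = f"{prefix}/{key}"
        if isinstance(item, dict):
            yield from diter_sketch(item, path)  # a group: recurse
        else:
            yield path, item  # a dataset: emit it with its full path


tree = {"index_map": {"freq": [1.0, 2.0], "pixel": [0, 1, 2]}, "map": [[0.1, 0.2]]}
for path, data in diter_sketch(tree):
    print(path, data)
# /index_map/freq [1.0, 2.0]
# /index_map/pixel [0, 1, 2]
# /map [[0.1, 0.2]]
```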
karabo.util.helpers module
Module for helper utils which don’t belong to any other module.
- get_rnd_str(k: int, seed: str | int | float | bytes | None = None) str
Creates a random ascii+digits string of length k.
Most tmp-file tools use a string length of 10.
- Parameters:
k – Length of random string.
seed – Seed.
- Returns:
Random generated string.
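A seeded random string like the one get_rnd_str returns can be produced with random.Random, whose seed accepts exactly the types in the signature (str, int, float, bytes or None). This is a sketch under two assumptions: that the alphabet is ASCII letters plus digits, and that a dedicated Random instance is used so the global random state is untouched. get_rnd_str_sketch is a hypothetical name.

```python
import random
import string
from typing import Optional, Union


def get_rnd_str_sketch(k: int, seed: Optional[Union[str, int, float, bytes]] = None) -> str:
    """Random ASCII-letters+digits string of length k; reproducible if seed is given."""
    rng = random.Random(seed)  # seed=None falls back to system randomness
    return "".join(rng.choices(string.ascii_letters + string.digits, k=k))


print(get_rnd_str_sketch(10, seed="alice") == get_rnd_str_sketch(10, seed="alice"))  # True
```

Seeding is what allows the same tmp-dir name to be recomputed, which is why FileHandler.get_tmp_dir exposes a seed parameter.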
karabo.util.jupyter module
- isNotebook() bool
karabo.util.math_util module
- cartesian_to_ll(x: bool | int | float, y: bool | int | float, z: int = 0) Tuple[float, float]
- get_poisson_disk_sky(min_size: Tuple[bool | int | float, bool | int | float], max_size: Tuple[bool | int | float, bool | int | float], flux_min: bool | int | float, flux_max: bool | int | float, r: int = 10) ndarray[Any, dtype[float64]]
- long_lat_to_cartesian(lat: bool | int | integer | float | floating, lon: bool | int | integer | float | floating) ndarray[Any, dtype[float64]]
- poisson_disc_samples(width: bool | int | float, height: bool | int | float, r: int, k: int = 5, ord: None | float | Literal['fro', 'nuc'] = None) List[Tuple[float, float]]
karabo.util.plotting_util module
- class Font
Bases:
object
- BLUE = '\x1b[94m'
- BOLD = '\x1b[1m'
- CYAN = '\x1b[96m'
- DARKCYAN = '\x1b[36m'
- END = '\x1b[0m'
- GREEN = '\x1b[92m'
- PURPLE = '\x1b[95m'
- RED = '\x1b[91m'
- UNDERLINE = '\x1b[4m'
- YELLOW = '\x1b[93m'
- get_slices(wcs: WCS) List[str]
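The Font attributes are ANSI escape sequences, presumably what FileHandler uses to print cache paths in blue and bold. A minimal sketch of combining them follows; it re-declares three of the listed constants so it runs without Karabo (with Karabo installed you would import Font from karabo.util.plotting_util), and highlight_path is a hypothetical helper. Appending Font.END resets the terminal styling afterwards.

```python
class Font:
    # Subset of the ANSI escape sequences listed above.
    BLUE = "\x1b[94m"
    BOLD = "\x1b[1m"
    END = "\x1b[0m"


def highlight_path(path: str) -> str:
    """Wrap path in blue+bold codes and reset styling afterwards."""
    return f"{Font.BLUE}{Font.BOLD}{path}{Font.END}"


print(highlight_path("/tmp/karabo-STM-alice-AbC123xYz9"))
```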
karabo.util.rascil_util module
- filter_data_dir_warning_message() None
Avoid unnecessary RASCIL warning that confuses users.
Avoids the following RASCIL warning: “The RASCIL data directory is not available - continuing but any simulations will fail …”, which pops up because we don’t download the RASCIL data directory. To the best of our knowledge (as of 31.07.2024), we don’t need the data directory. We can therefore ignore this warning and avoid unnecessarily alerting users with it.
karabo.util.survey module
This module creates survey files for Karabo.
- create_MALS_survey_as_fits(directory: Path | str, version: int = 3, check_for_updates: bool = False, verbose: bool = True) str
Creates MALS (https://mals.iucaa.in/) survey as a .fits.gz file.
It takes care of downloading the file from ‘https://mals.iucaa.in/catalogue/catalogue_malsdr1v{0}_all.csv’ and converting it into a .fits.gz file. Downloading the .csv catalogue may take a while, because it is about 3.6 GB in size and the download speed depends on factors outside your control. However, if you already have the .csv catalogue, just put it into directory so that it is taken from the disk-cache. All file products (.csv & .fits.gz) are saved in and loaded from directory.
This is just a utility function and is not meant to be embedded in any library-code.
If the .fits.gz file already exists, it just returns the file-path without downloading or creating any file.
- Parameters:
directory – Directory to save and load the according catalogue files.
version – Survey version.
check_for_updates – Also check for new updates?
verbose – Verbose?
- Returns:
.fits.gz file-path.
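The caching behaviour of create_MALS_survey_as_fits follows a common pattern: derive the download URL from the documented template, reuse a .csv already present in directory, and skip all work when the .fits.gz product exists. The sketch below shows only that decision logic; plan_mals_files and the local file names are hypothetical, and the actual download and conversion are left out.

```python
import tempfile
from pathlib import Path
from typing import Any, Dict, Union

# URL template as documented; {0} is replaced by the survey version.
MALS_URL_TEMPLATE = "https://mals.iucaa.in/catalogue/catalogue_malsdr1v{0}_all.csv"


def plan_mals_files(directory: Union[Path, str], version: int = 3) -> Dict[str, Any]:
    directory = Path(directory)
    url = MALS_URL_TEMPLATE.format(version)
    csv_path = directory / url.rsplit("/", 1)[-1]  # reuse the remote file name
    fits_path = csv_path.with_name(csv_path.stem + ".fits.gz")  # hypothetical name
    if fits_path.exists():
        action = "return existing"  # nothing to download or convert
    elif csv_path.exists():
        action = "convert cached csv"  # skip the large download
    else:
        action = "download and convert"
    return {"url": url, "csv": csv_path, "fits": fits_path, "action": action}


with tempfile.TemporaryDirectory() as d:
    print(plan_mals_files(d)["action"])  # download and convert
```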
karabo.util.testing module
- class ChangeWorkingDir
Bases:
object
Temporarily changes the working directory for test discovery.
- run_tests(pytest_args: str | None = None) None
Launches pytest.
- Parameters:
pytest_args – pytest CLI args, e.g. “-k test_my_favorite”
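Temporarily changing the working directory, as ChangeWorkingDir does for test discovery, is typically done with a context manager that restores the previous directory even when the body raises. The sketch below is a hypothetical function-based equivalent, not Karabo’s class; on Python 3.11+ the standard library’s contextlib.chdir does the same job.

```python
import os
import tempfile
from contextlib import contextmanager
from pathlib import Path
from typing import Iterator, Union


@contextmanager
def change_working_dir(path: Union[str, Path]) -> Iterator[Path]:
    """Temporarily chdir into path and always restore the previous cwd."""
    previous = Path.cwd()
    os.chdir(path)
    try:
        yield Path.cwd()
    finally:
        os.chdir(previous)  # restored even if the body raises


with tempfile.TemporaryDirectory() as tmp:
    with change_working_dir(tmp) as cwd:
        print("inside:", cwd)
print("restored:", Path.cwd())
```

Inside such a block, pytest can be launched programmatically with pytest.main(shlex.split("-k test_my_favorite")), which is one way a string like pytest_args can be turned into the argument list pytest expects.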