Source code for skysegmentor.groupfinder

import numpy as np
import healpy as hp
from typing import List, Tuple, Union


def _cascade(labels: np.ndarray, indexin: int) -> int:
    """Cascades down a linked list of label reassignments.

    Parameters
    ----------
    labels : int array
        Label array.
    indexin : int
        Label index in.

    Returns
    -------
    indexout : out
        Cascaded label index out.
    """
    indexout = indexin
    while labels[indexout - 1] != indexout:
        indexout = labels[indexout - 1]
    return indexout


def _cascade_all(labels: np.ndarray) -> np.ndarray:
    """Cascade label index for an array.

    Parameters
    ----------
    labels : int array
        Label array.

    Returns
    -------
    labelsout : int array
        Cascade all label index in an array.
    """
    labelsout = np.copy(labels)
    for i in range(0, len(labels)):
        labelsout[i] = _cascade(labels, labels[i])
    return labelsout


def _unionise(ind1: int, ind2: int, labels: np.ndarray) -> Tuple[int, int, int]:
    """Finds the union of two label indexes.

    Parameters
    ----------
    ind1, ind2 : int
        Index 1 and 2.
    labels : int array
        Label array.

    Returns
    -------
    ind1out, ind2out : int
        Outputted index 1 and 2.
    indout : int
        Index out.
    """
    ind1out = _cascade(labels, ind1)
    ind2out = _cascade(labels, ind2)
    if ind1out <= ind2out:
        indout = ind1out
    else:
        indout = ind2out
    return ind1out, ind2out, indout


def _shuffle_down(labels: np.ndarray) -> np.ndarray:
    """Shuffle label index for an array.

    Parameters
    ----------
    labels : int array
        Label array.

    Returns
    -------
    labelsout : int array
        Cascade all label index in an array.
    """
    assert np.all(labels > 0)
    labelsout = np.copy(labels)
    nlabels = np.zeros(len(labels))
    for i in range(0, len(labels)):
        nlabels[labels[i] - 1] = nlabels[labels[i] - 1] + 1
    maplabel = np.zeros(len(labels)).astype("int")
    j = 0
    for i in range(0, len(labels)):
        if nlabels[i] > 0:
            j = j + 1
            maplabel[i] = j
        else:
            maplabel[i] = 0
    for i in range(0, len(labels)):
        labelsout[i] = maplabel[labels[i] - 1]
    return labelsout


def _if_list_concatenate(arr: List[Union[float, int]]) -> np.ndarray:
    """Concatenates list only if length is > 1."""
    if len(arr) > 1:
        arr = np.concatenate(arr)
    return arr


[docs] def unionfinder(binmap: np.ndarray) -> np.ndarray: """Group or label assignment on a healpix grid using the HoshenKopelman algorithm. Parameters ---------- binmap : array Binary healpix map. Returns ------- groupID : array Labelled healpix map. """ """Group or label assignment on a healpix grid using the HoshenKopelman algorithm. Parameters ---------- binmap : array Binary healpix map. Returns ------- groupID : array Labelled healpix map. """ nside = hp.npix2nside(len(binmap)) groupID = np.zeros(hp.nside2npix(nside)) groupID = groupID.astype("int") cond = np.where(binmap == 1)[0] groupID[cond] = -1 groupID_equals = [] currentID = 0 for i in range(0, len(binmap)): if binmap[i] == 1 and groupID[i] == -1: currentID += 1 groupID[i] = currentID groupID_equals.append([]) if groupID[i] != 0: neighs = hp.get_all_neighbours(nside, i) neighs = neighs[neighs != -1] neighs = neighs[groupID[neighs] != 0] if len(neighs) > 0: groupID[neighs[groupID[neighs] == -1]] = groupID[i] neighs = neighs[groupID[neighs] != groupID[i]] if len(neighs) > 0: groupID_equals[groupID[i] - 1].append(np.unique(groupID[neighs])) groupID_equals2 = [ np.unique(_if_list_concatenate(_groupID)) for _groupID in groupID_equals ] groupID_ind = np.arange(len(groupID_equals)) + 1 groupID_pair1 = [] groupID_pair2 = [] for i in range(0, len(groupID_equals2)): for j in range(0, len(groupID_equals2[i])): groupID_pair1.append(groupID_ind[i]) groupID_pair2.append(groupID_equals2[i][j]) groupID_pair1 = np.array(groupID_pair1) groupID_pair2 = np.array(groupID_pair2) cond = np.where(groupID_pair1 > groupID_pair2)[0] temp = groupID_pair2[cond] groupID_pair2[cond] = groupID_pair1[cond] groupID_pair1[cond] = temp for i in range(0, len(groupID_pair1)): ind1out, ind2out, indout = _unionise( groupID_pair1[i], groupID_pair2[i], groupID_ind ) groupID_ind[ind1out - 1] = indout groupID_ind[ind2out - 1] = indout groupID_ind = _cascade_all(groupID_ind) groupID_ind = _shuffle_down(groupID_ind) cond = np.where(groupID != 0)[0] groupID[cond] = groupID_ind[groupID[cond] - 1] return groupID