import numpy as np
import healpy as hp
from typing import List, Tuple, Optional
from . import coords, rotate
[docs]
def get_partition_IDs(partition: np.ndarray) -> np.ndarray:
"""Returns the total weights of each partition.
Parameters
----------
partition : array
Partition IDs on a map. Unfilled partitions are assigned partition 0.
Returns
-------
partition_IDs : array
Unique partition IDs, including zero.
"""
partition_IDs = np.unique(partition)
return partition_IDs
[docs]
def total_partition_weights(
partition: np.ndarray, weights: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""Returns the total weights of each partition.
Parameters
----------
partition : array
Partition IDs on a map. Unfilled partitions are assigned partition 0.
weights : array
A weight assigned to each element of the partition array.
Returns
-------
partition_IDs : array
Unique partition IDs, including zero.
partition_weights : array
The total weight for each partition.
"""
partition_IDs = get_partition_IDs(partition)
Npartitions = np.max(partition_IDs) + 1
partition_weights = np.zeros(Npartitions)
np.add.at(partition_weights, partition, weights)
return partition_IDs, partition_weights
[docs]
def remove_val4array(array: np.ndarray, val: float) -> np.ndarray:
"""Removes a given value from an array.
Parameters
----------
array : array
Data vector.
val : float
Value to be removed from an array.
"""
return array[array != val]
[docs]
def fill_map(pixID: np.ndarray, nside: int, val: float = 1.0) -> np.ndarray:
"""Fill a Healpix map with a given value val at given pixel locations.
Parameters
----------
pixID : int array
Pixel index.
nside : int
HEalpix map nside.
val : float, optional
Value to fill certain pixels with.
"""
tmap = np.zeros(hp.nside2npix(nside))
tmap[pixID] = val
return tmap
[docs]
def find_map_barycenter(
bnmap: np.ndarray, wmap: Optional[np.ndarray] = None
) -> Tuple[float, float]:
"""Determines the barycenter of center of mass direction of the input binary map.
Parameters
----------
bnmap : array
binary map.
wmap : array, optional
The weights.
Returns
-------
phic, thec : float
The center
"""
if wmap is None:
wmap = np.ones(len(bnmap))
pixID = np.where(bnmap != 0.0)[0]
if len(pixID) == 0:
raise ValueError("Binary map must contain at least one non-zero pixel.")
nside = hp.npix2nside(len(bnmap))
pixID = np.where(bnmap != 0.0)[0]
the, phi = hp.pix2ang(nside, pixID)
wei = wmap[pixID]
x, y, z = coords.sphere2cart(np.ones(len(phi)), phi, the, center=[0.0, 0.0, 0.0])
xc = np.sum(x * wei) / np.sum(wei)
yc = np.sum(y * wei) / np.sum(wei)
zc = np.sum(z * wei) / np.sum(wei)
_, phic, thec = coords.cart2sphere(xc, yc, zc)
phir, ther = rotate.rotate_usphere(phi, the, [-phic, -thec, 0.0])
themax = np.max(ther)
return phic, thec, themax
[docs]
def find_points_barycenter(
phi: np.ndarray, the: np.ndarray, weights: Optional[np.ndarray] = None
) -> Tuple[float, float]:
"""Determines the barycenter of center of mass direction of the input point dataset.
Parameters
----------
phi, the : array
Angular coordinates.
weights : array, optional
Weights for points.
Returns
-------
phic, thec : float
The center
"""
if len(phi) == 0 or len(the) == 0:
raise ValueError("Input arrays phi and the must not be empty.")
if len(phi) != len(the):
raise ValueError("Input arrays phi and the must have the same length.")
if weights is not None and len(weights) != len(phi):
raise ValueError("Weights array must be the same length as phi and the.")
if weights is None:
weights = np.ones(len(phi))
x, y, z = coords.sphere2cart(np.ones(len(phi)), phi, the, center=[0.0, 0.0, 0.0])
xc = np.sum(x * weights) / np.sum(weights)
yc = np.sum(y * weights) / np.sum(weights)
zc = np.sum(z * weights) / np.sum(weights)
_, phic, thec = coords.cart2sphere(xc, yc, zc)
phir, ther = rotate.rotate_usphere(phi, the, [-phic, -thec, 0.0])
themax = np.max(ther)
return phic, thec, themax
[docs]
def get_map_border(
bnmap: np.ndarray, wmap: Optional[np.ndarray] = None, res: List[float] = [200, 100]
) -> Tuple[np.ndarray, np.ndarray]:
"""Determines the outer border of binary map region.
Parameters
----------
bnmap : array
binary map.
wmap : array, optional
The weights.
res : list, optional
Resolution of spherical cap grid where [phiresolution, thetaresolution]
to find region border.
Returns
-------
phi_border, the_border : array
Approximate border region.
"""
nside = hp.npix2nside(len(bnmap))
phic, thec, themax = find_map_barycenter(bnmap, wmap=wmap)
psize = res[0]
tsize = res[1]
pedges = np.linspace(0.0, 2 * np.pi, psize + 1)
pmid = 0.5 * (pedges[1:] + pedges[:-1])
tedges = np.linspace(0.0, np.max(themax) * 1.05, tsize + 1)
tmid = 0.5 * (tedges[1:] + tedges[:-1])
pcap, tcap = np.meshgrid(pmid, tmid, indexing="ij")
pshape = np.shape(pcap)
pcap_rot, tcap_rot = rotate.rotate_usphere(
pcap.flatten(), tcap.flatten(), [0.0, thec, phic]
)
pixID = hp.ang2pix(nside, tcap_rot, pcap_rot)
wcap_rot = bnmap[pixID]
pcap_rot = pcap_rot.reshape(pshape)
tcap_rot = tcap_rot.reshape(pshape)
wcap_rot = wcap_rot.reshape(pshape)
phi_border, the_border = [], []
tind = np.arange(len(tmid))
for i in range(len(wcap_rot)):
if len(tind[wcap_rot[i] != 0.0]) > 0:
ind = np.max(tind[wcap_rot[i] != 0.0])
phi_border.append(pcap_rot[i, ind])
the_border.append(tcap_rot[i, ind])
phi_border = np.array(phi_border)
the_border = np.array(the_border)
return phi_border, the_border
[docs]
def get_points_border(
phi: np.ndarray,
the: np.ndarray,
weights: Optional[np.ndarray] = None,
res: int = 100,
) -> Tuple[np.ndarray, np.ndarray]:
"""Determines the outer border of binary map region.
Parameters
----------
phi, the : array
Angular coordinates.
weights : array, optional
Weights for points.
res : int, optional
Resolution of spherical cap grid for phiresolution to find region border.
Returns
-------
phi_border, the_border : array
Approximate border region.
"""
if phi.size == 0 or the.size == 0:
raise ValueError("Input point set is empty")
phic, thec, themax = find_points_barycenter(phi, the, weights=weights)
pedges = np.linspace(0.0, 2 * np.pi, res + 1)
phi_rot, the_rot = rotate.rotate_usphere(phi, the, [-phic, -thec, 0.0])
phi_border, the_border = [], []
for i in range(0, len(pedges) - 1):
cond = np.where((phi_rot >= pedges[i]) & (phi_rot <= pedges[i + 1]))[0]
if len(cond) > 0:
ind = cond[np.argmax(the_rot[cond])]
phi_border.append(phi[ind])
the_border.append(the[ind])
phi_border = np.array(phi_border)
the_border = np.array(the_border)
return phi_border, the_border
[docs]
def get_map_most_dist_points(
bnmap: np.ndarray, wmap: Optional[np.ndarray] = None, res: List[float] = [100, 50]
) -> Tuple[float, float, float, float]:
"""Returns the most distant points on a binary map.
Parameters
----------
bnmap : int array
Binary healpix map.
wmap : array, optional
The weights.
res : list, optional
Resolution of spherical cap grid where [phiresolution, thetaresolution]
to find region border.
Returns
-------
p1, t1, p2, t2 : float
Angular coordinates (phi, theta) for the most distant points (1 and 2) on
the binary map.
"""
phi_border, the_border = get_map_border(bnmap, wmap=wmap, res=res)
pp1, pp2 = np.meshgrid(phi_border, phi_border, indexing="ij")
tt1, tt2 = np.meshgrid(the_border, the_border, indexing="ij")
pp1, pp2, tt1, tt2 = pp1.flatten(), pp2.flatten(), tt1.flatten(), tt2.flatten()
dist = coords.distusphere(pp1, tt1, pp2, tt2)
ind = np.argmax(dist)
p1, p2 = pp1[ind], pp2[ind]
t1, t2 = tt1[ind], tt2[ind]
return p1, t1, p2, t2
[docs]
def get_points_most_dist_points(
phi: np.ndarray,
the: np.ndarray,
weights: Optional[np.ndarray] = None,
res: int = 100,
) -> Tuple[float, float, float, float]:
"""Returns the most distant points from a set of points.
Parameters
----------
phi, the : array
Angular coordinates.
weights : array, optional
Weights for points.
res : int, optional
Resolution of spherical cap grid for phiresolution to find region border.
Returns
-------
p1, t1, p2, t2 : float
Angular coordinates (phi, theta) for the most distant points (1 and 2) on
the binary map.
"""
if len(phi) == 0 or len(the) == 0:
raise ValueError("Input coordinate arrays are empty.")
phi_border, the_border = get_points_border(phi, the, weights=weights, res=res)
pp1, pp2 = np.meshgrid(phi_border, phi_border, indexing="ij")
tt1, tt2 = np.meshgrid(the_border, the_border, indexing="ij")
pp1, pp2, tt1, tt2 = pp1.flatten(), pp2.flatten(), tt1.flatten(), tt2.flatten()
dist = coords.distusphere(pp1, tt1, pp2, tt2)
ind = np.argmax(dist)
p1, p2 = pp1[ind], pp2[ind]
t1, t2 = tt1[ind], tt2[ind]
return p1, t1, p2, t2
[docs]
def weight_dif(
phi_split: float, phi: np.ndarray, weights: np.ndarray, balance: int = 1
) -> float:
"""Compute the difference between the weights on either side of phi_split.
Parameters
----------
phi_split : float
Longitude split.
phi : array
Longitude coordinates.
weights : array
Weights corresponding to each longitude coordinates.
balance : float, optional
A multiplication factor assigned to weights below phi_split.
"""
cond = np.where(phi <= phi_split)[0]
weights1 = balance * np.sum(weights[cond])
cond = np.where(phi > phi_split)[0]
weights2 = np.sum(weights[cond])
return abs(weights1 - weights2)
[docs]
def find_dphi(phi: np.ndarray, weights: np.ndarray, balance: int = 1) -> float:
"""Determines the splitting longitude required for partitioning, either with
1-to-1 weights on either side or unbalanced weighting if balance != 1.
Parameters
----------
phi : array
Longitude coordinates.
weights : array
Weights corresponding to each longitude coordinates.
Returns
-------
dphi : float
Splitting longitude.
"""
cond = np.where(weights != 0.0)[0]
if len(cond) == 0:
raise ValueError("Weights must contain at least one non-zero value.")
dphis = np.linspace(phi.min(), phi.max(), 100)
_dphi = dphis[1] - dphis[0]
weights_dif = np.array(
[weight_dif(dphi, phi, weights, balance=balance) for dphi in dphis]
)
ind = np.argmin(weights_dif)
dphi = dphis[ind]
dphis = np.linspace(dphi - 2 * _dphi, dphi + 2 * _dphi, 100)
weights_dif = np.array(
[weight_dif(dphi, phi, weights, balance=balance) for dphi in dphis]
)
ind = np.argmin(weights_dif)
dphi = dphis[ind]
return dphi
[docs]
def segmentmap2(
weightmap: np.ndarray,
balance: int = 1,
partitionmap: Optional[np.ndarray] = None,
partition: Optional[int] = None,
res: List[int] = [100, 50],
) -> np.ndarray:
"""Segment a map with weights into 2 equal (unequal in balance != 1).
Parameters
----------
weightmap : array
Healpix weight map.
balance : float, optional
Balance of the weights for the partitioning.
partitionmap : int array, optional
Partitioned map IDs.
partition : int, optional
A singular partition to be partitioned in two pieces.
res : list, optional
Resolution of spherical cap grid where [phiresolution, thetaresolution]
to find region border.
Returns
-------
partitionmap : int array
Partitioned map IDs.
"""
npix = len(weightmap)
nside = hp.npix2nside(npix)
pixID = np.nonzero(weightmap)[0]
bnmap = np.zeros(npix)
bnmap[pixID] = 1
if partitionmap is None:
partitionmap = np.copy(bnmap)
maxpartition = 1
partition = 1
else:
maxpartition = int(np.max(partitionmap))
_pixID = np.where(partitionmap == partition)[0]
_bnmap = np.zeros(npix)
_bnmap[_pixID] = 1
_the, _phi = hp.pix2ang(nside, _pixID)
_weights = weightmap[_pixID]
if np.sum(_bnmap) != len(_bnmap):
p1, t1, p2, t2 = get_map_most_dist_points(_bnmap, wmap=weightmap, res=res)
a1, a2, a3 = rotate.rotate2plane([p1, t1], [p2, t2])
_phi, _the = rotate.forward_rotate(_phi, _the, a1, a2, a3)
_dphi = find_dphi(_phi, _weights, balance=balance)
_cond = np.where(_phi > _dphi)[0]
partitionmap[_pixID[_cond]] = maxpartition + 1
return partitionmap
[docs]
def segmentpoints2(
phi: np.ndarray,
the: np.ndarray,
weights: Optional[np.ndarray] = None,
balance: int = 1,
partitionID: Optional[np.ndarray] = None,
partition: Optional[int] = None,
res: int = 100,
) -> np.ndarray:
"""Segments a set of points with weights into 2 equal (unequal in balance != 1).
Parameters
----------
phi, the : array
Angular positions.
weights : array, optional
Angular position weights.
balance : float, optional
Balance of the weights for the partitioning.
partitionID : int array, optional
Partitioned map IDs.
partition : int, optional
A singular partition to be partitioned in two pieces.
res : float, optional
Resolution of spherical cap phiresolution to find region border.
Returns
-------
partitionID : int array
Partitioned map IDs.
"""
if weights is None:
weights = np.ones(len(phi))
if partitionID is None:
partitionID = np.ones(len(phi))
maxpartition = 1
partition = 1
else:
maxpartition = int(np.max(partitionID))
_pixID = np.where(partitionID == partition)[0]
_bnmap = np.ones(len(_pixID))
_phi, _the = phi[_pixID], the[_pixID]
_weights = weights[_pixID]
p1, t1, p2, t2 = get_points_most_dist_points(_phi, _the, weights=_weights, res=res)
a1, a2, a3 = rotate.rotate2plane([p1, t1], [p2, t2])
_phi, _the = rotate.forward_rotate(_phi, _the, a1, a2, a3)
_dphi = find_dphi(_phi, _weights, balance=balance)
_cond = np.where(_phi > _dphi)[0]
partitionID[_pixID[_cond]] = maxpartition + 1
return partitionID
[docs]
def segmentmapN(
weightmap: np.ndarray, Npartitions: int, res: List[int] = [100, 50]
) -> np.ndarray:
"""Segment a map with weights into equal Npartition sides.
Parameters
----------
weightmap : array
Healpix weight map.
Npartitions : int
Number of partitioned regions
res : list, optional
Resolution of spherical cap grid where [phiresolution, thetaresolution]
to find region border.
Returns
-------
partitionmap : int array
Partitioned map IDs.
"""
if Npartitions <= 1:
raise ValueError("Npartitions must be > 1.")
# The number of partitions currently assigned for each partition ID.
part_Npart = np.zeros(Npartitions)
part_Npart[0] = Npartitions
maxpartition = 1
partitionmap = np.zeros(len(weightmap))
pixID = np.nonzero(weightmap)
partitionmap[pixID] = 1.0
while any(part_Npart == 0):
for i in range(0, len(part_Npart)):
partition = i + 1
if part_Npart[i] > 1:
wei1 = int(np.floor(part_Npart[i] / 2.0))
wei2 = part_Npart[i] - wei1
part_Npart[i] = wei1
part_Npart[maxpartition] = wei2
maxpartition += 1
balance = wei2 / wei1
partitionmap = segmentmap2(
weightmap,
balance=balance,
partitionmap=partitionmap,
partition=partition,
res=res,
)
return partitionmap
[docs]
def segmentpointsN(
phi: np.ndarray,
the: np.ndarray,
Npartitions: int,
weights: Optional[np.ndarray] = None,
res: int = 100,
) -> np.ndarray:
"""Segments a set of points with weights into equal Npartition sides.
Parameters
----------
weightmap : array
Healpix weight map.
Npartitions : int
Number of partitioned regions
res : list, optional
Resolution of spherical cap grid where [phiresolution, thetaresolution]
to find region border.
Returns
-------
partitionmap : int array
Partitioned map IDs.
"""
if Npartitions <= 1:
raise ValueError("Npartitions must be > 1.")
# The number of partitions currently assigned for each partition ID.
part_Npart = np.zeros(Npartitions)
part_Npart[0] = Npartitions
maxpartition = 1
if weights is None:
weights = np.ones(len(phi))
partitionID = np.ones(len(weights))
while any(part_Npart == 0):
for i in range(0, len(part_Npart)):
partition = i + 1
if part_Npart[i] > 1:
wei1 = int(np.floor(part_Npart[i] / 2.0))
wei2 = part_Npart[i] - wei1
part_Npart[i] = wei1
part_Npart[maxpartition] = wei2
maxpartition += 1
balance = wei2 / wei1
partitionID = segmentpoints2(
phi,
the,
weights=weights,
balance=balance,
partitionID=partitionID,
partition=partition,
res=res,
)
return partitionID