"""Some generic utility routines for number handling and calculating
(specific) variances
"""
import itertools
import logging
import numpy as np
from numba import guvectorize, float32, int32
import pandas as pd
from sourcefinder import extract, stats, utils
from sourcefinder.config import Conf, ImgConf, ExportSettings
from sourcefinder.utility import containers
from sourcefinder.utility.uncertain import Uncertain
from sourcefinder.utility.sourceparams import make_measurements_dataframe
from concurrent.futures import ThreadPoolExecutor
from functools import cached_property
from functools import partial
from scipy import ndimage
from numba import guvectorize, float32, int32
import os
[docs]
logger = logging.getLogger(__name__)
[docs]
class ImageData(object):
"""Encapsulates an image in terms of a numpy array + meta/headerdata.
This is your primary contact point for interaction with images: it
includes facilities for source extraction and measurement, etc.
Parameters
----------
data : 2D np.ndarray
Observational image data. Must be a regular np.ndarray, since image
data read from e.g. a FITS file is not a MaskedArray.
beam : tuple
Clean beam specification as (semi-major axis, semi-minor axis,
position angle) with the axes in pixel coordinates and the position
angle in radians
wcs : utility.coordinates.wcs
World coordinate system specification, in our case it is always
about sky coordinates.
conf : Conf, default: Conf(image=ImgConf(), export=ExportSettings())
Configuration options for source finding. This includes settings
related to image processing (e.g., background and rms
noise estimation, thresholds) as well as export options (e.g., source
parameters and output maps).
"""
def __init__(
self,
data,
beam,
wcs,
conf: Conf = Conf(image=ImgConf(), export=ExportSettings()),
):
# Do data, wcs and beam need deepcopy?
# Probably not (memory overhead, in particular for data),
# but then the user shouldn't change them outside ImageData in the
# meantime
# self.rawdata is a 2D numpy array.
# Single precision is good enough in all cases.
[docs]
self.rawdata = np.ascontiguousarray(data, dtype=np.float32)
[docs]
self.wcs = wcs # a utility.coordinates.wcs instance
if utils.is_valid_beam_tuple(beam):
self.beam = beam # tuple of (semimaj, semimin, theta) with
# semimaj and semimin in pixel coordinates and theta, the position
# angle, in radians.
else:
raise ValueError(
(
"Partial beam specification: one or more of "
"(bmaj, bmin, bpa) are not specified, "
"adequately, image processing is not possible.",
RuntimeWarning,
)
)
# These three quantities are only dependent on the beam, so should be
# calculated once the beam is known and not for each source separately.
[docs]
self.fudge_max_pix_factor = utils.fudge_max_pix(
beam[0], beam[1], beam[2]
)
[docs]
self.beamsize = utils.calculate_beamsize(beam[0], beam[1])
[docs]
self.correlation_lengths = utils.calculate_correlation_lengths(
beam[0], beam[1]
)
[docs]
self.clip: dict[float, np.ndarray] = {}
[docs]
self.labels: dict[float, tuple[np.ndarray, int]] = {}
@property
[docs]
def conf(self) -> Conf:
return self._conf
###########################################################################
# #
# Properties and attributes. #
# #
# Properties are attributes managed by methods; rather than calling the #
# method directly, the attribute automatically invokes it. Result of the #
# function calls are cached, subsequent calls doesn't recompute. #
# #
# clearcache() clears all the cached data, which can get quite large. #
# It may be wise to call this, for example, in an exception handler #
# dealing with MemoryErrors. #
# #
###########################################################################
@cached_property
[docs]
def grids(self):
"""Gridded RMS and background data for interpolating"""
return self.__grids()
@cached_property
[docs]
def backmap(self):
"""Mean background map"""
if not hasattr(self, "_user_backmap"):
return self._interpolate(self.grids["mean"], self.grids["indices"])
else:
return self._user_backmap
@cached_property
[docs]
def rmsmap(self):
"""root-mean-squares map, i.e. the standard deviation of the local
background noise, interpolated across the image."""
if not hasattr(self, "_user_noisemap"):
return self._interpolate(
self.grids["rms"], self.grids["indices"], roundup=True
)
else:
return self._user_noisemap
@cached_property
[docs]
def data(self):
"""Masked image data"""
# We will ignore all the data which is masked for the rest of the
# sourcefinding process. We build up the mask by stacking ("or-ing
# together") a number of different effects:
#
# * A margin from the edge of the image;
# * Any data outside a given radius from the centre of the image;
# * Data which is "obviously" bad (NaN).
mask = np.zeros((self.xdim, self.ydim))
if self.conf.image.margin:
margin_mask = np.ones((self.xdim, self.ydim))
margin_mask[
self.conf.image.margin : -self.conf.image.margin,
self.conf.image.margin : -self.conf.image.margin,
] = 0
mask = np.logical_or(mask, margin_mask)
if self.conf.image.radius:
radius_mask = utils.circular_mask(
self.xdim, self.ydim, self.conf.image.radius
)
mask = np.logical_or(mask, radius_mask)
mask = np.logical_or(mask, np.isnan(self.rawdata))
return np.ma.array(self.rawdata, mask=mask)
@cached_property
[docs]
def data_bgsubbed(self):
"""Background subtracted masked image data"""
return (self.data - self.backmap).astype(np.float32, copy=False)
@property
[docs]
def xdim(self):
"""X pixel dimension of (unmasked) data"""
return self.rawdata.shape[0]
@property
[docs]
def ydim(self):
"""Y pixel dimension of (unmasked) data"""
return self.rawdata.shape[1]
@property
[docs]
def pixmax(self):
"""Maximum pixel value (pre-background subtraction)"""
return self.data.max()
@property
[docs]
def pixmin(self):
"""Minimum pixel value (pre-background subtraction)"""
return self.data.min()
[docs]
def clearcache(self):
"""Zap any calculated data stored in this object.
Clear the background and rms maps, labels, clip, and any locally held
data. All of these can be reconstructed from the data accessor.
Note that this *must* be run to pick up any new settings.
"""
try:
self.labels.clear()
self.clip.clear()
del self.backmap
del self.rmsmap
del self.data
del self.data_bgsubbed
del self.grids
del self.Gaussian_islands
del self.Gaussian_residuals
del self.residuals_from_deblending
except AttributeError:
pass
###########################################################################
# #
# General purpose image handling. #
# #
# Routines for saving and trimming data, and calculating background/RMS #
# maps (in conjuntion with the properties above). #
# #
###########################################################################
# Private "support" methods
[docs]
def __grids(self):
"""Calculate background and RMS grids of this image.
These grids can be interpolated up to make maps of the original image
dimensions: see _interpolate().
This is called automatically when ImageData.backmap,
ImageData.rmsmap or ImageData.fdrmap is first accessed.
"""
# there's no point in working with the whole of the data array
# if it's masked.
useful_chunk = ndimage.find_objects(np.where(self.data.mask, 0, 1))
assert len(useful_chunk) == 1
x_dim, y_dim = self.data[useful_chunk[0]].shape
# Use 'back-size-x' and 'back-size-y' if available, fall back to 'grid'.
back_size_x = self.conf.image.back_size_x or self.conf.image.grid
if back_size_x is None:
raise ValueError(
(
"Expected either back-size-x or grid to be set "
"in the config object"
)
)
back_size_y = self.conf.image.back_size_y or self.conf.image.grid
if back_size_y is None:
raise ValueError(
(
"Expected either back-size-y or grid to be set "
"in the config object"
)
)
# We should divide up the image into subimages such that each grid
# node is centered on a subimage. This is only possible if
# self.back_size_x and self.back_size_y are divisors of xdim and ydim,
# respectively. If not, we need to select a frame within useful_chunk
# that does have the appropriate dimensions. At the same time, it
# should be as large as possible and centered within useful_chunk.
rem_row = np.mod(x_dim, back_size_x)
rem_col = np.mod(y_dim, back_size_y)
start_offset_row, rem_rem_row = divmod(rem_row, 2)
start_offset_col, rem_rem_col = divmod(rem_col, 2)
end_offset_row = start_offset_row + rem_rem_row
end_offset_col = start_offset_col + rem_rem_col
offsets = np.array(
[
start_offset_row,
-end_offset_row,
start_offset_col,
-end_offset_col,
]
)
useful_chunk_inds = ImageData.slices_to_indices(useful_chunk)[0]
centred_inds = useful_chunk_inds + offsets
# Before proceeding, check that our data has the size of at least
# one subimage, for both dimensions.
if (
centred_inds[1] - centred_inds[0] > back_size_x
and centred_inds[3] - centred_inds[2] > back_size_y
):
subimages, number_of_elements_for_each_subimage = (
utils.make_subimages(
self.data.data[
centred_inds[0] : centred_inds[1],
centred_inds[2] : centred_inds[3],
],
self.data.mask[
centred_inds[0] : centred_inds[1],
centred_inds[2] : centred_inds[3],
],
back_size_x,
back_size_y,
)
)
mean_grid = np.zeros(
number_of_elements_for_each_subimage.shape, dtype=np.float32
)
rms_grid = np.zeros(
number_of_elements_for_each_subimage.shape, dtype=np.float32
)
stats.data_clipper_dynamic(
subimages,
number_of_elements_for_each_subimage,
mean_grid,
rms_grid,
)
# Fill in the zeroes with nearest neighbours.
# In this way we do not have to make a MaskedArray, which
# scipy.interpolate.interp1d cannot handle adequately.
mean_grid = utils.nearest_nonzero(mean_grid, rms_grid)
rms_grid = utils.nearest_nonzero(rms_grid, rms_grid)
else:
# Return an empty grid if we don't have enough pixels along both
# dimensions. In that case ImageData._interpolate will return
# completely masked background maps, which is what we want, since
# no sources will be extracted.
mean_grid = np.empty((0, 0), dtype=np.float32)
rms_grid = np.empty((0, 0), dtype=np.float32)
return {"mean": mean_grid, "rms": rms_grid, "indices": centred_inds}
[docs]
def _interpolate(self, grid, inds, roundup=False):
"""Interpolate a grid to produce a map of the dimensions of
the image.
Parameters
----------
grid : np.ma.MaskedArray
The grid to be interpolated.
roundup : bool, default: False
If True, values of the resultant map which are lower than the input
grid are trimmed. Default is False.
Returns
-------
np.ma.MaskedArray
The interpolated map.
Notes
-----
This function is used to transform the RMS, background or FDR grids
produced by :func:`_grids()` to a map we can compare with the image
data.
"""
# Use zeroes with the mask from the observational image as a starting
# point for the mean background and rms background maps. Next, use
# the interpolated values from the background grids, which were derived
# using kappa * sigma clipping, to fill in all unmasked pixels.
my_map = np.ma.MaskedArray(
np.zeros(self.data.shape), mask=self.data.mask, dtype=np.float32
)
# Remove the MaskedArrayFutureWarning warning and keep old numpy < 1.11
# behavior
my_map.unshare_mask()
# If the grid has size 0 there is no point in proceeding.
if grid.size == 0:
my_map.mask = True
return my_map
my_xdim, my_ydim = inds[1] - inds[0], inds[3] - inds[2]
if self.conf.image.median_filter:
f_grid = ndimage.median_filter(grid, self.conf.image.median_filter)
if self.conf.image.mf_threshold:
grid = np.where(
np.fabs(f_grid - grid) > self.conf.image.mf_threshold,
f_grid,
grid,
)
else:
grid = f_grid
if self.conf.image.interpolate_order == 1:
my_map[inds[0] : inds[1], inds[2] : inds[3]] = (
utils.two_step_interp(grid, my_xdim, my_ydim)
)
else:
# This condition is there to make sure we actually have some
# unmasked patch of the image to fill.
slicex = slice(-0.5, -0.5 + grid.shape[0], 1j * my_xdim)
slicey = slice(-0.5, -0.5 + grid.shape[1], 1j * my_ydim)
# Bicubic spline interpolation
my_map[inds[0] : inds[1], inds[2] : inds[3]] = (
ndimage.map_coordinates(
grid,
np.mgrid[slicex, slicey],
mode="nearest",
order=self.conf.image.interpolate_order,
)
)
# If the input grid was entirely masked, then the output map must
# also be masked: there's no useful data here. We don't search for
# sources on a masked background/RMS, so this data will be cleanly
# skipped by the rest of the sourcefinder
if np.ma.getmask(grid).all():
my_map.mask = True
elif roundup:
# In some cases, the spline interpolation may produce values
# lower than the minimum value in the map. If required, these
# can be trimmed off. No point doing this if the map is already
# fully masked, though.
my_map = np.ma.MaskedArray(
data=np.where(my_map >= np.min(grid), my_map, np.min(grid)),
mask=my_map.mask,
)
return my_map
###########################################################################
# #
# Source extraction. #
# #
# Provides for both traditional (islands-above-RMS) and FDR source #
# extraction systems. #
# #
###########################################################################
[docs]
def reverse_se(self):
"""Run source extraction on the negative of this image.
This process can be used to estimate the false positive rate, as there
should be no sources in the negative image.
Returns
-------
:class:`sourcefinder.utility.containers.ExtractionResults`
To prevent interference with the normal extraction process, cached
data (background map, clips, etc.) is cleared before and after
running this method. If this method is used frequently, a separate
cache may be implemented in the future.
"""
self.labels.clear()
self.clip.clear()
self.data_bgsubbed *= -1
results = self.extract()
self.data_bgsubbed *= -1
self.labels.clear()
self.clip.clear()
return results
@staticmethod
[docs]
def box_slice_about_pixel(x, y, box_radius):
"""Returns a slice centred about (x,y), of width = 2 *
int(box_radius) + 1.
Parameters
----------
x : int
Desired row index.
y : int
Desired column index.
box_radius : float
Radius of the box in pixel coordinates.
Returns
-------
tuple of slice
Slice centred about (x,y) with width = 2*box_radius + 1.
"""
ibr = int(box_radius)
x = int(x)
y = int(y)
return (slice(x - ibr, x + ibr + 1), slice(y - ibr, y + ibr + 1))
[docs]
def fit_to_point(
self, x: int, y: int, boxsize: int, threshold: float, fixed: str
):
"""Fit an elliptical Gaussian to a specified point on the
image.
Parameters
----------
x : int
Pixel x-coordinate of the point to fit.
y : int
Pixel y-coordinate of the point to fit.
boxsize : int
Length of the square section of the image to use for the fit.
threshold : float
Threshold below which data is not used for fitting (in units of
rmsmap).
fixed : str
If set to ``position``, the pixel coordinates are fixed in the
fit.
Returns
-------
Detection
An instance of :class:`sourcefinder.extract.Detection` containing
the fit results.
"""
logger.debug("Force-fitting pixel location ({},{})".format(x, y))
# First, check that x and y are actually valid semi-positive integers.
# Otherwise,
# If they are too high (positive), then indexing will fail
# BUT, if they are negative, then we get wrap-around indexing
# and the fit continues at the wrong position!
if x < 0 or x > self.xdim or y < 0 or y > self.ydim:
logger.warning(
"Dropping forced fit at ({},{}), "
"pixel position outside image".format(x, y)
)
return None
# Next, check if any of the central pixels (in a 3x3 box about the
# fitted pixel position) have been Masked
# (e.g. if NaNs, or close to image edge) - reject if so.
central_pixels_slice = ImageData.box_slice_about_pixel(x, y, 1)
if self.data.mask[central_pixels_slice].any():
logger.warning(
"Dropping forced fit at ({},{}), "
"Masked pixel in central fitting region".format(x, y)
)
return None
if (
# Recent NumPy
hasattr(np.ma.core, "MaskedConstant")
and isinstance(self.rmsmap, np.ma.core.MaskedConstant)
) or (
# Old NumPy
np.ma.is_masked(self.rmsmap[int(x), int(y)])
):
logger.error("Background is masked: cannot fit")
return None
chunk = ImageData.box_slice_about_pixel(x, y, boxsize / 2.0)
if threshold is not None:
# We'll mask out anything below threshold*self.rmsmap from the fit.
labels, num = self.labels.setdefault(
# Dictionary mapping threshold -> islands map
threshold,
ndimage.label(
self.clip.setdefault(
# Dictionary mapping threshold -> mask
threshold,
np.where(
self.data_bgsubbed > threshold * self.rmsmap, 1, 0
),
)
),
)
mylabel = labels[int(x), int(y)]
if mylabel == 0: # 'Background'
raise ValueError(
"Fit region is below specified threshold, fit aborted."
)
mask = np.where(labels[chunk] == mylabel, 0, 1)
fitme = np.ma.array(self.data_bgsubbed[chunk], mask=mask)
if len(fitme.compressed()) < 1:
raise IndexError("Fit region too close to edge or too small")
else:
fitme = self.data_bgsubbed[chunk]
if fitme.size < 1:
raise IndexError("Fit region too close to edge or too small")
if not len(fitme.compressed()):
logger.error("All data is masked: cannot fit")
return None
# set argument for fixed parameters based on input string
if fixed == "position":
_fixed = {"xbar": boxsize / 2.0, "ybar": boxsize / 2.0}
elif fixed == "position+shape":
_fixed = {
"xbar": boxsize / 2.0,
"ybar": boxsize / 2.0,
"semimajor": self.beam[0],
"semiminor": self.beam[1],
"theta": self.beam[2],
}
elif fixed is None:
_fixed = {}
else:
raise TypeError("Unkown fixed parameter")
if threshold is not None:
threshold_at_pixel = threshold * self.rmsmap[int(x), int(y)]
else:
threshold_at_pixel = None
try:
measurement, _, _ = extract.source_profile_and_errors(
fitme,
threshold_at_pixel,
self.rmsmap[chunk],
self.rmsmap[int(x), int(y)],
self.beam,
self.fudge_max_pix_factor,
self.beamsize,
self.correlation_lengths,
self.conf.image,
fixed=_fixed,
)
except ValueError:
# Fit failed to converge
# Moments are not applicable when holding parameters fixed
logger.error("Gaussian fit failed at %f, %f", x, y)
return None
try:
assert abs(measurement["xbar"]) < boxsize
assert abs(measurement["ybar"]) < boxsize
except AssertionError:
logger.warning("Fit falls outside of box.")
measurement["xbar"] += x - boxsize / 2.0
measurement["ybar"] += y - boxsize / 2.0
measurement.sig = (fitme / self.rmsmap[chunk]).max()
return extract.Detection(measurement, self)
[docs]
def fit_fixed_positions(
self,
positions,
boxsize,
threshold=None,
fixed="position+shape",
ids=None,
):
"""Convenience function to fit a list of sources at the given
positions.
This function wraps around :py:func:`fit_to_point`.
Parameters
----------
positions : list of tuples
List of (RA, Dec) tuples. Positions to be fit, in decimal degrees.
boxsize : int
Length of the square section of the image to use for the fit.
threshold : float, default: None
Threshold below which data is not used for fitting.
fixed : str, default: 'position+shape'
If set to `position`, the pixel coordinates are fixed in the fit.
ids : tuple, default: None
List of identifiers. If not None, must match the length and order of
the requested fits.
Notes
-----
boxsize is in pixel coordinates, not in sky coordinates.
Returns
-------
tuple
A list of successful fits. If ``ids`` is None, returns a single list
of :class:`sourcefinder.extract.Detection` s. Otherwise, returns a
tuple of two matched lists: ([detections], [matching_ids]).
"""
if ids is not None:
assert len(ids) == len(positions)
successful_fits = []
successful_ids = []
for idx, posn in enumerate(positions):
try:
(
x,
y,
) = self.wcs.s2p((posn[0], posn[1]))
except RuntimeError as e:
if str(e).startswith("wcsp2s error: 8:") or str(e).startswith(
"wcsp2s error: 9:"
):
logger.warning(
"Input coordinates (%.2f, %.2f) invalid: ",
posn[0],
posn[1],
)
else:
raise
else:
try:
fit_results = self.fit_to_point(
x, y, boxsize=boxsize, threshold=threshold, fixed=fixed
)
if not fit_results:
# We were unable to get a good fit
continue
if fit_results.ra.error == float(
"inf"
) or fit_results.dec.error == float("inf"):
logging.warning("position errors extend outside image")
else:
successful_fits.append(fit_results)
if ids:
successful_ids.append(ids[idx])
except IndexError as e:
logger.warning(
"Input pixel coordinates (%.2f, %.2f) "
"could not be fit because: " + str(e),
posn[0],
posn[1],
)
if ids:
return successful_fits, successful_ids
return successful_fits
[docs]
def label_islands(self, detectionthresholdmap, analysisthresholdmap):
"""Return a labelled array of pixels for fitting.
Parameters
----------
detectionthresholdmap : np.ma.MaskedArray
Detection threshold map with shape (nrow, ncol), matching the shape
of the observational image (self.rawdata). The values are of dtype
np.float32.
analysisthresholdmap : np.ma.MaskedArray
Analysis threshold map with shape (nrow, ncol), matching the shape
of the observational image (self.rawdata). The values are of dtype
np.float32.
Returns
-------
tuple
- labels_above_det_thr (np.ndarray): 1D array of labels
above detection threshold, with shape
(num_islands_above_detection_threshold,) and dtype
np.int64. Note that the length of this array may be
smaller than the total number of islands above the
analysis threshold, as some labels may have been
filtered out due to a peak spectral brightness lower
than the local detection threshold.
- labelled_data (np.ndarray): Array of labelled pixels,
where each pixel with a nonzero label corresponds to an
island above the analysis threshold. The array has the
same shape as the observational image (self.rawdata) and
contains integer values corresponding to the labels of
the islands. Pixels that do not belong to any island are
assigned a label of 0. The number of islands above the
analysis threshold is equal to the number of unique
labels in this array, which is equal to or larger than
num_islands_above_detection_threshold, i.e. the number
of islands above the detection threshold. This array
has dtype np.int32.
- num_islands_above_detection_threshold (int): Number of
islands above detection threshold.
- maxposs_above_det_thr (np.ndarray): Array of indices of
the maximum pixel values above detection threshold, with
shape (num_islands_above_detection_threshold, 2) and
dtype np.int32.
- maxis_above_det_thr (np.ndarray): Array of maximum pixel
values above detection threshold, with shape
(num_islands_above_detection_threshold,) and dtype
np.float32.
- npixs_above_det (np.ndarray): 1D array of pixel counts
for each island with peak spectral brightness above the
detection threshold, with shape
(num_islands_above_detection_threshold,) and dtype
np.int32.
- all_indices_above_det_thr (np.ndarray): Array of indices
of the islands above detection threshold, with shape
(num_islands_above_detection_threshold, 4) and dtype
np.int32.
"""
# If there is no usable data, we return an empty set of islands.
if not len(self.rmsmap.compressed()):
logging.warning("RMS map masked; sourcefinding skipped")
return (
np.zeros(0, dtype=np.int32),
np.zeros(self.data_bgsubbed.shape, dtype=np.int32),
0,
np.zeros((0, 2), dtype=np.int32),
np.zeros(0, dtype=np.float32),
np.zeros(0, dtype=np.int32),
np.zeros((0, 4), dtype=np.int32),
)
# At this point, we select all the data which is eligible for
# source fitting. We are actually using three separate filters, which
# exclude:
#
# 1. Anything which has been masked before we reach this point;
# 2. Any pixels which fall below the analysis threshold at that pixel
# position;
# 3. Any pixels corresponding to a position where the RMS noise is
# less than RMS_FILTER times the median RMS across
# the whole image.
#
# The third filter attempts to exclude those regions of the image
# which contain no usable data; for example, the parts of the image
# falling outside the circular region produced by awimager.
if self.conf.image.rms_filter:
clipped_data = np.ma.where(
(self.data_bgsubbed > analysisthresholdmap)
& (
self.rmsmap
>= (
self.conf.image.rms_filter
* np.median(self.grids["rms"])
)
),
1,
0,
)
else:
clipped_data = np.ma.where(
self.data_bgsubbed > analysisthresholdmap, 1, 0
)
if self.conf.image.remove_edge_sources:
clipped_data_mask = clipped_data.mask
# Treat masked pixels as source pixels, by setting "fill_value=1"
# instead of "fill_value=0". In this way, we can figure out if
# some source pixels were connected to masked pixels and should
# therefore not be included in the source extraction process.
clipped_data = clipped_data.filled(fill_value=1)
labelled_data, num_labels = ndimage.label(
clipped_data, self.conf.image.structuring_element
)
# "masked_labels" should contain the labels of sources connected to
# masked pixels, i.e. sources that we do not want to measure, since
# the pixels will be distributed asymmmetrically around the source's
# barycenter and the measurement will be compromised.
masked_labels = np.unique(labelled_data[clipped_data_mask])
# Also exclude sources with pixels - with values above the analysis
# threshold - touching the edges of the image, for the same reason.
edge_labels = np.unique(
np.concatenate(
(
labelled_data[0, :],
labelled_data[-1, :],
labelled_data[:, 0],
labelled_data[:, -1],
)
)
)
# Unite the labels corresponding to sources touching masked pixels
# with the labels corresponding to sources touching the edges of the
# image.
discarded_labels = np.unique(
np.concatenate((edge_labels, masked_labels))
)
# The background label (0) does not relate to source pixels and should
# therefore be excluded when we want to remove sources near masked
# pixels.
discarded_labels = np.delete(
discarded_labels, (discarded_labels == 0).nonzero()
)
# Replace bad labels by zeroes. Consequently, the labels in
# labelled_data may no longer form a consecutive sequence of integers
# 1,2,3,4...N.
labelled_data[np.isin(labelled_data, discarded_labels)] = 0
# np.arange(1, num_labels + 1)) to discard the zero label
# (background).
# Will break in the pathological case that all the image pixels
# are covered by sources, but we will take that risk.
labels_arr = np.arange(1, num_labels + 1, dtype=np.int32)
# Delete masked labels from the labels array.
labels_arr = np.delete(
labels_arr, np.isin(labels_arr, discarded_labels)
)
num_labels -= discarded_labels.size
# Get a bounding box for each island:
# NB Slices ordered by label value (1...N,), although some labels
# may have been removed.
slices = ndimage.find_objects(labelled_data)
# 'None' is returned for each removed (bad) label. We remove them from
# "slices".
slices = [x for x in slices if x is not None]
else:
clipped_data = clipped_data.filled(fill_value=0)
labelled_data, num_labels = ndimage.label(
clipped_data, self.conf.image.structuring_element
)
labels_arr = np.arange(1, num_labels + 1, dtype=np.int32)
# Get a bounding box for each island:
# NB Slices ordered by label value (1...N,)
slices = ndimage.find_objects(labelled_data)
# Derive all indices than correspond to the slices
all_indices = ImageData.slices_to_indices(slices)
# Derive maximum positions, maximum values and number of pixels
# per island.
maxposs = np.empty((num_labels, 2), dtype=np.int32)
dummy = np.empty_like(maxposs)
maxis = np.empty(num_labels, dtype=np.float32)
npixs = np.empty(num_labels, dtype=np.int32)
ImageData.extract_parms_image_slice(
self.data_bgsubbed.data.astype(dtype=np.float32, copy=False),
all_indices,
labelled_data,
labels_arr,
dummy,
maxposs,
maxis,
npixs,
)
# Here we remove the labels that correspond to islands below the
# detection threshold.
above_det_thr = (
maxis > detectionthresholdmap[maxposs[:, 0], maxposs[:, 1]]
)
num_islands_above_detection_threshold = above_det_thr.sum()
labels_above_det_thr = np.extract(above_det_thr, labels_arr)
maxposs_above_det_thr = np.compress(above_det_thr, maxposs, axis=0)
maxis_above_det_thr = np.extract(above_det_thr, maxis)
npixs_above_det = np.extract(above_det_thr, npixs)
all_indices_above_det_thr = np.compress(
above_det_thr, all_indices, axis=0
)
print(
f"Number of detected sources ="
f" {num_islands_above_detection_threshold}"
)
return (
labels_above_det_thr,
labelled_data,
num_islands_above_detection_threshold,
maxposs_above_det_thr,
maxis_above_det_thr,
npixs_above_det,
all_indices_above_det_thr,
)
@staticmethod
[docs]
def fit_islands(
fudge_max_pix_factor, beamsize, correlation_lengths, fixed, island
):
"""This function was created to enable the use of 'partial'
such that we can parallellize source measurements
"""
return island.fit(
fudge_max_pix_factor, beamsize, correlation_lengths, fixed=fixed
)
# “The sliced_to_indices function has been accelerated by a factor 2.7
# using ChatGPT 4.0. Its AI-output has been verified for correctness,
# accuracy and completeness, adapted where needed, and approved by the
# author
@staticmethod
[docs]
def slices_to_indices(slices):
"""Convert the list of tuples of slices generated by
scipy.ndimage.find_objects into a 2D int32 array with number
of rows equal to the number of islands and 4 columns, i.e 4
integers per island, containing the same information as the
slices, but more suitable for compilation by Numba
"""
num_slices = len(slices)
all_indices = np.empty((num_slices, 4), dtype=np.int32)
for index, (r, c) in enumerate(slices):
all_indices[index, 0] = r.start # Row start
all_indices[index, 1] = r.stop # Row stop
all_indices[index, 2] = c.start # Column start
all_indices[index, 3] = c.stop # Column stop
return all_indices
@staticmethod
@guvectorize(
[
(
float32[:, :],
int32[:],
int32[:, :],
int32[:],
int32[:],
int32[:],
float32[:],
int32[:],
)
],
"(n, m), (l), (n, m), " + "(), (k) -> (k), (), ()",
target="parallel",
)
[docs]
def _pyse(
self,
detectionthresholdmap,
analysisthresholdmap,
labelled_data=None,
labels=np.array([], dtype=np.int32),
):
"""Run Python-based source extraction on this image.
Parameters
----------
detectionthresholdmap : np.ma.MaskedArray
2D array of floats with the same shape as the observational image
(self.rawdata). The detection threshold map imposes an extra
threshold for source detection and is therefore higher than the
analysis threshold map.
analysisthresholdmap : np.ma.MaskedArray
2D array of floats with the same shape as the observational image
(self.rawdata). analysisthresholdmap imposes the primary threshold
for source detection. All the pixels within the island that exceed
this will be used when measuring the source. It is lower (or equal)
than detectionthresholdmap, or else we would be left with too few
pixels for proper source shape measurements, in some cases. This
map is computed as analysis_threshold * self.rmsmap.
labelled_data : np.ndarray, optional, default=None
Labelled island map (output of np.ndimage.label()). Will be
calculated automatically if not provided.
labels : np.ndarray, optional, default=np.array([], dtype=np.int32)
Array of integers representing the labels in the island map to use
for fitting.
Returns
-------
A `utility.containers.ExtractionResults` instance or a Pandas
DataFrame containing the results of the source extraction.
Notes
-----
This is described in detail in the "LOFAR Transients Pipeline" article
by John D. Swinbank et al., see
https://doi.org/10.1016/j.ascom.2015.03.002
"""
# Map our chunks onto a list of islands.
if labelled_data is None:
(
labels,
labelled_data,
num_islands,
maxposs,
maxis,
npixs,
indices,
) = self.label_islands(detectionthresholdmap, analysisthresholdmap)
if self.conf.image.deblend_nthresh or not self.conf.image.vectorized:
results = containers.ExtractionResults()
island_list = []
for index, label in enumerate(labels):
chunk = (
slice(indices[index, 0], indices[index, 1]),
slice(indices[index, 2], indices[index, 3]),
)
# In selected_data only the pixels with the "correct"
# (see above) labels are retained. Other pixel values are
# set to -(bignum).
# In this way, disconnected pixels within (rectangular)
# slices around islands (particularly the large ones) do
# not affect the source measurements.
selected_data = np.ma.where(
labelled_data[chunk] == label,
self.data_bgsubbed[chunk].data,
-extract.BIGNUM,
).filled(fill_value=-extract.BIGNUM)
island_list.append(
extract.Island(
selected_data,
self.rmsmap[chunk],
chunk,
detectionthresholdmap[chunk],
self.beam,
self.conf.image,
)
)
if self.conf.export.islands:
self.Gaussian_islands = np.zeros(self.data.shape)
# If required, we can save the 'leftovers' from the deblending and
# fitting processes for later analysis. This needs setting up here:
if self.conf.export.residuals:
self.Gaussian_residuals = np.zeros(self.data.shape)
self.residuals_from_deblending = np.zeros(self.data.shape)
for island in island_list:
self.residuals_from_deblending[
island.chunk
] += island.data.filled(fill_value=0.0)
# Deblend each of the islands to its consituent parts, if necessary
if self.conf.image.deblend_nthresh:
deblended_list = [x.deblend() for x in island_list]
island_list = list(utils.flatten(deblended_list))
# Set up the fixed fit parameters if 'force beam' is on:
if self.conf.image.force_beam:
fixed = {
"semimajor": self.beam[0],
"semiminor": self.beam[1],
"theta": self.beam[2],
}
else:
fixed = None
# Iterate over the list of islands and measure the source in each,
# appending it to the results list.
fit_islands_partial = partial(
ImageData.fit_islands,
self.fudge_max_pix_factor,
self.beamsize,
self.correlation_lengths,
fixed,
)
max_workers = self.conf.image.nr_threads
with ThreadPoolExecutor(max_workers=max_workers) as executor:
fit_results = list(
executor.map(fit_islands_partial, island_list)
)
for island, fit_result in zip(island_list, fit_results):
if fit_result:
measurement, Gauss_island, Gauss_residual = fit_result
else:
# Failed to fit; drop this island and go to the next.
continue
try:
det = extract.Detection(
measurement,
self,
chunk=island.chunk,
)
if det.ra.error == float("inf") or det.dec.error == float(
"inf"
):
logger.warning(
(
"Bad fit from blind extraction at "
"pixel coords:%f %f - measurement "
"discarded "
"(increase fitting margin?)"
),
det.x,
det.y,
)
else:
results.append(det)
except RuntimeError as e:
logger.error("Island not processed; unphysical?")
if self.conf.export.islands:
self.Gaussian_islands[island.chunk] += Gauss_island
if self.conf.export.residuals:
self.residuals_from_deblending[
island.chunk
] -= island.data.filled(fill_value=0.0)
self.Gaussian_residuals[island.chunk] += Gauss_residual
else:
(
moments_of_sources,
sky_barycenters,
ra_errors,
dec_errors,
error_radii,
smaj_asec,
errsmaj_asec,
smin_asec,
errsmin_asec,
theta_celes_values,
theta_celes_errors,
theta_dc_celes_values,
theta_dc_celes_errors,
Gaussian_islands,
Gaussian_residuals,
sig,
chisq,
reduced_chisq,
) = extract.source_measurements_vectorised(
num_islands,
npixs,
maxposs,
maxis,
self.data_bgsubbed.data,
self.rmsmap.data,
analysisthresholdmap.data,
indices,
labelled_data,
labels,
self.wcs,
self.fudge_max_pix_factor,
self.beam,
self.beamsize,
self.correlation_lengths,
self.conf,
)
if self.conf.export.islands:
self.Gaussian_islands = Gaussian_islands
if self.conf.export.residuals:
self.Gaussian_residuals = Gaussian_residuals
if self.conf.export.pandas_df:
sources_df = make_measurements_dataframe(
moments_of_sources,
sky_barycenters,
ra_errors,
dec_errors,
smaj_asec,
errsmaj_asec,
smin_asec,
errsmin_asec,
theta_celes_values,
theta_celes_errors,
theta_dc_celes_values,
theta_dc_celes_errors,
error_radii,
sig,
chisq,
reduced_chisq,
)
return sources_df[self.conf.export.source_params]
else:
results = containers.ExtractionResults()
for count, _ in enumerate(labels):
chunk = (
slice(indices[count, 0], indices[count, 1]),
slice(indices[count, 2], indices[count, 3]),
)
param = extract.ParamSet()
param.sig = sig[count]
param.chisq = chisq[count]
param.reduced_chisq = reduced_chisq[count]
param["peak"] = Uncertain(
moments_of_sources[count, 0, 0],
moments_of_sources[count, 1, 0],
)
param["flux"] = Uncertain(
moments_of_sources[count, 0, 1],
moments_of_sources[count, 1, 1],
)
param["xbar"] = Uncertain(
moments_of_sources[count, 0, 2],
moments_of_sources[count, 1, 2],
)
param["ybar"] = Uncertain(
moments_of_sources[count, 0, 3],
moments_of_sources[count, 1, 3],
)
param["semimajor"] = Uncertain(
moments_of_sources[count, 0, 4],
moments_of_sources[count, 1, 4],
)
param["semiminor"] = Uncertain(
moments_of_sources[count, 0, 5],
moments_of_sources[count, 1, 5],
)
param["theta"] = Uncertain(
moments_of_sources[count, 0, 6],
moments_of_sources[count, 1, 6],
)
param["semimaj_deconv"] = Uncertain(
moments_of_sources[count, 0, 7],
moments_of_sources[count, 1, 7],
)
param["semimin_deconv"] = Uncertain(
moments_of_sources[count, 0, 8],
moments_of_sources[count, 1, 8],
)
param["theta_deconv"] = Uncertain(
moments_of_sources[count, 0, 9],
moments_of_sources[count, 1, 9],
)
det = extract.Detection(param, self, chunk=chunk)
results.append(det)
if self.conf.export.pandas_df:
serialized_results = [
r.serialize(self.conf, every_parm=True) for r in results
]
return pd.DataFrame(serialized_results)
else:
return results