#!/usr/bin/env python
"""
Simple interface to TKP source identification & measurement code.
John Sanders & John Swinbank, 2011.
This is a simplified script for running source finding with a minimal set of
arguments. It does not provide a full configuration interface or access to
all features.
Run as:
$ pyse file ...
For help with command line options:
$ pyse --help
See chapters 2 & 3 of Spreeuw, PhD Thesis, University of Amsterdam, 2010,
<http://dare.uva.nl/en/record/340633> for details.
"""
import argparse
import ast
import json
import logging
import math
import numbers
import os.path
import pdb
import sys
from types import TracebackType
from dataclasses import asdict, replace
from io import StringIO
from pathlib import Path
import astropy.io.fits as pyfits
import numpy
import sourcefinder
from sourcefinder.accessors import open as open_accessor
from sourcefinder.accessors import sourcefinder_image_from_accessor
from sourcefinder.accessors import writefits as tkp_writefits
from sourcefinder.config import (
read_conf,
ImgConf,
ExportSettings,
IMGCONF_HELP,
EXPORTSETTINGS_HELP,
)
from sourcefinder.utils import generate_result_maps
[docs]
def imgconf_help(name: str) -> str:
text = IMGCONF_HELP.get(name, "").strip()
# Retrieve default value from ImgConf dataclass
default = getattr(ImgConf, name, None)
if default is not None:
text += f"\n\n[default: {default}]"
return text
[docs]
def exportsettings_help(name: str) -> str:
text = EXPORTSETTINGS_HELP.get(name, "").strip()
# Retrieve default value from ExportSettings dataclass
default = getattr(ExportSettings, name, None)
if default is not None:
text += f"\n\n[default: {default}]"
return text
[docs]
def parse_monitoringlist_positions(
args, str_name="monitor_coords", list_name="monitor_list"
):
"""Load a list of monitoring list (RA, Dec) tuples from command
line arguments.
This function processes the flags `--monitor-coords` and `--monitor-list`.
It does not handle units, which should be matched against the requirements
of the consuming code.
Parameters
----------
args : argparse.Namespace
The command line arguments object.
str_name : str, default: "monitor_coords"
The name of the argument containing the JSON string of coordinates.
list_name : str, default: "monitor_list"
The name of the argument containing the file path to a JSON file with
coordinates.
Returns
-------
list[tuple[float, float]]
A list of (RA, Dec) tuples parsed from the input arguments.
Raises
------
json.JSONDecodeError
If the JSON string or file content cannot be parsed.
"""
monitor_coords = []
if hasattr(args, str_name) and getattr(args, str_name):
try:
monitor_coords.extend(json.loads(getattr(args, str_name)))
except json.JSONDecodeError:
logging.error(
"Could not parse monitor-coords from command line:"
"string passed was:\n%s" % (getattr(args, str_name),)
)
raise
if hasattr(args, list_name) and getattr(args, list_name):
try:
with open(getattr(args, list_name)) as file:
mon_list = json.load(file)
monitor_coords.extend(mon_list)
except json.JSONDecodeError:
logging.error(
"Could not parse monitor-coords from file: "
+ getattr(args, list_name)
)
raise
return monitor_coords
[docs]
def parse_none(value):
if isinstance(value, str):
if value.lower() == "none" or value.lower() == "null":
return None
return value
[docs]
def construct_argument_parser():
parser = argparse.ArgumentParser(
description=(
"PySE image configuration options. These can override "
"the values specified in the TOML config file."
),
)
general_group = parser.add_argument_group("General")
general_group.add_argument(
"--config-file",
help=(
"TOML file containing input arguments to PySE. Overridden by "
"command line arguments, but overrides built-in defaults from "
"config.ImgConf and config.ExportSettings. "
"This is especially convenient when swapping between "
"configurations for the same project."
),
)
general_group.add_argument(
"--pdb",
action="store_true",
help=(
"Enter debug mode when the application crashes. Meant to be "
"used for more comprehensive debugging."
),
)
general_group.add_argument(
"--show-args",
action="store_true",
help="""
View all arguments pyse will run with in the current configuration after aggregating the command-line
arguments, config-file parameters and pyse defaults.
""",
)
general_group.add_argument(
"-v",
"--version",
action="version",
version=f"PySE {sourcefinder.__version__} (Python {sys.version.split()[0]})",
)
image_group = parser.add_argument_group("Image parameters")
image_group.add_argument(
"--interpolate-order",
type=int,
help=imgconf_help("interpolate_order"),
)
image_group.add_argument(
"--median-filter",
type=int,
help=imgconf_help("median_filter"),
)
image_group.add_argument(
"--mf-threshold",
type=float,
help=imgconf_help("mf_threshold"),
)
image_group.add_argument(
"--rms-filter",
type=float,
help=imgconf_help("rms_filter"),
)
image_group.add_argument(
"--deblend-mincont",
type=float,
help=imgconf_help("deblend_mincont"),
)
image_group.add_argument(
"--structuring-element",
type=ast.literal_eval,
help=imgconf_help("structuring_element"),
)
image_group.add_argument(
"--vectorized",
action="store_true",
help=imgconf_help("vectorized"),
)
image_group.add_argument(
"--nr-threads",
type=int,
help=imgconf_help("nr_threads"),
)
image_group.add_argument(
"--margin",
type=int,
help=imgconf_help("margin"),
)
image_group.add_argument(
"--radius",
type=float,
help=imgconf_help("radius"),
)
image_group.add_argument(
"--back-size-x",
type=int,
help=imgconf_help("back_size_x"),
)
image_group.add_argument(
"--back-size-y",
type=int,
help=imgconf_help("back_size_y"),
)
image_group.add_argument(
"--grid",
type=int,
help=imgconf_help("grid"),
)
image_group.add_argument(
"--eps-ra",
type=float,
help=imgconf_help("eps_ra"),
)
image_group.add_argument(
"--eps-dec",
type=float,
help=imgconf_help("eps_dec"),
)
image_group.add_argument(
"--detection-thr",
type=float,
help=imgconf_help("detection_thr"),
)
image_group.add_argument(
"--analysis-thr", type=float, help=imgconf_help("analysis_thr")
)
image_group.add_argument(
"--fdr", action="store_true", help=imgconf_help("fdr")
)
image_group.add_argument("--alpha", type=float, help=imgconf_help("alpha"))
image_group.add_argument(
"--deblend-nthresh",
type=int,
help=imgconf_help("deblend_nthresh"),
)
image_group.add_argument(
"--bmaj",
type=float,
help=imgconf_help("bmaj"),
)
image_group.add_argument(
"--bmin",
type=float,
help=imgconf_help("bmin"),
)
image_group.add_argument(
"--bpa",
type=float,
help=imgconf_help("bpa"),
)
image_group.add_argument(
"--force-beam",
action="store_true",
help=imgconf_help("force_beam"),
)
image_group.add_argument(
"--detection-image", type=str,
help=imgconf_help("detection_image")
)
image_group.add_argument(
"--fixed-posns",
help=imgconf_help("fixed_posns"),
)
image_group.add_argument(
"--fixed-posns-file",
help=imgconf_help("fixed_posns_file"),
)
image_group.add_argument(
"--ffbox",
type=float,
help=imgconf_help("ffbox"),
)
image_group.add_argument(
"--ew-sys-err",
type=float,
help=imgconf_help("ew_sys_err"),
)
image_group.add_argument(
"--ns-sys-err",
type=float,
help=imgconf_help("ns_sys_err"),
)
# Arguments relating to output:
export_group = parser.add_argument_group("Export parameters")
export_group.add_argument(
"--output-dir",
help=exportsettings_help("output_directory"),
)
export_group.add_argument(
"--skymodel",
action="store_true",
help=exportsettings_help("skymodel"),
)
export_group.add_argument(
"--csv",
action="store_true",
help=exportsettings_help("csv"),
)
export_group.add_argument(
"--regions",
action="store_true",
help=exportsettings_help("regions"),
)
export_group.add_argument(
"--rmsmap",
action="store_true",
help=exportsettings_help("rmsmap"),
)
export_group.add_argument(
"--sigmap",
action="store_true",
help=exportsettings_help("sigmap"),
)
export_group.add_argument(
"--residuals",
action="store_true",
help=exportsettings_help("residuals"),
)
export_group.add_argument(
"--islands",
action="store_true",
help=exportsettings_help("islands"),
)
# Finally, as positional arguments, the file list:
parser.add_argument("files", nargs="*", help="Image files for processing")
return parser
[docs]
def regions(sourcelist):
"""
Return a string containing a DS9-compatible region file describing all the
sources in sourcelist.
"""
output = StringIO()
print("# Region file format: DS9 version 4.1", file=output)
print(
'global color=green dashlist=8 3 width=1 font="helvetica 10 normal" select=1 highlite=1 dash=0 fixed=0 edit=1 move=1 delete=1 include=1 source=1',
file=output,
)
print("image", file=output)
for source in sourcelist:
# NB, here we convert from internal 0-origin indexing to DS9 1-origin
# indexing
print(
"ellipse(%f, %f, %f, %f, %f)"
% (
source.x.value + 1.0,
source.y.value + 1.0,
source.smaj.value * 2,
source.smin.value * 2,
math.degrees(source.theta) + 90,
),
file=output,
)
return output.getvalue()
[docs]
def skymodel(sourcelist, ref_freq=73800000):
"""
Return a string containing a skymodel from the extracted sources for use
in self-calibration.
"""
output = StringIO()
print(
"#(Name, Type, Ra, Dec, I, Q, U, V, MajorAxis, MinorAxis, Orientation, "
"ReferenceFrequency='60e6', SpectralIndex='[0.0]') = format",
file=output,
)
for source in sourcelist:
print(
"%s, GAUSSIAN, %s, %s, %f, 0, 0, 0, %f, %f, %f, %f, [0]"
% (
"ra:%fdec:%f" % (source.ra, source.dec),
"%fdeg" % (source.ra,),
"%fdeg" % (source.dec,),
source.flux,
source.smaj_asec,
source.smin_asec,
source.theta_celes,
ref_freq,
),
file=output,
)
return output.getvalue()
[docs]
def csv(sourcelist, conf):
"""
Return a string containing a csv from the extracted sources.
"""
output = StringIO()
print(", ".join(conf.export.source_params_file), file=output)
for source in sourcelist:
values = source.serialize(conf=conf)
print(", ".join(f"{float(v):.6f}" for v in values), file=output)
return output.getvalue()
[docs]
def summary(filename, sourcelist):
"""
Return a string containing a human-readable summary of all sources in
sourcelist.
"""
output = StringIO()
print("** %s **\n" % filename, file=output)
for source in sourcelist:
print("RA: %s, dec: %s" % (str(source.ra), str(source.dec)),
file=output)
print(
"Error radius (arcsec): %s" % (str(source.error_radius)),
file=output,
)
print(
"Semi-major axis (arcsec): %s" % (str(source.smaj_asec)),
file=output,
)
print(
"Semi-minor axis (arcsec): %s" % (str(source.smin_asec)),
file=output,
)
print("Position angle: %s" % (str(source.theta_celes)), file=output)
print("Flux: %s" % (str(source.flux)), file=output)
print("Peak: %s\n" % (str(source.peak)), file=output)
return output.getvalue()
[docs]
def handle_args():
"""
Parses command line options & arguments using OptionParser.
Options & default values for the script are defined herein.
"""
parser = construct_argument_parser()
arguments = parser.parse_args()
unstructured_args = vars(arguments)
# Extract file paths, which are only to be supplied via command line,
# not config
files = unstructured_args.pop("files")
# Automatically start the debugger on an unhandled exception if
# specified
debug_on_error = unstructured_args.pop("pdb")
if debug_on_error:
def excepthook(
_exc_type: type[BaseException],
_exc_value: BaseException,
tb: TracebackType | None,
) -> None:
"""Enter post-mortem debugging on uncaught exceptions."""
if tb is not None:
pdb.post_mortem(tb)
sys.excepthook = excepthook
show_args = unstructured_args.pop("show_args")
# Merge the CLI arguments with the config file parameters
config_file = unstructured_args.pop("config_file")
conf = read_conf(config_file)
# Structure the arguments based on their group
cli_args: dict = dict()
for argument in parser._actions:
section_name = argument.container.title
if section_name not in cli_args:
cli_args[section_name] = dict()
arg_name = argument.dest
if arg_name in unstructured_args:
# Default arguments like '--help' and arguments popped from
# unstructured args like "--pdb" should be skipped
cli_value = unstructured_args[arg_name]
if cli_value is not None:
# For argparse arguments where no value is provided in the
# command line we get a None value, ignore these
cli_args[section_name][arg_name] = cli_value
image_args = cli_args["Image parameters"]
# Handle grid vs back-size-x/y precedence
if image_args.get("grid") is not None:
if image_args.get("back_size_x") is None:
image_args["back_size_x"] = image_args["grid"]
if image_args.get("back_size_y") is None:
image_args["back_size_y"] = image_args["grid"]
# Note: Dataclass replace is not recursive for stacked dataclasses.
# Replace one by one to avoid arguments being reset to their
# defaults.
conf_image = replace(conf.image, **image_args)
conf_export = replace(conf.export, **cli_args["Export parameters"])
# The functions skymodel, csv, summary and regions are currently not able
# to handle Pandas DataFrames.
no_pandas_df_dict = {"pandas_df": False}
conf_export = replace(conf_export, **no_pandas_df_dict)
conf = replace(conf, image=conf_image, export=conf_export)
# Overwrite 'fixed_coords' with a parsed list of coords
# collated from both command line and file.
fixed_coords = parse_monitoringlist_positions(
conf.image, str_name="fixed_posns", list_name="fixed_posns_file"
)
if show_args:
def print_dict_content_recursively(dict_, current_depth):
for key, value in dict_.items():
if isinstance(value, dict):
print(current_depth * " ", f"=== {key} ===")
print_dict_content_recursively(value, current_depth + 1)
else:
print(current_depth * " ", f"{key}: {value}")
print_dict_content_recursively(asdict(conf), current_depth=0)
sys.exit(0)
if len(files) == 0:
raise ValueError("At least one input file must be supplied")
# Quick & dirty check that the position list looks plausible
if fixed_coords:
mlist = numpy.array(fixed_coords)
if not (len(mlist.shape) == 2 and mlist.shape[1] == 2):
parser.error("Positions for forced-fitting must be [RA,dec] pairs")
# We have four potential modes, of which we choose only one to run:
#
# 1. Blind source finding
# 1.1 Thresholding, no detection image (no extra cmd line options)
# 1.2 Thresholding, detection image (--detection-image)
# 1.3 FDR (--fdr)
#
# 2. Fit to fixed points (--fixed-coords and/or --fixed-list)
if fixed_coords:
if conf.image.fdr:
parser.error("--fdr not supported with fixed positions")
elif conf.image.detection_image:
parser.error(
"--detection-image not supported with fixed positions")
elif conf.export.residuals:
parser.error("--residuals not supported with fixed positions")
elif conf.export.islands:
parser.error("--islands not supported with fixed positions")
mode = "fixed" # mode 2 above
elif conf.image.fdr:
if conf.image.detection_image:
parser.error("--detection-image not supported with --fdr")
mode = "fdr" # mode 1.3 above
elif conf.image.detection_image:
mode = "detimage" # mode 1.2 above
else:
mode = "threshold" # mode 1.1 above
return conf, mode, files
[docs]
def writefits(filename, data, header={}):
try:
os.unlink(filename)
except OSError:
# Thrown if file didn't exist
pass
tkp_writefits(data, filename, header)
[docs]
def get_detection_labels(filename, det, anl, beam, configuration, plane=0):
print("Detecting islands in %s" % (filename,))
print("Thresholding with det = %f sigma, analysis = %f sigma" % (det, anl))
ff = open_accessor(filename, beam=beam, plane=plane)
imagedata = sourcefinder_image_from_accessor(ff, conf=configuration)
labels, labelled_data, *_ = imagedata.label_islands(
det * imagedata.rmsmap, anl * imagedata.rmsmap
)
return labels, labelled_data
[docs]
def get_beam(bmaj, bmin, bpa):
if (
isinstance(bmaj, numbers.Real)
and isinstance(bmin, numbers.Real)
and isinstance(bpa, numbers.Real)
):
return float(bmaj), float(bmin), float(bpa)
if bmaj or bmin or bpa:
print("WARNING: partial beam specification ignored")
return None
[docs]
def bailout(reason):
# Exit with error
print("ERROR: %s" % reason)
sys.exit(1)
[docs]
def run_sourcefinder(files, conf, mode):
"""
Iterate over the list of files, running a sourcefinding step on each in
turn. If specified, a DS9-compatible region file and/or a FITS file
showing the residuals after Gaussian fitting are dumped for each file.
A string containing a human-readable list of sources is returned.
"""
output = StringIO()
beam = get_beam(conf.image.bmaj, conf.image.bmin, conf.image.bpa)
if mode == "detimage":
labels, labelled_data = get_detection_labels(
conf.image.detection_image,
conf.image.detection_thr,
conf.image.analysis_thr,
beam,
conf,
)
else:
labels, labelled_data = [], None
for counter, filename in enumerate(files):
print(
"Processing %s (file %d of %d)."
% (filename, counter + 1, len(files))
)
imagename = os.path.splitext(os.path.basename(filename))[0]
ff = open_accessor(filename, beam=beam, plane=0)
imagedata = sourcefinder_image_from_accessor(ff, conf=conf)
if mode == "fixed":
sr = imagedata.fit_fixed_positions(
eval(
conf.image.fixed_posns
), # fixed_posns is a string, use eval to obtain its contents
conf.image.ffbox * max(imagedata.beam[0:2]),
)
else:
if mode == "fdr":
print(
"Using False Detection Rate algorithm with alpha = %f"
% (conf.image.alpha,)
)
sr = imagedata.fd_extract(
alpha=conf.image.alpha,
)
else:
if labelled_data is None:
print(
"Thresholding with det = %f sigma, analysis = %f sigma"
% (conf.image.detection_thr, conf.image.analysis_thr)
)
sr = imagedata.extract(
labelled_data=labelled_data,
labels=labels,
)
export_dir = Path(conf.export.output_dir)
export_dir.mkdir(parents=True, exist_ok=True)
if conf.export.regions:
regionfile = export_dir / (imagename + ".reg")
regionfile = open(regionfile, "w")
regionfile.write(regions(sr))
regionfile.close()
# This applies a slower method for computing the Gaussian islands and
# residuals than the methods from the extract module - since these
# methods are called in parallel, from image.py, or vectorized - and
# extends to pixel positions corresponding to values below the analysis
# threshold, i.e. well into the background noise. Some users may want to
# accept the extra compute time to be able to compare the residuals with
# the background noise.
# if conf.export.residuals or conf.export.islands:
# gaussian_map, residual_map = generate_result_maps(imagedata.data,
# sr)
if conf.export.residuals:
residualfile = export_dir / (imagename + ".residuals.fits")
writefits(
residualfile,
imagedata.Gaussian_residuals,
pyfits.getheader(filename),
)
if conf.export.islands:
islandfile = export_dir / (imagename + ".islands.fits")
writefits(
islandfile,
imagedata.Gaussian_islands,
pyfits.getheader(filename),
)
if conf.export.rmsmap:
rmsfile = export_dir / (imagename + ".rms.fits")
writefits(
rmsfile,
numpy.array(imagedata.rmsmap),
pyfits.getheader(filename),
)
if conf.export.sigmap:
sigfile = export_dir / (imagename + ".sig.fits")
writefits(
sigfile,
numpy.array(imagedata.data_bgsubbed / imagedata.rmsmap),
pyfits.getheader(filename),
)
if conf.export.skymodel:
with open(
export_dir / (imagename + ".skymodel"), "w"
) as skymodelfile:
if ff.freq_eff:
skymodelfile.write(skymodel(sr, ff.freq_eff))
else:
print(
"WARNING: Using default reference frequency for %s"
% (skymodelfile.name,)
)
skymodelfile.write(skymodel(sr))
if conf.export.csv:
with open(export_dir / (imagename + ".csv"), "w") as csvfile:
csvfile.write(csv(sr, conf))
print(summary(filename, sr), end=" ", file=output)
return output.getvalue()
[docs]
def main():
logging.basicConfig()
conf, mode, files = handle_args()
print(run_sourcefinder(files, conf, mode), end=" ")
return 0
if __name__ == "__main__":
sys.exit(main())