Source code for sourcefinder.accessors.lofarcasaimage

"""
This module implements the CASA LOFAR data container format, described in this
document:

http://www.lofar.org/operations/lib/exe/fetch.php?media=:public:documents
:casa_image_for_lofar_0.03.00.pdf
"""
import datetime
import logging

import numpy
from casacore.tables import table as casacore_table

from sourcefinder.accessors.casaimage import CasaImage
from sourcefinder.accessors.lofaraccessor import LofarAccessor
from sourcefinder.utility.coordinates import julian2unix

[docs] logger = logging.getLogger(__name__)
[docs] subtable_names = ( 'LOFAR_FIELD', 'LOFAR_ANTENNA', 'LOFAR_HISTORY', 'LOFAR_ORIGIN', 'LOFAR_QUALITY', 'LOFAR_STATION', 'LOFAR_POINTING', 'LOFAR_OBSERVATION' )
[docs] class LofarCasaImage(CasaImage, LofarAccessor): # type: ignore[misc] """ Use casacore to pull image data out of a CASA table. This accessor assumes the casatable contains the values described in the CASA Image description for LOFAR. 0.03.00. Parameters ---------- url : str Location of the CASA table. plane : int, default: 0 If the data is a datacube, specifies which plane to use. beam : tuple, default: None Beam parameters in degrees, in the form (bmaj, bmin, bpa). If not supplied, the method will attempt to read these from the header. """ def __init__(self, url, plane=0, beam=None): super().__init__(url, plane, beam) table = casacore_table(self.url, ack=False) subtables = self.open_subtables(table)
[docs] self.taustart_ts = self.parse_taustartts(subtables)
[docs] self.tau_time = self.parse_tautime(subtables)
# Additional, LOFAR-specific metadata
[docs] self.antenna_set = self.parse_antennaset(subtables)
self.ncore, self.nremote, self.nintl = self.parse_stations(subtables)
[docs] self.subbandwidth = self.parse_subbandwidth(subtables)
[docs] self.subbands = self.parse_subbands(subtables)
@staticmethod
[docs] def open_subtables(table): """Open all subtables defined in the LOFAR format. Parameters ---------- table : casacore.tables.table A casacore table handler to a LOFAR CASA table. Returns ------- dict A dictionary containing all LOFAR CASA subtables """ subtables = {} for subtable in subtable_names: subtable_location = table.getkeyword("ATTRGROUPS")[subtable] subtables[subtable] = casacore_table(subtable_location, ack=False) return subtables
@staticmethod
[docs] def parse_taustartts(subtables): """Extract the image start time from the CASA table header. Parameters ---------- subtables : dict A dictionary containing all LOFAR CASA subtables. Returns ------- datetime.datetime The earliest observation start time as a datetime object. Notes ----- We sort the table in order of ascending start time then choose the first value to ensure we get the earliest possible starting time. """ observation_table = subtables['LOFAR_OBSERVATION'] julianstart = observation_table.query( sortlist="OBSERVATION_START", limit=1).getcell( "OBSERVATION_START", 0 ) unixstart = julian2unix(julianstart) taustart_ts = datetime.datetime.fromtimestamp(unixstart) return taustart_ts
@staticmethod
[docs] def non_overlapping_time(series): """Calculate the sum of total time ranges without overlap. Parameters ---------- series : list of tuple A list of 2-item tuples representing time ranges, where each tuple contains the start and end time of a range. Returns ------- float The total length of all time ranges without overlap, most likely in seconds. """ series.sort() overlap = total = 0 for n, (start, end) in enumerate(series): total += end - start for (nextstart, nextend) in series[n + 1:]: if nextstart >= end: break overlapstart = max(nextstart, start) overlapend = min(nextend, end) overlap += overlapend - overlapstart start = overlapend return total - overlap
@staticmethod
[docs] def parse_tautime(subtables): """Returns the total on-sky time for this image. Parameters ---------- subtables : dict A dictionary containing all LOFAR CASA subtables. Returns ------- float The total on-sky time, most likely in seconds. """ origin_table = subtables['LOFAR_ORIGIN'] startcol = origin_table.col('START') endcol = origin_table.col('END') series = [(int(start), int(end)) for start, end in zip(startcol, endcol)] tau_time = LofarCasaImage.non_overlapping_time(series) return tau_time
@staticmethod
[docs] def parse_antennaset(subtables): """Extract the antenna set used in the observation. This method retrieves the unique antenna set from the LOFAR_OBSERVATION subtable. Parameters ---------- subtables : dict A dictionary containing all LOFAR CASA subtables. Returns ------- numpy.ndarray An array containing the unique antenna set used in the observation. Raises ------ Exception if multiple antenna sets are found. Notes ----- The method uses the `unique_column_values` function from the `CasaImage` class to extract unique values from the "ANTENNA_SET" column. """ observation_table = subtables['LOFAR_OBSERVATION'] antennasets = CasaImage.unique_column_values(observation_table, "ANTENNA_SET") if len(antennasets) == 1: return antennasets[0] else: raise Exception("Cannot handle multiple antenna sets in image")
@staticmethod
[docs] def parse_subbands(subtables): """Extract the number of subbands used in the observation. This method retrieves the unique number of subbands from the LOFAR_ORIGIN subtable. If multiple values are found, an exception is raised. Parameters ---------- subtables : dict A dictionary containing all LOFAR CASA subtables. Returns ------- int The number of subbands used in the observation. Raises ------ Exception If varying numbers of channels are found in the subtable. Notes ----- The method uses the `unique_column_values` function from the `CasaImage` class to extract unique values from the "NUM_CHAN" column. """ origin_table = subtables['LOFAR_ORIGIN'] num_chans = CasaImage.unique_column_values(origin_table, "NUM_CHAN") if len(num_chans) == 1: return num_chans[0] else: raise Exception( "Cannot handle varying numbers of channels in image")
@staticmethod
[docs] def parse_subbandwidth(subtables): """Calculate the subband width for the observation. This method determines the subband width based on the clock frequency retrieved from the LOFAR_OBSERVATION subtable. Parameters ---------- subtables : dict A dictionary containing all LOFAR CASA subtables. Returns ------- float The subband width in Hz. Raises ------ Exception If multiple clock frequencies are found in the subtable. Notes ----- The method uses the clock frequency and its unit to calculate the subband width. The clock frequency is divided by 1024 to determine the subband width. For more details, see: https://www.astron.nl/lofarwiki/doku.php?id=public:documents: raw_olap_data_formats&s[]=subband 'base_subband_hz = clock_hz / 1024' """ freq_units = { 'Hz': 1, 'kHz': 10 ** 3, 'MHz': 10 ** 6, 'GHz': 10 ** 9, } observation_table = subtables['LOFAR_OBSERVATION'] clockcol = observation_table.col('CLOCK_FREQUENCY') clock_values = CasaImage.unique_column_values(observation_table, "CLOCK_FREQUENCY") if len(clock_values) == 1: clock = clock_values[0] unit = clockcol.getkeyword('QuantumUnits')[0] trueclock = freq_units[unit] * clock subbandwidth = trueclock / 1024 return subbandwidth else: raise Exception("Cannot handle varying clocks in image")
@staticmethod
[docs] def parse_stations(subtables): """Extract the number of specific LOFAR stations used in the observation. This method calculates the number of core, remote, and international LOFAR stations used based on the observation and antenna subtables. Parameters ---------- subtables : dict A dictionary containing all LOFAR CASA subtables. Returns ------- tuple A tuple containing the number of core stations, remote stations, and international stations in the format (ncore, nremote, nintl). Notes ----- The method uses the "NVIS_USED" column from the LOFAR_OBSERVATION subtable and the "NAME" column from the LOFAR_ANTENNA subtable to determine which stations were used. Stations are categorized based on their names: - Core stations start with "CS". - Remote stations start with "RS". - International stations have other prefixes. """ observation_table = subtables['LOFAR_OBSERVATION'] antenna_table = subtables['LOFAR_ANTENNA'] nvis_used = observation_table.getcol('NVIS_USED') names = numpy.array(antenna_table.getcol('NAME')) mask = numpy.sum(nvis_used, axis=2) > 0 used = names[mask[0]] ncore = nremote = nintl = 0 for station in used: if station.startswith('CS'): ncore += 1 elif station.startswith('RS'): nremote += 1 else: nintl += 1 return ncore, nremote, nintl