Source code for ampel.view.LightCurve

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File:                Ampel-photometry/ampel/view/LightCurve.py
# License:             BSD-3-Clause
# Author:              valery brinnel <firstname.lastname@gmail.com>
# Date:                13.01.2018
# Last Modified Date:  17.05.2022
# Last Modified By:    Simeon Reusch <simeon.reusch@desy.de>

import operator
from dataclasses import dataclass
from typing import Any
from collections.abc import Callable, Sequence, Iterable
from ampel.types import JDict, OneOrMany, StockId
from ampel.content.T1Document import T1Document
from ampel.content.DataPoint import DataPoint

# Do not enable operator customizations by sub-classes for now
ops: dict[str, Callable[[str, Any], bool]] = {
	'>': operator.gt,
	'<': operator.lt,
	'>=': operator.ge,
	'<=': operator.le,
	'==': operator.eq,
	'!=': operator.ne,
	'is': operator.is_,
	'is not': operator.is_not
}

[docs] @dataclass(frozen=True) class LightCurve: """ Contains a collection of :class:`~ampel.content.DataPoint.DataPoint` (photo points and upper limits), and a few convenience methods to return values from internal collections. """ compound_id: int stock_id: StockId | Sequence[StockId] photopoints: None | Sequence[DataPoint] = None upperlimits: None | Sequence[DataPoint] = None @classmethod def build(cls, compound: T1Document, datapoints: Iterable[DataPoint]) -> 'LightCurve': return cls( compound_id = compound['link'], stock_id = compound['stock'], photopoints = [el for el in datapoints if el['id'] > 0], upperlimits = [el for el in datapoints if el['id'] < 0] ) def __len__(self): return (len(self.photopoints) if self.photopoints else 0) + \ (len(self.upperlimits) if self.upperlimits else 0)
[docs] def get_values(self, key: str, filters: None | OneOrMany[JDict] = None, of_upper_limits: bool = False ) -> None | list[Any]: """ :param filters: filter criteria for data points, e.g. ``{'attribute': 'magpsf', 'operator': '<', 'value': 18}`` :param of_upper_limits: return upper limits instead of photo points usage example:: lightcurve.get_values('jd') .. seealso:: :func:`get_tuples`, :func:`get_ntuples` """ if datapoints := self._get_datapoints(filters, of_upper_limits): return [dp['body'][key] for dp in datapoints if key in dp['body']] return None
[docs] def get_tuples(self, key1: str, key2: str, filters: None | OneOrMany[JDict] = None, of_upper_limits: bool = False ) -> None | list[tuple[Any, Any]]: """ :param filters: filter criteria for datapoints :param of_upper_limits: return upper limits instead of photo points usage example:: lightcurve.get_tuples('jd', 'magpsf', {'attribute': 'magpsf', 'operator': '<', 'value': 18}) .. seealso:: :func:`get_values`, :func:`get_ntuples` """ if datapoints := self._get_datapoints(filters, of_upper_limits): return [ (dp['body'][key1], dp['body'][key2]) for dp in datapoints if key1 in dp['body'] and key2 in dp['body'] ] return None
[docs] def get_ntuples(self, params: Sequence[str], filters: None | OneOrMany[JDict] = None, of_upper_limits: bool = False ) -> None | list[tuple]: """ :param params: list of keys :param filters: filter criteria for datapoints :param of_upper_limits: return upper limits instead of photo points usage example:: lightcurve.get_ntuples(["fid", "jd", "magpsf"], {'attribute': 'magpsf', 'operator': '<', 'value': 18}) .. seealso:: :func:`get_values`, :func:`get_tuples` """ if datapoints := self._get_datapoints(filters, of_upper_limits): return [ tuple(dp['body'][param] for param in params) for dp in datapoints # type: ignore[union-attr] if all(dp['body'].get(param, False) for param in params) ] return None
[docs] def get_photopoints(self, filters: None | OneOrMany[JDict] = None, ) -> None | Sequence[DataPoint]: """Get (filtered) photo points""" if filters and self.photopoints: return self._apply_filter(self.photopoints, filters) return self.photopoints
[docs] def get_upperlimits(self, filters: None | OneOrMany[JDict] = None, ) -> None | Sequence[DataPoint]: """Get (filtered) upper limits""" if filters and self.upperlimits: return self._apply_filter(self.upperlimits, filters) return self.upperlimits
# TODO: improve
[docs] def get_pos( self, ret: str = "brightest", filters: None | OneOrMany[JDict] = None, ) -> None | tuple[Any, Any] | Sequence[tuple[Any, Any]]: """ Calculate the position of the underlying object. :param ret: - raw: returns ((ra, dec), (ra, dec), ...) - mean: returns (<ra>, <dec>) - brightest: returns (ra, dec) - latest: returns (ra, dec) :param filters: filters to apply to photo points Get the position of the brightest PhotoPoint in the ZTF G band:: instance.get_pos( "brightest", {'attribute': 'alTags', 'operator': 'in', 'value': 'ZTF_G'} ) Get the position of the latest photopoint with a magnitude brighter than 18 (or an empty array if no photopoint matches this criteria):: instance.get_pos( "lastest", {'attribute': 'magpsf', 'operator': '<', 'value': 18} ) """ if ret == 'raw': return self.get_tuples('ra', 'dec', filters=filters) if not self.photopoints: return None pps = self._apply_filter(self.photopoints, filters) \ if filters is not None else self.photopoints if not pps: return None if ret == 'mean': ras = [pp['body']['ra'] for pp in pps] decs = [pp['body']['dec'] for pp in pps] return (sum(ras) / len(ras), sum(decs) / len(decs)) if ret == 'brightest': mags = sorted(pps, key=lambda x: x['body'].get('magpsf', 99)) return mags[0]['body']['ra'], mags[0]['body']['dec'] if ret == 'latest': mags = sorted(pps, key=lambda x: x['body']['obs_date']) return mags[-1]['body']['ra'], mags[-1]['body']['dec'] raise NotImplementedError(f'ret method: {ret} is not implemented')
def _get_datapoints(self, filters: None | OneOrMany[JDict] = None, of_upper_limits: bool = False ) -> None | Sequence[DataPoint]: if filters is None: if of_upper_limits: return self.upperlimits if self.upperlimits else None return self.photopoints datapoints = self.upperlimits if of_upper_limits else self.photopoints return self._apply_filter(datapoints, filters) if datapoints else None @staticmethod def _apply_filter( datapoints: Sequence[DataPoint], filters: None | OneOrMany[JDict] = None, ) -> Sequence[DataPoint]: """ :raises ValueError: if datapoints is None or in case of bad filter values """ if isinstance(filters, dict): filters = [filters] else: if filters is None or not isinstance(filters, list): raise ValueError("filters must be of type dict or list") for filtre in filters: op = ops[filtre['operator']] datapoints = [ dp for dp in datapoints if filtre['attribute'] in dp['body'] and op(dp['body'][filtre['attribute']], filtre['value']) ] return datapoints