rail_pz_service.db package

Database table definitions and utility functions

class rail_pz_service.db.Algorithm(**kwargs)[source]

Bases: Base, RowMixin

An Algorithm is a wrapper for a specific RAIL class that implements a particular p(z) estimation algorithm.

This only defines the particular Python class that implements the algorithm. The choice of a particular trained Model and any non-default parameters used to initialize an Estimator are handled by their own classes.
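Because the row only records the class name, the class itself has to be imported when it is needed (compare Cache.get_algo_class below). The sketch that follows shows one way to do that. It assumes class_name holds a fully qualified dotted path such as "package.module.ClassName". The helper resolve_class is hypothetical, and the service may resolve names differently (for example through RailEnv). The demo uses a standard-library class so that it runs without RAIL installed.

```python
import importlib


def resolve_class(class_name: str) -> type:
    """Import and return the class named by a dotted path (hypothetical helper).

    Assumes ``class_name`` looks like ``"package.module.ClassName"``.
    """
    module_name, _, attr = class_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {class_name!r}")
    module = importlib.import_module(module_name)
    cls = getattr(module, attr)
    # Guard against dotted paths that name a function or constant instead of a class.
    if not isinstance(cls, type):
        raise TypeError(f"{class_name!r} does not name a class")
    return cls


# Demo with a standard-library class; a RAIL estimator path would be resolved the same way.
OrderedDict = resolve_class("collections.OrderedDict")
print(OrderedDict.__name__)  # OrderedDict
```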

class_name: Mapped[str]

Name for the python class implementing the algorithm

class_string: str = 'algorithm'
col_names_for_table = ['id', 'name', 'class_name']

column names to use when printing the table

estimators_: Mapped[list[Estimator]]

Access to list of associated Estimator

id: Mapped[int]

primary key

models_: Mapped[list[Model]]

Access to list of associated Model

name: Mapped[str]

Name for this Algorithm, unique

pydantic_mode_class

alias of Algorithm

class rail_pz_service.db.Base(**kwargs)[source]

Bases: DeclarativeBase

Base class for DB tables

Parameters:

kwargs (Any)

metadata: ClassVar[MetaData] = MetaData()

Refers to the MetaData collection that will be used for new Table objects.

See also

orm_declarative_metadata

registry: ClassVar[_RegistryType] = <sqlalchemy.orm.decl_api.registry object>

Refers to the registry in use, with which new Mapper objects will be associated.

class rail_pz_service.db.Cache(logger=None)[source]

Bases: object

Cache for objects created from specific DB rows

Parameters:

logger (structlog.BoundLogger | None)
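The cache keeps objects built from DB rows so that, for example, an estimator does not have to be rebuilt for every request, and shared_cache hands out one process-wide instance. The following is a minimal synchronous sketch of that pattern. It assumes objects are keyed by DB id. SketchCache, get_or_build and the builder callable are hypothetical names. The real Cache works with async sessions and RAIL objects rather than plain callables.

```python
from __future__ import annotations

from typing import Any, Callable, ClassVar


class SketchCache:
    """Process-wide cache of objects built from DB rows (illustrative only)."""

    _shared: ClassVar[SketchCache | None] = None

    def __init__(self) -> None:
        self._objects: dict[int, Any] = {}

    @classmethod
    def shared_cache(cls) -> SketchCache:
        # Create the shared instance on first use, then keep returning it.
        if cls._shared is None:
            cls._shared = cls()
        return cls._shared

    def get_or_build(self, key: int, builder: Callable[[int], Any]) -> Any:
        # Build the object only the first time this DB id is requested.
        if key not in self._objects:
            self._objects[key] = builder(key)
        return self._objects[key]

    def clear(self) -> None:
        # Drop every cached object; the next lookup rebuilds it.
        self._objects.clear()


calls: list[int] = []


def build(key: int) -> str:
    calls.append(key)
    return f"estimator-{key}"


cache = SketchCache.shared_cache()
cache.get_or_build(3, build)
cache.get_or_build(3, build)
print(calls)  # [3]: the builder ran only once
```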

clear()[source]

Clear out the cache

Return type:

None

async create_request(session, dataset_name, estimator_name)[source]

Create a new Request

Parameters:
  • session (async_scoped_session) – DB session manager

  • dataset_name (str) – Name of associated Dataset

  • estimator_name (str) – Name of associated Estimator

Returns:

Newly created Request

Return type:

Request

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)
new_request = await cache.create_request(
    session,
    dataset_name='my_com_cam_dataset',
    estimator_name='my_gpz_com_cam_estimator',
)
async get_algo_class(session, key)[source]

Get a python class associated to a particular algorithm

Parameters:
  • session (async_scoped_session) – DB session manager

  • key (int) – DB id of the algorithm in question

Returns:

Python class of the associated algorithm

Return type:

type[CatEstimator]

Raises:
async get_catalog_tag_class(session, key)[source]

Get a python class associated to a particular catalog_tag

Parameters:
  • session (async_scoped_session) – DB session manager

  • key (int) – DB id of the catalog_tag in question

Returns:

Python class of the associated catalog_tag

Return type:

type[CatalogConfigBase]

Raises:
async get_estimator(session, key)[source]

Get a particular CatEstimator

Parameters:
  • session (async_scoped_session) – DB session manager

  • key (int) – DB id of the estimator in question

Returns:

Estimator in question

Return type:

CatEstimator

Raises:
async get_qp_dist(session, key)[source]

Get the qp.Ensemble from a particular request

Parameters:
  • session (async_scoped_session) – DB session manager

  • key (int) – DB id of the request in question

Returns:

Ensemble in question

Return type:

qp.Ensemble

Raises:

RAILRequestError – Request failed for some reason

async get_qp_file(session, key)[source]

Get the output file from a particular request

Parameters:
  • session (async_scoped_session) – DB session manager

  • key (int) – DB id of the request in question

Returns:

Path to the file in question

Return type:

str

Raises:

RAILRequestError – Request failed for some reason

async load_algorithms_from_rail_env(session)[source]

Load all of the CatEstimator algorithms from RailEnv

Parameters:

session (async_scoped_session) – DB session manager

Returns:

Newly created Algorithm database rows

Return type:

list[Algorithm]

Raises:

RAILIntegrityError – Rows already exist in database

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)
algos = await cache.load_algorithms_from_rail_env(
    session,
)
async load_catalog_tags_from_rail_env(session)[source]

Load all of the CatalogTags defined by RAIL classes

Parameters:

session (async_scoped_session) – DB session manager

Returns:

Newly created CatalogTag database rows

Return type:

list[CatalogTag]

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)
catalog_tags = await cache.load_catalog_tags_from_rail_env(
    session,
)
async load_dataset_from_file(session, name, path, catalog_tag_name, data=None)[source]

Import a data file to the archive area and add a Dataset row

Parameters:
  • session (async_scoped_session) – DB session manager

  • name (str) – Name for new Dataset

  • path (Path) – Path to input file. Note that it will be copied to DB area

  • catalog_tag_name (str) – Name of CatalogTag that describes the contents of the file

  • data (dict | None)

Returns:

Newly created Dataset

Return type:

Dataset

Raises:

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)
new_dataset = await cache.load_dataset_from_file(
    session,
    name='my_com_cam_dataset',
    path='local_version_of_file.hdf5',
    catalog_tag_name='com_cam',
)
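load_dataset_from_file copies the input file into the service's archive area before it records the path. The sketch below shows only that copy step. It assumes a flat archive directory where each file is stored under the dataset name and keeps its original suffix. The helper archive_file and this naming scheme are hypothetical, not the service's actual layout.

```python
from __future__ import annotations

import shutil
import tempfile
from pathlib import Path


def archive_file(src: str | Path, archive_dir: str | Path, name: str) -> Path:
    """Copy ``src`` into ``archive_dir`` as ``<name><suffix>`` and return the new path.

    Refuses to overwrite an already archived file (hypothetical policy).
    """
    src = Path(src)
    dest_dir = Path(archive_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)
    dest = dest_dir / f"{name}{src.suffix}"
    if dest.exists():
        raise FileExistsError(dest)
    shutil.copy2(src, dest)  # copy contents and file metadata
    return dest


with tempfile.TemporaryDirectory() as tmp:
    local = Path(tmp) / "local_version_of_file.hdf5"
    local.write_bytes(b"demo")
    archived = archive_file(local, Path(tmp) / "archive", "my_com_cam_dataset")
    print(archived.name)  # my_com_cam_dataset.hdf5
```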
async load_dataset_from_values(session, name, data, catalog_tag_name, path=None)[source]

Add a Dataset row with data given directly as a dict of values

Parameters:
  • session (async_scoped_session) – DB session manager

  • name (str) – Name for new Dataset

  • data (dict) – Data to input

  • catalog_tag_name (str) – Name of CatalogTag that describes the contents of the data

  • path (str | None)

Returns:

Newly created Dataset

Return type:

Dataset

Raises:

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)

data = dict(
    u_cModelMag=24.4, g_cModelMag=24.4, r_cModelMag=24.4,
    i_cModelMag=24.4, z_cModelMag=24.4, y_cModelMag=24.4,
    u_cModelMagErr=0.5, g_cModelMagErr=0.5, r_cModelMagErr=0.5,
    i_cModelMagErr=0.5, z_cModelMagErr=0.5, y_cModelMagErr=0.5,
)
new_dataset = await cache.load_dataset_from_values(
    session,
    name='my_com_cam_dataset',
    data=data,
    catalog_tag_name='com_cam',
)
async load_estimator(session, name, model_name, config=None)[source]

Create a new Estimator

Parameters:
  • session (async_scoped_session) – DB session manager

  • name (str) – Name for new Estimator

  • model_name (str) – Name of associated model

  • config (dict | None) – Extra parameters to use when running the estimator

Returns:

Newly created Estimator

Return type:

Estimator

Raises:

RAILIntegrityError – Rows already exist in database

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)
new_estimator = await cache.load_estimator(
    session,
    name='my_gpz_com_cam_estimator',
    model_name='my_gpz_com_cam_model',
)
async load_model_from_file(session, name, path, algo_name, catalog_tag_name)[source]

Import a model file to the archive area and add a Model row

Parameters:
  • session (async_scoped_session) – DB session manager

  • name (str) – Name for new Model

  • path (Path) – Path to input file. Note that it will be copied to DB area

  • algo_name (str) – Name of Algorithm that uses the model

  • catalog_tag_name (str) – Name of CatalogTag that describes the contents of the file

Returns:

Newly created Model

Return type:

Model

Raises:

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)
new_model = await cache.load_model_from_file(
    session,
    name='my_gpz_com_cam_model',
    path='local_version_of_file.pkl',
    algo_name='GPZEstimator',
    catalog_tag_name='com_cam',
)
async run_request(session, request_id)[source]

Run a request

Parameters:
  • session (async_scoped_session) – DB session manager

  • request_id (int) – Id of the request in the Request table

Returns:

Request in question

Return type:

Request

Example

from structlog import get_logger
import rail_pz_service.db
logger = get_logger(__name__)
cache = rail_pz_service.db.Cache.shared_cache(logger)
new_request = await cache.create_request(
    session,
    dataset_name='my_com_cam_dataset',
    estimator_name='my_gpz_com_cam_estimator',
)
await cache.run_request(
    session,
    new_request.id,
)
classmethod shared_cache(logger)[source]
Parameters:

logger (BoundLogger)

Return type:

Cache

class rail_pz_service.db.CatalogTag(**kwargs)[source]

Bases: Base, RowMixin

Defines what kind of catalog the data come from; specifically, which names to expect for the magnitude columns.

This is implemented in the rail.utils.catalog_utils module, which uses a catalog tag to set the default parameters for RAIL modules to match the catalog.
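A catalog tag mostly fixes the names of the magnitude columns. The table below is a minimal sketch of that idea. It assumes per-band name templates inferred from the com_cam example data on this page: "{band}_cModelMag" and "{band}_cModelMagErr" for the bands ugrizy. CATALOG_TAGS and expected_columns are hypothetical. The real definitions live in rail.utils.catalog_utils and may differ.

```python
from __future__ import annotations

# Hypothetical per-tag column templates; the com_cam values are inferred from
# the example data used with load_dataset_from_values.
CATALOG_TAGS: dict[str, dict[str, str]] = {
    "com_cam": {
        "bands": "ugrizy",
        "mag": "{band}_cModelMag",
        "mag_err": "{band}_cModelMagErr",
    },
}


def expected_columns(tag: str) -> list[str]:
    """Return the magnitude and magnitude-error column names for a catalog tag."""
    spec = CATALOG_TAGS[tag]
    mags = [spec["mag"].format(band=b) for b in spec["bands"]]
    errs = [spec["mag_err"].format(band=b) for b in spec["bands"]]
    return mags + errs


print(expected_columns("com_cam")[:2])  # ['u_cModelMag', 'g_cModelMag']
```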

class_name: Mapped[str]

Name for the python class implementing the CatalogTag

class_string: str = 'catalog_tag'
col_names_for_table = ['id', 'name', 'class_name']

column names to use when printing the table

datasets_: Mapped[list[Dataset]]

Access to list of associated Dataset

estimators_: Mapped[list[Estimator]]

Access to list of associated Estimator

id: Mapped[int]

primary key

models_: Mapped[list[Model]]

Access to list of associated Model

name: Mapped[str]

Name for this CatalogTag, unique

pydantic_mode_class

alias of CatalogTag

class rail_pz_service.db.Dataset(**kwargs)[source]

Bases: Base, RowMixin

Color data for a set of objects that can be used to obtain p(z) estimates.

It is associated with a CatalogTag that defines which column names to expect.

It can be stored either in a file (for larger datasets) or as a Python dict (for small datasets of a few objects, which is useful when uploading things on the fly).

catalog_tag_: Mapped[CatalogTag]

Access to associated CatalogTag

catalog_tag_id: Mapped[int]

foreign key into catalog_tag table

class_string: str = 'dataset'
col_names_for_table = ['id', 'name', 'n_objects', 'catalog_tag_id', 'path']

column names to use when printing the table

data: Mapped[dict | None]

Data for the dataset (could be None)

async classmethod get_create_kwargs(session, **kwargs)[source]

Get additional keywords needed to create a row

This should be overridden by sub-classes as needed

The default is to just return the original keywords

Parameters:
  • session (async_scoped_session) – DB session manager

  • **kwargs (Any) – Columns and associated values for the new row

Returns:

Keywords needed to create a new row

Return type:

dict

id: Mapped[int]

primary key

n_objects: Mapped[int]

Number of objects in the dataset

name: Mapped[str]

Name for this Dataset, unique

path: Mapped[str | None]

Path to the relevant file (could be None)

pydantic_mode_class

alias of Dataset

requests_: Mapped[list[Request]]

Access to list of associated Request

classmethod validate_data(data, catalog_tag)[source]

Validate that these data are appropriate for the CatalogTag

Parameters:
  • data (dict) – Data in question

  • catalog_tag (CatalogTag) – Catalog tag in question

Returns:

Size of the dataset, and the data as a dict mapping column names to lists of values

Return type:

tuple[int, dict[str, list[float]]]
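validate_data returns the number of objects together with the data as a dict of column lists. The sketch below shows one way to perform that check. It assumes the CatalogTag is reduced to a list of required column names. It also assumes scalar values, as in the load_dataset_from_values example, are promoted to one-element lists. validate_data_sketch is a hypothetical name, and the real method may apply further checks.

```python
from __future__ import annotations

from typing import Any


def validate_data_sketch(
    data: dict[str, Any], required: list[str]
) -> tuple[int, dict[str, list[float]]]:
    """Check ``data`` against the required columns; return (n_objects, normalized data)."""
    missing = [col for col in required if col not in data]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    normalized: dict[str, list[float]] = {}
    for col in required:  # extra columns are ignored in this sketch
        value = data[col]
        # Lists/tuples are treated as columns; a single value becomes a one-element column.
        values = list(value) if isinstance(value, (list, tuple)) else [value]
        normalized[col] = [float(v) for v in values]
    lengths = {len(v) for v in normalized.values()}
    if len(lengths) != 1:
        raise ValueError(f"columns have inconsistent lengths: {sorted(lengths)}")
    return lengths.pop(), normalized


n, cols = validate_data_sketch(
    {"g_cModelMag": 24.4, "r_cModelMag": [24.1]},
    required=["g_cModelMag", "r_cModelMag"],
)
print(n, cols["g_cModelMag"])  # 1 [24.4]
```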

classmethod validate_data_for_path(path, catalog_tag)[source]

Validate that these data are appropriate for the CatalogTag

Parameters:
  • path (Path) – File with the data

  • catalog_tag (CatalogTag) – CatalogTag in question

Returns:

Size of the dataset

Return type:

int

class rail_pz_service.db.Estimator(**kwargs)[source]

Bases: Base, RowMixin

Combination of an Algorithm to run, a trained Model to apply to the data, and any specific configuration overrides.

algo_: Mapped[Algorithm]

Access to associated Algorithm

algo_id: Mapped[int]

foreign key into ‘Algorithm’ table

catalog_tag_: Mapped[CatalogTag]

Access to associated CatalogTag

catalog_tag_id: Mapped[int]

foreign key into ‘CatalogTag’ table

class_string: str = 'estimator'
col_names_for_table = ['id', 'name', 'algo_id', 'catalog_tag_id', 'model_id']

column names to use when printing the table

config: Mapped[dict | None]

Configuration parameters for this estimator
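The config column holds only the non-default parameters for this Estimator. The sketch below shows one way to combine them with defaults. It assumes the defaults come as a plain dict and that the stored overrides take precedence. build_estimator_config and the parameter names max_iter and seed are hypothetical and used for illustration only.

```python
from __future__ import annotations

from typing import Any


def build_estimator_config(
    defaults: dict[str, Any], overrides: dict[str, Any] | None
) -> dict[str, Any]:
    """Merge per-Estimator overrides on top of defaults; overrides win, inputs are not mutated."""
    return {**defaults, **(overrides or {})}


defaults = {"max_iter": 100, "seed": 0}  # hypothetical parameter names
print(build_estimator_config(defaults, {"seed": 42}))  # {'max_iter': 100, 'seed': 42}
```

Because config may be None, the merge treats a missing config the same as an empty one.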

async classmethod get_create_kwargs(session, **kwargs)[source]

Get additional keywords needed to create a row

This should be overridden by sub-classes as needed

The default is to just return the original keywords

Parameters:
  • session (async_scoped_session) – DB session manager

  • **kwargs (Any) – Columns and associated values for the new row

Returns:

Keywords needed to create a new row

Return type:

dict

id: Mapped[int]

primary key

model_: Mapped[Model]

Access to associated Model

model_id: Mapped[int]

foreign key into ‘Model’ table

name: Mapped[str]

Name for this Estimator, unique

pydantic_mode_class

alias of Estimator

requests_: Mapped[list[Request]]

Access to list of associated Request

class rail_pz_service.db.Model(**kwargs)[source]

Bases: Base, RowMixin

Specific ML model that is trained to work with a specific Algorithm on a particular type of data (CatalogTag).

Typically a Model is stored as a pickle file.

The rail.core.model.Model class provides a standard wrapper to store metadata, such as the name of the Python class that created the model and the CatalogTag to use the model with.

algo_: Mapped[Algorithm]

Access to associated Algorithm

algo_id: Mapped[int]

foreign key into Algorithm table

catalog_tag_: Mapped[CatalogTag]

Access to associated CatalogTag

catalog_tag_id: Mapped[int]

foreign key into CatalogTag table

class_string: str = 'model'
col_names_for_table = ['id', 'name', 'algo_id', 'catalog_tag_id', 'path']

column names to use when printing the table

estimators_: Mapped[list[Estimator]]

Access to list of associated Estimator

async classmethod get_create_kwargs(session, **kwargs)[source]

Get additional keywords needed to create a row

This should be overridden by sub-classes as needed

The default is to just return the original keywords

Parameters:
  • session (async_scoped_session) – DB session manager

  • **kwargs (Any) – Columns and associated values for the new row

Returns:

Keywords needed to create a new row

Return type:

dict

id: Mapped[int]

primary key

name: Mapped[str]

Name for this Model, unique

path: Mapped[str]

Path to the relevant file

pydantic_mode_class

alias of Model

classmethod validate_model(path, algo, catalog_tag)[source]

Validate that the model is appropriate for the Algorithm and CatalogTag

Parameters:
  • path (Path) – File with the model

  • algo (Algorithm) – Algorithm in question

  • catalog_tag (CatalogTag) – Catalog tag in question

Return type:

None
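validate_model checks that a stored model matches the Algorithm and CatalogTag it is registered with. The sketch below compares model metadata against both. It assumes the pickle holds a dict with a "metadata" entry carrying "creation_class_name" and "catalog_tag". That layout is hypothetical and is not the actual rail.core.model.Model format.

```python
from __future__ import annotations

import pickle
import tempfile
from pathlib import Path


def validate_model_sketch(path: Path, algo_class_name: str, catalog_tag_name: str) -> None:
    """Raise ValueError if the pickled model's metadata does not match."""
    with open(path, "rb") as fin:
        payload = pickle.load(fin)
    meta = payload["metadata"]  # hypothetical layout
    if meta["creation_class_name"] != algo_class_name:
        raise ValueError(
            f"model was built by {meta['creation_class_name']}, not {algo_class_name}"
        )
    if meta["catalog_tag"] != catalog_tag_name:
        raise ValueError(
            f"model is for catalog tag {meta['catalog_tag']}, not {catalog_tag_name}"
        )


with tempfile.TemporaryDirectory() as tmp:
    model_path = Path(tmp) / "local_version_of_file.pkl"
    with open(model_path, "wb") as fout:
        pickle.dump(
            {"metadata": {"creation_class_name": "GPZEstimator", "catalog_tag": "com_cam"}},
            fout,
        )
    validate_model_sketch(model_path, "GPZEstimator", "com_cam")  # passes silently
```

Only unpickle files from trusted sources, since loading a pickle can execute arbitrary code.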

class rail_pz_service.db.Request(**kwargs)[source]

Bases: Base, RowMixin

The basic processing unit in rail_pz_service: a Request to generate per-galaxy p(z) for all of the objects in a particular Dataset using a specific Estimator.

The output p(z) distribution will be stored in a qp file.

It also stores some metadata, including timestamps and the user who initiated the Request.
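A Request moves from created to started to finished. The timestamps time_created, time_started and time_finished record each step, and qp_file_path is set once the output exists. The sketch below models that lifecycle in memory. It assumes a request counts as open while time_finished is None, which is one plausible reading of get_open_requests. RequestSketch, its methods and the output path are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime, timezone


def _now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class RequestSketch:
    user: str
    estimator_id: int
    dataset_id: int
    time_created: datetime = field(default_factory=_now)
    time_started: datetime | None = None
    time_finished: datetime | None = None
    qp_file_path: str | None = None

    def start(self) -> None:
        # Record when an Estimator began processing.
        self.time_started = _now()

    def finish(self, qp_file_path: str) -> None:
        # Record the output file and completion time.
        self.qp_file_path = qp_file_path
        self.time_finished = _now()

    @property
    def is_open(self) -> bool:
        # Assumption: "open" means processing has not finished yet.
        return self.time_finished is None


demo_requests = [RequestSketch("alice", 1, 1), RequestSketch("bob", 1, 2)]
demo_requests[0].start()
demo_requests[0].finish("/archive/qp/request_1.hdf5")  # hypothetical path
open_requests = [r for r in demo_requests if r.is_open]
print([r.user for r in open_requests])  # ['bob']
```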

class_string: str = 'request'
col_names_for_table = ['id', 'user', 'estimator_id', 'dataset_id', 'qp_file_path']

column names to use when printing the table

dataset_: Mapped[Dataset]

Access to associated Dataset

dataset_id: Mapped[int]

foreign key into dataset table

estimator_: Mapped[Estimator]

Access to associated Estimator

estimator_id: Mapped[int]

foreign key into estimator table

async classmethod get_create_kwargs(session, **kwargs)[source]

Get additional keywords needed to create a row

This should be overridden by sub-classes as needed

The default is to just return the original keywords

Parameters:
  • session (async_scoped_session) – DB session manager

  • **kwargs (Any) – Columns and associated values for the new row

Returns:

Keywords needed to create a new row

Return type:

dict

async classmethod get_open_requests(session)[source]
Parameters:

session (async_scoped_session)

Return type:

Sequence[Request]

id: Mapped[int]

primary key

pydantic_mode_class

alias of Request

qp_file_path: Mapped[str | None]

path to the output file

time_created: Mapped[datetime]

timestamp of when the request was created in the DB

time_finished: Mapped[datetime | None]

timestamp of when the request processing was finished

time_started: Mapped[datetime | None]

timestamp of when an Estimator started processing the request

user: Mapped[str]

User who originated this Request

class rail_pz_service.db.RowMixin[source]

Bases: object

Mixin class to define common features of database rows for all the tables we use in rail_server

Here we are just defining the interface to manipulate any sort of table.

class_string: str
async classmethod create_row(session, **kwargs)[source]

Create a single row

Parameters:
  • session (async_scoped_session) – DB session manager

  • **kwargs (Any) – Columns and associated values for the new row

Returns:

Newly created row

Return type:

T

Raises:

CMIntegrityError – Catching an IntegrityError

async classmethod delete_row(session, row_id)[source]

Delete a single row, matching row.id == row_id

Parameters:
  • session (async_scoped_session) – DB session manager

  • row_id (int) – PrimaryKey of the row to delete

Raises:
  • CMMissingIDError – Row does not exist

  • CMIntegrityError – sqlalchemy.IntegrityError raised

Return type:

None

async classmethod get_create_kwargs(session, **kwargs)[source]

Get additional keywords needed to create a row

This should be overridden by sub-classes as needed

The default is to just return the original keywords

Parameters:
  • session (async_scoped_session) – DB session manager

  • **kwargs (Any) – Columns and associated values for the new row

Returns:

Keywords needed to create a new row

Return type:

dict
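get_create_kwargs lets a table turn user-facing arguments into column values before the row is inserted. For example, load_estimator takes a model_name, but the Estimator row stores model_id, algo_id and catalog_tag_id. The sketch below is a synchronous version of such an override. It assumes an Estimator simply inherits the Algorithm and CatalogTag of its Model. The in-memory MODELS table and both function names are hypothetical stand-ins for the async DB lookups.

```python
from __future__ import annotations

from typing import Any

# Hypothetical stand-in for the Model table: name -> row values.
MODELS: dict[str, dict[str, Any]] = {
    "my_gpz_com_cam_model": {"id": 7, "algo_id": 2, "catalog_tag_id": 1},
}


def base_get_create_kwargs(**kwargs: Any) -> dict[str, Any]:
    # Default behaviour described above: return the keywords unchanged.
    return kwargs


def estimator_get_create_kwargs(**kwargs: Any) -> dict[str, Any]:
    """Resolve ``model_name`` into the foreign keys stored on an Estimator row."""
    model = MODELS.get(kwargs["model_name"])
    if model is None:
        raise KeyError(f"no Model named {kwargs['model_name']!r}")
    return {
        "name": kwargs["name"],
        "model_id": model["id"],
        "algo_id": model["algo_id"],  # assumed to be inherited from the Model
        "catalog_tag_id": model["catalog_tag_id"],  # likewise assumed
        "config": kwargs.get("config"),
    }


row_kwargs = estimator_get_create_kwargs(
    name="my_gpz_com_cam_estimator", model_name="my_gpz_com_cam_model"
)
print(row_kwargs["model_id"], row_kwargs["algo_id"])  # 7 2
```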

async classmethod get_row(session, row_id)[source]

Get a single row, matching row.id == row_id

Parameters:
  • session (async_scoped_session) – DB session manager

  • row_id (int) – PrimaryKey of the row to return

Returns:

The matching row

Return type:

T

Raises:

RAILMissingIDError – Row with ID does not exist

async classmethod get_row_by_name(session, name)[source]

Get a single row, with row.name == name

Parameters:
  • session (async_scoped_session) – DB session manager

  • name (str) – name of the row to return

Returns:

Matching row

Return type:

T

Raises:

RAILMissingNameError – Row with that name does not exist

async classmethod get_rows(session, skip=0, limit=100)[source]

Get rows associated to a particular table

Parameters:
  • session (async_scoped_session) – DB session manager

  • skip (int) – Number of rows to skip before returning results

  • limit (int) – Maximum number of rows to return

Returns:

All the matching rows

Return type:

Sequence[T]
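The skip and limit arguments of get_rows map naturally onto SQL OFFSET and LIMIT. The sketch below uses the standard-library sqlite3 module on a hypothetical in-memory table. Rows are ordered by primary key so that successive pages are stable. The real method builds its query with SQLAlchemy on an async session.

```python
import sqlite3


def get_rows(conn: sqlite3.Connection, skip: int = 0, limit: int = 100) -> list:
    """Return up to ``limit`` rows after skipping ``skip`` rows, in primary-key order."""
    cur = conn.execute(
        "SELECT id, name FROM dataset ORDER BY id LIMIT ? OFFSET ?",
        (limit, skip),
    )
    return cur.fetchall()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dataset (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.executemany(
    "INSERT INTO dataset (name) VALUES (?)",
    [(f"dataset_{i}",) for i in range(5)],
)
print(get_rows(conn, skip=2, limit=2))  # [(3, 'dataset_2'), (4, 'dataset_3')]
```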

id: Any
name: Any
pydantic_mode_class: type[BaseModel]
to_model()[source]

Return a row as a pydantic model

Return type:

BaseModel

async classmethod update_row(session, row_id, **kwargs)[source]

Update a single row, matching row.id == row_id

Parameters:
  • session (async_scoped_session) – DB session manager

  • row_id (int) – PrimaryKey of the row to update

  • **kwargs (Any) – Columns and associated new values

Returns:

Updated row

Return type:

T

Raises:
async update_values(session, **kwargs)[source]

Update values in a row

Parameters:
  • session (async_scoped_session) – DB session manager

  • **kwargs (Any) – Columns and associated new values

  • self (T)

Returns:

Updated row

Return type:

T

Raises:

CMIntegrityError – Catching an IntegrityError

DB Admin CLI