rail_pz_service.db package
Database table definitions and utility functions
- class rail_pz_service.db.Algorithm(**kwargs)[source]
-
Algorithm is a wrapper for a specific RAIL class that implements a particular p(z) estimation algorithm.
This just defines the particular Python class implementing the algorithm. The selection of a particular instance of the trained Model and any non-default parameters used to initialize an Estimator are handled in their own classes.
- class_name: Mapped[str]
Name for the python class implementing the algorithm
- class_string: str = 'algorithm'
- col_names_for_table = ['id', 'name', 'class_name']
column names to use when printing the table
- id: Mapped[int]
primary key
- name: Mapped[str]
Name for this Algorithm, unique
- class rail_pz_service.db.Base(**kwargs)[source]
Bases: DeclarativeBase
Base class for DB tables
- Parameters:
kwargs (Any)
- metadata: ClassVar[MetaData] = MetaData()
Refers to the MetaData collection that will be used for new Table objects.
See also: orm_declarative_metadata
- registry: ClassVar[_RegistryType] = <sqlalchemy.orm.decl_api.registry object>
Refers to the registry in use where new Mapper objects will be associated.
- class rail_pz_service.db.Cache(logger=None)[source]
Bases: object
Cache for objects created from specific DB rows
- Parameters:
logger (structlog.BoundLogger | None)
- async create_request(session, dataset_name, estimator_name)[source]
Create a new Request for a given Dataset and Estimator
- Parameters:
session (async_scoped_session) – DB session manager
dataset_name (str) – Name of associated Dataset
estimator_name (str) – Name of associated Estimator
- Returns:
Request in question
- Return type:
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # session is an existing async_scoped_session
    new_request = await cache.create_request(
        session,
        dataset_name='my_com_cam_dataset',
        estimator_name='my_gpz_com_cam_estimator',
    )
- async get_algo_class(session, key)[source]
Get a python class associated to a particular algorithm
- Parameters:
session (async_scoped_session) – DB session manager
key (int) – DB id of the algorithm in question
- Returns:
Python class of the associated algorithm
- Return type:
type[CatEstimator]
- Raises:
RAILImportError – Python class could not be loaded
RAILMissingIDError – ID not found in database
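Turning a stored class name into a Python class can be sketched with importlib. This is only an illustration, not the service's implementation: it assumes class_name holds a dotted path such as 'package.module.ClassName', and both load_class and ClassImportError are hypothetical names standing in for the lookup and for RAILImportError.

```python
import importlib


class ClassImportError(ImportError):
    """Hypothetical stand-in for the service's RAILImportError."""


def load_class(class_name: str) -> type:
    """Resolve a dotted 'package.module.ClassName' string to a class object."""
    module_name, _, attr_name = class_name.rpartition(".")
    if not module_name:
        raise ClassImportError(f"{class_name!r} is not a dotted path")
    try:
        module = importlib.import_module(module_name)
        return getattr(module, attr_name)
    except (ImportError, AttributeError) as exc:
        # Missing module or missing attribute both mean the class cannot be loaded
        raise ClassImportError(f"Could not load {class_name!r}") from exc


# A standard-library class is used here as a placeholder for a RAIL estimator
ordered_dict_class = load_class("collections.OrderedDict")
```

Wrapping both failure modes in one exception type gives callers a single error to handle, which matches the single RAILImportError listed above.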
- async get_catalog_tag_class(session, key)[source]
Get a python class associated to a particular catalog_tag
- Parameters:
session (async_scoped_session) – DB session manager
key (int) – DB id of the catalog_tag in question
- Returns:
Python class of the associated catalog_tag
- Return type:
type[CatalogConfigBase]
- Raises:
RAILImportError – Python class could not be loaded
RAILMissingIDError – ID not found in database
- async get_estimator(session, key)[source]
Get a particular CatEstimator
- Parameters:
session (async_scoped_session) – DB session manager
key (int) – DB id of the estimator in question
- Returns:
Estimator in question
- Return type:
CatEstimator
- Raises:
RAILImportError – Python class could not be loaded
RAILMissingIDError – ID not found in database
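The idea of caching objects by DB id can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual Cache code: IdCache and fake_loader are hypothetical names, and the loader stands in for a DB lookup followed by construction of the object.

```python
import asyncio
from typing import Any, Awaitable, Callable


class IdCache:
    """Tiny cache mapping a DB id to an object built from that row."""

    def __init__(self, loader: Callable[[int], Awaitable[Any]]) -> None:
        self._loader = loader  # hypothetical coroutine that builds the object
        self._objects: dict[int, Any] = {}

    async def get(self, key: int) -> Any:
        # Build the object only on the first request for this id
        if key not in self._objects:
            self._objects[key] = await self._loader(key)
        return self._objects[key]


load_calls: list[int] = []


async def fake_loader(key: int) -> str:
    """Placeholder for an expensive DB lookup plus object construction."""
    load_calls.append(key)
    return f"estimator-{key}"


async def demo() -> tuple[str, str]:
    cache = IdCache(fake_loader)
    first = await cache.get(1)
    second = await cache.get(1)  # served from the cache, loader not called again
    return first, second


first, second = asyncio.run(demo())
```

The second lookup returns the same object without touching the loader, which is the saving a cache of this kind is meant to provide when estimators are expensive to construct.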
- async get_qp_dist(session, key)[source]
Get the qp.Ensemble from a particular request
- Parameters:
session (async_scoped_session) – DB session manager
key (int) – DB id of the request in question
- Returns:
Ensemble in question
- Return type:
qp.Ensemble
- Raises:
RAILRequestError – Request failed for some reason
- async get_qp_file(session, key)[source]
Get the output file from a particular request
- Parameters:
session (async_scoped_session) – DB session manager
key (int) – DB id of the request in question
- Returns:
Path to the file in question
- Return type:
str
- Raises:
RAILRequestError – Request failed for some reason
- async load_algorithms_from_rail_env(session)[source]
Load all of the CatEstimator algorithms from RailEnv
- Parameters:
session (async_scoped_session) – DB session manager
- Returns:
Newly created Algorithm database rows
- Return type:
list[Algorithm]
- Raises:
RAILIntegrityError – Rows already exist in database
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # Adds one Algorithm row per CatEstimator class found in RailEnv
    algos = await cache.load_algorithms_from_rail_env(
        session,
    )
- async load_catalog_tags_from_rail_env(session)[source]
Load all of the CatalogTag from RAIL classes
- Parameters:
session (async_scoped_session) – DB session manager
- Returns:
Newly created CatalogTag database rows
- Return type:
list[CatalogTag]
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # Adds one CatalogTag row per catalog tag class found in RAIL
    catalog_tags = await cache.load_catalog_tags_from_rail_env(
        session,
    )
- async load_dataset_from_file(session, name, path, catalog_tag_name, data=None)[source]
Import a data file to the archive area and add a Dataset row
- Parameters:
session (async_scoped_session) – DB session manager
name (str) – Name for new Dataset
path (Path) – Path to input file. Note that it will be copied to DB area
catalog_tag_name (str) – Name of the CatalogTag that describes the contents of the file
data (dict | None)
- Returns:
Newly created Dataset
- Return type:
- Raises:
RAILIntegrityError – Rows already exist in database
RAILFileNotFoundError – Input file not found
RAILBadDatasetError – Input file failed validation checks
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # The input file is copied to the DB area
    new_dataset = await cache.load_dataset_from_file(
        session,
        name='my_com_cam_dataset',
        path='local_version_of_file.hdf5',
        catalog_tag_name='com_cam',
    )
- async load_dataset_from_values(session, name, data, catalog_tag_name, path=None)[source]
Import data values and add a Dataset row
- Parameters:
session (async_scoped_session) – DB session manager
name (str) – Name for new Dataset
data (dict) – Data to input
catalog_tag_name (str) – Name of the CatalogTag that describes the contents of the data
path (str | None)
- Returns:
Newly created Dataset
- Return type:
- Raises:
RAILIntegrityError – Rows already exist in database
RAILFileNotFoundError – Input file not found
RAILBadDatasetError – Input file failed validation checks
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # One object, with a magnitude and a magnitude error per band
    data = dict(
        u_cModelMag=24.4,
        g_cModelMag=24.4,
        r_cModelMag=24.4,
        i_cModelMag=24.4,
        z_cModelMag=24.4,
        y_cModelMag=24.4,
        u_cModelMagErr=0.5,
        g_cModelMagErr=0.5,
        r_cModelMagErr=0.5,
        i_cModelMagErr=0.5,
        z_cModelMagErr=0.5,
        y_cModelMagErr=0.5,
    )
    new_dataset = await cache.load_dataset_from_values(
        session,
        name='my_com_cam_dataset',
        data=data,
        catalog_tag_name='com_cam',
    )
- async load_estimator(session, name, model_name, config=None)[source]
Create a new Estimator
- Parameters:
session (async_scoped_session) – DB session manager
name (str) – Name for new Estimator
model_name (str) – Name of associated model
config (dict | None) – Extra parameters to use when running the estimator
- Returns:
Newly created Estimator
- Return type:
- Raises:
RAILIntegrityError – Rows already exist in database
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # The named Model must already exist in the database
    new_estimator = await cache.load_estimator(
        session,
        name='my_gpz_com_cam_estimator',
        model_name='my_gpz_com_cam_model',
    )
- async load_model_from_file(session, name, path, algo_name, catalog_tag_name)[source]
Import a model file to the archive area and add a Model
- Parameters:
session (async_scoped_session) – DB session manager
name (str) – Name for new Model
path (Path) – Path to input file. Note that it will be copied to DB area
algo_name (str) – Name of Algorithm that uses the model
catalog_tag_name (str) – Name of the CatalogTag that describes the contents of the file
- Returns:
Newly created Model
- Return type:
- Raises:
RAILIntegrityError – Rows already exist in database
RAILFileNotFoundError – Input file not found
RAILBadModelError – Input file failed validation checks
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # The model file is copied to the DB area
    new_model = await cache.load_model_from_file(
        session,
        name='my_gpz_com_cam_model',
        path='local_version_of_file.pkl',
        algo_name='GPZEstimator',
        catalog_tag_name='com_cam',
    )
- async run_request(session, request_id)[source]
Run a request
- Parameters:
session (async_scoped_session) – DB session manager
request_id (int) – Id of the request in the Request table
- Returns:
Request in question
- Return type:
Example
    from structlog import get_logger
    import rail_pz_service.db

    logger = get_logger(__name__)
    cache = rail_pz_service.db.Cache.shared_cache(logger)
    # Create the Request row first, then run it by id
    new_request = await cache.create_request(
        session,
        dataset_name='my_com_cam_dataset',
        estimator_name='my_gpz_com_cam_estimator',
    )
    await cache.run_request(
        session,
        new_request.id,
    )
- Parameters:
logger (BoundLogger)
- Return type:
- class rail_pz_service.db.CatalogTag(**kwargs)[source]
-
Defines what kind of catalog we are analyzing data from. Specifically what to expect for the names of the magnitude columns.
This is implemented in the rail.utils.catalog_utils module, which uses a catalog tag to set the default parameters for RAIL modules to match the catalog.
- class_name: Mapped[str]
Name for the python class implementing the CatalogTag
- class_string: str = 'catalog_tag'
- col_names_for_table = ['id', 'name', 'class_name']
column names to use when printing the table
- id: Mapped[int]
primary key
- name: Mapped[str]
Name for this CatalogTag, unique
- pydantic_mode_class
alias of
CatalogTag
- class rail_pz_service.db.Dataset(**kwargs)[source]
-
Color data about a set of objects that can be used to obtain p(z) estimates.
It is associated with a CatalogTag that defines which column names to expect.
It can either be stored in a file (for larger datasets) or as a Python dict (for small datasets of a few objects, useful when uploading things on the fly).
- catalog_tag_: Mapped[CatalogTag]
Access to associated CatalogTag
- catalog_tag_id: Mapped[int]
foreign key into catalog_tag table
- class_string: str = 'dataset'
- col_names_for_table = ['id', 'name', 'n_objects', 'catalog_tag_id', 'path']
column names to use when printing the table
- data: Mapped[dict | None]
Data for the dataset (could be None)
- async classmethod get_create_kwargs(session, **kwargs)[source]
Get additional keywords needed to create a row
This should be overridden by sub-classes as needed
The default is to just return the original keywords
- Parameters:
session (async_scoped_session) – DB session manager
**kwargs (Any) – Columns and associated values for the new row
- Returns:
Keywords needed to create a new row
- Return type:
dict
- id: Mapped[int]
primary key
- n_objects: Mapped[int]
Number of objects in the dataset
- name: Mapped[str]
Name for this Dataset, unique
- path: Mapped[str | None]
Path to the relevant file (could be None)
- classmethod validate_data(data, catalog_tag)[source]
Validate that these data are appropriate for the CatalogTag
- Parameters:
data (dict) – Data in question
catalog_tag (CatalogTag) – CatalogTag in question
- Returns:
Size of the dataset, data formatted as strings
- Return type:
tuple[int, dict[str, list[float]]]
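The kind of check described here can be sketched in plain Python. This is an illustration only and not the validation rail_pz_service performs: the column list is a hypothetical subset borrowed from the example data above, whereas a real CatalogTag supplies its own names, and validate_columns is an invented helper.

```python
from typing import Any

# Hypothetical column list; a real CatalogTag would supply its own names
EXPECTED_COLUMNS = ["u_cModelMag", "g_cModelMag", "u_cModelMagErr", "g_cModelMagErr"]


def validate_columns(
    data: dict[str, Any], expected: list[str]
) -> tuple[int, dict[str, list[float]]]:
    """Check that every expected column is present and all columns have equal length."""
    missing = [name for name in expected if name not in data]
    if missing:
        raise KeyError(f"Missing columns: {missing}")

    cleaned: dict[str, list[float]] = {}
    for name in expected:
        value = data[name]
        # Accept a single number (one object) or a sequence of numbers
        values = list(value) if isinstance(value, (list, tuple)) else [value]
        cleaned[name] = [float(v) for v in values]

    lengths = {len(values) for values in cleaned.values()}
    if len(lengths) != 1:
        raise ValueError(f"Columns have inconsistent lengths: {sorted(lengths)}")
    return lengths.pop(), cleaned


n_objects, cleaned = validate_columns(
    {"u_cModelMag": 24.4, "g_cModelMag": 24.4, "u_cModelMagErr": 0.5, "g_cModelMagErr": 0.5},
    EXPECTED_COLUMNS,
)
```

Returning the object count together with the normalized columns mirrors the tuple[int, dict[str, list[float]]] return type listed above.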
- classmethod validate_data_for_path(path, catalog_tag)[source]
Validate that these data are appropriate for the CatalogTag
- Parameters:
path (Path) – File with the data
catalog_tag (CatalogTag) – CatalogTag in question
- Returns:
Size of the dataset
- Return type:
int
- class rail_pz_service.db.Estimator(**kwargs)[source]
-
Combination of an Algorithm, a trained Model to apply to the data, and any specific configuration overrides.
- algo_id: Mapped[int]
foreign key into ‘Algorithm’ table
- catalog_tag_: Mapped[CatalogTag]
Access to associated CatalogTag
- catalog_tag_id: Mapped[int]
foreign key into ‘CatalogTag’ table
- class_string: str = 'estimator'
- col_names_for_table = ['id', 'name', 'algo_id', 'catalog_tag_id', 'model_id']
column names to use when printing the table
- config: Mapped[dict | None]
Configuration parameters for this estimator
- async classmethod get_create_kwargs(session, **kwargs)[source]
Get additional keywords needed to create a row
This should be overridden by sub-classes as needed
The default is to just return the original keywords
- Parameters:
session (async_scoped_session) – DB session manager
**kwargs (Any) – Columns and associated values for the new row
- Returns:
Keywords needed to create a new row
- Return type:
dict
- id: Mapped[int]
primary key
- model_id: Mapped[int]
foreign key into ‘Model’ table
- name: Mapped[str]
Name for this Estimator, unique
- class rail_pz_service.db.Model(**kwargs)[source]
-
Specific ML model that is trained to work with a specific Algorithm on a particular type of data (CatalogTag).
Typically a Model is stored as a pickle file.
The rail.core.model.Model class provides a standard wrapper to store metadata such as the name of the Python class that created the model, and the applicable CatalogTag to use the model with.
- algo_id: Mapped[int]
foreign key into Algorithm table
- catalog_tag_: Mapped[CatalogTag]
Access to associated CatalogTag
- catalog_tag_id: Mapped[int]
foreign key into CatalogTag table
- class_string: str = 'model'
- col_names_for_table = ['id', 'name', 'algo_id', 'catalog_tag_id', 'path']
column names to use when printing the table
- async classmethod get_create_kwargs(session, **kwargs)[source]
Get additional keywords needed to create a row
This should be overridden by sub-classes as needed
The default is to just return the original keywords
- Parameters:
session (async_scoped_session) – DB session manager
**kwargs (Any) – Columns and associated values for the new row
- Returns:
Keywords needed to create a new row
- Return type:
dict
- id: Mapped[int]
primary key
- name: Mapped[str]
Name for this Model, unique
- path: Mapped[str]
Path to the relevant file
- classmethod validate_model(path, algo, catalog_tag)[source]
Validate that the model is appropriate for the Algorithm and CatalogTag
- Parameters:
path (Path) – File with the data
algo (Algorithm) – Algorithm in question
catalog_tag (CatalogTag) – Catalog tag in question
- Return type:
None
- class rail_pz_service.db.Request(**kwargs)[source]
-
Basic processing unit in rail_pz_service. A Request to generate per-galaxy p(z) for all of the objects in a particular Dataset using a specific Estimator.
The output p(z) distribution will be stored in a qp file.
This also stores some metadata, including timestamps and the user who initiated the Request.
- class_string: str = 'request'
- col_names_for_table = ['id', 'user', 'estimator_id', 'dataset_id', 'qp_file_path']
column names to use when printing the table
- dataset_id: Mapped[int]
foreign key into dataset table
- estimator_id: Mapped[int]
foreign key into estimator table
- async classmethod get_create_kwargs(session, **kwargs)[source]
Get additional keywords needed to create a row
This should be overridden by sub-classes as needed
The default is to just return the original keywords
- Parameters:
session (async_scoped_session) – DB session manager
**kwargs (Any) – Columns and associated values for the new row
- Returns:
Keywords needed to create a new row
- Return type:
dict
- async classmethod get_open_requests(session)[source]
- Parameters:
session (async_scoped_session)
- Return type:
Sequence[Request]
- id: Mapped[int]
primary key
- qp_file_path: Mapped[str | None]
path to the output file
- time_created: Mapped[datetime]
timestamp of when the request was created in the DB
- time_finished: Mapped[datetime | None]
timestamp of when the request processing was finished
- time_started: Mapped[datetime | None]
timestamp of when the request processing was started by an Estimator
- user: Mapped[str]
User who originated this Request
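The three timestamp columns suggest a simple lifecycle for a Request. The sketch below is an assumption about how they could be interpreted, not a statement of how rail_pz_service classifies requests: RequestTimes, request_state and processing_time are hypothetical names, and a missing timestamp is assumed to mean that stage has not happened yet.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional


@dataclass
class RequestTimes:
    """Stand-in holding only the three timestamp columns of a Request row."""

    time_created: datetime
    time_started: Optional[datetime] = None
    time_finished: Optional[datetime] = None


def request_state(req: RequestTimes) -> str:
    # Assumption: a missing timestamp means that stage has not happened yet
    if req.time_finished is not None:
        return "finished"
    if req.time_started is not None:
        return "running"
    return "pending"


def processing_time(req: RequestTimes) -> Optional[timedelta]:
    """Time spent processing, or None if the request has not finished."""
    if req.time_started is None or req.time_finished is None:
        return None
    return req.time_finished - req.time_started


created = datetime(2025, 1, 1, 12, 0, 0)
pending = RequestTimes(time_created=created)
done = RequestTimes(created, created + timedelta(seconds=5), created + timedelta(seconds=65))
```

Under this reading, a request that get_open_requests would plausibly return is one whose time_finished is still None, though the source does not define "open".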
- class rail_pz_service.db.RowMixin[source]
Bases: object
Mixin class to define common features of database rows for all the tables we use in rail_server
Here we are just defining the interface to manipulate any sort of table.
- class_string: str
- async classmethod create_row(session, **kwargs)[source]
Create a single row
- Parameters:
session (async_scoped_session) – DB session manager
**kwargs (Any) – Columns and associated values for the new row
- Returns:
Newly created row
- Return type:
T
- Raises:
CMIntegrityError – catching an IntegrityError
- async classmethod delete_row(session, row_id)[source]
Delete a single row, matching row.id == row_id
- Parameters:
session (async_scoped_session) – DB session manager
row_id (int) – PrimaryKey of the row to delete
- Raises:
CMMissingIDError – Row does not exist
CMIntegrityError – sqlalchemy.IntegrityError raised
- Return type:
None
- async classmethod get_create_kwargs(session, **kwargs)[source]
Get additional keywords needed to create a row
This should be overridden by sub-classes as needed
The default is to just return the original keywords
- Parameters:
session (async_scoped_session) – DB session manager
**kwargs (Any) – Columns and associated values for the new row
- Returns:
Keywords needed to create a new row
- Return type:
dict
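The override pattern described above can be sketched without a database. This is an illustrative assumption about what a sub-class override might do, not the code in Dataset, Estimator, Model or Request: RowSketch, DatasetSketch and the CATALOG_TAG_IDS lookup are hypothetical, and the session argument is unused here.

```python
import asyncio
from typing import Any

# Hypothetical name-to-id lookup standing in for a query on the catalog_tag table
CATALOG_TAG_IDS = {"com_cam": 1}


class RowSketch:
    @classmethod
    async def get_create_kwargs(cls, session: Any, **kwargs: Any) -> dict:
        # Default behavior described above: return the keywords unchanged
        return kwargs


class DatasetSketch(RowSketch):
    @classmethod
    async def get_create_kwargs(cls, session: Any, **kwargs: Any) -> dict:
        # Override: swap a user-facing name for the foreign key the table stores
        out = dict(kwargs)
        tag_name = out.pop("catalog_tag_name")
        out["catalog_tag_id"] = CATALOG_TAG_IDS[tag_name]
        return out


base_kwargs = asyncio.run(RowSketch.get_create_kwargs(None, name="a"))
dataset_kwargs = asyncio.run(
    DatasetSketch.get_create_kwargs(None, name="my_com_cam_dataset", catalog_tag_name="com_cam")
)
```

Keeping the translation in a classmethod lets a generic create_row accept user-facing keywords while each table decides which columns it actually stores.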
- async classmethod get_row(session, row_id)[source]
Get a single row, matching row.id == row_id
- Parameters:
session (async_scoped_session) – DB session manager
row_id (int) – PrimaryKey of the row to return
- Returns:
The matching row
- Return type:
T
- Raises:
RAILMissingIDError – Row with ID does not exist
- async classmethod get_row_by_name(session, name)[source]
Get a single row, with row.name == name
- Parameters:
session (async_scoped_session) – DB session manager
name (str) – name of the row to return
- Returns:
Matching row
- Return type:
T
- Raises:
RAILMissingNameError – Row with name does not exist
- async classmethod get_rows(session, skip=0, limit=100)[source]
Get rows associated to a particular table
- Parameters:
session (async_scoped_session) – DB session manager
skip (int) – Number of rows to skip before returning results
limit (int) – Number of rows to return
- Returns:
All the matching rows
- Return type:
Sequence[T]
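Because limit defaults to 100, reading a whole table means paging with skip. The sketch below shows the skip/limit semantics using the standard-library sqlite3 module. It is not the service's query, which goes through an async SQLAlchemy session: the get_rows function here is a stand-in, and ordering by id is an assumption.

```python
import sqlite3


def get_rows(conn: sqlite3.Connection, skip: int = 0, limit: int = 100) -> list[tuple]:
    """Return up to `limit` rows after skipping the first `skip`, ordered by id."""
    cursor = conn.execute(
        "SELECT id, name FROM algorithm ORDER BY id LIMIT ? OFFSET ?",
        (limit, skip),
    )
    return cursor.fetchall()


# In-memory table with 250 placeholder rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE algorithm (id INTEGER PRIMARY KEY, name TEXT UNIQUE)")
conn.executemany(
    "INSERT INTO algorithm (name) VALUES (?)",
    [(f"algo_{i}",) for i in range(250)],
)

# Page through the table until an empty page signals the end
page = 0
all_names: list[str] = []
while True:
    rows = get_rows(conn, skip=page * 100, limit=100)
    if not rows:
        break
    all_names.extend(name for _, name in rows)
    page += 1
```

A stable ordering matters for this loop: without it, consecutive pages could overlap or miss rows.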
- id: Any
- name: Any
- pydantic_mode_class: type[BaseModel]
- async classmethod update_row(session, row_id, **kwargs)[source]
Update a single row, matching row.id == row_id
- Parameters:
session (async_scoped_session) – DB session manager
row_id (int) – PrimaryKey of the row to update
**kwargs (Any) – Columns and associated new values
- Returns:
Updated row
- Return type:
T
- Raises:
RAILIDMismatchError – ID mismatch between row IDs
RAILMissingIDError – Could not find row
RAILIntegrityError – catching an IntegrityError