Skip to content

Commit

Permalink
Update wdr setup: Add station_dobson_corrections table and ES index
Browse files Browse the repository at this point in the history
- Added `station_dobson_corrections` table to the database
- Added Elasticsearch (ES) index for station corrections
- Cleaned up the code
- Performed test updates
  • Loading branch information
Simran Mattu committed Mar 7, 2025
1 parent ce5a5bd commit 976bcb2
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 5 deletions.
3 changes: 2 additions & 1 deletion woudc_data_registry/epicentre/instrument.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ def add(ctx, station, dataset, contributor, name, model, serial, geometry):
instrument_ = {
'station_id': station,
'dataset_id': dataset,
'contributor_id': contributor,
'contributor': contributor.split(':')[0],
'project': contributor.split(':')[1],
'name': name,
'model': model,
'serial': serial,
Expand Down
4 changes: 2 additions & 2 deletions woudc_data_registry/epicentre/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ def add_metadata(entity, dict_, save_to_registry=True, save_to_index=True):

dict_['country_id'] = getattr(results[0], Country.id_field)

if 'contributor_id' in dict_:
if 'contributor' in dict_:
LOGGER.debug('Querying for matching contributor')
results = REGISTRY.session.query(Contributor).filter(
Contributor.contributor_id == dict_['contributor_id'])
Contributor.contributor_id == f"{dict_['contributor']}:{dict_['project']}") # noqa

if results.count() == 0:
msg = f"Invalid contributor: {dict_['contributor_id']}"
Expand Down
157 changes: 155 additions & 2 deletions woudc_data_registry/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@
import yaml
from sqlalchemy import (Boolean, Column, create_engine, Date, DateTime,
Float, Enum, ForeignKey, Integer, String, Time,
UniqueConstraint, ForeignKeyConstraint, ARRAY)
UniqueConstraint, ForeignKeyConstraint, ARRAY, Text)
from sqlalchemy.exc import OperationalError, ProgrammingError
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

from elasticsearch.exceptions import (ConnectionError, RequestError)

from woudc_data_registry import config, registry
from woudc_data_registry.search import SearchIndex, search
from woudc_data_registry.util import (get_date, point2geojsongeometry,
Expand Down Expand Up @@ -1826,6 +1828,71 @@ def generate_ids(self):
self.ozone_id = ':'.join(map(str, components))


class StationDobsonCorrections(base):
    """
    Data Registry Station Dobson Corrections.

    Each row ties a station to its AD/CD correction flags, the sources
    and factor of those corrections, and free-text comments.
    """

    __tablename__ = 'station_dobson_corrections'

    id_field = "dobson_correction_id"
    id_dependencies = ['station_id', 'AD_correcting_source',
                       'CD_correcting_factor']

    # String tokens (compared case-insensitively) that mean "false" when a
    # flag arrives as text, e.g. from csv.DictReader.
    _FALSE_TOKENS = ('', '0', 'f', 'false', 'n', 'no')

    # columns
    dobson_correction_id = Column(String, primary_key=True)
    station_id = Column(String(255), ForeignKey('stations.station_id'),
                        nullable=False)
    AD_corrected = Column(Boolean, nullable=False, default=False)
    CD_corrected = Column(Boolean, nullable=False, default=False)
    AD_correcting_source = Column(String(255), nullable=False)
    CD_correcting_source = Column(String(255), nullable=False)
    CD_correcting_factor = Column(String(255), nullable=False, default='cd')
    correction_comments = Column(Text, nullable=False)

    # relationships
    station = relationship('Station', backref=__tablename__)

    def __init__(self, dict_):
        """
        Build a correction record from a mapping of field values.

        :param dict_: mapping with the keys `station`, `AD_corrected`,
                      `CD_corrected`, `AD_correcting_source`,
                      `CD_correcting_source`, `CD_correcting_factor` and
                      `correction_comments`.
        :raises KeyError: if any of those keys is missing.
        """

        self.station_id = dict_['station']
        # Flags loaded from CSV are strings; bool('False') would be True,
        # so they are parsed explicitly.
        self.AD_corrected = self._to_bool(dict_['AD_corrected'])
        self.CD_corrected = self._to_bool(dict_['CD_corrected'])
        self.AD_correcting_source = dict_['AD_correcting_source']
        self.CD_correcting_source = dict_['CD_correcting_source']
        self.CD_correcting_factor = dict_['CD_correcting_factor']
        self.correction_comments = dict_['correction_comments']

        self.generate_ids()

    @classmethod
    def _to_bool(cls, value):
        """
        Convert a flag value to a boolean.

        Strings are False only when they are blank or one of the explicit
        false tokens; every other string stays True, as plain `bool()`
        would give. Non-string values use normal truthiness.

        :param value: raw flag value (string, bool, number or None).
        :returns: `bool` interpretation of `value`.
        """

        if isinstance(value, str):
            return value.strip().lower() not in cls._FALSE_TOKENS
        return bool(value)

    @property
    def __geo_interface__(self):
        """
        Feature-style dictionary for this record, using the related
        station's x/y/z as the point geometry.
        """

        return {
            'id': self.dobson_correction_id,
            'type': 'Feature',
            'geometry': point2geojsongeometry(self.station.x, self.station.y,
                                              self.station.z),
            'properties': {
                'identifier': self.dobson_correction_id,
                'station_id': self.station_id,
                'AD_corrected': self.AD_corrected,
                'CD_corrected': self.CD_corrected,
                'AD_correcting_source': self.AD_correcting_source,
                'CD_correcting_source': self.CD_correcting_source,
                'CD_correcting_factor': self.CD_correcting_factor,
                'correction_comments': self.correction_comments
            }
        }

    def __repr__(self):
        return f'Station Dobson Correction ({self.dobson_correction_id})'

    def generate_ids(self):
        """Builds and sets class ID field from other attributes"""

        values = [getattr(self, field, None)
                  for field in self.id_dependencies]

        # The ID is left untouched until every dependency has a value.
        if all(value is not None for value in values):
            station_id, ad_source, cd_factor = values
            self.dobson_correction_id = \
                f"{station_id}:AD-{ad_source}:CD-{cd_factor}"


def build_contributions(instrument_models):
"""function that forms contributions from other model lists"""

Expand Down Expand Up @@ -2000,6 +2067,76 @@ def setup(ctx):
click.echo(f'ERROR: {err}')


@click.command()
@click.pass_context
@click.option('--datadir', '-d',
              type=click.Path(exists=True, resolve_path=True),
              help='Path to core metadata files')
def setup_dobson_correction(ctx, datadir):
    """ Add the station Dobson correction table and ES index to the
    database and ES, respectively, that does not have it. """
    # NOTE: click.pass_context must be applied exactly once; stacking it
    # twice passes the context twice and the callback fails with TypeError.
    import os
    from woudc_data_registry.search import MAPPINGS

    # --datadir is declared optional, so fail with a clear CLI error
    # instead of letting os.path.join(None, ...) raise.
    if datadir is None:
        raise click.ClickException('Missing required data directory')

    tablename = StationDobsonCorrections.__tablename__

    registry_ = registry.Registry()

    engine = create_engine(config.WDR_DATABASE_URL, echo=config.WDR_DB_DEBUG)

    try:
        click.echo(f'Generating model: {tablename}')
        # checkfirst=True makes this a no-op when the table already exists
        StationDobsonCorrections.__table__.create(engine, checkfirst=True)
        click.echo('Done')
    except (OperationalError, ProgrammingError) as err:
        click.echo(f'ERROR: {err}')

    click.echo(f'Adding data to {tablename}')
    corrections_csv = os.path.join(datadir, 'station_dobson_corrections.csv')

    click.echo('Loading station dobson corrections items')
    # newline='' lets the csv module handle line endings itself
    with open(corrections_csv, newline='') as csvfile:
        correction_models = [StationDobsonCorrections(row)
                             for row in csv.DictReader(csvfile)]

    click.echo('Storing station dobson corrections items in data registry')
    for model in correction_models:
        registry_.save(model)

    click.echo('Create ES index for station dobson corrections')
    search_index = SearchIndex()
    mapping = MAPPINGS['station_dobson_corrections']

    index_name = search_index.generate_index_name(mapping['index'])
    settings = {
        'mappings': {
            'properties': {
                'geometry': {
                    'type': 'geo_shape'
                }
            }
        },
        'settings': {
            'index': {
                'number_of_shards': 1,
                'number_of_replicas': 0
            }
        }
    }
    if 'properties' in mapping:
        # Field mappings are nested under the document's 'properties' key
        settings['mappings']['properties']['properties'] = {
            'properties': mapping['properties']
        }

    try:
        search_index.connection.indices.create(index=index_name, body=settings)
        click.echo('ES index created: station_dobson_corrections')
    except (ConnectionError, RequestError) as err:
        click.echo(f'ERROR: {err}')


@click.command()
@click.pass_context
def teardown(ctx):
Expand Down Expand Up @@ -2043,6 +2180,7 @@ def init(ctx, datadir, init_search_index):
deployments = os.path.join(datadir, 'deployments.csv')
notifications = os.path.join(datadir, 'notifications.csv')
discovery_metadata = os.path.join(datadir, 'woudc.skos.yaml')
station_dobson_corrections = os.path.join(datadir, 'station_dobson_corrections.csv') # noqa

registry_ = registry.Registry()

Expand All @@ -2057,6 +2195,7 @@ def init(ctx, datadir, init_search_index):
contribution_models = []
notification_models = []
discovery_metadata_models = []
station_dobson_corrections_models = []

click.echo('Loading WMO countries metadata')
with open(wmo_countries) as jsonfile:
Expand Down Expand Up @@ -2157,6 +2296,13 @@ def init(ctx, datadir, init_search_index):
discovery_metadata_ = DiscoveryMetadata(row)
discovery_metadata_models.append(discovery_metadata_)

click.echo('Loading station dobson corrections items')
with open(station_dobson_corrections) as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
station_dobson_corrections = StationDobsonCorrections(row)
station_dobson_corrections_models.append(station_dobson_corrections) # noqa

click.echo('Storing projects in data registry')
for model in project_models:
registry_.save(model)
Expand Down Expand Up @@ -2187,6 +2333,9 @@ def init(ctx, datadir, init_search_index):
click.echo('Storing discovery metadata items in data registry')
for model in discovery_metadata_models:
registry_.save(model)
click.echo('Storing station dobson corrections items in data registry')
for model in station_dobson_corrections_models:
registry_.save(model)

instrument_from_registry = registry_.query_full_index(Instrument)

Expand Down Expand Up @@ -2237,6 +2386,8 @@ def init(ctx, datadir, init_search_index):
search_index.index(Notification, notification_docs)
click.echo('Storing discovery metadata items in search index')
search_index.index(DiscoveryMetadata, discovery_metadata_docs)
click.echo('Storing station dobson corrections items in search index')
search_index.index(StationDobsonCorrections, station_dobson_corrections) # noqa


@click.command('sync')
Expand All @@ -2256,7 +2407,8 @@ def sync(ctx):
Contribution,
Notification,
PeerDataRecord,
DiscoveryMetadata
DiscoveryMetadata,
StationDobsonCorrections
]

registry_ = registry.Registry()
Expand Down Expand Up @@ -2362,6 +2514,7 @@ def product_sync(ctx):
admin.add_command(show_config)
admin.add_command(registry__)
admin.add_command(search)
admin.add_command(setup_dobson_correction)

registry__.add_command(setup)
registry__.add_command(teardown)
Expand Down
34 changes: 34 additions & 0 deletions woudc_data_registry/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -1050,6 +1050,40 @@
'fields': {'raw': typedefs['keyword']}
}
}
},
'station_dobson_corrections': {
'index': 'station_dobson_corrections',
'properties': {
'identifier': {
'type': 'text',
'fields': {'raw': typedefs['keyword']}
},
'station_id': {
'type': 'text',
'fields': {'raw': typedefs['keyword']}
},
'AD_corrected': {
'type': 'boolean'
},
'CD_correction': {
'type': 'boolean'
},
'AD_correcting_source': {
'type': 'text',
'fields': {'raw': typedefs['keyword']}
},
'CD_correcting_source': {
'type': 'text',
'fields': {'raw': typedefs['keyword']}
},
'CD_correcting_factor': {
'type': 'text',
'fields': {'raw': typedefs['keyword']}
},
'correction_comments': {
'type': 'text'
}
}
}
}

Expand Down

0 comments on commit 976bcb2

Please sign in to comment.