Source code for obspy.clients.nrl.client

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Client for accessing the `IRIS Library of Nominal Response for Seismic
Instruments <https://ds.iris.edu/NRL/>`_ (NRL).  To cite use of the NRL, please
see [Templeton2017]_.

:copyright:
    Lloyd Carothers IRIS/PASSCAL, 2016
    The ObsPy Development Team (devs@obspy.org)
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import codecs
import io
import os
import warnings
from configparser import ConfigParser, DuplicateSectionError
from urllib.parse import urlparse

import requests

import obspy
from obspy.core.compatibility import get_text_from_response
from obspy.core.inventory.util import _textwrap
from obspy.core.util.decorator import deprecated


# Simple cache for remote NRL access. The total data amount will always be
# fairly small so I don't think it needs any cache eviction for now.
_remote_nrl_cache = {}


[docs]class NRL(object): """ NRL client base class for accessing the Nominal Response Library. https://ds.iris.edu/NRL/ Created with a URL for remote access or filesystem accessing a local copy. .. warning:: Remote access to online NRL is deprecated as it will stop working in Spring 2023 due to server side changes. """ _index = 'index.txt'
[docs] def __new__(cls, root=None): # root provided and it's no web URL if root: scheme = urlparse(root).scheme if scheme in ('http', 'https'): return super(NRL, cls).__new__(RemoteNRL) # Check if it's really a folder on the file-system. if not os.path.isdir(root): msg = ("Provided path '{}' seems to be a local file path " "but the directory does not exist.").format(root) raise ValueError(msg) return super(NRL, cls).__new__(LocalNRL) # Otherwise delegate to the remote NRL client to deal with all kinds # of remote resources (currently only HTTP). return super(NRL, cls).__new__(RemoteNRL)
[docs] def __init__(self): try: sensor_index = self._join(self.root, 'sensors', self._index) self.sensors = self._parse_ini(sensor_index) datalogger_index = self._join(self.root, 'dataloggers', self._index) self.dataloggers = self._parse_ini(datalogger_index) self._nrl_version = 1 except FileNotFoundError: sensor_index = self._join(self.root, 'sensor', self._index) self.sensors = self._parse_ini(sensor_index) datalogger_index = self._join(self.root, 'datalogger', self._index) self.dataloggers = self._parse_ini(datalogger_index) # version 2 also has additional base nodes "integrated" and "soh" self._nrl_version = 2
[docs] def __str__(self): info = ['NRL library at ' + self.root] if self.sensors is None: info.append(' Sensors not parsed yet.') else: info.append( ' Sensors: {} manufacturers'.format(len(self.sensors))) if len(self.sensors): keys = [key for key in sorted(self.sensors)] lines = _textwrap("'" + "', '".join(keys) + "'", initial_indent=' ', subsequent_indent=' ') info.extend(lines) if self.dataloggers is None: info.append(' Dataloggers not parsed yet.') else: info.append(' Dataloggers: {} manufacturers'.format( len(self.dataloggers))) if len(self.dataloggers): keys = [key for key in sorted(self.dataloggers)] lines = _textwrap("'" + "', '".join(keys) + "'", initial_indent=' ', subsequent_indent=' ') info.extend(lines) return '\n'.join(_i.rstrip() for _i in info)
[docs] def _repr_pretty_(self, p, cycle): # pragma: no cover p.text(str(self))
[docs] def _choose(self, choice, path): # Should return either a path or a resp cp = self._get_cp_from_ini(path) options = cp.options(choice) if 'path' in options: newpath = cp.get(choice, 'path') elif 'resp' in options: newpath = cp.get(choice, 'resp') elif 'xml' in options: newpath = cp.get(choice, 'xml') # Strip quotes of new path newpath = self._clean_str(newpath) path = os.path.dirname(path) return self._join(path, newpath)
[docs] def _parse_ini(self, path): nrl_dict = NRLDict(self) cp = self._get_cp_from_ini(path) for section in cp.sections(): options = sorted(cp.options(section)) if section.lower() == 'main': if options not in (['question'], ['detail', 'question']): # pragma: no cover msg = "Unexpected structure of NRL file '{}'".format(path) raise NotImplementedError(msg) nrl_dict._question = self._clean_str(cp.get(section, 'question')) continue else: if options == ['path']: nrl_dict[section] = NRLPath(self._choose(section, path)) continue # sometimes the description field is named 'description', but # sometimes also 'descr' # NRL version 2 does not seem to have any of the 'descr = ' # oddities anymore, but it can be downloaded in RESP format or # StationXML format and then the option name is different elif options in (['description', 'resp'], ['descr', 'resp'], ['resp'], ['description', 'xml']): if 'descr' in options: descr = cp.get(section, 'descr') elif 'description' in options: descr = cp.get(section, 'description') else: descr = '<no description>' descr = self._clean_str(descr) resp_path = self._choose(section, path) if 'resp' in options: resp_type = 'RESP' elif 'xml' in options: resp_type = 'STATIONXML' else: raise NotImplementedError(msg) nrl_dict[section] = (descr, resp_path, resp_type) continue else: # pragma: no cover msg = "Unexpected structure of NRL file '{}'".format(path) raise NotImplementedError(msg) return nrl_dict
[docs] def _clean_str(self, string): return string.strip('\'"')
[docs] def get_datalogger_response(self, datalogger_keys): """ Get the datalogger response. :type datalogger_keys: list[str] :rtype: :class:`~obspy.core.inventory.response.Response` """ datalogger = self.dataloggers for key in datalogger_keys: datalogger = datalogger[key] # Parse to an inventory object and return a response object. description, path, resp_type = datalogger with io.BytesIO(self._read_resp(path).encode()) as buf: buf.seek(0, 0) return obspy.read_inventory( buf, format=resp_type)[0][0][0].response
[docs] def get_sensor_response(self, sensor_keys): """ Get the sensor response. :type sensor_keys: list[str] :rtype: :class:`~obspy.core.inventory.response.Response` """ sensor = self.sensors for key in sensor_keys: sensor = sensor[key] # Parse to an inventory object and return a response object. description, path, resp_type = sensor with io.BytesIO(self._read_resp(path).encode()) as buf: buf.seek(0, 0) return obspy.read_inventory( buf, format=resp_type)[0][0][0].response
[docs] def get_response(self, datalogger_keys, sensor_keys): """ Get Response from NRL tree structure :param datalogger_keys: List of data-loggers. :type datalogger_keys: list[str] :param sensor_keys: List of sensors. :type sensor_keys: list[str] :rtype: :class:`~obspy.core.inventory.response.Response` >>> nrl = NRL() >>> response = nrl.get_response( ... sensor_keys=['Nanometrics', 'Trillium Compact 120 (Vault, ' ... 'Posthole, OBS)', '754 V/m/s'], ... datalogger_keys=['REF TEK', 'RT 130 & 130-SMA', '1', '200']) >>> print(response) # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS Channel Response From M/S (Velocity in Meters per Second) to COUNTS (Digital Counts) Overall Sensitivity: 4.74576e+08 defined at 1.000 Hz 10 stages: Stage 1: PolesZerosResponseStage from M/S to V, gain: 754.3 Stage 2: ResponseStage from V to V, gain: 1 Stage 3: Coefficients... from V to COUNTS, gain: 629129 Stage 4: Coefficients... from COUNTS to COUNTS, gain: 1 Stage 5: Coefficients... from COUNTS to COUNTS, gain: 1 Stage 6: Coefficients... from COUNTS to COUNTS, gain: 1 Stage 7: Coefficients... from COUNTS to COUNTS, gain: 1 Stage 8: Coefficients... from COUNTS to COUNTS, gain: 1 Stage 9: Coefficients... from COUNTS to COUNTS, gain: 1 Stage 10: Coefficients... from COUNTS to COUNTS, gain: 1 """ dl_resp = self.get_datalogger_response(datalogger_keys) sensor_resp = self.get_sensor_response(sensor_keys) sensor_stage0 = sensor_resp.response_stages[0] # information on changes between NRL v1 and v2: # https://ds.iris.edu/files/nrl/NominalResponseLibraryVersions.pdf if self._nrl_version == 1: # Combine both by replace stage one in the data logger with stage # one of the sensor. dl_resp.response_stages.pop(0) dl_resp.response_stages.insert(0, sensor_stage0) elif self._nrl_version == 2: for stage in dl_resp.response_stages: stage.stage_sequence_number += len(sensor_resp.response_stages) dl_resp.response_stages = ( sensor_resp.response_stages + dl_resp.response_stages) else: raise NotImplementedError() dl_resp.instrument_sensitivity.input_units = sensor_stage0.input_units dl_resp.instrument_sensitivity.input_units_description = \ sensor_stage0.input_units_description try: dl_resp.recalculate_overall_sensitivity() except ValueError: msg = "Failed to recalculate overall sensitivity." warnings.warn(msg) return dl_resp
[docs]class NRLDict(dict):
[docs] def __init__(self, nrl): self._nrl = nrl
[docs] def __str__(self): if len(self): if self._question: info = ['{} ({} items):'.format(self._question, len(self))] else: info = ['{} items:'.format(len(self))] texts = ["'{}'".format(k) for k in sorted(self.keys())] info.extend(_textwrap(", ".join(texts), initial_indent=' ', subsequent_indent=' ')) return '\n'.join(_i.rstrip() for _i in info) else: return '0 items.'
[docs] def _repr_pretty_(self, p, cycle): # pragma: no cover p.text(str(self))
[docs] def __getitem__(self, name): value = super(NRLDict, self).__getitem__(name) # if encountering a not yet parsed NRL Path, expand it now if isinstance(value, NRLPath): value = self._nrl._parse_ini(value) self[name] = value return value
[docs]class NRLPath(str): pass
[docs]class LocalNRL(NRL): """ Subclass of NRL for accessing local copy NRL. """
[docs] def __init__(self, root): self.root = root self._join = os.path.join super(self.__class__, self).__init__()
[docs] def _get_cp_from_ini(self, path): """ Returns a configparser from a path to an index.txt """ try: cp = ConfigParser() with codecs.open(path, mode='r', encoding='UTF-8') as f: cp.read_file(f) # it seems requesting a full RESP archive of NRL version 2 has all # items duplicated in the index.txt files. expecting this to be fixed # upstream so this is just for now except DuplicateSectionError: cp = ConfigParser(strict=False) with codecs.open(path, mode='r', encoding='UTF-8') as f: cp.read_file(f) return cp
[docs] def _read_resp(self, path): # Returns Unicode string of RESP with open(path, 'r') as f: return f.read()
[docs]class RemoteNRL(NRL): """ DEPRECATED Subclass of NRL for accessing remote copy of NRL. Direct access to online NRL is deprecated as it will stop working when the original NRLv1 gets taken offline (Spring 2023), please consider working locally with a downloaded full copy of the old NRLv1 or new NRLv2 following instructions on the `NRL landing page <https://ds.iris.edu/ds/nrl/>`_. """
[docs] @deprecated() def __init__(self, root='https://ds.iris.edu/NRL'): """ DEPRECATED Direct access to online NRL is deprecated as it will stop working when the original NRLv1 gets taken offline (Spring 2023), please consider working locally with a downloaded full copy of the old NRLv1 or new NRLv2 following instructions on the `NRL landing page <https://ds.iris.edu/ds/nrl/>`_. """ self.root = root super(self.__class__, self).__init__()
[docs] def _download(self, url): """ Download service with basic cache. """ if url not in _remote_nrl_cache: r = requests.get(url) _remote_nrl_cache[url] = get_text_from_response(r) return _remote_nrl_cache[url]
[docs] def _join(self, *paths): url = paths[0] for path in paths[1:]: url = requests.compat.urljoin(url + '/', path) return url
[docs] def _get_cp_from_ini(self, path): ''' Returns a configparser from a path to an index.txt ''' cp = ConfigParser() with io.StringIO(self._download(path)) as buf: cp.read_file(buf) return cp
[docs] def _read_resp(self, path): return self._download(path)
if __name__ == "__main__": # pragma: no cover import doctest doctest.testmod(exclude_empty=True)