# -*- coding: utf-8 -*-
"""
ObsPy implementation for parsing the SeisComp XML format
to an Inventory object.
This is a modified version of obspy.io.stationxml.
:author:
Mathijs Koymans (koymans@knmi.nl), 11.2015 - [Jollyfant@GitHub]
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import math
import re
import warnings
from lxml import etree
import numpy as np
from scipy.signal import tf2zpk
import obspy
from obspy.core.util.obspy_types import (ComplexWithUncertainties,
FloatWithUncertaintiesAndUnit)
from obspy.core.inventory import (Azimuth, ClockDrift, Dip,
Distance, Frequency, Latitude,
Longitude, SampleRate)
from obspy.core.inventory.response import (
CoefficientsTypeResponseStage, FilterCoefficient,
FIRResponseStage, PolesZerosResponseStage,
PolynomialResponseStage, ResponseListElement,
ResponseListResponseStage)
from obspy.io.stationxml.core import _read_floattype
SOFTWARE_MODULE = "ObsPy %s" % obspy.__version__
SOFTWARE_URI = "http://www.obspy.org"
SCHEMA_VERSION = ['0.6', '0.7', '0.8', '0.9', '0.10', '0.11', '0.12', '0.13']
SCHEMA_NAMESPACE_BASE = "http://geofon.gfz-potsdam.de/ns/seiscomp3-schema"
[docs]
def _get_schema_namespace(version_string):
"""
>>> print(_get_schema_namespace('0.13'))
http://geofon.gfz-potsdam.de/ns/seiscomp3-schema/0.13
"""
namespace = "%s/%s" % (SCHEMA_NAMESPACE_BASE, version_string)
return namespace
[docs]
def _count_complex(complex_string):
"""
Returns number of complex numbers in string (formatted according to
SeisComp XML schema type "ComplexArray"). Raises an Exception if string
seems invalid.
"""
counts = set()
for char in '(,)':
counts.add(complex_string.count(char))
if len(counts) != 1:
msg = ("Invalid string for list of complex numbers:"
"\n'%s'") % complex_string
raise ValueError(msg)
return counts.pop()
[docs]
def _parse_list_of_complex_string(complex_string):
"""
Returns a list of complex numbers, parsed from a string (formatted
according to SeisComp XML schema type "ComplexArray").
"""
count = _count_complex(complex_string)
numbers = re.findall(r'\(\s*([^,\s]+)\s*,\s*([^)\s]+)\s*\)',
complex_string)
if len(numbers) != count:
msg = ("Unexpected count of complex numbers parsed from string:"
"\n Raw string: '%s'\n Expected count of complex numbers: %s"
"\n Parsed complex numbers: %s") % (complex_string, count,
numbers)
raise ValueError(msg)
return numbers
[docs]
def _read_sc3ml(path_or_file_object, **kwargs):
"""
Function for reading a SeisComp XML file.
:param path_or_file_object: File name or file like object.
"""
root = etree.parse(path_or_file_object).getroot()
# Code can be used for version 0.6 to 0.13 (Seiscomp 6.x)
for version in SCHEMA_VERSION:
namespace = _get_schema_namespace(version)
if root.find("{%s}%s" % (namespace, "Inventory")) is not None:
break
else:
raise ValueError("Schema version not supported.")
def _ns(tagname):
return "{%s}%s" % (namespace, tagname)
# This needs to be tested, did not find an inventory
# with the journal entry.
journal = root.find(_ns("Journaling"))
if journal is not None:
entry = journal.find(_ns("entry"))
if entry is not None:
created = _tag2obj(entry, _ns("created"), obspy.UTCDateTime)
sender = _tag2obj(entry, _ns("sender"), str)
else:
created = None
sender = "ObsPy Inventory"
# Set source to this script
source = "scxml import"
module = None
module_uri = None
# Find the inventory root element. (Only finds the first. We expect only
# one, so any more than that will be ignored.)
inv_element = root.find(_ns("Inventory"))
# Pre-generate a dictionary of the sensors, dataloggers and responses to
# avoid costly linear search when parsing network nodes later.
# Register sensors
sensors = {}
for sensor_element in inv_element.findall(_ns("sensor")):
public_id = sensor_element.get("publicID")
if public_id:
if public_id in sensors:
msg = ("Found multiple matching sensor tags with the same "
"publicID '{}'.".format(public_id))
raise obspy.ObsPyException(msg)
else:
sensors[public_id] = sensor_element
# Register dataloggers
dataloggers = {}
for datalogger_element in inv_element.findall(_ns("datalogger")):
public_id = datalogger_element.get("publicID")
if public_id:
if public_id in dataloggers:
msg = ("Found multiple matching datalogger tags with the same "
"publicID '{}'.".format(public_id))
raise obspy.ObsPyException(msg)
else:
dataloggers[public_id] = datalogger_element
# Register reponses
responses = {}
for response_type in ["responseFAP", "responseFIR", "responsePAZ",
"responseIIR", "responsePolynomial"]:
for response_element in inv_element.findall(_ns(response_type)):
public_id = response_element.get("publicID")
if public_id:
if public_id in responses:
msg = ("Found multiple matching {} tags with the same "
"publicID '{}'.".format(response_type, public_id))
raise obspy.ObsPyException(msg)
else:
responses[public_id] = response_element
# Organize all the collection instrument information into a unified
# intrumentation register
instrumentation_register = {
"sensors": sensors,
"dataloggers": dataloggers,
"responses": responses
}
# Collect all networks from the scxml inventory
networks = []
for net_element in inv_element.findall(_ns("network")):
networks.append(_read_network(instrumentation_register,
net_element, _ns))
return obspy.core.inventory.Inventory(networks=networks, source=source,
sender=sender, created=created,
module=module, module_uri=module_uri)
[docs]
def _tag2obj(element, tag, convert):
"""
Reads text from tag in element
:param element: etree element
:param tag: name of tag to be read
:param convert: intrinsic function (e.g. int, str, float)
"""
try:
# Single closing tags e.g. <analogueFilterChain/>.text return None
# and will be converted to a string 'None' when convert is str
found_tag_text = element.find(tag).text
if found_tag_text is None:
return None
return convert(found_tag_text)
except Exception:
None
[docs]
def _read_network(instrumentation_register, net_element, _ns):
"""
Reads the network structure
:param instrumentation_register: register of instrumentation metadata
:param net_element: network element to be read
:param _ns: namespace
"""
# Get the network code as attribute (e.g. <network code="GB">)
network = obspy.core.inventory.Network(net_element.get("code"))
# There is no further information in the attributes of <network>
# Start and end date are included as tags
network.start_date = _tag2obj(net_element, _ns("start"), obspy.UTCDateTime)
network.end_date = _tag2obj(net_element, _ns("end"), obspy.UTCDateTime)
network.description = _tag2obj(net_element, _ns("description"), str)
# get the restricted_status (boolean)
# true is evaluated to 'open'; false to 'closed'
# to match stationXML format
network.restricted_status = _get_restricted_status(net_element, _ns)
# Collect the stations
stations = []
for sta_element in net_element.findall(_ns("station")):
stations.append(_read_station(instrumentation_register,
sta_element, _ns))
network.stations = stations
return network
[docs]
def _get_restricted_status(element, _ns):
"""
get the restricted_status (boolean)
true is evaluated to 'open' and false to 'closed'
to match stationXML formatting
"""
restricted_status = _tag2obj(element, _ns("restricted"), str)
if restricted_status == 'false':
return 'open'
else:
return 'closed'
[docs]
def _read_station(instrumentation_register, sta_element, _ns):
"""
Reads the station structure
:param instrumentation_register: register of instrumentation metadata
:param sta_element: station element to be read
:param _ns: name space
"""
# Read location tags
longitude = _read_floattype(sta_element, _ns("longitude"), Longitude,
datum=True)
latitude = _read_floattype(sta_element, _ns("latitude"), Latitude,
datum=True)
elevation = _read_floattype(sta_element, _ns("elevation"), Distance,
unit=True)
station = obspy.core.inventory.Station(code=sta_element.get("code"),
latitude=latitude,
longitude=longitude,
elevation=elevation)
station.site = _read_site(sta_element, _ns)
# There is no relevant info in the base node
# Read the start and end date (creation, termination) from tags
# "Vault" and "Geology" are not defined in scxml ?
station.start_date = _tag2obj(sta_element, _ns("start"), obspy.UTCDateTime)
station.end_date = _tag2obj(sta_element, _ns("end"), obspy.UTCDateTime)
station.creation_date = _tag2obj(sta_element, _ns("start"),
obspy.UTCDateTime)
station.termination_date = _tag2obj(sta_element, _ns("end"),
obspy.UTCDateTime)
# get the restricted_status (boolean)
# true is evaluated to 'open'; false to 'closed'
station.restricted_status = _get_restricted_status(sta_element, _ns)
# Get all the channels, scxml keeps these in <sensorLocation> tags in the
# station element. Individual channels are contained within <stream> tags
channels = []
for sen_loc_element in sta_element.findall(_ns("sensorLocation")):
for channel in sen_loc_element.findall(_ns("stream")):
channels.append(_read_channel(instrumentation_register,
channel, _ns))
station.channels = channels
return station
[docs]
def _read_site(sta_element, _ns):
"""
Reads site information from the station element tags
and region from network element
In scxml, site information are included as
tags in the station_element
:param sta_element: station element
:param _ns: namespace
"""
# The region is defined in the parent network element
net_element = sta_element.getparent()
region = _tag2obj(net_element, _ns("region"), str)
# The country, place, description are given in the
# station element
country = _tag2obj(sta_element, _ns("country"), str)
place = _tag2obj(sta_element, _ns("place"), str)
description = _tag2obj(sta_element, _ns("description"), str)
# The name is usually the description
name = description
return obspy.core.inventory.Site(name=name, description=None,
town=place, county=None, region=region,
country=country)
[docs]
def _read_datalogger(equip_element, _ns):
"""
Reads equipment information from datalogger
Some information is not present > to None
:param data_log_element: element to be parsed
:param _ns: name space
"""
resource_id = equip_element.get("publicID")
description = _tag2obj(equip_element, _ns("description"), str)
manufacturer = _tag2obj(equip_element, _ns("digitizerManufacturer"), str)
model = _tag2obj(equip_element, _ns("digitizerModel"), str)
return obspy.core.inventory.Equipment(
resource_id=resource_id, type=model, description=description,
manufacturer=manufacturer, vendor=None, model=model,
serial_number=None, installation_date=None,
removal_date=None, calibration_dates=None)
[docs]
def _read_sensor(equip_element, _ns):
"""
Reads equipment information from element
Some information is not present > to None
:param equip_element: element to be parsed
:param _ns: name space
"""
# try to read some element tags, most are missing anyway
resource_id = equip_element.get("publicID")
equipment_type = _tag2obj(equip_element, _ns("type"), str)
description = _tag2obj(equip_element, _ns("description"), str)
manufacturer = _tag2obj(equip_element, _ns("manufacturer"), str)
model = _tag2obj(equip_element, _ns("model"), str)
return obspy.core.inventory.Equipment(
resource_id=resource_id, type=equipment_type, description=description,
manufacturer=manufacturer, vendor=None, model=model,
serial_number=None, installation_date=None,
removal_date=None, calibration_dates=None)
[docs]
def _read_channel(instrumentation_register, cha_element, _ns):
"""
reads channel element from scxml format
:param instrumentation_register: register of instrumentation metadata
:param cha_element: channel element
:param _ns: namespace
"""
code = cha_element.get("code")
# Information is also kept within the parent <sensorLocation> element
sen_loc_element = cha_element.getparent()
sen_sta_element = sen_loc_element.getparent()
sen_net_element = sen_sta_element.getparent()
network_code = sen_net_element.get("code")
station_code = sen_sta_element.get("code")
location_code = sen_loc_element.get("code")
seed_id = '{}.{}.{}.{}'.format(
network_code,
station_code,
location_code,
code
)
# get site info from the <sensorLocation> element
longitude = _read_floattype(sen_loc_element, _ns("longitude"), Longitude,
datum=True)
latitude = _read_floattype(sen_loc_element, _ns("latitude"), Latitude,
datum=True)
elevation = _read_floattype(sen_loc_element, _ns("elevation"), Distance,
unit=True)
depth = _read_floattype(cha_element, _ns("depth"), Distance,
unit=True)
# Set values to 0 if they are is missing (see #1816)
if longitude is None:
msg = "Sensor is missing longitude information, using 0.0"
warnings.warn(msg)
longitude = 0
if latitude is None:
msg = "Sensor is missing latitude information, using 0.0"
warnings.warn(msg)
latitude = 0
if elevation is None:
msg = "Sensor is missing elevation information, using 0.0"
warnings.warn(msg)
elevation = 0
if depth is None:
msg = "Channel is missing depth information, using 0.0"
warnings.warn(msg)
depth = 0
channel = obspy.core.inventory.Channel(
code=code, location_code=location_code, latitude=latitude,
longitude=longitude, elevation=elevation, depth=depth)
# obtain the sensorID and link to particular publicID <sensor> element
# in the inventory base node
sensor_id = cha_element.get("sensor")
if sensor_id is None:
sensor_element = None
else:
sensor_element = instrumentation_register['sensors']\
.get(sensor_id)
# obtain the poles and zeros responseID and link to particular
# <responsePAZ> publicID element in the inventory base node
if (sensor_element is not None and
sensor_element.get("response") is not None):
response_id = sensor_element.get("response")
if response_id is not None:
# Change in v0.10 the way identifiers are delimited (# -> /)
response_element = instrumentation_register['responses']\
.get(response_id)
else:
msg = (
"Could not find response tag with public ID '{response}'. "
"Omitting response stage information from Inventory for "
"channel '{channel}'.".format(response=response_id,
channel=seed_id)
)
warnings.warn(msg)
response_element = None
else:
response_element = None
# obtain the dataloggerID and link to particular <responsePAZ> publicID
# element in the inventory base node
datalogger_id = cha_element.get("datalogger")
if datalogger_id is None:
data_log_element = None
else:
data_log_element = instrumentation_register['dataloggers']\
.get(datalogger_id)
channel.restricted_status = _get_restricted_status(cha_element, _ns)
# There is no further information in the attributes of <stream>
# Start and end date are included as tags instead
channel.start_date = _tag2obj(cha_element, _ns("start"), obspy.UTCDateTime)
channel.end_date = _tag2obj(cha_element, _ns("end"), obspy.UTCDateTime)
# Determine sample rate (given is a numerator, denominator)
# Assuming numerator is # samples and denominator is # seconds
numerator = _tag2obj(cha_element, _ns("sampleRateNumerator"), int)
denominator = _tag2obj(cha_element, _ns("sampleRateDenominator"), int)
# If numerator is non-zero and denominator zero, will raise
# ZeroDivisionError.
rate = numerator / denominator if numerator != 0 else 0
channel.sample_rate_ratio_number_samples = numerator
channel.sample_rate_ratio_number_seconds = denominator
channel.sample_rate = _read_float_var(rate, SampleRate)
if sensor_element is not None:
channel.sensor = _read_sensor(sensor_element, _ns)
if data_log_element is not None:
channel.data_logger = _read_datalogger(data_log_element, _ns)
temp = _read_floattype(data_log_element, _ns("maxClockDrift"),
ClockDrift)
if temp is not None:
if channel.sample_rate != 0.0:
channel.clock_drift_in_seconds_per_sample = \
_read_float_var(temp / channel.sample_rate, ClockDrift)
else:
msg = "Clock drift division by sample rate of 0: " \
"using sec/sample"
warnings.warn(msg)
channel.sample_rate = temp
channel.azimuth = _read_floattype(cha_element, _ns("azimuth"), Azimuth)
channel.dip = _read_floattype(cha_element, _ns("dip"), Dip)
match = re.search(r'{([^}]*)}', cha_element.tag)
if match:
namespace = match.group(1)
else:
namespace = _get_schema_namespace('0.13')
channel.extra = {'format': {
'value': _tag2obj(cha_element, _ns("format"), str),
# storage format of channel not supported by StationXML1.1 anymore,
# keep it as a foreign tag to be nice if anybody needs to access it
'namespace': namespace}}
if channel.sample_rate == 0.0:
msg = "Something went hopelessly wrong, found sampling-rate of 0!"
warnings.warn(msg)
# Begin to collect digital/analogue filter chains
# This information is stored as an array in the datalogger element
response_dig_id = []
response_paz_id = []
if data_log_element is not None:
# Find the decimation element with a particular num/denom
decim_element = data_log_element.find(_ns(
"decimation[@sampleRateDenominator='" +
str(int(denominator)) + "'][@sampleRateNumerator='" +
str(int(numerator)) + "']"))
analogue_filter_chain = _tag2obj(decim_element,
_ns("analogueFilterChain"), str)
if analogue_filter_chain is not None:
response_paz_id = analogue_filter_chain.split(" ")
digital_filter_chain = _tag2obj(decim_element,
_ns("digitalFilterChain"), str)
if digital_filter_chain is not None:
response_dig_id = digital_filter_chain.split(" ")
channel.response = _read_response(instrumentation_register,
sensor_element, response_element,
cha_element, data_log_element, _ns,
channel.sample_rate,
response_dig_id, response_paz_id)
return channel
[docs]
def _read_instrument_sensitivity(sen_element, cha_element, _ns):
"""
reads the instrument sensitivity (gain) from the sensor and channel element
"""
gain = _tag2obj(cha_element, _ns("gain"), float)
frequency = _tag2obj(cha_element, _ns("gainFrequency"), float)
input_units_name = _tag2obj(sen_element, _ns("unit"), str)
output_units_name = str(None)
sensitivity = obspy.core.inventory.response.InstrumentSensitivity(
value=gain, frequency=frequency,
input_units=input_units_name,
output_units=output_units_name)
# assuming these are equal to frequencyStart/frequencyEnd
sensitivity.frequency_range_start = \
_tag2obj(sen_element, _ns("lowFrequency"), float)
sensitivity.frequency_range_end = \
_tag2obj(sen_element, _ns("highFrequency"), float)
return sensitivity
[docs]
def _read_response(instrumentation_register, sen_element, resp_element,
cha_element, data_log_element, _ns, samp_rate, fir,
analogue):
"""
reads response from scxml format
:param instrumentation_register: Dictionary of dictionaries of
instrumentation response metadata, top level keyed by response type,
and subdictionaries keyed by response ID.
:param _ns: namespace
"""
response = obspy.core.inventory.response.Response()
response.instrument_sensitivity = _read_instrument_sensitivity(
sen_element, cha_element, _ns)
if resp_element is None:
return response
# uncomment to include resource id for response (not shown in stationXML)
"""
response.resource_id = resp_element.attrib.get('publicID')
if response.resource_id is not None:
response.resource_id = str(response.resource_id)
"""
# The sampling rate is not given per fir filter as in stationXML
# We are only given a decimation factor per stage, therefore we are
# required to reconstruct the sampling rates at a given stage from
# this chain of factors
# start with the final sampling_rate after all stages are applied
# invert the fir stages to reverse engineer (backwards) the sample rate
# during any fir stage
samp_rate = float(samp_rate)
fir_stage_rates = []
if len(fir):
# Reverse the chain
fir = fir[::-1]
for fir_id in fir:
# get the particular stage decimation factor
# multiply the decimated sample rate by this factor
# These may be FIR, IIR or PAZ
fir_element = instrumentation_register['responses'].get(fir_id)
if fir_element is None:
continue
dec_fac = _tag2obj(fir_element, _ns("decimationFactor"), int)
if dec_fac is not None and int(dec_fac) != 0:
samp_rate *= dec_fac
fir_stage_rates.append(float(samp_rate))
# Return filter chain to original and also revert the rates
fir = fir[::-1]
fir_stage_rates = fir_stage_rates[::-1]
# Attempt to read stages in the proper order
# scxml does not group stages by an ID
# We are required to do stage counting ourselves
stage = 1
# Get the sensor units, default to M/S
sensor_units = _tag2obj(sen_element, _ns("unit"), str)
if sensor_units is None:
msg = "Sensor unit not set, assuming M/S"
warnings.warn(msg)
sensor_units = "M/S"
# Get the first PAZ stage
# Input unit: M/S or M/S**2
# Output unit: V
if resp_element is not None:
paz_response = _read_response_stage(resp_element, _ns, samp_rate,
stage, sensor_units, 'V')
if paz_response is not None:
response.response_stages.append(paz_response)
stage += 1
# Apply analogue filter stages (if any)
# Input unit: V
# Output unit: V
if len(analogue):
for analogue_id in analogue:
analogue_element = instrumentation_register['responses']\
.get(analogue_id)
if analogue_element is None:
msg = ('Analogue responsePAZ not in inventory:'
'%s, stopping before stage %i') % (analogue_id, stage)
warnings.warn(msg)
return response
analogue_response = _read_response_stage(analogue_element, _ns,
samp_rate, stage, 'V',
'V')
if analogue_response is not None:
response.response_stages.append(analogue_response)
stage += 1
# Apply datalogger (digitizer)
# Input unit: V
# Output unit: COUNTS
if data_log_element is not None:
coeff_response = _read_response_stage(data_log_element, _ns,
samp_rate, stage, 'V',
'COUNTS')
if coeff_response is not None:
response.response_stages.append(coeff_response)
stage += 1
# Apply final digital filter stages
# Input unit: COUNTS
# Output unit: COUNTS
for fir_id, rate in zip(fir, fir_stage_rates):
stage_element = instrumentation_register['responses'].get(fir_id)
if stage_element is None:
msg = ("fir response not in inventory: %s, stopping correction"
"before stage %i") % (fir_id, stage)
warnings.warn(msg)
return response
fir_response = _read_response_stage(stage_element, _ns, rate, stage,
'COUNTS', 'COUNTS')
if fir_response is not None:
response.response_stages.append(fir_response)
stage += 1
return response
[docs]
def _map_transfer_type(pz_transfer_function_type):
if pz_transfer_function_type == 'A':
return 'LAPLACE (RADIANS/SECOND)'
elif pz_transfer_function_type == 'B':
return 'LAPLACE (HERTZ)'
elif pz_transfer_function_type == 'D':
return 'DIGITAL (Z-TRANSFORM)'
else:
msg = ("Unknown transfer function code %s. Defaulting to Laplace"
"(rad)") % pz_transfer_function_type
warnings.warn(msg)
return 'LAPLACE (RADIANS/SECOND)'
[docs]
def _read_response_stage(stage, _ns, rate, stage_sequence_number, input_units,
output_units):
# Strip the namespace to get the element name (response type)
elem_type = stage.tag.split("}")[1]
# Get the stage gain and frequency: 0 and 0.00 per default
stage_gain = _tag2obj(stage, _ns("gain"), float) or 0
stage_gain_frequency = _tag2obj(stage, _ns("gainFrequency"),
float) or 0.00
# Get the stage name
name = stage.get("name")
if name is not None:
name = str(name)
# And the public resource identifier
resource_id = stage.get("publicID")
if resource_id is not None:
resource_id = str(resource_id)
# Set up decimation parameters
decimation = {
"factor": None,
"delay": None,
"correction": None,
"rate": None,
"offset": None
}
# Skip decimation for analogue outputs
# Since 0.10 ResponsePAZ can have a decimation attributes
if output_units != "V":
# Get element or default value
decimation['factor'] = _tag2obj(
stage, _ns("decimationFactor"), int) or 1
decimation['delay'] = _tag2obj(
stage, _ns("delay"), float) or 0
decimation["correction"] = _tag2obj(
stage, _ns("correction"), float) or 0
decimation['offset'] = _tag2obj(
stage, _ns("offset"), float) or 0
decimation['rate'] = _read_float_var(rate, Frequency)
# Decimation delay/correction need to be normalized
if rate != 0.0:
if decimation['delay'] is not None:
decimation['delay'] = \
_read_float_var(decimation['delay'] / rate,
FloatWithUncertaintiesAndUnit, unit=True)
if decimation['correction'] is not None:
decimation['correction'] = \
_read_float_var(decimation['correction'] / rate,
FloatWithUncertaintiesAndUnit, unit=True)
# Set up list of for this stage arguments
kwargs = {
"stage_sequence_number": stage_sequence_number,
"input_units": str(input_units),
"output_units": str(output_units),
"input_units_description": None,
"output_units_description": None,
"resource_id": None,
"resource_id2": resource_id,
"stage_gain": stage_gain,
"stage_gain_frequency": stage_gain_frequency,
"name": name,
"description": None,
"decimation_input_sample_rate": decimation['rate'],
"decimation_factor": decimation['factor'],
"decimation_offset": decimation['offset'],
"decimation_delay": decimation['delay'],
"decimation_correction": decimation['correction']
}
# Different processing for different types of responses
# currently supported: PAZ, COEFF, FIR
# Polynomial response is not supported, could not find example
if elem_type == 'responsePAZ':
# read normalization params
normalization_freq = _read_floattype(stage,
_ns("normalizationFrequency"),
Frequency)
normalization_factor = _tag2obj(stage, _ns("normalizationFactor"),
float)
# Parse the type of the transfer function
# A: Laplace (rad)
# B: Laplace (Hz)
# D: digital (z-transform)
pz_transfer_function_type = _tag2obj(stage, _ns("type"), str)
pz_transfer_function_type = \
_map_transfer_type(pz_transfer_function_type)
# Parse string of poles and zeros
# paz are stored as a string in scxml
# e.g. (-0.01234,0.01234) (-0.01234,-0.01234)
zeros_array = stage.find(_ns("zeros"))
poles_array = stage.find(_ns("poles"))
if zeros_array is not None and zeros_array.text is not None:
zeros_array = _parse_list_of_complex_string(zeros_array.text)
else:
zeros_array = []
if poles_array is not None and poles_array.text is not None:
poles_array = _parse_list_of_complex_string(poles_array.text)
else:
poles_array = []
# Keep counter for pole/zero number
cnt = 0
poles = []
zeros = []
for el in poles_array:
poles.append(_tag2pole_or_zero(el, cnt))
cnt += 1
for el in zeros_array:
zeros.append(_tag2pole_or_zero(el, cnt))
cnt += 1
# Return the paz response
return PolesZerosResponseStage(
pz_transfer_function_type=pz_transfer_function_type,
normalization_frequency=normalization_freq,
normalization_factor=normalization_factor, zeros=zeros,
poles=poles, **kwargs)
# For IIR filters reuse the PolesZerosResponseStage
elif elem_type == 'responseIIR':
pz_transfer_function_type = _tag2obj(stage, _ns("type"), str)
pz_transfer_function_type = _map_transfer_type(
pz_transfer_function_type)
numerators = stage.find(_ns("numerators")).text.split(" ")
denominators = stage.find(_ns("denominators")).text.split(" ")
numerators = list(map(lambda x: float(x), numerators))
denominators = list(map(lambda x: float(x), denominators))
# Convert linear filter to pole, zero, gain repr.
# See #2004 @andres-h
zeros, poles, gain = \
(np.round(ele, 6) for ele in tf2zpk(numerators, denominators))
msg = "ResponseIIR is not fully tested in ObsPy. Please be cautious"
warnings.warn(msg)
return PolesZerosResponseStage(
pz_transfer_function_type=pz_transfer_function_type,
normalization_frequency=0,
normalization_factor=1, zeros=zeros,
poles=poles, **kwargs)
# Datalogger element: V => Counts
# Set empty coefficients and hard code as digital
elif elem_type == "datalogger":
return CoefficientsTypeResponseStage(
cf_transfer_function_type="DIGITAL",
numerator=[], denominator=[], **kwargs)
elif elem_type == 'responsePolynomial':
# Polynomial response (UNTESTED)
# Currently not implemented in ObsPy (20-11-2015)
f_low = None
f_high = None
max_err = None
appr_type = _tag2obj(stage, _ns("approximationType"), str)
appr_low = _tag2obj(stage, _ns("approximationLowerBound"), float)
appr_high = _tag2obj(stage, _ns("approximationUpperBound"), float)
coeffs_str = _tag2obj(stage, _ns("coefficients"), str)
if coeffs_str is not None:
coeffs = coeffs_str.strip().split(" ")
coeffs_float = []
i = 0
# pass additional mapping of coefficient counter
# so that a proper stationXML can be formatted
for c in coeffs:
temp = _read_float_var(c, FilterCoefficient,
additional_mapping={str("number"): i})
coeffs_float.append(temp)
i += 1
return PolynomialResponseStage(
approximation_type=appr_type, frequency_lower_bound=f_low,
frequency_upper_bound=f_high, approximation_lower_bound=appr_low,
approximation_upper_bound=appr_high, maximum_error=max_err,
coefficients=coeffs, **kwargs)
elif elem_type == 'responseFIR':
# For the responseFIR obtain the symmetry and
# list of coefficients
coeffs_str = _tag2obj(stage, _ns("coefficients"), str)
coeffs_float = []
if coeffs_str is not None and coeffs_str != 'None':
coeffs = coeffs_str.strip().split(" ")
i = 0
# pass additional mapping of coefficient counter
# so that a proper stationXML can be formatted
for c in coeffs:
temp = _read_float_var(c, FilterCoefficient,
additional_mapping={str("number"): i})
coeffs_float.append(temp)
i += 1
# Write the FIR symmetry to what ObsPy expects
# A: NONE,
# B: ODD,
# C: EVEN
symmetry = _tag2obj(stage, _ns("symmetry"), str)
if symmetry == 'A':
symmetry = 'NONE'
elif symmetry == 'B':
symmetry = 'ODD'
elif symmetry == 'C':
symmetry = 'EVEN'
else:
raise ValueError('Unknown symmetry metric; expected A, B, or C')
return FIRResponseStage(
coefficients=coeffs_float, symmetry=symmetry, **kwargs)
elif elem_type == 'responseFAP':
data = _tag2obj(stage, _ns("tuples"), str)
data = np.array(data.split(), dtype=np.float64)
freq, amp, phase = data.reshape((-1, 3)).T
elements = []
for freq_, amp_, phase_ in zip(freq, amp, phase):
elements.append(ResponseListElement(freq_, amp_, phase_))
return ResponseListResponseStage(
response_list_elements=elements, **kwargs)
[docs]
def _tag2pole_or_zero(paz_element, count):
"""
Parses scxml paz format
Uncertainties on poles removed, not present in scxml.xsd?
Always put to None so no internal conflict
The sanitization removes the first/last parenthesis
and split by comma, real part is 1st, imaginary 2nd
:param paz_element: string of poles or zeros e.g. (12320, 23020)
"""
real, imag = map(float, paz_element)
if real is not None or imag is not None:
real = real or 0
imag = imag or 0
x = ComplexWithUncertainties(real, imag)
x.upper_uncertainty = None
x.upper_uncertainty = None
x.number = count
return x
[docs]
def _read_float_var(elem, cls, unit=False, datum=False, additional_mapping={}):
"""
function to read floattype to cls object (based on _read_floattype)
normally ObsPy would read this directly from a tag, but with different
tag names this is no longer possible; instead we just pass the value
and not the tag name. We always set the unit/datum/uncertainties to None
because they are not provided by scxml ?
:param elem: float value to be converted
:param cls: obspy.core.inventory class
"""
try:
convert = float(elem)
except Exception:
warnings.warn(
"Encountered a value '%s' which could not be converted to a "
"float. Will be skipped. Please contact to report this "
"issue." % elem,
UserWarning)
return None
if math.isnan(convert):
warnings.warn("'%s' has a value of NaN. It will be skipped." %
elem, UserWarning)
return None
obj = cls(convert)
if unit:
obj.unit = None
if datum:
obj.datum = None
obj.lower_uncertainty = None
obj.upper_uncertainty = None
for key1, key2 in additional_mapping.items():
setattr(obj, key1, key2)
return obj