# -*- coding: utf-8 -*-
"""
Testing utilities for ObsPy.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import difflib
import doctest
import inspect
import io
import os
import re
import numpy as np
from lxml import etree
MODULE_TEST_SKIP_CHECKS = {}
[docs]
def compare_xml_strings(doc1, doc2):
"""
Simple helper function to compare two XML byte strings.
:type doc1: bytes
:type doc2: bytes
"""
obj1 = etree.fromstring(doc1).getroottree()
obj2 = etree.fromstring(doc2).getroottree()
buf = io.BytesIO()
obj1.write_c14n(buf)
buf.seek(0, 0)
str1 = buf.read().decode()
str1 = [_i.strip() for _i in str1.splitlines()]
buf = io.BytesIO()
obj2.write_c14n(buf)
buf.seek(0, 0)
str2 = buf.read().decode()
str2 = [_i.strip() for _i in str2.splitlines()]
unified_diff = difflib.unified_diff(str1, str2)
err_msg = "\n".join(unified_diff)
if err_msg: # pragma: no cover
raise AssertionError("Strings are not equal.\n" + err_msg)
[docs]
def remove_unique_ids(xml_string, remove_creation_time=False):
"""
Removes unique ID parts of e.g. 'publicID="..."' attributes from xml
strings.
:type xml_string: str
:param xml_string: xml string to process
:type remove_creation_time: bool
:param xml_string: controls whether to remove 'creationTime' tags or not.
:rtype: str
"""
prefixes = ["id", "publicID", "pickID", "originID", "preferredOriginID",
"preferredMagnitudeID", "preferredFocalMechanismID",
"referenceSystemID", "methodID", "earthModelID",
"triggeringOriginID", "derivedOriginID", "momentMagnitudeID",
"greensFunctionID", "filterID", "amplitudeID",
"stationMagnitudeID", "earthModelID", "slownessMethodID",
"pickReference", "amplitudeReference"]
if remove_creation_time:
prefixes.append("creationTime")
for prefix in prefixes:
xml_string = re.sub("%s='.*?'" % prefix, '%s=""' % prefix, xml_string)
xml_string = re.sub('%s=".*?"' % prefix, '%s=""' % prefix, xml_string)
xml_string = re.sub("<%s>.*?</%s>" % (prefix, prefix),
'<%s/>' % prefix, xml_string)
return xml_string
[docs]
def get_all_py_files():
"""
Return a list with full absolute paths to all .py files in ObsPy file tree.
:rtype: list[str]
"""
util_dir = os.path.abspath(inspect.getfile(inspect.currentframe()))
obspy_dir = os.path.dirname(os.path.dirname(os.path.dirname(util_dir)))
py_files = set()
# Walk the obspy directory structure
for dirpath, _, filenames in os.walk(obspy_dir):
py_files.update([os.path.abspath(os.path.join(dirpath, i)) for i in
filenames if i.endswith(".py")])
return sorted(py_files)
[docs]
def create_diverse_catalog():
"""
Create a catalog with a single event that has many features.
Uses most the event related classes.
"""
# imports are here in order to avoid circular import issues
import obspy.core.event as ev
from obspy import UTCDateTime, Catalog
# local dict for storing state
state = dict(time=UTCDateTime('2016-05-04T12:00:01'))
def _create_event():
event = ev.Event(
event_type='mining explosion',
event_descriptions=[_get_event_description()],
picks=[_create_pick()],
origins=[_create_origins()],
station_magnitudes=[_get_station_mag()],
magnitudes=[_create_magnitudes()],
amplitudes=[_get_amplitudes()],
focal_mechanisms=[_get_focal_mechanisms()],
)
# set preferred origin, focal mechanism, magnitude
preferred_objects = dict(
origin=event.origins[-1].resource_id,
focal_mechanism=event.focal_mechanisms[-1].resource_id,
magnitude=event.magnitudes[-1].resource_id,
)
for item, value in preferred_objects.items():
setattr(event, 'preferred_' + item + '_id', value)
event.scope_resource_ids()
return event
def _create_pick():
# setup some of the classes
creation = ev.CreationInfo(
agency='SwanCo',
author='Indago',
creation_time=UTCDateTime(),
version='10.10',
author_url=ev.ResourceIdentifier('smi:local/me.com'),
)
pick = ev.Pick(
time=state['time'],
comments=[ev.Comment(x) for x in 'BOB'],
evaluation_mode='manual',
evaluation_status='final',
creation_info=creation,
phase_hint='P',
polarity='positive',
onset='emergent',
back_azimith_errors={"uncertainty": 10},
slowness_method_id=ev.ResourceIdentifier('smi:local/slow'),
backazimuth=122.1,
horizontal_slowness=12,
method_id=ev.ResourceIdentifier(),
horizontal_slowness_errors={'uncertainty': 12},
filter_id=ev.ResourceIdentifier(),
waveform_id=ev.WaveformStreamID('UU', 'FOO', '--', 'HHZ'),
)
state['pick_id'] = pick.resource_id
return pick
def _create_origins():
ori = ev.Origin(
resource_id=ev.ResourceIdentifier('smi:local/First'),
time=UTCDateTime('2016-05-04T12:00:00'),
time_errors={'uncertainty': .01},
longitude=-111.12525,
longitude_errors={'uncertainty': .020},
latitude=47.48589325,
latitude_errors={'uncertainty': .021},
depth=2.123,
depth_errors={'uncertainty': 1.22},
depth_type='from location',
time_fixed=False,
epicenter_fixed=False,
reference_system_id=ev.ResourceIdentifier(),
method_id=ev.ResourceIdentifier(),
earth_model_id=ev.ResourceIdentifier(),
arrivals=[_get_arrival()],
composite_times=[_get_composite_times()],
quality=_get_origin_quality(),
origin_type='hypocenter',
origin_uncertainty=_get_origin_uncertainty(),
region='US',
evaluation_mode='manual',
evaluation_status='final',
)
state['origin_id'] = ori.resource_id
return ori
def _get_arrival():
return ev.Arrival(
resource_id=ev.ResourceIdentifier('smi:local/Ar1'),
pick_id=state['pick_id'],
phase='P',
time_correction=.2,
azimuth=12,
distance=10,
takeoff_angle=15,
takeoff_angle_errors={'uncertainty': 10.2},
time_residual=.02,
horizontal_slowness_residual=12.2,
backazimuth_residual=12.2,
time_weight=.23,
horizontal_slowness_weight=12,
backazimuth_weight=12,
earth_model_id=ev.ResourceIdentifier(),
commens=[ev.Comment(x) for x in 'Nothing'],
)
def _get_composite_times():
return ev.CompositeTime(
year=2016,
year_errors={'uncertainty': 0},
month=5,
month_errors={'uncertainty': 0},
day=4,
day_errors={'uncertainty': 0},
hour=0,
hour_errors={'uncertainty': 0},
minute=0,
minute_errors={'uncertainty': 0},
second=0,
second_errors={'uncertainty': .01}
)
def _get_origin_quality():
return ev.OriginQuality(
associate_phase_count=1,
used_phase_count=1,
associated_station_count=1,
used_station_count=1,
depth_phase_count=1,
standard_error=.02,
azimuthal_gap=.12,
ground_truth_level='GT0',
)
def _get_origin_uncertainty():
return ev.OriginUncertainty(
horizontal_uncertainty=1.2,
min_horizontal_uncertainty=.12,
max_horizontal_uncertainty=2.2,
confidence_ellipsoid=_get_confidence_ellipsoid(),
preferred_description="uncertainty ellipse",
)
def _get_confidence_ellipsoid():
return ev.ConfidenceEllipsoid(
semi_major_axis_length=12,
semi_minor_axis_length=12,
major_axis_plunge=12,
major_axis_rotation=12,
)
def _create_magnitudes():
return ev.Magnitude(
resource_id=ev.ResourceIdentifier(),
mag=5.5,
mag_errors={'uncertainty': .01},
magnitude_type='Mw',
origin_id=state['origin_id'],
station_count=1,
station_magnitude_contributions=[_get_station_mag_contrib()],
)
def _get_station_mag():
station_mag = ev.StationMagnitude(
mag=2.24,
)
state['station_mag_id'] = station_mag.resource_id
return station_mag
def _get_station_mag_contrib():
return ev.StationMagnitudeContribution(
station_magnitude_id=state['station_mag_id'],
)
def _get_event_description():
return ev.EventDescription(
text='some text about the EQ',
type='earthquake name',
)
def _get_amplitudes():
return ev.Amplitude(
generic_amplitude=.0012,
type='A',
unit='m',
period=1,
time_window=_get_timewindow(),
pick_id=state['pick_id'],
scalling_time=state['time'],
mangitude_hint='ML',
scaling_time_errors=ev.QuantityError(uncertainty=42.0),
)
def _get_timewindow():
return ev.TimeWindow(
begin=1.2,
end=2.2,
reference=UTCDateTime('2016-05-04T12:00:00'),
)
def _get_focal_mechanisms():
return ev.FocalMechanism(
nodal_planes=_get_nodal_planes(),
principal_axis=_get_principal_axis(),
azimuthal_gap=12,
station_polarity_count=12,
misfit=.12,
station_distribution_ratio=.12,
moment_tensor=_get_moment_tensor(),
)
def _get_nodal_planes():
return ev.NodalPlanes(
nodal_plane_1=ev.NodalPlane(strike=12, dip=2, rake=12),
nodal_plane_2=ev.NodalPlane(strike=12, dip=2, rake=12),
preferred_plane=2,
)
def _get_principal_axis():
return ev.PrincipalAxes(
t_axis=15,
p_axis=15,
n_axis=15,
)
def _get_moment_tensor():
return ev.MomentTensor(
scalar_moment=12213,
tensor=_get_tensor(),
variance=12.23,
variance_reduction=98,
double_couple=.22,
clvd=.55,
iso=.33,
source_time_function=_get_source_time_function(),
data_used=[_get_data_used()],
method_id=ev.ResourceIdentifier(),
inversion_type='general',
)
def _get_tensor():
return ev.Tensor(
m_rr=12,
m_rr_errors={'uncertainty': .01},
m_tt=12,
m_pp=12,
m_rt=12,
m_rp=12,
m_tp=12,
)
def _get_source_time_function():
return ev.SourceTimeFunction(
type='triangle',
duration=.12,
rise_time=.33,
decay_time=.23,
)
def _get_data_used():
return ev.DataUsed(
wave_type='body waves',
station_count=12,
component_count=12,
shortest_period=1,
longest_period=20,
)
events = [_create_event()]
return Catalog(events=events)
[docs]
def streams_almost_equal(st1, st2, default_stats=True, rtol=1e-05, atol=1e-08,
equal_nan=True):
"""
Return True if two streams are almost equal.
:param st1: The first :class:`~obspy.core.stream.Stream` object.
:param st2: The second :class:`~obspy.core.stream.Stream` object.
:param default_stats:
If True only compare the default stats on the traces, such as seed
identification codes, start/end times, sampling_rates, etc. If
False also compare extra stats attributes such as processing and
format specific information.
:param rtol: The relative tolerance parameter passed to
:func:`~numpy.allclose` for comparing time series.
:param atol: The absolute tolerance parameter passed to
:func:`~numpy.allclose` for comparing time series.
:param equal_nan:
If ``True`` NaNs are evaluated equal when comparing the time
series.
:return: bool
.. rubric:: Example
1) Changes to the non-default parameters of the
:class:`~obspy.core.trace.Stats` objects of the stream's contained
:class:`~obspy.core.trace.Trace` objects will cause the streams to
be considered unequal, but they will be considered almost equal.
>>> from obspy import read
>>> st1 = read()
>>> st2 = read()
>>> # The traces should, of course, be equal.
>>> assert st1 == st2
>>> # Perform detrending on st1 twice so processing stats differ.
>>> st1 = st1.detrend('linear')
>>> st1 = st1.detrend('linear')
>>> st2 = st2.detrend('linear')
>>> # The traces are no longer equal, but are almost equal.
>>> assert st1 != st2
>>> assert streams_almost_equal(st1, st2)
2) Slight differences in each trace's data will cause the streams
to be considered unequal, but they will be almost equal if the
differences don't exceed the limits set by the ``rtol`` and
``atol`` parameters.
>>> from obspy import read
>>> st1 = read()
>>> st2 = read()
>>> # Perturb the trace data in st2 slightly.
>>> for tr in st2:
... tr.data *= (1 + 1e-6)
>>> # The streams are no longer equal.
>>> assert st1 != st2
>>> # But they are almost equal.
>>> assert streams_almost_equal(st1, st2)
>>> # Unless, of course, there is a large change.
>>> st1[0].data *= 10
>>> assert not streams_almost_equal(st1, st2)
"""
from obspy.core.stream import Stream
# Return False if both objects are not streams or not the same length.
are_streams = isinstance(st1, Stream) and isinstance(st2, Stream)
if not are_streams or not len(st1) == len(st2):
return False
# Kwargs to pass trace_almost_equal.
tr_kwargs = dict(default_stats=default_stats, rtol=rtol, atol=atol,
equal_nan=equal_nan)
# Ensure the streams are sorted (as done with the __equal__ method)
st1_sorted = st1.select()
st1_sorted.sort()
st2_sorted = st2.select()
st2_sorted.sort()
# Iterate over sorted trace pairs and determine if they are almost equal.
for tr1, tr2 in zip(st1_sorted, st2_sorted):
if not traces_almost_equal(tr1, tr2, **tr_kwargs):
return False # If any are not almost equal return None.
return True
[docs]
def traces_almost_equal(tr1, tr2, default_stats=True, rtol=1e-05, atol=1e-08,
equal_nan=True):
"""
Return True if the two traces are almost equal.
:param tr1: The first :class:`~obspy.core.trace.Trace` object.
:param tr2: The second :class:`~obspy.core.trace.Trace` object.
:param default_stats:
If True only compare the default stats on the traces, such as seed
identification codes, start/end times, sampling_rates, etc. If
False also compare extra stats attributes such as processing and
format specific information.
:param rtol: The relative tolerance parameter passed to
:func:`~numpy.allclose` for comparing time series.
:param atol: The absolute tolerance parameter passed to
:func:`~numpy.allclose` for comparing time series.
:param equal_nan:
If ``True`` NaNs are evaluated equal when comparing the time
series.
:return: bool
"""
from obspy.core.trace import Trace
# If other isnt a trace, or data is not the same len return False.
if not isinstance(tr2, Trace) or len(tr1.data) != len(tr2.data):
return False
# First compare the array values
try: # Use equal_nan if available
all_close = np.allclose(tr1.data, tr2.data, rtol=rtol,
atol=atol, equal_nan=equal_nan)
except TypeError:
# This happens on very old versions of numpy. Essentially
# we just need to handle NaN detection on our own, if equal_nan.
is_close = np.isclose(tr1.data, tr2.data, rtol=rtol, atol=atol)
if equal_nan:
isnan = np.isnan(tr1.data) & np.isnan(tr2.data)
else:
isnan = np.zeros(tr1.data.shape).astype(bool)
all_close = np.all(isnan | is_close)
# Then compare the stats objects
stats1 = _make_stats_dict(tr1, default_stats)
stats2 = _make_stats_dict(tr2, default_stats)
return all_close and stats1 == stats2
[docs]
def _make_stats_dict(tr, default_stats):
"""
Return a dict of stats from trace optionally including processing.
"""
from obspy.core.trace import Stats
if not default_stats:
return dict(tr.stats)
return {i: tr.stats[i] for i in Stats.defaults}
if __name__ == '__main__':
doctest.testmod(exclude_empty=True)