# -*- coding: utf-8 -*-
"""
Module for handling ObsPy :class:`~obspy.core.trace.Trace` and
:class:`~obspy.core.trace.Stats` objects.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import inspect
import math
import warnings
from copy import copy, deepcopy
import numpy as np
from decorator import decorator
from obspy.core import compatibility
from obspy.core.utcdatetime import UTCDateTime
from obspy.core.util import AttribDict, create_empty_data_chunk, NUMPY_VERSION
from obspy.core.util.base import _get_function_from_entry_point
from obspy.core.util.decorator import raise_if_masked, skip_if_no_data
from obspy.core.util.misc import (flat_not_masked_contiguous, get_window_times,
limit_numpy_fft_cache)
[docs]
class Stats(AttribDict):
    """
    A container for additional header information of an ObsPy
    :class:`~obspy.core.trace.Trace` object.

    A ``Stats`` object may contain all header information (also known as meta
    data) of a :class:`~obspy.core.trace.Trace` object. Those headers may be
    accessed or modified either in the dictionary style or directly via a
    corresponding attribute. There are various default attributes which are
    required by every waveform import and export modules within ObsPy such as
    :mod:`obspy.io.mseed`.

    :type header: dict or :class:`~obspy.core.trace.Stats`, optional
    :param header: Dictionary containing meta information of a single
        :class:`~obspy.core.trace.Trace` object. Possible keywords are
        summarized in the following `Default Attributes`_ section.

    .. rubric:: Basic Usage

    >>> stats = Stats()
    >>> stats.network = 'BW'
    >>> print(stats['network'])
    BW
    >>> stats['station'] = 'MANZ'
    >>> print(stats.station)
    MANZ

    .. rubric:: _`Default Attributes`

    ``sampling_rate`` : float, optional
        Sampling rate in hertz (default value is 1.0).
    ``delta`` : float, optional
        Sample distance in seconds (default value is 1.0).
    ``calib`` : float, optional
        Calibration factor (default value is 1.0).
    ``npts`` : int, optional
        Number of sample points (default value is 0, which implies that no data
        is present).
    ``network`` : string, optional
        Network code (default is an empty string).
    ``location`` : string, optional
        Location code (default is an empty string).
    ``station`` : string, optional
        Station code (default is an empty string).
    ``channel`` : string, optional
        Channel code (default is an empty string).
    ``starttime`` : :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
        Date and time of the first data sample given in UTC (default value is
        "1970-01-01T00:00:00.0Z").
    ``endtime`` : :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
        Date and time of the last data sample given in UTC
        (default value is "1970-01-01T00:00:00.0Z").

    .. rubric:: Notes

    (1) The attributes ``sampling_rate`` and ``delta`` are linked to each
        other. If one of the attributes is modified the other will be
        recalculated.

        >>> stats = Stats()
        >>> stats.sampling_rate
        1.0
        >>> stats.delta = 0.005
        >>> stats.sampling_rate
        200.0

    (2) The attributes ``starttime``, ``npts``, ``sampling_rate`` and ``delta``
        are monitored and used to automatically calculate the ``endtime``.

        >>> stats = Stats()
        >>> stats.npts = 60
        >>> stats.delta = 1.0
        >>> stats.starttime = UTCDateTime(2009, 1, 1, 12, 0, 0)
        >>> stats.endtime
        UTCDateTime(2009, 1, 1, 12, 0, 59)
        >>> stats.delta = 0.5
        >>> stats.endtime
        UTCDateTime(2009, 1, 1, 12, 0, 29, 500000)

    (3) The attribute ``endtime`` is read only and can not be modified.

        >>> stats = Stats()
        >>> stats.endtime = UTCDateTime(2009, 1, 1, 12, 0, 0)
        Traceback (most recent call last):
        ...
        AttributeError: Attribute "endtime" in Stats object is read only!
        >>> stats['endtime'] = UTCDateTime(2009, 1, 1, 12, 0, 0)
        Traceback (most recent call last):
        ...
        AttributeError: Attribute "endtime" in Stats object is read only!

    (4)
        The attribute ``npts`` will be automatically updated from the
        :class:`~obspy.core.trace.Trace` object.

        >>> trace = Trace()
        >>> trace.stats.npts
        0
        >>> trace.data = np.array([1, 2, 3, 4])
        >>> trace.stats.npts
        4

    (5)
        The attribute ``component`` can be used to get or set the component,
        i.e. the last character of the ``channel`` attribute.

        >>> stats = Stats()
        >>> stats.channel = 'HHZ'
        >>> stats.component  # doctest: +SKIP
        'Z'
        >>> stats.component = 'L'
        >>> stats.channel  # doctest: +SKIP
        'HHL'
    """
    # set of read only attrs
    readonly = ['endtime']
    # default values
    defaults = {
        'sampling_rate': 1.0,
        'delta': 1.0,
        'starttime': UTCDateTime(0),
        'endtime': UTCDateTime(0),
        'npts': 0,
        'calib': 1.0,
        'network': '',
        'station': '',
        'location': '',
        'channel': '',
    }
    # keys which need to refresh derived values
    _refresh_keys = {'delta', 'sampling_rate', 'starttime', 'npts'}
    # dict of required types for certain attrs
    _types = {
        'network': str,
        'station': str,
        'location': str,
        'channel': str,
    }

    def __init__(self, header=None):
        """
        Initialize the header container.

        :type header: dict or :class:`~obspy.core.trace.Stats`, optional
        :param header: Initial header values. ``None`` (the default) is
            treated like an empty dictionary.
        """
        # Use a fresh dict per call instead of a mutable default argument,
        # which would be one object shared by every call.
        if header is None:
            header = {}
        super(Stats, self).__init__(header)

    def __setitem__(self, key, value):
        """
        Set a header value and keep the derived values consistent.

        Keys listed in ``_refresh_keys`` are coerced to their expected type
        and trigger a recalculation of ``delta`` and ``endtime``. Setting
        ``component`` replaces the last character of ``channel``. A ``calib``
        of 0 only issues a warning. Plain ``dict`` values are stored as
        :class:`~obspy.core.util.AttribDict`.

        :raises ValueError: If ``component`` is not exactly one character.
        """
        if key in self._refresh_keys:
            # ensure correct data type
            if key == 'delta':
                # delta is never stored directly, only as sampling rate
                key = 'sampling_rate'
                try:
                    value = 1.0 / float(value)
                except ZeroDivisionError:
                    value = 0.0
            elif key == 'sampling_rate':
                value = float(value)
            elif key == 'starttime':
                value = UTCDateTime(value)
            elif key == 'npts':
                if not isinstance(value, int):
                    value = int(value)
            # set current key
            super(Stats, self).__setitem__(key, value)
            # set derived value: delta
            try:
                delta = 1.0 / float(self.sampling_rate)
            except ZeroDivisionError:
                delta = 0
            # written straight into __dict__ so this method is not re-entered
            self.__dict__['delta'] = delta
            # set derived value: endtime
            if self.npts == 0:
                timediff = 0
            else:
                timediff = float(self.npts - 1) * delta
            self.__dict__['endtime'] = self.starttime + timediff
            return
        if key == 'component':
            key = 'channel'
            value = str(value)
            if len(value) != 1:
                msg = 'Component must be set with single character'
                raise ValueError(msg)
            value = self.channel[:-1] + value
        # prevent a calibration factor of 0
        if key == 'calib' and value == 0:
            msg = 'Calibration factor set to 0.0!'
            warnings.warn(msg, UserWarning)
        # all other keys
        if isinstance(value, dict):
            super(Stats, self).__setitem__(key, AttribDict(value))
        else:
            super(Stats, self).__setitem__(key, value)

    # attribute assignment goes through the same logic as item assignment
    __setattr__ = __setitem__

    def __getitem__(self, key, default=None):
        """
        Return a header value.

        The virtual key ``component`` yields the last character of
        ``channel`` (an empty string if ``channel`` is empty).
        """
        if key == 'component':
            return super(Stats, self).__getitem__('channel', default)[-1:]
        else:
            return super(Stats, self).__getitem__(key, default)

    def __str__(self):
        """
        Return better readable string representation of Stats object.
        """
        prioritized_keys = ['network', 'station', 'location', 'channel',
                            'starttime', 'endtime', 'sampling_rate', 'delta',
                            'npts', 'calib']
        return self._pretty_str(prioritized_keys)

    def _repr_pretty_(self, p, cycle):
        """
        Pretty-printer hook: emit ``str(self)`` through ``p.text``.
        """
        p.text(str(self))

    def __getstate__(self):
        """
        Return the instance state without the derived ``delta`` and
        ``endtime`` entries.
        """
        state = self.__dict__.copy()
        # Remove the unneeded entries
        state.pop('delta', None)
        state.pop('endtime', None)
        return state

    def __setstate__(self, state):
        """
        Restore the instance state and recompute the derived values.
        """
        self.__dict__.update(state)
        # trigger refreshing
        self.__setitem__('sampling_rate', state['sampling_rate'])
@decorator
def _add_processing_info(func, *args, **kwargs):
    """
    Decorator recording a processing call on the trace.

    After the wrapped method has returned, a string describing the call
    (ObsPy version, method name and sorted arguments) is handed to
    ``self._internal_add_processing_info``.
    """
    def _render(name, val):
        # strings are quoted by hand, everything else goes through repr()
        if isinstance(val, str):
            return "%s='%s'" % (name, val)
        return "%s=%s" % (name, repr(val))

    bound = inspect.getcallargs(func, *args, **kwargs)
    bound.pop("self")
    extra = bound.pop("kwargs", {})
    from obspy import __version__
    template = "ObsPy {version}: {function}(%s)".format(
        version=__version__,
        function=func.__name__)
    rendered = sorted(
        [_render(k, v) for k, v in bound.items()] +
        [_render(k, v) for k, v in extra.items()])
    info = template % "::".join(rendered)
    target = args[0]
    result = func(*args, **kwargs)
    # Attach only after the call succeeded, so a failed operation leaves no
    # processing entry behind.
    target._internal_add_processing_info(info)
    return result
[docs]
class Trace(object):
"""
An object containing data of a continuous series, such as a seismic trace.
:type data: :class:`~numpy.ndarray` or :class:`~numpy.ma.MaskedArray`
:param data: Array of data samples
:type header: dict or :class:`~obspy.core.trace.Stats`
:param header: Dictionary containing header fields
:var id: A SEED compatible identifier of the trace.
:var stats: A container :class:`~obspy.core.trace.Stats` for additional
header information of the trace.
:var data: Data samples in a :class:`~numpy.ndarray` or
:class:`~numpy.ma.MaskedArray`
.. note::
The ``.data`` attribute containing the time series samples as a
:class:`numpy.ndarray` will always be made contiguous in memory. This
way it is always safe to use ``.data`` in routines that internally pass
the array to C code. On the other hand this might result in some
unwanted copying of data in memory. Experts can opt-out by setting
``Trace._always_contiguous = False``, in this case the user has to make
sure themselves that no C operations are performed on potentially
incontiguous data.
.. rubric:: Supported Operations
``trace = traceA + traceB``
Merges traceA and traceB into one new trace object.
See also: :meth:`Trace.__add__`.
``len(trace)``
Returns the number of samples contained in the trace. That is
it is equal to ``len(trace.data)``.
See also: :meth:`Trace.__len__`.
``str(trace)``
Returns basic information about the trace object.
See also: :meth:`Trace.__str__`.
"""
_always_contiguous = True
_max_processing_info = 100
[docs]
def __init__(self, data=None, header=None):
    """
    Create a trace from a sample array and optional header fields.

    :type data: :class:`~numpy.ndarray` or :class:`~numpy.ma.MaskedArray`,
        optional
    :param data: Array of data samples. If omitted, a new empty array is
        created for this instance.
    :type header: dict or :class:`~obspy.core.trace.Stats`, optional
    :param header: Header fields. The object is deep-copied, so the
        caller's header is never modified.
    """
    if data is None:
        # Create the empty default per call; a default array in the
        # signature would be one object shared by all traces.
        data = np.array([])
    # make sure Trace gets initialized with suitable ndarray as self.data
    # otherwise we could end up with e.g. a list object in self.data
    _data_sanity_checks(data)
    # set some defaults if not set yet
    if header is None:
        header = {}
    header = deepcopy(header)
    header.setdefault('npts', len(data))
    self.stats = Stats(header)
    # set data without changing npts in stats object (for headonly option)
    super(Trace, self).__setattr__('data', data)
@property
def meta(self):
    """
    Alias of ``self.stats``.
    """
    return self.stats

@meta.setter
def meta(self, value):
    # assigning to ``meta`` replaces ``self.stats``
    self.stats = value
[docs]
def __eq__(self, other):
"""
Implements rich comparison of Trace objects for "==" operator.
Traces are the same, if both their data and stats are the same.
"""
# check if other object is a Trace
if not isinstance(other, Trace):
return False
# comparison of Stats objects is supported by underlying AttribDict
if not self.stats == other.stats:
return False
# comparison of ndarrays is supported by NumPy
if not np.array_equal(self.data, other.data):
return False
return True
[docs]
def __ne__(self, other):
"""
Implements rich comparison of Trace objects for "!=" operator.
Calls __eq__() and returns the opposite.
"""
return not self.__eq__(other)
[docs]
def __lt__(self, other):
"""
Too ambiguous, throw an Error.
"""
raise NotImplementedError("Too ambiguous, therefore not implemented.")
[docs]
def __le__(self, other):
"""
Too ambiguous, throw an Error.
"""
raise NotImplementedError("Too ambiguous, therefore not implemented.")
[docs]
def __gt__(self, other):
"""
Too ambiguous, throw an Error.
"""
raise NotImplementedError("Too ambiguous, therefore not implemented.")
[docs]
def __ge__(self, other):
"""
Too ambiguous, throw an Error.
"""
raise NotImplementedError("Too ambiguous, therefore not implemented.")
[docs]
def __nonzero__(self):
"""
No data means no trace.
"""
return bool(len(self.data))
def __str__(self, id_length=None):
"""
Return short summary string of the current trace.
:rtype: str
:return: Short summary string of the current trace containing the SEED
identifier, start time, end time, sampling rate and number of
points of the current trace.
.. rubric:: Example
>>> tr = Trace(header={'station':'FUR', 'network':'GR'})
>>> str(tr) # doctest: +ELLIPSIS
'GR.FUR.. | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples'
"""
# set fixed id width
if id_length:
out = "%%-%ds" % (id_length)
trace_id = out % self.id
else:
trace_id = "%s" % self.id
out = ''
# output depending on delta or sampling rate bigger than one
if self.stats.sampling_rate < 0.1:
if hasattr(self.stats, 'preview') and self.stats.preview:
out = out + ' | '\
"%(starttime)s - %(endtime)s | " + \
"%(delta).1f s, %(npts)d samples [preview]"
else:
out = out + ' | '\
"%(starttime)s - %(endtime)s | " + \
"%(delta).1f s, %(npts)d samples"
else:
if hasattr(self.stats, 'preview') and self.stats.preview:
out = out + ' | '\
"%(starttime)s - %(endtime)s | " + \
"%(sampling_rate).1f Hz, %(npts)d samples [preview]"
else:
out = out + ' | '\
"%(starttime)s - %(endtime)s | " + \
"%(sampling_rate).1f Hz, %(npts)d samples"
# check for masked array
if np.ma.count_masked(self.data):
out += ' (masked)'
return trace_id + out % (self.stats)
[docs]
def _repr_pretty_(self, p, cycle):
p.text(str(self))
[docs]
def __len__(self):
"""
Return number of data samples of the current trace.
:rtype: int
:return: Number of data samples.
.. rubric:: Example
>>> trace = Trace(data=np.array([1, 2, 3, 4]))
>>> trace.count()
4
>>> len(trace)
4
"""
return len(self.data)
count = __len__
[docs]
def __setattr__(self, key, value):
    """
    __setattr__ method of Trace object.

    Assigning ``data`` validates the new value, optionally makes it
    C-contiguous and updates ``stats.npts``; every other attribute is set
    unchanged.
    """
    if key != 'data':
        return super(Trace, self).__setattr__(key, value)
    _data_sanity_checks(value)
    if self._always_contiguous:
        value = np.require(value, requirements=['C_CONTIGUOUS'])
    # any change in Trace.data will dynamically set Trace.stats.npts
    self.stats.npts = len(value)
    return super(Trace, self).__setattr__(key, value)
[docs]
def __getitem__(self, index):
"""
__getitem__ method of Trace object.
:rtype: list
:return: List of data points
"""
return self.data[index]
[docs]
def __mul__(self, num):
"""
Create a new Stream containing num copies of this trace.
:type num: int
:param num: Number of copies.
:returns: New ObsPy Stream object.
.. rubric:: Example
>>> from obspy import read
>>> tr = read()[0]
>>> st = tr * 5
>>> len(st)
5
"""
if not isinstance(num, int):
raise TypeError("Integer expected")
from obspy import Stream
st = Stream()
for _i in range(num):
st += self.copy()
return st
[docs]
def __truediv__(self, num):
"""
Split Trace into new Stream containing num Traces of the same size.
:type num: int
:param num: Number of traces in returned Stream. Last trace may contain
lesser samples.
:returns: New ObsPy Stream object.
.. rubric:: Example
>>> from obspy import read
>>> tr = read()[0]
>>> print(tr) # doctest: +ELLIPSIS
BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
>>> st = tr / 7
>>> print(st) # doctest: +ELLIPSIS
7 Trace(s) in Stream:
BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 429 samples
BW.RJOB..EHZ | 2009-08-24T00:20:07.290000Z ... | 100.0 Hz, 429 samples
BW.RJOB..EHZ | 2009-08-24T00:20:11.580000Z ... | 100.0 Hz, 429 samples
BW.RJOB..EHZ | 2009-08-24T00:20:15.870000Z ... | 100.0 Hz, 429 samples
BW.RJOB..EHZ | 2009-08-24T00:20:20.160000Z ... | 100.0 Hz, 429 samples
BW.RJOB..EHZ | 2009-08-24T00:20:24.450000Z ... | 100.0 Hz, 429 samples
BW.RJOB..EHZ | 2009-08-24T00:20:28.740000Z ... | 100.0 Hz, 426 samples
"""
if not isinstance(num, int):
raise TypeError("Integer expected")
from obspy import Stream
total_length = np.size(self.data)
rest_length = total_length % num
if rest_length:
packet_length = (total_length // num)
else:
packet_length = (total_length // num) - 1
tstart = self.stats.starttime
tend = tstart + (self.stats.delta * packet_length)
st = Stream()
for _i in range(num):
st.append(self.slice(tstart, tend).copy())
tstart = tend + self.stats.delta
tend = tstart + (self.stats.delta * packet_length)
return st
[docs]
def __mod__(self, num):
"""
Split Trace into new Stream containing Traces with num samples.
:type num: int
:param num: Number of samples in each trace in returned Stream. Last
trace may contain lesser samples.
:returns: New ObsPy Stream object.
.. rubric:: Example
>>> from obspy import read
>>> tr = read()[0]
>>> print(tr) # doctest: +ELLIPSIS
BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
>>> st = tr % 800
>>> print(st) # doctest: +ELLIPSIS
4 Trace(s) in Stream:
BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 800 samples
BW.RJOB..EHZ | 2009-08-24T00:20:11.000000Z ... | 100.0 Hz, 800 samples
BW.RJOB..EHZ | 2009-08-24T00:20:19.000000Z ... | 100.0 Hz, 800 samples
BW.RJOB..EHZ | 2009-08-24T00:20:27.000000Z ... | 100.0 Hz, 600 samples
"""
if not isinstance(num, int):
raise TypeError("Integer expected")
elif num <= 0:
raise ValueError("Positive Integer expected")
from obspy import Stream
st = Stream()
total_length = np.size(self.data)
if num >= total_length:
st.append(self.copy())
return st
tstart = self.stats.starttime
tend = tstart + (self.stats.delta * (num - 1))
while True:
st.append(self.slice(tstart, tend).copy())
tstart = tend + self.stats.delta
tend = tstart + (self.stats.delta * (num - 1))
if tstart > self.stats.endtime:
break
return st
[docs]
def __add__(self, trace, method=0, interpolation_samples=0,
            fill_value=None, sanity_checks=True):
    """
    Add another Trace object to current trace.

    Neither input trace is assigned to; the merged samples are stored in a
    newly created trace whose header is a deep copy of the header of the
    earlier-starting trace.

    :type trace: :class:`~obspy.core.trace.Trace`
    :param trace: Trace to merge with the current trace.
    :type method: int, optional
    :param method: Method to handle overlaps of traces. Defaults to ``0``.
        See the `Handling Overlaps`_ section below for further details.
    :type fill_value: int, float, str or ``None``, optional
    :param fill_value: Fill value for gaps. Defaults to ``None``. Traces
        will be converted to NumPy masked arrays if no value is given and
        gaps are present. If the keyword ``'latest'`` is provided it will
        use the latest value before the gap. If keyword ``'interpolate'``
        is provided, missing values are linearly interpolated (not
        changing the data type e.g. of integer valued traces).
        See the `Handling Gaps`_ section below for further details.
    :type interpolation_samples: int, optional
    :param interpolation_samples: Used only for ``method=1``. It specifies
        the number of samples which are used to interpolate between
        overlapping traces. Defaults to ``0``. If set to ``-1`` all
        overlapping samples are interpolated.
    :type sanity_checks: bool, optional
    :param sanity_checks: Enables some sanity checks before merging traces.
        Defaults to ``True``.
    :raises TypeError: With ``sanity_checks`` enabled, if ``trace`` is not
        a Trace or if id, sampling rate, calibration factor or data type
        of both traces differ.
    :raises NotImplementedError: If overlapping or contained data differ
        and ``method`` (or, for ``method=1`` overlaps,
        ``interpolation_samples``) has an unsupported value.

    Trace data will be converted into a NumPy masked array data type if
    any gaps are present. This behavior may be prevented by setting the
    ``fill_value`` parameter. The ``method`` argument controls the
    handling of overlapping data values.

    Sampling rate, data type and trace.id of both traces must match.

    .. rubric:: _`Handling Overlaps`

    ======  ===============================================================
    Method  Description
    ======  ===============================================================
    0       Discard overlapping data. Overlaps are essentially treated the
            same way as gaps::

                Trace 1: AAAAAAAA
                Trace 2:     FFFFFFFF
                1 + 2  : AAAA----FFFF

            Contained traces with differing data will be marked as gap::

                Trace 1: AAAAAAAAAAAA
                Trace 2:     FF
                1 + 2  : AAAA--AAAAAA

            Missing data can be merged in from a different trace::

                Trace 1: AAAA--AAAAAA (contained trace, missing samples)
                Trace 2:     FF
                1 + 2  : AAAAFFAAAAAA
    1       Discard data of the previous trace assuming the following trace
            contains data with a more correct time value. The parameter
            ``interpolation_samples`` specifies the number of samples used
            to linearly interpolate between the two traces in order to
            prevent steps. Note that if there are gaps inside, the
            returned array is still a masked array, only if ``fill_value``
            is set, the returned array is a normal array and gaps are
            filled with fill value.

            No interpolation (``interpolation_samples=0``)::

                Trace 1: AAAAAAAA
                Trace 2:     FFFFFFFF
                1 + 2  : AAAAFFFFFFFF

            Interpolate first two samples (``interpolation_samples=2``)::

                Trace 1: AAAAAAAA
                Trace 2:     FFFFFFFF
                1 + 2  : AAAACDFFFFFF (interpolation_samples=2)

            Interpolate all samples (``interpolation_samples=-1``)::

                Trace 1: AAAAAAAA
                Trace 2:     FFFFFFFF
                1 + 2  : AAAABCDEFFFF

            Any contained traces with different data will be discarded::

                Trace 1: AAAAAAAAAAAA (contained trace)
                Trace 2:     FF
                1 + 2  : AAAAAAAAAAAA

            Missing data can be merged in from a different trace::

                Trace 1: AAAA--AAAAAA (contained trace, missing samples)
                Trace 2:     FF
                1 + 2  : AAAAFFAAAAAA
    ======  ===============================================================

    .. rubric:: _`Handling gaps`

    1. Traces with gaps and ``fill_value=None`` (default)::

        Trace 1: AAAA
        Trace 2:         FFFF
        1 + 2  : AAAA----FFFF

    2. Traces with gaps and given ``fill_value=0``::

        Trace 1: AAAA
        Trace 2:         FFFF
        1 + 2  : AAAA0000FFFF

    3. Traces with gaps and given ``fill_value='latest'``::

        Trace 1: ABCD
        Trace 2:         FFFF
        1 + 2  : ABCDDDDDFFFF

    4. Traces with gaps and given ``fill_value='interpolate'``::

        Trace 1: AAAA
        Trace 2:         FFFF
        1 + 2  : AAAABCDEFFFF
    """
    if sanity_checks:
        if not isinstance(trace, Trace):
            raise TypeError
        # check id
        if self.get_id() != trace.get_id():
            raise TypeError("Trace ID differs: %s vs %s" %
                            (self.get_id(), trace.get_id()))
        # check sample rate
        if self.stats.sampling_rate != trace.stats.sampling_rate:
            raise TypeError("Sampling rate differs: %s vs %s" %
                            (self.stats.sampling_rate,
                             trace.stats.sampling_rate))
        # check calibration factor
        if self.stats.calib != trace.stats.calib:
            raise TypeError("Calibration factor differs: %s vs %s" %
                            (self.stats.calib, trace.stats.calib))
        # check data type
        if self.data.dtype != trace.data.dtype:
            raise TypeError("Data type differs: %s vs %s" %
                            (self.data.dtype, trace.data.dtype))
    # check times: ``lt`` is the trace starting first, ``rt`` the other one
    if self.stats.starttime <= trace.stats.starttime:
        lt = self
        rt = trace
    else:
        rt = self
        lt = trace
    # check whether to use the latest value to fill a gap
    if fill_value == "latest":
        fill_value = lt.data[-1]
    elif fill_value == "interpolate":
        # pair of boundary samples, passed on to create_empty_data_chunk
        fill_value = (lt.data[-1], rt.data[0])
    sr = self.stats.sampling_rate
    # number of samples missing between the end of ``lt`` and the start of
    # ``rt``: 0 is an exact fit, a negative value means shared samples
    delta = (rt.stats.starttime - lt.stats.endtime) * sr
    delta = int(compatibility.round_away(delta)) - 1
    # >= 0 means that ``rt`` does not end after ``lt``
    delta_endtime = lt.stats.endtime - rt.stats.endtime
    # create the returned trace
    out = self.__class__(header=deepcopy(lt.stats))
    # check if overlap or gap
    if delta < 0 and delta_endtime < 0:
        # overlap
        delta = abs(delta)
        if np.all(np.equal(lt.data[-delta:], rt.data[:delta])):
            # check if data are the same
            data = [lt.data[:-delta], rt.data]
        elif method == 0:
            overlap = create_empty_data_chunk(delta, lt.data.dtype,
                                              fill_value)
            data = [lt.data[:-delta], overlap, rt.data[delta:]]
        elif method == 1 and interpolation_samples >= -1:
            # ``ls`` is the last sample of ``lt`` before the overlap; fall
            # back to the first sample if that index does not exist
            try:
                ls = lt.data[-delta - 1]
            except Exception:
                ls = lt.data[0]
            # never interpolate over more samples than actually overlap
            if interpolation_samples == -1:
                interpolation_samples = delta
            elif interpolation_samples > delta:
                interpolation_samples = delta
            try:
                rs = rt.data[interpolation_samples]
            except IndexError:
                # contained trace
                data = [lt.data]
            else:
                # include left and right sample (delta + 2)
                interpolation = np.linspace(ls, rs,
                                            interpolation_samples + 2)
                # cut ls and rs and ensure correct data type
                interpolation = np.require(interpolation[1:-1],
                                           lt.data.dtype)
                data = [lt.data[:-delta], interpolation,
                        rt.data[interpolation_samples:]]
        else:
            raise NotImplementedError
    elif delta < 0 and delta_endtime >= 0:
        # contained trace
        delta = abs(delta)
        lenrt = len(rt)
        # ``lt.data[t1:t2]`` is the section of ``lt`` covered by ``rt``
        t1 = len(lt) - delta
        t2 = t1 + lenrt
        # check if data are the same
        data_equal = (lt.data[t1:t2] == rt.data)
        # force a masked array and fill it for check of equality of valid
        # data points
        if np.all(np.ma.masked_array(data_equal).filled()):
            # if all (unmasked) data are equal,
            if isinstance(data_equal, np.ma.masked_array):
                x = np.ma.masked_array(lt.data[t1:t2])
                y = np.ma.masked_array(rt.data)
                # take the sample of ``rt`` wherever ``lt`` is masked
                data_same = np.choose(x.mask, [x, y])
                # samples masked in both traces are marked with NaN
                data = np.choose(x.mask & y.mask, [data_same, np.nan])
                if np.any(np.isnan(data)):
                    data = np.ma.masked_invalid(data)
                # convert back to maximum dtype of original data
                dtype = np.max((x.dtype, y.dtype))
                data = data.astype(dtype)
                data = [lt.data[:t1], data, lt.data[t2:]]
            else:
                data = [lt.data]
        elif method == 0:
            gap = create_empty_data_chunk(lenrt, lt.data.dtype, fill_value)
            data = [lt.data[:t1], gap, lt.data[t2:]]
        elif method == 1:
            data = [lt.data]
        else:
            raise NotImplementedError
    elif delta == 0:
        # exact fit - merge both traces
        data = [lt.data, rt.data]
    else:
        # gap
        # use fixed value or interpolate in between
        gap = create_empty_data_chunk(delta, lt.data.dtype, fill_value)
        data = [lt.data, gap, rt.data]
    # merge traces depending on NumPy array type
    if True in [isinstance(_i, np.ma.masked_array) for _i in data]:
        data = np.ma.concatenate(data)
    else:
        data = np.concatenate(data)
        data = np.require(data, dtype=lt.data.dtype)
    # Check if we can downgrade to normal ndarray
    if isinstance(data, np.ma.masked_array) and \
       np.ma.count_masked(data) == 0:
        data = data.compressed()
    out.data = data
    return out
[docs]
def get_id(self):
"""
Return a SEED compatible identifier of the trace.
:rtype: str
:return: SEED identifier
The SEED identifier contains the network, station, location and channel
code for the current Trace object.
.. rubric:: Example
>>> meta = {'station': 'MANZ', 'network': 'BW', 'channel': 'EHZ'}
>>> tr = Trace(header=meta)
>>> print(tr.get_id())
BW.MANZ..EHZ
>>> print(tr.id)
BW.MANZ..EHZ
"""
out = "%(network)s.%(station)s.%(location)s.%(channel)s"
return out % (self.stats)
id = property(get_id)
@id.setter
def id(self, value):
"""
Set network, station, location and channel codes from a SEED ID.
Raises an Exception if the provided ID does not contain exactly three
dots (or is not of type `str`).
>>> from obspy import read
>>> tr = read()[0]
>>> print(tr) # doctest: +ELLIPSIS
BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
>>> tr.id = "GR.FUR..HHZ"
>>> print(tr) # doctest: +ELLIPSIS
GR.FUR..HHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
:type value: str
:param value: SEED ID to use for setting `self.stats.network`,
`self.stats.station`, `self.stats.location` and
`self.stats.channel`.
"""
try:
net, sta, loc, cha = value.split(".")
except AttributeError:
msg = ("Can only set a Trace's SEED ID from a string "
"(and not from {})").format(type(value))
raise TypeError(msg)
except ValueError:
msg = ("Not a valid SEED ID: '{}'").format(value)
raise ValueError(msg)
self.stats.network = net
self.stats.station = sta
self.stats.location = loc
self.stats.channel = cha
[docs]
def plot(self, **kwargs):
    """
    Create a simple graph of the current trace.

    Various options are available to change the appearance of the waveform
    plot. Please see :meth:`~obspy.core.stream.Stream.plot` method for all
    possible options. All keyword arguments are passed both to the
    ``WaveformPlotting`` constructor and to its ``plot_waveform`` method.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> tr = st[0]
    >>> tr.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        tr = st[0]
        tr.plot()
    """
    from obspy.imaging.waveform import WaveformPlotting
    return WaveformPlotting(stream=self, **kwargs).plot_waveform(**kwargs)
[docs]
def spectrogram(self, **kwargs):
    """
    Create a spectrogram plot of the trace.

    For details on kwargs that can be used to customize the spectrogram
    plot see :func:`~obspy.imaging.spectrogram.spectrogram`. Unless given
    explicitly, ``samp_rate`` defaults to the sampling rate of the trace
    and ``title`` to ``str(self)``.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> tr = st[0]
    >>> tr.spectrogram()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        tr = st[0]
        tr.spectrogram()
    """
    # fill in defaults only for options the caller did not provide
    if not ('samp_rate' in kwargs):
        kwargs['samp_rate'] = self.stats.sampling_rate
    if not ('title' in kwargs):
        kwargs['title'] = str(self)
    from obspy.imaging.spectrogram import spectrogram as _plot_spectrogram
    return _plot_spectrogram(data=self.data, **kwargs)
[docs]
def write(self, filename, format=None, **kwargs):
    """
    Save current trace into a file.

    The trace is wrapped into a single-trace Stream whose ``write`` method
    does the actual work.

    :type filename: str
    :param filename: The name of the file to write.
    :type format: str, optional
    :param format: The format of the file to write. See
        :meth:`obspy.core.stream.Stream.write` method for possible
        formats. If format is set to ``None`` it will be deduced
        from file extension, whenever possible.
    :param kwargs: Additional keyword arguments passed to the underlying
        waveform writer method.

    .. rubric:: Example

    >>> tr = Trace()
    >>> tr.write("out.mseed", format="MSEED")  # doctest: +SKIP

    The ``format`` argument can be omitted, and the file format will be
    deduced from file extension, whenever possible.

    >>> tr.write("out.mseed")  # doctest: +SKIP
    """
    # imported locally in order to prevent a circular import of the
    # Stream and Trace classes
    from obspy import Stream
    wrapper = Stream([self])
    wrapper.write(filename, format, **kwargs)
[docs]
def _ltrim(self, starttime, pad=False, nearest_sample=True,
           fill_value=None):
    """
    Cut current trace to given start time. For more info see
    :meth:`~obspy.core.trace.Trace.trim`.

    The trace is modified in place and returned.

    :param starttime: New start time as
        :class:`~obspy.core.utcdatetime.UTCDateTime`. An ``int`` or
        ``float`` is added to the current start time of the trace.
    :param pad: If true, a start time before the current one prepends
        a chunk created with ``fill_value`` instead of being ignored.
    :param nearest_sample: Selects how the sample offset is rounded, see
        :meth:`~obspy.core.trace.Trace.trim`.
    :param fill_value: Passed to ``create_empty_data_chunk`` when padding.
    :raises TypeError: If ``starttime`` is neither a number nor a
        UTCDateTime.
    :return: The trace itself.

    .. rubric:: Example

    >>> tr = Trace(data=np.arange(0, 10))
    >>> tr.stats.delta = 1.0
    >>> tr._ltrim(tr.stats.starttime + 8)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.data
    array([8, 9])
    >>> tr.stats.starttime
    UTCDateTime(1970, 1, 1, 0, 0, 8)
    """
    # remembered so that an emptied trace keeps the original dtype
    org_dtype = self.data.dtype
    if isinstance(starttime, float) or isinstance(starttime, int):
        starttime = UTCDateTime(self.stats.starttime) + starttime
    elif not isinstance(starttime, UTCDateTime):
        raise TypeError
    # check if in boundary
    # ``delta`` is the number of samples to drop from the left (positive)
    # or to prepend when padding (negative)
    if nearest_sample:
        delta = compatibility.round_away(
            (starttime - self.stats.starttime) * self.stats.sampling_rate)
        # due to rounding and npts starttime must always be right of
        # self.stats.starttime, rtrim relies on it
        if delta < 0 and pad:
            npts = abs(delta) + 10  # use this as a start
            newstarttime = self.stats.starttime - npts / \
                float(self.stats.sampling_rate)
            newdelta = compatibility.round_away(
                (starttime - newstarttime) * self.stats.sampling_rate)
            delta = newdelta - npts
        delta = int(delta)
    else:
        delta = int(math.floor(round((self.stats.starttime - starttime) *
                                     self.stats.sampling_rate, 7))) * -1
    # Adjust starttime only if delta is greater than zero or if the values
    # are padded with masked arrays.
    if delta > 0 or pad:
        self.stats.starttime += delta * self.stats.delta
    if delta == 0 or (delta < 0 and not pad):
        # nothing to cut and no padding requested
        return self
    elif delta < 0 and pad:
        try:
            gap = create_empty_data_chunk(abs(delta), self.data.dtype,
                                          fill_value)
        except ValueError:
            # create_empty_data_chunk returns negative ValueError ?? for
            # too large number of points, e.g. 189336539799
            raise Exception("Time offset between starttime and "
                            "trace.starttime too large")
        self.data = np.ma.concatenate((gap, self.data))
        return self
    elif starttime > self.stats.endtime:
        # requested start lies behind the last sample: nothing remains
        self.data = np.empty(0, dtype=org_dtype)
        return self
    elif delta > 0:
        try:
            self.data = self.data[delta:]
        except IndexError:
            # a huge numbers for delta raises an IndexError
            # here we just create empty array with same dtype
            self.data = np.empty(0, dtype=org_dtype)
    return self
[docs]
def _rtrim(self, endtime, pad=False, nearest_sample=True, fill_value=None):
    """
    Cut current trace to given end time. For more info see
    :meth:`~obspy.core.trace.Trace.trim`.

    The trace is modified in place and returned.

    :param endtime: New end time as
        :class:`~obspy.core.utcdatetime.UTCDateTime`. An ``int`` or
        ``float`` is subtracted from the current end time of the trace.
    :param pad: If true, an end time after the current one appends a chunk
        created with ``fill_value`` instead of being ignored.
    :param nearest_sample: Selects how the sample offset is rounded, see
        :meth:`~obspy.core.trace.Trace.trim`.
    :param fill_value: Passed to ``create_empty_data_chunk`` when padding.
    :raises TypeError: If ``endtime`` is neither a number nor a
        UTCDateTime.
    :return: The trace itself.

    .. rubric:: Example

    >>> tr = Trace(data=np.arange(0, 10))
    >>> tr.stats.delta = 1.0
    >>> tr._rtrim(tr.stats.starttime + 2)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.data
    array([0, 1, 2])
    >>> tr.stats.endtime
    UTCDateTime(1970, 1, 1, 0, 0, 2)
    """
    # remembered so that an emptied trace keeps the original dtype
    org_dtype = self.data.dtype
    if isinstance(endtime, float) or isinstance(endtime, int):
        endtime = UTCDateTime(self.stats.endtime) - endtime
    elif not isinstance(endtime, UTCDateTime):
        raise TypeError
    # check if in boundary
    # ``delta`` is the number of samples to append when padding (positive)
    # or to drop from the right (negative)
    if nearest_sample:
        delta = compatibility.round_away(
            (endtime - self.stats.starttime) *
            self.stats.sampling_rate) - self.stats.npts + 1
        delta = int(delta)
    else:
        # solution for #127, however some tests need to be changed
        # delta = -1*int(math.floor(compatibility.round_away(
        #     (self.stats.endtime - endtime) * \
        #     self.stats.sampling_rate, 7)))
        delta = int(math.floor(round((endtime - self.stats.endtime) *
                                     self.stats.sampling_rate, 7)))
    if delta == 0 or (delta > 0 and not pad):
        # nothing to cut and no padding requested
        return self
    if delta > 0 and pad:
        try:
            gap = create_empty_data_chunk(delta, self.data.dtype,
                                          fill_value)
        except ValueError:
            # create_empty_data_chunk returns negative ValueError ?? for
            # too large number of points, e.g. 189336539799
            # NOTE(review): the message names starttime although this is
            # the end-time trim -- confirm whether that is intended
            raise Exception("Time offset between starttime and " +
                            "trace.starttime too large")
        self.data = np.ma.concatenate((self.data, gap))
        return self
    elif endtime < self.stats.starttime:
        # requested end lies before the first sample: move the start time
        # and leave an empty trace
        self.stats.starttime = self.stats.endtime + \
            delta * self.stats.delta
        self.data = np.empty(0, dtype=org_dtype)
        return self
    # cut from right
    delta = abs(delta)
    total = len(self.data) - delta
    if endtime == self.stats.starttime:
        # trimming exactly to the first sample keeps that single sample
        total = 1
    self.data = self.data[:total]
    return self
[docs]
@_add_processing_info
def trim(self, starttime=None, endtime=None, pad=False,
         nearest_sample=True, fill_value=None):
    """
    Cut current trace to given start and end time.

    :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param starttime: Specify the start time.
    :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param endtime: Specify the end time.
    :type pad: bool, optional
    :param pad: Gives the possibility to trim at time points outside the
        time frame of the original trace, filling the trace with the
        given ``fill_value``. Defaults to ``False``.
    :type nearest_sample: bool, optional
    :param nearest_sample: If set to ``True``, the closest sample is
        selected, if set to ``False``, the inner (next sample for a
        start time border, previous sample for an end time border) sample
        containing the time is selected. Defaults to ``True``.

        Given the following trace containing 6 samples, "|" are the
        sample points, "A" is the requested starttime and "B" the
        requested endtime::

            |         |A        |         |       B |         |
            1         2         3         4         5         6

        ``nearest_sample=True`` will select samples 2-5,
        ``nearest_sample=False`` will select samples 3-4 only.
    :type fill_value: int, float or ``None``, optional
    :param fill_value: Fill value for gaps. Defaults to ``None``. Traces
        will be converted to NumPy masked arrays if no value is given and
        gaps are present.
    :return: The trace itself (modified in place).
    :raises ValueError: If both times are
        :class:`~obspy.core.utcdatetime.UTCDateTime` objects and
        ``starttime`` is later than ``endtime``.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.

    .. rubric:: Example

    >>> tr = Trace(data=np.arange(0, 10))
    >>> tr.stats.delta = 1.0
    >>> t = tr.stats.starttime
    >>> tr.trim(t + 2.000001, t + 7.999999)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.data
    array([2, 3, 4, 5, 6, 7, 8])
    """
    # a reversed time window is rejected (it is not swapped)
    if (isinstance(starttime, UTCDateTime) and
            isinstance(endtime, UTCDateTime) and starttime > endtime):
        raise ValueError("starttime is larger than endtime")
    # cut it
    if starttime:
        self._ltrim(starttime, pad, nearest_sample=nearest_sample,
                    fill_value=fill_value)
    if endtime:
        self._rtrim(endtime, pad, nearest_sample=nearest_sample,
                    fill_value=fill_value)
    # With padding and an explicit fill value the result should be a plain
    # ndarray. Test truthiness of ``pad`` (not identity with ``True``) so
    # that e.g. ``numpy.bool_`` values are converted as well.
    if pad and fill_value is not None:
        try:
            self.data = self.data.filled()
        except AttributeError:
            # numpy.ndarray object has no attribute 'filled' - ignoring
            pass
    return self
[docs]
def slice(self, starttime=None, endtime=None, nearest_sample=True):
    """
    Return a new Trace object restricted to the given time window.

    :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param starttime: Start time of the slice.
    :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param endtime: End time of the slice.
    :type nearest_sample: bool, optional
    :param nearest_sample: If ``True`` (default), the sample closest to
        each requested time is selected. If ``False``, the inner sample
        is selected (next sample for the start time border, previous
        sample for the end time border).

        Given the following trace containing 6 samples, "|" are the
        sample points, "A" is the requested starttime and "B" the
        requested endtime::

            |         |A        |         |       B |         |
            1         2         3         4         5         6

        ``nearest_sample=True`` will select samples 2-5,
        ``nearest_sample=False`` will select samples 3-4 only.
    :return: New :class:`~obspy.core.trace.Trace` object. The trace is a
        shallow copy with a deep-copied ``stats``; the data is not
        copied up front, only a reference to it is passed on.

    .. rubric:: Example

    >>> tr = Trace(data=np.arange(0, 10))
    >>> tr.stats.delta = 1.0
    >>> t = tr.stats.starttime
    >>> tr2 = tr.slice(t + 2, t + 8)
    >>> tr2.data
    array([2, 3, 4, 5, 6, 7, 8])
    """
    # shallow copy shares the data array; the header gets its own copy so
    # that trimming does not alter the header of the original trace
    sliced = copy(self)
    sliced.stats = deepcopy(self.stats)
    sliced.trim(starttime=starttime, endtime=endtime,
                nearest_sample=nearest_sample)
    return sliced
[docs]
def slide(self, window_length, step, offset=0,
          include_partial_windows=False, nearest_sample=True):
    """
    Generator yielding equal length sliding windows of the Trace.

    Each window is produced with :meth:`~obspy.core.trace.Trace.slice`,
    i.e. it is only a new view of the original data. Modifying a window
    modifies the original data as well; create a copy of the yielded
    window if that is not wanted. With overlapping windows, changes to
    the original data also show up in all following windows.

    .. rubric:: Example

    >>> import obspy
    >>> tr = obspy.read()[0]
    >>> for windowed_tr in tr.slide(window_length=10.0, step=10.0):
    ...     print("---")  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    ...     print(windowed_tr)
    ---
    ... | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:13.000000Z | ...
    ---
    ... | 2009-08-24T00:20:13.000000Z - 2009-08-24T00:20:23.000000Z | ...

    :param window_length: The length of each window in seconds.
    :type window_length: float
    :param step: The step between the start times of two successive
        windows in seconds. Can be negative if an offset is given.
    :type step: float
    :param offset: The offset of the first window in seconds relative to
        the start time of the whole interval.
    :type offset: float
    :param include_partial_windows: Determines if windows that are
        shorter than 99.9 % of the desired length are returned.
    :type include_partial_windows: bool
    :param nearest_sample: If set to ``True``, the closest sample is
        selected, if set to ``False``, the inner (next sample for a
        start time border, previous sample for an end time border) sample
        containing the time is selected. Defaults to ``True``.

        Given the following trace containing 6 samples, "|" are the
        sample points, "A" is the requested starttime and "B" the
        requested endtime::

            |         |A        |         |       B |         |
            1         2         3         4         5         6

        ``nearest_sample=True`` will select samples 2-5,
        ``nearest_sample=False`` will select samples 3-4 only.
    :type nearest_sample: bool, optional
    """
    window_bounds = get_window_times(
        starttime=self.stats.starttime,
        endtime=self.stats.endtime,
        window_length=window_length,
        step=step,
        offset=offset,
        include_partial_windows=include_partial_windows)

    # no window fits into the trace: the generator ends without yielding
    if len(window_bounds) == 0:
        return

    for win_start, win_stop in window_bounds:
        yield self.slice(win_start, win_stop,
                         nearest_sample=nearest_sample)
[docs]
def verify(self):
    """
    Check the trace for consistency between its data and its header.

    The checks are performed in this order: number of samples, order of
    start and end time, agreement of time span, sampling rate and data
    length, type of the ``stats`` attribute and byte order of the data.
    The first failing check raises.

    :return: The trace itself if all checks pass.
    :raises Exception: If any of the checks fails.

    .. rubric:: Example

    >>> tr = Trace(data=np.array([1,2,3,4]))
    >>> tr.stats.npts = 100
    >>> tr.verify()  #doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    Exception: ntps(100) differs from data size(4)
    """
    if len(self) != self.stats.npts:
        raise Exception("ntps(%d) differs from data size(%d)" %
                        (self.stats.npts, len(self.data)))

    time_span = self.stats.endtime - self.stats.starttime
    if time_span < 0:
        raise Exception("End time(%s) before start time(%s)" %
                        (self.stats.endtime, self.stats.starttime))

    rate = self.stats.sampling_rate
    if self.stats.starttime != self.stats.endtime:
        # the time span has to account for exactly len(data) samples
        expected_len = int(compatibility.round_away(time_span * rate)) + 1
        if expected_len != len(self.data):
            raise Exception(
                "Sample rate(%f) * time delta(%.4lf) + 1 != data len(%d)" %
                (rate, time_span, len(self.data)))
        # Check if the endtime fits the starttime, npts and sampling_rate.
        last_sample_time = self.stats.starttime + \
            (self.stats.npts - 1) / float(self.stats.sampling_rate)
        if self.stats.endtime != last_sample_time:
            raise Exception("End time is not the time of the last sample.")
    elif self.stats.npts not in [0, 1]:
        # identical start and end time only allows for 0 or 1 samples
        raise Exception("Data size should be 0, but is %d" %
                        self.stats.npts)

    if not isinstance(self.stats, Stats):
        raise Exception(
            "Attribute stats must be an instance of obspy.core.Stats")

    if isinstance(self.data, np.ndarray):
        if self.data.dtype.byteorder not in ["=", "|"]:
            raise Exception(
                "Trace data should be stored as numpy.ndarray in the " +
                "system specific byte order.")
    return self
[docs]
@_add_processing_info
def simulate(self, paz_remove=None, paz_simulate=None,
             remove_sensitivity=True, simulate_sensitivity=True, **kwargs):
    """
    Correct for instrument response / Simulate new instrument response.

    :type paz_remove: dict, None
    :param paz_remove: Dictionary containing keys ``'poles'``, ``'zeros'``,
        ``'gain'`` (A0 normalization factor). Poles and zeros must be a
        list of complex floating point numbers, gain must be of type float.
        Poles and Zeros are assumed to correct to m/s, SEED convention.
        Use ``None`` for no inverse filtering. The string ``'self'``
        selects the poles and zeros stored in ``stats.paz`` of this trace.
    :type paz_simulate: dict, None
    :param paz_simulate: Dictionary containing keys ``'poles'``,
        ``'zeros'``, ``'gain'``. Poles and zeros must be a list of complex
        floating point numbers, gain must be of type float. Or ``None`` for
        no simulation.
    :type remove_sensitivity: bool
    :param remove_sensitivity: Determines if data is divided by
        ``paz_remove['sensitivity']`` to correct for overall sensitivity of
        recording instrument (seismometer/digitizer) during instrument
        correction.
    :type simulate_sensitivity: bool
    :param simulate_sensitivity: Determines if data is multiplied with
        ``paz_simulate['sensitivity']`` to simulate overall sensitivity of
        new instrument (seismometer/digitizer) during instrument
        simulation.
    :param kwargs: Further keyword arguments, passed on to
        :func:`~obspy.signal.invsim.simulate_seismometer`. A ``seedresp``
        dictionary gets some convenience handling first (see the notes
        below).
    :return: The trace itself (modified in place).
    :raises ValueError: If ``seedresp['filename']`` is a
        :class:`~obspy.io.xseed.parser.Parser` object that contains no
        response for this trace.

    This function corrects for the original instrument response given by
    `paz_remove` and/or simulates a new instrument response given by
    `paz_simulate`.
    For additional information and more options to control the instrument
    correction/simulation (e.g. water level, demeaning, tapering, ...) see
    :func:`~obspy.signal.invsim.simulate_seismometer`.

    `paz_remove` and `paz_simulate` are expected to be dictionaries
    containing information on poles, zeros and gain (and usually also
    sensitivity).

    If both `paz_remove` and `paz_simulate` are specified, both steps are
    performed in one go in the frequency domain, otherwise only the
    specified step is performed.

    .. note::

        Instead of the builtin deconvolution based on Poles and Zeros
        information, the deconvolution can be performed using evalresp
        instead by using the option `seedresp` (see documentation of
        :func:`~obspy.signal.invsim.simulate_seismometer` and the `ObsPy
        Tutorial <https://docs.obspy.org/master/tutorial/code_snippets/\
seismometer_correction_simulation.html#using-a-resp-file>`_).

    .. note::

        A ``seedresp`` dictionary passed in is modified: a missing
        ``'date'`` entry is set to the start time of the trace, and the
        ``'network'``, ``'station'``, ``'location'`` and ``'channel'``
        entries are overwritten with the values of this trace, unless
        ``seedresp['filename']`` is a Parser object, in which case the
        identifiers are set on a deep copy instead.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    .. rubric:: Example

    >>> from obspy import read
    >>> from obspy.signal.invsim import corn_freq_2_paz
    >>> st = read()
    >>> tr = st[0]
    >>> paz_sts2 = {'poles': [-0.037004+0.037016j, -0.037004-0.037016j,
    ...                       -251.33+0j,
    ...                       -131.04-467.29j, -131.04+467.29j],
    ...             'zeros': [0j, 0j],
    ...             'gain': 60077000.0,
    ...             'sensitivity': 2516778400.0}
    >>> paz_1hz = corn_freq_2_paz(1.0, damp=0.707)
    >>> paz_1hz['sensitivity'] = 1.0
    >>> tr.simulate(paz_remove=paz_sts2, paz_simulate=paz_1hz)
    ... # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        from obspy.signal.invsim import corn_freq_2_paz
        st = read()
        tr = st[0]
        paz_sts2 = {'poles': [-0.037004+0.037016j, -0.037004-0.037016j,
                              -251.33+0j,
                              -131.04-467.29j, -131.04+467.29j],
                    'zeros': [0j, 0j],
                    'gain': 60077000.0,
                    'sensitivity': 2516778400.0}
        paz_1hz = corn_freq_2_paz(1.0, damp=0.707)
        paz_1hz['sensitivity'] = 1.0
        tr.simulate(paz_remove=paz_sts2, paz_simulate=paz_1hz)
        tr.plot()
    """
    # XXX accepting string "self" and using attached PAZ then
    if paz_remove == 'self':
        paz_remove = self.stats.paz

    # some convenience handling for evalresp type instrument correction
    if "seedresp" in kwargs:
        seedresp = kwargs["seedresp"]
        # if date is missing use trace's starttime
        # NOTE: this writes into the dictionary supplied by the caller
        seedresp.setdefault("date", self.stats.starttime)
        # if a Parser object is provided, get corresponding RESP
        # information
        # imported here rather than at module level
        from obspy.io.xseed import Parser
        if isinstance(seedresp['filename'], Parser):
            # work on a copy from here on, so that the Parser object in
            # the caller's dictionary is not replaced
            seedresp = deepcopy(seedresp)
            kwargs['seedresp'] = seedresp
            resp_key = ".".join(("RESP", self.stats.network,
                                 self.stats.station, self.stats.location,
                                 self.stats.channel))
            # search the RESP entry of this trace; the else branch of the
            # loop is only reached if no key matched
            for key, stringio in seedresp['filename'].get_resp():
                if key == resp_key:
                    # rewind the buffer before handing it on
                    stringio.seek(0, 0)
                    seedresp['filename'] = stringio
                    break
            else:
                msg = "Response for %s not found in Parser" % self.id
                raise ValueError(msg)
        # Set the SEED identifiers!
        for item in ["network", "station", "location", "channel"]:
            seedresp[item] = self.stats[item]

    # imported here rather than at module level
    from obspy.signal.invsim import simulate_seismometer
    self.data = simulate_seismometer(
        self.data, self.stats.sampling_rate, paz_remove=paz_remove,
        paz_simulate=paz_simulate, remove_sensitivity=remove_sensitivity,
        simulate_sensitivity=simulate_sensitivity, **kwargs)

    return self
[docs]
@_add_processing_info
@raise_if_masked
def filter(self, type, **options):
    """
    Filter the data of the current trace.

    :type type: str
    :param type: Name of the filter to apply (e.g. ``"bandpass"``),
        matched case-insensitively. See the `Supported Filter`_ section
        below for further details.
    :param options: Keyword arguments required by the selected filter;
        they are passed on unchanged (e.g. ``freqmin=1.0``,
        ``freqmax=20.0`` for ``"bandpass"``). The sampling rate of the
        trace is supplied automatically as ``df``.
    :return: The trace itself (modified in place).

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    .. rubric:: _`Supported Filter`

    ``'bandpass'``
        Butterworth-Bandpass (uses :func:`obspy.signal.filter.bandpass`).

    ``'bandstop'``
        Butterworth-Bandstop (uses :func:`obspy.signal.filter.bandstop`).

    ``'lowpass'``
        Butterworth-Lowpass (uses :func:`obspy.signal.filter.lowpass`).

    ``'highpass'``
        Butterworth-Highpass (uses :func:`obspy.signal.filter.highpass`).

    ``'lowpass_cheby_2'``
        Cheby2-Lowpass (uses :func:`obspy.signal.filter.lowpass_cheby_2`).

    ``'lowpass_fir'`` (experimental)
        FIR-Lowpass (uses :func:`obspy.signal.filter.lowpass_fir`).

    ``'remez_fir'`` (experimental)
        Minimax optimal bandpass using Remez algorithm (uses
        :func:`obspy.signal.filter.remez_fir`).

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> tr = st[0]
    >>> tr.filter("highpass", freq=1.0)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        tr = st[0]
        tr.filter("highpass", freq=1.0)
        tr.plot()
    """
    # look up the filter implementation registered under the lower-cased
    # name in the 'filter' entry point group
    filter_func = _get_function_from_entry_point('filter', type.lower())
    # the remaining keyword arguments go to the filter function untouched
    sampling_rate = self.stats.sampling_rate
    self.data = filter_func(self.data, df=sampling_rate, **options)
    return self
[docs]
@_add_processing_info
def trigger(self, type, **options):
    """
    Run a triggering algorithm on the data of the current trace.

    :param type: Name of the trigger to apply (e.g. ``'recstalta'``),
        matched case-insensitively. See the `Supported Trigger`_ section
        below for further details.
    :param options: Keyword arguments required by the selected trigger;
        they are passed on (e.g. ``sta=3``, ``lta=10``).
        Arguments ``sta`` and ``lta`` (seconds) will be mapped to ``nsta``
        and ``nlta`` (samples) by multiplying with sampling rate of trace.
        (e.g. ``sta=3``, ``lta=10`` would call the trigger with 3 and 10
        seconds average, respectively)
    :return: The trace itself (modified in place).

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    .. rubric:: _`Supported Trigger`

    ``'classicstalta'``
        Computes the classic STA/LTA characteristic function (uses
        :func:`obspy.signal.trigger.classic_sta_lta`).

    ``'recstalta'``
        Recursive STA/LTA
        (uses :func:`obspy.signal.trigger.recursive_sta_lta`).

    ``'recstaltapy'``
        Recursive STA/LTA written in Python (uses
        :func:`obspy.signal.trigger.recursive_sta_lta_py`).

    ``'delayedstalta'``
        Delayed STA/LTA.
        (uses :func:`obspy.signal.trigger.delayed_sta_lta`).

    ``'carlstatrig'``
        Computes the carl_sta_trig characteristic function (uses
        :func:`obspy.signal.trigger.carl_sta_trig`).

    ``'zdetect'``
        Z-detector (uses :func:`obspy.signal.trigger.z_detect`).

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> tr = st[0]
    >>> tr.filter("highpass", freq=1.0)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.plot()  # doctest: +SKIP
    >>> tr.trigger("recstalta", sta=1, lta=4)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        tr = st[0]
        tr.filter("highpass", freq=1.0)
        tr.plot()
        tr.trigger('recstalta', sta=1, lta=4)
        tr.plot()
    """
    # look up the trigger implementation registered under the lower-cased
    # name in the 'trigger' entry point group
    trigger_func = _get_function_from_entry_point('trigger', type.lower())
    # window lengths given in seconds ('sta', 'lta') are replaced by their
    # integer sample counts ('nsta', 'nlta')
    samples_per_second = self.stats.sampling_rate
    for seconds_key in ('sta', 'lta'):
        if seconds_key in options:
            seconds = options.pop(seconds_key)
            options['n%s' % (seconds_key)] = int(
                seconds * samples_per_second)
    # everything left in options goes to the trigger function as keyword
    # arguments
    self.data = trigger_func(self.data, **options)
    return self
[docs]
@skip_if_no_data
@_add_processing_info
def resample(self, sampling_rate, window='hann', no_filter=True,
             strict_length=False):
    """
    Resample trace data using Fourier method. Spectra are linearly
    interpolated if required.

    :type sampling_rate: float
    :param sampling_rate: The sampling rate of the resampled signal.
    :type window: :class:`numpy.ndarray`, callable, str, float, or tuple,
        optional
    :param window: Specifies the window applied to the signal in the
        Fourier domain. Defaults to ``'hann'`` window. See
        :func:`scipy.signal.resample` for details. Use ``None`` to apply
        no window.
    :type no_filter: bool, optional
    :param no_filter: Deactivates automatic filtering if set to ``True``.
        Defaults to ``True``.
    :type strict_length: bool, optional
    :param strict_length: Raise instead of resampling if the end time of
        the trace would change. Defaults to ``False``.
    :return: The trace itself (modified in place).
    :raises ValueError: If ``strict_length`` is set and the end time
        would change, or if a window array does not have one value per
        sample.
    :raises ArithmeticError: If automatic filtering is requested for a
        resampling factor above 16.

    .. note::

        The :class:`~obspy.core.trace.Trace` object has
        three different methods to change
        the sampling rate of its data: :meth:`~.resample`,
        :meth:`~.decimate`, and :meth:`~.interpolate`

        Make sure to choose the most appropriate one for the problem at
        hand.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    Uses :func:`scipy.signal.resample`. Because a Fourier method is used,
    the signal is assumed to be periodic.

    .. rubric:: Example

    >>> tr = Trace(data=np.array([0.5, 0, 0.5, 1, 0.5, 0, 0.5, 1]))
    >>> len(tr)
    8
    >>> tr.stats.sampling_rate
    1.0
    >>> tr.resample(4.0)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> len(tr)
    32
    >>> tr.stats.sampling_rate
    4.0
    >>> tr.data  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    array([ 0.5 , 0.40432914, 0.3232233 , 0.26903012, 0.25 ...
    """
    from scipy.signal import get_window
    from scipy.fftpack import rfft, irfft
    # ratio of the current to the new sampling rate
    factor = self.stats.sampling_rate / float(sampling_rate)
    # check if end time changes and this is not explicitly allowed
    if strict_length:
        if len(self.data) % factor != 0.0:
            msg = "End time of trace would change and strict_length=True."
            raise ValueError(msg)
    # do automatic lowpass filtering
    if not no_filter:
        # be sure filter still behaves good
        if factor > 16:
            msg = "Automatic filter design is unstable for resampling " + \
                  "factors (current sampling rate/new sampling rate) " + \
                  "above 16. Manual resampling is necessary."
            raise ArithmeticError(msg)
        freq = self.stats.sampling_rate * 0.5 / float(factor)
        self.filter('lowpass_cheby_2', freq=freq, maxorder=12)
    # resample in the frequency domain. Make sure the byteorder is native.
    # ``ndarray.newbyteorder`` is gone in NumPy 2.0 and only reinterpreted
    # the bytes; converting to the native-order dtype keeps the values and
    # does not copy data that is already in native byte order.
    native_dtype = self.data.dtype.newbyteorder("=")
    x = rfft(self.data.astype(native_dtype, copy=False))
    # scipy.fftpack.rfft packs the spectrum as
    # [re(0), re(1), im(1), re(2), im(2), ...]. Insert the missing
    # imaginary part of the zero frequency so that real and imaginary
    # parts alternate.
    # Cast the value to be inserted to the same dtype as the array to avoid
    # issues with numpy rule 'safe'.
    x = np.insert(x, 1, x.dtype.type(0))
    if self.stats.npts % 2 == 0:
        # even length: the last coefficient has no imaginary part either
        x = np.append(x, [0])
    # views on the real and imaginary parts of the spectrum
    x_r = x[::2]
    x_i = x[1::2]

    if window is not None:
        if callable(window):
            large_w = window(np.fft.fftfreq(self.stats.npts))
        elif isinstance(window, np.ndarray):
            if window.shape != (self.stats.npts,):
                msg = "Window has the wrong shape. Window length must " + \
                      "equal the number of points."
                raise ValueError(msg)
            large_w = window
        else:
            large_w = np.fft.ifftshift(get_window(window,
                                                  self.stats.npts))
        # only the non-negative frequencies are kept in x_r and x_i
        x_r *= large_w[:self.stats.npts // 2 + 1]
        x_i *= large_w[:self.stats.npts // 2 + 1]

    # interpolate
    num = int(self.stats.npts / factor)
    if num == 0:
        msg = ("Resampled trace would have less than one sample. "
               "Retaining exactly one sample.")
        warnings.warn(msg)
        num = 1
    # frequency spacing of the original and of the resampled spectrum
    df = 1.0 / (self.stats.npts * self.stats.delta)
    d_large_f = 1.0 / num * sampling_rate
    f = df * np.arange(0, self.stats.npts // 2 + 1, dtype=np.int32)
    n_large_f = num // 2 + 1
    large_f = d_large_f * np.arange(0, n_large_f, dtype=np.int32)
    large_y = np.zeros((2 * n_large_f))
    large_y[::2] = np.interp(large_f, f, x_r)
    large_y[1::2] = np.interp(large_f, f, x_i)

    # undo the padding from above to get back to the packed layout
    # expected by scipy.fftpack.irfft
    large_y = np.delete(large_y, 1)
    if num % 2 == 0:
        large_y = np.delete(large_y, -1)
    # rescale for the changed number of samples
    self.data = irfft(large_y) * (float(num) / float(self.stats.npts))
    self.stats.sampling_rate = sampling_rate

    return self
[docs]
@_add_processing_info
def decimate(self, factor, no_filter=False, strict_length=False):
    """
    Downsample trace data by an integer factor.

    :type factor: int
    :param factor: Factor by which the sampling rate is lowered by
        decimation.
    :type no_filter: bool, optional
    :param no_filter: Deactivates automatic filtering if set to ``True``.
        Defaults to ``False``.
    :type strict_length: bool, optional
    :param strict_length: Raise instead of decimating if the end time of
        the trace would change. Defaults to ``False``.
    :return: The trace itself (modified in place).
    :raises ValueError: If ``strict_length`` is set and the number of
        samples is not a multiple of ``factor``.
    :raises ArithmeticError: If automatic filtering is requested for a
        factor above 16.

    Currently a simple integer decimation is implemented.
    Only every ``factor``-th sample remains in the trace, all
    other samples are thrown away. Prior to decimation a lowpass filter is
    applied to ensure no aliasing artifacts are introduced. The automatic
    filtering can be deactivated with ``no_filter=True``.

    If the length of the data array modulo ``factor`` is not
    zero then the end time of the trace is changing on sub-sample scale. To
    abort downsampling in case of changing end times set
    ``strict_length=True``.

    .. note::

        The :class:`~obspy.core.trace.Trace` object has three different
        methods to change the sampling rate of its data:
        :meth:`~.resample`, :meth:`~.decimate`, and :meth:`~.interpolate`

        Make sure to choose the most appropriate one for the problem at
        hand.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    .. rubric:: Example

    For the example we switch off the automatic pre-filtering so that
    the effect of the downsampling routine becomes clearer:

    >>> tr = Trace(data=np.arange(10))
    >>> tr.stats.sampling_rate
    1.0
    >>> tr.data
    array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
    >>> tr.decimate(4, strict_length=False,
    ...     no_filter=True)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.stats.sampling_rate
    0.25
    >>> tr.data
    array([0, 4, 8])
    """
    # with strict_length, refuse to decimate if samples would be left over
    if strict_length:
        if len(self.data) % factor:
            raise ValueError(
                "End time of trace would change and strict_length=True.")

    apply_lowpass = not no_filter
    if apply_lowpass:
        # the automatic filter is only used up to a factor of 16
        if factor > 16:
            raise ArithmeticError(
                "Automatic filter design is unstable for decimation " +
                "factors above 16. Manual decimation is necessary.")
        # corner frequency: half of the sampling rate after decimation
        corner = self.stats.sampling_rate * 0.5 / float(factor)
        self.filter('lowpass_cheby_2', freq=corner, maxorder=12)

    # imported here rather than at module level
    from obspy.signal.filter import integer_decimation
    self.data = integer_decimation(self.data, factor)
    # explicit float division, independent of the type of sampling_rate
    self.stats.sampling_rate = self.stats.sampling_rate / float(factor)
    return self
[docs]
def max(self):
    """
    Return the sample with the largest absolute amplitude in the trace.

    The value is returned with its sign, i.e. a negative number is
    returned if the minimum of the data has a larger absolute value than
    the maximum.

    :return: Value of absolute maximum of ``trace.data``.

    .. rubric:: Example

    >>> tr = Trace(data=np.array([0, -3, 9, 6, 4]))
    >>> tr.max()
    9
    >>> tr = Trace(data=np.array([0, -3, -9, 6, 4]))
    >>> tr.max()
    -9
    >>> tr = Trace(data=np.array([0.3, -3.5, 9.0, 6.4, 4.3]))
    >>> tr.max()
    9.0
    """
    largest = self.data.max()
    smallest = self.data.min()
    # the maximum wins unless the minimum is strictly larger in magnitude
    return smallest if abs(smallest) > abs(largest) else largest
[docs]
def std(self):
    """
    Return the standard deviation of the amplitudes in the trace.

    The computation is delegated to the NumPy method
    :meth:`~numpy.ndarray.std` of ``trace.data``.

    :return: Standard deviation of ``trace.data``.

    .. rubric:: Example

    >>> tr = Trace(data=np.array([0, -3, 9, 6, 4]))
    >>> tr.std()
    4.2614551505325036
    >>> tr = Trace(data=np.array([0.3, -3.5, 9.0, 6.4, 4.3]))
    >>> tr.std()
    4.4348618918744247
    """
    amplitudes = self.data
    return amplitudes.std()
[docs]
@skip_if_no_data
@_add_processing_info
def differentiate(self, method='gradient', **options):
    """
    Differentiate the trace with respect to time.

    :type method: str, optional
    :param method: Name of the differentiation method, matched
        case-insensitively. Defaults to ``'gradient'``. See the
        `Supported Methods`_ section below for further details.
    :param options: Additional keyword arguments, passed on to the
        selected differentiation function.
    :return: The trace itself (modified in place).

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    .. rubric:: _`Supported Methods`

    ``'gradient'``
        The gradient is computed using central differences in the interior
        and first differences at the boundaries. The returned gradient
        hence has the same shape as the input array. (uses
        :func:`numpy.gradient`)
    """
    # look up the implementation registered under the lower-cased name in
    # the 'differentiate' entry point group
    derivative_func = _get_function_from_entry_point(
        'differentiate', method.lower())
    # the sample spacing is handed over as second positional argument
    sample_spacing = self.stats.delta
    self.data = derivative_func(self.data, sample_spacing, **options)
    return self
[docs]
@skip_if_no_data
@_add_processing_info
def integrate(self, method="cumtrapz", **options):
    """
    Integrate the trace with respect to time.

    :type method: str, optional
    :param method: Name of the integration method, matched
        case-insensitively. Defaults to ``'cumtrapz'``. See the
        `Supported Methods`_ section below for further details.
    :param options: Additional keyword arguments, passed on to the
        selected integration function.
    :return: The trace itself (modified in place).

    .. rubric:: _`Supported Methods`

    ``'cumtrapz'``
        First order integration of data using the trapezoidal rule. Uses
        :func:`~obspy.signal.differentiate_and_integrate.integrate_cumtrapz`

    ``'spline'``
        Integrates by generating an interpolating spline and integrating
        that. Uses
        :func:`~obspy.signal.differentiate_and_integrate.integrate_spline`

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.
    """
    # look up the implementation registered under the lower-cased name in
    # the 'integrate' entry point group
    integral_func = _get_function_from_entry_point(
        'integrate', method.lower())
    # data and sample spacing are handed over by keyword
    self.data = integral_func(
        data=self.data, dx=self.stats.delta, **options)
    return self
[docs]
@skip_if_no_data
@raise_if_masked
@_add_processing_info
def detrend(self, type='simple', **options):
    """
    Remove a trend from the trace.

    :type type: str, optional
    :param type: Name of the detrending method, matched
        case-insensitively. Defaults to ``'simple'``.
        See the `Supported Methods`_ section below for further details.
    :param options:
        Collects keyword arguments which are passed to the selected
        detrend function. Does not need to be specified directly.
    :return: The trace itself (modified in place).

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    .. rubric:: _`Supported Methods`

    ``'simple'``
        Subtracts a linear function defined by first/last sample of the
        trace (uses :func:`obspy.signal.detrend.simple`).

    ``'linear'``
        Fitting a linear function to the trace with least squares and
        subtracting it (uses :func:`scipy.signal.detrend`).

    ``'constant'`` or ``'demean'``
        Mean of data is subtracted (uses :func:`scipy.signal.detrend`).

    ``'polynomial'``
        Subtracts a polynomial of a given order.
        (uses :func:`obspy.signal.detrend.polynomial`).

    ``'spline'``
        Subtracts a spline of a given order with a given number of
        samples between spline nodes.
        (uses :func:`obspy.signal.detrend.spline`).

    .. rubric:: Example

    Apply a third order spline detrend with 500 samples between nodes.

    >>> from obspy import read
    >>> tr = read()[0]
    >>> tr.detrend("spline", order=3, dspline=500)
    ... # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    """
    type = type.lower()
    # look up the implementation registered in the 'detrend' entry point
    # group
    detrend_func = _get_function_from_entry_point('detrend', type)
    backed_by_scipy = detrend_func.__module__.startswith('scipy')

    if backed_by_scipy:
        # functions from SciPy get the method via the 'type' keyword;
        # 'demean' is passed on as 'constant'
        options['type'] = 'constant' if type == 'demean' else type

    dtype_before = self.data.dtype
    self.data = detrend_func(self.data, **options)

    # Old scipy versions might unnecessarily change the dtype of the
    # data, so float32 input is converted back to float32.
    if (backed_by_scipy and dtype_before == np.float32 and
            self.data.dtype != np.float32):
        self.data = np.require(self.data, dtype=np.float32)
    return self
[docs]
@skip_if_no_data
@_add_processing_info
def taper(self, max_percentage, type='hann', max_length=None,
side='both', **kwargs):
"""
Taper the trace.
Optional (and sometimes necessary) options to the tapering function can
be provided as kwargs. See respective function definitions in
`Supported Methods`_ section below.
:type type: str
:param type: Type of taper to use for detrending. Defaults to
``'hann'``. See the `Supported Methods`_ section below for
further details.
:type max_percentage: None, float
:param max_percentage: Decimal percentage of taper at one end (ranging
from 0. to 0.5).
:type max_length: None, float
:param max_length: Length of taper at one end in seconds.
:type side: str
:param side: Specify if both sides should be tapered (default, "both")
or if only the left half ("left") or right half ("right") should be
tapered.
.. note::
To get the same results as the default taper in SAC, use
`max_percentage=0.05` and leave `type` as `hann`.
.. note::
If both `max_percentage` and `max_length` are set to a float, the
shorter tape length is used. If both `max_percentage` and
`max_length` are set to `None`, the whole trace will be tapered.
.. note::
This operation is performed in place on the actual data arrays. The
raw data is not accessible anymore afterwards. To keep your
original data, use :meth:`~obspy.core.trace.Trace.copy` to create
a copy of your trace object.
This also makes an entry with information on the applied processing
in ``stats.processing`` of this trace.
.. rubric:: _`Supported Methods`
``'cosine'``
Cosine taper, for additional options like taper percentage see:
:func:`obspy.signal.invsim.cosine_taper`.
``'barthann'``
Modified Bartlett-Hann window. (uses:
:func:`scipy.signal.windows.barthann`)
``'bartlett'``
Bartlett window. (uses: :func:`scipy.signal.windows.bartlett`)
``'blackman'``
Blackman window. (uses: :func:`scipy.signal.windows.blackman`)
``'blackmanharris'``
Minimum 4-term Blackman-Harris window. (uses:
:func:`scipy.signal.windows.blackmanharris`)
``'bohman'``
Bohman window. (uses: :func:`scipy.signal.windows.bohman`)
``'boxcar'``
Boxcar window. (uses: :func:`scipy.signal.windows.boxcar`)
``'chebwin'``
Dolph-Chebyshev window.
(uses: :func:`scipy.signal.windows.chebwin`)
``'flattop'``
Flat top window. (uses: :func:`scipy.signal.windows.flattop`)
``'gaussian'``
Gaussian window with standard-deviation std. (uses:
:func:`scipy.signal.windows.gaussian`)
``'general_gaussian'``
Generalized Gaussian window. (uses:
:func:`scipy.signal.windows.general_gaussian`)
``'hamming'``
Hamming window. (uses: :func:`scipy.signal.windows.hamming`)
``'hann'``
Hann window. (uses: :func:`scipy.signal.windows.hann`)
``'kaiser'``
Kaiser window with shape parameter beta. (uses:
:func:`scipy.signal.windows.kaiser`)
``'nuttall'``
Minimum 4-term Blackman-Harris window according to Nuttall.
(uses: :func:`scipy.signal.windows.nuttall`)
``'parzen'``
Parzen window. (uses: :func:`scipy.signal.windows.parzen`)
``'slepian'``
Slepian window. (uses: :func:`scipy.signal.windows.slepian`)
``'triang'``
Triangular window. (uses: :func:`scipy.signal.windows.triang`)
"""
type = type.lower()
side = side.lower()
side_valid = ['both', 'left', 'right']
npts = self.stats.npts
if side not in side_valid:
raise ValueError("'side' has to be one of: %s" % side_valid)
# retrieve function call from entry points
func = _get_function_from_entry_point('taper', type)
# store all constraints for maximum taper length
max_half_lenghts = []
if max_percentage is not None:
max_half_lenghts.append(int(max_percentage * npts))
if max_length is not None:
max_half_lenghts.append(int(max_length * self.stats.sampling_rate))
if np.all([2 * mhl > npts for mhl in max_half_lenghts]):
msg = "The requested taper is longer than the trace. " \
"The taper will be shortened to trace length."
warnings.warn(msg)
# add full trace length to constraints
max_half_lenghts.append(int(npts / 2))
# select shortest acceptable window half-length
wlen = min(max_half_lenghts)
# obspy.signal.cosine_taper has a default value for taper percentage,
# we need to override it as we control percentage completely via npts
# of taper function and insert ones in the middle afterwards
if type == "cosine":
kwargs['p'] = 1.0
# tapering. tapering functions are expected to accept the number of
# samples as first argument and return an array of values between 0 and
# 1 with the same length as the data
if 2 * wlen == npts:
taper_sides = func(2 * wlen, **kwargs)
else:
taper_sides = func(2 * wlen + 1, **kwargs)
if side == 'left':
taper = np.hstack((taper_sides[:wlen], np.ones(npts - wlen)))
elif side == 'right':
taper = np.hstack((np.ones(npts - wlen),
taper_sides[len(taper_sides) - wlen:]))
else:
taper = np.hstack((taper_sides[:wlen], np.ones(npts - 2 * wlen),
taper_sides[len(taper_sides) - wlen:]))
# Convert data if it's not a floating point type.
if not np.issubdtype(self.data.dtype, np.floating):
self.data = np.require(self.data, dtype=np.float64)
self.data *= taper
return self
[docs]
@_add_processing_info
def normalize(self, norm=None):
    """
    Normalize the trace to its absolute maximum.

    :type norm: ``None`` or float
    :param norm: If not ``None``, the data is divided by the absolute value
        of ``norm`` instead of by the absolute value of ``self.max()``. A
        negative ``norm`` additionally triggers a warning. If the divisor
        is zero (either because ``norm`` is zero or because ``self.max()``
        is zero, e.g. for a zero array), a warning is issued and the
        original array is not changed.
    :rtype: :class:`~obspy.core.trace.Trace`
    :returns: This trace (``self``).

    If ``trace.data.dtype`` is not a floating point type, the data is
    converted to ``float64`` before the division.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.trace.Trace.copy` to create
        a copy of your trace object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of this trace.

    .. rubric:: Example

    >>> tr = Trace(data=np.array([0, -3, 9, 6]))
    >>> tr.normalize()  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.data
    array([ 0. , -0.33333333, 1. , 0.66666667])
    >>> print(tr.stats.processing[0])  # doctest: +ELLIPSIS
    ObsPy ...: normalize(norm=None)

    >>> tr = Trace(data=np.array([0.3, -3.5, -9.2, 6.4]))
    >>> tr.normalize()  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.data
    array([ 0.0326087 , -0.38043478, -1. , 0.69565217])
    >>> print(tr.stats.processing[0])  # doctest: +ELLIPSIS
    ObsPy ...: normalize(norm=None)
    """
    if norm is None:
        # No explicit divisor given: derive it from the data itself.
        divisor = self.max()
    else:
        divisor = norm
        if divisor < 0:
            # Only the magnitude is used further down, but tell the user.
            msg = "Normalizing with negative values is forbidden. " + \
                "Using absolute value."
            warnings.warn(msg)

    # Dividing by zero is refused: warn and hand back the untouched trace.
    if not divisor:
        msg = ("Attempting to normalize by dividing through zero. This "
               "is not allowed and the data will thus not be changed.")
        warnings.warn(msg)
        return self

    # In-place true division needs a floating point array.
    is_float = np.issubdtype(self.data.dtype, np.floating)
    if not is_float:
        self.data = np.require(self.data, dtype=np.float64)

    self.data /= abs(divisor)
    return self
[docs]
def copy(self):
    """
    Return a deep copy of the trace.

    :return: Copy of trace.

    The copy is made with :func:`copy.deepcopy`, i.e. the data is
    duplicated as well instead of merely creating another reference to the
    same data. Whenever the original data has to stay available after a
    processing step, use this method to create a copy of the trace first.

    .. rubric:: Example

    Make a Trace and copy it:

    >>> tr = Trace(data=np.random.rand(10))
    >>> tr2 = tr.copy()

    The two objects are not the same:

    >>> tr2 is tr
    False

    But they have equal data (before applying further processing):

    >>> tr2 == tr
    True

    The following example shows how to make an alias but not copy the
    data. Any changes on ``tr3`` would also change the contents of ``tr``.

    >>> tr3 = tr
    >>> tr3 is tr
    True
    >>> tr3 == tr
    True
    """
    duplicate = deepcopy(self)
    return duplicate
[docs]
def _internal_add_processing_info(self, info):
    """
    Append ``info`` to the ``processing`` list of the trace's
    :class:`~obspy.core.trace.Stats` object.

    The list is created on first use. A warning is issued when the entry
    about to be added is the last one that fits (i.e. the list currently
    holds ``self._max_processing_info - 1`` items); once the list holds
    ``self._max_processing_info`` items, further entries are silently
    dropped.

    :type info: str
    :param info: Informational string to store.
    """
    history = self.stats.setdefault('processing', [])
    limit = self._max_processing_info
    count = len(history)

    if count == limit - 1:
        template = ('List of processing information in Trace.stats.processing '
                    'reached maximal length of {} entries.')
        warnings.warn(template.format(limit))

    # List is already full: nothing more to record.
    if count >= limit:
        return
    history.append(info)
[docs]
@_add_processing_info
def split(self):
    """
    Split a Trace whose gaps are represented by a NumPy masked array into
    several gap-free traces.

    :rtype: :class:`~obspy.core.stream.Stream`
    :returns: Stream containing all split traces. A gapless trace will
        still be returned as Stream with only one entry.
    :raises NotImplementedError: If one of the contiguous slices reported
        for the unmasked data carries a step.
    """
    from obspy import Stream

    samples = self.data

    # Plain ndarray: there cannot be any gaps.
    if not isinstance(samples, np.ma.masked_array):
        return Stream([self.copy()])

    # Masked array without any masked value: just strip the mask wrapper.
    if not np.ma.is_masked(samples):
        unmasked = self.copy()
        unmasked.data = np.ma.getdata(unmasked.data)
        return Stream([unmasked])

    pieces = []
    for chunk in flat_not_masked_contiguous(samples):
        if chunk.step:
            raise NotImplementedError("step not supported")
        header = self.stats.copy()
        piece = Trace(header=header)
        # Move the start time forward by the offset of this chunk.
        piece.stats.starttime += (header.delta * chunk.start)
        # return the underlying data not the masked array
        piece.data = samples.data[chunk.start:chunk.stop]
        pieces.append(piece)
    return Stream(pieces)
[docs]
@skip_if_no_data
@raise_if_masked
@_add_processing_info
def interpolate(self, sampling_rate, method="weighted_average_slopes",
                starttime=None, npts=None, time_shift=0.0,
                *args, **kwargs):
    """
    Interpolate the data using various interpolation techniques.

    Be careful when downsampling data and make sure to apply an appropriate
    anti-aliasing lowpass filter before interpolating in case it's
    necessary.

    .. note::

        The :class:`obspy.core.trace.Trace` object has
        three different methods to change the sampling rate of its data:
        :meth:`~.resample`, :meth:`~.decimate`, and :meth:`~.interpolate`.
        Make sure to choose the most appropriate one for the problem at
        hand.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data will no longer be accessible afterwards. To keep your
        original data, use :meth:`~.copy` to create a copy of your Trace
        object.

    .. rubric:: _`Interpolation Methods:`

    The chosen method is crucial and we will elaborate a bit about the
    choices here:

    * ``"lanczos"``: This offers the highest quality interpolation and
      should be chosen whenever possible. It is only due to legacy
      reasons that this is not the default method. The one downside it
      has is that it can be fairly expensive. See the
      :func:`~obspy.signal.interpolation.lanczos_interpolation` function
      for more details.
    * ``"weighted_average_slopes"``: This is the interpolation method used
      by SAC. Refer to
      :func:`~obspy.signal.interpolation.weighted_average_slopes` for
      more details.
    * ``"slinear"``, ``"quadratic"`` and ``"cubic"``: spline interpolation
      of first, second or third order.
    * ``"linear"``: Linear interpolation.
    * ``"nearest"``: Nearest neighbour interpolation.
    * ``"zero"``: Last encountered value interpolation.

    .. rubric:: _`Parameters:`

    :param sampling_rate: The new sampling rate in ``Hz``. Must be
        positive, otherwise a :class:`ValueError` is raised.
    :param method: The kind of interpolation to perform as a string. One of
        ``"linear"``, ``"nearest"``, ``"zero"``, ``"slinear"``,
        ``"quadratic"``, ``"cubic"``, ``"lanczos"``, or
        ``"weighted_average_slopes"``. Alternatively an integer
        specifying the order of the spline interpolator to use also works.
        Defaults to ``"weighted_average_slopes"``.
    :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime` or int
    :param starttime: The start time (or timestamp) for the new
        interpolated stream. Will be set to current start time of the
        trace if not given.
    :type npts: int
    :param npts: The new number of samples. Will be set to the best
        fitting number to retain the current end time of the trace if
        not given.
    :type time_shift: float
    :param time_shift: Shift the trace by adding time_shift to the
        starttime. The time shift is always given in seconds.
        A positive shift means the data is shifted towards the future,
        e.g. a positive time delta.
        Note that this parameter solely affects the metadata.
        The actual interpolation of the underlying data is governed
        by the parameters sampling_rate, starttime and npts.

    Any further positional and keyword arguments are handed on unchanged
    to the selected interpolation function.

    .. note::

        Interpolation can also shift the data with
        subsample accuracy. Please note that a time shift in
        the Fourier domain is always more accurate than this.
        When using Lanczos interpolation with large values of ``a``
        and away from the boundaries this is nonetheless pretty good.

    .. rubric:: _`New in version 0.11:`

    * New parameter ``time_shift``.
    * New interpolation method ``lanczos``.

    .. rubric:: _`Usage Examples`

    >>> from obspy import read
    >>> tr = read()[0]
    >>> print(tr)  # doctest: +ELLIPSIS
    BW.RJOB..EHZ | 2009-08-24T00:20:03... - ... | 100.0 Hz, 3000 samples
    >>> tr.interpolate(sampling_rate=111.1)  # doctest: +ELLIPSIS
    <obspy.core.trace.Trace object at 0x...>
    >>> print(tr)  # doctest: +ELLIPSIS
    BW.RJOB..EHZ | 2009-08-24T00:20:03... - ... | 111.1 Hz, 3332 samples

    Setting ``starttime`` and/or ``npts`` will interpolate to sampling
    points with the given start time and/or number of samples.
    Extrapolation will not be performed.

    >>> tr = read()[0]
    >>> print(tr)  # doctest: +ELLIPSIS
    BW.RJOB..EHZ | 2009-08-24T00:20:03... - ... | 100.0 Hz, 3000 samples
    >>> tr.interpolate(sampling_rate=111.1,
    ...                starttime=tr.stats.starttime + 10) \
    # doctest: +ELLIPSIS
    <obspy.core.trace.Trace object at 0x...>
    >>> print(tr)  # doctest: +ELLIPSIS
    BW.RJOB..EHZ | 2009-08-24T00:20:13... - ... | 111.1 Hz, 2221 samples
    """
    # String methods are matched case-insensitively; anything that has no
    # usable ``lower()`` (e.g. an integer spline order) is kept as given.
    try:
        method = method.lower()
    except Exception:
        pass

    if float(sampling_rate) <= 0.0:
        raise ValueError("The time step must be positive.")
    dt = 1.0 / sampling_rate

    # We just shift the old start time. The interpolation will take care
    # of the rest.
    if time_shift:
        self.stats.starttime += time_shift

    try:
        one_d_kinds = ["linear", "nearest", "zero", "slinear",
                       "quadratic", "cubic"]
        if isinstance(method, int) or method in one_d_kinds:
            entry_name = 'interpolate_1d'
        else:
            entry_name = method
        func = _get_function_from_entry_point('interpolate', entry_name)

        old_start = self.stats.starttime.timestamp
        old_dt = self.stats.delta

        if starttime is None:
            starttime = self.stats.starttime.timestamp
        else:
            # Accept UTCDateTime-like objects as well as plain timestamps.
            starttime = getattr(starttime, "timestamp", starttime)

        if npts is None:
            # As many samples as fit up to the current end time.
            endtime = self.stats.endtime.timestamp
            npts = int(math.floor((endtime - starttime) / dt)) + 1

        float_data = np.require(self.data, dtype=np.float64)
        interpolated = func(float_data, old_start, old_dt, starttime, dt,
                            npts, *args, type=method, **kwargs)
        self.data = np.atleast_1d(interpolated)
        self.stats.starttime = UTCDateTime(starttime)
        self.stats.delta = dt
    except Exception:
        # Revert the start time change if something went wrong.
        if time_shift:
            self.stats.starttime -= time_shift
        # re-raise last exception.
        raise
    return self
[docs]
def times(self, type="relative", reftime=None):
    """
    For convenient plotting compute a NumPy array with timing information
    of all samples in the Trace.

    Time can be either:

    * seconds relative to ``trace.stats.starttime``
      (``type="relative"``) or to ``reftime``
    * absolute time as
      :class:`~obspy.core.utcdatetime.UTCDateTime` objects
      (``type="utcdatetime"``)
    * absolute time as POSIX timestamps (
      :class:`UTCDateTime.timestamp <obspy.core.utcdatetime.UTCDateTime>`
      ``type="timestamp"``)
    * absolute time as matplotlib numeric datetime (for matplotlib
      plotting with absolute time on axes, see :mod:`matplotlib.dates`
      and :func:`matplotlib.dates.date2num`, ``type="matplotlib"``)

    .. note::

        The option ``type="utcdatetime"`` shouldn't be used for Traces with
        a large sample size as it will generate an array of thousands of
        :class:`~obspy.core.utcdatetime.UTCDateTime` objects.

    >>> from obspy import read, UTCDateTime
    >>> tr = read()[0]
    >>> tr.times()  # doctest: +NORMALIZE_WHITESPACE
    array([ 0.00000000e+00, 1.00000000e-02, 2.00000000e-02, ...,
    2.99700000e+01, 2.99800000e+01, 2.99900000e+01])
    >>> tr.times(reftime=UTCDateTime("2009-01-01T00"))
    array([ 20305203. , 20305203.01, 20305203.02, ..., 20305232.97,
    20305232.98, 20305232.99])
    >>> tr.times("utcdatetime")  # doctest: +SKIP
    array([UTCDateTime(2009, 8, 24, 0, 20, 3),
    UTCDateTime(2009, 8, 24, 0, 20, 3, 10000),
    UTCDateTime(2009, 8, 24, 0, 20, 3, 20000), ...,
    UTCDateTime(2009, 8, 24, 0, 20, 32, 970000),
    UTCDateTime(2009, 8, 24, 0, 20, 32, 980000),
    UTCDateTime(2009, 8, 24, 0, 20, 32, 990000)], dtype=object)
    >>> tr.times("timestamp")
    array([ 1.25107320e+09, 1.25107320e+09, 1.25107320e+09, ...,
    1.25107323e+09, 1.25107323e+09, 1.25107323e+09])
    >>> tr.times("matplotlib")  # doctest: +SKIP
    array([ 14480.01392361, 14480.01392373, 14480.01392384, ...,
    14480.01427049, 14480.0142706 , 14480.01427072])

    :type type: str
    :param type: Determines type of returned time array, see above for
        valid values. Matched case-insensitively.
    :type reftime: obspy.core.utcdatetime.UTCDateTime
    :param reftime: When using a relative timing, the time used as the
        reference for the zero point, i.e., the first sample will be at
        ``trace.stats.starttime - reftime`` (in seconds).
    :rtype: :class:`~numpy.ndarray` or :class:`~numpy.ma.MaskedArray`
    :returns: An array of time samples in an :class:`~numpy.ndarray` if
        the trace data is not a masked array or a
        :class:`~numpy.ma.MaskedArray` (sharing the mask of the data)
        otherwise (``dtype`` of array is either ``float`` or
        :class:`~obspy.core.utcdatetime.UTCDateTime`).
    :raises ValueError: If ``type`` is not one of the supported values.
    """
    kind = type.lower()
    header = self.stats
    # Offsets of every sample from the first one, in seconds.
    result = np.arange(header.npts) / header.sampling_rate

    if kind == "relative":
        if reftime is not None:
            result += (header.starttime - reftime)
    elif kind == "timestamp":
        result = result + header.starttime.timestamp
    elif kind == "utcdatetime":
        to_utcdatetime = np.vectorize(
            lambda offset: self.stats.starttime + offset,
            otypes=[UTCDateTime])
        result = to_utcdatetime(result)
    elif kind == "matplotlib":
        from matplotlib.dates import date2num
        # matplotlib date numbers are counted in days, offsets in seconds
        first_sample = date2num(header.starttime.datetime)
        result = first_sample + result / 86400.0
    else:
        raise ValueError("Invalid `type`: {}".format(kind))

    # Carry the mask of masked data over to the time array.
    if isinstance(self.data, np.ma.masked_array):
        result = np.ma.array(result, mask=self.data.mask)
    return result
[docs]
def _get_response(self, inventories):
    """
    Search for and return channel response for the trace.

    If ``inventories`` is ``None`` the response stored in
    ``self.stats.response`` is returned. Otherwise every given inventory
    is asked for a response matching the trace's id and start time;
    inventories that raise while doing so are skipped.

    :type inventories: :class:`~obspy.core.inventory.inventory.Inventory`
        or :class:`~obspy.core.inventory.network.Network` or a list
        containing objects of these types or a string with a filename of
        a StationXML file.
    :param inventories: Station metadata to use in search for response for
        each trace in the stream.
    :returns: :class:`obspy.core.inventory.response.Response` object
    :raises TypeError: If ``inventories`` is ``None`` and the object stored
        in ``self.stats.response`` is not a ``Response``.
    :raises ValueError: If no response could be found, neither attached to
        the trace nor in the given inventories.
    """
    from obspy.core.inventory import (Inventory, Network, Response,
                                      read_inventory)

    # Without inventories only an already attached response can be used.
    if inventories is None:
        if 'response' not in self.stats:
            msg = ('No response information found. Use `inventory` '
                   'parameter to specify an inventory with response '
                   'information.')
            raise ValueError(msg)
        if not isinstance(self.stats.response, Response):
            msg = ("Response attached to Trace.stats must be of type "
                   "obspy.core.inventory.response.Response "
                   "(but is of type %s).") % type(self.stats.response)
            raise TypeError(msg)
        return self.stats.response

    # Normalise the input to something we can iterate over.
    if isinstance(inventories, (Inventory, Network)):
        inventories = [inventories]
    elif isinstance(inventories, str):
        inventories = [read_inventory(inventories)]

    matches = []
    for candidate in inventories:
        # Best effort: a candidate that cannot deliver a response (for
        # whatever reason) is simply ignored.
        try:
            found = candidate.get_response(self.id, self.stats.starttime)
        except Exception:
            continue
        matches.append(found)

    if not matches:
        msg = "No matching response information found."
        raise ValueError(msg)
    if len(matches) > 1:
        msg = "Found more than one matching response. Using first."
        warnings.warn(msg)
    return matches[0]
[docs]
def attach_response(self, inventories):
    """
    Search for and attach channel response to the trace as
    :class:`obspy.core.trace.Trace`.stats.response. Raises an exception
    if no matching response can be found.

    The lookup itself is delegated to ``self._get_response()``; its result
    is stored in ``self.stats.response``.
    To subsequently deconvolve the instrument response use
    :meth:`obspy.core.trace.Trace.remove_response`.

    >>> from obspy import read, read_inventory
    >>> st = read()
    >>> tr = st[0]
    >>> inv = read_inventory()
    >>> tr.attach_response(inv)
    >>> print(tr.stats.response) \
    # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    Channel Response
    From M/S (Velocity in Meters per Second) to COUNTS (Digital Counts)
    Overall Sensitivity: 2.5168e+09 defined at 0.020 Hz
    4 stages:
    Stage 1: PolesZerosResponseStage from M/S to V, gain: 1500
    Stage 2: CoefficientsTypeResponseStage from V to COUNTS, ...
    Stage 3: FIRResponseStage from COUNTS to COUNTS, gain: 1
    Stage 4: FIRResponseStage from COUNTS to COUNTS, gain: 1

    :type inventories: :class:`~obspy.core.inventory.inventory.Inventory`
        or :class:`~obspy.core.inventory.network.Network` or a list
        containing objects of these types or a string with a filename of
        a StationXML file.
    :param inventories: Station metadata to use in search for response for
        each trace in the stream.
    """
    matched_response = self._get_response(inventories)
    self.stats.response = matched_response
[docs]
@_add_processing_info
def remove_response(self, inventory=None, output="VEL", water_level=60,
                    pre_filt=None, zero_mean=True, taper=True,
                    taper_fraction=0.05, plot=False, fig=None, **kwargs):
    """
    Deconvolve instrument response.

    Uses the adequate :class:`obspy.core.inventory.response.Response`
    from the provided
    :class:`obspy.core.inventory.inventory.Inventory` data. Raises an
    exception if the response is not present.

    Note that there are two ways to prevent overamplification
    while convolving the inverted instrument spectrum: One possibility is
    to specify a water level which represents a clipping of the inverse
    spectrum and limits amplification to a certain maximum cut-off value
    (`water_level` in dB). The other possibility is to taper the waveform
    data in the frequency domain prior to multiplying with the inverse
    spectrum, i.e. perform a pre-filtering in the frequency domain
    (specifying the four corner frequencies of the frequency taper as a
    tuple in `pre_filt`).

    .. warning::

        The water level approach can lead to unexpected results that
        strongly suppress valid/wanted parts of the spectrum if the
        requested output unit is not the native quantity of the instrument,
        i.e. the instrument response is not flat for that quantity (e.g.
        requesting output ``"VEL"`` for an accelerometer). For details see
        https://github.com/obspy/obspy/issues/3136.
        In this case it might be better to set ``water_level=None`` and use
        ``pre_filt`` option instead.

    .. note::

        Any additional kwargs will be passed on to
        :meth:`Response.get_evalresp_response()
        <obspy.core.inventory.response.Response.get_evalresp_response>`,
        see documentation of that method for further customization (e.g.
        start/stop stage and hiding overall sensitivity mismatch warning).

    .. note::

        Using :meth:`~obspy.core.trace.Trace.remove_response` is equivalent
        to using
        :meth:`~obspy.core.trace.Trace.simulate` with the identical
        response provided as
        a (dataless) SEED or RESP file and when using the same
        ``water_level`` and ``pre_filt`` (and options ``sacsim=True`` and
        ``pitsasim=False`` which influence very minor details in detrending
        and tapering).

    .. note::

        For purely polynomial responses (no response stages but an
        instrument polynomial, or a single polynomial response stage) the
        polynomial is evaluated directly on the data and the method
        returns early; the spectral options and ``plot`` are not used in
        that case.

    .. rubric:: Example

    >>> from obspy import read, read_inventory
    >>> st = read()
    >>> tr = st[0].copy()
    >>> inv = read_inventory()
    >>> tr.remove_response(inventory=inv)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read, read_inventory
        st = read()
        tr = st[0]
        inv = read_inventory()
        tr.remove_response(inventory=inv)
        tr.plot()

    Using the `plot` option it is possible to visualize the individual
    steps during response removal in the frequency domain to check the
    chosen `pre_filt` and `water_level` options to stabilize the
    deconvolution of the inverted instrument response spectrum:

    >>> from obspy import read, read_inventory
    >>> st = read("/path/to/IU_ULN_00_LH1_2015-07-18T02.mseed")
    >>> tr = st[0]
    >>> inv = read_inventory("/path/to/IU_ULN_00_LH1.xml")
    >>> pre_filt = [0.001, 0.005, 45, 50]
    >>> tr.remove_response(inventory=inv, pre_filt=pre_filt, output="DISP",
    ...                    water_level=60, plot=True)  # doctest: +SKIP
    <...Trace object at 0x...>

    .. plot::

        from obspy import read, read_inventory
        st = read("/path/to/IU_ULN_00_LH1_2015-07-18T02.mseed", "MSEED")
        tr = st[0]
        inv = read_inventory("/path/to/IU_ULN_00_LH1.xml", "STATIONXML")
        pre_filt = [0.001, 0.005, 45, 50]
        output = "DISP"
        tr.remove_response(inventory=inv, pre_filt=pre_filt, output=output,
                           water_level=60, plot=True)

    :type inventory: :class:`~obspy.core.inventory.inventory.Inventory`
        or None.
    :param inventory: Station metadata to use in search for adequate
        response. If inventory parameter is not supplied, the response
        has to be attached to the trace with
        :meth:`obspy.core.trace.Trace.attach_response` beforehand.
    :type output: str
    :param output: Output units. One of:

        ``"DISP"``
            displacement, output unit is meters
        ``"VEL"``
            velocity, output unit is meters/second
        ``"ACC"``
            acceleration, output unit is meters/second**2
        ``"DEF"``
            default units, the response is calculated in
            output units/input units (last stage/first stage).
            Useful if the units for a particular type of sensor (e.g., a
            pressure sensor) cannot be converted to displacement, velocity
            or acceleration.

    :type water_level: float
    :param water_level: Water level for deconvolution. If ``None``, the
        response spectrum is inverted directly (the zero frequency entry
        is set to zero instead of being inverted).
    :type pre_filt: list or tuple(float, float, float, float)
    :param pre_filt: Apply a bandpass filter in frequency domain to the
        data before deconvolution. The list or tuple defines
        the four corner frequencies `(f1, f2, f3, f4)` of a cosine taper
        which is one between `f2` and `f3` and tapers to zero for
        `f1 < f < f2` and `f3 < f < f4`.
    :type zero_mean: bool
    :param zero_mean: If `True`, the mean of the waveform data is
        subtracted in time domain prior to deconvolution.
    :type taper: bool
    :param taper: If `True`, a cosine taper is applied to the waveform data
        in time domain prior to deconvolution.
    :type taper_fraction: float
    :param taper_fraction: Taper fraction of cosine taper to use.
    :type plot: bool or str
    :param plot: If `True`, brings up a plot that illustrates how the
        data are processed in the frequency domain in three steps. First by
        `pre_filt` frequency domain tapering, then by inverting the
        instrument response spectrum with or without `water_level` and
        finally showing data with inverted instrument response multiplied
        on it in frequency domain. It also shows the comparison of
        raw/corrected data in time domain. If a `str` is provided then the
        plot is saved to file (filename must have a valid image suffix
        recognizable by matplotlib e.g. '.png').
    :type fig: matplotlib figure or None
    :param fig: Figure to draw the diagnostic plot into (only used if
        ``plot`` is set). If ``None``, a new figure is created. If a
        figure is given together with ``plot=True``, the figure is only
        drawn into and neither shown nor saved, so that the caller stays
        in control of it.
    :rtype: :class:`~obspy.core.trace.Trace`
    :returns: This trace (``self``), with the processed data assigned.
    """
    if NUMPY_VERSION < [1, 17]:
        limit_numpy_fft_cache()

    from obspy.core.inventory import PolynomialResponseStage
    from obspy.signal.invsim import (cosine_taper, cosine_sac_taper,
                                     invert_spectrum)
    if plot:
        import matplotlib.pyplot as plt

    response = self._get_response(inventory)

    # polynomial response using blockette 62 stage 0
    if not response.response_stages and response.instrument_polynomial:
        coefficients = response.instrument_polynomial.coefficients
        self.data = np.poly1d(coefficients[::-1])(self.data)
        return self

    # polynomial response using blockette 62 stage 1 and no other stages
    if len(response.response_stages) == 1 and \
            isinstance(response.response_stages[0],
                       PolynomialResponseStage):
        stage = response.response_stages[0]
        # check for gain
        if stage.stage_gain is None:
            msg = 'Stage gain not defined for %s - setting it to 1.0'
            warnings.warn(msg % self.id)
            gain = 1
        else:
            gain = stage.stage_gain
        # work on a copy so the coefficients stored in the response stay
        # untouched
        coefficients = stage.coefficients[:]
        for i in range(len(coefficients)):
            coefficients[i] /= math.pow(gain, i)
        self.data = np.poly1d(coefficients[::-1])(self.data)
        return self

    # use evalresp
    data = self.data.astype(np.float64)
    npts = len(data)
    # time domain pre-processing
    if zero_mean:
        data -= data.mean()
    if taper:
        data *= cosine_taper(npts, taper_fraction,
                             sactaper=True, halfcosine=False)

    if plot:
        # Remember whether we have to show the figure ourselves *before*
        # ``fig`` gets replaced by a freshly created figure. Checking
        # ``fig is None`` after that point can never be true, which is why
        # ``plot=True`` without a user figure used to display nothing.
        show_figure = plot is True and fig is None

        color1 = "blue"
        color2 = "red"
        bbox = dict(boxstyle="round", fc="w", alpha=1, ec="w")
        bbox1 = dict(boxstyle="round", fc="blue", alpha=0.15)
        bbox2 = dict(boxstyle="round", fc="red", alpha=0.15)
        if fig is None:
            fig = plt.figure(figsize=(14, 10))
        fig.suptitle(str(self))
        # left column: spectra (with twin axes), right column: time series
        ax1 = fig.add_subplot(321)
        ax1b = ax1.twinx()
        ax2 = fig.add_subplot(323, sharex=ax1)
        ax2b = ax2.twinx()
        ax3 = fig.add_subplot(325, sharex=ax1)
        ax3b = ax3.twinx()
        ax4 = fig.add_subplot(322)
        ax5 = fig.add_subplot(324, sharex=ax4)
        ax6 = fig.add_subplot(326, sharex=ax4)
        for ax_ in (ax1, ax2, ax3, ax4, ax5, ax6):
            ax_.grid(zorder=-10)
        if pre_filt is None:
            text = 'pre_filt: None'
        else:
            text = 'pre_filt: [{:.3g}, {:.3g}, {:.3g}, {:.3g}]'.format(
                *pre_filt)
        ax1.text(0.05, 0.1, text, ha="left", va="bottom",
                 transform=ax1.transAxes, fontsize="large", bbox=bbox,
                 zorder=5)
        ax1.set_ylabel("Data spectrum, raw", bbox=bbox1)
        ax1b.set_ylabel("'pre_filt' taper fraction", bbox=bbox2)
        evalresp_info = "\n".join(
            ['output: %s' % output] +
            ['%s: %s' % (key, value) for key, value in kwargs.items()])
        ax2.text(0.05, 0.1, evalresp_info, ha="left",
                 va="bottom", transform=ax2.transAxes,
                 fontsize="large", zorder=5, bbox=bbox)
        ax2.set_ylabel("Data spectrum,\n"
                       "'pre_filt' applied", bbox=bbox1)
        ax2b.set_ylabel("Instrument response", bbox=bbox2)
        ax3.text(0.05, 0.1, 'water_level: %s' % water_level,
                 ha="left", va="bottom", transform=ax3.transAxes,
                 fontsize="large", zorder=5, bbox=bbox)
        ax3.set_ylabel("Data spectrum,\nmultiplied with inverted\n"
                       "instrument response", bbox=bbox1)
        ax3b.set_ylabel("Inverted instrument response,\n"
                        "water level applied", bbox=bbox2)
        ax3.set_xlabel("Frequency [Hz]")
        times = self.times()
        ax4.plot(times, self.data, color="k")
        ax4.set_ylabel("Raw")
        ax4.yaxis.set_ticks_position("right")
        ax4.yaxis.set_label_position("right")
        ax5.plot(times, data, color="k")
        ax5.set_ylabel("Raw, after time\ndomain pre-processing")
        ax5.yaxis.set_ticks_position("right")
        ax5.yaxis.set_label_position("right")
        ax6.set_ylabel("Response removed")
        ax6.set_xlabel("Time [s]")
        ax6.yaxis.set_ticks_position("right")
        ax6.yaxis.set_label_position("right")

    # smart calculation of nfft dodging large primes
    from obspy.signal.util import _npts2nfft
    nfft = _npts2nfft(npts)
    # Transform data to Frequency domain
    data = np.fft.rfft(data, n=nfft)
    # calculate and apply frequency response,
    # optionally prefilter in frequency domain and/or apply water level
    freq_response, freqs = \
        response.get_evalresp_response(self.stats.delta, nfft,
                                       output=output, **kwargs)

    if plot:
        ax1.loglog(freqs, np.abs(data), color=color1, zorder=9)

    # frequency domain pre-filtering of data spectrum
    # (apply cosine taper in frequency domain)
    freq_domain_taper = None
    if pre_filt:
        freq_domain_taper = cosine_sac_taper(freqs, flimit=pre_filt)
        data *= freq_domain_taper

    if plot:
        # without pre-filtering, show a flat "taper" of ones
        if freq_domain_taper is None:
            freq_domain_taper = np.ones(len(freqs))
        ax1b.semilogx(freqs, freq_domain_taper, color=color2, zorder=10)
        ax1b.set_ylim(-0.05, 1.05)
        ax2.loglog(freqs, np.abs(data), color=color1, zorder=9)
        ax2b.loglog(freqs, np.abs(freq_response), color=color2, zorder=10)

    if water_level is None:
        # No water level used, so just directly invert the response.
        # First entry is at zero frequency and value is zero, too.
        # Just do not invert the first value (and set to 0 to make sure).
        freq_response[0] = 0.0
        freq_response[1:] = 1.0 / freq_response[1:]
    else:
        # Invert spectrum with specified water level.
        invert_spectrum(freq_response, water_level)

    data *= freq_response
    # force the last spectral value to be real and non-negative
    data[-1] = abs(data[-1]) + 0.0j

    if plot:
        ax3.loglog(freqs, np.abs(data), color=color1, zorder=9)
        ax3b.loglog(freqs, np.abs(freq_response), color=color2, zorder=10)

    # transform data back into the time domain
    data = np.fft.irfft(data)[0:npts]

    if plot:
        # Oftentimes raises NumPy warnings which we don't want to see.
        with np.errstate(all="ignore"):
            ax6.plot(times, data, color="k")
            # Operate on ``fig`` itself rather than on pyplot's notion of
            # the "current" figure, which need not be a user-supplied one.
            fig.subplots_adjust(wspace=0.4)
            if plot is True:
                if show_figure:
                    plt.show()
                # A figure handed in by the caller is left to the caller
                # to show, save or close.
            else:
                fig.savefig(plot)
                plt.close(fig)

    # assign processed data and store processing information
    self.data = data
    return self
[docs]
@_add_processing_info
def remove_sensitivity(self, inventory=None):
    """
    Remove instrument sensitivity.

    The data is divided by the value of the instrument sensitivity of the
    response found for this trace and the result is assigned to
    ``self.data``.

    :type inventory: :class:`~obspy.core.inventory.inventory.Inventory`
        or None.
    :param inventory: Station metadata to use in search for adequate
        response. If inventory parameter is not supplied, the response
        has to be attached to the trace with
        :meth:`obspy.core.trace.Trace.attach_response` beforehand.
    :rtype: :class:`~obspy.core.trace.Trace`
    :returns: This trace (``self``).

    .. rubric:: Example

    >>> from obspy import read, read_inventory
    >>> tr = read()[0]
    >>> inv = read_inventory()
    >>> tr.remove_sensitivity(inv)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    """
    matched_response = self._get_response(inventory)
    sensitivity = matched_response.instrument_sensitivity.value
    self.data = self.data / sensitivity
    return self
[docs]
def _data_sanity_checks(value):
    """
    Validate that ``value`` can be used as ``Trace.data``.

    Passes silently (returning ``None``) for one-dimensional NumPy arrays.

    :param value: Object to check.
    :raises ValueError: If ``value`` is not a :class:`numpy.ndarray` or if
        it does not have exactly one dimension.
    """
    if isinstance(value, np.ndarray):
        if value.ndim == 1:
            return
        shape_text = str(value.shape)
        raise ValueError(
            ("NumPy array for Trace.data has bad shape ('%s'). Only 1-d "
             "arrays are allowed for initialization.") % shape_text)
    raise ValueError("Trace.data must be a NumPy array.")
if __name__ == '__main__':
    # Executed as a script: run the doctests embedded in this module,
    # leaving out objects for which no doctests are found.
    from doctest import testmod
    testmod(exclude_empty=True)