# -*- coding: utf-8 -*-
"""
Module for handling ObsPy :class:`~obspy.core.stream.Stream` objects.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
import collections
import copy
import fnmatch
import math
import pickle
import re
import warnings
from pathlib import Path
from glob import glob, has_magic
import numpy as np
from obspy.core import compatibility
from obspy.core.trace import Trace
from obspy.core.utcdatetime import UTCDateTime
from obspy.core.util.attribdict import AttribDict
from obspy.core.util.base import (ENTRY_POINTS, _get_function_from_entry_point,
_read_from_plugin, _generic_reader)
from obspy.core.util.decorator import (map_example_filename,
raise_if_masked, uncompress_file)
from obspy.core.util.misc import (
get_window_times, buffered_load_entry_point, ptp)
from obspy.core.util.obspy_types import ObsPyException
# Warning text issued by read() when ``headonly`` is requested together with
# options that would need the waveform samples (time window or dtype).
_headonly_warning_msg = (
    "Keyword headonly cannot be combined with "
    "starttime, endtime or dtype.")
[docs]
@map_example_filename("pathname_or_url")
def read(pathname_or_url=None, format=None, headonly=False, starttime=None,
         endtime=None, nearest_sample=True, dtype=None, apply_calib=False,
         check_compression=True, **kwargs):
    """
    Read waveform files into an ObsPy :class:`~obspy.core.stream.Stream`
    object.

    The :func:`~obspy.core.stream.read` function opens either one or multiple
    waveform files given via file name or URL using the ``pathname_or_url``
    attribute.

    The format of the waveform file will be automatically detected if not
    given. See the `Supported Formats`_ section below for available formats.

    This function returns an ObsPy :class:`~obspy.core.stream.Stream` object, a
    ``list``-like object of multiple ObsPy :class:`~obspy.core.trace.Trace`
    objects.

    :type pathname_or_url: str, io.BytesIO, or pathlib.Path, optional
    :param pathname_or_url: String containing a file name or a URL, a Path
        object, or an open file-like object. Wildcards are allowed for a file
        name. If this attribute is omitted, an example
        :class:`~obspy.core.stream.Stream` object will be returned.
    :type format: str, optional
    :param format: Format of the file to read (e.g. ``"MSEED"``). See
        the `Supported Formats`_ section below for a list of supported formats.
        If format is set to ``None`` it will be automatically detected which
        results in a slightly slower reading. If a format is specified, no
        further format checking is done.
    :type headonly: bool, optional
    :param headonly: If set to ``True``, read only the data header. This is
        most useful for scanning available meta information of huge data sets.
        Cannot be combined with ``starttime``, ``endtime`` or ``dtype``; if it
        is, a :class:`UserWarning` is issued and the stream is returned
        untrimmed and unconverted.
    :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param starttime: Specify the start time to read.
    :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param endtime: Specify the end time to read.
    :type nearest_sample: bool, optional
    :param nearest_sample: Only applied if `starttime` or `endtime` is given.
        Select nearest sample or the one containing the specified time. For
        more info, see :meth:`~obspy.core.trace.Trace.trim`.
    :type dtype: :class:`numpy.dtype`, optional
    :param dtype: Convert data of all traces into given numpy.dtype. Any
        value other than ``None`` triggers the conversion.
    :type apply_calib: bool, optional
    :param apply_calib: Automatically applies the calibration factor
        ``trace.stats.calib`` for each trace, if set. Defaults to ``False``.
    :param check_compression: Check for compression on file and decompress
        if needed. This may be disabled for a moderate speed up.
    :type check_compression: bool, optional
    :param kwargs: Additional keyword arguments passed to the underlying
        waveform reader method.
    :return: An ObsPy :class:`~obspy.core.stream.Stream` object.
    :raises IOError: If a plain (non-wildcard) file name does not point to an
        existing file and nothing could be read.
    :raises Exception: If a wildcard pattern matches no file, or if nothing
        could be read and neither ``starttime`` nor ``endtime`` was given.

    .. rubric:: Basic Usage

    In most cases a filename is specified as the only argument to
    :func:`~obspy.core.stream.read`. For a quick start you may omit all
    arguments and ObsPy will create and return a basic example seismogram.
    Further usages of the :func:`~obspy.core.stream.read` function can
    be seen in the `Further Examples`_ section underneath.

    >>> from obspy import read
    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z - ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z - ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z - ... | 100.0 Hz, 3000 samples

    .. rubric:: _`Supported Formats`

    Additional ObsPy modules extend the functionality of the
    :func:`~obspy.core.stream.read` function. The following table summarizes
    all known file formats currently supported by ObsPy. The table order also
    reflects the order of the autodetection routine if no format option is
    specified.

    Please refer to the `Linked Function Call`_ of each module for any extra
    options available at the import stage.

    %s

    Next to the :func:`~obspy.core.stream.read` function the
    :meth:`~obspy.core.stream.Stream.write` method of the returned
    :class:`~obspy.core.stream.Stream` object can be used to export the data
    to the file system.

    .. rubric:: _`Further Examples`

    Example waveform files may be retrieved via https://examples.obspy.org.

    (1) Reading multiple local files using wildcards.

        The following code uses wildcards, in this case it matches two files.
        Both files are then read into a single
        :class:`~obspy.core.stream.Stream` object.

        >>> from obspy import read  # doctest: +SKIP
        >>> st = read("/path/to/loc_R*.z")  # doctest: +SKIP
        >>> print(st)  # doctest: +SKIP
        2 Trace(s) in Stream:
        .RJOB..Z | 2005-08-31T02:33:49.850000Z - ... | 200.0 Hz, 12000 samples
        .RNON..Z | 2004-06-09T20:05:59.850000Z - ... | 200.0 Hz, 12000 samples

    (2) Reading a local file without format detection.

        Using the ``format`` parameter disables the automatic detection and
        enforces reading a file in a given format.

        >>> from obspy import read
        >>> st = read("/path/to/loc_RJOB20050831023349.z", format="GSE2")
        >>> print(st)  # doctest: +ELLIPSIS
        1 Trace(s) in Stream:
        .RJOB..Z | 2005-08-31T02:33:49.850000Z - ... | 200.0 Hz, 12000 samples

    (3) Reading a remote file via HTTP protocol.

        >>> from obspy import read
        >>> st = read("https://examples.obspy.org/loc_RJOB20050831023349.z")
        >>> print(st)  # doctest: +ELLIPSIS
        1 Trace(s) in Stream:
        .RJOB..Z | 2005-08-31T02:33:49.850000Z - ... | 200.0 Hz, 12000 samples

    (4) Reading compressed files.

        >>> from obspy import read
        >>> st = read("/path/to/tspair.ascii.gz")
        >>> print(st)  # doctest: +ELLIPSIS
        1 Trace(s) in Stream:
        XX.TEST..BHZ | 2008-01-15T00:00:00.025000Z - ... | 40.0 Hz, 635 samples

        >>> st = read("https://examples.obspy.org/slist.ascii.bz2")
        >>> print(st)  # doctest: +ELLIPSIS
        1 Trace(s) in Stream:
        XX.TEST..BHZ | 2008-01-15T00:00:00.025000Z - ... | 40.0 Hz, 635 samples

    (5) Reading a file-like object.

        >>> import requests
        >>> import io
        >>> example_url = "https://examples.obspy.org/loc_RJOB20050831023349.z"
        >>> stringio_obj = io.BytesIO(requests.get(example_url).content)
        >>> st = read(stringio_obj)
        >>> print(st)  # doctest: +ELLIPSIS
        1 Trace(s) in Stream:
        .RJOB..Z | 2005-08-31T02:33:49.850000Z - ... | 200.0 Hz, 12000 samples

    (6) Using 'starttime' and 'endtime' parameters

        >>> from obspy import read
        >>> dt = UTCDateTime("2005-08-31T02:34:00")
        >>> st = read("https://examples.obspy.org/loc_RJOB20050831023349.z",
        ...           starttime=dt, endtime=dt+10)
        >>> print(st)  # doctest: +ELLIPSIS
        1 Trace(s) in Stream:
        .RJOB..Z | 2005-08-31T02:34:00.000000Z - ... | 200.0 Hz, 2001 samples
    """
    # add default parameters to kwargs so sub-modules may handle them
    kwargs['starttime'] = starttime
    kwargs['endtime'] = endtime
    kwargs['nearest_sample'] = nearest_sample
    kwargs['check_compression'] = check_compression
    kwargs['headonly'] = headonly
    kwargs['format'] = format

    if pathname_or_url is None:
        # if no pathname or URL specified, return example stream
        st = _create_example_stream(headonly=headonly)
    else:
        st = _generic_reader(pathname_or_url, _read, **kwargs)

    if len(st) == 0:
        if isinstance(pathname_or_url, Path):
            pathname_or_url = str(pathname_or_url)
        # try to give more specific information why the stream is empty
        if isinstance(pathname_or_url, str) and \
                has_magic(pathname_or_url) and not glob(pathname_or_url):
            raise Exception("No file matching file pattern: %s" %
                            pathname_or_url)
        elif isinstance(pathname_or_url, str) and \
                not has_magic(pathname_or_url) and \
                not Path(pathname_or_url).is_file():
            raise IOError(2, "No such file or directory", pathname_or_url)
        # Only raise error if no start/end time has been set. This
        # will return an empty stream if the user chose a time window with
        # no data in it.
        # XXX: Might cause problems if the data is faulty and the user
        # set start/end time. Not sure what to do in this case.
        elif not starttime and not endtime:
            raise Exception("Cannot open file/files: %s" % pathname_or_url)

    # ``dtype`` must be compared against None rather than truth-tested: a
    # plain (unstructured) numpy.dtype instance has length 0 and can therefore
    # evaluate as false, which would silently skip the requested conversion.
    dtype_given = dtype is not None

    # Header-only streams carry no samples, so trimming and dtype conversion
    # are skipped with a warning.
    if headonly and (starttime or endtime or dtype_given):
        warnings.warn(_headonly_warning_msg, UserWarning)
        return st
    # Trim if times are given.
    if starttime:
        st._ltrim(starttime, nearest_sample=nearest_sample)
    if endtime:
        st._rtrim(endtime, nearest_sample=nearest_sample)
    # convert to dtype if given
    if dtype_given:
        for tr in st:
            tr.data = np.require(tr.data, dtype)
    # applies calibration factor
    if apply_calib:
        for tr in st:
            tr.data = tr.data * tr.stats.calib
    return st
[docs]
@uncompress_file
def _read(filename, format=None, headonly=False, **kwargs):
    """
    Read one waveform file into an ObsPy Stream object.

    The actual reading is delegated to ``_read_from_plugin`` for the
    ``'waveform'`` plugin group. The format identifier it reports back is
    stored on every returned trace as ``stats._format``.

    :param filename: File to read, handed through to the plugin reader.
    :param format: Format name, or ``None`` to let the plugin layer decide.
    :param headonly: Passed through to the plugin reader.
    :param kwargs: Further keyword arguments passed through unchanged.
    :return: The stream produced by the plugin reader.
    """
    st, used_format = _read_from_plugin(
        'waveform', filename, format=format, headonly=headonly, **kwargs)
    # Tag each trace with the format it was read with.
    for tr in st:
        tr.stats._format = used_format
    return st
[docs]
def _create_example_stream(headonly=False):
    """
    Create an example stream.

    Data arrays are stored in NumPy's NPZ format. The header information are
    fixed values.

    PAZ of the used instrument, needed to demonstrate simulate_seismometer()
    etc.::

        paz = {'gain': 60077000.0,
               'poles': [-0.037004+0.037016j, -0.037004-0.037016j, -251.33+0j,
                         -131.04-467.29j, -131.04+467.29j],
               'sensitivity': 2516778400.0,
               'zeros': [0j, 0j]}

    :type headonly: bool, optional
    :param headonly: If ``True``, the traces are created from the header only
        and the bundled sample file is not opened at all.
    :return: Stream with one trace per channel (``EHZ``, ``EHN``, ``EHE``)
        and the response from the bundled ``BW_RJOB.xml`` attached.
    """
    data_dir = Path(__file__).parent / "data"
    channels = ["EHZ", "EHN", "EHE"]

    samples = {}
    if not headonly:
        # np.load() on an .npz archive returns a lazily reading NpzFile that
        # keeps the file open. Pull out the arrays we need and close it right
        # away instead of leaking the handle until garbage collection.
        with np.load(data_dir / "example.npz") as npz:
            samples = {channel: npz[channel] for channel in channels}

    st = Stream()
    for channel in channels:
        header = {'network': "BW",
                  'station': "RJOB",
                  'location': "",
                  'npts': 3000,
                  'starttime': UTCDateTime(2009, 8, 24, 0, 20, 3),
                  'sampling_rate': 100.0,
                  'calib': 1.0,
                  'back_azimuth': 100.0,
                  'inclination': 30.0}
        header['channel'] = channel
        if not headonly:
            st.append(Trace(data=samples[channel], header=header))
        else:
            st.append(Trace(header=header))

    # Imported here rather than at module level, as in the original code.
    from obspy import read_inventory
    inv = read_inventory(data_dir / "BW_RJOB.xml")
    st.attach_response(inv)
    return st
[docs]
class Stream(object):
"""
List like object of multiple ObsPy :class:`~obspy.core.trace.Trace`
objects.
:type traces: list of :class:`~obspy.core.trace.Trace`, optional
:param traces: Initial list of ObsPy :class:`~obspy.core.trace.Trace`
objects.
.. rubric:: Basic Usage
>>> trace1 = Trace()
>>> trace2 = Trace()
>>> stream = Stream(traces=[trace1, trace2])
>>> print(stream) # doctest: +ELLIPSIS
2 Trace(s) in Stream:
...
.. rubric:: Supported Operations
``stream = streamA + streamB``
Merges all traces within the two Stream objects ``streamA`` and
``streamB`` into the new Stream object ``stream``.
See also: :meth:`Stream.__add__`.
``stream += streamA``
Extends the Stream object ``stream`` with all traces from ``streamA``.
See also: :meth:`Stream.__iadd__`.
``len(stream)``
Returns the number of Traces in the Stream object ``stream``.
See also: :meth:`Stream.__len__`.
``str(stream)``
Contains the number of traces in the Stream object and returns the
value of each Trace's __str__ method.
See also: :meth:`Stream.__str__`.
"""
[docs]
def __init__(self, traces=None):
    """
    Set up the internal trace list of the Stream.

    :type traces: :class:`~obspy.core.trace.Trace` or list of
        :class:`~obspy.core.trace.Trace`, optional
    :param traces: Initial content. A single Trace is wrapped into a
        one-element list; ``None`` or an empty value yields an empty Stream.
    """
    initial = [traces] if isinstance(traces, Trace) else traces
    self.traces = []
    if initial:
        self.traces.extend(initial)
[docs]
def __add__(self, other):
    """
    Concatenate this stream with another stream or with a single trace.

    :type other: :class:`~obspy.core.stream.Stream` or
        :class:`~obspy.core.trace.Trace`
    :param other: Stream or Trace object to add.
    :rtype: :class:`~obspy.core.stream.Stream`
    :returns: New Stream object holding references to (not copies of) the
        traces of both operands.
    :raises TypeError: If ``other`` is neither a Stream nor a Trace.

    .. rubric:: Examples

    1. Adding two Streams

        >>> st1 = Stream([Trace(), Trace(), Trace()])
        >>> len(st1)
        3
        >>> st2 = Stream([Trace(), Trace()])
        >>> len(st2)
        2
        >>> stream = st1 + st2
        >>> len(stream)
        5

    2. Adding Stream and Trace

        >>> stream2 = st1 + Trace()
        >>> len(stream2)
        4
    """
    # A lone trace is treated as a one-element stream.
    rhs = Stream([other]) if isinstance(other, Trace) else other
    if not isinstance(rhs, Stream):
        raise TypeError
    combined = self.traces + rhs.traces
    return self.__class__(traces=combined)
[docs]
def __iadd__(self, other):
    """
    Extend this stream in place via ``self += other``.

    The traces of ``other`` are appended by reference; no copies are made.

    :type other: :class:`~obspy.core.stream.Stream` or
        :class:`~obspy.core.trace.Trace`
    :param other: Stream or Trace object to add.
    :returns: This Stream object.
    :raises TypeError: If ``other`` is neither a Stream nor a Trace.

    .. rubric:: Example

    >>> stream = Stream([Trace(), Trace(), Trace()])
    >>> len(stream)
    3
    >>> stream += Stream([Trace(), Trace()])
    >>> len(stream)
    5
    >>> stream += Trace()
    >>> len(stream)
    6
    """
    # A lone trace is treated as a one-element stream.
    rhs = Stream([other]) if isinstance(other, Trace) else other
    if isinstance(rhs, Stream):
        self.extend(rhs.traces)
        return self
    raise TypeError
[docs]
def __mul__(self, num):
    """
    Build a new Stream made of ``num`` copies of this stream.

    Each repetition is obtained from :meth:`copy`, so the result does not
    share the trace objects that ``copy()`` duplicates.

    :type num: int
    :param num: Number of copies.
    :returns: New ObsPy Stream object.
    :raises TypeError: If ``num`` is not an integer.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> len(st)
    3
    >>> st2 = st * 5
    >>> len(st2)
    15
    """
    if not isinstance(num, int):
        raise TypeError("Integer expected")
    from obspy import Stream
    repeated = Stream()
    for _ in range(num):
        repeated += self.copy()
    return repeated
[docs]
def __iter__(self):
    """
    Iterate over a snapshot of ``stream.traces``.

    The iterator runs over a copy of the trace list taken when iteration
    starts. Traces can therefore be removed from the stream inside a
    for-loop (e.g. with :meth:`~obspy.core.stream.Stream.remove`) without
    disturbing the loop.

    .. rubric:: Example

    >>> from obspy import Stream
    >>> st = Stream()
    >>> for component in ["1", "Z", "2", "3", "Z", "N", "E", "4", "5"]:
    ...     channel = "EH" + component
    ...     tr = Trace(header={'station': 'TEST', 'channel': channel})
    ...     st.append(tr)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    9 Trace(s) in Stream:
    .TEST..EH1 | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EHZ | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EH2 | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EH3 | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EHZ | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EHN | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EHE | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EH4 | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EH5 | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples

    >>> for tr in st:
    ...     if tr.stats.channel[-1] not in ["Z", "N", "E"]:
    ...         st.remove(tr)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    4 Trace(s) in Stream:
    .TEST..EHZ | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EHZ | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EHN | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    .TEST..EHE | 1970-01-01T00:00:00.000000Z - ... | 1.0 Hz, 0 samples
    """
    snapshot = list(self.traces)
    return iter(snapshot)
[docs]
def __nonzero__(self):
    """
    Truth value of the Stream: ``False`` when it holds no Traces.
    """
    return len(self.traces) > 0
[docs]
def __len__(self):
    """
    Number of Traces held by the Stream object.

    .. rubric:: Example

    >>> stream = Stream([Trace(), Trace(), Trace()])
    >>> len(stream)
    3
    """
    n_traces = len(self.traces)
    return n_traces


# ``count`` is an alias for ``__len__``.
count = __len__
[docs]
def __str__(self, extended=False):
    """
    Build a short, human-readable summary of the stream.

    The summary starts with the number of Traces, followed by the result of
    each Trace's :meth:`~obspy.core.trace.Trace.__str__` method. For more
    than 20 traces only the first and the last one are listed unless
    ``extended`` is ``True``.

    :type extended: bool, optional
    :param extended: This method will show only 20 traces by default.
        Enable this option to show all entries.
    :rtype: str

    .. rubric:: Example

    >>> stream = Stream([Trace(), Trace()])
    >>> print(stream)  # doctest: +ELLIPSIS
    2 Trace(s) in Stream:
    ...
    """
    n_traces = len(self.traces)
    # Width of the longest trace id; used to align the per-trace lines.
    id_length = 0
    if self.traces and self:
        id_length = max(len(tr.id) for tr in self)
    header = str(n_traces) + ' Trace(s) in Stream:\n'
    if n_traces <= 20 or extended is True:
        lines = [tr.__str__(id_length) for tr in self]
        return header + "\n".join(lines)
    # Abbreviated form: first and last trace only, with a hint on how to get
    # the full listing.
    first, last = self.traces[0], self.traces[-1]
    parts = [
        header,
        "\n",
        first.__str__(),
        "\n",
        '...\n(%i other traces)\n...\n' % (n_traces - 2),
        last.__str__(),
        '\n\n[Use "print(',
        'Stream.__str__(extended=True))" to print all Traces]',
    ]
    return "".join(parts)
[docs]
def _repr_pretty_(self, p, cycle):
    """
    Pretty-printing hook: write the stream summary to the printer ``p``.

    The name matches the IPython pretty-printer protocol (presumably the
    intended consumer). The full listing is produced when ``p.verbose`` is
    true.

    :param p: Printer object providing ``text()`` and ``verbose``.
    :param cycle: Unused.
    """
    summary = self.__str__(extended=p.verbose)
    p.text(summary)
[docs]
def __eq__(self, other):
    """
    Rich comparison of Stream objects for the "==" operator.

    Trace order does not affect the comparison because the traces of both
    streams are sorted beforehand.

    The data and stats objects of each trace are compared strictly. If less
    strict behavior is desired, which may be the case for testing, consider
    using the :func:`~obspy.core.util.testing.streams_almost_equal`
    function.

    :type other: :class:`~obspy.core.stream.Stream`
    :param other: Stream object for comparison.
    :rtype: bool
    :return: ``True`` if both Streams contain the same traces, i.e. after a
        sort operation going through both streams every trace should be
        equal according to Trace's
        :meth:`~obspy.core.trace.Trace.__eq__` operator. ``False`` if
        ``other`` is not a Stream.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> st2 = st.copy()
    >>> st is st2
    False
    >>> st == st2
    True
    """
    if not isinstance(other, Stream):
        return False
    # Duplicates matter: the sorted trace lists are compared element by
    # element, so a stream holding a trace twice is not equal to one holding
    # it once. This is the more conservative and most likely expected
    # behavior.
    # select() without arguments is used to obtain separate Stream objects to
    # sort (presumably so that neither operand is reordered).
    lhs = self.select()
    lhs.sort()
    rhs = other.select()
    rhs.sort()
    return lhs.traces == rhs.traces
[docs]
def __ne__(self, other):
    """
    Rich comparison of Stream objects for the "!=" operator.

    Simply the negation of :meth:`__eq__`.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> st2 = st.copy()
    >>> st is st2
    False
    >>> st != st2
    False
    """
    is_equal = self.__eq__(other)
    return not is_equal
[docs]
def __lt__(self, other):
    """
    Ordering of Streams ("<") is ambiguous and deliberately unsupported.

    :raises NotImplementedError: Always.
    """
    msg = "Too ambiguous, therefore not implemented."
    raise NotImplementedError(msg)
[docs]
def __le__(self, other):
    """
    Ordering of Streams ("<=") is ambiguous and deliberately unsupported.

    :raises NotImplementedError: Always.
    """
    msg = "Too ambiguous, therefore not implemented."
    raise NotImplementedError(msg)
[docs]
def __gt__(self, other):
    """
    Ordering of Streams (">") is ambiguous and deliberately unsupported.

    :raises NotImplementedError: Always.
    """
    msg = "Too ambiguous, therefore not implemented."
    raise NotImplementedError(msg)
[docs]
def __ge__(self, other):
    """
    Ordering of Streams (">=") is ambiguous and deliberately unsupported.

    :raises NotImplementedError: Always.
    """
    msg = "Too ambiguous, therefore not implemented."
    raise NotImplementedError(msg)
[docs]
def __setitem__(self, index, trace):
    """
    Item assignment: forwarded to the underlying list of traces.

    :param index: Index or slice, as accepted by ``list``.
    :param trace: Value stored at ``index``; no type check is done here.
    """
    self.traces[index] = trace
[docs]
def __getitem__(self, index):
    """
    Item access on the underlying list of traces.

    :param index: Integer index or slice.
    :return: For a slice, a new object of the same class wrapping the
        selected traces; otherwise the single item stored at ``index``.
    """
    selected = self.traces[index]
    if isinstance(index, slice):
        return self.__class__(traces=selected)
    return selected
[docs]
def __delitem__(self, index):
    """
    Item deletion: forwarded to the underlying list of traces.

    :param index: Index or slice, as accepted by ``list``.
    """
    del self.traces[index]
[docs]
def __getslice__(self, i, j, k=1):
    """
    Legacy slice hook of obspy.Stream objects.

    Negative ``i`` and ``j`` are clamped to 0 before slicing.

    :param i: Start index.
    :param j: Stop index.
    :param k: Step, defaults to 1.
    :return: Stream object
    """
    # see also https://docs.python.org/3/reference/datamodel.html
    start = max(0, i)
    stop = max(0, j)
    return self.__class__(traces=self.traces[start:stop:k])
[docs]
def append(self, trace):
    """
    Add one Trace object to the end of the current Stream object.

    :param trace: :class:`~obspy.core.trace.Trace` object.
    :returns: This Stream object, to allow call chaining.
    :raises TypeError: If ``trace`` is not a Trace.

    .. rubric:: Example

    >>> from obspy import read, Trace
    >>> st = read()
    >>> tr = Trace()
    >>> tr.stats.station = 'TEST'
    >>> st.append(tr)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    4 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    .TEST.. | 1970-01-01T00:00:00.000000Z ... | 1.0 Hz, 0 samples
    """
    if not isinstance(trace, Trace):
        msg = 'Append only supports a single Trace object as an argument.'
        raise TypeError(msg)
    self.traces.append(trace)
    return self
[docs]
def extend(self, trace_list):
    """
    Append several Trace objects to the current Stream object.

    :param trace_list: list of :class:`~obspy.core.trace.Trace` objects or
        :class:`~obspy.core.stream.Stream`.
    :returns: This Stream object, to allow call chaining.
    :raises TypeError: If ``trace_list`` is neither a list nor a Stream, or
        if a list contains anything other than Trace objects.

    .. rubric:: Example

    >>> from obspy import read, Trace
    >>> st = read()
    >>> tr1 = Trace()
    >>> tr1.stats.station = 'TEST1'
    >>> tr2 = Trace()
    >>> tr2.stats.station = 'TEST2'
    >>> st.extend([tr1, tr2])  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    5 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    .TEST1.. | 1970-01-01T00:00:00.000000Z ... | 1.0 Hz, 0 samples
    .TEST2.. | 1970-01-01T00:00:00.000000Z ... | 1.0 Hz, 0 samples
    """
    if isinstance(trace_list, list):
        # Validate the whole list first so nothing is added on failure.
        if not all(isinstance(item, Trace) for item in trace_list):
            raise TypeError('Extend only accepts a list of Trace objects.')
        new_traces = trace_list
    elif isinstance(trace_list, Stream):
        new_traces = trace_list.traces
    else:
        raise TypeError(
            'Extend only supports a list of Trace objects as argument.')
    self.traces.extend(new_traces)
    return self
[docs]
def get_gaps(self, min_gap=None, max_gap=None):
    """
    Determine all trace gaps/overlaps of the Stream object.

    :param min_gap: All gaps smaller than this value will be omitted. The
        value is assumed to be in seconds. Defaults to None (no lower
        limit). A value of ``0`` is honoured and omits all overlaps.
    :param max_gap: All gaps larger than this value will be omitted. The
        value is assumed to be in seconds. Defaults to None (no upper
        limit). A value of ``0`` is honoured and omits all gaps.
    :rtype: list
    :returns: One item per gap/overlap, see below.

    The returned list contains one item in the following form for each gap/
    overlap: [network, station, location, channel, starttime of the gap,
    end time of the gap, duration of the gap, number of missing samples]

    Overlaps are reported with a negative duration and a negative number of
    samples.

    Please be aware that no sorting and checking of stations, channels, ...
    is done. This method only compares the start and end times of the
    Traces and the start and end times of segments within Traces that
    contain masked arrays (i.e., Traces that were merged without a fill
    value).

    The stream is sorted temporarily while the gaps are computed; the
    original trace order is restored before returning, also when an error
    occurs.

    .. rubric:: Example

    Our example stream has no gaps:

    >>> from obspy import read, UTCDateTime
    >>> st = read()
    >>> st.get_gaps()
    []
    >>> st.print_gaps()  # doctest: +ELLIPSIS
    Source Last Sample ...
    Total: 0 gap(s) and 0 overlap(s)

    So let's make a copy of the first trace and cut both so that we end up
    with a gappy stream:

    >>> tr = st[0].copy()
    >>> t = UTCDateTime("2009-08-24T00:20:13.0")
    >>> st[0].trim(endtime=t)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.trim(starttime=t + 1)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> st.append(tr)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.get_gaps()  # doctest: +SKIP
    [['BW', 'RJOB', '', 'EHZ', UTCDateTime(2009, 8, 24, 0, 20, 13),
    UTCDateTime(2009, 8, 24, 0, 20, 14), 1.0, 99]]
    >>> st.print_gaps()  # doctest: +ELLIPSIS
    Source Last Sample ...
    BW.RJOB..EHZ 2009-08-24T00:20:13.000000Z ...
    Total: 1 gap(s) and 0 overlap(s)
    """
    # Keep a shallow copy of the trace list so the caller's ordering can be
    # put back after the temporary in-place sort.
    copied_traces = copy.copy(self.traces)
    gap_list = []
    try:
        self.sort()
        traces = self.traces
        for _i in range(len(traces)):
            current = traces[_i]
            # if the trace is masked, break it up and run get_gaps on the
            # resulting stream
            # NOTE(review): min_gap/max_gap are not forwarded here, so gaps
            # inside masked traces are never filtered -- confirm whether
            # that is intended.
            if isinstance(current.data, np.ma.masked_array):
                gap_list.extend(current.split().get_gaps())
            if _i + 1 == len(traces):
                # reached the last trace
                break
            following = traces[_i + 1]
            # skip traces with different network, station, location or
            # channel
            if current.id != following.id:
                continue
            # different sampling rates should always result in a gap or
            # overlap
            same_sampling_rate = (
                current.stats.delta == following.stats.delta)
            stats = current.stats
            stime = min(stats['endtime'], following.stats['endtime'])
            etime = following.stats['starttime']
            # last sample of earlier trace represents data up to time of
            # last sample (stats.endtime) plus one delta
            delta = etime.timestamp - (stime.timestamp + stats.delta)
            # Check that any overlap is not larger than the trace coverage
            if delta < 0:
                temp = following.stats['endtime'].timestamp - \
                    etime.timestamp
                if (delta * -1) > temp:
                    delta = -1 * temp
            # Check gap/overlap criteria. Compare against None explicitly
            # so that a threshold of 0 is not mistaken for "no threshold".
            if min_gap is not None and delta < min_gap:
                continue
            if max_gap is not None and delta > max_gap:
                continue
            # Number of missing samples
            nsamples = int(compatibility.round_away(
                math.fabs(delta) * stats['sampling_rate']))
            if delta < 0:
                nsamples = -nsamples
            # skip if is equal to delta (1 / sampling rate)
            if same_sampling_rate and nsamples == 0:
                continue
            # Check if the gap is already covered by an earlier trace of
            # the same id. Only previous traces need checking because the
            # traces are sorted.
            covered = any(
                prev.stats['starttime'] < stime < etime <
                prev.stats['endtime'] and current.id == prev.id
                for prev in traces[:_i])
            if covered:
                continue
            gap_list.append([stats['network'], stats['station'],
                             stats['location'], stats['channel'],
                             stime, etime, delta, nsamples])
    finally:
        # Set the original traces to not alter the stream object, even if
        # sorting or the gap computation raised.
        self.traces = copied_traces
    return gap_list
[docs]
def insert(self, position, object):
    """
    Insert a single Trace, a list of Traces or a Stream before an index.

    :param position: Index before which the first new Trace is placed;
        further Traces follow at consecutive positions.
    :param object: Single Trace object, list of Trace objects or Stream.
    :returns: This Stream object, to allow call chaining.
    :raises TypeError: If ``object`` has an unsupported type or a list
        contains anything other than Trace objects.
    """
    if isinstance(object, Trace):
        self.traces.insert(position, object)
    elif isinstance(object, list):
        # Validate everything up front so nothing is inserted on failure.
        if not all(isinstance(item, Trace) for item in object):
            raise TypeError(
                'Trace object or a list of Trace objects expected!')
        # The item count is fixed before inserting: ``object`` may be this
        # stream's own trace list (via the Stream branch below), which grows
        # while we insert.
        for offset in range(len(object)):
            self.traces.insert(position + offset, object[offset])
    elif isinstance(object, Stream):
        self.insert(position, object.traces)
    else:
        raise TypeError(
            'Only accepts a Trace object or a list of Trace objects.')
    return self
[docs]
def plot(self, *args, **kwargs):
    """
    Create a waveform plot of the current ObsPy Stream object.

    All positional and keyword arguments are handed to
    ``obspy.imaging.waveform.WaveformPlotting`` and to its
    ``plot_waveform`` method, whose result is returned.

    :param outfile: Output file string. Also used to automatically
        determine the output format. Supported file formats depend on your
        matplotlib backend. Most backends support png, pdf, ps, eps and
        svg. Defaults to ``None``.
    :param format: Format of the graph picture. If no format is given the
        outfile parameter will be used to try to automatically determine
        the output format. If no format is found it defaults to png output.
        If no outfile is specified but a format is, then a binary
        imagestring will be returned.
        Defaults to ``None``.
    :param starttime: Start time of the graph as a
        :class:`~obspy.core.utcdatetime.UTCDateTime` object. If not set
        the graph will be plotted from the beginning.
        Defaults to ``None``.
    :param endtime: End time of the graph as a
        :class:`~obspy.core.utcdatetime.UTCDateTime` object. If not set
        the graph will be plotted until the end.
        Defaults to ``None``.
    :param fig: Use an existing matplotlib figure instance.
        Defaults to ``None``.
    :param automerge: If automerge is True, Traces with the same id will be
        merged.
        Defaults to ``True``.
    :param size: Size tuple in pixel for the output file. This corresponds
        to the resolution of the graph for vector formats.
        Defaults to ``(800, 250)`` pixel per channel for ``type='normal'``
        or ``type='relative'``, ``(800, 600)`` for ``type='dayplot'``, and
        ``(1000, 600)`` for ``type='section'``.
    :param dpi: Dots per inch of the output file. This also affects the
        size of most elements in the graph (text, linewidth, ...).
        Defaults to ``100``.
    :param color: Color of the graph as a matplotlib color string as
        described below. If ``type='dayplot'`` a list/tuple of color
        strings is expected that will be periodically repeated for each
        line plotted. If ``type='section'`` then the values ``'network'``,
        ``'station'`` or ``'channel'`` are also accepted, and traces will
        be uniquely colored by the given information.
        Defaults to ``'black'`` or to ``('#B2000F', '#004C12', '#847200',
        '#0E01FF')`` for ``type='dayplot'``.
    :param bgcolor: Background color of the graph.
        Defaults to ``'white'``.
    :param face_color: Face color of the matplotlib canvas.
        Defaults to ``'white'``.
    :param transparent: Make all backgrounds transparent (True/False). This
        will override the ``bgcolor`` and ``face_color`` arguments.
        Defaults to ``False``.
    :param tick_format: The way the time axis is formatted.
        Defaults to ``'%H:%M:%S'`` or ``'%.2f'`` if ``type='relative'``.
    :param tick_rotation: Tick rotation in degrees.
        Defaults to ``0``.
    :param handle: Whether or not to return the matplotlib figure instance
        after the plot has been created.
        Defaults to ``False``.
    :param method: By default, all traces with more than 400,000 samples
        will be plotted with a fast method that cannot be zoomed.
        Setting this argument to ``'full'`` will straight up plot the data.
        This results in a potentially worse performance but the interactive
        matplotlib view can be used properly.
        Defaults to 'fast'.
    :param type: Type may be set to either: ``'normal'`` to produce the
        standard plot; ``'dayplot'`` to create a one-day plot for a single
        Trace; ``'relative'`` to convert all date/time information to a
        relative scale starting the seismogram at 0 seconds; ``'section'``
        to plot all seismograms in a single coordinate system shifted
        according to their distance from a reference point. Defaults to
        ``'normal'``.
    :param equal_scale: If enabled all plots are equally scaled.
        Defaults to ``True``.
    :param show: If True, show the plot interactively after plotting. This
        is ignored if any of ``outfile``, ``format``, ``handle``, or
        ``fig`` are specified.
        Defaults to ``True``.
    :param draw: If True, the figure canvas is explicitly re-drawn, which
        ensures that *existing* figures are fresh. It makes no difference
        for figures that are not yet visible.
        Defaults to ``True``.
    :param block: If True block call to showing plot. Only works if the
        active matplotlib backend supports it.
        Defaults to ``True``.
    :param linewidth: Float value in points of the line width.
        Defaults to ``1.0``.
    :param linestyle: Line style.
        Defaults to ``'-'``
    :param grid_color: Color of the grid.
        Defaults to ``'black'``.
    :param grid_linewidth: Float value in points of the grid line width.
        Defaults to ``0.5``.
    :param grid_linestyle: Grid line style.
        Defaults to ``':'``

    **Dayplot Parameters**

    The following parameters are only available if ``type='dayplot'`` is
    set.

    :param vertical_scaling_range: Determines how each line is scaled in
        its given space. Every line will be centered around its mean value
        and then clamped to fit its given space. This argument is the range
        in data units that will be used to clamp the data. If the range is
        smaller than the actual range, the lines' data may overshoot to
        other lines which is usually a desired effect. Larger ranges will
        result in a vertical padding.
        If ``0``, the actual range of the data will be used and no
        overshooting or additional padding will occur.
        If ``None`` the range will be chosen to be the 99.5-percentile of
        the actual range - so some values will overshoot.
        Defaults to ``None``.
    :param interval: This defines the interval length in minutes for one
        line.
        Defaults to ``15``.
    :param time_offset: Only used if ``type='dayplot'``. The difference
        between the timezone of the data (specified with the kwarg
        ``timezone``) and UTC time in hours. Will be displayed in a string.
        Defaults to the current offset of the system time to UTC time.
    :param timezone: Defines the name of the user defined time scale. Will
        be displayed in a string together with the actual offset defined in
        the kwarg ``time_offset``.
        Defaults to ``'local time'``.
    :param localization_dict: Enables limited localization of the dayplot
        through the usage of a dictionary. To change the labels to, e.g.
        German, use the following::

            localization_dict={'time in': 'Zeit in', 'seconds': 'Sekunden',
                               'minutes': 'Minuten', 'hours': 'Stunden'}

    :param data_unit: If given, the scale of the data will be drawn on the
        right hand side in the form ``"%f {data_unit}"``. The unit is
        supposed to be a string containing the actual unit of the data. Can
        be a LaTeX expression if matplotlib has been built with LaTeX
        support, e.g., ``"$\\\\frac{m}{s}$"``. Be careful to escape the
        backslashes, or use r-prefixed strings, e.g.,
        ``r"$\\\\frac{m}{s}$"``.
        Defaults to ``None``, meaning no scale is drawn.
    :param number_of_ticks: The number of ticks on the x-axis.
        Defaults to ``4``.
    :param events: An optional list of events can be drawn on the plot if
        given. They will be displayed as yellow stars with optional
        annotations. They are given as a list of dictionaries. Each
        dictionary at least needs to have a "time" key, containing a
        UTCDateTime object with the origin time of the event. Furthermore
        every event can have an optional "text" key which will then be
        displayed as an annotation.

        Example::

            events=[{"time": UTCDateTime(...), "text": "Event A"}, {...}]

        It can also be a :class:`~obspy.core.event.Catalog` object. In this
        case each event will be annotated with its corresponding
        Flinn-Engdahl region and the magnitude.
        Events can also be automatically downloaded with the help of
        obspy.clients.fdsn. Just pass a dictionary with a "min_magnitude"
        key, e.g. ::

            events={"min_magnitude": 5.5}

        Defaults to ``[]``.
    :param x_labels_size: Size of x labels in points or fontsize.
        Defaults to ``8``.
    :param y_labels_size: Size of y labels in points or fontsize.
        Defaults to ``8``.
    :param title_size: Size of the title in points or fontsize.
        Defaults to ``10``.
    :param subplots_adjust_left: The left side of the subplots of the
        figure in fraction of the figure width.
        Defaults to ``0.12``.
    :param subplots_adjust_right: The right side of the subplots of the
        figure in fraction of the figure width.
        Defaults to ``0.88``.
    :param subplots_adjust_top: The top side of the subplots of the figure
        in fraction of the figure width.
        Defaults to ``0.95``.
    :param subplots_adjust_bottom: The bottom side of the subplots of the
        figure in fraction of the figure width.
        Defaults to ``0.1``.
    :param right_vertical_labels: Whether or not to display labels on the
        right side of the dayplot.
        Defaults to ``False``.
    :param one_tick_per_line: Whether or not to display one tick per line.
        Defaults to ``False``.
    :param show_y_UTC_label: Whether or not to display the Y UTC vertical
        label.
        Defaults to ``True``.
    :param title: The title to display on top of the plot.
        Defaults to ``self.stream[0].id``.

    **Section Parameters**

    These parameters are only available if ``type='section'`` is set. To
    plot a record section the ObsPy header ``trace.stats.distance`` must be
    defined in meters (Default). Or ``trace.stats.coordinates.latitude`` &
    ``trace.stats.coordinates.longitude`` must be set if plotted in
    azimuthal distances (``dist_degree=True``) along with ``ev_coord``.

    :type scale: float, optional
    :param scale: Scale the traces width with this factor.
        Defaults to ``1.0``.
    :type vred: float, optional
    :param vred: Perform velocity reduction, in m/s.
    :type norm_method: str, optional
    :param norm_method: Defines how the traces are normalized, either
        against each ``trace`` or against the global maximum ``stream``.
        Defaults to ``trace``.
    :type offset_min: float or None, optional
    :param offset_min: Minimum offset in meters to plot.
        Defaults to minimum offset of all traces.
    :type offset_max: float or None, optional
    :param offset_max: Maximum offset in meters to plot.
        Defaults to maximum offset of all traces.
    :type dist_degree: bool, optional
    :param dist_degree: Plot trace distance in degree from epicenter. If
        ``True``, parameter ``ev_coord`` has to be defined.
        Defaults to ``False``.
    :type ev_coord: tuple or None, optional
    :param ev_coord: Event's coordinates as tuple
        ``(latitude, longitude)``.
    :type plot_dx: int, optional
    :param plot_dx: Spacing of ticks on the spatial x-axis.
        Either m or degree, depending on ``dist_degree``.
    :type recordstart: int or float, optional
    :param recordstart: Seconds to crop from the beginning.
    :type recordlength: int or float, optional
    :param recordlength: Length of the record section in seconds.
    :type alpha: float, optional
    :param alpha: Transparency of the traces between 0.0 - 1.0.
        Defaults to ``0.5``.
    :type time_down: bool, optional
    :param time_down: Flip the plot horizontally, time goes down.
        Defaults to ``False``, i.e., time goes up.
    :type reftime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param reftime: The reference time to which the time scale will refer.
        Defaults to the minimum start time of the visible traces.
    :type orientation: str, optional
    :param orientation: The orientation of the time axis, either
        ``'vertical'`` or ``'horizontal'``. Defaults to ``'vertical'``.
    :type fillcolors: tuple, optional
    :param fillcolors: Fill the inside of the lines (wiggle plot),
        for both the positive and negative sides; use ``None`` to omit
        one of the sides. Defaults to ``(None,None)``.

    **Relative Parameters**

    The following parameters are only available if ``type='relative'`` is
    set.

    :type reftime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param reftime: The reference time to which the relative scale will
        refer.
        Defaults to ``starttime``.

    .. rubric:: Color Options

    Colors can be specified as defined in the :mod:`matplotlib.colors`
    documentation.

    Short Version: For all color values, you can either use:

    * legal `HTML color names <https://www.w3.org/TR/css3-color/#html4>`_,
      e.g. ``'blue'``,
    * HTML hex strings, e.g. ``'#EE00FF'``,
    * pass a string of an R, G, B tuple, where each of the components is a
      float value in the range of 0 to 1, e.g. ``'(1, 0.25, 0.5)'``, or
    * use single letters for the basic built-in colors, such as ``'b'``
      (blue), ``'g'`` (green), ``'r'`` (red), ``'c'`` (cyan), ``'m'``
      (magenta), ``'y'`` (yellow), ``'k'`` (black), ``'w'`` (white).

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> st.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        st.plot()
    """
    # Imported lazily so the plotting stack is only loaded when needed.
    from obspy.imaging.waveform import WaveformPlotting
    # The same arguments configure the plotter and the plot call itself.
    plotter = WaveformPlotting(*args, stream=self, **kwargs)
    return plotter.plot_waveform(*args, **kwargs)
[docs]
def spectrogram(self, **kwargs):
    """
    Create a spectrogram plot for each trace in the stream.

    Every keyword argument is forwarded unchanged to the ``spectrogram``
    method of each trace. For details on kwargs that can be used to
    customize the spectrogram plot see
    :func:`obspy.imaging.spectrogram.spectrogram`.

    :return: List with the result of each per-trace ``spectrogram`` call,
        in stream order.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> st[0].spectrogram()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        st[0].spectrogram()
    """
    return [trace.spectrogram(**kwargs) for trace in self]
[docs]
def pop(self, index=(-1)):
    """
    Remove and return the Trace object specified by index from the Stream.

    If no index is given, remove the last Trace. Passes on the pop() to
    self.traces.

    :param index: Index of the Trace object to be returned and removed.
    :returns: Removed Trace.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> tr = st.pop()
    >>> print(st)  # doctest: +ELLIPSIS
    2 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> print(tr)  # doctest: +ELLIPSIS
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    """
    removed_trace = self.traces.pop(index)
    return removed_trace
[docs]
def print_gaps(self, min_gap=None, max_gap=None):
    """
    Print gap/overlap list summary information of the Stream object.

    The list is obtained from ``self.get_gaps(min_gap, max_gap)``. One line
    is printed per entry, framed by a header line and a line with totals.
    Entries with a positive delta count as gaps, all others as overlaps.

    :param min_gap: All gaps smaller than this value will be omitted. The
        value is assumed to be in seconds. Defaults to None.
    :param max_gap: All gaps larger than this value will be omitted. The
        value is assumed to be in seconds. Defaults to None.

    .. rubric:: Example

    Our example stream has no gaps:

    >>> from obspy import read, UTCDateTime
    >>> st = read()
    >>> st.get_gaps()
    []
    >>> st.print_gaps()  # doctest: +ELLIPSIS
    Source            Last Sample                 Next Sample ...
    Total: 0 gap(s) and 0 overlap(s)

    So let's make a copy of the first trace and cut both so that we end up
    with a gappy stream:

    >>> tr = st[0].copy()
    >>> t = UTCDateTime("2009-08-24T00:20:13.0")
    >>> st[0].trim(endtime=t)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.trim(starttime=t+1)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> st.append(tr)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.get_gaps()  # doctest: +ELLIPSIS
    [[..., UTCDateTime(2009, 8, 24, 0, 20, 13), ...
    >>> st.print_gaps()  # doctest: +ELLIPSIS
    Source            Last Sample ...
    BW.RJOB..EHZ      2009-08-24T00:20:13.000000Z ...
    Total: 1 gap(s) and 0 overlap(s)

    And finally let us create some overlapping traces:

    >>> st = read()
    >>> tr = st[0].copy()
    >>> t = UTCDateTime("2009-08-24T00:20:13.0")
    >>> st[0].trim(endtime=t)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> tr.trim(starttime=t-1)  # doctest: +ELLIPSIS
    <...Trace object at 0x...>
    >>> st.append(tr)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.get_gaps()  # doctest: +ELLIPSIS
    [[...'EHZ', UTCDateTime(2009, 8, 24, 0, 20, 13), ...
    >>> st.print_gaps()  # doctest: +ELLIPSIS
    Source            Last Sample ...
    BW.RJOB..EHZ      2009-08-24T00:20:13.000000Z ...
    Total: 0 gap(s) and 1 overlap(s)
    """
    gap_rows = self.get_gaps(min_gap, max_gap)
    header = ('Source', 'Last Sample', 'Next Sample', 'Delta', 'Samples')
    print("%-17s %-27s %-27s %-15s %-8s" % header)
    n_gaps = 0
    n_overlaps = 0
    for row in gap_rows:
        # Row layout: network, station, location, channel, last sample
        # time, next sample time, delta, number of samples.
        delta = row[6]
        if delta > 0:
            n_gaps += 1
        else:
            n_overlaps += 1
        source = '.'.join(row[0:4])
        print("%-17s %-27s %-27s %-15.6f %-8d" % (source, row[4], row[5],
                                                  delta, row[7]))
    print("Total: %d gap(s) and %d overlap(s)" % (n_gaps, n_overlaps))
[docs]
def remove(self, trace):
    """
    Remove the first occurrence of the specified Trace object in the
    Stream object. Passes on the remove() call to self.traces.

    :param trace: Trace object to be removed from Stream.
    :return: This Stream object.

    .. rubric:: Example

    This example shows how to delete all "E" component traces in a stream:

    >>> from obspy import read
    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> for tr in st.select(component="E"):
    ...     st.remove(tr)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    2 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    """
    trace_list = self.traces
    trace_list.remove(trace)
    return self
[docs]
def reverse(self):
    """
    Reverse the Traces of the Stream object in place.

    :return: This Stream object.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> st.reverse()  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    """
    trace_list = self.traces
    trace_list.reverse()
    return self
[docs]
def sort(self, keys=['network', 'station', 'location', 'channel',
                     'starttime', 'endtime'], reverse=False):
    """
    Sort the traces in the Stream object.

    The traces will be sorted according to the keys list. It will be sorted
    by the first item first, then by the second and so on. It will always
    be sorted from low to high and from A to Z.

    :type keys: list, optional
    :param keys: List containing the values according to which the traces
        will be sorted. They will be sorted by the first item first and
        then by the second item and so on.
        Always available items: 'network', 'station', 'channel',
        'location', 'starttime', 'endtime', 'sampling_rate', 'npts',
        'dataquality'
        Defaults to ['network', 'station', 'location', 'channel',
        'starttime', 'endtime'].
    :type reverse: bool
    :param reverse: Reverts sorting order to descending.
    :raises TypeError: If ``keys`` is not a list.
    :return: This Stream object.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> st.sort()  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    """
    # NOTE: the default ``keys`` list is only read, never modified.
    if not isinstance(keys, list):
        raise TypeError(
            "keys must be a list of strings. Always available items to "
            "sort after: \n'network', 'station', 'channel', 'location', "
            "'starttime', 'endtime', 'sampling_rate', 'npts', 'dataquality'")
    # list.sort() is stable, so sorting once per key, starting with the
    # least significant key, yields an ordering by the first key first.
    for stats_key in reversed(keys):
        self.traces.sort(key=lambda tr, name=stats_key: tr.stats[name],
                         reverse=reverse)
    return self
[docs]
def write(self, filename, format=None, **kwargs):
    """
    Save stream into a file.

    :type filename: str
    :param filename: The name of the file to write.
    :type format: str, optional
    :param format: The file format to use (e.g. ``"MSEED"``). See
        the `Supported Formats`_ section below for a list of supported
        formats. If format is set to ``None`` it will be deduced from
        file extension, whenever possible.
    :param kwargs: Additional keyword arguments passed to the underlying
        waveform writer method.
    :raises ObsPyException: If the stream contains no traces.
    :raises NotImplementedError: If the data of any trace is a masked
        array.
    :raises ValueError: If no writer can be loaded for the format.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()  # doctest: +SKIP
    >>> st.write("example.mseed", format="MSEED")  # doctest: +SKIP

    The ``format`` argument can be omitted, and the file format will be
    deduced from file extension, whenever possible.

    >>> st.write("example.mseed")  # doctest: +SKIP

    Writing single traces into files with meaningful filenames can be done
    e.g. using trace.id

    >>> for tr in st: #doctest: +SKIP
    ...     tr.write(tr.id + ".MSEED", format="MSEED") #doctest: +SKIP

    .. rubric:: _`Supported Formats`

    Additional ObsPy modules extend the parameters of the
    :meth:`~obspy.core.stream.Stream.write` method. The following
    table summarizes all known formats currently available for ObsPy.

    Please refer to the `Linked Function Call`_ of each module for any
    extra options available.

    %s
    """
    if not self.traces:
        raise ObsPyException('Can not write empty stream to file.')
    # Masked arrays cannot be written, refuse them up front.
    if any(isinstance(tr.data, np.ma.masked_array) for tr in self.traces):
        raise NotImplementedError(
            'Masked array writing is not supported. You can use '
            'np.array.filled() to convert the masked array to a '
            'normal array.')
    if format is None:
        # try to guess format from file extension (without leading dot)
        format = Path(filename).suffix[1:]
    format = format.upper()
    writers = ENTRY_POINTS['waveform_write']
    try:
        # get format specific entry point
        entry_point = writers[format]
        # search writeFormat method for given entry point
        writer = buffered_load_entry_point(
            entry_point.dist.key,
            'obspy.plugin.waveform.%s' % (entry_point.name), 'writeFormat')
    except (IndexError, ImportError, KeyError):
        msg = "Writing format \"%s\" is not supported. Supported types: %s"
        raise ValueError(msg % (format,
                                ', '.join(ENTRY_POINTS['waveform_write'])))
    writer(self, filename, **kwargs)
[docs]
def trim(self, starttime=None, endtime=None, pad=False,
         keep_empty_traces=False, nearest_sample=True, fill_value=None):
    """
    Cut all traces of this Stream object to given start and end time.

    :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param starttime: Specify the start time.
    :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime`, optional
    :param endtime: Specify the end time.
    :type pad: bool, optional
    :param pad: Gives the possibility to trim at time points outside the
        time frame of the original trace, filling the trace with the
        given ``fill_value``. Defaults to ``False``.
    :type keep_empty_traces: bool, optional
    :param keep_empty_traces: Empty traces will be kept if set to ``True``.
        Defaults to ``False``.
    :type nearest_sample: bool, optional
    :param nearest_sample: If set to ``True``, the closest sample is
        selected, if set to ``False``, the inner (next sample for a
        start time border, previous sample for an end time border) sample
        containing the time is selected. Defaults to ``True``.

        Given the following trace containing 6 samples, "|" are the
        sample points, "A" is the requested starttime and "B" the
        requested endtime::

            |         |A        |         |       B |         |
            1         2         3         4         5         6

        ``nearest_sample=True`` will select samples 2-5,
        ``nearest_sample=False`` will select samples 3-4 only.
    :type fill_value: int, float or ``None``, optional
    :param fill_value: Fill value for gaps. Defaults to ``None``. Traces
        will be converted to NumPy masked arrays if no value is given and
        gaps are present.
    :raises TypeError: If the start/end time arithmetic fails with a
        TypeError while ``nearest_sample`` is set.
    :return: This Stream object.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data will no longer be accessible afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.

    .. rubric:: Example

    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> dt = UTCDateTime("2009-08-24T00:20:20")
    >>> st.trim(dt, dt + 5)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:20.000000Z ... | 100.0 Hz, 501 samples
    BW.RJOB..EHN | 2009-08-24T00:20:20.000000Z ... | 100.0 Hz, 501 samples
    BW.RJOB..EHE | 2009-08-24T00:20:20.000000Z ... | 100.0 Hz, 501 samples
    """
    # Nothing to cut in an empty stream.
    if not self:
        return self
    if nearest_sample:
        # Snap the requested times onto the sampling grid of the first
        # trace, so that all traces are cut at the same instants.
        ref = self.traces[0]
        try:
            if starttime is not None:
                n_start = compatibility.round_away(
                    (starttime - ref.stats.starttime) *
                    ref.stats.sampling_rate)
                starttime = ref.stats.starttime + n_start * ref.stats.delta
            if endtime is not None:
                # Sample offset relative to the end of the reference trace.
                n_end = compatibility.round_away(
                    (endtime - ref.stats.endtime) * ref.stats.sampling_rate)
                endtime = ref.stats.endtime + n_end * ref.stats.delta
        except TypeError:
            raise TypeError('starttime and endtime must be UTCDateTime '
                            'objects or None for this call to '
                            'Stream.trim()')
    for tr in self.traces:
        tr.trim(starttime, endtime, pad=pad,
                nearest_sample=nearest_sample, fill_value=fill_value)
    if not keep_empty_traces:
        # Drop traces that have no samples left after trimming.
        self.traces = [tr for tr in self.traces if tr.stats.npts]
    return self
[docs]
def _ltrim(self, starttime, pad=False, nearest_sample=True):
    """
    Cut all traces of this Stream object to given start time.

    Every trace is trimmed in place via its ``trim`` method; traces left
    without samples are removed from the stream afterwards.

    For more info see :meth:`~obspy.core.trace.Trace._ltrim`.

    :return: This Stream object.
    """
    for tr in self.traces:
        tr.trim(starttime=starttime, pad=pad,
                nearest_sample=nearest_sample)
    # Keep only the traces that still hold samples.
    remaining = [tr for tr in self.traces if tr.stats.npts]
    self.traces = remaining
    return self
[docs]
def _rtrim(self, endtime, pad=False, nearest_sample=True):
    """
    Cut all traces of this Stream object to given end time.

    Every trace is trimmed in place via its ``trim`` method; traces left
    without samples are removed from the stream afterwards.

    For more info see :meth:`~obspy.core.trace.Trace._rtrim`.

    :return: This Stream object.
    """
    for tr in self.traces:
        tr.trim(endtime=endtime, pad=pad, nearest_sample=nearest_sample)
    # Keep only the traces that still hold samples.
    remaining = [tr for tr in self.traces if tr.stats.npts]
    self.traces = remaining
    return self
[docs]
def cutout(self, starttime, endtime):
    """
    Cut the given time range out of all traces of this Stream object.

    The stream's traces are replaced by the slice ending at ``starttime``
    followed by the slice starting at ``endtime``; empty traces are not
    kept.

    :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param starttime: Start of time span to remove from stream.
    :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param endtime: End of time span to remove from stream.
    :return: This Stream object.

    .. rubric:: Example

    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> t1 = UTCDateTime("2009-08-24T00:20:06")
    >>> t2 = UTCDateTime("2009-08-24T00:20:11")
    >>> st.cutout(t1, t2)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    6 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 301 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 301 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 301 samples
    BW.RJOB..EHZ | 2009-08-24T00:20:11.000000Z ... | 100.0 Hz, 2200 samples
    BW.RJOB..EHN | 2009-08-24T00:20:11.000000Z ... | 100.0 Hz, 2200 samples
    BW.RJOB..EHE | 2009-08-24T00:20:11.000000Z ... | 100.0 Hz, 2200 samples
    """
    # Part of the data up to the start of the removed span ...
    before = self.slice(endtime=starttime, keep_empty_traces=False)
    # ... and the part from the end of the removed span onwards.
    after = self.slice(starttime=endtime, keep_empty_traces=False)
    before += after
    self.traces = before.traces
    return self
[docs]
def slice(self, starttime=None, endtime=None, keep_empty_traces=False,
          nearest_sample=True):
    """
    Return new Stream object cut to the given start and end time.

    :type starttime: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param starttime: Specify the start time of all traces.
    :type endtime: :class:`~obspy.core.utcdatetime.UTCDateTime`
    :param endtime: Specify the end time of all traces.
    :type keep_empty_traces: bool, optional
    :param keep_empty_traces: Empty traces will be kept if set to ``True``.
        Defaults to ``False``.
    :type nearest_sample: bool, optional
    :param nearest_sample: If set to ``True``, the closest sample is
        selected, if set to ``False``, the inner (next sample for a
        start time border, previous sample for an end time border) sample
        containing the time is selected. Defaults to ``True``.

        Given the following trace containing 6 samples, "|" are the
        sample points, "A" is the requested starttime and "B" the
        requested endtime::

            |         |A        |         |       B |         |
            1         2         3         4         5         6

        ``nearest_sample=True`` will select samples 2-5,
        ``nearest_sample=False`` will select samples 3-4 only.
    :raises TypeError: If the start/end time arithmetic fails with a
        TypeError while ``nearest_sample`` is set.
    :return: :class:`~obspy.core.stream.Stream`

    .. note::

        The basic idea of :meth:`~obspy.core.stream.Stream.slice`
        is to avoid copying the sample data in memory. So sample data in
        the resulting :class:`~obspy.core.stream.Stream` object contains
        only a reference to the original traces.

    .. rubric:: Example

    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> dt = UTCDateTime("2009-08-24T00:20:20")
    >>> st = st.slice(dt, dt + 5)
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:20.000000Z ... | 100.0 Hz, 501 samples
    BW.RJOB..EHN | 2009-08-24T00:20:20.000000Z ... | 100.0 Hz, 501 samples
    BW.RJOB..EHE | 2009-08-24T00:20:20.000000Z ... | 100.0 Hz, 501 samples
    """
    # An empty stream has nothing to slice; hand back a shallow copy.
    if not self:
        return copy.copy(self)
    if nearest_sample:
        # Snap the requested times onto the sampling grid of the first
        # trace, so that all traces are cut at the same instants.
        ref = self.traces[0]
        try:
            if starttime is not None:
                n_start = compatibility.round_away(
                    (starttime - ref.stats.starttime) *
                    ref.stats.sampling_rate)
                starttime = ref.stats.starttime + n_start * ref.stats.delta
            if endtime is not None:
                # Sample offset relative to the end of the reference trace.
                n_end = compatibility.round_away(
                    (endtime - ref.stats.endtime) * ref.stats.sampling_rate)
                endtime = ref.stats.endtime + n_end * ref.stats.delta
        except TypeError:
            raise TypeError('starttime and endtime must be UTCDateTime '
                            'objects or None for this call to '
                            'Stream.slice()')
    # Build the result from a trace-less shallow copy of this stream, so
    # that copying it via its copy() method never touches the traces.
    shell = copy.copy(self)
    shell.traces = []
    result = shell.copy()
    for tr in self:
        piece = tr.slice(starttime=starttime, endtime=endtime,
                         nearest_sample=nearest_sample)
        # Empty pieces are dropped only if keep_empty_traces is exactly
        # ``False``.
        if keep_empty_traces is not False or piece.stats.npts:
            result.append(piece)
    return result
[docs]
def slide(self, window_length, step, offset=0,
          include_partial_windows=False, nearest_sample=True):
    """
    Generator yielding equal length sliding windows of the Stream.

    Please keep in mind that it only returns a new view of the original
    data. Any modifications are applied to the original data as well. If
    you don't want this you have to create a copy of the yielded
    windows. Also be aware that if you modify the original data and you
    have overlapping windows, all following windows are affected as well.

    Not all yielded windows must have the same number of traces. The
    algorithm will determine the maximal temporal extents by analysing
    all Traces and then creates windows based on these times.

    .. rubric:: Example

    >>> import obspy
    >>> st = obspy.read()
    >>> for windowed_st in st.slide(window_length=10.0, step=10.0):
    ...     print(windowed_st)
    ...     print("---")  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    3 Trace(s) in Stream:
    ... | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:13.000000Z | ...
    ... | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:13.000000Z | ...
    ... | 2009-08-24T00:20:03.000000Z - 2009-08-24T00:20:13.000000Z | ...
    ---
    3 Trace(s) in Stream:
    ... | 2009-08-24T00:20:13.000000Z - 2009-08-24T00:20:23.000000Z | ...
    ... | 2009-08-24T00:20:13.000000Z - 2009-08-24T00:20:23.000000Z | ...
    ... | 2009-08-24T00:20:13.000000Z - 2009-08-24T00:20:23.000000Z | ...

    :param window_length: The length of each window in seconds.
    :type window_length: float
    :param step: The step between the start times of two successive
        windows in seconds. Can be negative if an offset is given.
    :type step: float
    :param offset: The offset of the first window in seconds relative to
        the start time of the whole interval.
    :type offset: float
    :param include_partial_windows: Determines if windows that are
        shorter than 99.9 % of the desired length are returned.
    :type include_partial_windows: bool
    :param nearest_sample: If set to ``True``, the closest sample is
        selected, if set to ``False``, the inner (next sample for a
        start time border, previous sample for an end time border) sample
        containing the time is selected. Defaults to ``True``.

        Given the following trace containing 6 samples, "|" are the
        sample points, "A" is the requested starttime and "B" the
        requested endtime::

            |         |A        |         |       B |         |
            1         2         3         4         5         6

        ``nearest_sample=True`` will select samples 2-5,
        ``nearest_sample=False`` will select samples 3-4 only.
    :type nearest_sample: bool, optional
    """
    # Overall time span covered by any trace of the stream.
    first_start = min(tr.stats.starttime for tr in self)
    last_end = max(tr.stats.endtime for tr in self)

    windows = get_window_times(
        starttime=first_start,
        endtime=last_end,
        window_length=window_length,
        step=step,
        offset=offset,
        include_partial_windows=include_partial_windows)

    if len(windows) < 1:
        return

    for win_start, win_stop in windows:
        window_st = self.slice(win_start, win_stop,
                               nearest_sample=nearest_sample)
        # It might happen that there is a time frame where there are no
        # windows, e.g. two traces separated by a large gap. Such empty
        # results are not yielded.
        if window_st:
            yield window_st
[docs]
def select(self, network=None, station=None, location=None, channel=None,
           sampling_rate=None, npts=None, component=None, id=None,
           inventory=None):
    """
    Return new Stream object only with these traces that match the given
    stats criteria (e.g. all traces with ``channel="EHZ"``).

    Alternatively, traces can be selected based on the content of an
    :class:`~obspy.core.inventory.inventory.Inventory` object: a trace is
    selected if the inventory contains a channel with the same id whose
    start date (if set) is not later than the trace start time and whose
    end date (if set) is not earlier than the trace end time. A trace is
    selected at most once, even if several channel epochs match it.

    .. rubric:: Examples

    >>> from obspy import read
    >>> st = read()
    >>> st2 = st.select(station="R*")
    >>> print(st2)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples

    >>> st2 = st.select(id="BW.RJOB..EHZ")
    >>> print(st2)  # doctest: +ELLIPSIS
    1 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples

    >>> st2 = st.select(component="Z")
    >>> print(st2)  # doctest: +ELLIPSIS
    1 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples

    >>> st2 = st.select(network="CZ")
    >>> print(st2)  # doctest: +NORMALIZE_WHITESPACE
    0 Trace(s) in Stream:

    >>> from obspy import read_inventory
    >>> inv = read_inventory('/path/to/BW_RJOB__EHZ.xml')
    >>> print(inv)  # doctest: +NORMALIZE_WHITESPACE
    Inventory created at 2013-12-07T18:00:42.878000Z
        Created by: fdsn-stationxml-converter/1.0.0
                http://www.iris.edu/fdsnstationconverter
        Sending institution: Erdbebendienst Bayern
        Contains:
            Networks (1):
                BW
            Stations (1):
                BW.RJOB (Jochberg, Bavaria, BW-Net)
            Channels (1):
                BW.RJOB..EHZ
    >>> st2 = st.select(inventory=inv)
    >>> print(st2)  # doctest: +ELLIPSIS
    1 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples

    .. warning::
        A new Stream object is returned but the traces it contains are
        just aliases to the traces of the original stream. Does not copy
        the data but only passes a reference.

    All keyword arguments except for ``component`` are tested directly
    against the respective entry in the :class:`~obspy.core.trace.Stats`
    dictionary.

    If a string for ``component`` is given (should be a single letter) it
    is tested against the last letter of the ``Trace.stats.channel`` entry.

    Alternatively, ``channel`` may have the last one or two letters
    wildcarded (e.g. ``channel="EH*"``) to select all components with a
    common band/instrument code.

    All other selection criteria that accept strings (network, station,
    location) may also contain Unix style wildcards (``*``, ``?``, ...).
    String comparisons are case insensitive.

    :type network: str, optional
    :param network: Network code pattern.
    :type station: str, optional
    :param station: Station code pattern.
    :type location: str, optional
    :param location: Location code pattern.
    :type channel: str, optional
    :param channel: Channel code pattern.
    :type sampling_rate: float, optional
    :param sampling_rate: Sampling rate; converted with ``float()`` and
        compared for equality.
    :type npts: int, optional
    :param npts: Number of samples; converted with ``int()`` and compared
        for equality.
    :type component: str, optional
    :param component: Component code pattern.
    :type id: str, optional
    :param id: Pattern matched against the full trace id. An empty string
        is treated like ``None``.
    :type inventory: :class:`~obspy.core.inventory.inventory.Inventory`,
        optional
    :param inventory: Inventory used to pre-select traces (see above).
    :raises ValueError: If ``channel`` and ``component`` are both given
        and contradict each other.
    :return: New object of the same class as this stream holding
        references to the matching traces.
    """
    if inventory is None:
        candidates = self.traces
    else:
        # Map every channel id in the inventory to its list of
        # (start_date, end_date) epochs for a direct lookup per trace.
        epochs_by_id = {}
        for net in inventory.networks:
            for sta in net.stations:
                for chan in sta.channels:
                    _id = '.'.join((net.code, sta.code,
                                    chan.location_code, chan.code))
                    epochs_by_id.setdefault(_id, []).append(
                        (chan.start_date, chan.end_date))
        candidates = []
        for trace in self:
            for start_date, end_date in epochs_by_id.get(trace.id, ()):
                if start_date is not None and \
                        trace.stats.starttime < start_date:
                    continue
                if end_date is not None and \
                        trace.stats.endtime > end_date:
                    continue
                candidates.append(trace)
                # One matching epoch is enough; stopping here prevents the
                # same trace from being added once per matching epoch.
                break

    # make given component letter uppercase (if e.g. "z" is given)
    if component is not None and channel is not None:
        component = component.upper()
        channel = channel.upper()
        if (channel[-1:] not in "?*" and component not in "?*" and
                component != channel[-1:]):
            msg = "Selection criteria for channel and component are " + \
                  "mutually exclusive!"
            raise ValueError(msg)

    # For st.select(id=) without wildcards, use a quicker comparison mode.
    # It is only used if the id splits into exactly the four SEED codes;
    # anything else falls back to the generic pattern matching below
    # instead of failing while unpacking.
    quick_codes = None
    only_id_given = (id is not None
                     and sampling_rate is None and npts is None
                     and network is None and station is None
                     and location is None and channel is None
                     and component is None)
    if only_id_given and not any(char in id for char in '?*['):
        codes = id.upper().split('.')
        if len(codes) == 4:
            quick_codes = codes

    selected = []
    for trace in candidates:
        if quick_codes is not None:
            net, sta, loc, chan = quick_codes
            if (trace.stats.network.upper() == net
                    and trace.stats.station.upper() == sta
                    and trace.stats.location.upper() == loc
                    and trace.stats.channel.upper() == chan):
                selected.append(trace)
            continue
        # skip trace if any given criterion is not matched
        if id and not fnmatch.fnmatch(trace.id.upper(), id.upper()):
            continue
        if network is not None:
            if not fnmatch.fnmatch(trace.stats.network.upper(),
                                   network.upper()):
                continue
        if station is not None:
            if not fnmatch.fnmatch(trace.stats.station.upper(),
                                   station.upper()):
                continue
        if location is not None:
            if not fnmatch.fnmatch(trace.stats.location.upper(),
                                   location.upper()):
                continue
        if channel is not None:
            if not fnmatch.fnmatch(trace.stats.channel.upper(),
                                   channel.upper()):
                continue
        if sampling_rate is not None:
            if float(sampling_rate) != trace.stats.sampling_rate:
                continue
        if npts is not None and int(npts) != trace.stats.npts:
            continue
        if component is not None:
            if not fnmatch.fnmatch(trace.stats.component.upper(),
                                   component.upper()):
                continue
        selected.append(trace)
    return self.__class__(traces=selected)
[docs]
def verify(self):
    """
    Run :meth:`~obspy.core.trace.Trace.verify` on every trace of the Stream.

    Each trace is checked in turn by calling its own ``verify()`` method;
    any exception raised there propagates to the caller.

    .. rubric:: Example

    >>> from obspy import Trace, Stream
    >>> tr = Trace(data=np.array([1, 2, 3, 4]))
    >>> tr.stats.npts = 100
    >>> st = Stream([tr])
    >>> st.verify()  #doctest: +ELLIPSIS
    Traceback (most recent call last):
    ...
    Exception: ntps(100) differs from data size(4)

    :return: The stream itself.
    """
    for tr in self:
        tr.verify()
    return self
[docs]
def _merge_checks(self):
"""
Sanity checks for merging.
"""
sr = {}
dtype = {}
calib = {}
for trace in self.traces:
# skip empty traces
if len(trace) == 0:
continue
# Check sampling rate.
sr.setdefault(trace.id, trace.stats.sampling_rate)
if trace.stats.sampling_rate != sr[trace.id]:
msg = (f"Can not merge traces with same ids ({trace.id}) but "
f"differing sampling rates ({sr[trace.id]}, "
f"{trace.stats.sampling_rate})!")
raise Exception(msg)
# Check dtype.
dtype.setdefault(trace.id, trace.data.dtype)
if trace.data.dtype != dtype[trace.id]:
msg = (f"Can not merge traces with same ids ({trace.id}) but "
f"differing data types ({dtype[trace.id]}, "
f"{trace.data.dtype})!")
raise Exception(msg)
# Check calibration factor.
calib.setdefault(trace.id, trace.stats.calib)
if trace.stats.calib != calib[trace.id]:
msg = (f"Can not merge traces with same ids ({trace.id}) but "
f"differing calibration factors ({calib[trace.id]}, "
f"{trace.stats.calib})!")
raise Exception(msg)
[docs]
def merge(self, method=0, fill_value=None, interpolation_samples=0,
**kwargs):
"""
Merge ObsPy Trace objects with same IDs.
:type method: int, optional
:param method: Methodology to handle overlaps/gaps of traces. Defaults
to ``0``.
See :meth:`obspy.core.trace.Trace.__add__` for details on
methods ``0`` and ``1``,
see :meth:`obspy.core.stream.Stream._cleanup` for details on
method ``-1``. Any merge operation performs a cleanup merge as
a first step (method ``-1``).
:type fill_value: int, float, str or ``None``, optional
:param fill_value: Fill value for gaps. Defaults to ``None``. Traces
will be converted to NumPy masked arrays if no value is given and
gaps are present. The value ``'latest'`` will use the latest value
before the gap. If value ``'interpolate'`` is provided, missing
values are linearly interpolated (not changing the data
type e.g. of integer valued traces). Not used for ``method=-1``.
:type interpolation_samples: int, optional
:param interpolation_samples: Used only for ``method=1``. It specifies
the number of samples which are used to interpolate between
overlapping traces. Default to ``0``. If set to ``-1`` all
overlapping samples are interpolated.
Importing waveform data containing gaps or overlaps results into
a :class:`~obspy.core.stream.Stream` object with multiple traces having
the same identifier. This method tries to merge such traces inplace,
thus returning nothing. Merged trace data will be converted into a
NumPy :class:`~numpy.ma.MaskedArray` type if any gaps are present. This
behavior may be prevented by setting the ``fill_value`` parameter.
The ``method`` argument controls the handling of overlapping data
values.
"""
def listsort(order, current):
"""
Helper method for keeping trace's ordering
"""
try:
return order.index(current)
except ValueError:
return -1
self._cleanup(**kwargs)
if method == -1:
return self
# check sampling rates and dtypes
self._merge_checks()
# remember order of traces
order = [id(i) for i in self.traces]
# order matters!
self.sort(keys=['network', 'station', 'location', 'channel',
'starttime', 'endtime'])
# build up dictionary with with lists of traces with same ids
traces_dict = {}
# using pop() and try-except saves memory
try:
while True:
trace = self.traces.pop(0)
# skip empty traces
if len(trace) == 0:
continue
_id = trace.get_id()
if _id not in traces_dict:
traces_dict[_id] = [trace]
else:
traces_dict[_id].append(trace)
except IndexError:
pass
# clear traces of current stream
self.traces = []
# loop through ids
for _id in traces_dict.keys():
cur_trace = traces_dict[_id].pop(0)
# loop through traces of same id
for _i in range(len(traces_dict[_id])):
trace = traces_dict[_id].pop(0)
# disable sanity checks because there are already done
cur_trace = cur_trace.__add__(
trace, method, fill_value=fill_value, sanity_checks=False,
interpolation_samples=interpolation_samples)
self.traces.append(cur_trace)
# trying to restore order, newly created traces are placed at
# start
self.traces.sort(key=lambda x: listsort(order, id(x)))
return self
[docs]
def simulate(self, paz_remove=None, paz_simulate=None,
             remove_sensitivity=True, simulate_sensitivity=True, **kwargs):
    """
    Correct for instrument response / Simulate new instrument response.

    All arguments are handed to
    :meth:`~obspy.core.trace.Trace.simulate` of every trace in the stream.

    :type paz_remove: dict, None
    :param paz_remove: Dictionary containing keys ``'poles'``, ``'zeros'``,
        ``'gain'`` (A0 normalization factor). Poles and zeros must be a
        list of complex floating point numbers, gain must be of type float.
        Poles and Zeros are assumed to correct to m/s, SEED convention.
        Use ``None`` for no inverse filtering.
        Use ``'self'`` to use paz AttribDict in ``trace.stats`` for every
        trace in stream.
    :type paz_simulate: dict, None
    :param paz_simulate: Dictionary containing keys ``'poles'``,
        ``'zeros'``, ``'gain'``. Poles and zeros must be a list of complex
        floating point numbers, gain must be of type float. Or ``None`` for
        no simulation.
    :type remove_sensitivity: bool
    :param remove_sensitivity: Determines if data is divided by
        ``paz_remove['sensitivity']`` to correct for overall sensitivity of
        recording instrument (seismometer/digitizer) during instrument
        correction.
    :type simulate_sensitivity: bool
    :param simulate_sensitivity: Determines if data is multiplied with
        ``paz_simulate['sensitivity']`` to simulate overall sensitivity of
        new instrument (seismometer/digitizer) during instrument
        simulation.
    :return: The stream itself.

    This function corrects for the original instrument response given by
    ``paz_remove`` and/or simulates a new instrument response given by
    ``paz_simulate``.

    For additional information and more options to control the instrument
    correction/simulation (e.g. water level, demeaning, tapering, ...) see
    :func:`~obspy.signal.invsim.simulate_seismometer`.

    The keywords `paz_remove` and `paz_simulate` are expected to be
    dictionaries containing information on poles, zeros and gain (and
    usually also sensitivity).

    If both ``paz_remove`` and ``paz_simulate`` are specified, both steps
    are performed in one go in the frequency domain, otherwise only the
    specified step is performed.

    .. note::

        Instead of the builtin deconvolution based on Poles and Zeros
        information, the deconvolution can be performed using evalresp
        instead by using the option `seedresp` (see documentation of
        :func:`~obspy.signal.invsim.simulate_seismometer` and the
        `ObsPy Tutorial
        <https://docs.obspy.org/master/tutorial/code_snippets/\
seismometer_correction_simulation.html#using-a-resp-file>`_).

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.

    .. rubric:: Example

    >>> from obspy import read
    >>> from obspy.signal.invsim import corn_freq_2_paz
    >>> st = read()
    >>> paz_sts2 = {'poles': [-0.037004+0.037016j, -0.037004-0.037016j,
    ...                       -251.33+0j,
    ...                       -131.04-467.29j, -131.04+467.29j],
    ...             'zeros': [0j, 0j],
    ...             'gain': 60077000.0,
    ...             'sensitivity': 2516778400.0}
    >>> paz_1hz = corn_freq_2_paz(1.0, damp=0.707)
    >>> st.simulate(paz_remove=paz_sts2, paz_simulate=paz_1hz)
    ... # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        from obspy.signal.invsim import corn_freq_2_paz
        st = read()
        paz_sts2 = {'poles': [-0.037004+0.037016j, -0.037004-0.037016j,
                              -251.33+0j,
                              -131.04-467.29j, -131.04+467.29j],
                    'zeros': [0j, 0j],
                    'gain': 60077000.0,
                    'sensitivity': 2516778400.0}
        paz_1hz = corn_freq_2_paz(1.0, damp=0.707)
        paz_1hz['sensitivity'] = 1.0
        st.simulate(paz_remove=paz_sts2, paz_simulate=paz_1hz)
        st.plot()
    """
    # gather the per-trace call arguments once, named ones first
    settings = {
        'paz_remove': paz_remove,
        'paz_simulate': paz_simulate,
        'remove_sensitivity': remove_sensitivity,
        'simulate_sensitivity': simulate_sensitivity,
        **kwargs,
    }
    for trace in self:
        trace.simulate(**settings)
    return self
[docs]
@raise_if_masked
def filter(self, type, *args, **options):
    """
    Filter the data of all traces in the Stream.

    The call is forwarded unchanged to
    :meth:`~obspy.core.trace.Trace.filter` of every trace.

    :type type: str
    :param type: String that specifies which filter is applied (e.g.
        ``"bandpass"``). See the `Supported Filter`_ section below for
        further details.
    :param args: Only filter frequency/frequencies can be specified
        as argument(s). Alternatively filter frequencies can be specified
        as keyword arguments.
    :param options: Keyword arguments for the respective filter
        that will be passed on. (e.g. ``freqmin=1.0``, ``freqmax=20.0`` for
        ``"bandpass"``)
    :return: The stream itself.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.

    .. rubric:: _`Supported Filter`

    ``'bandpass'``
        Butterworth-Bandpass (uses :func:`obspy.signal.filter.bandpass`).

    ``'bandstop'``
        Butterworth-Bandstop (uses :func:`obspy.signal.filter.bandstop`).

    ``'lowpass'``
        Butterworth-Lowpass (uses :func:`obspy.signal.filter.lowpass`).

    ``'highpass'``
        Butterworth-Highpass (uses :func:`obspy.signal.filter.highpass`).

    ``'lowpass_cheby_2'``
        Cheby2-Lowpass (uses :func:`obspy.signal.filter.lowpass_cheby_2`).

    ``'lowpass_fir'`` (experimental)
        FIR-Lowpass (uses :func:`obspy.signal.filter.lowpass_fir`).

    ``'remez_fir'`` (experimental)
        Minimax optimal bandpass using Remez algorithm (uses
        :func:`obspy.signal.filter.remez_fir`).

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> st.filter("highpass", freq=1.0)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        st.filter("highpass", freq=1.0)
        st.plot()
    """
    for trace in self:
        trace.filter(type, *args, **options)
    return self
[docs]
def trigger(self, type, **options):
    """
    Run a triggering algorithm on all traces in the stream.

    The call is forwarded unchanged to
    :meth:`~obspy.core.trace.Trace.trigger` of every trace.

    :type type: str
    :param type: String that specifies which trigger is applied (e.g.
        ``'recstalta'``). See the `Supported Trigger`_ section below for
        further details.
    :param options: Necessary keyword arguments for the respective
        trigger that will be passed on. (e.g. ``sta=3``, ``lta=10``)
        Arguments ``sta`` and ``lta`` (seconds) will be mapped to ``nsta``
        and ``nlta`` (samples) by multiplying with sampling rate of trace.
        (e.g. ``sta=3``, ``lta=10`` would call the trigger with 3 and 10
        seconds average, respectively)
    :return: The stream itself.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.

    .. rubric:: _`Supported Trigger`

    ``'classicstalta'``
        Computes the classic STA/LTA characteristic function (uses
        :func:`obspy.signal.trigger.classic_sta_lta`).

    ``'recstalta'``
        Recursive STA/LTA
        (uses :func:`obspy.signal.trigger.recursive_sta_lta`).

    ``'recstaltapy'``
        Recursive STA/LTA written in Python (uses
        :func:`obspy.signal.trigger.recursive_sta_lta_py`).

    ``'delayedstalta'``
        Delayed STA/LTA.
        (uses :func:`obspy.signal.trigger.delayed_sta_lta`).

    ``'carlstatrig'``
        Computes the carl_sta_trig characteristic function (uses
        :func:`obspy.signal.trigger.carl_sta_trig`).

    ``'energyratio'``
        Computes the energy ratio characteristic function (uses
        :func:`obspy.signal.trigger.energy_ratio`).

    ``'modifiedenergyratio'``
        Computes the modified energy ratio characteristic function (uses
        :func:`obspy.signal.trigger.modified_energy_ratio`).

    ``'zdetect'``
        Z-detector (uses :func:`obspy.signal.trigger.z_detect`).

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> st.filter("highpass", freq=1.0)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.plot()  # doctest: +SKIP
    >>> st.trigger('recstalta', sta=1, lta=4)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read
        st = read()
        st.filter("highpass", freq=1.0)
        st.plot()
        st.trigger('recstalta', sta=1, lta=4)
        st.plot()
    """
    for trace in self:
        trace.trigger(type, **options)
    return self
[docs]
def resample(self, sampling_rate, window='hann', no_filter=True,
             strict_length=False):
    """
    Resample data in all traces of stream using Fourier method.

    All arguments are handed to
    :meth:`~obspy.core.trace.Trace.resample` of every trace.

    :type sampling_rate: float
    :param sampling_rate: The sampling rate of the resampled signal.
    :type window: :class:`numpy.ndarray`, callable, str, float, or tuple,
        optional
    :param window: Specifies the window applied to the signal in the
        Fourier domain. Defaults ``'hann'`` window. See
        :func:`scipy.signal.resample` for details.
    :type no_filter: bool, optional
    :param no_filter: Deactivates automatic filtering if set to ``True``.
        Defaults to ``True``.
    :type strict_length: bool, optional
    :param strict_length: Leave traces unchanged for which end time of
        trace would change. Defaults to ``False``.
    :return: The stream itself.

    .. note::

        The :class:`~Stream` object has three different methods to change
        the sampling rate of its data: :meth:`~.resample`,
        :meth:`~.decimate`, and :meth:`~.interpolate`

        Make sure to choose the most appropriate one for the problem at
        hand.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.

    Uses :func:`scipy.signal.resample`. Because a Fourier method is used,
    the signal is assumed to be periodic.

    .. rubric:: Example

    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 100.0 Hz, 3000 samples
    >>> st.resample(10.0)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03.000000Z ... | 10.0 Hz, 300 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03.000000Z ... | 10.0 Hz, 300 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03.000000Z ... | 10.0 Hz, 300 samples
    """
    # keyword settings shared by every per-trace call
    settings = {
        'window': window,
        'no_filter': no_filter,
        'strict_length': strict_length,
    }
    for trace in self:
        trace.resample(sampling_rate, **settings)
    return self
[docs]
def decimate(self, factor, no_filter=False, strict_length=False):
    """
    Downsample data in all traces of stream by an integer factor.

    All arguments are handed to
    :meth:`~obspy.core.trace.Trace.decimate` of every trace.

    :type factor: int
    :param factor: Factor by which the sampling rate is lowered by
        decimation.
    :type no_filter: bool, optional
    :param no_filter: Deactivates automatic filtering if set to ``True``.
        Defaults to ``False``.
    :type strict_length: bool, optional
    :param strict_length: Leave traces unchanged for which end time of
        trace would change. Defaults to ``False``.
    :return: The stream itself.

    Currently a simple integer decimation is implemented.
    Only every ``factor``-th sample remains in the trace, all other
    samples are thrown away. Prior to decimation a lowpass filter is
    applied to ensure no aliasing artifacts are introduced. The automatic
    filtering can be deactivated with ``no_filter=True``.

    If the length of the data array modulo ``factor`` is not
    zero then the end time of the trace is changing on sub-sample scale. To
    abort downsampling in case of changing end times set
    ``strict_length=True``.

    .. note::

        The :class:`~Stream` object has three different methods to change
        the sampling rate of its data: :meth:`~.resample`,
        :meth:`~.decimate`, and :meth:`~.interpolate`

        Make sure to choose the most appropriate one for the problem at
        hand.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.

    .. rubric:: Example

    For the example we switch off the automatic pre-filtering so that
    the effect of the downsampling routine becomes clearer.

    >>> from obspy import Trace, Stream
    >>> tr = Trace(data=np.arange(10))
    >>> st = Stream(traces=[tr])
    >>> tr.stats.sampling_rate
    1.0
    >>> tr.data
    array([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
    >>> st.decimate(4, strict_length=False, no_filter=True)
    ... # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> tr.stats.sampling_rate
    0.25
    >>> tr.data
    array([0, 4, 8])
    """
    # keyword settings shared by every per-trace call
    settings = {'no_filter': no_filter, 'strict_length': strict_length}
    for trace in self:
        trace.decimate(factor, **settings)
    return self
[docs]
def max(self):
    """
    Collect the result of :meth:`~obspy.core.trace.Trace.max` for every
    trace in the stream, i.e. the value of the absolute maximum amplitude
    of each trace (the sign of the value is kept, see example).

    :return: List of values of absolute maxima of all traces

    .. rubric:: Example

    >>> from obspy import Trace, Stream
    >>> tr1 = Trace(data=np.array([0, -3, 9, 6, 4]))
    >>> tr2 = Trace(data=np.array([0, -3, -9, 6, 4]))
    >>> tr3 = Trace(data=np.array([0.3, -3.5, 9.0, 6.4, 4.3]))
    >>> st = Stream(traces=[tr1, tr2, tr3])
    >>> st.max()
    [9, -9, 9.0]
    """
    peaks = []
    for trace in self:
        peaks.append(trace.max())
    return peaks
[docs]
def differentiate(self, method='gradient'):
    """
    Differentiate all traces with respect to time.

    The call is forwarded to
    :meth:`~obspy.core.trace.Trace.differentiate` of every trace.

    :type method: str, optional
    :param method: Method to use for differentiation. Defaults to
        ``'gradient'``. See the `Supported Methods`_ section below for
        further details.
    :return: The stream itself.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.

    .. rubric:: _`Supported Methods`

    ``'gradient'``
        The gradient is computed using central differences in the interior
        and first differences at the boundaries. The returned gradient
        hence has the same shape as the input array. (uses
        :func:`numpy.gradient`)
    """
    for trace in self:
        trace.differentiate(method=method)
    return self
[docs]
def integrate(self, method='cumtrapz', **options):
    """
    Integrate all traces with respect to time.

    For details see the corresponding
    :meth:`~obspy.core.trace.Trace.integrate` method of
    :class:`~obspy.core.trace.Trace`, which is called for every trace.

    :type method: str, optional
    :param method: Method to use for integration. Defaults to
        ``'cumtrapz'``. See :meth:`~obspy.core.trace.Trace.integrate` for
        further details.
    :param options: Additional keyword arguments passed on to
        :meth:`~obspy.core.trace.Trace.integrate`.
    :return: The stream itself.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.
    """
    for trace in self:
        trace.integrate(method=method, **options)
    return self
[docs]
@raise_if_masked
def detrend(self, type='simple', **options):
    """
    Remove a trend from all traces.

    For details on supported methods and parameters see the corresponding
    :meth:`~obspy.core.trace.Trace.detrend` method of
    :class:`~obspy.core.trace.Trace`, which is called for every trace.

    :type type: str, optional
    :param type: Detrending method handed to every trace. Defaults to
        ``'simple'``.
    :param options: Additional keyword arguments passed on to
        :meth:`~obspy.core.trace.Trace.detrend`.
    :return: The stream itself.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data will no longer be accessible afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
    """
    for trace in self:
        trace.detrend(type=type, **options)
    return self
[docs]
def taper(self, *args, **kwargs):
    """
    Taper all Traces in Stream.

    All positional and keyword arguments are forwarded unchanged. For
    details see the corresponding :meth:`~obspy.core.trace.Trace.taper`
    method of :class:`~obspy.core.trace.Trace`.

    :return: The stream itself.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
    """
    for trace in self:
        trace.taper(*args, **kwargs)
    return self
[docs]
def interpolate(self, *args, **kwargs):
    """
    Interpolate all Traces in a Stream.

    All positional and keyword arguments are forwarded unchanged. For
    details see the corresponding
    :meth:`~obspy.core.trace.Trace.interpolate` method of
    :class:`~obspy.core.trace.Trace`.

    :return: The stream itself.

    .. note::

        The :class:`~Stream` object has three different methods to change
        the sampling rate of its data: :meth:`~.resample`,
        :meth:`~.decimate`, and :meth:`~.interpolate`

        Make sure to choose the most appropriate one for the problem at
        hand.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data will no longer be accessible afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.

    >>> from obspy import read
    >>> st = read()
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03... - ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03... - ... | 100.0 Hz, 3000 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03... - ... | 100.0 Hz, 3000 samples
    >>> st.interpolate(sampling_rate=111.1)  # doctest: +ELLIPSIS
    <obspy.core.stream.Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    BW.RJOB..EHZ | 2009-08-24T00:20:03... - ... | 111.1 Hz, 3332 samples
    BW.RJOB..EHN | 2009-08-24T00:20:03... - ... | 111.1 Hz, 3332 samples
    BW.RJOB..EHE | 2009-08-24T00:20:03... - ... | 111.1 Hz, 3332 samples
    """
    for trace in self:
        trace.interpolate(*args, **kwargs)
    return self
[docs]
def std(self):
    """
    Calculate standard deviations of all Traces in the Stream.

    The value for each trace is obtained by calling its ``std()`` method;
    standard deviations are calculated by NumPy method
    :meth:`~numpy.ndarray.std` on ``trace.data`` for every trace in the
    stream.

    :return: List of standard deviations of all traces.

    .. rubric:: Example

    >>> from obspy import Trace, Stream
    >>> tr1 = Trace(data=np.array([0, -3, 9, 6, 4]))
    >>> tr2 = Trace(data=np.array([0.3, -3.5, 9.0, 6.4, 4.3]))
    >>> st = Stream(traces=[tr1, tr2])
    >>> st.std()
    [4.2614551505325036, 4.4348618918744247]
    """
    deviations = []
    for trace in self:
        deviations.append(trace.std())
    return deviations
[docs]
def normalize(self, global_max=False):
    """
    Normalize all Traces in the Stream.

    By default all traces are normalized separately to their respective
    absolute maximum. By setting ``global_max=True`` all traces get
    normalized to the global maximum of all traces.

    :type global_max: bool, optional
    :param global_max: If set to ``True``, all traces are normalized with
        respect to the global maximum of all traces in the stream
        instead of normalizing every trace separately.
    :return: The stream itself. A stream without traces is returned
        unchanged, also for ``global_max=True``.

    .. note::
        If ``data.dtype`` of a trace was integer it is changing to float.

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
        This also makes an entry with information on the applied processing
        in ``stats.processing`` of every trace.

    .. rubric:: Example

    Make a Stream with two Traces:

    >>> from obspy import Trace, Stream
    >>> tr1 = Trace(data=np.array([0, -3, 9, 6, 4]))
    >>> tr2 = Trace(data=np.array([0.3, -0.5, -0.8, 0.4, 0.3]))
    >>> st = Stream(traces=[tr1, tr2])

    All traces are normalized to their absolute maximum and processing
    information is added:

    >>> st.normalize()  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st[0].data  # doctest: +ELLIPSIS
    array([ 0.        , -0.33333333,  1.        ,  0.66666667,  ...])
    >>> print(st[0].stats.processing[0])  # doctest: +ELLIPSIS
    ObsPy ... normalize(norm=None)
    >>> st[1].data
    array([ 0.375, -0.625, -1.   ,  0.5  ,  0.375])
    >>> print(st[1].stats.processing[0])  # doctest: +ELLIPSIS
    ObsPy ...: normalize(norm=None)

    Now let's do it again normalize all traces to the stream's global
    maximum:

    >>> tr1 = Trace(data=np.array([0, -3, 9, 6, 4]))
    >>> tr2 = Trace(data=np.array([0.3, -0.5, -0.8, 0.4, 0.3]))
    >>> st = Stream(traces=[tr1, tr2])

    >>> st.normalize(global_max=True)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st[0].data  # doctest: +ELLIPSIS
    array([ 0.        , -0.33333333,  1.        ,  0.66666667,  ...])
    >>> print(st[0].stats.processing[0])  # doctest: +ELLIPSIS
    ObsPy ...: normalize(norm=9)
    >>> st[1].data  # doctest: +ELLIPSIS
    array([ 0.03333333, -0.05555556, -0.08888889,  0.04444444,  ...])
    >>> print(st[1].stats.processing[0])  # doctest: +ELLIPSIS
    ObsPy ...: normalize(norm=9)
    """
    # use the same value for normalization on all traces?
    if global_max:
        # ``default`` keeps an empty stream from raising in max(); with no
        # traces there is nothing to normalize anyway.
        norm = max((abs(value) for value in self.max()), default=None)
    else:
        norm = None
    # normalize all traces
    for tr in self:
        tr.normalize(norm=norm)
    return self
[docs]
def rotate(self, method, back_azimuth=None, inclination=None,
           inventory=None, **kwargs):
    """
    Rotate the traces of the stream into another coordinate system.

    Traces are grouped by network, station, location and the channel code
    without its last character (the component code). If the stream holds
    more than one such group, every group is rotated separately with the
    same arguments and the trace list of the stream is rebuilt from the
    rotated groups.

    :type method: str
    :param method: Determines the rotation method.

        ``'->ZNE'``: Rotates data from three components into Z, North- and
            East-components based on the station metadata (e.g. borehole
            stations). Uses mandatory ``inventory`` parameter (provide
            either an :class:`~obspy.core.inventory.inventory.Inventory` or
            :class:`~obspy.io.xseed.parser.Parser` object) and ignores
            ``back_azimuth`` and ``inclination`` parameters. Additional
            kwargs will be passed on to :meth:`_rotate_to_zne()` (use if
            other components than ``["Z", "1", "2"]`` and
            ``["1", "2", "3"]`` need to be rotated).
            Trims common channels used in rotation to time spans that are
            available for all three channels (i.e. cuts away parts for
            which one or two channels used in rotation do not have data).
        ``'NE->RT'``: Rotates the North- and East-components of a
            seismogram to radial and transverse components.
        ``'RT->NE'``: Rotates the radial and transverse components of a
            seismogram to North- and East-components.
        ``'ZNE->LQT'``: Rotates from left-handed Z, North, and East system
            to LQT, e.g. right-handed ray coordinate system.
        ``'LQT->ZNE'``: Rotates from LQT, e.g. right-handed ray coordinate
            system to left handed Z, North, and East system.

    :type back_azimuth: float, optional
    :param back_azimuth: Depends on the chosen method.
        A single float, the back azimuth from station to source in degrees.
        If not given, ``stats.back_azimuth`` of the first input component
        will be used. It will also be written to the stats of the rotated
        traces after the rotation is done.
    :type inclination: float, optional
    :param inclination: Inclination of the ray at the station in degrees.
        Only necessary for three component rotations. If not given,
        ``stats.inclination`` of the first input component will be used.
        It will also be written after the rotation is done.
    :type inventory: :class:`~obspy.core.inventory.inventory.Inventory` or
        :class:`~obspy.io.xseed.parser.Parser`
    :param inventory: Inventory or SEED Parser with metadata of channels.
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself (rotation is done in place).
    :raises ValueError: If a channel code is empty, ``method`` is unknown,
        ``inventory`` is missing for ``'->ZNE'`` or the components that
        should be rotated together do not cover the same time span.
    :raises TypeError: If no back azimuth (or inclination) was given and
        none is stored in the trace stats.

    Example to rotate unaligned borehole instrument data based on station
    inventory (a dataless SEED :class:`~obspy.io.xseed.parser.Parser` can
    also be provided, see details for option ``inventory``):

    >>> from obspy import read, read_inventory
    >>> st = read("/path/to/ffbx_unrotated_gaps.mseed")
    >>> inv = read_inventory("/path/to/ffbx.stationxml")
    >>> st.rotate(method="->ZNE", inventory=inv)  # doctest: +ELLIPSIS
    <obspy.core.stream.Stream object at 0x...>
    """
    # Collect the distinct SEED ID groups (everything but the component
    # character). A dict is used so that first-seen order is kept.
    id_groups = {}
    for trace in self:
        net = trace.stats.network
        sta = trace.stats.station
        loc = trace.stats.location
        cha = trace.stats.channel
        if len(cha) == 0:
            msg = "Channel code must be at least one character long."
            raise ValueError(msg)
        id_groups[(net, sta, loc, cha[:-1])] = None

    # More than one group: rotate each of them on its own (recursively)
    # and put the results back together.
    if len(id_groups) > 1:
        rotated = []
        for net, sta, loc, prefix in id_groups:
            subset = Stream()
            for trace in self:
                stats = trace.stats
                if stats.network != net or stats.station != sta:
                    continue
                if stats.location != loc or stats.channel[:-1] != prefix:
                    continue
                subset += trace
            subset.rotate(method, back_azimuth=back_azimuth,
                          inclination=inclination, inventory=inventory,
                          **kwargs)
            rotated.extend(subset.traces)
        self.traces = rotated
        return self

    # From here on we deal with a single SEED ID group.
    if method == "->ZNE":
        if inventory is None:
            msg = ("With method '->ZNE' station metadata has to be "
                   "provided as 'inventory' parameter.")
            raise ValueError(msg)
        return self._rotate_to_zne(inventory, **kwargs)

    # Map the method name to the name of the "rotate" entry point.
    known_methods = (("NE->RT", "rotate_ne_rt"),
                     ("RT->NE", "rotate_rt_ne"),
                     ("ZNE->LQT", "rotate_zne_lqt"),
                     ("LQT->ZNE", "rotate_lqt_zne"))
    for name, entry_point in known_methods:
        if method == name:
            func = entry_point
            break
    else:
        msg = ("Method has to be one of ('->ZNE', 'NE->RT', 'RT->NE', "
               "'ZNE->LQT', or 'LQT->ZNE').")
        raise ValueError(msg)
    # Retrieve function call from entry points
    func = _get_function_from_entry_point("rotate", func)
    # The method name was validated above, so it always splits into the
    # input and the output component codes.
    input_components, output_components = method.split("->")

    span_msg = "All components need to have the same time span."

    def _differs(reference, other):
        # True if `other` does not match `reference` in length, start time
        # (within half a sample) or sampling rate.
        half_sample = 0.5 * reference.stats.delta
        return (
            len(reference) != len(other) or
            abs(reference.stats.starttime -
                other.stats.starttime) > half_sample or
            reference.stats.sampling_rate != other.stats.sampling_rate)

    def _resolve(explicit, trace, attribute, error_message):
        # Prefer the explicitly passed value, fall back to the trace stats.
        if explicit is not None:
            return explicit
        try:
            return getattr(trace.stats, attribute)
        except Exception:
            raise TypeError(error_message)

    if len(input_components) == 2:
        # Two-component rotation.
        firsts = self.select(component=input_components[0])
        seconds = self.select(component=input_components[1])
        # validate everything before any data is touched
        for first, second in zip(firsts, seconds):
            if _differs(first, second):
                raise ValueError(span_msg)
        for first, second in zip(firsts, seconds):
            baz = _resolve(back_azimuth, first, "back_azimuth",
                           "No back-azimuth specified.")
            first.data, second.data = func(first.data, second.data, baz)
            for trace, code in zip((first, second), output_components):
                # rename the component and remember the back azimuth used
                trace.stats.channel = trace.stats.channel[:-1] + code
                trace.stats.back_azimuth = baz
    else:
        # Three-component rotation.
        firsts = self.select(component=input_components[0])
        seconds = self.select(component=input_components[1])
        thirds = self.select(component=input_components[2])
        # validate everything before any data is touched
        for first, second, third in zip(firsts, seconds, thirds):
            if _differs(first, second) or _differs(first, third):
                raise ValueError(span_msg)
        for first, second, third in zip(firsts, seconds, thirds):
            baz = _resolve(back_azimuth, first, "back_azimuth",
                           "No back-azimuth specified.")
            inc = _resolve(inclination, first, "inclination",
                           "No inclination specified.")
            first.data, second.data, third.data = func(
                first.data, second.data, third.data, baz, inc)
            for trace, code in zip((first, second, third),
                                   output_components):
                # rename the component and remember the angles used
                trace.stats.channel = trace.stats.channel[:-1] + code
                trace.stats.back_azimuth = baz
                trace.stats.inclination = inc
    return self
[docs]
def copy(self):
    """
    Create an independent deep copy of the Stream object.

    :rtype: :class:`~obspy.core.stream.Stream`
    :return: Copy of current stream.

    .. rubric:: Examples

    1. Create a Stream and copy it

        >>> from obspy import read
        >>> st = read()
        >>> st2 = st.copy()

       The two objects are not the same:

        >>> st is st2
        False

       But they have equal data (before applying further processing):

        >>> st == st2
        True

    2. The following example shows how to make an alias but not copy the
       data. Any changes on ``st3`` would also change the contents of
       ``st``.

        >>> st3 = st
        >>> st is st3
        True
        >>> st == st3
        True
    """
    # `copy` refers to the standard library module here, not this method
    duplicate = copy.deepcopy(self)
    return duplicate
[docs]
def clear(self):
    """
    Empty the stream by replacing its trace list (convenience method).

    The Stream object itself stays the same, only its ``traces`` attribute
    is rebound to a new empty list. This is useful if other references to
    the Stream object exist that should keep working. If that does not
    matter, simply create a new ``Stream()`` instance.

    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The (now empty) stream itself.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read()
    >>> len(st)
    3
    >>> st.clear()  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.traces
    []
    """
    # rebind instead of emptying in place: the previous list object is
    # left untouched
    empty = []
    self.traces = empty
    return self
[docs]
def _cleanup(self, misalignment_threshold=1e-2):
    """
    Merge consistent trace objects but leave everything else alone.

    This can mean traces with matching header that are directly adjacent or
    are contained/equal/overlapping traces with exactly the same waveform
    data in the overlapping part.
    If option `misalignment_threshold` is non-zero then
    contained/overlapping/directly adjacent traces with the sampling points
    misaligned by less than `misalignment_threshold` times the sampling
    interval are aligned on the same sampling points (see example below).

    Empty traces are removed. If traces with the same id are incompatible
    (as reported by ``_merge_checks()``), a warning is issued and nothing
    else is done.

    .. rubric:: Notes

    Traces with overlapping data parts that do not match are not merged::

        before:
        Trace 1: AAAAAAAA
        Trace 2:     BBBBBBBB

        after:
        Trace 1: AAAAAAAA
        Trace 2:     BBBBBBBB

    Traces with overlapping data parts that do match are merged::

        before:
        Trace 1: AAAAAAAA
        Trace 2:     AAAABBBB

        after:
        Trace 1: AAAAAAAABBBB

    Contained traces are handled the same way.
    If common data does not match, nothing is done::

        before:
        Trace 1: AAAAAAAAAAAA
        Trace 2:     BBBB

        after:
        Trace 1: AAAAAAAAAAAA
        Trace 2:     BBBB

    If the common data part matches they are merged::

        before:
        Trace 1: AAAAAAAAAAAA
        Trace 2:     AAAA

        after:
        Trace 1: AAAAAAAAAAAA

    Directly adjacent traces are merged::

        before:
        Trace 1: AAAAAAA
        Trace 2:        BBBBB

        after:
        Trace 1: AAAAAAABBBBB

    Misaligned traces are aligned, depending on set parameters, e.g. for a
    directly adjacent trace with slight misalignment (with two common
    samples at start of Trace 2 for better visualization)::

        before:
        Trace 1: A---------A---------A
        Trace 2:            A---------A---------B---------B

        after:
        Trace 1: A---------A---------A---------B---------B

    :type misalignment_threshold: float
    :param misalignment_threshold: Threshold value for sub-sample
        misalignments of sampling points of two traces that should be
        merged together (fraction of sampling interval, from 0 to 0.5).
        ``0`` means traces with even just the slightest misalignment will
        not be merged together, ``0.5`` means traces will be merged
        together disregarding of any sub-sample shifts of sampling points.
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself (the operation works in place).
    """
    # first of all throw away all empty traces
    self.traces = [tr for tr in self.traces if tr.stats.npts]
    # check sampling rates and dtypes
    try:
        self._merge_checks()
    except Exception as e:
        if "Can't merge traces with same ids but" in str(e):
            msg = "Incompatible traces (sampling_rate, dtype, ...) " + \
                "with same id detected. Doing nothing."
            warnings.warn(msg)
            # hand back the stream like every other exit of this method
            # (this path used to return None)
            return self
        # NOTE(review): any other exception raised by the checks is
        # silently ignored and processing continues - confirm whether this
        # is intended or whether it should be re-raised.
    # order matters!
    self.sort(keys=['network', 'station', 'location', 'channel',
                    'starttime', 'endtime'])
    # build up dictionary with lists of traces with same ids (insertion
    # order of the dict keeps the sorted order of the ids)
    traces_dict = {}
    for trace in self.traces:
        traces_dict.setdefault(trace.id, []).append(trace)
    # clear traces of current stream, it is rebuilt below
    self.traces = []
    # loop through ids
    for trace_list in traces_dict.values():
        pending = iter(trace_list)
        # every list holds at least one trace
        cur_trace = next(pending)
        delta = cur_trace.stats.delta
        allowed_micro_shift = misalignment_threshold * delta
        # work through all remaining traces of same id
        for trace in pending:
            # `gap` is the deviation (in seconds) of the actual start
            # time of the second trace from the expected start time
            # (for the ideal case of directly adjacent and perfectly
            # aligned traces).
            gap = trace.stats.starttime - (cur_trace.stats.endtime + delta)
            # if `gap` is larger than the designated allowed shift,
            # we treat it as a real gap and leave as is.
            if misalignment_threshold > 0 and gap <= allowed_micro_shift:
                # `gap` is smaller than allowed shift (or equal),
                #  the traces could be
                #  - overlapping without being misaligned or..
                #  - overlapping with misalignment or..
                #  - misaligned with a micro gap
                # check if the sampling points are misaligned:
                misalignment = gap % delta
                if misalignment != 0:
                    # determine the position of the second trace's
                    # sampling points in the interval between two
                    # sampling points of first trace.
                    # a `misalign_percentage` of close to 0.0 means a
                    # sampling point of the first trace is just a bit
                    # to the left of our sampling point:
                    #
                    #  Trace 1: --|---------|---------|---------|--
                    #  Trace 2: ---|---------|---------|---------|-
                    #  misalign_percentage:  0.........1
                    #
                    # a `misalign_percentage` of close to 1.0 means a
                    # sampling point of the first trace is just a bit
                    # to the right of our sampling point:
                    #
                    #  Trace 1: --|---------|---------|---------|--
                    #  Trace 2: -|---------|---------|---------|---
                    #  misalign_percentage:  0.........1
                    misalign_percentage = misalignment / delta
                    if (misalign_percentage <= misalignment_threshold or
                            misalign_percentage >=
                            1 - misalignment_threshold):
                        # now we align the sampling points of both traces
                        trace.stats.starttime = (
                            cur_trace.stats.starttime +
                            round((trace.stats.starttime -
                                   cur_trace.stats.starttime) / delta) *
                            delta)
            # we have some common parts: check if consistent
            # (but only if sampling points are matching to specified
            # accuracy, which is checked and conditionally corrected in
            # previous code block)
            subsample_shift_percentage = (
                trace.stats.starttime.timestamp -
                cur_trace.stats.starttime.timestamp) % delta / delta
            subsample_shift_percentage = min(
                subsample_shift_percentage, 1 - subsample_shift_percentage)
            if (trace.stats.starttime <= cur_trace.stats.endtime and
                    subsample_shift_percentage < misalignment_threshold):
                # check if common time slice [t1 --> t2] is equal:
                t1 = trace.stats.starttime
                t2 = min(cur_trace.stats.endtime, trace.stats.endtime)
                # if consistent: add them together
                if np.array_equal(cur_trace.slice(t1, t2).data,
                                  trace.slice(t1, t2).data):
                    cur_trace += trace
                # if not consistent: leave them alone
                else:
                    self.traces.append(cur_trace)
                    cur_trace = trace
            # traces are perfectly adjacent: add them together
            elif trace.stats.starttime == cur_trace.stats.endtime + \
                    cur_trace.stats.delta:
                cur_trace += trace
            # no common parts (gap):
            # leave traces alone and add current to list
            else:
                self.traces.append(cur_trace)
                cur_trace = trace
        self.traces.append(cur_trace)
    self.traces = [tr for tr in self.traces if tr.stats.npts]
    return self
[docs]
def split(self):
    """
    Split any trace containing gaps into contiguous unmasked traces.

    Every trace of the stream is split with its own ``split()`` method and
    the resulting pieces are collected in a new stream.

    :rtype: :class:`obspy.core.stream.Stream`
    :returns: Returns a new stream object containing only contiguous
        unmasked traces.
    """
    pieces = Stream()
    for tr in self.traces:
        contiguous_parts = tr.split()
        pieces.extend(contiguous_parts)
    return pieces
[docs]
@map_example_filename("inventories")
def attach_response(self, inventories):
    """
    Look up the channel response for every trace and attach it as
    ``trace.stats.response``.

    No exception is raised if response information can not be found for a
    trace; a warning is shown instead and the trace is collected in the
    returned list. Any other error is passed on to the caller.

    To subsequently deconvolve the instrument response use
    :meth:`Stream.remove_response`.

    .. note::

        It is recommended to rather just provide the metadata information
        directly whenever needed so that the lookup of appropriate response
        is done right at the time when it is used, e.g.
        ``stream.remove_response(inventory=inv, ...)``

    >>> from obspy import read, read_inventory
    >>> st = read()
    >>> inv = read_inventory()
    >>> st.attach_response(inv)
    []
    >>> tr = st[0]
    >>> print(tr.stats.response)  \
            # doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
    Channel Response
       From M/S (Velocity in Meters per Second) to COUNTS (Digital Counts)
       Overall Sensitivity: 2.5168e+09 defined at 0.020 Hz
       4 stages:
          Stage 1: PolesZerosResponseStage from M/S to V, gain: 1500
          Stage 2: CoefficientsTypeResponseStage from V to COUNTS, ...
          Stage 3: FIRResponseStage from COUNTS to COUNTS, gain: 1
          Stage 4: FIRResponseStage from COUNTS to COUNTS, gain: 1

    :type inventories: :class:`~obspy.core.inventory.inventory.Inventory`
        or :class:`~obspy.core.inventory.network.Network` or a list
        containing objects of these types.
    :param inventories: Station metadata to use in search for response for
        each trace in the stream.
    :rtype: list of :class:`~obspy.core.trace.Trace`
    :returns: list of traces for which no response information could be
        found.
    """
    without_response = []
    for trace in self.traces:
        try:
            trace.attach_response(inventories)
        except Exception as err:
            message = str(err)
            # only the "nothing found" case is tolerated
            if message != "No matching response information found.":
                raise
            warnings.warn(message)
            without_response.append(trace)
    return without_response
[docs]
def remove_response(self, *args, **kwargs):
    """
    Deconvolve instrument response for all Traces in Stream.

    All positional and keyword arguments are handed over unchanged to
    :meth:`~obspy.core.trace.Trace.remove_response` of each
    :class:`~obspy.core.trace.Trace`, see there for details.

    >>> from obspy import read, read_inventory
    >>> st = read()
    >>> inv = read_inventory()
    >>> st.remove_response(inventory=inv)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>
    >>> st.plot()  # doctest: +SKIP

    .. plot::

        from obspy import read, read_inventory
        st = read()
        inv = read_inventory()
        st.remove_response(inventory=inv)
        st.plot()

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.

    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself.
    """
    for trace in self:
        trace.remove_response(*args, **kwargs)
    return self
[docs]
def remove_sensitivity(self, *args, **kwargs):
    """
    Remove instrument sensitivity for all Traces in Stream.

    All positional and keyword arguments are handed over unchanged to
    :meth:`~obspy.core.trace.Trace.remove_sensitivity` of each
    :class:`~obspy.core.trace.Trace`, see there for details.

    >>> from obspy import read, read_inventory
    >>> st = read()
    >>> inv = read_inventory()
    >>> st.remove_sensitivity(inv)  # doctest: +ELLIPSIS
    <...Stream object at 0x...>

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data is not accessible anymore afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.

    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself.
    """
    for trace in self:
        trace.remove_sensitivity(*args, **kwargs)
    return self
[docs]
def stack(self, group_by='all', stack_type='linear', npts_tol=0,
          time_tol=0):
    """
    Stack traces by the same selected metadata.

    The traces of the stream are replaced by one stacked trace per group.
    The metadata of each stacked trace (including starttime) corresponds
    to the metadata of the original traces of its group if those are the
    same for all of them.
    Additionally, the entry ``stack`` is written to the stats object(s).
    It contains the fields ``group``
    (result of the format operation on the ``group_by`` parameter),
    ``count`` (number of stacked traces) and ``type``
    (``stack_type`` argument).

    :type group_by: str
    :param group_by: Stack waveforms together which have the same metadata
        given by this parameter. The parameter should name the
        corresponding keys of the stats object,
        e.g. ``'{network}.{station}'`` for stacking all
        locations and channels of the stations and returning a stream
        consisting of one stacked trace for each station.
        This parameter can take two special values,
        ``'id'`` which stacks the waveforms by SEED id and
        ``'all'`` (default) which stacks together all traces in the stream.
    :type stack_type: str or tuple
    :param stack_type: Type of stack, one of the following:
        ``'linear'``: average stack (default),
        ``('pw', order)``: phase weighted stack of given order
        (see [Schimmel1997]_, order 0 corresponds to linear stack),
        ``('root', order)``: root stack of given order
        (order 1 corresponds to linear stack).
    :type npts_tol: int
    :param npts_tol: Tolerate traces with different number of points
        with a difference up to this value. Surplus samples are discarded.
    :type time_tol: float
    :param time_tol: Tolerate difference, in seconds, in starttime when
        setting the new starttime of the stack. If starttimes differ by
        more than this value, no starttime is set in the header of the
        stacked trace (i.e. it will be timestamp 0).
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself, now holding the stacked traces.
    :raises ValueError: If the sampling rates within a group differ or the
        numbers of samples differ by more than ``npts_tol``.

    >>> from obspy import read
    >>> st = read()
    >>> stack = st.stack()
    >>> print(stack)  # doctest: +ELLIPSIS
    1 Trace(s) in Stream:
    BW.RJOB.. | 2009-08-24T00:20:03.000000Z - ... | 100.0 Hz, 3000 samples

    .. note::

        This operation is performed in place on the actual data arrays. The
        raw data will no longer be accessible afterwards. To keep your
        original data, use :meth:`~obspy.core.stream.Stream.copy` to create
        a copy of your stream object.
    """
    from obspy.signal.util import stack as stack_func
    from obspy.core.util.attribdict import _attribdict_equal

    stacked_traces = []
    for group_id, members in self._groupby(group_by).items():
        reference = members[0]
        # keep only those header entries that all members agree on
        header = {}
        for key, value in reference.stats.items():
            if all(_attribdict_equal(tr.stats.get(key), value)
                   for tr in members):
                header[key] = value
        # the end time follows from start time, sampling rate and npts
        header.pop('endtime', None)
        if 'sampling_rate' not in header:
            msg = 'Sampling rate of traces to stack is different'
            raise ValueError(msg)
        if 'starttime' not in header and time_tol > 0:
            start_times = [tr.stats.starttime for tr in members]
            if ptp(start_times) <= time_tol:
                # use high median as starttime
                middle = len(start_times) // 2
                header['starttime'] = sorted(start_times)[middle]
        header['stack'] = AttribDict(group=group_id, count=len(members),
                                     type=stack_type)
        lengths = [len(tr) for tr in members]
        length_spread = ptp(lengths)
        shortest = min(lengths)
        if length_spread > npts_tol:
            msg = ('Difference of number of points of the traces is higher'
                   ' than requested tolerance ({} > {})')
            raise ValueError(msg.format(length_spread, npts_tol))
        # cut all members to the shortest one before stacking
        data = np.array([tr.data[:shortest] for tr in members])
        stacked = stack_func(data, stack_type=stack_type)
        stacked_traces.append(
            reference.__class__(data=stacked, header=header))
    self.traces = stacked_traces
    return self
[docs]
@staticmethod
def _dummy_stream_from_string(s):
    """
    Build a dummy Stream object from the string representation of a Stream.

    Every non-empty line (except the ``N Trace(s) in Stream:`` header) is
    parsed into one trace with the given SEED id, start time, sampling rate
    and number of samples. The data of all traces is always equal to one.
    Mostly intended for debugging purposes.

    :type s: str
    :param s: String representation of a Stream, as shown by ``print()``.
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: Stream with one dummy trace per parsed line.

    >>> import os
    >>> s = ['', '', '3 Trace(s) in Stream:',
    ...      'IU.GRFO..HH2 | 2016-01-07T00:00:00.008300Z - '
    ...      '2016-01-07T00:00:30.098300Z | 10.0 Hz, 301 samples',
    ...      'XX.GRFO..HH1 | 2016-01-07T00:00:02.668393Z - '
    ...      '2016-01-07T00:00:09.518393Z | 100.0 Hz, 686 samples',
    ...      'IU.ABCD..EH2 | 2016-01-07T00:00:09.528393Z - '
    ...      '2016-01-07T00:00:50.378393Z | 100.0 Hz, 4086 samples',
    ...      '', '']
    >>> s = os.linesep.join(s)
    >>> st = Stream._dummy_stream_from_string(s)
    >>> print(st)  # doctest: +ELLIPSIS
    3 Trace(s) in Stream:
    IU.GRFO..HH2 | 2016-01-07T00:00:00.008300Z ... | 10.0 Hz, 301 samples
    XX.GRFO..HH1 | 2016-01-07T00:00:02.668393Z ... | 100.0 Hz, 686 samples
    IU.ABCD..EH2 | 2016-01-07T00:00:09.528393Z ... | 100.0 Hz, 4086 samples
    """
    header_pattern = r'[0-9]+ Trace\(s\) in Stream:'
    result = Stream()
    for raw_line in s.splitlines():
        text = raw_line.strip()
        # skip blank lines and the summary line on top
        if not text or re.match(header_pattern, text):
            continue
        # whitespace separated fields: 0 = SEED id, 2 = start time,
        # 6 = sampling rate, 8 = number of samples
        fields = text.split()
        net, sta, loc, cha = fields[0].split(".")
        starttime = UTCDateTime(fields[2])
        sampling_rate = float(fields[6])
        npts = int(fields[8])
        dummy = Trace()
        dummy.data = np.ones(npts, dtype=np.float64)
        stats = dummy.stats
        stats.station = sta
        stats.network = net
        stats.location = loc
        stats.channel = cha
        stats.starttime = starttime
        stats.sampling_rate = sampling_rate
        result += dummy
    return result
[docs]
def _get_common_channels_info(self):
    """
    Returns a dictionary with information on common channels.

    Channels are "common" if they share network, station, location and the
    channel code without its last character (the component code).

    :rtype: dict
    :return: Dictionary with one entry per set of common channels. Keys
        are tuples ``(network, station, location, channel_pattern)`` with
        ``channel_pattern`` being the common part of the channel code
        followed by ``"?"`` (or an empty string for traces with empty
        channel code). Values are dictionaries with the keys

        * ``"start"``: latest start of all individual channels, i.e. the
          earliest time at which all channels have started,
        * ``"end"``: earliest end of all individual channels,
        * ``"gaps"``: result of ``get_gaps()`` on all traces matching the
          key,
        * ``"channels"``: dictionary mapping each individual channel code
          to a dictionary with its earliest ``"start"`` and latest
          ``"end"``.

        All times are :class:`~obspy.core.utcdatetime.UTCDateTime` objects.
    """
    # get all ids down to location code
    ids_ = {tr.id.rsplit(".", 1)[0] for tr in self}
    all_channels = {}
    # work can be separated by net.sta.loc, so iterate over each
    for id_ in ids_:
        net, sta, loc = id_.split(".")
        channels = {}
        st_ = self.select(network=net, station=sta, location=loc)
        # for each individual channel collect earliest start time and
        # latest endtime
        for tr in st_:
            cha = tr.stats.channel
            # Only an empty channel code has no common part. A conditional
            # expression is needed here: with `cha and cha[:-1] or None` a
            # one-character code (common part "") was treated like an
            # empty code and its traces were missed by the channel pattern
            # built below.
            cha_common = cha[:-1] if cha else None
            cha_common_dict = channels.setdefault(cha_common, {})
            cha_dict = cha_common_dict.setdefault(cha, {})
            cha_dict["start"] = min(tr.stats.starttime.timestamp,
                                    cha_dict.get("start", np.inf))
            cha_dict["end"] = max(tr.stats.endtime.timestamp,
                                  cha_dict.get("end", -np.inf))
        # convert all timestamp objects back to UTCDateTime
        for cha_common_dict in channels.values():
            for cha_dict in cha_common_dict.values():
                cha_dict["start"] = UTCDateTime(cha_dict["start"])
                cha_dict["end"] = UTCDateTime(cha_dict["end"])
        # now for every combination of common channels determine earliest
        # common start time and latest common end time, as well as gap
        # information in between
        for cha_common, channels_ in channels.items():
            if cha_common is None:
                # empty channel code: match exactly that
                cha_pattern = ""
            else:
                # common part plus any single component character
                cha_pattern = cha_common + "?"
            st__ = self.select(network=net, station=sta, location=loc,
                               channel=cha_pattern)
            start = max(
                [cha_dict_["start"] for cha_dict_ in channels_.values()])
            end = min(
                [cha_dict_["end"] for cha_dict_ in channels_.values()])
            gaps = st__.get_gaps()
            all_channels[(net, sta, loc, cha_pattern)] = {
                "start": start, "end": end, "gaps": gaps,
                "channels": channels_}
    return all_channels
[docs]
def _groupby(self, group_by):
    """
    Group traces by same metadata.

    :type group_by: str
    :param group_by: Group traces together which have the same metadata
        given by this parameter. The parameter should name the
        corresponding keys of the stats object,
        e.g. ``'{network}.{station}'``.
        This parameter can take the value
        ``'id'`` which groups the traces by SEED id.
    :rtype: dict
    :return: dictionary {group: stream}, the group being the result of
        formatting ``group_by`` with the stats of a trace and the stream
        being a new instance of the class of this object.
    """
    template = group_by
    if template == 'id':
        template = '{network}.{station}.{location}.{channel}'
    make_group = self.__class__
    grouped = {}
    for trace in self:
        key = template.format(**trace.stats)
        # create the container on first use of a key only
        if key not in grouped:
            grouped[key] = make_group()
        grouped[key].append(trace)
    return grouped
[docs]
def _trim_common_channels(self):
    """
    Cut all sets of common channels down to the time spans they share.

    Channels that have the same ID down to the component character are
    trimmed to the time span given by ``_get_common_channels_info()``
    (earliest common start time to latest common end time) and all gaps
    reported for the set are cut out of every channel. The stream is
    cleaned up with ``_cleanup()`` first. Works in place.
    """
    self._cleanup()
    trimmed = []
    for seed_key, info in self._get_common_channels_info().items():
        net, sta, loc, cha_pattern = seed_key
        group = self.select(network=net, station=sta, location=loc,
                            channel=cha_pattern)
        group.trim(info["start"], info["end"])
        for gap in info["gaps"]:
            # only start and end time of the gap entry are needed
            _, _, _, _, gap_start, gap_end, _, _ = gap
            group = group.cutout(gap_start, gap_end)
        trimmed.extend(group.traces)
    self.traces = trimmed
[docs]
def _rotate_to_zne(
        self, inventory, components=("Z12", "123")):
    """
    Rotate all matching traces to ZNE, specifying sets of component codes.

    For every set of component codes, all groups of channels at the same
    network/station/location whose channel codes only differ in the
    component character and that provide exactly the components of the set
    are handed over to ``_rotate_specific_channels_to_zne()``.

    >>> from obspy import read, read_inventory
    >>> st = read("/path/to/ffbx_unrotated_gaps.mseed")
    >>> inv = read_inventory("/path/to/ffbx.stationxml")
    >>> st._rotate_to_zne(inv)  # doctest: +ELLIPSIS
    <obspy.core.stream.Stream object at 0x...>

    :type inventory: :class:`~obspy.core.inventory.inventory.Inventory` or
        :class:`~obspy.io.xseed.parser.Parser`
    :param inventory: Inventory or Parser with metadata of channels.
    :type components: list or tuple or str
    :param components: List of combinations of three (case sensitive)
        component characters. Rotations are executed in this order, so
        order might matter in very strange cases (e.g. if traces with more
        than three component codes are present for the same SEED ID down to
        the component code). For example, specifying components ``"Z12"``
        would rotate sets of "BHZ", "BH1", "BH2" (and "HHZ", "HH1", "HH2",
        etc.) channels at the same station. If only a single set of
        component codes is used, this option can also be specified as a
        string (e.g. ``components='Z12'``).
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself.
    """
    # be nice to users that specify e.g. ``components='ZNE'``..
    # compare http://lists.swapbytes.de/archives/obspy-users/
    # 2018-March/002692.html
    if isinstance(components, str):
        components = [components]
    for triplet in components:
        wanted = set(triplet)
        candidates = self.select(component="[{}]".format(triplet))
        station_keys = sorted({
            (tr.stats.network, tr.stats.station, tr.stats.location)
            for tr in candidates})
        for net, sta, loc in station_keys:
            at_station = candidates.select(network=net, station=sta,
                                           location=loc)
            # map channel code without component -> component codes found
            found_by_prefix = {}
            for cha in {tr.stats.channel for tr in at_station}:
                if cha == "":
                    continue
                found_by_prefix.setdefault(cha[:-1], set()).add(cha[-1])
            for prefix, found in sorted(found_by_prefix.items()):
                # rotate only if exactly the requested components exist
                if found != wanted:
                    continue
                channel_codes = [prefix + code for code in triplet]
                self._rotate_specific_channels_to_zne(
                    net, sta, loc, channel_codes, inventory)
    return self
[docs]
def _rotate_specific_channels_to_zne(
        self, network, station, location, channels, inventory):
    """
    Rotate three explicitly specified channels to ZNE.

    The matching traces are taken out of the stream, trimmed to the time
    spans that all three channels have in common, rotated piece by piece
    using the azimuth and dip of each channel from the metadata and then
    added to the stream again with component codes "Z", "N" and "E".

    >>> from obspy import read, read_inventory
    >>> st = read("/path/to/ffbx_unrotated_gaps.mseed")
    >>> inv = read_inventory("/path/to/ffbx.stationxml")
    >>> st._rotate_specific_channels_to_zne(
    ...     "BW", "FFB1", "", ["HHZ", "HH1", "HH2"],
    ...     inv)  # doctest: +ELLIPSIS
    <obspy.core.stream.Stream object at 0x...>

    :type network: str
    :param network: Network code of channels that should be rotated.
    :type station: str
    :param station: Station code of channels that should be rotated.
    :type location: str
    :param location: Location code of channels that should be rotated.
    :type channels: list
    :param channels: The three channel codes of channels that should be
        rotated.
    :type inventory: :class:`~obspy.core.inventory.inventory.Inventory` or
        :class:`~obspy.io.xseed.parser.Parser`
    :param inventory: Inventory or Parser with metadata of channels.
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself.
    :raises TypeError: If ``inventory`` is of an unsupported type.
    :raises NotImplementedError: If the trimmed traces can not be combined
        to sets of the three requested channels.
    """
    from obspy.signal.rotate import rotate2zne
    from obspy.core.inventory import Inventory, Network
    from obspy.io.xseed import Parser

    if isinstance(inventory, (Inventory, Network)):
        lookup_metadata = inventory.get_channel_metadata
    elif isinstance(inventory, Parser):
        # xseed Parser has everything in get_coordinates method due to
        # historic reasons..
        lookup_metadata = inventory.get_coordinates
    else:
        msg = 'Wrong type for "inventory": {}'.format(str(type(inventory)))
        raise TypeError(msg)

    # build temporary stream that has only those traces that are supposed
    # to be used in rotation
    at_station = self.select(network=network, station=station,
                             location=location)
    work = (at_station.select(channel=channels[0]) +
            at_station.select(channel=channels[1]) +
            at_station.select(channel=channels[2]))
    # remove the original unrotated traces from the stream
    for tr in work.traces:
        self.remove(tr)
    # cut data so that we end up with a set of matching pieces for the
    # three components (i.e. cut away any parts where one of the three
    # components has no data)
    work._trim_common_channels()
    # sort by start time, so each three consecutive traces can then be used
    # in one rotation run
    work.sort(keys=["starttime"])

    bug_msg = ("Unexpected behavior in rotation. Please file a bug "
               "report on github.")
    # woooops, that's unexpected. must be a bug in the trimming helper
    # routine
    if len(work) % 3 != 0:
        raise NotImplementedError(bug_msg)
    expected_channels = set(channels)
    for _ in range(int(len(work) / 3)):
        # three consecutive traces are always the ones that combine for one
        # rotation run (taken from the end of the sorted stream)
        triple = [work.pop() for _ in range(3)]
        # paranoid.. do a quick check of the channels again.
        if {tr.stats.channel for tr in triple} != expected_channels:
            raise NotImplementedError(bug_msg)
        # the metadata lookup is called the same way for Inventory and
        # Parser
        metadata = [lookup_metadata(tr.id, tr.stats.starttime)
                    for tr in triple]
        # rotate2zne expects (data, azimuth, dip) for each of the three
        # components, one after the other
        rotate_args = []
        for tr, meta in zip(triple, metadata):
            rotate_args.extend((tr, meta["azimuth"], meta["dip"]))
        zne = rotate2zne(*rotate_args)
        for tr, new_data, component in zip(triple, zne, "ZNE"):
            tr.data = new_data
            tr.stats.channel = tr.stats.channel[:-1] + component
        self.traces += triple
    return self
[docs]
def newbyteorder(self, byteorder='native'):
    """
    Change byteorder of the data of all traces.

    ``byteorder`` is handed over to the method of the same name of every
    trace. For details see
    :meth:`Trace.newbyteorder <obspy.core.trace.Trace.newbyteorder>`.

    :param byteorder: Byte order to change to, defaults to ``'native'``.
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: The stream itself.
    """
    for trace in self:
        trace.newbyteorder(byteorder)
    return self
[docs]
def _is_pickle(filename):  # @UnusedVariable
    """
    Check whether a file is a pickled ObsPy Stream file.

    .. warning::
        The check has to unpickle the content, and unpickling can execute
        arbitrary code. Never use it on data from an untrusted source.

    :type filename: str or file-like object
    :param filename: Name of the pickled ObsPy Stream file to be checked
        or an open file-like object to unpickle from.
    :rtype: bool
    :return: ``True`` if the content could be unpickled and is an ObsPy
        :class:`~obspy.core.stream.Stream`, ``False`` otherwise (including
        any error while opening or unpickling).

    .. rubric:: Example

    >>> _is_pickle('/path/to/pickle.file')  # doctest: +SKIP
    True
    """
    if isinstance(filename, str):
        try:
            with open(filename, 'rb') as fp:
                # cheap pre-check to avoid unpickling unrelated files: the
                # module path has to show up at the start of the file
                if b"obspy.core.stream" not in fp.read(100):
                    return False
                fp.seek(0)
                st = pickle.load(fp)
        except Exception:
            return False
    else:
        try:
            st = pickle.load(filename)
        except Exception:
            return False
    # same final check for both kinds of input: any other pickled object
    # that merely mentions the module is not a Stream file
    return isinstance(st, Stream)
[docs]
def _read_pickle(filename, **kwargs):  # @UnusedVariable
    """
    Read and return Stream from pickled ObsPy Stream file.

    .. warning::
        This function should NOT be called directly, it registers via the
        ObsPy :func:`~obspy.core.stream.read` function, call this instead.

    .. warning::
        Unpickling can execute arbitrary code. Only read pickle files from
        sources you trust.

    :type filename: str or file-like object
    :param filename: Name of the pickled ObsPy Stream file to be read or
        an open file-like object to unpickle from.
    :param kwargs: Ignored, none of the passed keyword arguments is handed
        over to the unpickler.
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: A ObsPy Stream object.
    """
    # Always unpickle with latin-1 encoding, regardless of passed kwargs.
    # see seishub/client.py
    # https://api.mongodb.org/python/current/\
    # python3.html#why-can-t-i-share-pickled-objectids-\
    # between-some-versions-of-python-2-and-3
    load_options = {'encoding': "latin-1"}
    if not isinstance(filename, str):
        # already an open file-like object
        return pickle.load(filename, **load_options)
    with open(filename, 'rb') as fp:
        return pickle.load(fp, **load_options)
[docs]
def _write_pickle(stream, filename, protocol=2, **kwargs):  # @UnusedVariable
    """
    Write a Python pickle of current stream.

    .. note::
        Writing into PICKLE format allows to store additional attributes
        appended to the current Stream object or any contained Trace.

    .. warning::
        This function should NOT be called directly, it registers via the
        the :meth:`~obspy.core.stream.Stream.write` method of an
        ObsPy :class:`~obspy.core.stream.Stream` object, call this instead.

    :type stream: :class:`~obspy.core.stream.Stream`
    :param stream: The ObsPy Stream object to write.
    :type filename: str or file-like object
    :param filename: Name of file to write or an open file-like object to
        write the pickle to.
    :type protocol: int, optional
    :param protocol: Pickle protocol, defaults to ``2``.
    :param kwargs: Ignored.
    """
    if not isinstance(filename, str):
        # already an open file-like object, write to it directly
        pickle.dump(stream, filename, protocol=protocol)
        return
    with open(filename, 'wb') as fp:
        pickle.dump(stream, fp, protocol=protocol)
if __name__ == '__main__':
    # When executed as a script, run the doctests embedded in this module
    # (objects without any doctest are excluded).
    from doctest import testmod
    testmod(exclude_empty=True)