from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA
from future.utils import native_str as nstr
from codecs import encode
import copy
import decorator
import numpy as np
from obspy import Trace
@decorator.decorator
[docs]def _open_file(func, *args, **kwargs):
"""
Ensure a file buffer is passed as first argument to the
decorated function.
:param func: callable that takes at least one argument;
the first argument must be treated as a buffer.
:return: callable
"""
first_arg = args[0]
try:
with open(first_arg, 'rb') as fi:
args = tuple([fi] + list(args[1:]))
return func(*args, **kwargs)
except TypeError: # assume we have been passed a buffer
if not hasattr(args[0], 'read'):
raise # type error was in function call, not in opening file
out = func(*args, **kwargs)
first_arg.seek(0) # reset position to start of file
return out
[docs]def _read(fi, position, length, dtype, left_part=True):
"""
Read one or more bytes using provided datatype.
:param fi: A buffer containing the bytes to read.
:param position: Byte position to start reading.
:type position: int
:param length: Length, in bytes, of data to read.
:type length: int or float
:param dtype: bcd, binary or IEEE
:type dtype: str
:param left_part: If True, start the reading from the first half part
of the byte position. If False, start the reading from the second
half part of the byte position.
:type left_part: bool
"""
fi.seek(position)
if dtype == 'bcd':
return _read_bcd(fi, length, left_part)
elif dtype == 'binary':
return _read_binary(fi, length, left_part)
elif dtype == 'IEEE':
data = np.frombuffer(fi.read(int(length)), '>f4')
return data[0] if len(data) == 1 else data
[docs]def _read_bcd(fi, length, left_part):
"""
Interprets a byte string as binary coded decimals.
See: https://en.wikipedia.org/wiki/Binary-coded_decimal#Basics
:param fi: A buffer containing the bytes to read.
:param length: number of bytes to read.
:type length: int or float
:param left_part: If True, start the reading from the first half part
of the first byte. If False, start the reading from
the second half part of the first byte.
:type left_part: bool
"""
tens = np.power(10, range(12))[::-1]
nbr_half_bytes = round(2 * length)
if isinstance(length, float):
length = int(length) + 1
byte_values = fi.read(length)
ints = np.frombuffer(byte_values, dtype='<u1', count=length)
if left_part is True:
unpack_bits = np.unpackbits(ints).reshape(-1, 4)[0:nbr_half_bytes]
else:
unpack_bits = np.unpackbits(ints).reshape(-1, 4)[1:nbr_half_bytes + 1]
bits = np.dot(unpack_bits, np.array([1, 2, 4, 8])[::-1].reshape(4, 1))
if np.any(bits > 9):
raise ValueError('invalid bcd values encountered')
return np.dot(tens[-len(bits):], bits)[0]
[docs]def _read_binary(fi, length, left_part):
"""
Read raw bytes and convert them in integer.
:param fi: A buffer containing the bytes to read.
:param length: number of bytes to read.
:type length: int or float
:param left_part: If True, start the reading from the first half part
of the byte.
:type left_part: bool
"""
if isinstance(length, float):
if abs(length - 0.5) <= 1e-7:
ints = np.frombuffer(fi.read(1), dtype='<u1')[0]
if left_part is True:
return np.bitwise_and(ints >> 4, 0x0f)
else:
return np.bitwise_and(ints, 0x0f)
else:
raise ValueError('invalid length of bytes to read.\
It has to be an integer or 0.5')
else:
return int(encode(fi.read(length), 'hex'), 16)
if __name__ == '__main__':
import doctest
doctest.testmod(exclude_empty=True)
[docs]def _quick_merge(traces, small_number=.000001):
"""
Specialized function for merging traces produced by _read_rg16.
Requires that traces are of the same datatype, have the same
sampling_rate, and dont have data overlaps.
:param traces: list of ObsPy :class:`~obspy.core.trace.Trace` objects.
:param small_number: a small number for determining if traces
should be merged. Should be much less than one sample spacing.
:return: list of ObsPy :class:`~obspy.core.trace.Trace` objects.
"""
# make sure sampling rates are all the same
assert len({tr.stats.sampling_rate for tr in traces}) == 1
assert len({tr.data.dtype for tr in traces}) == 1
sampling_rate = traces[0].stats.sampling_rate
diff = 1. / sampling_rate + small_number
# get the array
ar, trace_ar = _trace_list_to_rec_array(traces)
# get groups of traces that can be merged together
group = _get_trace_groups(ar, diff)
group_numbers = np.unique(group)
out = [None] * len(group_numbers) # init output list
for index, gnum in enumerate(group_numbers):
trace_ar_to_merge = trace_ar[group == gnum]
new_data = np.concatenate(list(trace_ar_to_merge['data']))
# get updated stats object
new_stats = copy.deepcopy(trace_ar_to_merge['stats'][0])
new_stats.npts = len(new_data)
out[index] = Trace(data=new_data, header=new_stats)
return out
[docs]def _trace_list_to_rec_array(traces):
"""
Return a recarray from the trace list.
These are separated into two arrays due to a weird issue with
numpy.sort returning and error set.
"""
# get the id, starttime, endtime into a recarray
# rec array column names must be native strings due to numpy issue 2407
dtype1 = [(nstr('id'), np.object), (nstr('starttime'), float),
(nstr('endtime'), float)]
dtype2 = [(nstr('data'), np.object), (nstr('stats'), np.object)]
data1 = [(tr.id, tr.stats.starttime.timestamp, tr.stats.endtime.timestamp)
for tr in traces]
data2 = [(tr.data, tr.stats) for tr in traces]
ar1 = np.array(data1, dtype=dtype1) # array of id, starttime, endtime
ar2 = np.array(data2, dtype=dtype2) # array of data, stats objects
#
sort_index = np.argsort(ar1, order=['id', 'starttime'])
return ar1[sort_index], ar2[sort_index]
[docs]def _get_trace_groups(ar, diff):
"""
Return an array of ints where each element corresponds to a pre-merged
trace row. All trace rows with the same group number can be merged.
"""
# get a bool of if ids are the same as the next row down
ids_different = np.ones(len(ar), dtype=bool)
ids_different[1:] = ar['id'][1:] != ar['id'][:-1]
# get bool of endtimes within one sample of starttime of next row
disjoint = np.zeros(len(ar), dtype=bool)
start_end_diffs = ar['starttime'][1:] - ar['endtime'][:-1]
disjoint[:-1] = np.abs(start_end_diffs) <= diff
# get groups (not disjoint, not different ids)
return np.cumsum(ids_different & disjoint)