Source code for obspy.io.sac.util
# -*- coding: utf-8 -*-
"""
SAC module helper functions and data.
"""
import sys
import warnings
import numpy as np
from obspy import UTCDateTime
from obspy.core import Stats
from . import header as HD # noqa
# ------------- DATA ----------------------------------------------------------
TWO_DIGIT_YEAR_MSG = ("SAC file with 2-digit year header field encountered. "
"This is not supported by the SAC file format standard. "
"Prepending '19'.")
# ------------- SAC-SPECIFIC EXCEPTIONS ---------------------------------------
[docs]
class SacError(Exception):
"""
Raised if the SAC file is corrupt or if necessary information
in the SAC file is missing.
"""
pass
[docs]
class SacIOError(SacError, IOError):
"""
Raised if the given SAC file can't be read.
"""
pass
[docs]
class SacInvalidContentError(SacError):
"""
Raised if headers and/or data are not valid.
"""
pass
[docs]
class SacHeaderTimeError(SacHeaderError, ValueError):
"""
Raised if header has invalid "nz" times.
"""
pass
# ------------- VALIDITY CHECKS -----------------------------------------------
[docs]
def is_valid_enum_str(hdr, name):
# is this a valid string name for this hdr
# assume that, if a value isn't in HD.ACCEPTED_VALS, it's not valid
if hdr in HD.ACCEPTED_VALS:
tf = name in HD.ACCEPTED_VALS[hdr]
else:
tf = False
return tf
[docs]
def is_valid_enum_int(hdr, val, allow_null=True):
# is this a valid integer for this hdr.
if hdr in HD.ACCEPTED_VALS:
accep = [HD.ENUM_VALS[nm] for nm in HD.ACCEPTED_VALS[hdr]]
if allow_null:
accep += [HD.INULL]
tf = val in accep
else:
tf = False
return tf
# ------------- GENERAL -------------------------------------------------------
[docs]
def _convert_enum(header, converter, accep):
# header : dict, SAC header
# converter : dict, {source value: target value}
# accep : dict, {header name: acceptable value list}
# TODO: use functools.partial/wraps?
for hdr, val in header.items():
if hdr in HD.ACCEPTED_VALS:
if val in accep[hdr]:
header[hdr] = converter[val]
else:
msg = 'Unrecognized enumerated value "{}" for header "{}"'
raise ValueError(msg.format(val, hdr))
return header
[docs]
def enum_string_to_int(header):
"""
Convert enumerated string values in header dictionary to int values.
"""
out = _convert_enum(header, converter=HD.ENUM_VALS, accep=HD.ACCEPTED_VALS)
return out
[docs]
def enum_int_to_string(header):
"""
Convert enumerated int values in header dictionary to string values.
"""
out = _convert_enum(header, converter=HD.ENUM_NAMES, accep=HD.ACCEPTED_INT)
return out
[docs]
def byteswap(*arrays):
"""
Swapping of bytes for provided arrays.
**Notes**
arr.newbyteorder('S') swaps dtype interpretation, but not bytes in memory
arr.byteswap() swaps bytes in memory, but not dtype interpretation
arr.byteswap(True).newbyteorder('S') completely swaps both
.. seealso::
https://docs.scipy.org/doc/numpy/user/basics.byteswapping.html
"""
return [arr.view(arr.dtype.newbyteorder('S')) for arr in arrays]
[docs]
def is_same_byteorder(bo1, bo2):
"""
Deal with all the ways to compare byte order string representations.
:param bo1: Byte order string. Can be one of {'l', 'little', 'L', '<',
'b', 'big', 'B', '>', 'n', 'native','N', '='}
:type bo1: str
:param bo2: Byte order string. Can be one of {'l', 'little', 'L', '<',
'b', 'big', 'B', '>', 'n', 'native','N', '='}
:type bo1: str
:rtype: bool
:return: True of same byte order.
"""
# TODO: extend this as is_same_byteorder(*byteorders) using itertools
be = ('b', 'big', '>')
le = ('l', 'little', '<')
ne = ('n', 'native', '=')
ok = be + le + ne
if (bo1.lower() not in ok) or (bo2.lower() not in ok):
raise ValueError("Unrecognized byte order string.")
# make native decide what it is
bo1 = sys.byteorder if bo1.lower() in ne else bo1
bo2 = sys.byteorder if bo2.lower() in ne else bo2
return (bo1.lower() in le) == (bo2.lower() in le)
[docs]
def _clean_str(value, strip_whitespace=True):
"""
Remove null values and whitespace, return a str
This fn is used in two places: in SACTrace.read, to sanitize strings for
SACTrace, and in sac_to_obspy_header, to sanitize strings for making a
Trace that the user may have manually added.
"""
null_term = value.find('\x00')
if null_term >= 0:
value = value[:null_term] + " " * len(value[null_term:])
if strip_whitespace:
value = value.strip()
return value
# TODO: do this in SACTrace?
[docs]
def sac_to_obspy_header(sacheader, round_sampling_interval=True):
"""
Make an ObsPy Stats header dictionary from a SAC header dictionary.
:param sacheader: SAC header dictionary.
:type sacheader: dict
:param round_sampling_interval: Whether to round sampling interval to
microseconds before calculating sampling rate to avoid floating point
accuracy issues with some SAC files (see #3408)
:type round_sampling_interval: bool
:rtype: :class:`~obspy.core.trace.Stats`
:return: Filled ObsPy Stats header.
"""
# 1. get required sac header values
try:
npts = sacheader['npts']
delta = sacheader['delta']
except KeyError:
msg = "Incomplete SAC header information to build an ObsPy header."
raise KeyError(msg)
assert npts != HD.INULL
assert delta != HD.FNULL
#
# 2. get time
try:
reftime = get_sac_reftime(sacheader)
except (SacError, ValueError, TypeError):
# ObsPy doesn't require a valid reftime
reftime = UTCDateTime(0.0)
b = sacheader.get('b', HD.FNULL)
#
# 3. get optional sac header values
calib = sacheader.get('scale', HD.FNULL)
kcmpnm = sacheader.get('kcmpnm', HD.SNULL)
kstnm = sacheader.get('kstnm', HD.SNULL)
knetwk = sacheader.get('knetwk', HD.SNULL)
khole = sacheader.get('khole', HD.SNULL)
#
# 4. deal with null values
b = b if (b != HD.FNULL) else 0.0
calib = calib if (calib != HD.FNULL) else 1.0
kcmpnm = kcmpnm if (kcmpnm != HD.SNULL) else ''
kstnm = kstnm if (kstnm != HD.SNULL) else ''
knetwk = knetwk if (knetwk != HD.SNULL) else ''
khole = khole if (khole != HD.SNULL) else ''
#
# 5. transform to obspy values
# nothing is null
stats = {}
stats['npts'] = npts
# sampling rate can have floating point issues due to it being stored as a
# single precision floating point sampling interval in seconds, see #3408
# usually it should be ok potentially, but it seems that some SAC data
# sources additionally don't have the sampling interval encoded as the
# closest possible binary representation but rather the closest lesser
# binary representation (e.g. b'\x0b\xd7#=' for 0.04) when the next
# possible higher number would be closer to the "exact" value (here
# b'\n\xd7#=') which seems to be the tipping point of the floating point
# inaccuracy making it through our internal conversion to double precision
# sampling rate. here, we check if the conventional conversion and the
# "safer" way of rounding off (to microseconds in sample spacing) lead to a
# difference and warn about the fact, since the rounding approach actually
# could introduce buggy results for real weird unaligned sampling rates
sample_spacing_conventional = np.float32(delta)
sample_spacing_rounded_off = round(np.float64(delta), 6)
sampling_rate_conventional = np.float32(1.) / np.float32(delta)
sampling_rate_rounded_off = 1.0 / round(np.float64(delta), 6)
if round_sampling_interval:
if sampling_rate_conventional != sampling_rate_rounded_off:
msg = (f"Sample spacing read from SAC file "
f"({sample_spacing_conventional:.9f} when rounded to "
f"nanoseconds) was rounded of to microsecond precision "
f"({sample_spacing_rounded_off:.9f}) to avoid floating "
f"point issues when converting to sampling rate (see "
f"#3408)")
warnings.warn(msg)
stats['sampling_rate'] = sampling_rate_rounded_off
else:
stats['sampling_rate'] = sampling_rate_conventional
stats['network'] = _clean_str(knetwk)
stats['station'] = _clean_str(kstnm)
stats['channel'] = _clean_str(kcmpnm)
stats['location'] = _clean_str(khole)
stats['calib'] = calib
# store _all_ provided SAC header values
stats['sac'] = sacheader.copy()
# get first sample absolute time as UTCDateTime
# always add the begin time (if it's defined) to get the given
# SAC reference time, no matter which iztype is given
# b may be non-zero, even for iztype 'ib', especially if it was used to
# store microseconds from obspy_to_sac_header
stats['starttime'] = UTCDateTime(reftime) + b
return Stats(stats)
[docs]
def split_microseconds(microseconds):
# Returns milliseconds and remainder microseconds
milliseconds = microseconds // 1000
microseconds = (microseconds - milliseconds * 1000)
return milliseconds, microseconds
[docs]
def utcdatetime_to_sac_nztimes(utcdt):
# Returns a dict of integer nz-times and remainder microseconds
nztimes = {}
nztimes['nzyear'] = utcdt.year
nztimes['nzjday'] = utcdt.julday
nztimes['nzhour'] = utcdt.hour
nztimes['nzmin'] = utcdt.minute
nztimes['nzsec'] = utcdt.second
# nz times don't have enough precision, so push microseconds into b,
# using integer arithmetic
millisecond, microsecond = split_microseconds(utcdt.microsecond)
nztimes['nzmsec'] = millisecond
return nztimes, microsecond
[docs]
def obspy_to_sac_header(stats, keep_sac_header=True):
"""
Merge a primary with a secondary header, reconciling some differences.
:param stats: Filled ObsPy Stats header
:type stats: dict or :class:`~obspy.core.trace.Stats`
:param keep_sac_header: If keep_sac_header is True, old stats.sac
header values are kept, and a minimal set of values are updated from
the stats dictionary according to these guidelines:
* npts, delta always come from stats
* If a valid old reftime is found, the new b and e will be made
and properly referenced to it. All other old SAC headers are simply
carried along.
* If the old SAC reftime is invalid and relative time headers are set,
a SacHeaderError exception will be raised.
* If the old SAC reftime is invalid, no relative time headers are set,
and "b" is set, "e" is updated from stats and other old SAC headers
are carried along.
* If the old SAC reftime is invalid, no relative time headers are set,
and "b" is not set, the reftime will be set from stats.starttime
(with micro/milliseconds precision adjustments) and "b" and "e" are
set accordingly.
* If 'kstnm', 'knetwk', 'kcmpnm', or 'khole' are not set or differ
from Stats values 'station', 'network', 'channel', or 'location',
they are taken from the Stats values.
If keep_sac_header is False, a new SAC header is constructed from only
information found in the Stats dictionary, with some other default
values introduced. It will be an iztype 9 ("ib") file, with small
reference time adjustments for micro/milliseconds precision issues.
SAC headers nvhdr, level, lovrok, and iftype are always produced.
:type keep_sac_header: bool
:rtype merged: dict
:return: SAC header
"""
header = {}
oldsac = stats.get('sac', {})
if keep_sac_header and oldsac:
header.update(oldsac)
try:
reftime = get_sac_reftime(header)
except SacHeaderTimeError:
reftime = None
relhdrs = [hdr for hdr in HD.RELHDRS
if header.get(hdr) not in (None, HD.FNULL)]
if reftime:
# Set current 'b' relative to the old reftime.
b = stats['starttime'] - reftime
else:
# Invalid reference time. Relative times like 'b' cannot be
# unambiguously referenced to stats.starttime.
if 'b' in relhdrs:
# Assume no trimming/expanding of the Trace occurred relative
# to the old 'b', and just use the old 'b' value.
b = header['b']
else:
# Assume it's an iztype=ib (9) type file. Also set iztype?
b = 0
# Set the stats.starttime as the reftime and set 'b' and 'e'.
# ObsPy issue 1204
reftime = stats['starttime'] - b
nztimes, microsecond = utcdatetime_to_sac_nztimes(reftime)
header.update(nztimes)
b += (microsecond * 1e-6)
header['b'] = b
header['e'] = b + (stats['endtime'] - stats['starttime'])
# Merge some values from stats if they're missing in the SAC header
# ObsPy issues 1204, 1457
# XXX: If Stats values are empty/"" and SAC header values are real,
# this will replace the real SAC values with SAC null values.
for sachdr, statshdr in [('kstnm', 'station'), ('knetwk', 'network'),
('kcmpnm', 'channel'), ('khole', 'location')]:
if (header.get(sachdr) in (None, HD.SNULL)) or \
(header.get(sachdr).strip() != stats[statshdr]):
header[sachdr] = stats[statshdr] or HD.SNULL
else:
# SAC header from Stats only.
# Here, set headers from Stats that would otherwise depend on the old
# SAC header
header['iztype'] = 9
starttime = stats['starttime']
# nz times don't have enough precision, so push microseconds into b,
# using integer arithmetic
nztimes, microsecond = utcdatetime_to_sac_nztimes(starttime)
header.update(nztimes)
header['b'] = microsecond * 1e-6
# we now have correct b, npts, delta, and nz times
header['e'] = header['b'] + (stats['npts'] - 1) * stats['delta']
header['scale'] = stats.get('calib', HD.FNULL)
# NOTE: overwrites existing SAC headers
# nulls for these are '', which stats.get(hdr, HD.SNULL) won't catch
header['kcmpnm'] = stats['channel'] if stats['channel'] else HD.SNULL
header['kstnm'] = stats['station'] if stats['station'] else HD.SNULL
header['knetwk'] = stats['network'] if stats['network'] else HD.SNULL
header['khole'] = stats['location'] if stats['location'] else HD.SNULL
header['lpspol'] = True
header['lcalda'] = False
# ObsPy issue 1204
header['nvhdr'] = 6
header['leven'] = 1
header['lovrok'] = 1
header['iftype'] = 1
# ObsPy issue #1317
header['npts'] = stats['npts']
header['delta'] = stats['delta']
return header
[docs]
def get_sac_reftime(header):
"""
Get SAC header reference time as a UTCDateTime instance from a SAC header
dictionary.
Builds the reference time from SAC "nz" time fields. Raises
:class:`SacHeaderTimeError` if any time fields are null.
:param header: SAC header
:type header: dict
:rtype: :class:`~obspy.core.utcdatetime.UTCDateTime`
:returns: SAC reference time.
"""
# NOTE: epoch seconds can be got by:
# (reftime - datetime.datetime(1970,1,1)).total_seconds()
try:
yr = header['nzyear']
nzjday = header['nzjday']
nzhour = header['nzhour']
nzmin = header['nzmin']
nzsec = header['nzsec']
nzmsec = header['nzmsec']
except KeyError as e:
# header doesn't have all the keys
msg = "Not enough time information: {}".format(e)
raise SacHeaderTimeError(msg)
if 0 <= yr <= 99:
warnings.warn(TWO_DIGIT_YEAR_MSG)
yr += 1900
try:
reftime = UTCDateTime(year=yr, julday=nzjday, hour=nzhour,
minute=nzmin, second=nzsec,
microsecond=nzmsec * 1000)
except (ValueError, TypeError):
msg = "Invalid time headers. May contain null values."
raise SacHeaderTimeError(msg)
return reftime