# -*- coding: utf-8 -*-
"""
Various additional utilities for ObsPy xseed.
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from future.builtins import * # NOQA @UnusedWildImport
from future.utils import native_str
import warnings
from obspy import UTCDateTime
# Ignore Attributes of Blockettes
IGNORE_ATTR = ['blockette_id', 'blockette_name', 'compact', 'debug',
'seed_version', 'strict', 'xseed_version',
'length_of_blockette', 'blockette_type']
[docs]class SEEDParserException(Exception):
pass
[docs]class SEEDAbbreviationNotFoundWarning(UserWarning):
pass
[docs]def to_tag(name):
"""
Creates a XML tag from a given string.
"""
temp = name.lower().replace(' ', '_')
temp = temp.replace('fir_', 'FIR_')
temp = temp.replace('a0_', 'A0_')
return temp
[docs]def to_string(tag):
"""
Creates a pretty string from any given XML tag.
"""
temp = tag.replace('_', ' ').title()
temp = temp.replace('Fir', 'FIR')
return temp
[docs]def datetime_2_string(dt, compact=False):
"""
Generates a valid SEED time string from a UTCDateTime object.
"""
if isinstance(dt, UTCDateTime):
return dt.format_seed(compact)
elif isinstance(dt, (str, native_str)):
dt = dt.strip()
if not dt:
return ""
try:
dt = UTCDateTime(dt)
return dt.format_seed(compact)
except Exception:
raise Exception("Invalid datetime %s: %s" % (type(dt), str(dt)))
[docs]def compare_seed(seed1, seed2):
"""
Compares two SEED files.
Only works with a record length of 4096 bytes.
"""
# Each SEED string should be a multiple of the record length.
if (len(seed1) % 4096) != 0:
msg = "Length of first SEED string should be a multiple of 4096 bytes"
raise Exception(msg)
if (len(seed2) % 4096) != 0:
msg = "Length of second SEED string should be a multiple of 4096 bytes"
raise Exception(msg)
# Loop over each record and remove empty ones. obspy.io.xseed doesn't write
# empty records. Redundant code to ease coding...
recnums = len(seed1) // 4096
new_seed1 = b''
for _i in range(recnums):
cur_record = seed1[_i * 4096 + 8:(_i + 1) * 4096].strip()
if cur_record == b'':
continue
new_seed1 += seed1[_i * 4096:(_i + 1) * 4096]
seed1 = new_seed1
recnums = len(seed2) // 4096
new_seed2 = b''
for _i in range(recnums):
cur_record = seed2[_i * 4096 + 8:(_i + 1) * 4096].strip()
if cur_record == b'':
continue
new_seed2 += seed2[_i * 4096:(_i + 1) * 4096]
seed2 = new_seed2
# length should be the same
if len(seed1) != len(seed2):
msg = "Length of SEED strings differ! (%d != %d)" % (len(seed1),
len(seed2))
raise Exception(msg)
# version string is always ' 2.4' for output
if seed1[15:19] == b' 2.3':
seed1 = seed1.replace(b' 2.3', b' 2.4', 1)
if seed1[15:19] == b'02.3':
seed1 = seed1.replace(b'02.3', b' 2.4', 1)
# check for missing '~' in blockette 10 (faulty dataless from BW network)
l = int(seed1[11:15]) # NOQA
temp = seed1[0:(l + 8)]
if temp.count(b'~') == 4:
# added a '~' and remove a space before the next record
# record length for now 4096
b_l = ('%04i' % (l + 1)).encode('ascii', 'strict')
seed1 = seed1[0:11] + b_l + seed1[15:(l + 8)] + b'~' + \
seed1[(l + 8):4095] + seed1[4096:]
# check each byte
for i in range(0, len(seed1)):
if seed1[i] == seed2[i]:
continue
temp = seed1[i] + seed2[i]
if temp == b'0+':
continue
if temp == b'0 ':
continue
if temp == b' +':
continue
if temp == b'- ':
# -056.996398+0031.0
# -56.996398 +31.0
continue
[docs]def lookup_code(blockettes, blkt_number, field_name, lookup_code,
lookup_code_number):
"""
Loops over a list of blockettes until it finds the blockette with the
right number and lookup code.
"""
# List of all possible names for lookup
for blockette in blockettes:
if blockette.id != blkt_number:
continue
if getattr(blockette, lookup_code) != lookup_code_number:
continue
return getattr(blockette, field_name)
return None
[docs]def blockette_34_lookup(abbr, lookup):
"""
Gets certain values from blockette 34. Needed for RESP output.
"""
try:
l1 = lookup_code(abbr, 34, 'unit_name', 'unit_lookup_code', lookup)
l2 = lookup_code(abbr, 34, 'unit_description', 'unit_lookup_code',
lookup)
return l1 + ' - ' + l2
except Exception:
msg = 'Warning: Abbreviation reference for "%i" not found.' % lookup
warnings.warn(msg, SEEDAbbreviationNotFoundWarning)
return 'No Abbreviation Referenced'
[docs]def set_xpath(blockette, identifier):
"""
Returns an X-Path String to a blockette with the correct identifier.
"""
try:
identifier = int(identifier)
except Exception:
msg = 'X-Path identifier needs to be an integer.'
raise TypeError(msg)
abbr_path = '/xseed/abbreviation_dictionary_control_header/'
end_of_path = '[text()="%s"]/parent::*'
if blockette == 30:
return abbr_path + \
'data_format_dictionary/data_format_identifier_code' + \
end_of_path % identifier
elif blockette == 31:
return abbr_path + \
'comment_description/comment_code_key' + \
end_of_path % identifier
elif blockette == 33:
return abbr_path + \
'generic_abbreviation/abbreviation_lookup_code' + \
end_of_path % identifier
elif blockette == 34:
return abbr_path + \
'units_abbreviations/unit_lookup_code' + \
end_of_path % identifier
# All dictionary blockettes.
elif blockette == 'dictionary':
return abbr_path + \
'*/response_lookup_key' + \
end_of_path % identifier
msg = 'XPath for blockette %d not implemented yet.' % blockette
raise NotImplementedError(msg)
[docs]def get_xpath(xpath):
"""
Returns lookup key of XPath expression on abbreviation dictionary.
"""
return int(xpath.split('"')[-2])
[docs]def unique_list(seq):
# Not order preserving
keys = {}
for e in seq:
keys[e] = 1
return list(keys.keys())