# -*- coding: utf-8 -*-
"""
Y bindings to ObsPy core module.

:copyright:
    The ObsPy Development Team (devs@obspy.org)
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import re
import warnings
from struct import unpack

import numpy as np

from obspy import Stream
from obspy.core.compatibility import from_buffer
from obspy.core.trace import Trace
from obspy.core.utcdatetime import UTCDateTime
from obspy.core.util import AttribDict


INVALID_CHAR_MSG = (
    "Invalid non-ASCII characters in Y file header detected (%s). "
    "These were ignored.")


def _unpack_with_asciiz_and_decode(fmt, data):
    """
    Unpack binary data and decode ASCII bytestrings, stripping ASCIIZ
    bytestrings correctly as specified by the Y format definition.

    In addition to format flags defined by :py:func:`struct.unpack`, "z" can
    be used to denote ASCIIZ fields.

    :param fmt: see :py:func:`struct.unpack`
    :param data: see :py:func:`struct.unpack`
    :returns: see :py:func:`struct.unpack` but with bytestrings being decoded
    """
    fmt_list = re.findall(b'[a-zA-Z]', fmt)
    z_positions = [pos for pos, fmt_ in enumerate(fmt_list) if fmt_ == b"z"]
    s_positions = [pos for pos, fmt_ in enumerate(fmt_list) if fmt_ == b"s"]
    parts = list(unpack(fmt.replace(b"z", b"s"), data))
    # special handling for ASCIIZ fields:
    # strip everything after first (if any) ASCII NULL character *before*
    # decoding (those need not be valid encoded ASCII bytes and should be
    # ignored)
    for i in z_positions:
        part = parts[i]
        terminal_index = part.find(b"\x00")
        if terminal_index != -1:
            parts[i] = part[:terminal_index]
    # decode all bytestrings from ASCII
    for i in z_positions + s_positions:
        part = parts[i]
        try:
            part = part.decode('ascii', errors="strict")
        except UnicodeError as e:
            warnings.warn(INVALID_CHAR_MSG % str(e), UserWarning)
            part = part.decode('ascii', errors="ignore")
        parts[i] = part
    # right-strip all BLANKPADDED fields
    for i in s_positions:
        parts[i] = parts[i].rstrip()
    return tuple(parts)


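# Illustration (not part of the module; hypothetical input bytes): a minimal
# sketch of how the custom "z" (ASCIIZ) flag differs from the standard "s"
# (blank-padded) flag -- the ASCIIZ field is cut at the first NULL byte,
# while the blank-padded field is only right-stripped:
#
#     >>> _unpack_with_asciiz_and_decode(b'5s8z', b'AYT  site\x00xyz')
#     ('AYT', 'site')

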
def _parse_tag(fh):
    """
    Reads and parses a single tag.

    returns endian, tag_type, next_tag, next_same
    """
    data = fh.read(16)
    # byte order format for this data. Uses letter "I" for Intel format
    # data (little endian) or letter "M" for Motorola (big endian) format
    format = unpack(b'=c', data[0:1])[0]
    if format == b'I':
        endian = b'<'
    elif format == b'M':
        endian = b'>'
    else:
        raise ValueError('Invalid tag: missing byte order information')
    # magic: check for magic number "31"
    magic = unpack(endian + b'B', data[1:2])[0]
    if magic != 31:
        raise ValueError('Invalid tag: missing magic number')
    # tag type: the type of data attached to this tag.
    tag_type = unpack(endian + b'H', data[2:4])[0]
    # NextTag is the offset in bytes from the end of this tag to the start of
    # the next tag. That means, the offset is the size of the data attached
    # to this tag.
    next_tag = unpack(endian + b'i', data[4:8])[0]
    # NextSame is the offset in bytes from the end of this tag to the start
    # of the next tag with the same type. If zero, there is no next tag with
    # the same type.
    next_same = unpack(endian + b'i', data[8:12])[0]
    return endian, tag_type, next_tag, next_same


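# Illustration (not part of the module; tag bytes built in memory for the
# sketch): a little-endian marker "I", magic number 31, tag type 0
# (TAG_Y_FILE), a 100 byte payload and no further tag of the same type. The
# trailing four bytes of the 16-byte tag header are left unused here:
#
#     >>> import io
#     >>> from struct import pack
#     >>> tag = pack(b'<cBHii', b'I', 31, 0, 100, 0) + b'\x00' * 4
#     >>> _parse_tag(io.BytesIO(tag))
#     (b'<', 0, 100, 0)

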
def _is_y(filename):
    """
    Checks whether a file is a Nanometrics Y file or not.

    :type filename: str
    :param filename: Name of the Nanometrics Y file to be checked.
    :rtype: bool
    :return: ``True`` if a Nanometrics Y file.

    .. rubric:: Example

    >>> _is_y("/path/to/YAYT_BHZ_20021223.124800")  # doctest: +SKIP
    True
    """
    try:
        # get first tag (16 bytes)
        with open(filename, 'rb') as fh:
            _, tag_type, _, _ = _parse_tag(fh)
    except Exception:
        return False
    # The first tag in a Y-file must be the TAG_Y_FILE tag (tag type 0)
    if tag_type != 0:
        return False
    return True


def _read_y(filename, headonly=False, **kwargs):  # @UnusedVariable
    """
    Reads a Nanometrics Y file and returns an ObsPy Stream object.

    .. warning::
        This function should NOT be called directly, it registers via the
        ObsPy :func:`~obspy.core.stream.read` function, call this instead.

    :type filename: str
    :param filename: Nanometrics Y file to be read.
    :type headonly: bool, optional
    :param headonly: If set to True, read only the head. This is most useful
        for scanning available data in huge (temporary) data sets.
    :rtype: :class:`~obspy.core.stream.Stream`
    :return: An ObsPy Stream object.

    .. rubric:: Example

    >>> from obspy import read
    >>> st = read("/path/to/YAYT_BHZ_20021223.124800")
    >>> st  # doctest: +ELLIPSIS
    <obspy.core.stream.Stream object at 0x...>
    >>> print(st)  # doctest: +ELLIPSIS
    1 Trace(s) in Stream:
    .AYT..BHZ | 2002-12-23T12:48:00.000100Z - ... | 100.0 Hz, 18000 samples
    """
    # The first tag in a Y-file must be the TAG_Y_FILE (0) tag. This must be
    # followed by the following tags, in any order:
    #   TAG_STATION_INFO (1)
    #   TAG_STATION_LOCATION (2)
    #   TAG_STATION_PARAMETERS (3)
    #   TAG_STATION_DATABASE (4)
    #   TAG_SERIES_INFO (5)
    #   TAG_SERIES_DATABASE (6)
    # The following tag is optional:
    #   TAG_STATION_RESPONSE (26)
    # The last tag in the file must be a TAG_DATA_INT32 (7) tag. This tag must
    # be followed by an array of LONG's. The number of entries in the array
    # must agree with what was described in the TAG_SERIES_INFO data.
    with open(filename, 'rb') as fh:
        trace = Trace()
        trace.stats.y = AttribDict()
        count = -1
        while True:
            endian, tag_type, next_tag, _next_same = _parse_tag(fh)
            if tag_type == 1:
                # TAG_STATION_INFO
                # UCHAR Update[8]
                #   This field is only used internally for administrative
                #   purposes. It should always be set to zeroes.
                # UCHAR Station[5] (BLANKPAD)
                #   Station is the five letter SEED format station
                #   identification.
                # UCHAR Location[2] (BLANKPAD)
                #   Location is the two letter SEED format location
                #   identification.
                # UCHAR Channel[3] (BLANKPAD)
                #   Channel is the three letter SEED format channel
                #   identification.
                # UCHAR NetworkID[51] (ASCIIZ)
                #   This is some descriptive text identifying the network.
                # UCHAR SiteName[61] (ASCIIZ)
                #   SiteName is some text identifying the site.
                # UCHAR Comment[31] (ASCIIZ)
                #   Comment is any comment for this station.
                # UCHAR SensorType[51] (ASCIIZ)
                #   SensorType is some text describing the type of sensor
                #   used at the station.
                # UCHAR DataFormat[7] (ASCIIZ)
                #   DataFormat is some text describing the data format
                #   recorded at the station.
                data = fh.read(next_tag)
                parts = _unpack_with_asciiz_and_decode(
                    b'5s2s3s51z61z31z51z7z', data[8:])
                trace.stats.station = parts[0]
                trace.stats.location = parts[1]
                trace.stats.channel = parts[2]
                # extra
                params = AttribDict()
                params.network_id = parts[3]
                params.site_name = parts[4]
                params.comment = parts[5]
                params.sensor_type = parts[6]
                params.data_format = parts[7]
                trace.stats.y.tag_station_info = params
            elif tag_type == 2:
                # TAG_STATION_LOCATION
                # UCHAR Update[8]
                #   This field is only used internally for administrative
                #   purposes. It should always be set to zeroes.
                # FLOAT Latitude
                #   Latitude in degrees of the location of the station. The
                #   latitude should be between -90 (South) and +90 (North).
                # FLOAT Longitude
                #   Longitude in degrees of the location of the station. The
                #   longitude should be between -180 (West) and +180 (East).
                # FLOAT Elevation
                #   Elevation in meters above sea level of the station.
                # FLOAT Depth
                #   Depth is the depth in meters of the sensor.
                # FLOAT Azimuth
                #   Azimuth of the sensor in degrees clockwise.
                # FLOAT Dip
                #   Dip is the dip of the sensor. 90 degrees is defined as
                #   vertical right way up.
                data = fh.read(next_tag)
                parts = _unpack_with_asciiz_and_decode(
                    endian + b'ffffff', data[8:])
                params = AttribDict()
                params.latitude = parts[0]
                params.longitude = parts[1]
                params.elevation = parts[2]
                params.depth = parts[3]
                params.azimuth = parts[4]
                params.dip = parts[5]
                trace.stats.y.tag_station_location = params
            elif tag_type == 3:
                # TAG_STATION_PARAMETERS
                # UCHAR Update[16]
                #   This field is only used internally for administrative
                #   purposes. It should always be set to zeroes.
                # REALTIME StartValidTime
                #   Time that the information in these records became valid.
                # REALTIME EndValidTime
                #   Time that the information in these records became invalid.
                # FLOAT Sensitivity
                #   Sensitivity of the sensor in nanometers per bit.
                # FLOAT SensFreq
                #   Frequency at which the sensitivity was measured.
                # FLOAT SampleRate
                #   This is the number of samples per second. This value can
                #   be less than 1.0 (e.g. 0.1).
                # FLOAT MaxClkDrift
                #   Maximum drift rate of the clock in seconds per sample.
                # UCHAR SensUnits[24] (ASCIIZ)
                #   Some text indicating the units in which the sensitivity
                #   was measured.
                # UCHAR CalibUnits[24] (ASCIIZ)
                #   Some text indicating the units in which calibration input
                #   was measured.
                # UCHAR ChanFlags[27] (BLANKPAD)
                #   Text indicating the channel flags according to the SEED
                #   definition.
                # UCHAR UpdateFlag
                #   This flag must be "N" or "U" according to the SEED
                #   definition.
                # UCHAR Filler[4]
                #   Pads out the record to satisfy the alignment restrictions
                #   for reading data on a SPARC processor.
                data = fh.read(next_tag)
                parts = _unpack_with_asciiz_and_decode(
                    endian + b'ddffff24z24z27sc4s', data[16:])
                trace.stats.sampling_rate = parts[4]
                # extra
                params = AttribDict()
                params.start_valid_time = parts[0]
                params.end_valid_time = parts[1]
                params.sensitivity = parts[2]
                params.sens_freq = parts[3]
                params.sample_rate = parts[4]
                params.max_clk_drift = parts[5]
                params.sens_units = parts[6]
                params.calib_units = parts[7]
                params.chan_flags = parts[8]
                params.update_flag = parts[9]
                trace.stats.y.tag_station_parameters = params
            elif tag_type == 4:
                # TAG_STATION_DATABASE
                # UCHAR Update[8]
                #   This field is only used internally for administrative
                #   purposes. It should always be set to zeroes.
                # REALTIME LoadDate
                #   Date the information was loaded into the database.
                # UCHAR Key[16]
                #   Unique key that identifies this record in the database.
                data = fh.read(next_tag)
                parts = _unpack_with_asciiz_and_decode(
                    endian + b'd16s', data[8:])
                params = AttribDict()
                params.load_date = parts[0]
                params.key = parts[1]
                trace.stats.y.tag_station_database = params
            elif tag_type == 5:
                # TAG_SERIES_INFO
                # UCHAR Update[16]
                #   This field is only used internally for administrative
                #   purposes. It should always be set to zeroes.
                # REALTIME StartTime
                #   This is start time of the data in this series.
                # REALTIME EndTime
                #   This is end time of the data in this series.
                # ULONG NumSamples
                #   This is the number of samples of data in this series.
                # LONG DCOffset
                #   DCOffset is the DC offset of the data.
                # LONG MaxAmplitude
                #   MaxAmplitude is the maximum amplitude of the data.
                # LONG MinAmplitude
                #   MinAmplitude is the minimum amplitude of the data.
                # UCHAR Format[8] (ASCIIZ)
                #   This is the format of the data. This should always be
                #   "YFILE".
                # UCHAR FormatVersion[8] (ASCIIZ)
                #   FormatVersion is the version of the format of the data.
                #   This should always be "5.0".
                data = fh.read(next_tag)
                parts = _unpack_with_asciiz_and_decode(
                    endian + b'ddLlll8z8z', data[16:])
                trace.stats.starttime = UTCDateTime(parts[0])
                count = parts[2]
                # extra
                params = AttribDict()
                params.endtime = UTCDateTime(parts[1])
                params.num_samples = parts[2]
                params.dc_offset = parts[3]
                params.max_amplitude = parts[4]
                params.min_amplitude = parts[5]
                params.format = parts[6]
                params.format_version = parts[7]
                trace.stats.y.tag_series_info = params
            elif tag_type == 6:
                # TAG_SERIES_DATABASE
                # UCHAR Update[8]
                #   This field is only used internally for administrative
                #   purposes. It should always be set to zeroes.
                # REALTIME LoadDate
                #   Date the information was loaded into the database.
                # UCHAR Key[16]
                #   Unique key that identifies this record in the database.
                data = fh.read(next_tag)
                parts = _unpack_with_asciiz_and_decode(
                    endian + b'd16s', data[8:])
                params = AttribDict()
                params.load_date = parts[0]
                params.key = parts[1]
                trace.stats.y.tag_series_database = params
            elif tag_type == 26:
                # TAG_STATION_RESPONSE
                # UCHAR Update[8]
                #   This field is only used internally for administrative
                #   purposes. It should always be set to zeroes.
                # UCHAR PathName[260]
                #   PathName is the full name of the file which contains the
                #   response information for this station.
                data = fh.read(next_tag)
                parts = _unpack_with_asciiz_and_decode(b'260s', data[8:])
                params = AttribDict()
                params.path_name = parts[0]
                trace.stats.y.tag_station_response = params
            elif tag_type == 7:
                # TAG_DATA_INT32
                trace.data = from_buffer(
                    fh.read(np.dtype(np.int32).itemsize * count),
                    dtype=np.int32)
                # break loop as TAG_DATA_INT32 should be the last tag in file
                break
            else:
                fh.seek(next_tag, 1)
    return Stream([trace])


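# Usage sketch (same example path as in the docstring above): besides the
# regular Trace header fields, the extra information parsed from the Y tags
# is attached to the returned trace under ``stats.y``, e.g.:
#
#     >>> from obspy import read
#     >>> st = read("/path/to/YAYT_BHZ_20021223.124800")  # doctest: +SKIP
#     >>> st[0].stats.y.tag_station_location.latitude  # doctest: +SKIP

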
if __name__ == '__main__':
    import doctest
    doctest.testmod(exclude_empty=True)