Source code for obspy.io.css.core

# -*- coding: utf-8 -*-
"""
CSS bindings to ObsPy core module.
"""
from pathlib import Path
import numpy as np

from obspy import Stream, Trace, UTCDateTime
from obspy.core.compatibility import from_buffer

import gzip

DTYPE = {
    # Big-endian integers
    b's4': b'>i',
    b's2': b'>h',
    # Little-endian integers
    b'i4': b'<i',
    b'i2': b'<h',
    # ASCII integers
    b'c0': (b'S12', int),
    b'c#': (b'S12', int),
    # Big-endian floating point
    b't4': b'>f',
    b't8': b'>d',
    # Little-endian floating point
    b'f4': b'<f',
    b'f8': b'<d',
    # ASCII floating point
    b'a0': (b'S15', np.float32),
    b'a#': (b'S15', np.float32),
    b'b0': (b'S24', np.float64),
    b'b#': (b'S24', np.float64),
}


[docs]def _is_css(filename): """ Checks whether a file is CSS waveform data (header) or not. :type filename: str :param filename: CSS file to be checked. :rtype: bool :return: ``True`` if a CSS waveform header file. """ # Fixed file format. # Tests: # - the length of each line (283 chars) # - two epochal time fields # (for position of dot and if they convert to UTCDateTime) # - supported data type descriptor try: with open(filename, "rb") as fh: lines = fh.readlines() # check for empty file if not lines: return False # check every line for line in lines: assert(len(line.rstrip(b"\n\r")) == 283) assert(b"." in line[26:28]) UTCDateTime(float(line[16:33])) assert(b"." in line[71:73]) UTCDateTime(float(line[61:78])) assert(line[143:145] in DTYPE) except Exception: return False return True
[docs]def _is_nnsa_kb_core(filename): """ Checks whether a file is NNSA KB Core waveform data (header) or not. :type filename: str :param filename: NNSA KB Core file to be checked. :rtype: bool :return: ``True`` if a NNSA KB Core waveform header file. """ # Fixed file format. # Tests: # - the length of each line (287 chars) # - two epochal time fields # (for position of dot and if they convert to UTCDateTime) # - supported data type descriptor try: with open(filename, "rb") as fh: lines = fh.readlines() # check for empty file if not lines: return False # check every line for line in lines: assert(len(line.rstrip(b"\n\r")) == 287) assert(line[27:28] == b".") UTCDateTime(float(line[16:33])) assert(line[73:74] == b".") UTCDateTime(float(line[62:79])) assert(line[144:146] in DTYPE) except Exception: return False return True
[docs]def _read_css(filename, **kwargs): """ Reads a CSS waveform file and returns a Stream object. .. warning:: This function should NOT be called directly, it registers via the ObsPy :func:`~obspy.core.stream.read` function, call this instead. :type filename: str :param filename: CSS file to be read. :rtype: :class:`~obspy.core.stream.Stream` :returns: Stream with Traces specified by given file. """ # read metafile with info on single traces with open(filename, "rb") as fh: lines = fh.readlines() basedir = Path(filename).parent traces = [] # read single traces for line in lines: npts = int(line[79:87]) dirname = line[148:212].strip().decode() dfilename = Path(basedir) / dirname / line[213:245].strip().decode() offset = int(line[246:256]) dtype = DTYPE[line[143:145]] if isinstance(dtype, tuple): read_fmt = np.dtype(dtype[0]) fmt = dtype[1] else: read_fmt = np.dtype(dtype) fmt = read_fmt try: # assumed that the waveform file is not compressed fh = open(dfilename, "rb") except FileNotFoundError as e: # If does not find the waveform file referenced in the wfdisc, # it will try to open a compressed .gz suffix file instead. try: fh = gzip.open(str(dfilename) + '.gz', "rb") except FileNotFoundError: raise e # Read one segment of binary data fh.seek(offset) data = fh.read(read_fmt.itemsize * npts) fh.close() data = from_buffer(data, dtype=read_fmt) data = np.require(data, dtype=fmt) header = {} header['station'] = line[0:6].strip().decode() header['channel'] = line[7:15].strip().decode() header['starttime'] = UTCDateTime(float(line[16:33])) header['sampling_rate'] = float(line[88:99]) header['calib'] = float(line[100:116]) header['calper'] = float(line[117:133]) tr = Trace(data, header=header) traces.append(tr) return Stream(traces=traces)
[docs]def _read_nnsa_kb_core(filename, **kwargs): """ Reads a NNSA KB Core waveform file and returns a Stream object. .. warning:: This function should NOT be called directly, it registers via the ObsPy :func:`~obspy.core.stream.read` function, call this instead. :type filename: str :param filename: NNSA KB Core file to be read. :rtype: :class:`~obspy.core.stream.Stream` :returns: Stream with Traces specified by given file. """ # read metafile with info on single traces with open(filename, "rb") as fh: lines = fh.readlines() basedir = Path(filename).parent traces = [] # read single traces for line in lines: npts = int(line[80:88]) dirname = line[149:213].strip().decode() filename = Path(basedir) / dirname / \ line[214:246].strip().decode() offset = int(line[247:257]) dtype = DTYPE[line[144:146]] if isinstance(dtype, tuple): read_fmt = np.dtype(dtype[0]) fmt = dtype[1] else: read_fmt = np.dtype(dtype) fmt = read_fmt with open(filename, "rb") as fh: fh.seek(offset) data = fh.read(read_fmt.itemsize * npts) data = from_buffer(data, dtype=read_fmt) data = np.require(data, dtype=fmt) header = {} header['station'] = line[0:6].strip().decode() header['channel'] = line[7:15].strip().decode() header['starttime'] = UTCDateTime(float(line[16:33])) header['sampling_rate'] = float(line[89:100]) header['calib'] = float(line[101:117]) header['calper'] = float(line[118:134]) tr = Trace(data, header=header) traces.append(tr) return Stream(traces=traces)