Source code for obspy.io.nied.knet

# -*- coding: utf-8 -*-
"""
obspy.io.nied.knet - K-NET/KiK-net read support for ObsPy
====================================================================
Reading of the K-NET and KiK-net ASCII format as defined on
http://www.kyoshin.bosai.go.jp.
"""
import re

import numpy as np

from obspy import UTCDateTime, Stream, Trace
from obspy.core.trace import Stats


[docs] class KNETException(Exception): pass
[docs] def _buffer_proxy(filename_or_buf, function, reset_fp=True, file_mode="rb", *args, **kwargs): """ Calls a function with an open file or file-like object as the first argument. If the file originally was a filename, the file will be opened, otherwise it will just be passed to the underlying function. :param filename_or_buf: File to pass. :type filename_or_buf: str, open file, or file-like object :param function: The function to call. :type function: callable :param reset_fp: If True, the file pointer will be set to the initial position after the function has been called. :type reset_fp: bool :param file_mode: Mode to open file in if necessary. """ try: position = filename_or_buf.tell() is_buffer = True except AttributeError: is_buffer = False if is_buffer is True: ret_val = function(filename_or_buf, *args, **kwargs) if reset_fp: filename_or_buf.seek(position, 0) return ret_val else: with open(filename_or_buf, file_mode) as fh: return function(fh, *args, **kwargs)
[docs] def _is_knet_ascii(filename_or_buf): """ Checks if the file is a valid K-NET/KiK-net ASCII file. :param filename_or_buf: File to test. :type filename_or_buf: str or file-like object """ try: return _buffer_proxy(filename_or_buf, _internal_is_knet_ascii, reset_fp=True) # Happens for example when passing the data as a string which would be # interpreted as a filename. except (OSError, UnicodeDecodeError): return False
[docs] def _internal_is_knet_ascii(buf): """ Checks if the file is a valid K-NET/KiK-net ASCII file. :param buf: File to read. :type buf: open file or file-like object """ first_string = buf.read(11).decode() # File has less than 11 characters if len(first_string) != 11: return False if first_string == 'Origin Time': return True return False
[docs] def _prep_hdr_line(name, line): """ Helper function to check the contents of a header line and split it. :param name: String that the line should start with. :type name: str :param line: Line to check and split. :type line: str """ if not line.startswith(name): raise KNETException("Expected line to start with %s but got %s " % (name, line)) else: return line.split()
[docs] def _read_knet_hdr(hdrlines, convert_stnm=False, **kwargs): """ Read the header values into a dictionary. :param hdrlines: List of the header lines of a a K-NET/KiK-net ASCII file :type hdrlines: list :param convert_stnm: For station names with 6 letters write the last two letters of the station code to the 'location' field :type convert_stnm: bool """ hdrdict = {'knet': {}} hdrnames = ['Origin Time', 'Lat.', 'Long.', 'Depth. (km)', 'Mag.', 'Station Code', 'Station Lat.', 'Station Long.', 'Station Height(m)', 'Record Time', 'Sampling Freq(Hz)', 'Duration Time(s)', 'Dir.', 'Scale Factor', 'Max. Acc. (gal)', 'Last Correction', 'Memo.'] _i = 0 # Event information flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) dt = flds[2] + ' ' + flds[3] dt = UTCDateTime.strptime(dt, '%Y/%m/%d %H:%M:%S') # All times are in Japanese standard time which is 9 hours ahead of UTC dt -= 9 * 3600. hdrdict['knet']['evot'] = dt _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) lat = float(flds[1]) hdrdict['knet']['evla'] = lat _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) lon = float(flds[1]) hdrdict['knet']['evlo'] = lon _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) dp = float(flds[2]) hdrdict['knet']['evdp'] = dp _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) mag = float(flds[1]) hdrdict['knet']['mag'] = mag # Station information _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) # K-NET and KiK-Net station names can be more than 5 characters long # which will cause the station name to be truncated when writing the # the trace as miniSEED; if convert_stnm is enabled, the last two # letters of the station code are written to the 'location' field stnm = flds[2] location = '' if convert_stnm and len(stnm) > 5: location = stnm[-2:] stnm = stnm[:-2] if len(stnm) > 7: raise KNETException( "Station name can't be more than 7 characters long!") hdrdict['station'] = stnm hdrdict['location'] = location _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) hdrdict['knet']['stla'] = float(flds[2]) _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) hdrdict['knet']['stlo'] = float(flds[2]) _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) hdrdict['knet']['stel'] = float(flds[2]) # Data information _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) dt = flds[2] + ' ' + flds[3] # A 15 s delay is added to the record time by the # the K-NET and KiK-Net data logger dt = UTCDateTime.strptime(dt, '%Y/%m/%d %H:%M:%S') - 15.0 # All times are in Japanese standard time which is 9 hours ahead of UTC dt -= 9 * 3600. hdrdict['starttime'] = dt _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) freqstr = flds[2] m = re.search('[0-9]*', freqstr) freq = int(m.group()) hdrdict['sampling_rate'] = freq _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) hdrdict['knet']['duration'] = float(flds[2]) _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) channel = flds[1].replace('-', '') kiknetcomps = {'1': 'NS1', '2': 'EW1', '3': 'UD1', '4': 'NS2', '5': 'EW2', '6': 'UD2'} if channel.strip() in kiknetcomps.keys(): # kiknet directions are 1-6 channel = kiknetcomps[channel.strip()] hdrdict['channel'] = channel _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) eqn = flds[2] num, denom = eqn.split('/') num = float(re.search('[0-9]*', num).group()) denom = float(denom) # convert the calibration from gal to m/s^2 hdrdict['calib'] = 0.01 * num / denom _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) acc = float(flds[3]) hdrdict['knet']['accmax'] = acc _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) dt = flds[2] + ' ' + flds[3] dt = UTCDateTime.strptime(dt, '%Y/%m/%d %H:%M:%S') # All times are in Japanese standard time which is 9 hours ahead of UTC dt -= 9 * 3600. hdrdict['knet']['last correction'] = dt # The comment ('Memo') field is optional _i += 1 flds = _prep_hdr_line(hdrnames[_i], hdrlines[_i]) if len(flds) > 1: hdrdict['knet']['comment'] = ' '.join(flds[1:]) if len(hdrlines) != _i + 1: raise KNETException("Expected %d header lines but got %d" % (_i + 1, len(hdrlines))) return hdrdict
[docs] def _read_knet_ascii(filename_or_buf, **kwargs): """ Reads a K-NET/KiK-net ASCII file and returns an ObsPy Stream object. .. warning:: This function should NOT be called directly, it registers via the ObsPy :func:`~obspy.core.stream.read` function, call this instead. :param filename: K-NET/KiK-net ASCII file to be read. :type filename: str or file-like object """ return _buffer_proxy(filename_or_buf, _internal_read_knet_ascii, **kwargs)
[docs] def _internal_read_knet_ascii(buf, **kwargs): """ Reads a K-NET/KiK-net ASCII file and returns an ObsPy Stream object. .. warning:: This function should NOT be called directly, it registers via the ObsPy :func:`~obspy.core.stream.read` function, call this instead. :param buf: File to read. :type buf: open file or file-like object """ data = [] hdrdict = {} cur_pos = buf.tell() buf.seek(0, 2) size = buf.tell() buf.seek(cur_pos, 0) # First read the headerlines headerlines = [] while buf.tell() < size: line = buf.readline().decode() headerlines.append(line) if line.startswith('Memo'): hdrdict = _read_knet_hdr(headerlines, **kwargs) break while buf.tell() < size: line = buf.readline() parts = line.strip().split() data += [float(p) for p in parts] hdrdict['npts'] = len(data) # The FDSN network code for the National Research Institute for Earth # Science and Disaster Prevention (NEID JAPAN) is BO (Bosai-Ken Network) hdrdict['network'] = 'BO' data = np.array(data) stats = Stats(hdrdict) trace = Trace(data, header=stats) return Stream([trace])
if __name__ == '__main__': import doctest doctest.testmod(exclude_empty=True)