Source code for obspy.io.nlloc.core

# -*- coding: utf-8 -*-
"""
NonLinLoc file format support for ObsPy

:copyright:
    The ObsPy Development Team (devs@obspy.org)
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
import warnings
from math import sqrt

import numpy as np

from obspy import Catalog, UTCDateTime, __version__
from obspy.core.event import (Arrival, Comment, CreationInfo, Event, Origin,
                              OriginQuality, OriginUncertainty, Pick,
                              WaveformStreamID)
from obspy.core.inventory.util import (
    _add_resolve_seedid_doc, _add_resolve_seedid_ph2comp_doc, _resolve_seedid)
from obspy.geodetics import kilometer2degrees


ONSETS = {"i": "impulsive", "e": "emergent"}
ONSETS_REVERSE = {"impulsive": "i", "emergent": "e"}
POLARITIES = {"c": "positive", "u": "positive", "d": "negative"}
POLARITIES_REVERSE = {"positive": "u", "negative": "d"}


[docs]def is_nlloc_hyp(filename): """ Checks that a file is actually a NonLinLoc Hypocenter-Phase file. """ try: with open(filename, 'rb') as fh: temp = fh.read(6) except Exception: return False if temp != b'NLLOC ': return False return True
[docs]@_add_resolve_seedid_ph2comp_doc @_add_resolve_seedid_doc def read_nlloc_hyp(filename, coordinate_converter=None, picks=None, **kwargs): """ Reads a NonLinLoc Hypocenter-Phase file to a :class:`~obspy.core.event.catalog.Catalog` object. .. note:: Coordinate conversion from coordinate frame of NonLinLoc model files / location run to WGS84 has to be specified explicitly by the user if necessary. .. note:: An example can be found on the :mod:`~obspy.io.nlloc` submodule front page in the documentation pages. :param filename: File or file-like object in text mode. :type coordinate_converter: callable :param coordinate_converter: Function to convert (x, y, z) coordinates of NonLinLoc output to geographical coordinates and depth in meters (longitude, latitude, depth in kilometers). If left ``None``, the geographical coordinates in the "GEOGRAPHIC" line of NonLinLoc output are used. The function should accept three arguments x, y, z (each of type :class:`numpy.ndarray`) and return a tuple of three :class:`numpy.ndarray` (lon, lat, depth in kilometers). :type picks: list of :class:`~obspy.core.event.origin.Pick` :param picks: Original picks used to generate the NonLinLoc location. If provided, the output event will include the original picks and the arrivals in the output origin will link to them correctly (with their ``pick_id`` attribute). If not provided, the output event will include (the rather basic) pick information that can be reconstructed from the NonLinLoc hypocenter-phase file. :rtype: :class:`~obspy.core.event.catalog.Catalog` """ if not hasattr(filename, "read"): # Check if it exists, otherwise assume its a string. try: with open(filename, "rb") as fh: data = fh.read() data = data.decode("UTF-8") except Exception: try: data = filename.decode("UTF-8") except Exception: data = str(filename) data = data.strip() else: data = filename.read() if hasattr(data, "decode"): data = data.decode("UTF-8") # split lines and remove empty ones lines = [line for line in data.splitlines() if line.strip()] # remember picks originally used in location, if provided original_picks = picks if original_picks is None: original_picks = [] cat = Catalog() lines_start = [i for i, line in enumerate(lines) if line.startswith("NLLOC ")] lines_end = [i for i, line in enumerate(lines) if line.startswith("END_NLLOC")] if len(lines_start) != len(lines_end): msg = ("NLLOC HYP file '{}' seems corrupt, number of 'NLLOC' lines " "does not match number of 'END_NLLOC' lines").format(filename) raise Exception(msg) start_end_indices = [] for start, end in zip(lines_start, lines_end): start_end_indices.append(start) start_end_indices.append(end) if any(np.diff(start_end_indices) < 1): msg = ("NLLOC HYP file '{}' seems corrupt, inconsistent " "positioning of 'NLLOC' and 'END_NLLOC' lines " "detected.").format(filename) raise Exception(msg) for start, end in zip(lines_start, lines_end): event = _read_single_hypocenter( lines[start:end + 1], coordinate_converter=coordinate_converter, original_picks=original_picks, **kwargs) cat.append(event) cat.creation_info.creation_time = UTCDateTime() cat.creation_info.version = "ObsPy %s" % __version__ return cat
[docs]def _read_single_hypocenter(lines, coordinate_converter, original_picks, **kwargs): """ Given a list of lines (starting with a 'NLLOC' line and ending with a 'END_NLLOC' line), parse them into an Event. """ nlloc_file_format_version = None try: # some paranoid checks.. assert lines[0].startswith("NLLOC ") assert lines[-1].startswith("END_NLLOC") for line in lines[1:-1]: assert not line.startswith("NLLOC ") assert not line.startswith("END_NLLOC") except Exception: msg = ("This should not have happened, please report this as a bug at " "https://github.com/obspy/obspy/issues.") raise Exception(msg) indices_phases = [None, None] for i, line in enumerate(lines): if line.startswith("PHASE "): indices_phases[0] = i # determine whether it's old format or new one which has one # additional item in middle. use position of magic character to # find out. separator_position = line.split().index('>') if separator_position == 15: nlloc_file_format_version = 1 elif separator_position == 16: nlloc_file_format_version = 2 else: msg = ("This should not happen, please open a ticket on " "github and supply your nonlinloc file for debugging") raise NotImplementedError(msg) elif line.startswith("END_PHASE"): indices_phases[1] = i # extract PHASES lines (if any) if any(indices_phases): if not all(indices_phases): msg = ("NLLOC HYP file seems corrupt, 'PHASE' block is corrupt.") raise RuntimeError(msg) i1, i2 = indices_phases lines, phases_lines = lines[:i1] + lines[i2 + 1:], lines[i1 + 1:i2] else: phases_lines = [] lines = dict([line.split(None, 1) for line in lines[:-1]]) line = lines["SIGNATURE"] line = line.rstrip().split('"')[1] signature, version, date, time = line.rsplit(" ", 3) # new NLLoc > 6.0 seems to add prefix 'run:' before date if date.startswith('run:'): date = date[4:] signature = signature.strip() creation_time = UTCDateTime.strptime(date + time, str("%d%b%Y%Hh%Mm%S")) if coordinate_converter: # maximum likelihood origin location in km info line line = lines["HYPOCENTER"] x, y, z = coordinate_converter(*map(float, line.split()[1:7:2])) else: # maximum likelihood origin location lon lat info line line = lines["GEOGRAPHIC"] y, x, z = map(float, line.split()[8:13:2]) # maximum likelihood origin time info line line = lines["GEOGRAPHIC"] year, mon, day, hour, min = map(int, line.split()[1:6]) seconds = float(line.split()[6]) time = UTCDateTime(year, mon, day, hour, min, seconds, strict=False) # distribution statistics line line = lines["STATISTICS"] covariance_xx = float(line.split()[7]) covariance_yy = float(line.split()[13]) covariance_zz = float(line.split()[17]) stats_info_string = str( "Note: Depth/Latitude/Longitude errors are calculated from covariance " "matrix as 1D marginal (Lon/Lat errors as great circle degrees) " "while OriginUncertainty min/max horizontal errors are calculated " "from 2D error ellipsoid and are therefore seemingly higher compared " "to 1D errors. Error estimates can be reconstructed from the " "following original NonLinLoc error statistics line:\nSTATISTICS " + lines["STATISTICS"]) # goto location quality info line line = lines["QML_OriginQuality"].split() (assoc_phase_count, used_phase_count, assoc_station_count, used_station_count, depth_phase_count) = map(int, line[1:11:2]) stderr, az_gap, sec_az_gap = map(float, line[11:17:2]) gt_level = line[17] min_dist, max_dist, med_dist = map(float, line[19:25:2]) # goto location quality info line line = lines["QML_OriginUncertainty"] if "COMMENT" in lines: comment = lines["COMMENT"].strip() comment = comment.strip('\'"') comment = comment.strip() hor_unc, min_hor_unc, max_hor_unc, hor_unc_azim = \ map(float, line.split()[1:9:2]) # assign origin info event = Event() o = Origin() event.origins = [o] event.preferred_origin_id = o.resource_id o.origin_uncertainty = OriginUncertainty() o.quality = OriginQuality() ou = o.origin_uncertainty oq = o.quality o.comments.append(Comment(text=stats_info_string, force_resource_id=False)) event.comments.append(Comment(text=comment, force_resource_id=False)) # SIGNATURE field's first item is LOCSIG, which is supposed to be # 'Identification of an individual, institiution or other entity' # according to # http://alomax.free.fr/nlloc/soft6.00/control.html#_NLLoc_locsig_ # so use it as author in creation info event.creation_info = CreationInfo(creation_time=creation_time, version=version, author=signature) o.creation_info = CreationInfo(creation_time=creation_time, version=version, author=signature) # negative values can appear on diagonal of covariance matrix due to a # precision problem in NLLoc implementation when location coordinates are # large compared to the covariances. o.longitude = x try: o.longitude_errors.uncertainty = kilometer2degrees(sqrt(covariance_xx)) except ValueError: if covariance_xx < 0: msg = ("Negative value in XX value of covariance matrix, not " "setting longitude error (epicentral uncertainties will " "still be set in origin uncertainty).") warnings.warn(msg) else: raise o.latitude = y try: o.latitude_errors.uncertainty = kilometer2degrees(sqrt(covariance_yy)) except ValueError: if covariance_yy < 0: msg = ("Negative value in YY value of covariance matrix, not " "setting longitude error (epicentral uncertainties will " "still be set in origin uncertainty).") warnings.warn(msg) else: raise o.depth = z * 1e3 # meters! o.depth_errors.uncertainty = sqrt(covariance_zz) * 1e3 # meters! o.depth_errors.confidence_level = 68 o.depth_type = str("from location") o.time = time ou.horizontal_uncertainty = hor_unc ou.min_horizontal_uncertainty = min_hor_unc ou.max_horizontal_uncertainty = max_hor_unc # values of -1 seem to be used for unset values, set to None for field in ("horizontal_uncertainty", "min_horizontal_uncertainty", "max_horizontal_uncertainty"): if ou.get(field, -1) == -1: ou[field] = None else: ou[field] *= 1e3 # meters! ou.azimuth_max_horizontal_uncertainty = hor_unc_azim ou.preferred_description = str("uncertainty ellipse") ou.confidence_level = 68 # NonLinLoc in general uses 1-sigma (68%) level oq.standard_error = stderr oq.azimuthal_gap = az_gap oq.secondary_azimuthal_gap = sec_az_gap oq.used_phase_count = used_phase_count oq.used_station_count = used_station_count oq.associated_phase_count = assoc_phase_count oq.associated_station_count = assoc_station_count oq.depth_phase_count = depth_phase_count oq.ground_truth_level = gt_level oq.minimum_distance = kilometer2degrees(min_dist) oq.maximum_distance = kilometer2degrees(max_dist) oq.median_distance = kilometer2degrees(med_dist) # go through all phase info lines for line in phases_lines: line = line.split() arrival = Arrival() o.arrivals.append(arrival) station = line[0] channel = line[2] phase = line[4] arrival.phase = phase if nlloc_file_format_version == 1: arrival.distance = kilometer2degrees(float(line[21])) arrival.azimuth = float(line[22]) # do not read in take off angle, if the ray takeoff quality is # given as "0" for "unreliable", see #3224 if int(line[25]) != 0: arrival.takeoff_angle = float(line[24]) arrival.time_residual = float(line[16]) arrival.time_weight = float(line[17]) elif nlloc_file_format_version == 2: arrival.distance = kilometer2degrees(float(line[22])) arrival.azimuth = float(line[23]) # do not read in take off angle, if the ray takeoff quality is # given as "0" for "unreliable", see #3224 if int(line[26]) != 0: arrival.takeoff_angle = float(line[25]) arrival.time_residual = float(line[17]) arrival.time_weight = float(line[18]) else: msg = ("This should not happen, please open a ticket on " "github and supply your nonlinloc file for debugging") raise NotImplementedError(msg) pick = Pick() # have to split this into ints for overflow to work correctly date, hourmin, sec = map(str, line[6:9]) ymd = [int(date[:4]), int(date[4:6]), int(date[6:8])] hm = [int(hourmin[:2]), int(hourmin[2:4])] t = UTCDateTime(*(ymd + hm), strict=False) + float(sec) pick.time = t # network codes are not used by NonLinLoc, so they can not be known # when reading the .hyp file.. if an inventory is provided, a lookup # is done widargs = _resolve_seedid( station=station, component=channel, time=t, phase=phase, unused_kwargs=True, **kwargs) wid = WaveformStreamID(*widargs) pick.waveform_id = wid pick.time_errors.uncertainty = float(line[10]) pick.phase_hint = phase pick.onset = ONSETS.get(line[3].lower(), None) pick.polarity = POLARITIES.get(line[5].lower(), None) # try to determine original pick for each arrival for pick_ in original_picks: wid = pick_.waveform_id if station == wid.station_code and phase == pick_.phase_hint: pick = pick_ break else: # warn if original picks were specified and we could not associate # the arrival correctly if original_picks: msg = ("Could not determine corresponding original pick for " "arrival. " "Falling back to pick information in NonLinLoc " "hypocenter-phase file.") warnings.warn(msg) event.picks.append(pick) arrival.pick_id = pick.resource_id event.scope_resource_ids() return event
[docs]def write_nlloc_obs(catalog, filename, **kwargs): """ Write a NonLinLoc Phase file (NLLOC_OBS) from a :class:`~obspy.core.event.catalog.Catalog` object. .. warning:: This function should NOT be called directly, it registers via the the :meth:`~obspy.core.event.Catalog.write` method of an ObsPy :class:`~obspy.core.event.Catalog` object, call this instead. :type catalog: :class:`~obspy.core.event.catalog.Catalog` :param catalog: The ObsPy Catalog object to write. :type filename: str or file-like object :param filename: Filename to write or open file-like object. """ info = [] # Open filehandler or use an existing file like object. if not hasattr(filename, "write"): file_opened = True fh = open(filename, "wb") else: file_opened = False fh = filename if len(catalog) > 1: msg = ("Writing NonLinLoc Phase file is only supported for Catalogs " "with a single Event in it (use a for loop over the catalog " "and provide an output file name for each event).") raise ValueError(msg) fmt = '%s %s %s %s %s %s %s %s %7.4f GAU %9.2e %9.2e %9.2e %9.2e' for pick in catalog[0].picks: wid = pick.waveform_id station = wid.station_code or "?" component = wid.channel_code or "?" onset = ONSETS_REVERSE.get(pick.onset, "?") phase_type = pick.phase_hint or "?" polarity = POLARITIES_REVERSE.get(pick.polarity, "?") date = pick.time.strftime("%Y%m%d") hourminute = pick.time.strftime("%H%M") seconds = pick.time.second + pick.time.microsecond * 1e-6 if pick.time_errors.upper_uncertainty is not None and \ pick.time_errors.lower_uncertainty is not None: time_error = (pick.time_errors.upper_uncertainty + pick.time_errors.lower_uncertainty) / 2.0 elif pick.time_errors.uncertainty is not None: time_error = pick.time_errors.uncertainty else: # see discussion in #2371 msg = ("Writing pick without time uncertainty. Time uncertainty " "will be written as '0.0'") warnings.warn(msg) time_error = 0.0 info_ = fmt % (station.ljust(6), "?".ljust(4), component.ljust(4), onset.ljust(1), phase_type.ljust(6), polarity.ljust(1), date, hourminute, seconds, time_error, -1, -1, -1) info.append(info_) if info: info = "\n".join(sorted(info) + [""]) else: msg = "No pick information, writing empty NLLOC OBS file." warnings.warn(msg) fh.write(info.encode()) # Close if a file has been opened by this function. if file_opened is True: fh.close()