# -*- coding: utf-8 -*-
"""
Nordic file format support for ObsPy
:copyright:
The ObsPy Development Team (devs@obspy.org)
:license:
GNU Lesser General Public License, Version 3
(https://www.gnu.org/copyleft/lesser.html)
.. note::
Pick time-residuals are handled in event.origins[0].arrivals, with
the arrival.pick_id linking the arrival (which contain calculated
information) with the pick.resource_id (where the pick contains only
physical measured information).
note::
Station-magnitude residuals (only for New Nordic files) are handled in
obspy.core.event.magnitude.StationMagnitude.mag_errors.uncertainty.
note::
When you read a Nordic file into Obspy and then write to Nordic format, the
following information is not retained:
- amplitude-picks have no distance or azimuth to source
- some event (sub-)types may change if no equivalent event type exists in
Obspy
.. versionchanged:: 1.2.3
* The pick-weight from the Nordic file (0-4, 9) is now read into
pick.extra.nordic_pick_weight (was arrival.time_weight) while the
finalweight (0-100 %) is read into arrival.time_weight (or
backazmiuth_weight, respectively).
* Empty network codes are now read as None instead of "NA"
* Magnitudes are no longer automatically sorted by size.
.. versionchanged:: 1.2.0
The number of stations used to calculate the origin was previously
incorrectly stored in a comment. From version 1.2.0 this is now stored
in `origin.quality.used_station_count`
"""
import warnings
from pathlib import Path
import io
import os
import re
from math import sqrt
import datetime
from obspy import UTCDateTime, read
from obspy.geodetics import kilometers2degrees, degrees2kilometers
from obspy.core.event import (
Event, Origin, Magnitude, StationMagnitude, Catalog, EventDescription,
CreationInfo, OriginQuality, OriginUncertainty, Pick, WaveformStreamID,
Arrival, Amplitude, FocalMechanism, MomentTensor, NodalPlane, NodalPlanes,
QuantityError, Tensor, ResourceIdentifier, Comment)
from obspy.core.event.header import EventType, EventTypeCertainty
from obspy.core.inventory.util import _resolve_seedid, _add_resolve_seedid_doc
from obspy.io.nordic import NordicParsingError
from obspy.io.nordic.utils import (
_int_conv, _str_conv, _float_conv, _evmagtonor, _nortoevmag,
_get_line_tags, _km_to_deg_lat, _km_to_deg_lon, _nordic_iasp_phase_ok,
_get_agency_id, _is_iasp_ampl_phase, EVENT_TYPE_MAPPING_FROM_SEISAN,
EVENT_TYPE_CERTAINTY_MAPPING_FROM_SEISAN, EVENT_TYPE_MAPPING_TO_SEISAN,
EVENT_TYPE_AND_CERTAINTY_MAPPING_TO_SEISAN)
from obspy.io.nordic.ellipse import Ellipse
POLARITY_MAPPING = {"": "undecidable", "C": "positive", "D": "negative"}
INV_POLARITY_MAPPING = {item: key for key, item in POLARITY_MAPPING.items()}
ONSET_MAPPING = {'I': 'impulsive', 'E': 'emergent'}
INV_ONSET_MAPPING = {item: key for key, item in ONSET_MAPPING.items()}
EVALUATION_MAPPING = {'A': 'automatic', ' ': 'manual'}
INV_EVALUTATION_MAPPING = {
item: key for key, item in EVALUATION_MAPPING.items()}
OLD_PHASE_HEADER_LINE = (" STAT SP IPHASW D HRMM SECON CODA AMPLIT PERI AZIMU"
" VELO AIN AR TRES W DIS CAZ7\n")
NEW_PHASE_HEADER_LINE = (" STAT COM NTLO IPHASE W HHMM SS.SSS PAR1 PAR2"
" AGA OPE AIN RES W DIS CAZ7\n")
[docs]def _is_sfile(sfile, encoding='latin-1'):
"""
Basic test of whether the file is nordic format or not.
Not exhaustive, but checks some of the basics.
:type sfile: str
:param sfile: Path to sfile
:type encoding: str
:param encoding: Encoding of the file.
:rtype: bool
"""
if not hasattr(sfile, "readline"):
try:
with open(sfile, 'r', encoding=encoding) as f:
tags = _get_line_tags(f=f, report=False)
except Exception:
return False
else:
try:
tags = _get_line_tags(f=sfile, report=False)
except Exception:
return False
if tags is not None:
try:
head_line = tags['1'][0][0]
except IndexError:
return False
try:
sfile_seconds = int(head_line[16:18])
except ValueError:
return False
if sfile_seconds == 60:
sfile_seconds = 0
try:
UTCDateTime(
int(head_line[1:5]), int(head_line[6:8]), int(head_line[8:10]),
int(head_line[11:13]), int(head_line[13:15]), sfile_seconds,
int(head_line[19:20]) * 100000)
return True
except Exception:
return False
else:
return False
[docs]def _read_origin(line):
"""
Read one origin (type 1) line.
:param str line: Origin format (type 1) line
:return: `~obspy.core.event.event.Event`
"""
new_event = Event()
try:
sfile_seconds = line[16:20].strip()
if len(sfile_seconds) == 0:
sfile_seconds = 0.0
else:
sfile_seconds = float(sfile_seconds)
new_event.origins.append(Origin())
new_event.origins[0].time = UTCDateTime(
int(line[1:5]), int(line[6:8]), int(line[8:10]),
int(line[11:13]), int(line[13:15]), 0, 0) + sfile_seconds
except Exception:
raise NordicParsingError("Couldn't read a date from sfile")
# new_event.loc_mod_ind=line[20]
new_event.event_descriptions.append(EventDescription(text=line[21:23]))
new_event.event_type = EventType(EVENT_TYPE_MAPPING_FROM_SEISAN.get(
line[22]))
new_event.event_type_certainty = EventTypeCertainty(
EVENT_TYPE_CERTAINTY_MAPPING_FROM_SEISAN.get(line[21]))
for key, _slice in [('latitude', slice(23, 30)),
('longitude', slice(30, 38)),
('depth', slice(38, 43))]:
try:
new_event.origins[0].__dict__[key] = float(line[_slice])
except ValueError:
new_event.origins[0].__dict__[key] = None
if new_event.origins[0].depth:
new_event.origins[0].depth *= 1000.
if line[43].strip():
warnings.warn("Depth indicator {0} has not been mapped "
"to the event".format(line[43]))
if line[44].strip():
warnings.warn("Origin location indicator {0} has not been mapped "
"to the event".format(line[44]))
if line[10] == "F":
new_event.origins[0].time_fixed = True
new_event.creation_info = CreationInfo(agency_id=line[45:48].strip())
new_event.origins[0].creation_info = CreationInfo(
agency_id=line[45:48].strip())
used_station_count = line[49:51].strip()
if used_station_count != '':
new_event.origins[0].quality = OriginQuality(
used_station_count=int(used_station_count))
timeres = _float_conv(line[51:55])
if timeres is not None:
if new_event.origins[0].quality is not None:
new_event.origins[0].quality.standard_error = timeres
else:
new_event.origins[0].quality = OriginQuality(
standard_error=timeres)
# Read in magnitudes if they are there.
magnitudes = []
magnitudes.extend(_read_mags(line, new_event))
new_event.magnitudes = magnitudes
return new_event
[docs]def _read_mags(line, event):
"""
Read the magnitude info from a Nordic header line. Convenience function
"""
magnitudes = []
for index in [59, 67, 75]:
if not line[index].isspace():
magnitudes.append(Magnitude(
mag=_float_conv(line[index - 4:index]),
magnitude_type=_nortoevmag(line[index]),
creation_info=CreationInfo(
agency_id=line[index + 1:index + 4].strip()),
origin_id=event.origins[0].resource_id))
return magnitudes
[docs]def read_spectral_info(sfile, encoding='latin-1'):
"""
Read spectral info from an sfile.
:type sfile: str
:param sfile: Sfile to read from.
:type encoding: str
:param encoding: Encoding for file, used to decode from bytes to string
:returns:
list of dictionaries of spectral information, units as in seisan
manual, expect for logs which have been converted to floats.
"""
with open(sfile, 'r', encoding=encoding) as f:
tagged_lines = _get_line_tags(f=f)
spec_inf = _read_spectral_info(tagged_lines=tagged_lines)
return spec_inf
[docs]def _read_spectral_info(tagged_lines, event=None):
"""
Internal spectral reader.
:type tagged_lines: dict
:param tagged_lines: dictionary of tagged lines
:type event: :class:`~obspy.core.event.event.Event`
:param event: Event to associate spectral info with
:returns:
list of dictionaries of spectral information, units as in
seisan manual, expect for logs which have been converted to floats.
"""
if '3' not in tagged_lines.keys():
return {}
if event is None:
event = _readheader(head_lines=tagged_lines['1'])
origin_date = UTCDateTime(event.origins[0].time.date)
relevant_lines = []
for line in tagged_lines['3']:
if line[0][1:5] == 'SPEC':
relevant_lines.append(line)
relevant_lines = [line[0] for line in
sorted(relevant_lines, key=lambda tup: tup[1])]
spec_inf = {}
if not relevant_lines:
return spec_inf
for line in relevant_lines:
spec_str = line.strip()
if spec_str[5:12] in ['AVERAGE', 'STANDARD_DEVIATION']:
info = {}
station = spec_str[5:12]
channel = ''
info['moment'] = _float_conv(spec_str[16:22])
if info['moment'] is not None:
info['moment'] = 10 ** info['moment']
info['stress_drop'] = _float_conv(spec_str[24:30])
info['spectral_level'] = _float_conv(spec_str[32:38])
if info['spectral_level'] is not None:
info['spectral_level'] = 10 ** info['spectral_level']
info['corner_freq'] = _float_conv(spec_str[40:46])
info['source_radius'] = _float_conv(spec_str[47:54])
info['decay'] = _float_conv(spec_str[56:62])
info['window_length'] = _float_conv(spec_str[64:70])
info['moment_mag'] = _float_conv(spec_str[72:78])
spec_inf[(station, channel)] = info
continue
station = spec_str[4:9].strip()
channel = ''.join(spec_str[9:13].split())
try:
info = spec_inf[(station, channel)]
except KeyError:
info = {}
if spec_str[14] == 'T':
info['starttime'] = origin_date + \
(int(spec_str[15:17]) * 3600) +\
(int(spec_str[17:19]) * 60) + int(spec_str[19:21])
if info['starttime'] < event.origins[0].time:
# Wrong day, case of origin at end of day
info['starttime'] += 86400
info['kappa'] = _float_conv(spec_str[23:30])
info['distance'] = _float_conv(spec_str[32:38])
if spec_str[38:40] == 'VS':
info['velocity'] = _float_conv(spec_str[40:46])
info['velocity_type'] = 'S'
elif spec_str[38:40] == 'VP':
info['velocity'] = _float_conv(spec_str[40:46])
info['velocity_type'] = 'P'
else:
warnings.warn('Only VP and VS spectral information'
' implemented')
info['density'] = _float_conv(spec_str[48:54])
info['Q0'] = _float_conv(spec_str[56:62])
info['QA'] = _float_conv(spec_str[64:70])
elif spec_str[14] == 'M':
info['moment'] = _float_conv(spec_str[16:22])
if info['moment']:
info['moment'] = 10 ** info['moment']
info['stress_drop'] = _float_conv(spec_str[24:30])
info['spectral_level'] = _float_conv(spec_str[32:38])
if info['spectral_level']:
info['spectral_level'] = 10 ** info['spectral_level']
info['corner_freq'] = _float_conv(spec_str[40:46])
info['source_radius'] = _float_conv(spec_str[47:54])
info['decay'] = _float_conv(spec_str[56:62])
info['window_length'] = _float_conv(spec_str[64:70])
info['moment_mag'] = _float_conv(spec_str[72:78])
spec_inf[(station, channel)] = info
return spec_inf
[docs]@_add_resolve_seedid_doc
def read_nordic(select_file, return_wavnames=False, encoding='latin-1',
nordic_format='UKN', **kwargs):
"""
Read a catalog of events from a Nordic formatted select file.
Generates a series of temporary files for each event in the select file.
:type select_file: str
:param select_file: Nordic formatted select.out file to open
:type return_wavnames: bool
:param return_wavnames:
If True, will return the names of the waveforms that the events
are associated with.
:type encoding: str
:param encoding: Encoding for file, used to decode from bytes to string
:type nordic_format: str
:param nordic_format:
'UKN', 'OLD', or 'NEW' (unknown, old, new). For 'UKN', the function
will find out on its own
:return: catalog of events
:rtype: :class:`~obspy.core.event.event.Event`
"""
if not hasattr(select_file, "readline"):
try:
f = open(select_file, 'r', encoding=encoding)
except Exception:
try:
f = select_file.decode(encoding)
except Exception:
f = str(select_file)
else:
f = select_file
wav_names = []
event_str = []
catalog = Catalog()
for line in f:
if len(line.rstrip()) > 0:
event_str.append(line)
elif len(event_str) > 0:
catalog, wav_names = _extract_event(
event_str=event_str, catalog=catalog, wav_names=wav_names,
return_wavnames=return_wavnames, nordic_format=nordic_format,
**kwargs)
event_str = []
f.close()
if len(event_str) > 0:
# May occur if the last line of the file is not blank as it should be
catalog, wav_names = _extract_event(
event_str=event_str, catalog=catalog, wav_names=wav_names,
return_wavnames=return_wavnames, nordic_format=nordic_format,
**kwargs)
if return_wavnames:
return catalog, wav_names
for event in catalog:
event.scope_resource_ids()
return catalog
[docs]def _read_uncertainty(tagged_lines, event):
"""
Read hyp uncertainty line.
:param tagged_lines: Lines keyed by line type
:type tagged_lines: dict
:returns: updated event
:rtype: :class:`~obspy.core.event.event.Event`
"""
if 'E' not in tagged_lines.keys():
return event
# In principle there shouldn't be more than one error line, but I think
# there can be - need to associate the correct error
line = tagged_lines['E'][0][0]
# TODO: Convert this to ConfidenceEllipsoid
errors = {'x_err': None}
try:
errors = {'x_err': _float_conv(line[32:38]),
'y_err': _float_conv(line[24:30]),
'z_err': _float_conv(line[38:43]),
'xy_cov': _float_conv(line[43:55]),
'xz_cov': _float_conv(line[55:67]),
'yz_cov': _float_conv(line[67:79])}
except ValueError:
pass
orig = event.origins[0]
# If any value is Zero or None then no Ellipse can be constructed
if not [x for x in (errors['x_err'], errors['y_err'], errors['xy_cov'])
if not x]:
e = Ellipse.from_uncerts(errors['x_err'],
errors['y_err'],
errors['xy_cov'])
if e:
orig.origin_uncertainty = OriginUncertainty(
max_horizontal_uncertainty=e.a * 1000.,
min_horizontal_uncertainty=e.b * 1000.,
azimuth_max_horizontal_uncertainty=e.theta,
preferred_description="uncertainty ellipse")
# But lat / lon / depth-errors may still be filled
if errors['y_err'] is not None:
orig.latitude_errors = QuantityError(_km_to_deg_lat(errors['y_err']))
if errors['x_err'] is not None:
orig.longitude_errors = QuantityError(_km_to_deg_lon(errors['x_err'],
orig.latitude))
if errors['z_err'] is not None:
orig.depth_errors = QuantityError(errors['z_err'] * 1000.)
try:
gap = int(line[5:8])
except ValueError:
gap = None
try:
orig.time_errors = QuantityError(float(line[14:20]))
except ValueError:
pass
if orig.quality:
orig.quality.azimuthal_gap = gap
else:
orig.quality = OriginQuality(azimuthal_gap=gap)
return event
[docs]def _read_highaccuracy(tagged_lines, event):
"""
Read high accuracy origin line.
:param tagged_lines: Lines keyed by line type
:type tagged_lines: dict
:returns: updated event
:rtype: :class:`~obspy.core.event.event.Event`
"""
if 'H' not in tagged_lines.keys():
return event
# In principle there shouldn't be more than one high precision line
line = tagged_lines['H'][0][0]
try:
dt = {'Y': _int_conv(line[1:5]),
'MO': _int_conv(line[6:8]),
'D': _int_conv(line[8:10]),
'H': _int_conv(line[11:13]),
'MI': _int_conv(line[13:15]),
'S': _float_conv(line[16:23])}
except ValueError:
pass
try:
ev_time = UTCDateTime(dt['Y'], dt['MO'], dt['D'],
dt['H'], dt['MI'], 0, 0) + dt['S']
if abs(event.origins[0].time - ev_time) < 0.1:
event.origins[0].time = ev_time
else:
print('High accuracy time differs from normal time by >0.1s')
except ValueError:
pass
try:
values = {'latitude': _float_conv(line[23:32]),
'longitude': _float_conv(line[33:43]),
'depth': _float_conv(line[44:52]),
'rms': _float_conv(line[53:59])}
except ValueError:
pass
if values['latitude'] is not None:
event.origins[0].latitude = values['latitude']
if values['longitude'] is not None:
event.origins[0].longitude = values['longitude']
if values['depth'] is not None:
event.origins[0].depth = values['depth'] * 1000.
if values['rms'] is not None:
if event.origins[0].quality is not None:
event.origins[0].quality.standard_error = values['rms']
else:
event.origins[0].quality = OriginQuality(
standard_error=values['rms'])
return event
[docs]def _read_focal_mechanisms(tagged_lines, event):
"""
Read focal mechanism info from s-file
:param tagged_lines: Lines keyed by line type
:type tagged_lines: dict
:returns: updated event
:rtype: :class:`~obspy.core.event.event.Event`
"""
if 'F' not in tagged_lines.keys():
return event
UserWarning("Found focal-mechanism info: reading amplitude-ratio fit,"
"number of bad polarities and number of bad amplitude ratios"
"is not implemented.")
for line, line_num in tagged_lines['F']:
nodal_p = NodalPlane(strike=float(line[0:10]), dip=float(line[10:20]),
rake=float(line[20:30]))
try:
# Apparently these don't have to be filled.
nodal_p.strike_errors = QuantityError(float(line[30:35]))
nodal_p.dip_errors = QuantityError(float(line[35:40]))
nodal_p.rake_errors = QuantityError(float(line[40:45]))
except ValueError:
pass
fm = FocalMechanism(nodal_planes=NodalPlanes(nodal_plane_1=nodal_p))
try:
fm.method_id = ResourceIdentifier(
"smi:nc.anss.org/focalMehcanism/" + line[70:77].strip())
fm.creation_info = CreationInfo(agency_id=line[66:69].strip)
fm.misfit = float(line[45:50])
fm.station_distribution_ratio = float(line[50:55])
except ValueError:
pass
event.focal_mechanisms.append(fm)
return event
[docs]def _read_moment_tensors(tagged_lines, event):
"""
Read moment tensors from s-file
:param tagged_lines: Lines keyed by line type
:type tagged_lines: dict
:returns: updated event
:rtype: :class:`~obspy.core.event.event.Event`
"""
if 'M' not in tagged_lines.keys():
return event
# Group moment tensor lines together
mt_lines = sorted(tagged_lines['M'], key=lambda tup: tup[1])
for mt_ind in range(len(mt_lines) // 2):
mt_line_1 = mt_lines[mt_ind * 2][0]
mt_line_2 = mt_lines[(mt_ind * 2) + 1][0]
if not str(mt_line_2[1:3]) == str('MT'):
raise NordicParsingError("Matching moment tensor lines not found.")
sfile_seconds = int(mt_line_1[16:18])
if sfile_seconds == 60:
sfile_seconds = 0
add_seconds = 60
else:
add_seconds = 0
ori_time = UTCDateTime(
int(mt_line_1[1:5]), int(mt_line_1[6:8]), int(mt_line_1[8:10]),
int(mt_line_1[11:13]), int(mt_line_1[13:15]), sfile_seconds,
int(mt_line_1[19:20]) * 100000) + add_seconds
event.origins.append(Origin(
time=ori_time, latitude=float(mt_line_1[23:30]),
longitude=float(mt_line_1[30:38]),
depth=float(mt_line_1[38:43]) * 1000,
creation_info=CreationInfo(agency_id=mt_line_1[45:48].strip())))
event.magnitudes.append(Magnitude(
mag=float(mt_line_1[55:59]),
magnitude_type=_nortoevmag(mt_line_1[59]),
creation_info=CreationInfo(agency_id=mt_line_1[60:63].strip()),
origin_id=event.origins[-1].resource_id))
event.focal_mechanisms.append(FocalMechanism(
moment_tensor=MomentTensor(
derived_origin_id=event.origins[-1].resource_id,
moment_magnitude_id=event.magnitudes[-1].resource_id,
scalar_moment=float(mt_line_2[52:62]), tensor=Tensor(
m_rr=float(mt_line_2[3:9]), m_tt=float(mt_line_2[10:16]),
m_pp=float(mt_line_2[17:23]), m_rt=float(mt_line_2[24:30]),
m_rp=float(mt_line_2[31:37]),
m_tp=float(mt_line_2[38:44])),
method_id=ResourceIdentifier(
"smi:nc.anss.org/momentTensor/" + mt_line_1[70:77].strip()
))))
return event
[docs]def _read_picks(tagged_lines, new_event, nordic_format='UKN', **kwargs):
"""
Internal pick reader. Use read_nordic instead.
:type tagged_lines: dict
:param tagged_lines: Lines keyed by line type
:type new_event: :class:`~obspy.core.event.event.Event`
:param new_event: event to associate picks with.
:type nordic_format: str
:param nordic_format:
'UKN', 'OLD', or 'NEW' (unknown, old, new). For 'UKN', the function
will find out on its own
:returns: :class:`~obspy.core.event.event.Event`
"""
evtime = new_event.origins[0].time
pickline = []
# pick-lines can be tagged by either ' ' or '4'
tags = [' ', '4']
for tag in tags:
try:
pickline.extend(
[tup[0] for tup in sorted(
tagged_lines[tag], key=lambda tup: tup[1])])
except KeyError:
pass
header = sorted(tagged_lines['7'], key=lambda tup: tup[1])[0][0]
if nordic_format == 'UKN':
nordic_format, phase_ok = check_nordic_format_version(pickline)
if nordic_format == 'NEW':
new_event = _read_picks_nordic_new(pickline, new_event, header, evtime,
**kwargs)
elif nordic_format == 'OLD':
new_event = _read_picks_nordic_old(pickline, new_event, header, evtime,
**kwargs)
elif nordic_format == 'UKN':
warnings.warn('Cannot check whether Nordic format is Old or New, is '
'this really a Nordic file?')
return new_event
[docs]def _read_picks_nordic_old(pickline, new_event, header, evtime, **kwargs):
"""
Reads the type 4 line of the old Nordic format.
"""
for line in pickline:
ain, snr = (None, None)
if line[18:28].strip() == '': # If line is empty miss it
continue
if len(line) < 80:
line = line.ljust(80) # Pick-lines without a tag may be short.
weight = line[14]
if weight not in ' 012349_': # Long phase name
weight = line[8]
phase = line[10:17].strip()
polarity = ''
elif weight == '_':
phase = line[10:17]
weight = None
polarity = ''
else:
phase = line[10:14].strip()
polarity = line[16]
if weight == ' ':
weight = None
polarity = POLARITY_MAPPING.get(polarity, None) # Empty could be None
# or undecidable.
# It is valid nordic for the origin to be hour 23 and picks to be hour
# 00 or 24: this signifies a pick over a day boundary.
pick_hour = int(line[18:20].strip() or 0)
pick_minute = int(line[20:22].strip() or 0)
pick_seconds = float(line[22:29].strip() or 0.0) # 29 should be blank,
# but sometimes SEISAN appears to overflow here, see #2348
if pick_hour == 0 and evtime.hour == 23:
day_add = 86400
elif pick_hour >= 24: # Nordic supports up to 48 hours advanced.
day_add = 86400
pick_hour -= 24
else:
day_add = 0
time = UTCDateTime(
year=evtime.year, month=evtime.month, day=evtime.day,
hour=pick_hour, minute=pick_minute) + (pick_seconds + day_add)
if header[57:60] == 'AIN':
ain = _float_conv(line[57:60])
elif header[57:60] == 'SNR':
snr = _float_conv(line[57:60])
else:
warnings.warn('%s is not currently supported' % header[57:60])
finalweight = _int_conv(line[68:70])
# Create a new obspy.event.Pick class for this pick
widargs = _resolve_seedid(
station=line[1:6].strip(), component=line[6:8].strip(), time=time,
**kwargs)
_waveform_id = WaveformStreamID(*widargs)
pick = Pick(waveform_id=_waveform_id, phase_hint=phase,
polarity=polarity, time=time)
try:
pick.onset = ONSET_MAPPING[line[9]]
except KeyError:
pass
pick.evaluation_mode = EVALUATION_MAPPING.get(line[15], "manual")
# Pick-weight from Seisan is not covered by Obspy/Quakeml standard
if weight is not None:
pick.extra = {
'nordic_pick_weight': {
'value': weight,
'namespace':
'https://seis.geus.net/software/seisan/node239.html'}}
# Note BAZ and slowness are not always filled.
if _float_conv(line[46:51]) is not None:
pick.backazimuth = _float_conv(line[46:51])
app_velocity = _float_conv(line[51:56])
if app_velocity is not None and app_velocity != 999.0:
pick.horizontal_slowness = 1 / kilometers2degrees(app_velocity)
# Create new obspy.event.Amplitude class which references above Pick
# only if there is an amplitude picked.
if _float_conv(line[33:40]) is not None:
_amplitude = Amplitude(generic_amplitude=_float_conv(line[33:40]),
period=_float_conv(line[41:45]),
pick_id=pick.resource_id,
waveform_id=pick.waveform_id)
if pick.phase_hint == 'IAML':
# Amplitude for local magnitude
_amplitude.type = 'AML'
# Set to be evaluating a point in the trace
_amplitude.category = 'point'
# Default AML unit in seisan is nm (Page 139 of seisan
# documentation, version 10.0)
_amplitude.generic_amplitude /= 1e9
_amplitude.unit = 'm'
_amplitude.magnitude_hint = 'ML'
else:
# Generic amplitude type
_amplitude.type = 'A'
if snr:
_amplitude.snr = snr
new_event.amplitudes.append(_amplitude)
elif _int_conv(line[29:33]) is not None:
# Create an amplitude instance for coda duration also
_amplitude = Amplitude(generic_amplitude=_int_conv(line[29:33]),
pick_id=pick.resource_id,
waveform_id=pick.waveform_id)
# Amplitude for coda magnitude
_amplitude.type = 'END'
# Set to be evaluating a point in the trace
_amplitude.category = 'duration'
_amplitude.unit = 's'
_amplitude.magnitude_hint = 'Mc'
if snr is not None:
_amplitude.snr = snr
new_event.amplitudes.append(_amplitude)
# Create new obspy.event.Arrival class referencing above Pick
if _float_conv(line[33:40]) is None:
arrival = Arrival(phase=pick.phase_hint, pick_id=pick.resource_id)
if _int_conv(line[60:63]) is not None:
arrival.backazimuth_residual = _int_conv(line[60:63])
if _float_conv(line[63:68]) is not None:
arrival.time_residual = _float_conv(line[63:68])
if _float_conv(line[70:75]) is not None:
arrival.distance = kilometers2degrees(_float_conv(line[70:75]))
if _int_conv(line[76:79]) is not None:
arrival.azimuth = _int_conv(line[76:79])
if ain is not None:
arrival.takeoff_angle = ain
if finalweight is not None:
arrival.time_weight = finalweight / 10
new_event.origins[0].arrivals.append(arrival)
new_event.picks.append(pick)
return new_event
[docs]def _read_picks_nordic_new(pickline, new_event, header, evtime, **kwargs):
"""
Reads the type 4 line of the old Nordic format.
"""
for line in pickline:
ain, snr = (None, None)
if line[14:37].strip() == '': # If line is empty miss it
continue
if len(line) < 80:
line = line.ljust(80) # Pick-lines without a tag may be short.
weight = line[24]
if weight == ' ':
weight = None
phase = line[16:24].strip()
if _nordic_iasp_phase_ok(phase) and phase not in ['END', 'BAZ']:
polarity = line[43]
else:
polarity = ''
polarity = POLARITY_MAPPING.get(polarity, None) # Empty could be None
# or undecidable.
# It is valid nordic for the origin to be hour 23 and picks to be hour
# 00 or 24: this signifies a pick over a day boundary. Seisan also
# allows empty hour/min/sec, which is equal to 00.
pick_hour = int(line[26:28].strip() or 0)
pick_minute = int(line[28:30].strip() or 0)
pick_seconds = float(line[31:37].strip() or 0)
if pick_hour == 0 and evtime.hour == 23:
day_add = 86400
elif pick_hour >= 24: # Nordic supports up to 48 hours advanced.
day_add = 86400
pick_hour -= 24
else:
day_add = 0
time = UTCDateTime(
year=evtime.year, month=evtime.month, day=evtime.day,
hour=pick_hour, minute=pick_minute) + (pick_seconds + day_add)
if header[60:63] == 'AIN':
ain = _float_conv(line[58:63])
elif header[60:63] == 'SNR':
snr = _float_conv(line[58:63])
else:
warnings.warn('%s is not currently supported' % header[60:63])
finalweight = _int_conv(line[68:70])
# Create a new obspy.event.Pick class for this pick
sta, cha = line[1:6].strip(), line[6:9].strip()
net, loc = line[10:12].strip(), line[12:14].strip()
if net == '' and loc == '':
widargs = _resolve_seedid(station=sta, component=cha, time=time,
**kwargs)
else:
widargs = net, sta, loc, cha
_waveform_id = WaveformStreamID(*widargs)
pick = Pick(waveform_id=_waveform_id, phase_hint=phase,
polarity=polarity, time=time)
# agency and operator / author / analyst
if line[51:54] != ' ' or line[55:58] != ' ':
pick.creation_info = CreationInfo(agency_id=line[51:54].strip(),
author=line[55:58].strip())
try:
pick.onset = ONSET_MAPPING[line[15]]
except KeyError:
pass
pick.evaluation_mode = EVALUATION_MAPPING.get(line[25], "manual")
# Pick-weight from Seisan is not covered by Obspy/Quakeml standard
if weight is not None:
pick.extra = {
'nordic_pick_weight': {
'value': weight,
'namespace':
'https://seis.geus.net/software/seisan/node239.html'}}
# Note that BAZ and apparent velocity are not always filled
found_baz_associated_pick = False
if 'BAZ' in phase and _float_conv(line[37:44]) is not None:
# Seisan phase name can be e.g. "BAZ-P"
baz_phase_type = phase.strip('BAZ-')
# Check if there is matching pick for the BAZ. THIS MEANS THAT
# THE BAZ-line in Nordic file has to follow the actual time pick!
for existing_pick in new_event.picks:
if (existing_pick.waveform_id == pick.waveform_id and
existing_pick.phase_hint == baz_phase_type and
existing_pick.time == pick.time):
pick = existing_pick
found_baz_associated_pick = True
break
# BAZ-phase name doesn't have to specify the associated phase
# name - as a secondary resort, compare pick times only.
if not found_baz_associated_pick:
for existing_pick in new_event.picks:
if (existing_pick.waveform_id == pick.waveform_id and
existing_pick.time == pick.time):
pick = existing_pick
found_baz_associated_pick = True
break
pick.backazimuth = _float_conv(line[37:44])
app_velocity = _float_conv(line[44:50])
if app_velocity is not None and app_velocity != 999.0:
pick.horizontal_slowness = 1 / kilometers2degrees(app_velocity)
# Create new obspy.event.Amplitude class which references above Pick
# only if there is an amplitude picked.
if (_is_iasp_ampl_phase(phase) and _float_conv(line[37:44]) is not None
and not found_baz_associated_pick):
_amplitude = Amplitude(generic_amplitude=_float_conv(line[37:44]),
period=_float_conv(line[44:50]),
pick_id=pick.resource_id,
waveform_id=pick.waveform_id)
if pick.phase_hint == 'IAML':
# Amplitude for local magnitude
_amplitude.type = 'AML'
# Set to be evaluating a point in the trace
_amplitude.category = 'point'
# Default AML unit in seisan is nm (Page 139 of seisan
# documentation, version 10.0)
_amplitude.generic_amplitude /= 1e9
_amplitude.unit = 'm'
_amplitude.magnitude_hint = 'ML'
else:
# Generic amplitude type
_amplitude.type = 'A'
if snr:
_amplitude.snr = snr
new_event.amplitudes.append(_amplitude)
# Magnitude for single trace / station computed from amplitude
# if line[63:68].strip() != '':
mag_residual = _float_conv(line[63:68])
# assoc_mag = new_event.magnitudes[0]
assoc_mag = None
for mag in new_event.magnitudes:
try:
if (mag.magnitude_type == _amplitude.magnitude_hint
and mag.creation_info.agency_id
== pick.creation_info.agency_id):
assoc_mag = mag
except AttributeError:
pass
if assoc_mag is not None:
_trace_mag = StationMagnitude(
origin_id=assoc_mag.origin_id,
mag=assoc_mag.mag - mag_residual,
mag_errors=mag_residual,
station_magnitude_type=assoc_mag.magnitude_type,
amplitude_id=_amplitude.resource_id,
method_id=assoc_mag.method_id,
waveform_id=pick.waveform_id,
creation_info=pick.creation_info)
new_event.station_magnitudes.append(_trace_mag)
elif phase == 'END' and _int_conv(line[37:44]) is not None:
# Create an amplitude instance for coda duration also
_amplitude = Amplitude(generic_amplitude=_int_conv(line[37:44]),
pick_id=pick.resource_id,
waveform_id=pick.waveform_id)
# Amplitude for coda magnitude
_amplitude.type = 'END'
# Set to be evaluating a point in the trace
_amplitude.category = 'duration'
_amplitude.unit = 's'
_amplitude.magnitude_hint = 'Mc'
if snr is not None:
_amplitude.snr = snr
new_event.amplitudes.append(_amplitude)
# Create new obspy.event.Arrival class referencing above Pick
if not _is_iasp_ampl_phase(phase):
# If there is a pick associated with a BAZ, then there's also an
# arrival for that pick already
if found_baz_associated_pick:
for existing_arrival in new_event.origins[0].arrivals:
if existing_arrival.pick_id == pick.resource_id:
arrival = existing_arrival
break
else:
arrival = Arrival(phase=pick.phase_hint,
pick_id=pick.resource_id)
if _float_conv(line[63:68]) is not None:
if 'BAZ' in phase:
arrival.backazimuth_residual = _float_conv(line[63:68])
else:
arrival.time_residual = _float_conv(line[63:68])
# Add other information and append arrival itself only if it's new.
if not found_baz_associated_pick:
if _float_conv(line[70:75]) is not None:
arrival.distance = kilometers2degrees(_float_conv(
line[70:75]))
if _int_conv(line[76:79]) is not None:
arrival.azimuth = _int_conv(line[76:79])
if ain is not None:
arrival.takeoff_angle = ain
if finalweight is not None:
arrival.time_weight = finalweight / 10
new_event.origins[0].arrivals.append(arrival)
# Add the pick, but do not add it as new if the pick was only updated
# with new information (BAZ, slowness etc.)
if not found_baz_associated_pick:
new_event.picks.append(pick)
return new_event
[docs]def readwavename(sfile, encoding='latin-1'):
"""
Extract the waveform filename from the s-file.
Returns a list of waveform names found in the s-file as multiples can
be present.
:type sfile: str
:param sfile: Path to the sfile
:type encoding: str
:param encoding: Encoding for file, used to decode from bytes to string
:returns: List of strings of wave paths
:rtype: list
"""
with open(sfile, 'r', encoding=encoding) as f:
tagged_lines = _get_line_tags(f=f)
if len(tagged_lines['6']) == 0:
msg = ('No waveform files in sfile %s' % sfile)
warnings.warn(msg)
wavenames = _readwavename(tagged_lines=tagged_lines['6'])
return wavenames
[docs]def _readwavename(tagged_lines):
"""
Internal wave-name reader.
:type tagged_lines: list
:param tagged_lines:
List of tuples of (strings, line-number) of the waveform lines.
:return: list of wave-file names
"""
wavename = []
for line in tagged_lines:
wavename.append(line[0][1:79].strip())
return wavename
[docs]def blanksfile(wavefile, evtype, userid, overwrite=False, evtime=None,
nordic_format='OLD'):
"""
Generate an empty s-file with a populated header for a given waveform.
:type wavefile: str
:param wavefile: Wave-file to associate with this S-file, the timing of \
the S-file will be taken from this file if evtime is not set.
:type evtype: str
:param evtype: Event type letter code, e.g. L, R, D
:type userid: str
:param userid: 4-character SEISAN USER ID
:type overwrite: bool
:param overwrite: Overwrite an existing S-file, default=False
:type evtime: :class:`~obspy.core.utcdatetime.UTCDateTime`
:param evtime: If given this will set the timing of the S-file
:type nordic_format: str
:param nordic_format:
Version of Nordic format to be used for output, either OLD or NEW.
:returns: str, S-file name
"""
if evtime is None:
try:
st = read(wavefile)
evtime = st[0].stats.starttime
except Exception:
raise NordicParsingError('Wavefile: ' + wavefile +
' is invalid, try again with real data.')
# Check that user ID is the correct length
if len(userid) != 4:
raise NordicParsingError('User ID must be 4 characters long')
# Check that evtype is one of L,R,D
if evtype not in ['L', 'R', 'D']:
raise NordicParsingError('Event type must be either L, R or D')
# Generate s-file name in the format dd-hhmm-ss[L,R,D].Syyyymm
sfile = str(evtime.day).zfill(2) + '-' + str(evtime.hour).zfill(2) +\
str(evtime.minute).zfill(2) + '-' + str(evtime.second).zfill(2) +\
evtype + '.S' + str(evtime.year) + str(evtime.month).zfill(2)
# Check is sfile exists
if Path(sfile).is_file() and not overwrite:
warnings.warn('Desired sfile: ' + sfile + ' exists, will not ' +
'overwrite')
for i in range(1, 10):
sfile = str(evtime.day).zfill(2) + '-' +\
str(evtime.hour).zfill(2) +\
str(evtime.minute).zfill(2) + '-' +\
str(evtime.second + i).zfill(2) + evtype + '.S' +\
str(evtime.year) + str(evtime.month).zfill(2)
if not Path(sfile).is_file():
break
else:
msg = ('Tried generated files up to 20s in advance and found ' +
'all exist.')
raise NordicParsingError(msg)
with open(sfile, 'w') as f:
# Write line 1 of s-file
f.write(' ' + str(evtime.year) + ' ' + str(evtime.month).rjust(2) +
str(evtime.day).rjust(2) + ' ' +
str(evtime.hour).rjust(2) +
str(evtime.minute).rjust(2) + ' ' +
str(float(evtime.second)).rjust(4) + ' ' +
evtype + '1'.rjust(58) + '\n')
# Write line 2 of s-file
output_time = datetime.datetime.now()
f.write(' ACTION:ARG ' + str(output_time.year)[2:4] +
'-' + str(output_time.month).zfill(2) + '-' +
str(output_time.day).zfill(2) + ' ' +
str(output_time.hour).zfill(2) + ':' +
str(output_time.minute).zfill(2) + ' OP:' +
userid.ljust(4) + ' STATUS:' + 'ID:'.rjust(18) +
str(evtime.year) + str(evtime.month).zfill(2) +
str(evtime.day).zfill(2) + str(evtime.hour).zfill(2) +
str(evtime.minute).zfill(2) + str(evtime.second).zfill(2) +
'I'.rjust(6) + '\n')
# Write line 3 of s-file
write_wavfile = str(Path(wavefile).parent)
f.write(' ' + write_wavfile + '6'.rjust(79 - len(write_wavfile)) +
'\n')
# Write final line of s-file
if nordic_format == 'OLD':
f.write(OLD_PHASE_HEADER_LINE)
elif nordic_format == 'NEW':
f.write(NEW_PHASE_HEADER_LINE)
return sfile
[docs]def write_select(catalog, filename, userid='OBSP', evtype='L',
wavefiles=None, high_accuracy=True, nordic_format='OLD'):
"""
Function to write a catalog to a select file in nordic format.
:type catalog: :class:`~obspy.core.event.catalog.Catalog`
:param catalog: A catalog of obspy events
:type filename: str
:param filename: Path to write to
:type userid: str
:param userid: Up to 4 character user ID
:type evtype: str
:param evtype:
Single character string to describe the event, either L, R or D.
:type wavefiles: list
:param wavefiles:
Waveforms to associate the events with, must be ordered in the same
way as the events in the catalog.
:type high_accuracy: bool
:param high_accuracy:
Whether to output pick seconds at 6.3f (high_accuracy) or
5.2f (standard)
:type nordic_format: str
:param nordic_format:
Version of Nordic format to be used for output, either OLD or NEW.
"""
if nordic_format not in ['OLD', 'NEW']:
raise ValueError('Nordic format can be ''OLD'' or ''NEW'', not '
+ nordic_format)
if not wavefiles:
wavefiles = ['' for _i in range(len(catalog))]
with open(filename, 'w') as fout:
for event, wavfile in zip(catalog, wavefiles):
select = io.StringIO()
_write_nordic(
event=event, filename=None, userid=userid, evtype=evtype,
wavefiles=wavfile, nordic_format=nordic_format,
string_io=select, high_accuracy=high_accuracy)
select.seek(0)
for line in select:
fout.write(line)
fout.write('\n')
[docs]def _write_nordic(event, filename, userid='OBSP', evtype='L', outdir='.',
wavefiles=None, explosion=False, nordic_format='OLD',
overwrite=True, string_io=None, high_accuracy=True):
"""
Write an :class:`~obspy.core.event.event.Event` to a nordic formatted
s-file.
:type event: :class:`~obspy.core.event.event.Event`
:param event: A single obspy event
:type filename: str
:param filename:
Filename to write to, can be None, and filename will be generated from
the origin time in nordic format.
:type userid: str
:param userid: Up to 4 character user ID
:type evtype: str
:param evtype:
Single character string to describe the event, either L, R or D.
:type outdir: str
:param outdir: Path to directory to write to
:type wavefiles: list
:param wavefiles: Waveforms to associate the nordic file with
:type explosion: bool
:param explosion:
Note if the event is an explosion, will be marked by an E.
:type nordic_format: str
:param nordic_format:
nordic_format of Nordic format to be used for output, either OLD or
NEW.
:type overwrite: bool
:param overwrite: force to overwrite old files, defaults to False
:type string_io: io.StringIO
:param string_io:
If given, will write to the StringIO object in memory rather than to
the filename.
:type high_accuracy: bool
:param high_accuracy:
Whether to output pick seconds at 6.3f (high_accuracy) or
5.2f (standard)
:returns: str: name of nordic file written
.. note::
Seisan can find waveforms either by their relative or absolute path, or
by looking for the file recursively in directories within the WAV
directory in your seisan install. Because all lines need to be less
than 79 characters long (fortran hangover) in the s-files, you will
need to determine whether the full-path is okay or not.
"""
# First we need to work out what to call the s-file and open it
# Check that user ID is the correct length
if len(userid) > 4:
raise NordicParsingError('%s User ID must be at most 4 characters long'
% userid)
userid = userid.ljust(4)
# Check that outdir exists
if not Path(outdir).is_dir():
raise NordicParsingError('Out path does not exist, I will not '
'create this: ' + outdir)
# Check that evtype is one of L,R,D
if evtype not in ['L', 'R', 'D']:
raise NordicParsingError('Event type must be either L, R or D')
if explosion:
evtype += 'E'
elif event.event_type is not None:
try:
evtype += EVENT_TYPE_AND_CERTAINTY_MAPPING_TO_SEISAN.get(
event.event_type_certainty + ' ' + event.event_type)
except TypeError:
try:
evtype += EVENT_TYPE_MAPPING_TO_SEISAN.get(event.event_type)
except TypeError:
pass
# Check that there is one event
if isinstance(event, Catalog) and len(event) == 1:
event = event[0]
elif isinstance(event, Event):
event = event
else:
raise NordicParsingError('Needs a single event')
if not isinstance(wavefiles, list):
wavefiles = [str(wavefiles)]
# Determine name from origin time
try:
origin = event.preferred_origin() or event.origins[0]
except IndexError:
msg = 'Need at least one origin with at least an origin time'
raise NordicParsingError(msg)
evtime = origin.time
if not evtime:
msg = ('event has an origin, but time is not populated. ' +
'This is required!')
raise NordicParsingError(msg)
# Attempt to cope with possible pre-existing files
if not filename:
range_list = []
for i in range(30): # Look +/- 30 seconds around origin time
range_list.append(i)
range_list.append(-1 * i)
range_list = range_list[1:]
for add_secs in range_list:
sfilename = (evtime + add_secs).datetime.strftime('%d-%H%M-%S') +\
evtype[0] + '.S' + (evtime + add_secs).\
datetime.strftime('%Y%m')
if not (Path(outdir) / sfilename).is_file():
sfile_path = Path(outdir) / sfilename
break
elif overwrite:
sfile_path = Path(outdir) / sfilename
break
else:
raise NordicParsingError(str(Path(outdir) / sfilename) +
' already exists, will not overwrite')
else:
sfile_path = Path(outdir) / filename
sfilename = filename
if not string_io:
sfile = open(sfile_path, 'w')
else:
sfile = string_io
# Write header line(s)
sfile.write(_write_header_line(
event, origin, evtype, is_preferred_origin=True))
# Write hyp error line
try:
sfile.write(_write_hyp_error_line(origin) + '\n')
except (NordicParsingError, TypeError):
pass
# Write origin lines for additional origins
for add_origin in event.origins:
if not add_origin == origin:
sfile.write(_write_header_line(event, add_origin, evtype,
is_preferred_origin=False))
# Write fault plane solution
if hasattr(event, 'focal_mechanisms') and len(event.focal_mechanisms) > 0:
for focal_mechanism in event.focal_mechanisms:
try:
sfile.write(
_write_focal_mechanism_line(focal_mechanism) + '\n')
except AttributeError:
pass
# Write moment tensor solution
for focal_mechanism in event.focal_mechanisms:
try:
sfile.write(
_write_moment_tensor_line(focal_mechanism) + '\n')
except AttributeError:
pass
# Write line 2 (type: I) of s-file
sfile.write(
" Action:ARG {0} OP:{1} STATUS: ID:{2} I\n".format(
datetime.datetime.now().strftime("%y-%m-%d %H:%M"),
userid.ljust(4)[0:4], evtime.strftime("%Y%m%d%H%M%S")))
# Write line-type 6 of s-file
for wavefile in wavefiles:
# Do not write names that indicate there's not a waveform file
if wavefile == '' or wavefile == 'None' or wavefile is None:
continue
sfile.write(' ' + os.path.basename(wavefile) +
'6'.rjust(79 - len(os.path.basename(wavefile))) + '\n')
for comment in event.comments:
nordic_comment = _write_comment(comment)
if nordic_comment is None:
continue
sfile.write(nordic_comment + '\n')
# Write final line of s-file
if nordic_format == 'OLD':
sfile.write(OLD_PHASE_HEADER_LINE)
elif nordic_format == 'NEW':
sfile.write(NEW_PHASE_HEADER_LINE)
# Now call the populate sfile function
if len(event.picks) > 0:
newpicks = '\n'.join(nordpick(event, high_accuracy=high_accuracy,
nordic_format=nordic_format))
sfile.write(newpicks + '\n')
sfile.write('\n'.rjust(81))
if not string_io:
sfile.close()
return str(sfilename)
else:
return
[docs]def _write_moment_tensor_line(focal_mechanism):
"""
Generate the two lines required for moment tensor solutions in Nordic.
"""
# First line contains hypocenter info, second contains tensor info
lines = [list(' ' * 79 + 'M'), list(' ' * 79 + 'M')]
# Get the origin associated with the moment tensor
origin = focal_mechanism.moment_tensor.derived_origin_id.\
get_referred_object()
magnitude = focal_mechanism.moment_tensor.moment_magnitude_id.\
get_referred_object()
# Sort out the first line
lines[0][1:5] = str(origin.time.year).rjust(4)
lines[0][6:8] = str(origin.time.month).rjust(2)
lines[0][8:10] = str(origin.time.day).rjust(2)
lines[0][11:13] = str(origin.time.hour).rjust(2)
lines[0][13:15] = str(origin.time.minute).rjust(2)
lines[0][16:20] = str(origin.time.second).rjust(2) + '.' +\
str(origin.time.microsecond).ljust(1)[0:1]
lines[0][23:30] = _str_conv(origin.latitude, 3).rjust(7)
lines[0][30:38] = _str_conv(origin.longitude, 3).rjust(8)
lines[0][38:43] = _str_conv(origin.depth / 1000.0, 1).rjust(5)
lines[0][45:48] = _get_agency_id(origin)
lines[0][55:59] = _str_conv(magnitude.mag, 1).rjust(4)
lines[0][59] = _evmagtonor(magnitude.magnitude_type)
lines[0][60:63] = _get_agency_id(magnitude)
lines[0][70:77] = (str(
focal_mechanism.moment_tensor.method_id).split('/')[-1]).rjust(7)
# Sort out the second line
lines[1][1:3] = 'MT'
lines[1][3:9] = _str_conv(
focal_mechanism.moment_tensor.tensor.m_rr, 3)[0:6].rjust(6)
lines[1][10:16] = _str_conv(
focal_mechanism.moment_tensor.tensor.m_tt, 3)[0:6].rjust(6)
lines[1][17:23] = _str_conv(
focal_mechanism.moment_tensor.tensor.m_pp, 3)[0:6].rjust(6)
lines[1][24:30] = _str_conv(
focal_mechanism.moment_tensor.tensor.m_rt, 3)[0:6].rjust(6)
lines[1][31:37] = _str_conv(
focal_mechanism.moment_tensor.tensor.m_rp, 3)[0:6].rjust(6)
lines[1][38:44] = _str_conv(
focal_mechanism.moment_tensor.tensor.m_tp, 3)[0:6].rjust(6)
lines[1][45:48] = _get_agency_id(magnitude)
lines[1][48] = 'S'
lines[1][52:62] = (
"%.3e" % focal_mechanism.moment_tensor.scalar_moment).rjust(10)
lines[1][70:77] = (
str(focal_mechanism.moment_tensor.method_id).split('/')[-1]).rjust(7)
return '\n'.join([''.join(line) for line in lines])
[docs]def _write_focal_mechanism_line(focal_mechanism):
"""
Get the line for a focal-mechanism
"""
nodal_plane = focal_mechanism.nodal_planes.nodal_plane_1
line = list(' ' * 79 + 'F')
line[0:10] = (_str_conv(nodal_plane.strike, 1)).rjust(10)
line[10:20] = (_str_conv(nodal_plane.dip, 1)).rjust(10)
line[20:30] = (_str_conv(nodal_plane.rake, 1)).rjust(10)
try:
line[30:35] = (_str_conv(nodal_plane.strike_errors.uncertainty,
1)).rjust(5)
line[35:40] = (_str_conv(nodal_plane.dip_errors.uncertainty,
1)).rjust(5)
line[40:45] = (_str_conv(nodal_plane.rake_errors.uncertainty,
1)).rjust(5)
except AttributeError:
pass
if hasattr(focal_mechanism, 'misfit'):
line[45:50] = (_str_conv(focal_mechanism.misfit, 1)).rjust(5)
if hasattr(focal_mechanism, 'station_distribution_ratio'):
line[50:55] = (_str_conv(
focal_mechanism.station_distribution_ratio, 1)).rjust(5)
line[66:69] = _get_agency_id(focal_mechanism)
if hasattr(focal_mechanism, 'method_id'):
line[70:77] = (str(focal_mechanism.method_id).split('/')[-1]
).rjust(7)[0:7]
return ''.join(line)
[docs]def _write_hyp_error_line(origin):
"""
Generate hypocentral error line.
format:
GAP=126 0.64 1.3 1.7 2.3 -0.9900E+00 -0.4052E+00 \
0.2392E+00E
"""
error_line = list(' ' * 79 + 'E')
if not hasattr(origin, 'quality'):
raise NordicParsingError("Origin has no quality associated")
error_line[1:5] = 'GAP='
if origin.quality['azimuthal_gap']:
error_line[5:8] = str(int(origin.quality['azimuthal_gap'])).ljust(3)
error_line[11:14] = _get_agency_id(origin)
if origin.time_errors.uncertainty:
error_line[14:20] = _str_conv(
origin.time_errors.uncertainty, 2).rjust(6)
# try:
errors = dict()
add_simplified_uncertainty = False
add_uncertainty = False
if hasattr(origin, 'origin_uncertainty'):
orig_unc = origin.origin_uncertainty
if (hasattr(orig_unc, 'min_horizontal_uncertainty')
and hasattr(orig_unc, 'max_horizontal_uncertainty')):
# Even though uncertainty should not be Zero, such files exist.
if (orig_unc.min_horizontal_uncertainty == 0.0 or
orig_unc.max_horizontal_uncertainty == 0.0):
add_simplified_uncertainty = True
elif origin.origin_uncertainty is not None:
add_uncertainty = True
# Following will work once Ellipsoid class added
# if hasattr(origin.origin_uncertainty, 'confidence_ellipsoid'):
# cov = Ellipsoid.from_confidence_ellipsoid(
# origin.origin_uncertainty['confidence_ellipsoid']).to_cov()
# errors['x_err'] = sqrt(cov(0, 0)) / 1000.0
# errors['y_err'] = sqrt(cov(1, 1)) / 1000.0
# errors['z_err'] = sqrt(cov(2, 2)) / 1000.0
# # xy_, xz_, then yz_cov fields
# error_line[43:55] = ("%.4e" % (cov(0, 1) / 1.e06)).rjust(12)
# error_line[55:67] = ("%.4e" % (cov(0, 2) / 1.e06)).rjust(12)
# error_line[67:79] = ("%.4e" % (cov(1, 2) / 1.e06)).rjust(12)
# else:
if origin.depth_errors.uncertainty:
errors['z_err'] = origin.depth_errors.uncertainty / 1000.0
else:
errors['z_err'] = None
if add_uncertainty:
cov = Ellipse.from_origin_uncertainty(origin.origin_uncertainty).\
to_cov()
errors['x_err'] = sqrt(cov[0][0][0]) / 1000.0
errors['y_err'] = sqrt(cov[0][1][1]) / 1000.0
# xy covariance field
error_line[43:55] = ("%.4e" % (cov[0][0][1] / 1.e06)).rjust(12)
elif add_simplified_uncertainty: # Deal with Zero uncertainty
errors['x_err'] = 0.0
errors['y_err'] = 0.0
else:
# Only return without writing error-line when no errors available
if not (origin.longitude_errors.uncertainty and
origin.latitude_errors.uncertainty and
origin.depth_errors.uncertainty):
return ''.join(error_line)
try:
errors['x_err'] = (origin.longitude_errors.uncertainty /
_km_to_deg_lon(1.0, origin.latitude))
except AttributeError:
pass
try:
errors['y_err'] = (origin.latitude_errors.uncertainty /
_km_to_deg_lat(1.0))
except AttributeError:
pass
error_line[24:30] = (_str_conv(errors['y_err'], 1)).rjust(6)[0:6]
error_line[32:38] = (_str_conv(errors['x_err'], 1)).rjust(6)[0:6]
error_line[38:43] = (_str_conv(errors['z_err'], 1)).rjust(5)[0:5]
return ''.join(error_line)
[docs]def nordpick(event, high_accuracy=True, nordic_format='OLD'):
"""
Format picks in an :class:`~obspy.core.event.event.Event` to nordic.
:type event: :class:`~obspy.core.event.event.Event`
:param event: A single obspy event.
:type high_accuracy: bool
:param high_accuracy:
Whether to output pick seconds at 6.3f (high_accuracy) or
5.2f (standard).
:type nordic_format: str
:param nordic_format:
Version of Nordic format to be used for output, either OLD or NEW.
:returns: List of String
.. note::
Currently angle of incidence is unsupported. This is because
:class:`~obspy.core.event.event.Event` stores takeoff angle rather than
incident angle, which would require computation from the value stored
in seisan. Multiple weights are also not supported.
"""
# Nordic picks do not have a date associated with them - we need time
# relative to some origin time.
try:
origin = event.preferred_origin() or event.origins[0]
origin_date = origin.time.date
except IndexError:
origin = Origin()
origin_date = min([p.time for p in event.picks]).date
pick_strings = []
if high_accuracy:
pick_rounding = 3
else:
pick_rounding = 2
for pick in event.picks:
if not pick.waveform_id:
msg = ('No waveform id for pick at time %s, skipping' % pick.time)
warnings.warn(msg)
continue
impulsivity = _str_conv(INV_ONSET_MAPPING.get(pick.onset))
polarity = _str_conv(INV_POLARITY_MAPPING.get(pick.polarity))
# Extract weight - should be stored as 0-4, or 9 for seisan.
try:
weight = pick.extra.get('nordic_pick_weight')['value']
except AttributeError:
weight = ' '
# Extract velocity: Note that horizontal slowness in quakeML is stored
# as s/deg and Seisan stores apparent velocity in km/s
if pick.horizontal_slowness is not None:
velocity = _str_conv(degrees2kilometers(
1.0 / pick.horizontal_slowness))
else:
velocity = ' '
azimuth = _str_conv(pick.backazimuth)
# Extract the correct arrival info for this pick - assuming only one
# arrival per pick...
arrival = [arrival for arrival in origin.arrivals
if arrival.pick_id == pick.resource_id]
if len(arrival) > 0:
if len(arrival) > 1:
warnings.warn("Multiple arrivals for pick - only writing one")
arrival = arrival[0]
# Extract azimuth residual
if arrival.backazimuth_residual is not None:
azimuthres = _str_conv(int(arrival.backazimuth_residual))
else:
azimuthres = ' '
if arrival.takeoff_angle is not None:
ain = _str_conv(arrival.takeoff_angle, rounded=1)
else:
ain = ' '
# Extract time residual
timeres = ' '
if hasattr(arrival, 'time_residual'):
if arrival.time_residual is not None:
timeres = _str_conv(arrival.time_residual, rounded=2)
# Extract distance
if arrival.distance is not None:
distance = degrees2kilometers(arrival.distance)
if distance >= 100.0:
distance = str(_int_conv(distance))
elif 10.0 < distance < 100.0:
distance = _str_conv(round(distance, 1), rounded=1)
elif distance < 10.0:
distance = _str_conv(round(distance, 2), rounded=2)
else:
distance = _str_conv(distance, False)
else:
distance = ' '
# Extract CAZ
if arrival.azimuth is not None:
caz = _str_conv(int(arrival.azimuth))
else:
caz = ' '
# Extract finalweight
finalweight = ' '
if arrival.time_weight is not None:
finalweight = _str_conv(int(round(
arrival.time_weight * 10))).rjust(2)[0:2]
if azimuth != ' ':
if arrival.backazimuth_weight is not None:
finalweight = _str_conv(int(round(
arrival.backazimuth_weight * 10))).rjust(2)[0:2]
else:
(caz, distance, timeres, azimuthres, azimuth, finalweight,
ain) = (' ', ' ', ' ', ' ', ' ', ' ', ' ')
phase_hint = pick.phase_hint or ' '
# Extract amplitude: note there can be multiple amplitudes, but they
# should be associated with different picks.
amplitude = [amplitude for amplitude in event.amplitudes
if amplitude.pick_id == pick.resource_id]
if len(amplitude) > 0:
if len(amplitude) > 1:
msg = 'Nordic files need one pick for each amplitude, ' + \
'using the first amplitude only'
warnings.warn(msg)
amplitude = amplitude[0]
# Determine type of amplitude
if amplitude.type != 'END':
# Extract period
if amplitude.period is not None:
peri = amplitude.period
if peri < 10.0:
peri_round = 2
elif peri >= 10.0:
peri_round = 1
else:
peri_round = False
else:
peri = ' '
peri_round = False
# Extract amplitude and convert units
if amplitude.generic_amplitude is not None:
amp = amplitude.generic_amplitude
if amplitude.unit in ['m', 'm/s', 'm/(s*s)', 'm*s']:
amp *= 1e9
# Otherwise we will assume that the amplitude is in counts
else:
amp = None
coda = ' '
mag_hint = (amplitude.magnitude_hint or amplitude.type)
if mag_hint is not None and mag_hint.upper() in ['AML', 'ML']:
phase_hint = 'IAML'
impulsivity = ' '
else:
coda = str(int(amplitude.generic_amplitude))
peri = ' '
peri_round = False
amp = None
else:
peri = ' '
peri_round = False
amp = None
coda = ' '
eval_mode = INV_EVALUTATION_MAPPING.get(pick.evaluation_mode, None)
if eval_mode is None:
warnings.warn("Evaluation mode {0} is not mappable".format(
pick.evaluation_mode))
eval_mode = " "
# Generate a print string and attach it to the list
channel_code = pick.waveform_id.channel_code or ' '
if len(channel_code) == 1:
channel_code = ' ' + channel_code[-1]
if len(channel_code) == 2:
channel_code = channel_code[0] + ' ' + channel_code[-1]
network_code = pick.waveform_id.network_code or ' '
location_code = pick.waveform_id.location_code or ' '
# Seisan doesn't accept questions marks, need to replace with space
network_code = network_code.replace('?', ' ')
location_code = location_code.replace('?', ' ')
channel_code = channel_code.replace('?', ' ')
pick_hour = pick.time.hour
if pick.time.date > origin_date:
# pick hours up to 48 are supported
days_diff = (pick.time.date - origin_date).days
if days_diff > 1:
raise NordicParsingError(
"Pick is {0} days from the origin, must be < 48 "
"hours".format(days_diff))
pick_hour += 24
pick_seconds = pick.time.second + (pick.time.microsecond / 1e6)
# Differentiate based on Nordic format versions
if nordic_format == 'OLD':
if len(phase_hint) > 4:
# Weight goes in 9 and phase_hint runs through 11-18
if polarity != ' ':
UserWarning(
"Polarity not written due to phase hint length")
phase_info = (_str_conv(weight).rjust(1) + impulsivity
+ phase_hint.ljust(8))
else:
phase_info = (
' ' + impulsivity + phase_hint.ljust(4) +
_str_conv(weight).rjust(1) + eval_mode +
polarity.rjust(1) + ' ')
pick_string_formatter = (
" {station:5s}{instrument:1s}{component:1s}{phase_info:10s}"
"{hour:2d}{minute:2d}{seconds:>6s}{coda:5s}{amp:7s}{period:5s}"
"{azimuth:6s}{velocity:5s}{ain:4s}{azimuthres:3s}{timeres:5s}"
"{finalweight:2s}{distance:5s}{caz:4s} ")
# Note that pick seconds rounding only works because SEISAN does
# not enforce that seconds stay 0 <= seconds < 60, so rounding
# something like seconds = 59.997 to 2dp gets to 60.00, which
# SEISAN is happy with. It appears that SEISAN is happy with large
# numbers of seconds see #2348.
pick_strings.append(pick_string_formatter.format(
station=pick.waveform_id.station_code,
instrument=channel_code[0], component=channel_code[-1],
phase_info=phase_info, hour=pick_hour,
minute=pick.time.minute,
seconds=_str_conv(pick_seconds, rounded=pick_rounding),
coda=_str_conv(coda).rjust(5)[0:5],
amp=_str_conv(amp, rounded=1).rjust(7)[0:7],
period=_str_conv(peri, rounded=peri_round).rjust(5)[0:5],
azimuth=_str_conv(azimuth).rjust(6)[0:6],
velocity=_str_conv(velocity).rjust(5)[0:5],
ain=ain[:-2].rjust(4)[0:4],
azimuthres=_str_conv(azimuthres).rjust(3)[0:3],
timeres=_str_conv(timeres, rounded=2).rjust(5)[0:5],
finalweight=finalweight, distance=distance.rjust(5)[0:5],
caz=_str_conv(caz).rjust(4)[0:4]))
# Note that currently angle of incidence is not supported. This is
# because obspy.event stores takeoff angle, which would require
# computation from the value stored in seisan. Multiple weights
# are also not supported in Obspy.event
elif nordic_format == 'NEW':
# Define par1, par2, & residual depending on type of observation:
# Coda, backzimuth (add extra line), amplitude, or other phase pick
add_amp_line = False
is_amp_pick = False
add_baz_line = False
par1 = ' '
par2 = ' '
residual = ' '
# Phase pick
if polarity.strip() != '':
par1 = ' ' + polarity
if hasattr(arrival, 'time_residual'):
if arrival.time_residual is not None:
residual = _str_conv(arrival.time_residual, rounded=2
).rjust(5)[0:5]
# Coda
if coda.strip() != '':
par1 = _str_conv(coda).rjust(7)[0:7] # coda duration
phase_hint = 'END'
impulsivity = ''
# Back Azimuth
elif azimuth.strip() != '': # back-azimuth
add_baz_line = True
if len(phase_hint) <= 4: # max total phase name length is 8
baz_phase_hint = 'BAZ-' + phase_hint
else:
baz_phase_hint = phase_hint
baz_par1 = _str_conv(azimuth, rounded=1).rjust(7)[0:7]
baz_par2 = _str_conv(
velocity, rounded=2).rjust(6)[0:5].rjust(6)
baz_residual = ' '
baz_finalweight = ' '
if arrival.backazimuth_residual is not None:
baz_residual = _str_conv(
arrival.backazimuth_residual, rounded=1).rjust(5)[0:5]
if arrival.backazimuth_weight is not None:
baz_finalweight = _str_conv(
arrival.backazimuth_weight*10, rounded=0).rjust[2][0:2]
# Amplitude
elif amp is not None:
add_amp_line = True
# check if the amplitude and pick reference the same pick-type
# - then don't write the amplitude-pick AND the amplitude
if (pick.phase_hint and
(pick.phase_hint == amplitude.type or
pick.phase_hint[1:] == amplitude.type)
and _is_iasp_ampl_phase(pick.phase_hint)):
is_amp_pick = True
mag_hint = (amplitude.magnitude_hint or amplitude.type)
if mag_hint is not None and mag_hint.upper() in ['AML', 'ML']:
amp_phase_hint = 'IAML'
else:
amp_phase_hint = 'A'
amp_eval_mode = ' ' or INV_EVALUTATION_MAPPING.get(
amplitude.evaluation_mode, None)
amp_finalweight = ' '
amp_par1 = _str_conv(amp, rounded=1).rjust(7)[0:7]
amp_par2 = _str_conv(peri, rounded=peri_round).rjust(6)[0:6]
# Get StationMagnitude that corresponds to the amplitude to
# print magnitude residual
tr_mag = [
sta_mag for sta_mag in event.station_magnitudes
if (sta_mag.amplitude_id == amplitude.resource_id
and sta_mag.creation_info.agency_id
== pick.creation_info.agency_id
and sta_mag.station_magnitude_type
== amplitude.magnitude_hint)]
amp_residual = ' '
if len(tr_mag) > 0:
if len(tr_mag) > 1:
msg = 'Nordic files need one trace-amplitude for ' + \
'each trace / station-magnitude only.'
warnings.warn(msg)
mag_residual = tr_mag[0].mag_errors.uncertainty
amp_residual = _str_conv(mag_residual, rounded=2
).rjust(5)[0:5]
agency = ' '
author = ' '
if pick.creation_info is not None:
agency = _get_agency_id(pick)
if pick.creation_info.author is not None:
author = (pick.creation_info.author.ljust(3)[0:3] or ' ')
pick_string_formatter = (
" {station:5s}{channel:3s} {network:2s}{location:2s} "
"{impulsivity:1s}{phase_hint:8s}{weight:1s}{eval_mode:1s}"
"{hour:2d}{minute:2d} {seconds:6s}"
"{par1:7s}{par2:6s} {agency:3s} {author:3s}"
"{ain:5s}{residual:5s}{finalweight:2s}"
"{distance:5s} {caz:3s} ")
if not is_amp_pick:
pick_strings.append(pick_string_formatter.format(
station=pick.waveform_id.station_code,
channel=channel_code, network=network_code,
location=location_code, impulsivity=impulsivity,
phase_hint=phase_hint, weight=_str_conv(weight).rjust(1),
eval_mode=eval_mode, hour=pick_hour,
minute=pick.time.minute,
seconds=_str_conv(pick_seconds, rounded=3).rjust(6),
par1=par1, par2=par2, agency=agency, author=author,
ain=ain.rjust(5)[0:5], residual=residual,
finalweight=finalweight,
distance=distance.rjust(5)[0:5],
caz=_str_conv(caz).rjust(3)[0:3]))
if add_amp_line:
pick_strings.append(pick_string_formatter.format(
station=pick.waveform_id.station_code,
channel=channel_code, network=network_code,
location=location_code, impulsivity=' ',
phase_hint=amp_phase_hint.ljust(8)[0:8], weight=' ',
eval_mode=amp_eval_mode, hour=pick_hour,
minute=pick.time.minute,
seconds=_str_conv(pick_seconds, rounded=3).rjust(6),
par1=amp_par1, par2=amp_par2, agency=agency, author=author,
ain=' ', residual=amp_residual,
finalweight=amp_finalweight,
distance=distance.rjust(5)[0:5],
caz=_str_conv(caz).rjust(3)[0:3]))
if add_baz_line:
pick_strings.append(pick_string_formatter.format(
station=pick.waveform_id.station_code,
channel=channel_code, network=network_code,
location=location_code, impulsivity=' ',
phase_hint=baz_phase_hint,
weight=_str_conv(weight).rjust(1), eval_mode=eval_mode,
hour=pick_hour, minute=pick.time.minute,
seconds=_str_conv(pick_seconds, rounded=3).rjust(6),
par1=baz_par1, par2=baz_par2, agency=agency, author=author,
ain=' ', residual=baz_residual,
finalweight=baz_finalweight,
distance=distance.rjust(5)[0:5],
caz=_str_conv(caz).rjust(3)[0:3]))
else:
raise ValueError('Nordic format can be ''OLD'' or ''NEW'', not '
+ nordic_format)
return pick_strings
if __name__ == "__main__":
import doctest
doctest.testmod()