Source code for obspy.io.rg16.core
"""
Receiver Gather (version 1.6-1) bindings to ObsPy core module.
"""
from collections import namedtuple
import numpy as np
from obspy.core import Stream, Trace, Stats, UTCDateTime
from obspy.io.rg16.util import _read, _open_file, _quick_merge
HeaderCount = namedtuple('HeaderCount', 'channel_set extended external')
[docs]
@_open_file
def _read_rg16(filename, headonly=False, starttime=None, endtime=None,
merge=False, contacts_north=False, details=False, **kwargs):
"""
Read Fairfield Nodal's Receiver Gather File Format version 1.6-1.
.. warning::
This function should NOT be called directly, it registers via the
ObsPy :func:`~obspy.core.stream.read` function, call this instead.
:param filename: path to the rg16 file or a file object.
:type filename: str or file-like object
:param headonly: If True don't read data, only main information
contained in the headers of the trace block is read.
:type headonly: optional, bool
:param starttime: If not None dont read traces that start before starttime.
:type starttime: optional, :class:`~obspy.core.utcdatetime.UTCDateTime`
:param endtime: If not None dont read traces that start after endtime.
:type endtime: optional, :class:`~obspy.core.utcdatetime.UTCDateTime`
:param merge: If True merge contiguous data blocks as they are found. For
continuous data files having 100,000+ traces this will create
more manageable streams.
:type merge: bool
:param contacts_north: If this parameter is set to True, it will map the
components to Z (1C, 3C), N (3C), and E (3C) as well as correct
the polarity for the vertical component.
:type contacts_north: bool
:param details: If True, all the information contained in the headers
is read).
:type details: bool
:return: An ObsPy :class:`~obspy.core.stream.Stream` object.
Frequencies are expressed in hertz and time is expressed in second
(except for date).
"""
starttime = starttime or UTCDateTime(1970, 1, 1)
endtime = endtime or UTCDateTime()
# get the number of headers/records, position of trace data
# and record length.
header_count = _cmp_nbr_headers(filename)
record_count = _cmp_nbr_records(filename)
trace_block_start = 32 * (2 + sum(header_count))
record_length = _cmp_record_length(filename)
# create trace data
traces = []
for i in range(0, record_count):
nbr_bytes_trace_block = _cmp_jump(filename, trace_block_start)
trace_starttime = _read(filename, trace_block_start + 20 + 2 * 32, 8,
'binary') / 1e6
trace_endtime = trace_starttime + record_length
con1 = starttime.timestamp > trace_endtime
con2 = endtime.timestamp < trace_starttime
# determine if this block is excluded based in starttime/endtime params
if con1 or con2:
# the block has to be excluded, increment trace_block_start
# and continue
trace_block_start += nbr_bytes_trace_block
continue
trace = _make_trace(filename, trace_block_start, headonly,
contacts_north, details)
traces.append(trace)
trace_block_start += nbr_bytes_trace_block
if merge:
traces = _quick_merge(traces)
return Stream(traces=traces)
[docs]
def _cmp_nbr_headers(fi):
"""
Return a tuple containing the number of channel set headers,
the number of extended headers and the number of external headers
in the file.
"""
header_count = HeaderCount(
channel_set=_read(fi, 28, 1, 'bcd'),
extended=_read(fi, 37, 2, 'binary'),
external=_read(fi, 39, 3, 'binary'),
)
return header_count
[docs]
def _cmp_nbr_records(fi):
"""
Return the number of records in the file (ie number of time slices
multiplied by the number of components).
"""
initial_header = _read_initial_headers(fi)
channel_sets_descriptor = initial_header['channel_sets_descriptor']
channels_number = set()
for _, val in channel_sets_descriptor.items():
chan_num = val['RU_channel_number'] or val['channel_set_number']
channels_number.add(chan_num)
nbr_component = len(channels_number)
extended_header_2 = initial_header['extended_headers']['2']
nbr_time_slices = extended_header_2['nbr_time_slices']
nbr_records = nbr_time_slices * nbr_component
return nbr_records
[docs]
def _cmp_record_length(fi):
"""
Return the record length.
"""
base_scan_interval = _read(fi, 22, 1, 'binary')
sampling_rate = int(1000 / (base_scan_interval / 16))
gen_head_2 = _read_initial_headers(fi)['general_header_2']
record_length = gen_head_2['extended_record_length'] - 1 / sampling_rate
return record_length
[docs]
def _cmp_jump(fi, trace_block_start):
"""
Return the number of bytes in a trace block.
"""
nbr_trace_extension_block = _read(fi, trace_block_start + 9, 1, 'binary')
nbr_bytes_header_trace = 20 + 32 * nbr_trace_extension_block
nbr_sample_trace = _read(fi, trace_block_start + 27, 3, 'binary')
nbr_bytes_trace_data = nbr_sample_trace * 4
nbr_bytes_trace_block = nbr_bytes_trace_data + nbr_bytes_header_trace
return nbr_bytes_trace_block
[docs]
def _make_trace(fi, trace_block_start, headonly, standard_orientation,
details):
"""
Make obspy trace from a trace block (header + trace).
"""
stats = _make_stats(fi, trace_block_start, standard_orientation, details)
if headonly:
data = np.array([])
else: # read trace
nbr_trace_extension_block = _read(fi, trace_block_start + 9,
1, 'binary')
trace_start = trace_block_start + 20 + nbr_trace_extension_block * 32
nbr_sample_trace = _read(fi, trace_block_start + 27, 3, 'binary')
nbr_bytes_trace = 4 * nbr_sample_trace
data = _read(fi, trace_start, nbr_bytes_trace, 'IEEE')
if stats.channel[-1] == 'Z':
data = -data
data = data.astype('>f4')
return Trace(data=data, header=stats)
[docs]
def _make_stats(fi, tr_block_start, standard_orientation, details):
"""
Make Stats object from information contained in the header of the trace.
"""
base_scan_interval = _read(fi, 22, 1, 'binary')
sampling_rate = int(1000 / (base_scan_interval / 16))
# map sampling rate to band code according to seed standard
band_map = {2000: 'G', 1000: 'G', 500: 'D', 250: 'D'}
# geophone instrument code
instrument_code = 'P'
# mapping for "standard_orientation"
standard_component_map = {'2': 'Z', '3': 'N', '4': 'E'}
component = str(_read(fi, tr_block_start + 40, 1, 'binary'))
if standard_orientation:
component = standard_component_map[component]
chan = band_map[sampling_rate] + instrument_code + component
npts = _read(fi, tr_block_start + 27, 3, 'binary')
start_time = _read(fi, tr_block_start + 20 + 2 * 32, 8, 'binary') / 1e6
end_time = start_time + (npts - 1) * (1 / sampling_rate)
network = _read(fi, tr_block_start + 20, 3, 'binary')
station = _read(fi, tr_block_start + 23, 3, 'binary')
location = _read(fi, tr_block_start + 26, 1, 'binary')
statsdict = dict(starttime=UTCDateTime(start_time),
endtime=UTCDateTime(end_time),
sampling_rate=sampling_rate,
npts=npts,
network=str(network),
station=str(station),
location=str(location),
channel=chan)
if details:
statsdict['rg16'] = {}
statsdict['rg16']['initial_headers'] = {}
stats_initial_headers = statsdict['rg16']['initial_headers']
stats_initial_headers.update(_read_initial_headers(fi))
statsdict['rg16']['trace_headers'] = {}
stats_tr_headers = statsdict['rg16']['trace_headers']
stats_tr_headers.update(_read_trace_header(fi, tr_block_start))
nbr_tr_header_block = _read(fi, tr_block_start + 9,
1, 'binary')
if nbr_tr_header_block > 0:
stats_tr_headers.update(
_read_trace_headers(fi, tr_block_start, nbr_tr_header_block))
return Stats(statsdict)
[docs]
def _read_trace_headers(fi, trace_block_start, nbr_trace_header):
"""
Read headers in the trace block.
"""
trace_headers = {}
dict_func = {'1': _read_trace_header_1, '2': _read_trace_header_2,
'3': _read_trace_header_3, '4': _read_trace_header_4,
'5': _read_trace_header_5, '6': _read_trace_header_6,
'7': _read_trace_header_7, '8': _read_trace_header_8,
'9': _read_trace_header_9, '10': _read_trace_header_10}
for i in range(1, nbr_trace_header + 1):
trace_headers.update(dict_func[str(i)](fi, trace_block_start))
return trace_headers
[docs]
def _read_trace_header(fi, trace_block_start):
"""
Read the 20 bytes trace header (first header in the trace block).
"""
trace_number = _read(fi, trace_block_start + 4, 2, 'bcd')
trace_edit_code = _read(fi, trace_block_start + 11, 1, 'binary')
return {'trace_number': trace_number, 'trace_edit_code': trace_edit_code}
[docs]
def _read_trace_header_1(fi, trace_block_start):
"""
Read trace header 1
"""
pos = trace_block_start + 20
dict_header_1 = dict(
extended_receiver_line_nbr=_read(fi, pos + 10, 5, 'binary'),
extended_receiver_point_nbr=_read(fi, pos + 15, 5, 'binary'),
sensor_type=_read(fi, pos + 20, 1, 'binary'),
trace_count_file=_read(fi, pos + 21, 4, 'binary'),
)
return dict_header_1
[docs]
def _read_trace_header_2(fi, trace_block_start):
"""
Read trace header 2
"""
pos = trace_block_start + 20 + 32
leg_source_info = {'0': 'undefined', '1': 'preplan', '2': 'as shot',
'3': 'post processed'}
source_key = str(_read(fi, pos + 29, 1, 'binary'))
leg_energy_source = {'0': 'undefined', '1': 'vibroseis', '2': 'dynamite',
'3': 'air gun'}
energy_source_key = str(_read(fi, pos + 30, 1, 'binary'))
dict_header_2 = dict(
shot_line_nbr=_read(fi, pos, 4, 'binary'),
shot_point=_read(fi, pos + 4, 4, 'binary'),
shot_point_index=_read(fi, pos + 8, 1, 'binary'),
shot_point_pre_plan_x=_read(fi, pos + 9, 4, 'binary') / 10,
shot_point_pre_plan_y=_read(fi, pos + 13, 4, 'binary') / 10,
shot_point_final_x=_read(fi, pos + 17, 4, 'binary') / 10,
shot_point_final_y=_read(fi, pos + 21, 4, 'binary') / 10,
shot_point_final_depth=_read(fi, pos + 25, 4, 'binary') / 10,
source_of_final_shot_info=leg_source_info[source_key],
energy_source_type=leg_energy_source[energy_source_key],
)
return dict_header_2
[docs]
def _read_trace_header_3(fi, trace_block_start):
"""
Read trace header 3
"""
pos = trace_block_start + 20 + 32 * 2
dict_header_3 = dict(
epoch_time=UTCDateTime(_read(fi, pos, 8, 'binary') / 1e6),
# shot skew time in second
shot_skew_time=_read(fi, pos + 8, 8, 'binary') / 1e6,
# time shift clock correction in second
time_shift_clock_correction=_read(fi, pos + 16, 8, 'binary') / 1e9,
# remaining clock correction in second
remaining_clock_correction=_read(fi, pos + 24, 8, 'binary') / 1e9,
)
return dict_header_3
[docs]
def _read_trace_header_4(fi, trace_block_start):
"""
Read trace header 4
"""
pos = trace_block_start + 20 + 32 * 3
leg_trace_clipped = {'0': 'not clipped', '1': 'digital clip detected',
'2': 'analog clip detected'}
clipped_code = str(_read(fi, pos + 9, 1, 'binary'))
leg_record_type = {'2': 'test data record',
'8': 'normal seismic data record'}
record_type_code = str(_read(fi, pos + 10, 1, 'binary'))
leg_shot_flag = {'0': 'normal', '1': 'bad-operator specified',
'2': 'bad-failed to QC test'}
shot_code = str(_read(fi, pos + 11, 1, 'binary'))
dict_header_4 = dict(
# pre shot guard band in second
pre_shot_guard_band=_read(fi, pos, 4, 'binary') / 1e3,
# post shot guard band in second
post_shot_guard_band=_read(fi, pos + 4, 4, 'binary') / 1e3,
# preamp gain in dB
preamp_gain=_read(fi, pos + 8, 1, 'binary'),
trace_clipped_flag=leg_trace_clipped[clipped_code],
record_type_code=leg_record_type[record_type_code],
shot_status_flag=leg_shot_flag[shot_code],
external_shot_id=_read(fi, pos + 12, 4, 'binary'),
post_processed_first_break_pick_time=_read(fi, pos + 24, 4, 'IEEE'),
post_processed_rms_noise=_read(fi, pos + 28, 4, 'IEEE'),
)
return dict_header_4
[docs]
def _read_trace_header_5(fi, trace_block_start):
"""
Read trace header 5
"""
pos = trace_block_start + 20 + 32 * 4
leg_source_receiver_info = {
'1': 'preplan',
'2': 'as laid (no navigation sensor)',
'3': 'as laid (HiPAP only)',
'4': 'as laid (HiPAP and INS)',
'5': 'as laid (HiPAP and DVL)',
'6': 'as laid (HiPAP, DVL and INS)',
'7': 'post processed (HiPAP only)',
'8': 'post processed (HiPAP and INS)',
'9': 'post processed (HiPAP and DVL)',
'10': 'post processed (HiPAP, DVL ans INS)',
'11': 'first break analysis',
}
source_key = str(_read(fi, pos + 29, 1, 'binary'))
dict_header_5 = dict(
receiver_point_pre_plan_x=_read(fi, pos + 9, 4, 'binary') / 10,
receiver_point_pre_plan_y=_read(fi, pos + 13, 4, 'binary') / 10,
receiver_point_final_x=_read(fi, pos + 17, 4, 'binary') / 10,
receiver_point_final_y=_read(fi, pos + 21, 4, 'binary') / 10,
receiver_point_final_depth=_read(fi, pos + 25, 4, 'binary') / 10,
source_of_final_receiver_info=leg_source_receiver_info[source_key],
)
return dict_header_5
[docs]
def _read_trace_header_6(fi, trace_block_start):
"""
Read trace header 6
"""
pos = trace_block_start + 20 + 32 * 5
dict_header_6 = dict(
tilt_matrix_h1x=_read(fi, pos, 4, 'IEEE'),
tilt_matrix_h2x=_read(fi, pos + 4, 4, 'IEEE'),
tilt_matrix_vx=_read(fi, pos + 8, 4, 'IEEE'),
tilt_matrix_h1y=_read(fi, pos + 12, 4, 'IEEE'),
tilt_matrix_h2y=_read(fi, pos + 16, 4, 'IEEE'),
tilt_matrix_vy=_read(fi, pos + 20, 4, 'IEEE'),
tilt_matrix_h1z=_read(fi, pos + 24, 4, 'IEEE'),
tilt_matrix_h2z=_read(fi, pos + 28, 4, 'IEEE'),
)
return dict_header_6
[docs]
def _read_trace_header_7(fi, trace_block_start):
"""
Read trace header 7
"""
pos = trace_block_start + 20 + 32 * 6
dict_header_7 = dict(
tilt_matrix_vz=_read(fi, pos, 4, 'IEEE'),
azimuth_degree=_read(fi, pos + 4, 4, 'IEEE'),
pitch_degree=_read(fi, pos + 8, 4, 'IEEE'),
roll_degree=_read(fi, pos + 12, 4, 'IEEE'),
remote_unit_temp=_read(fi, pos + 16, 4, 'IEEE'),
remote_unit_humidity=_read(fi, pos + 20, 4, 'IEEE'),
orientation_matrix_version_nbr=_read(fi, pos + 24, 4, 'binary'),
gimbal_corrections=_read(fi, pos + 28, 1, 'binary'))
return dict_header_7
[docs]
def _read_trace_header_8(fi, trace_block_start):
"""
Read trace header 8
"""
pos = trace_block_start + 20 + 32 * 7
leg_preamp_path = {
'0': 'external input selected',
'1': 'simulated data selected',
'2': 'pre-amp input shorted to ground',
'3': 'test oscillator with sensors',
'4': 'test oscillator without sensors',
'5': 'common mode test oscillator with sensors',
'6': 'common mode test oscillator without sensors',
'7': 'test oscillator on positive sensors with neg sensor grounded',
'8': 'test oscillator on negative sensors with pos sensor grounded',
'9': 'test oscillator on positive PA input with neg PA input ground',
'10': 'test oscillator on negative PA input with pos PA input ground',
'11': 'test oscillator on positive PA input with neg\
PA input ground, no sensors',
'12': 'test oscillator on negative PA input with pos\
PA input ground, no sensors'}
preamp_path_code = str(_read(fi, pos + 24, 4, 'binary'))
leg_test_oscillator = {'0': 'test oscillator path open',
'1': 'test signal selected',
'2': 'DC reference selected',
'3': 'test oscillator path grounded',
'4': 'DC reference toggle selected'}
oscillator_code = str(_read(fi, pos + 28, 4, 'binary'))
dict_header_8 = dict(
fairfield_test_analysis_code=_read(fi, pos, 4, 'binary'),
first_test_oscillator_attenuation=_read(fi, pos + 4, 4, 'binary'),
second_test_oscillator_attenuation=_read(fi, pos + 8, 4, 'binary'),
# start delay in second
start_delay=_read(fi, pos + 12, 4, 'binary') / 1e6,
dc_filter_flag=_read(fi, pos + 16, 4, 'binary'),
dc_filter_frequency=_read(fi, pos + 20, 4, 'IEEE'),
preamp_path=leg_preamp_path[preamp_path_code],
test_oscillator_signal_type=leg_test_oscillator[oscillator_code],
)
return dict_header_8
[docs]
def _read_trace_header_9(fi, trace_block_start):
"""
Read trace header 9
"""
pos = trace_block_start + 20 + 32 * 8
leg_signal_type = {'0': 'pattern is address ramp',
'1': 'pattern is RU address ramp',
'2': 'pattern is built from provided values',
'3': 'pattern is random numbers',
'4': 'pattern is a walking 1s',
'5': 'pattern is a walking 0s',
'6': 'test signal is a specified DC value',
'7': 'test signal is a pulse train with\
specified duty cycle',
'8': 'test signal is a sine wave',
'9': 'test signal is a dual tone sine',
'10': 'test signal is an impulse',
'11': 'test signal is a step function'}
type_code = str(_read(fi, pos, 4, 'binary'))
# test signal generator frequency 1 in hertz
test_signal_freq_1 = _read(fi, pos + 4, 4, 'binary') / 1e3
# test signal generator frequency 2 in hertz
test_signal_freq_2 = _read(fi, pos + 8, 4, 'binary') / 1e3
# test signal generator amplitude 1 in dB down from full scale -120 to 120
test_signal_amp_1 = _read(fi, pos + 12, 4, 'binary')
# test signal generator amplitude 2 in dB down from full scale -120 to 120
test_signal_amp_2 = _read(fi, pos + 16, 4, 'binary')
# test signal generator duty cycle in percentage
duty_cycle = _read(fi, pos + 20, 4, 'IEEE')
# test signal generator active duration in second
active_duration = _read(fi, pos + 24, 4, 'binary') / 1e6
# test signal generator activation time in second
activation_time = _read(fi, pos + 28, 4, 'binary') / 1e6
dict_header_9 = dict(
test_signal_generator_signal_type=leg_signal_type[type_code],
test_signal_generator_frequency_1=test_signal_freq_1,
test_signal_generator_frequency_2=test_signal_freq_2,
test_signal_generator_amplitude_1=test_signal_amp_1,
test_signal_generator_amplitude_2=test_signal_amp_2,
test_signal_generator_duty_cycle_percentage=duty_cycle,
test_signal_generator_active_duration=active_duration,
test_signal_generator_activation_time=activation_time,
)
return dict_header_9
[docs]
def _read_trace_header_10(fi, trace_block_start):
"""
Read trace header 10
"""
pos = trace_block_start + 20 + 32 * 9
dict_header_10 = dict(
test_signal_generator_idle_level=_read(fi, pos, 4, 'binary'),
test_signal_generator_active_level=_read(fi, pos + 4, 4, 'binary'),
test_signal_generator_pattern_1=_read(fi, pos + 8, 4, 'binary'),
test_signal_generator_pattern_2=_read(fi, pos + 12, 4, 'binary'),
)
return dict_header_10
[docs]
@_open_file
def _is_rg16(filename, **kwargs):
"""
Determine if a file is a rg16 file.
:param filename: a path to a file or a file object
:type filename: str or file-like object
:rtype: bool
:return: True if the file object is a rg16 file.
"""
try:
sample_format = _read(filename, 2, 2, 'bcd')
sample_format = _read(filename, 2, 2, 'bcd')
manufacturer_code = _read(filename, 16, 1, 'bcd')
version = _read(filename, 42, 2, 'binary')
except ValueError: # if file too small
return False
con1 = version == 262 and sample_format == 8058
return con1 and manufacturer_code == 20
[docs]
@_open_file
def _read_initial_headers(filename):
"""
Extract all the information contained in the headers located before data,
at the beginning of the rg16 file object.
:param filename: a path to a rg16 file or a rg16 file object.
:type filename: str or file-like object
:return: a dictionary containing all the information of the initial headers
Frequencies are expressed in hertz and time is expressed in second (except
for the date).
"""
headers_content = dict(
general_header_1=_read_general_header_1(filename),
general_header_2=_read_general_header_2(filename),
channel_sets_descriptor=_read_channel_sets(filename),
extended_headers=_read_extended_headers(filename),
)
return headers_content
[docs]
def _read_general_header_1(fi):
"""
Extract information contained in the general header block 1
"""
gen_head_1 = dict(
file_number=_read(fi, 0, 2, 'bcd'),
sample_format_code=_read(fi, 2, 2, 'bcd'),
general_constant=_read(fi, 4, 6, 'bcd'),
time_slice_year=_read(fi, 10, 1, 'bcd'),
nbr_add_general_header=_read(fi, 11, 0.5, 'bcd'),
julian_day=_read(fi, 11, 1.5, 'bcd', False),
time_slice=_read(fi, 13, 3, 'bcd'),
manufacturer_code=_read(fi, 16, 1, 'bcd'),
manufacturer_serial_number=_read(fi, 17, 2, 'bcd'),
base_scan_interval=_read(fi, 22, 1, 'binary'),
polarity_code=_read(fi, 23, 0.5, 'binary'),
record_type=_read(fi, 25, 0.5, 'binary'),
scan_type_per_record=_read(fi, 27, 1, 'bcd'),
nbr_channel_set=_read(fi, 28, 1, 'bcd'),
nbr_skew_block=_read(fi, 29, 1, 'bcd'),
)
return gen_head_1
[docs]
def _read_general_header_2(fi):
"""
Extract information contained in the general header block 2
"""
gen_head_2 = dict(
extended_file_number=_read(fi, 32, 3, 'binary'),
extended_channel_sets_per_scan_type=_read(fi, 35, 2, 'binary'),
extended_header_blocks=_read(fi, 37, 2, 'binary'),
external_header_blocks=_read(fi, 39, 3, 'binary'),
version_number=_read(fi, 42, 2, 'binary'),
# extended record length in second
extended_record_length=_read(fi, 46, 3, 'binary') / 1e3,
general_header_block_number=_read(fi, 50, 1, 'binary'),
)
return gen_head_2
[docs]
def _read_channel_sets(fi):
"""
Extract information of all channel set descriptor blocks.
"""
channel_sets = {}
nbr_channel_set = _read(fi, 28, 1, 'bcd')
start_byte = 64
for i in range(0, nbr_channel_set):
channel_set_name = str(i + 1)
channel_sets[channel_set_name] = _read_channel_set(fi, start_byte)
start_byte += 32
return channel_sets
[docs]
def _read_channel_set(fi, start_byte):
"""
Extract information contained in the ith channel set descriptor.
"""
nbr_32_ext = _read(fi, start_byte + 28, 0.5, 'binary', False)
# first read alias freq. This can be written as BCD or int32
try:
alias_filter_freq = _read(fi, start_byte + 12, 2, 'bcd')
except ValueError:
alias_filter_freq = _read(fi, start_byte + 12, 2, '>i2')
channel_set = dict(
scan_type_number=_read(fi, start_byte, 1, 'bcd'),
channel_set_number=_read(fi, start_byte + 1, 1, 'bcd'),
channel_set_start_time=_read(fi, start_byte + 2, 2, 'binary') * 2e-3,
channel_set_end_time=_read(fi, start_byte + 4, 2, 'binary') * 2e-3,
optionnal_MP_factor=_read(fi, start_byte + 6, 1, 'binary'),
mp_factor_descaler_multiplier=_read(fi, start_byte + 7, 1, 'binary'),
nbr_channels_in_channel_set=_read(fi, start_byte + 8, 2, 'bcd'),
channel_type_code=_read(fi, start_byte + 10, 0.5, 'binary'),
nbr_sub_scans=_read(fi, start_byte + 11, 0.5, 'bcd'),
gain_control_type=_read(fi, start_byte + 11, 0.5, 'bcd', False),
alias_filter_frequency=alias_filter_freq,
alias_filter_slope=_read(fi, start_byte + 14, 2, 'bcd'),
low_cut_filter_freq=_read(fi, start_byte + 16, 2, 'bcd'),
low_cut_filter_slope=_read(fi, start_byte + 18, 2, 'bcd'),
notch_filter_freq=_read(fi, start_byte + 20, 2, 'bcd') / 10,
notch_2_filter_freq=_read(fi, start_byte + 22, 2, 'bcd') / 10,
notch_3_filter_freq=_read(fi, start_byte + 24, 2, 'bcd') / 10,
extended_channel_set_number=_read(fi, start_byte + 26, 2, 'binary'),
extended_header_flag=_read(fi, start_byte + 28, 0.5, 'binary'),
nbr_32_byte_trace_header_extension=nbr_32_ext,
vertical_stack_size=_read(fi, start_byte + 29, 1, 'binary'),
RU_channel_number=_read(fi, start_byte + 30, 1, 'binary'),
array_forming=_read(fi, start_byte + 31, 1, 'binary'),
)
return channel_set
[docs]
def _read_extended_headers(fi):
"""
Extract information from the extended headers.
"""
extended_headers = {}
nbr_channel_set = _read(fi, 28, 1, 'bcd')
start_byte = 32 + 32 + 32 * nbr_channel_set
extended_headers['1'] = _read_extended_header_1(fi, start_byte)
start_byte += 32
extended_headers['2'] = _read_extended_header_2(fi, start_byte)
start_byte += 32
extended_headers['3'] = _read_extended_header_3(fi, start_byte)
nbr_extended_headers = _read(fi, 37, 2, 'binary', True)
if nbr_extended_headers > 3:
coeffs = extended_headers['2']['number_decimation_filter_coefficient']
nbr_coeff_remain = coeffs % 8
for i in range(3, nbr_extended_headers):
start_byte += 32
extended_header_name = str(i + 1)
if i == nbr_extended_headers - 1:
header = _read_extended_header(fi, start_byte, i + 1,
nbr_coeff_remain)
extended_headers[extended_header_name] = header
else:
header = _read_extended_header(fi, start_byte, i + 1, 8)
extended_headers[extended_header_name] = header
return extended_headers
[docs]
def _read_extended_header_1(fi, start_byte):
"""
Extract information contained in the extended header block number 1.
"""
deployment_time = _read(fi, start_byte + 8, 8, 'binary') / 1e6
pick_up_time = _read(fi, start_byte + 16, 8, 'binary') / 1e6
start_time_ru = _read(fi, start_byte + 24, 8, 'binary') / 1e6
extended_header_1 = dict(
id_ru=_read(fi, start_byte, 8, 'binary'),
deployment_time=UTCDateTime(deployment_time),
pick_up_time=UTCDateTime(pick_up_time),
start_time_ru=UTCDateTime(start_time_ru),
)
return extended_header_1
[docs]
def _read_extended_header_2(fi, start_byte):
"""
Extract information contained in the extended header block number 2.
"""
# code mappings to meaning
leg_clock_stop = {'0': 'normal', '1': 'storage full', '2': 'power loss',
'3': 'reboot'}
stop_code = str(_read(fi, start_byte + 12, 1, 'binary'))
leg_freq_drift = {'0': 'not within specification',
'1': 'within specification'}
drift_code = str(_read(fi, start_byte + 13, 1, 'binary'))
leg_oscillator_type = {'0': 'control board', '1': 'atomic',
'2': 'ovenized', '3': 'double ovenized',
'4': 'disciplined'}
oscillator_code = str(_read(fi, start_byte + 14, 1, 'binary'))
leg_data_collection = {'0': 'normal', '1': 'continuous',
'2': 'shot sliced with guard band'}
data_collection_code = str(_read(fi, start_byte + 15, 1, 'binary'))
leg_data_decimation = {'0': 'not decimated', '1': 'decimated data'}
decimation_code = str(_read(fi, start_byte + 28, 1, 'binary'))
extended_header_2 = dict(
acquisition_drift_window=_read(fi, start_byte, 4, 'IEEE') * 1e-6,
clock_drift=_read(fi, start_byte + 4, 8, 'binary') * 1e-9,
clock_stop_method=leg_clock_stop[stop_code],
frequency_drift=leg_freq_drift[drift_code],
oscillator_type=leg_oscillator_type[oscillator_code],
data_collection_method=leg_data_collection[data_collection_code],
nbr_time_slices=_read(fi, start_byte + 16, 4, 'binary'),
nbr_files=_read(fi, start_byte + 20, 4, 'binary'),
file_number=_read(fi, start_byte + 24, 4, 'binary'),
data_decimation=leg_data_decimation[decimation_code],
original_base_scan_interval=_read(fi, start_byte + 29, 1, 'binary'),
nbr_decimation_filter_coef=_read(fi, start_byte + 30, 2, 'binary'),
)
return extended_header_2
[docs]
def _read_extended_header_3(fi, start_byte):
"""
Extract information contained in the extended header block number 3.
"""
extended_header_3 = dict(
receiver_line_number=_read(fi, start_byte, 4, 'binary'),
receiver_point=_read(fi, start_byte + 4, 4, 'binary'),
receiver_point_index=_read(fi, start_byte + 8, 1, 'binary'),
first_shot_line=_read(fi, start_byte + 9, 4, 'binary'),
first_shot_point=_read(fi, start_byte + 13, 4, 'binary'),
first_shot_point_index=_read(fi, start_byte + 17, 1, 'binary'),
last_shot_line=_read(fi, start_byte + 18, 4, 'binary'),
last_shot_point=_read(fi, start_byte + 22, 4, 'binary'),
last_shot_point_index=_read(fi, start_byte + 26, 1, 'binary'),
)
return extended_header_3
[docs]
def _read_extended_header(fi, start_byte, block_number, nbr_coeff):
"""
Extract information contained in the ith extended header block (i>3).
"""
extended_header = {}
for i in range(0, nbr_coeff):
key = 'coeff_' + str(i + 1)
extended_header[key] = _read(fi, start_byte, 4, 'IEEE')
start_byte += 4
return extended_header
if __name__ == '__main__':
import doctest
doctest.testmod(exclude_empty=True)