Source code for obspy.io.segy.unpack

# -*- coding: utf-8 -*-
# ------------------------------------------------------------------
#  Filename: unpack.py
#  Purpose: Routines for unpacking SEG Y data formats.
#   Author: Lion Krischer
#    Email: krischer@geophysik.uni-muenchen.de
#
# Copyright (C) 2010 Lion Krischer
# --------------------------------------------------------------------
"""
Functions that will all take a file pointer and the sample count and return a
NumPy array with the unpacked values.
"""
import ctypes as C  # NOQA
from pathlib import Path
import sys
import warnings

import numpy as np

from obspy.core.compatibility import from_buffer
from .util import clibsegy


# Get the system byte order.
BYTEORDER = sys.byteorder
if BYTEORDER == 'little':
    BYTEORDER = '<'
else:
    BYTEORDER = '>'


clibsegy.ibm2ieee.argtypes = [
    np.ctypeslib.ndpointer(dtype=np.float32, ndim=1,
                           flags='C_CONTIGUOUS'),
    C.c_int]
clibsegy.ibm2ieee.restype = C.c_void_p


[docs]def unpack_4byte_ibm(file, count, endian='>'): """ Unpacks 4 byte IBM floating points. """ # Read as 4 byte integer so bit shifting works. data = from_buffer(file.read(count * 4), dtype=np.float32) # Swap the byte order if necessary. if BYTEORDER != endian: data = data.byteswap() length = len(data) # Call the C code which transforms the data inplace. clibsegy.ibm2ieee(data, length) return data
# Old pure Python/NumPy code # # def unpack_4byte_ibm(file, count, endian='>'): # """ # Unpacks 4 byte IBM floating points. # """ # # Read as 4 byte integer so bit shifting works. # data = np.fromstring(file.read(count * 4), dtype=np.int32) # # Swap the byte order if necessary. # if BYTEORDER != endian: # data = data.byteswap() # # See https://mail.scipy.org/pipermail/scipy-user/2009-January/019392.html # # XXX: Might need check for values out of range: # # https://bytes.com/topic/c/answers/ # # 221981-c-code-converting-ibm-370-floating-point-ieee-754-a # sign = np.bitwise_and(np.right_shift(data, 31), 0x01) # exponent = np.bitwise_and(np.right_shift(data, 24), 0x7f) # mantissa = np.bitwise_and(data, 0x00ffffff) # # Force single precision. # mantissa = np.require(mantissa, 'float32') # mantissa /= 0x1000000 # # Do the following calculation in a weird way to avoid autocasting to # # float64. # # data = (1.0 - 2.0 * sign) * mantissa * 16.0 ** (exponent - 64.0) # sign *= -2.0 # sign += 1.0 # mantissa *= 16.0 ** (exponent - 64) # mantissa *= sign # return mantissa
[docs]def unpack_4byte_integer(file, count, endian='>'): """ Unpacks 4 byte integers. """ # Read as 4 byte integer so bit shifting works. data = from_buffer(file.read(count * 4), dtype=np.int32) # Swap the byte order if necessary. if BYTEORDER != endian: data = data.byteswap() return data
[docs]def unpack_2byte_integer(file, count, endian='>'): """ Unpacks 2 byte integers. """ # Read as 4 byte integer so bit shifting works. data = from_buffer(file.read(count * 2), dtype=np.int16) # Swap the byte order if necessary. if BYTEORDER != endian: data = data.byteswap() return data
[docs]def unpack_4byte_fixed_point(file, count, endian='>'): raise NotImplementedError
[docs]def unpack_4byte_ieee(file, count, endian='>'): """ Unpacks 4 byte IEEE floating points. """ # Read as 4 byte integer so bit shifting works. data = from_buffer(file.read(count * 4), dtype=np.float32) # Swap the byte order if necessary. if BYTEORDER != endian: data = data.byteswap() return data
[docs]def unpack_1byte_integer(file, count, endian='>'): raise NotImplementedError
[docs]class OnTheFlyDataUnpacker: """ Tie-up a data sample unpack function with its parameters. This class allows for data to be read directly from the disk as needed, preventing the need to store data in memory. """
[docs] def __init__(self, unpack_function, filename, filemode, seek, count, endian='>'): self.unpack_function = unpack_function self.filename = filename self.filemode = filemode self.seek = seek self.count = count self.endian = endian self.mtime = Path(self.filename).stat().st_mtime
[docs] def __call__(self): mtime = Path(self.filename).stat().st_mtime if mtime != self.mtime: msg = "File '%s' changed since reading headers" % self.filename msg += "; data may be read incorrectly " msg += "(modification time = %s)." % mtime warnings.warn(msg) with open(self.filename, self.filemode) as fp: fp.seek(self.seek) raw = self.unpack_function(fp, self.count, endian=self.endian) return raw