Source code for obspy.clients.seedlink.client.seedlinkconnection

# -*- coding: utf-8 -*-
"""
Module to manage a connection to a SeedLink server using a Socket.

Part of Python implementation of libslink of Chad Trabant and
JSeedLink of Anthony Lomax

:copyright:
    The ObsPy Development Team (devs@obspy.org) & Anthony Lomax
:license:
    GNU Lesser General Public License, Version 3
    (https://www.gnu.org/copyleft/lesser.html)
"""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)
from future.builtins import *  # NOQA

import io
import logging
import select
import socket
import time

from obspy.core.utcdatetime import UTCDateTime
from .slnetstation import SLNetStation
from .slstate import SLState
from ..seedlinkexception import SeedLinkException
from ..slpacket import SLPacket


# default logger
logger = logging.getLogger('obspy.clients.seedlink')

# set to True for debugging to stdout
if False:
    import sys
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG)


[docs]class SeedLinkConnection(object): """ Class to manage a connection to a SeedLink server using a Socket. See obspy.realtime.seedlink.SLClient for an example of how to create and use this SeedLinkConnection object. A new SeedLink application can be created by sub-classing SLClient, or by creating a new class and invoking the methods of SeedLinkConnection. :var SEEDLINK_PROTOCOL_PREFIX: URI/URL prefix for seedlink servers ("seedlink://"). :type SEEDLINK_PROTOCOL_PREFIX: str :var UNISTATION: The station code used for uni-station mode. :type UNISTATION: str :var UNINETWORK: The network code used for uni-station mode. :type UNINETWORK: str :var DFT_READBUF_SIZE: Default size for buffer to hold responses from server (default is 1024). :type DFT_READBUF_SIZE: int :var QUOTE_CHAR: Character used for delimiting timestamp strings in the statefile. :type QUOTE_CHAR: str Publicly accessible (get/set) parameters: :var sladdr: The host:port of the SeedLink server. :type sladdr: str :var keepalive: Interval to send keepalive/heartbeat (seconds) (default is 0 sec). :type keepalive: int :var netto: Network timeout (seconds) (default is 120 sec). :type netto: int :var netdly: Network reconnect delay (seconds) (default is 30 sec). :type netdly: int :var info_string: String containing concatenation of contents of last terminated set of INFO packets. :type info_string: str :var statefile: File name for storing state information. :type statefile: str :var lastpkttime: Flag to control last packet time usage, if true, begin_time is appended to DATA command (Default is False). :type lastpkttime: bool :type timeout: float :param timeout: Time in seconds after which a `collect()` call will be interrupted. Protected parameters :var streams: Vector of SLNetStation objects. :type streams: list :var begin_time: Beginning of time window. :type begin_time: str :var end_time: End of time window. :type end_time: str :var resume: Flag to control resuming with sequence numbers. :type resume: bool :var multistation: Flag to indicate multistation mode. :type multistation: bool :var dialup: Flag to indicate dial-up mode. :type dialup: bool :var terminate_flag: Flag to control connection termination. :type terminate_flag: bool :var server_id: ID of the remote SeedLink server. :type server_id: str :var server_version: Version of the remote SeedLink server. :type server_version: float :var info_request_string: INFO level to request. :type info_request_string: str :var socket: The network socket. :type socket: :class:`socket.socket` :var state: Persistent state information. :type state: :class:`~obspy.clients.seedlink.client.SLState` :var infoStrBuf: String to store INFO packet contents. :type infoStrBuf: str """ SEEDLINK_PROTOCOL_PREFIX = "seedlink://" SEEDLINK_DEFAULT_PORT = 18000 UNISTATION = b"UNISTATION" UNINETWORK = b"UNINETWORK" DFT_READBUF_SIZE = 1024 QUOTE_CHAR = b'"'
[docs] def __init__(self, timeout=None): """ Creates a new instance of SeedLinkConnection. """ self.sladdr = None self.keepalive = 0 self.netto = 120 self.netdly = 30 self.info_string = "" self.statefile = None self.lastpkttime = False self.streams = [] self.begin_time = None self.end_time = None self.resume = True self.multistation = False self.dialup = False self.terminate_flag = False self.server_id = None self.server_version = 0.0 self.info_request_string = None self.socket = None self.state = None self.info_response_buffer = io.BytesIO() self.state = SLState() self.timeout = timeout
[docs] def is_connected(self, timeout=1.0): """ Returns connection state of the connection socket. :return: true if connected, false if not connected or socket is not initialized """ return self.socket is not None and \ self.is_connected_impl(self.socket, timeout)
[docs] def get_state(self): """ Returns the SLState state object. :return: the SLState state object """ return self.state
[docs] def set_net_timeout(self, netto): """ Sets the network timeout (seconds). :param netto: the network timeout in seconds. """ self.netto = netto
[docs] def get_net_timeout(self): """ Returns the network timeout (seconds). :return: the network timeout in seconds. """ return self.netto
[docs] def set_keep_alive(self, keepalive): """ Sets interval to send keepalive/heartbeat (seconds). :param keepalive: the interval to send keepalive/heartbeat in seconds. """ self.keepalive = keepalive
[docs] def get_keep_alive(self): """ Returns the interval to send keepalive/heartbeat (seconds). :return: the interval to send keepalive/heartbeat in seconds. """ return self.keepalive
[docs] def set_net_delay(self, netdly): """ Sets the network reconnect delay (seconds). :param netdly: the network reconnect delay in seconds. """ self.netdly = netdly
[docs] def get_net_delay(self): """ Returns the network reconnect delay (seconds). :return: the network reconnect delay in seconds. """ return self.netdly
[docs] def set_sl_address(self, sladdr): """ Sets the host:port of the SeedLink server. :param sladdr: the host:port of the SeedLink server. """ prefix = SeedLinkConnection.SEEDLINK_PROTOCOL_PREFIX if sladdr.startswith(prefix): sladdr = len(sladdr[prefix:]) # use default port 18000 if ':' not in sladdr: sladdr += ':%d' % self.SEEDLINK_DEFAULT_PORT self.sladdr = sladdr # set logger format name = " obspy.clients.seedlink [%s]" % (sladdr) logger.name = name
[docs] def set_last_pkt_time(self, lastpkttime): """ Sets a specified start time for beginning of data transmission . :param lastpkttime: if true, beginning time of last packet received for each station is appended to DATA command on resume. """ self.lastpkttime = lastpkttime
[docs] def set_begin_time(self, start_time_string): """ Sets begin_time for initiation of continuous data transmission. :param start_time_string: start time in in SeedLink string format: "year,month,day,hour,minute,second". """ if start_time_string is not None: self.begin_time = UTCDateTime(start_time_string) else: self.begin_time = None
[docs] def set_end_time(self, end_time_string): """ Sets end_time for termination of data transmission. :param end_time_string: start time in in SeedLink string format: "year,month,day,hour,minute,second". """ if end_time_string is not None: self.end_time = UTCDateTime(end_time_string) else: self.end_time = None
[docs] def terminate(self): """ Sets terminate flag, closes connection and clears state. """ self.terminate_flag = True
[docs] def get_sl_address(self): """ Returns the host:port of the SeedLink server. :return: the host:port of the SeedLink server. """ return self.sladdr
[docs] def get_streams(self): """ Returns a copy of the Vector of SLNetStation objects. :return: a copy of the Vector of SLNetStation objects. """ return list(self.streams)
[docs] def get_info_string(self): """ Returns the results of the last INFO request. :return: concatenation of contents of last terminated set of INFO packets """ return self.info_string
[docs] def check_slcd(self): """ Check this SeedLinkConnection description has valid parameters. :return: true if pass and false if problems were identified. """ retval = True if len(self.streams) < 1 and self.info_request_string is None: logger.error("stream chain AND info type are empty") retval = False ndx = 0 if self.sladdr is None: logger.info("server address %s is empty" % (self.sladdr)) retval = False else: ndx = self.sladdr.find(':') if ndx < 1 or len(self.sladdr) < ndx + 2: msg = "host address %s is not in '[hostname]:port' format" logger.error(msg % (self.sladdr)) retval = False return retval
[docs] def read_stream_list(self, streamfile, defselect): """ Read a list of streams and selectors from a file and add them to the stream chain for configuring a multi-station connection. If 'defselect' is not null it will be used as the default selectors for entries will no specific selectors indicated. The file is expected to be repeating lines of the form:: <NET> <STA> [selectors] For example:: # Comment lines begin with a '#' or '*' GE ISP BH?.D NL HGN MN AQU BH? HH? :param streamfile: name of file containing list of streams and selectors. :param defselect: default selectors. :return: the number of streams configured. :raise SeedLinkException: on error. """ # Open the stream list file streamfile_file = None try: streamfile_file = open(streamfile, 'r') except IOError as ioe: logger.error("cannot open state file %s" % (ioe)) return 0 except Exception as e: msg = "%s: opening state file: %s" % (e, streamfile) logger.critical(msg) raise SeedLinkException(msg) logger.info( "recovering connection state from state file %s" % (streamfile)) linecount = 0 stacount = 0 try: for line in streamfile_file: linecount += 1 if line.startswith('#') or line.startswith('*'): # comment lines continue net = None station = None selectors_str = None tokens = line.split() if (len(tokens) >= 2): net = tokens[0] station = tokens[1] selectors_str = "" for token in tokens[2:]: selectors_str += " " + token if net is None: msg = "invalid or missing network string at line " + \ "%s of stream list file: %s" logger.error(msg % (linecount, streamfile)) continue if station is None: msg = "invalid or missing station string at line " + \ "%s of stream list file: %s" logger.error(msg % (linecount, streamfile)) continue if selectors_str is not None: self.add_stream(net, station, selectors_str, -1, None) stacount += 1 else: self.add_stream(net, station, defselect, -1, None) stacount += 1 if (stacount == 0): logger.error("no streams defined in %s" % (streamfile)) else: logger.debug("Read %s streams from %s" % (stacount, streamfile)) except IOError as e: msg = "%s: reading stream list file: %s" % (e, streamfile) logger.critical(msg) raise SeedLinkException(msg) finally: try: streamfile_file.close() except Exception: pass return stacount
[docs] def parse_stream_list(self, streamlist, defselect): """ Parse a string of streams and selectors and add them to the stream chain for configuring a multi-station connection. The string should be of the following form:: "stream1[:selectors1],stream2[:selectors2],..." For example:: "IU_KONO:BHE BHN,GE_WLF,MN_AQU:HH?.D" :param streamlist: list of streams and selectors. :param defselect: default selectors. :return: the number of streams configured. :raise SeedLinkException: on error. """ # Parse the streams and selectors # print("DEBUG: streamlist:", streamlist) stacount = 0 for stream_token in streamlist.split(","): stream_token = stream_token.strip() net = None station = None staselect = None configure = True req_tkz = stream_token.split(":") req_token = req_tkz[0] net_sta_tkz = req_token.split("_") # Fill in the NET and STA fields if (len(net_sta_tkz) != 2): logger.error("not in NET_STA format: %s" % (req_token)) configure = False else: # First token, should be a network code net = net_sta_tkz[0] if len(net) < 1: logger.error("not in NET_STA format: %s" % (req_token)) configure = False else: # Second token, should be a station code station = net_sta_tkz[1] if len(station) < 1: logger.error("not in NET_STA format: %s" % (req_token)) configure = False if len(req_tkz) > 1: staselect = req_tkz[1] if len(staselect) < 1: logger.error("empty selector: %s" % (req_token)) configure = False else: # If no specific selectors, use the default staselect = defselect # print("DEBUG: staselect:", staselect) # Add this to the stream chain if configure: self.add_stream(net, station, staselect, -1, None) stacount += 1 if stacount == 0: logger.error("no streams defined in stream list") elif stacount > 0: msg = "parsed %d streams from stream list" % (stacount) logger.debug(msg) return stacount
[docs] def add_stream(self, net, station, selectors_str, seqnum, timestamp): """ Add a new stream entry to the stream chain for the given net/station parameters. If the stream entry already exists do nothing and return 1. Also sets the multi-station flag to true. :param net: network code. :param station: station code. :param selectors_str: selectors for this net/station, null if none. :param seqnum: SeedLink sequence number of last packet received, -1 to start at the next data. :param timestamp: SeedLink time stamp in a UTCDateTime format for last packet received, null for none. :return: 0 if successfully added, 1 if an entry for network and station already exists. :raise SeedLinkException: on error. """ # Sanity, check for a uni-station mode entry # print("DEBUG: selectors_str:", selectors_str) if len(self.streams) > 0: stream = self.streams[0] if stream.net == SeedLinkConnection.UNINETWORK and \ stream.station == SeedLinkConnection.UNISTATION: msg = "add_stream called, but uni-station mode configured!" logger.critical(msg) raise SeedLinkException(msg) if not selectors_str: selectors = [] else: selectors = selectors_str.split() # Search the stream chain if net/station/selector already present for stream in self.streams: if stream.net == net and stream.station == station: return stream.append_selectors(selectors_str) # Add new stream newstream = SLNetStation(net, station, selectors, seqnum, timestamp) self.streams.append(newstream) self.multistation = True return 0
[docs] def set_uni_params(self, selectors_str, seqnum, timestamp): """ Set the parameters for a uni-station mode connection for the given SLCD struct. If the stream entry already exists, overwrite the previous settings. Also sets the multi-station flag to 0 (false). :param selectors: selectors for this net/station, null if none. :param seqnum: SeedLink sequence number of last packet received, -1 to start at the next data. :param timestamp: SeedLink time stamp in a UTCDateTime format for last packet received, null for none. :raise SeedLinkException: on error. """ # Sanity, check for a multi-station mode entry if len(self.streams) > 0: stream = self.streams[0] if not stream.net == SeedLinkConnection.UNINETWORK or \ not stream.station == SeedLinkConnection.UNISTATION: msg = "set_uni_params called, " \ "but multi-station mode configured!" logger.critical(msg) raise SeedLinkException(msg) selectors = None if selectors_str is not None and len(selectors_str) > 0: selectors = selectors_str.split() # Add new stream newstream = SLNetStation(SeedLinkConnection.UNINETWORK, SeedLinkConnection.UNISTATION, selectors, seqnum, timestamp) self.streams.append(newstream) self.multistation = False
[docs] def set_state_file(self, statefile): """ Set the state file and recover state. :param statefile: path and name of statefile. :return: the number of stream chains recovered. :raise SeedLinkException: on error. """ self.statefile = statefile return self.recover_state(self.statefile)
[docs] def recover_state(self, statefile): """ Recover the state file and put the sequence numbers and time stamps into the pre-existing stream chain entries. :param statefile: path and name of statefile. :return: the number of stream chains recovered. :raise SeedLinkException: on error. """ # open the state file statefile_file = None try: statefile_file = open(self.statefile, 'r') except IOError as ioe: logger.error("cannot open state file: %s" % (ioe)) return 0 except Exception as e: msg = "%s: opening state file: %s" % (e, statefile) logger.critical(msg) raise SeedLinkException(msg) # recover the state msg = "recovering connection state from state file: %s" logger.info(msg % (self.statefile)) linecount = 0 stacount = 0 try: for line in statefile_file: linecount += 1 if line.startswith('#') or line.startswith('*'): # comment lines continue net = None station = None seqnum = -1 time_str = "" tokens = line.split() net = tokens[0] station = tokens[1] seqnum = int(tokens[2]) time_str = tokens[3] # check for completeness of read if time_str == "": msg = "error parsing line of state file: %s" % (line) logger.error(msg) continue elif time_str == "null": continue # Search for a matching net/station in the stream chain stream = None for i in range(len(self.streams)): stream = self.streams[i] if stream.net == net and stream.station == station: break stream = None # update net/station entry in the stream chain if stream is not None: stream.seqnum = seqnum if time_str is not None: try: # AJL stream.btime = Btime(timeStr) stream.btime = UTCDateTime(time_str) stacount += 1 except SeedLinkException as sle: msg = "parsing timestamp in line %s of state " + \ "file: %s" logger.error(msg % (linecount, sle.value)) except Exception as e: msg = "parsing timestamp in line %s of state " + \ "file: %s" logger.error(msg % (linecount, str(e))) if (stacount == 0): msg = "no matching streams found in %s" logger.error(msg % (self.statefile)) else: msg = "recovered state for %s streams in %s" logger.debug(msg % (stacount, self.statefile)) except IOError as e: msg = "%s: reading state file: %s" % (e, self.statefile) logger.critical(msg) raise SeedLinkException(msg) finally: try: statefile_file.close() except Exception: pass return stacount
[docs] def save_state(self, statefile): """ Save all current sequence numbers and time stamps into the given state file. :param statefile: path and name of statefile. :return: the number of stream chains saved. :raise SeedLinkException: on error. """ # open the state file statefile_file = None try: statefile_file = open(self.statefile, 'w') except IOError as ioe: logger.error("cannot open state file: %s" % (ioe)) return 0 except Exception as e: msg = "%s: opening state file: %s" % (e, statefile) logger.critical(msg) raise SeedLinkException(msg) logger.debug("saving connection state to state file") stacount = 0 try: # Loop through the stream chain for curstream in self.streams: # print("DEBUG: curstream:", curstream.net, curstream.station, # curstream.btime) if curstream.btime is not None: statefile_file.write( curstream.net + " " + curstream.station + " " + str(curstream.seqnum) + " " + curstream.btime.format_seedlink() + "\n") except IOError as e: msg = "%s: writing state file: %s" % (e, self.statefile) logger.critical(msg) raise SeedLinkException(msg) finally: try: statefile_file.close() except Exception: pass return stacount
[docs] def do_terminate(self): """ Terminate the collection loop. """ logger.warn("terminating collect loop") self.disconnect() self.state = SLState() self.info_request_string = None self.info_response_buffer = io.BytesIO() return SLPacket.SLTERMINATE
[docs] def collect(self): """ Manage a connection to a SeedLink server based on the values given in this SeedLinkConnection, and to collect data. Designed to run in a tight loop at the heart of a client program, this function will return every time a packet is received. If the SeedLinkConnection was initialized with a timeout, the collect() call will be terminated if it takes longer than `self.timeout` seconds to finish. :return: an SLPacket when something is received. :return: null when the connection was closed by the server or the termination sequence completed. :raise SeedLinkException: on error. """ start_ = UTCDateTime() self.terminate_flag = False # Check if the infoRequestString was set if self.info_request_string is not None: self.state.query_mode = SLState.INFO_QUERY # If the connection is not up check this SeedLinkConnection and reset # the timing variables if self.socket is None or not self.is_connected(): if not self.check_slcd(): msg = "problems with the connection description" logger.critical(msg) raise SeedLinkException(msg) self.state.previous_time = time.time() # print("DEBUG: self.state.previous_time set:", # self.state.previous_time) self.state.netto_trig = -1 self.state.keepalive_trig = -1 # Start the primary loop npass = 0 while True: # manually check if we are over specified timeout if self.timeout is not None: if UTCDateTime() > start_ + self.timeout: self.terminate_flag = True _msg = "primary loop pass %s, state %d" logger.debug(_msg % (npass, self.state.state)) # we are terminating (abnormally!) if self.terminate_flag: return self.do_terminate() # not terminating if self.socket is None or not self.is_connected(): self.state.state = SLState.SL_DOWN # Check for network timeout if self.state.state == SLState.SL_DATA and self.netto > 0 and \ self.state.netto_trig > 0: msg = "network timeout (%s), reconnecting in %ss" logger.warn(msg % (self.netto, self.netdly)) self.disconnect() self.state.state = SLState.SL_DOWN self.state.netto_trig = -1 self.state.netdly_trig = -1 # Check if a keepalive packet needs to be sent if self.state.state == SLState.SL_DATA and \ not self.state.expect_info and self.keepalive > 0 and \ self.state.keepalive_trig > 0: logger.debug("sending: keepalive request") try: self.send_info_request("ID", 3) self.state.query_mode = SLState.KEEP_ALIVE_QUERY self.state.expect_info = True self.state.keepalive_trig = -1 except IOError: msg = "I/O error, reconnecting in %ss" logger.warn(msg % (self.netdly)) self.disconnect() self.state.state = SLState.SL_DOWN # Check if an in-stream INFO request needs to be sent if self.state.state == SLState.SL_DATA and \ not self.state.expect_info and \ self.info_request_string is not None: try: self.send_info_request(self.info_request_string, 1) self.state.query_mode = SLState.INFO_QUERY self.state.expect_info = True except IOError: self.state.query_mode = SLState.NO_QUERY msg = "I/O error, reconnecting in %ss" logger.warn(msg % (self.netdly)) self.disconnect() self.state.state = SLState.SL_DOWN self.info_request_string = None # Throttle the loop while delaying if self.state.state == SLState.SL_DOWN and \ self.state.netdly_trig > 0: time.sleep(0.5) # Connect to remote SeedLink server if self.state.state == SLState.SL_DOWN and \ self.state.netdly_trig == 0: self.connect() self.state.state = SLState.SL_UP self.state.netto_trig = -1 self.state.netdly_trig = -1 # Negotiate/configure the connection if self.state.state == SLState.SL_UP: # Send query if a query is set, stream configuration will be # done only after query is fully returned if self.info_request_string is not None: try: self.send_info_request(self.info_request_string, 1) self.state.query_mode = SLState.INFO_QUERY self.state.expect_info = True except IOError: msg = "SeedLink version does not support INFO requests" logger.info(msg) self.state.query_mode = SLState.NO_QUERY self.state.expect_info = False msg = "I/O error, reconnecting in %ss" logger.warn(msg % (self.netdly)) self.disconnect() self.state.state = SLState.SL_DOWN self.info_request_string = None else: if not self.state.expect_info: try: self.config_link() self.state.recptr = 0 self.state.sendptr = 0 self.state.state = SLState.SL_DATA except Exception as e: msg = "negotiation with remote SeedLink failed: %s" logger.error(msg % (e)) self.disconnect() self.state.state = SLState.SL_DOWN self.state.netdly_trig = -1 self.state.expect_info = False # Process data in our buffer and then read incoming data if self.state.state == SLState.SL_DATA or \ self.state.expect_info and \ not (self.state.state == SLState.SL_DOWN): # Process data in buffer while self.state.packet_available(): slpacket = None sendpacket = True # Check for an INFO packet if self.state.packet_is_info(): temp = self.state.sendptr + SLPacket.SLHEADSIZE - 1 terminator = chr(self.state.databuf[temp]) != '*' if not self.state.expect_info: msg = "unexpected INFO packet received, skipping" logger.error(msg) else: if terminator: self.state.expect_info = False # Keep alive packets are not returned if self.state.query_mode == \ SLState.KEEP_ALIVE_QUERY: sendpacket = False if not terminator: logger.error( "non-terminated " + "keep-alive packet received!?!") else: logger.debug("keepalive packet received") else: slpacket = self.state.get_packet() # construct info String packet_type = slpacket.get_type() # print("DEBUG: slpacket.get_type():", # slpacket.get_type()) # print("DEBUG: SLPacket.TYPE_SLINF:", # SLPacket.TYPE_SLINF) # print("DEBUG: SLPacket.TYPE_SLINFT:", # SLPacket.TYPE_SLINFT) data = slpacket.get_string_payload() self.info_response_buffer.write(data) if (packet_type == SLPacket.TYPE_SLINFT): # Terminated INFO response packet # -> build complete INFO response string, # strip NULL bytes from the end self.info_string = \ self.info_response_buffer.getvalue().\ decode('ASCII', errors='ignore').\ replace("><", ">\n<").rstrip('\x00') self.info_response_buffer = io.BytesIO() self.state.query_mode = SLState.NO_QUERY else: # Get packet and update the stream chain entry if not # an INFO packet try: slpacket = self.state.get_packet() self.update_stream(slpacket) if self.statefile is not None: self.save_state(self.statefile) except SeedLinkException as sle: logger.error("bad packet: %s" % (sle)) sendpacket = False # Increment the send pointer self.state.increment_send_pointer() # After processing the packet buffer shift the data self.state.pack_data_buffer() # Return packet if sendpacket: return slpacket # A trap door for terminating, all complete data packets from # the buffer have been sent to the caller we are terminating # (abnormally!) if self.terminate_flag: return self.do_terminate() # Catch cases where the data stream stopped try: if self.state.is_error(): logger.error( "SeedLink reported an error with the last command") self.disconnect() return SLPacket.SLERROR except SeedLinkException: pass # not enough bytes to determine packet type try: if self.state.is_end(): logger.info("end of buffer or selected time window") self.disconnect() return SLPacket.SLTERMINATE except SeedLinkException: pass # Check for more available data from the socket bytesread = None try: bytesread = self.receive_data(self.state.bytes_remaining(), self.sladdr) except IOError as ioe: msg = "socket read error: %s, reconnecting in %sss" logger.error(msg % (ioe, self.netdly)) self.disconnect() self.state.state = SLState.SL_DOWN self.state.netto_trig = -1 self.state.netdly_trig = -1 if bytesread is not None and len(bytesread) > 0: # Data is here, process it self.state.append_bytes(bytesread) # Reset the timeout and keepalive timers self.state.netto_trig = -1 self.state.keepalive_trig = -1 else: time.sleep(0.5) # Update timing variables when more than a 1/4 second has passed now = time.time() # print("DEBUG: if now - self.state.previous_time >= 0.25:", now, # self.state.previous_time, now - self.state.previous_time) if now - self.state.previous_time >= 0.25: # print("DEBUG: now - self.state.previous_time >= 0.25:", # self.state.previous_time) self.state.previous_time = time.time() # print("DEBUG: self.state.previous_time set:", # self.state.previous_time) # Network timeout timing logic if self.netto > 0: if self.state.netto_trig == -1: self.state.netto_time = now self.state.netto_trig = 0 elif self.state.netto_trig == 0 and \ now - self.state.netto_time > self.netto: self.state.netto_trig = 1 # print("DEBUG: self.keepalive:", self.keepalive) # Keepalive/heartbeat interval timing logic if self.keepalive > 0: # print("DEBUG: self.state.keepalive_trig:", # self.state.keepalive_trig) # print("DEBUG: now - self.state.keepalive_time", # " >=self.keepalive:", self.state.previous_time, # now - self.state.keepalive_time, self.keepalive) if self.state.keepalive_trig == -1: self.state.keepalive_time = now self.state.keepalive_trig = 0 elif self.state.keepalive_trig == 0 and \ now - self.state.keepalive_time > self.keepalive: self.state.keepalive_trig = 1 # Network delay timing logic if self.netdly > 0: if self.state.netdly_trig == -1: self.state.netdly_time = now self.state.netdly_trig = 1 elif self.state.netdly_trig == 1 and \ now - self.state.netdly_time > self.netdly: self.state.netdly_trig = 0 # End of primary loop
[docs] def connect(self): """ Open a network socket connection to a SeedLink server. Expects sladdr to be in 'host:port' format. :raise SeedLinkException: on error or no response or bad response from server. :raise IOException: if an I/O error occurs. """ timeout = 4.0 try: host_name = self.sladdr[0:self.sladdr.find(':')] nport = int(self.sladdr[self.sladdr.find(':') + 1:]) # create and connect Socket sock = None sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 65536) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # print("DEBUG: sock.connect:", self.sladdr, host_name, nport) sock.connect((host_name, nport)) # print("DEBUG: sock.connect: sock:", sock) if sock is None: raise Exception self.socket = sock # Check if socket is connected if not self.is_connected(timeout): msg = "socket connect time-out %ss" % (timeout) try: self.socket.close() except Exception: pass self.socket = None raise SeedLinkException(msg) # socket connected logger.info("network socket opened") self.socket.settimeout(self.netto) except Exception as e: msg = "cannot connect to SeedLink server: %s" raise SeedLinkException(msg % (e)) # Everything should be connected, say hello try: self.say_hello() except SeedLinkException as sle: try: self.socket.close() self.socket = None except Exception: pass raise sle except IOError as ioe: # traceback.print_exc() try: self.socket.close() self.socket = None except Exception: pass raise ioe
[docs] def disconnect(self): """ Close the network socket associated with this connection. """ if self.socket is not None: try: self.socket.close() except IOError as ioe: logger.error("network socket close failed: %s" % (ioe)) self.socket = None logger.info("network socket closed") # make sure previous state is cleaned up self.state = SLState()
[docs] def close(self): """ Closes this SeedLinkConnection by closing the network socket and saving the state to the statefile, if it exists. """ if self.socket is not None: logger.info("closing SeedLinkConnection()") self.disconnect() if self.statefile is not None: try: self.save_state(self.statefile) except SeedLinkException as sle: logger.error(sle.value)
[docs] def is_connected_impl(self, sock, timeout): """ Check a socket for write ability using select() Time-out values are also passed (seconds) for the select() call. :return: 1 = success, 0 = if time-out expires, -1 = errors """ start_time = time.time() ready_to_write = [] while (sock not in ready_to_write) and \ (time.time() - start_time) < timeout: _ready_to_read, ready_to_write, _in_error = \ select.select([sock], [sock], [], timeout) # print("DEBUG: sock:", sock) # print("DEBUG: ready_to_read:", ready_to_read) # print("DEBUG: ready_to_write:", ready_to_write) # print("DEBUG: in_error:", in_error) if sock in ready_to_write: return True return False
[docs] def send_data(self, sendbytes, code, resplen): """ Send bytes to the server. This is only designed for small pieces of data, specifically for when the server responses to commands. :param sendbytes: bytes to send. :param code: a string to include in error messages for identification. :param resplen: if > 0 then read up to resplen response bytes after sending. :return: the response bytes or null if no response requested. :raise SeedLinkException: on error or no or bad response from server. :raise IOException: if an I/O error occurs. """ # print("DEBUG: sendbytes:", repr(sendbytes)) try: self.socket.send(sendbytes) except IOError as ioe: raise ioe if resplen <= 0: # no response requested return # If requested, wait up to 30 seconds for a response ackcnt = 0 # counter for the read loop ackpoll = 50 # poll at 0.05 seconds for reading ackcntmax = 30000 / ackpoll # 30 second wait bytesread = self.receive_data(resplen, code) while bytesread is not None and len(bytesread) == 0: if ackcnt > ackcntmax: msg = "[%s] no response from SeedLink server to '%s'" raise SeedLinkException(msg % (code, sendbytes)) sleep_time = 0.001 * ackpoll time.sleep(sleep_time) ackcnt += 1 bytesread = self.receive_data(resplen, code) if bytesread is None: msg = "[%s] bad response to '%s'" raise SeedLinkException(msg % (code, sendbytes)) return bytesread
[docs] def receive_data(self, maxbytes, code): """ Read bytes from the server. :param maxbytes: maximum number of bytes to read. :param code: a string to include in error messages for identification. :return: the response bytes (zero length if no available data), or null if EOF. :raise IOException: if an I/O error occurs. """ # read up to maxbytes try: # self.socket.setblocking(0) bytesread = self.socket.recv(maxbytes) # self.socket.setblocking(1) except IOError as ioe: # traceback.print_exc() raise ioe # print("DEBUG: bytesread:", repr(bytesread)) nbytesread = len(bytesread) # check for end or no bytes read if (nbytesread == -1): # XXX This is never true msg = "[%s] socket.read(): %s: TCP FIN or EOF received" logger.error(msg % (code, nbytesread)) return else: if (nbytesread == 0): return b"" return bytesread
[docs] def say_hello(self): """ Send the HELLO command and attempt to parse the server version number from the returned string. The server version is set to 0.0 if it can not be parsed from the returned string. :raise SeedLinkException: on error. :raise IOException: if an I/O error occurs. """ send_str = b"HELLO" logger.debug("sending: %s" % (send_str.decode())) bytes_ = send_str + b"\r" bytesread = self.send_data(bytes_, self.sladdr, SeedLinkConnection.DFT_READBUF_SIZE) # Parse the server ID and version from the returned string servstr = None try: servstr = bytesread.decode() vndx = servstr.find(" v") if vndx < 0: self.server_id = servstr self.server_version = 0.0 else: self.server_id = servstr[0:vndx] tmpstr = servstr[vndx + 2:] endndx = tmpstr.find(" ") # print("DEBUG: tmpstr:", tmpstr) # print("DEBUG: tmpstr[0:endndx]:", tmpstr[0:endndx]) self.server_version = float(tmpstr[0:endndx]) except Exception: msg = "bad server ID/version string: '%s'" raise SeedLinkException(msg % (servstr)) # Check the response to HELLO if self.server_id.lower() == "seedlink": msg = "connected to: '" + servstr[0:servstr.find('\r')] + "'" logger.info(msg) else: msg = "ncorrect response to HELLO: '%s'" % (servstr) raise SeedLinkException(msg)
[docs] def request_info(self, info_level): """ Add an INFO request to the SeedLink Connection Description. :param info_level: the INFO level (one of: ID, STATIONS, STREAMS, GAPS, CONNECTIONS, ALL) :raise SeedLinkException: if an INFO request is already pending. """ if self.info_request_string is not None or self.state.expect_info: msg = "cannot make INFO request, one is already pending" raise SeedLinkException(msg) else: self.info_request_string = info_level
[docs] def send_info_request(self, info_level, verb_level): """ Sends a request for the specified INFO level. The verbosity level can be specified, allowing control of when the request should be logged. :param info_level: the INFO level (one of: ID, STATIONS, STREAMS, GAPS, CONNECTIONS, ALL). :raise SeedLinkException: on error. :raise IOException: if an I/O error occurs. """ if self.check_version(2.92) >= 0: bytes_ = b"INFO " + info_level.encode('ascii', 'strict') + b"\r" msg = "sending: requesting INFO level %s" % (info_level) if verb_level == 1: logger.info(msg) else: logger.debug(msg) self.send_data(bytes_, self.sladdr, 0) else: msg = "detected SeedLink version %s does not support INFO requests" raise SeedLinkException(msg % (self.server_version))
[docs] def check_version(self, version): """ Checks server version number against a given specified value. :param version: specified version value to test. :return: 1 if version is greater than or equal to value specified, 0 if no server version is known, -1 if version is less than value specified. """ if (self.server_version == 0.0): return 0 else: if self.server_version >= version: return 1 else: return -1
[docs] def negotiate_station(self, curstream): """ Negotiate a SeedLink connection for a single station and issue the DATA command. If selectors are defined, then the string is parsed on space and each selector is sent. If 'seqnum' != -1 and the SLCD 'resume' flag is true then data is requested starting at seqnum. :param curstream: the description of the station to negotiate. :raise SeedLinkException: on error. :raise IOException: if an I/O error occurs. """ # Send the selector(s) and check the response(s) selectors = curstream.get_selectors() acceptsel = 0 # Count of accepted selectors for selector_str in selectors: selector = selector_str.encode('ascii', 'strict') if len(selector) > SLNetStation.MAX_SELECTOR_SIZE: logger.warn("invalid selector: %s" % (selector)) else: # Build SELECT command, send it and receive response send_str = b"SELECT " + selector logger.debug("sending: %s" % (send_str)) bytes_ = send_str + b"\r" bytesread = None bytesread = self.send_data(bytes_, self.sladdr, SeedLinkConnection.DFT_READBUF_SIZE) read_str = bytesread.decode() # Check response to SELECT if read_str == "OK\r\n": logger.debug("response: selector %s is OK" % (selector)) acceptsel += 1 elif read_str == "ERROR\r\n": msg = "response: selector %s not accepted" logger.error(msg % (selector)) else: msg = "response: invalid response to SELECT command: %s" raise SeedLinkException(msg % (read_str)) # Fail if none of the given selectors were accepted if acceptsel < 1: msg = "response: no data stream selector(s) accepted" raise SeedLinkException(msg) msg = "response: %s selector(s) accepted" logger.debug(msg % (acceptsel)) # Issue the DATA, FETCH or TIME action commands. A specified start (and # optionally, stop time) takes precedence over the resumption from any # previous sequence number. send_str = None if (curstream.seqnum != -1) and self.resume: if self.dialup: send_str = b"FETCH" else: send_str = b"DATA" # Append the last packet time if the feature is enabled and server # is >= 2.93 if self.lastpkttime and self.check_version(2.93) >= 0 and \ curstream.btime is not None: # Increment sequence number by 1 send_str += b" " + \ hex(curstream.seqnum + 1).encode('ascii', 'strict') + \ b" " + curstream.get_sl_time_stamp() msg = "requesting resume data from 0x%s (decimal: %s) at %s" logger.info(msg % (hex(curstream.seqnum + 1).upper(), curstream.seqnum + 1), curstream.get_sl_time_stamp()) else: # Increment sequence number by 1 send_str += b" " + \ hex(curstream.seqnum + 1).encode('ascii', 'strict') msg = "requesting resume data from 0x%s (decimal: %s)" logger.info(msg % (hex(curstream.seqnum + 1).upper(), curstream.seqnum + 1)) elif self.begin_time is not None: # begin time specified (should only be at initial startup) if self.check_version(2.92) >= 0: send_str = b"TIME " + self.begin_time.\ format_seedlink().encode('ascii', 'strict') if self.end_time is not None: send_str += b" " + self.end_time.format_seedlink().\ encode('ascii', 'strict') logger.info("requesting specified time window") else: msg = "detected SeedLink version %s does not support " + \ "TIME windows" raise SeedLinkException(msg % (self.server_version)) else: # default if self.dialup: send_str = b"FETCH" else: send_str = b"DATA" logger.info("requesting next available data") # Send action command and receive response logger.debug("sending: %s" % (send_str)) bytes_ = send_str + b"\r" bytesread = None bytesread = self.send_data(bytes_, self.sladdr, SeedLinkConnection.DFT_READBUF_SIZE) # Check response to DATA/FETCH/TIME read_str = bytesread.decode() if read_str == "OK\r\n": logger.debug("response: DATA/FETCH/TIME command is OK") acceptsel += 1 elif read_str == "ERROR\r\n": msg = "response: DATA/FETCH/TIME command is not accepted" raise SeedLinkException(msg) else: msg = "response: invalid response to DATA/FETCH/TIME command: %s" raise SeedLinkException(msg % (read_str))
[docs] def negotiate_uni_station(self): """ Negotiate a SeedLink connection in uni-station mode and issue the DATA command. This is compatible with SeedLink Protocol version 2 or greater. If selectors are defined, then the string is parsed on space and each selector is sent. If 'seqnum' != -1 and the SLCD 'resume' flag is true then data is requested starting at seqnum. :raise SeedLinkException: on error. :raise IOException: if an I/O error occurs. """ # get stream (should be only stream present) curstream = None try: curstream = self.streams[0] except Exception: msg = "cannot negotiate uni-station, stream list does not " + \ "have exactly one element" raise SeedLinkException(msg) if not curstream.net == SeedLinkConnection.UNINETWORK and \ curstream.station == SeedLinkConnection.UNISTATION: msg = "cannot negotiate uni-station, mode not configured!" raise SeedLinkException(msg) # negotiate the station connection self.negotiate_station(curstream)
[docs] def negotiate_multi_station(self): """ Negotiate a SeedLink connection using multi-station mode and issue the END action command. This is compatible with SeedLink Protocol version 3, multi-station mode. If selectors are defined, then the string is parsed on space and each selector is sent. If 'seqnum' != -1 and the SLCD 'resume' flag is true then data is requested starting at seqnum. :raise SeedLinkException: on error. :raise IOException: if an I/O error occurs. """ acceptsta = 0 if len(self.streams) < 1: msg = "cannot negotiate multi-station, stream list is empty" raise SeedLinkException(msg) # Loop through the stream chain for curstream in self.streams: # A ring identifier # slring = curstream.net + curstream.station # Build STATION command, send it and receive response send_str = ("STATION " + curstream.station + " " + curstream.net).encode('ascii', 'strict') logger.debug("sending: %s" % send_str.decode()) bytes_ = send_str + b"\r" bytesread = self.send_data(bytes_, self.sladdr, SeedLinkConnection.DFT_READBUF_SIZE) read_str = bytesread # Check response to SELECT if read_str == b"OK\r\n": logger.debug("response: station is OK (selected)") elif read_str == b"ERROR\r\n": logger.error("response: station not accepted, skipping") continue else: msg = "response: invalid response to STATION command: %s" raise SeedLinkException(msg % (read_str)) # negotiate the station connection try: self.negotiate_station(curstream) except SeedLinkException as sle: logger.error(sle.value) continue except Exception as e: logger.error(str(e)) continue acceptsta += 1 # Fail if no stations were accepted if acceptsta < 1: raise SeedLinkException("no stations accepted") logger.info("%s station(s) accepted" % (acceptsta)) # Issue END action command send_str = b"END" logger.debug("sending: %s" % (send_str.decode())) bytes_ = send_str + b"\r" self.send_data(bytes_, self.sladdr, 0)
[docs] def update_stream(self, slpacket): """ Update the appropriate stream chain entry given a Mini-SEED record. :param slpacket: the packet containing a Mini-SEED record. :raise SeedLinkException: on error. """ seqnum = slpacket.get_sequence_number() if (seqnum == -1): raise SeedLinkException("could not determine sequence number") trace = None try: trace = slpacket.get_trace() except Exception as e: msg = "blockette not 1000 (Data Only SEED Blockette) or other " + \ "error reading miniseed data: %s" raise SeedLinkException(msg % (e)) # read some blockette fields net = None station = None btime = None try: station = trace.stats['station'] net = trace.stats['network'] btime = trace.stats['starttime'] # print("DEBUG: station, net, btime:", station, net, btime) except Exception as se: raise SeedLinkException("trace header read error: %s" % (se)) # For uni-station mode if not self.multistation: curstream = None try: curstream = self.streams[0] except Exception: msg = "cannot update uni-station stream, stream list does " + \ "not have exactly one element" raise SeedLinkException(msg) curstream.seqnum = seqnum curstream.btime = btime return # For multi-station mode, search the stream chain # Search for a matching net/station in the stream chain # AJL 20090306 - Add support for IRIS DMC enhancements: # Enhancements to the SeedLink protocol supported by the DMC's server # allow network and station codes to be # wildcarded in addition to the location and channel codes. wildcarded = False stream = None for stream in self.streams: if stream.net == net and stream.station == station: break if "?" in stream.net or "*" in stream.net or \ "?" in stream.station or "*" in stream.station: # wildcard character found wildcarded = True stream = None # print("DEBUG: stream:", stream.net, stream.station, stream.btime) # update net/station entry in the stream chain if stream is not None: stream.seqnum = seqnum stream.btime = btime elif not wildcarded: logger.error("unexpected data received: %s %s" % (net, station))