Source code for cmd_serial

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
# 
# Copyright 2012-2017 Frédéric Magniette, Miguel Rubio-Roy
# This file is part of Pyrame.
# 
# Pyrame is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# 
# Pyrame is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
# 
# You should have received a copy of the GNU Lesser General Public License
# along with Pyrame.  If not, see <http://www.gnu.org/licenses/>

import serial,select
import subprocess,os
import pools,conf_strings,buses

# SERIAL_BUS #####################################################

baudrates=[50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800, 9600, 19200, 38400, 57600, 115200, 230400, 460800, 500000, 576000, 921600, 1000000, 1152000, 1500000, 2000000, 2500000, 3000000, 3500000, 4000000]

class serial_bus(buses.buses):
    def __init__(self):
        self.serial_pool=pools.pool("serial")
        super(serial_bus,self).__init__() 

    def init(self,serial_id,conf_string):
        try:
            conf=conf_strings.parse(conf_string)
        except Exception as e:
            return 0,str(e)
        if conf.name!="serial":
            return 0,"Invalid module name %s in conf_string. Should be serial"%(conf.name)
        serialnum=vendor=product=descr="undef"
        baudrate="9600"
        bytesize=serial.EIGHTBITS
        stopbits=serial.STOPBITS_ONE
        parity=serial.PARITY_NONE
        flow="undef"
        # serial number
        if conf.has("serialnum"):
            serialnum=conf.params["serialnum"]
        # vendor/product or device
        if conf.has("vendor","product"):
            vendor=conf.params["vendor"]
            product=conf.params["product"]
            # validate
            if (len(vendor)!=4 or len(product)!=4):
                return 0,"Invalid length of vendor or product id's"
        elif conf.has("device"):
            descr=conf.params["device"]
        else:
            return 0,"conf_string does not contain neither vendor and product id's or device name"
        # baudrate
        if conf.has("baudrate") and conf.params["baudrate"]!="undef":
            baudrate=int(conf.params["baudrate"])
            if baudrate not in baudrates:
                return 0,"baudrate %d in conf_string is not valid"%(baudrate)
        # bytesize
        if conf.has("bytesize") and conf.params["bytesize"]!="undef":
            bytesize=conf.params["bytesize"]
            if bytesize=="8":
                bytesize=serial.EIGHTBITS
            elif bytesize=="7":
                bytesize=serial.SEVENBITS
            elif bytesize=="6":
                bytesize=serial.SIXBITS
            elif bytesize=="5":
                bytesize=serial.FIVEBITS
            else:
                return 0,"bytesize %s in conf_string is not valid"%(bytesize)
        # stopbits
        if conf.has("stopbits") and conf.params["stopbits"]!="undef":
            stopbits=conf.params["stopbits"]
            if stopbits=="1":
                stopbits=serial.STOPBITS_ONE
            elif stopbits=="1.5":
                stopbits=serial.STOPBITS_ONE_POINT_FIVE
            elif stopbits=="2":
                stopbits=serial.STOPBITS_TWO
            else:
                return 0,"stopbits %s in conf_string are not valid"%(stopbits)
        # parity
        if conf.has("parity") and conf.params["parity"]!="undef":
            parity=conf.params["parity"]
            if parity=="N":
                parity=serial.PARITY_NONE
            elif parity=="E":
                parity=serial.PARITY_EVEN
            elif parity=="O":
                parity=serial.PARITY_ODD
            elif parity=="M":
                parity=serial.PARITY_MARK
            elif parity=="S":
                partiy=serial.PARITY_SPACE
            else:
                return 0,"parity %s in conf_string is not valid"%(parity)
        # flow control
        if conf.has("flow") and conf.params["flow"]!="undef":
            flow=conf.params["flow"]
            if flow!="xonxoff" and flow!="rtscts" and flow!="dsrdtr":
                return 0,"invalid flow. must be xonxoff, rtscts or dsrftr"
        # timeout
        if conf.has("timeout") and conf.params["timeout"]!="undef":
            timeout=float(conf.params["timeout"])
        else:
            timeout=60
        # Add to the pool
        self.serial_pool.new(serial_id,{"vendor": vendor, "product": product, "descr": descr, "serialnum": serialnum, "baudrate": baudrate, "bytesize": bytesize, "stopbits": stopbits, "parity": parity, "flow": flow, "timeout": timeout, "connected":0})
        return 1,"ok"

    def deinit(self,serial_id):
        try:
            link=self.serial_pool.get(serial_id)
        except Exception as e:
            return 1,str(e)
        # Check status
        if link["connected"]:
            self.inval(serial_id)
        # Remove
        try:
            self.serial_pool.remove(serial_id)
        except Exception as e:
            return 0,str(e)
        return 1,"ok"

    def config(self,serial_id):
        try:
            link=self.serial_pool.get(serial_id)
        except Exception as e:
            return 0,str(e)

      
        # Get description for serial link
        if link["descr"]=="undef":
            result=subprocess.Popen(["/usr/local/bin/get_dev_"+os.uname()[0].lower()+".sh",link["vendor"],link["product"],link["serialnum"]],stdout=subprocess.PIPE)
            res,_=result.communicate()
            link["descr"]=res.strip()
            if result.returncode!=0:
                link["descr"]="undef"
                return 0,"Error getting device name <- %s"%(res.strip())

        #register in buses
        retcode,res=self.ll_new_link(link)
        if retcode==0:
            return 0,res
        return 1,link["descr"]


    def inval(self,serial_id):
        try:
            link=self.serial_pool.get(serial_id)
        except Exception as e:
            return 0,str(e)
        self.ll_del_link(link)
        if link["vendor"]!="undef" and link["product"]!="undef":
            link["descr"]="undef"
        return 1,"ok"

    def write(self,serial_id,mode,data):
        try:
            link=self.serial_pool.get(serial_id)
        except Exception as e:
            return 0,str(e)
        return super(serial_bus,self).write(link,mode,data)

    def read(self,serial_id,mode,bytes_to_read,timeout="undef"):
        try:
            link=self.serial_pool.get(serial_id)
        except Exception as e:
            return 0,str(e)
        return super(serial_bus,self).read(link,mode,bytes_to_read,timeout)

    def read_until(self,serial_id,mode,eot,timeout="undef"):
        try:
            link=self.serial_pool.get(serial_id)
        except Exception as e:
            return 0,str(e)
        return super(serial_bus,self).read_until(link,mode,eot,timeout)

    def expect(self,serial_id,pattern,timeout="undef"):
        try:
            link=self.serial_pool.get(serial_id)
        except Exception as e:
            return 0,str(e)
        return super(serial_bus,self).expect(link,pattern,timeout)

    def ll_open(self,link):
        # open serial link
        try:
            s=serial.Serial(link["descr"],link["baudrate"],link["bytesize"],link["parity"],link["stopbits"],link["timeout"],link["flow"]=="xonxoff",link["flow"]=="rtscts",dsrdtr=(link["flow"]=="dsrdtr"))
            if link["flow"]=="rtscts":
                s.setRTS()
            if link["flow"]=="dsrdtr":
                s.setDTR()
            s.flushInput()
        except:
            return None
        return s

    def ll_close(self,sock):
        sock.close()

    def ll_send(self,sock,link,data):
        try:
            res=sock.write(data)
            if res!=len(data):
                raise Exception("Not all data sent")
        except Exception as e:
            return 0,"Error sending data to %s <- %s"%(link["descr"],str(e))
        return 1,"ok"

    def ll_receive(self,sock,link,bytes_to_read,timeout="undef"):
        if timeout=="" or timeout=="undef":
            timeout=link["timeout"]
        res=select.select([sock.fileno()],[],[],float(timeout))
        if len(res[0])==0:
            return 0,"Timeout while waiting for data"
        # Read
        try:
            response=sock.read(int(bytes_to_read))
        except Exception as e:
            return 0,"Error reading data from %s <- %s"%(link["descr"],str(e))
        return 1,response

# CREATE BUS POOL ################################################

me=serial_bus()

# COMMANDS #######################################################

[docs]def init_serial(serial_id,conf_string): """Initialize a serial link. *conf_string* must include either: - vendor and product: vendor and product id's of a USB device or - device: UNIX device name. e.g.: /dev/ttyUSB0 it can optionally include: - serialnum: serial number of the USB device. - baudrate: rate in bits per second - bytesize: byte size in bits - stopbits: number of stop bits (1, 1.5 or 2) - parity: parity (N, E, O, M or S for None, Even, Odd, Mark and Space) - timeout: default timeout in seconds for this link. Can be float.""" return me.init(serial_id,conf_string)
[docs]def deinit_serial(serial_id): "Deinitialize serial link *serial_id*." return me.deinit(serial_id)
[docs]def config_serial(serial_id): "Configure serial link *serial_id*." return me.config(serial_id)
[docs]def inval_serial(serial_id): "Invalidate configuration of serial link *serial_id*." return me.inval(serial_id)
# WRITE ##########################################################
[docs]def write_serial(serial_id,data): "Write *data* to serial link *serial_id*." return me.write(serial_id,"escaped",data)
[docs]def write_bin_serial(serial_id,data): "Write binary *data* to serial link *serial_id*. *data* is a string where each character represents 4 bits in hexadecimal base. Little endian for each group of 8 bits." return me.write(serial_id,"bin",data)
# READ ###########################################################
[docs]def read_serial(serial_id,bytes_to_read,timeout="undef"): "Read up to *bytes_to_read* bytes from serial link *serial_id*" return me.read(serial_id,"ignore_chars",bytes_to_read,timeout)
[docs]def read_bin_serial(serial_id,bytes_to_read,timeout="undef"): "Read up to *bytes_to_read* bytes from serial link *serial_id* in binary format. The data read is encoded by blocks of 8 bits with two hexadecimal characters little endian." return me.read(serial_id,"bin",bytes_to_read,timeout)
# READ UNTIL #####################################################
[docs]def read_until_serial(serial_id,eot,timeout="undef"): "Read from serial link *serial_id* until a character from *eot* comma-separated list is found" return me.read_until(serial_id,"ignore_chars",eot,timeout)
[docs]def read_bin_until_serial(serial_id,eot,timeout="undef"): "Read from serial link *serial_id* until a character from *eot* comma-separated list is found. The data read is encoded by blocks of 8 bits with two hexadecimal characters little endian." return me.read_until(serial_id,"bin",eot,timeout)
# WRNRD ##########################################################
[docs]def wrnrd_serial(serial_id,data,bytes_to_read,timeout="undef"): "Write and read *data* to and from serial link *serial_id*" retcode,res=me.write(serial_id,"escaped",data) if retcode==0: return 0,res return me.read(serial_id,"ignore_chars",bytes_to_read,timeout)
[docs]def wrnrd_bin_serial(serial_id,data,bytes_to_read,timeout="undef"): "Write and read binary *data* to and from serial link *serial_id*. *data* is a string where each character represents 4 bits in hexadecimal base. Little endian for each group of 8 bits." retcode,res=me.write(serial_id,"bin",data) if retcode==0: return 0,res return me.read(serial_id,"bin",bytes_to_read,timeout)
# WRNRD_UNTIL ####################################################
[docs]def wrnrd_until_serial(serial_id,data,eot,timeout="undef"): "Write and read *data* to and from serial link *serial_id* until a character from comma-separated list *eot* is found" retcode,res=me.write(serial_id,"escaped",data) if retcode==0: return 0,res return me.read_until(serial_id,"ignore_chars",eot,timeout)
[docs]def wrnrd_bin_until_serial(serial_id,data,eot,timeout="undef"): "Write and read *data* to and from serial link *serial_id* until a character from comma-separated list *eot* is found. *data* is a string where each character represents 4 bits in hexadecimal base. Little endian for each group of 8 bits. Read data is return in the same format." retcode,res=me.write(serial_id,"bin",data) if retcode==0: return 0,res return me.read_until(serial_id,"bin",eot,timeout)
# EXPECT #########################################################
[docs]def expect_serial(serial_id,pattern,timeout="undef"): "Read data from serial link *serial_id* until *pattern* is found or timeout." return me.expect(serial_id,pattern,timeout)