Monday, February 22, 2021

Raspberry Pi and Arduino revisited

In my earlier post on locating a valid Arduino connection, I used exception handling, catching the error raised when attempting to open a non-existent device.
Of course there is an easier way of doing this on a POSIX-compliant system: checking whether the respective device files exist. If you are confused about the ttyUSB0 device, this is how an inexpensive Arduino clone appears on my system.

Here is my new approach, which is a bit more elegant and less brutish. It uses the "glob" module and the "set" data type.


import glob

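def listFiles(pattern):
    # collect all paths matching the glob pattern into a set
    return set(glob.glob(pattern))

def findArduino():
    # candidates in order of preference; a tuple keeps the search order fixed
    ser_dev=('/dev/ttyACM0','/dev/ttyACM1','/dev/ttyUSB0')
    dev_list=listFiles('/dev/tty*')
    for dev in ser_dev:
        if dev in dev_list:
            return dev
    return None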

if __name__=='__main__':
    arduino=findArduino()
    if arduino:
        print('Arduino found at',arduino)
    else:
        print('Arduino not found')
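
When the candidate device names are known in advance, the directory listing is not strictly needed. The following is a minimal sketch of that variant, not a replacement for the script above; the function name find_device and its candidates parameter are made up for illustration. It simply asks the operating system whether each path exists, in order of preference.

```python
import os

# Hypothetical helper: return the first candidate path that exists, else None.
def find_device(candidates=('/dev/ttyACM0', '/dev/ttyACM1', '/dev/ttyUSB0')):
    for path in candidates:
        # os.path.exists is also true for device files such as /dev/ttyUSB0
        if os.path.exists(path):
            return path
    return None

if __name__ == '__main__':
    print('Arduino candidate:', find_device())
```

Note that an existing device file only says that something is plugged in, not that it is actually an Arduino.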


Python Script Re-Import

Here is another study. What it does is pretty useless by itself, but the technique might come in handy for self-modifying projects.

This script writes a script called test01.py and imports it as a module. It then repeatedly overwrites test01.py with updated list data and a modified function, and re-imports the modified module each time. At the very end, a function of the re-imported module is called. As a bonus, test01.py is itself a fully self-sufficient Python program.

The study demonstrates how a Python program can modify not only its data but also its own functionality at run time. A dictionary of Python instructions could be an interesting way to write self-learning scripts.


from importlib import reload
import sys

a=[]

def modify(a,i):
    a.append(i)
    return a


# create a module to import
filename='test01.py'
with open(filename,'w') as f:
    f.write('b=[]\n')
    f.write('def test01(b):\n')
    f.write('    return b\n')
    f.write('if __name__=="__main__":\n')
    f.write('    test01(b)\n')
    f.write('    print(b)\n')

# import the module
from test01 import *

for i in range(3):

    # play with data
    a=modify(a,i)

    # write modified module
    filename='test01.py'
    with open(filename,'w') as f:
        f.write('b='+str(a)+'\n')
        f.write('def test01(b):\n')
        f.write('    b.pop(1)\n')
        f.write('    return b\n')
        f.write('if __name__=="__main__":\n')
        f.write('    test01(b)\n')
        f.write('    print(b)\n')

    # re-import module
    reload(sys.modules['test01'])
    from test01 import b, test01    

    # show data from module
    print(b)

print('===')
print(a)

# run function from module
print(test01(a))
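
The write, import, rewrite, reload cycle can also be shown in isolation. The sketch below is only an illustration under a few assumptions: the module name demo_mod, the helper write_module and the use of a temporary directory are all made up here. It calls importlib.invalidate_caches() before each import, which the importlib documentation recommends when modules are created while the program is running.

```python
import importlib
import os
import sys
import tempfile

# Hypothetical helper: (re)write a tiny module holding a list and a function.
def write_module(path, value):
    with open(path, 'w') as f:
        f.write(f'b = {value!r}\n')
        f.write('def describe():\n')
        f.write('    return len(b)\n')

# keep the generated module out of the current directory
workdir = tempfile.mkdtemp()
sys.path.insert(0, workdir)
path = os.path.join(workdir, 'demo_mod.py')

# first version of the module
write_module(path, [])
importlib.invalidate_caches()
demo_mod = importlib.import_module('demo_mod')
first = demo_mod.describe()

# second version: same file name, new data
write_module(path, [0, 1, 2])
importlib.invalidate_caches()
demo_mod = importlib.reload(demo_mod)
second = demo_mod.describe()

print(first, second)
```

Accessing names through the module object (demo_mod.b) always shows the reloaded values. Names pulled in earlier with "from ... import" keep pointing at the old objects, which is why the script above repeats its from-import after every reload.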


Thursday, February 18, 2021

Python Moving Average

In data analysis, moving averages are often used to "smooth" noisy data. In terms of signal processing, applying a moving average is a very crude low-pass filter. In fact, a moving average filter is a special case of an FIR (finite impulse response) filter.

Here is a study I wrote in Python using numpy and matplotlib.

import numpy as np
from matplotlib import pyplot as plt

# calculate the moving average and zero-pad the first n-1 values
# the function takes two parameters, the data array and the
# number of bins for averaging; the default of 5 can be overridden
def moving_average(a, n=5):

    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return (np.insert((ret[n-1:]/n),0,[0]*(n-1)))

# create some sort of noisy data
data = np.arange(100)
data = np.sin(data/5)+np.cos(data/20)+np.random.randn(100)/5

data_avg = moving_average(data,7)

print(data)
print(data_avg)

plt.plot(data,'.')
plt.plot(data_avg,'.')
plt.show()
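
To make the FIR remark from the introduction concrete: a moving average over n bins is the same as convolving the data with n equal coefficients of 1/n. The following is a small sketch of that view using np.convolve; the function name moving_average_fir is made up for illustration. Unlike the function above it does not zero-pad, so the result is n-1 samples shorter than the input.

```python
import numpy as np

# Hypothetical helper: moving average written as an FIR filter,
# i.e. a convolution with n identical coefficients 1/n.
def moving_average_fir(a, n=5):
    kernel = np.ones(n) / n
    # mode='valid' keeps only positions where the window fits completely
    return np.convolve(a, kernel, mode='valid')

x = np.arange(10, dtype=float)
y = moving_average_fir(x, 3)
print(y)
```

Replacing the equal coefficients with other weights turns the same code into a general FIR filter.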


Tuesday, February 16, 2021

Python Circular Buffer

Just to continue the series about watching me learn Python, here is one that might be useful for any application that samples data in real time into a circular buffer, aka ring buffer. This example runs two threads: one thread is "reading" (creating) data and depositing it into the ring buffer, while the other thread independently reads a certain amount of that data.
Obviously, the ring buffer is in shared (global) memory and accessible by both threads.
Of course, putting random numbers into a buffer at one rate and reading the buffer out at a different rate does not solve any real problem. However, it is supposed to demonstrate how two threads could be used for sampling and analyzing data.

from time import sleep
from threading import Thread
import random

def read_val(var):
    length=len(var)
    global pos
    while True:
        for i in range(length):
            var[i]=random.random()
            pos=i
            print("generated ",i, var[i])
            sleep(0.5)

def pick_val(var):
    length=len(var)
    while True:
        sleep(0.1)
        print("most recent ",pos,var[pos])
        for i in range(length):
            lpos=(pos+i)%length
            print("=> ",i,var[i],lpos,var[lpos])

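# initialize memory
data = [0]*5
# index of the most recently written slot, shared by both threads;
# defined here so pick_val never reads it before read_val has set it
pos = 0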

t1 = Thread(target=read_val, args=(data,))
t2 = Thread(target=pick_val, args=(data,))

t1.start()
t2.start()
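
The script above shares a plain list and an index between the threads without any locking. As a possible refinement, and only as a sketch, a ring buffer can be built from collections.deque with a maximum length plus a threading.Lock so that readers always see a consistent snapshot. The class name RingBuffer and its methods put and snapshot are made up for this example.

```python
from collections import deque
from threading import Lock, Thread

# Hypothetical ring buffer: deque(maxlen=...) drops the oldest entry
# automatically once the buffer is full.
class RingBuffer:
    def __init__(self, size):
        self._data = deque(maxlen=size)
        self._lock = Lock()

    def put(self, value):
        # writer side: append under the lock
        with self._lock:
            self._data.append(value)

    def snapshot(self):
        # reader side: copy the content, oldest value first
        with self._lock:
            return list(self._data)

def produce(buf, count):
    for i in range(count):
        buf.put(i)

buf = RingBuffer(5)
writer = Thread(target=produce, args=(buf, 8))
writer.start()
writer.join()
print(buf.snapshot())  # [3, 4, 5, 6, 7]
```

With eight values written into five slots, only the five most recent ones remain.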

Saturday, February 13, 2021

Raspberry Pi with Arduino

Over the last few months I have been developing hardware and software for a project. While I won't disclose any details of the project here, I think it would be fun for readers to see me learning Raspberry Pi, Arduino and Python.
And yes, the previous Raspberry Pi blog posts were all somewhat triggered by the project mentioned above. In fact, the main idea was to use Raspberry Pi hardware not only for the project itself, but also as the main hardware for the company running said project, including all staff. (I digress...)

One particular problem with Arduino boards hooked up to Raspberry Pi boards is that the device might change from /dev/ttyACM0 to /dev/ttyACM1 from one use to the next. When using cheap clones, the device might be /dev/ttyUSB0.
So, in a project using Arduino (clone) boards with a Raspberry Pi board, one needs to test which device is in use.

Here comes the "learn Python with me" part. This might not be the most elegant way to do this, but here is a solution that works for me:

import serial  # provided by the pyserial package

def initialize():
    global SERIAL_PORT
    global ser
    # find a valid serial port
    try:
        TSER='/dev/ttyACM0'
        print('trying '+TSER)
        tser = serial.Serial(
            port=TSER,
            baudrate = 115200,
            parity=serial.PARITY_NONE,
            stopbits=serial.STOPBITS_ONE,
            bytesize=serial.EIGHTBITS,
            timeout=1
        )
        print(TSER + ' found')
        SERIAL_PORT=TSER
        tser.close()
    except serial.serialutil.SerialException:
        try:
            TSER='/dev/ttyACM1'
            print('trying '+TSER)
            tser = serial.Serial(
                port=TSER,
                baudrate = 115200,
                parity=serial.PARITY_NONE,
                stopbits=serial.STOPBITS_ONE,
                bytesize=serial.EIGHTBITS,
                timeout=1
            )
            print(TSER+' found')
            SERIAL_PORT=TSER
            tser.close()
        except serial.serialutil.SerialException:
            print('no Arduino found, using clone')
            SERIAL_PORT='/dev/ttyUSB0'
            
    finally:
        print('using '+SERIAL_PORT)

    ser = serial.Serial(
        port=SERIAL_PORT,
        baudrate = 115200,
        parity=serial.PARITY_NONE,
        stopbits=serial.STOPBITS_ONE,
        bytesize=serial.EIGHTBITS,
        timeout=1
    )
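
The nested try blocks above repeat the same probing step for each device name. One way to flatten this, shown here only as a sketch, is a loop over the candidates. The names first_working_port and open_serial are made up for this example, and the opener is passed in as a parameter so the loop itself does not depend on pyserial. It relies on pyserial's SerialException being a subclass of IOError (OSError in Python 3).

```python
# Hypothetical helper: try each candidate with the given opener and
# return (port, handle) for the first one that opens, else (None, None).
def first_working_port(candidates, opener):
    for port in candidates:
        try:
            print('trying ' + port)
            return port, opener(port)
        except OSError:
            # serial.SerialException derives from IOError/OSError
            print(port + ' not available')
    return None, None

# Opener using the same baud rate and timeout as the listing above.
def open_serial(port):
    import serial  # pyserial, imported lazily so the loop above works without it
    return serial.Serial(port=port, baudrate=115200, timeout=1)

# Possible usage on the Raspberry Pi:
# port, ser = first_working_port(
#     ('/dev/ttyACM0', '/dev/ttyACM1', '/dev/ttyUSB0'), open_serial)
```

Unlike the original, this version also probes /dev/ttyUSB0 instead of assuming it, and it returns the open handle rather than closing and reopening the port.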