ledpi Custom Code
Start up code in /etc/rc.local
export SYNCHRONIZED_LIGHTS_HOME="/home/pi/lightshowpi"
export PYTHONPATH=$PYTHONPATH:/home/pi/rpi_ws281x
sudo python /home/pi/lightshowpi/py/synchronized_LED_lights.py & > /home/pi/lslog.txt 2>&1
Custom light show code /home/pi/lightshowpi/py/synchronized_LED_lights.py
#!/usr/bin/env python
# Jan 12 2014 - latest additions / mutilations by Scott Driscoll
# CuriousInventor.com | https://plus.google.com/+ScottDriscoll
#
# Licensed under the BSD license. See full license in LICENSE file.
# http://www.lightshowpi.com/
#
# Author: Todd Giles (todd@lightshowpi.com)
# Author: Chris Usey (chris.usey@gmail.com)
# Author: Ryan Jennings
"""Play any audio file and synchronize lights to the music
When executed, this script will play an audio file, as well as turn on and off 8 channels
of lights to the music (via the first 8 GPIO channels on the Rasberry Pi), based upon
music it is playing. Many types of audio files are supported (see decoder.py below), but
it has only been tested with wav and mp3 at the time of this writing.
The timing of the lights turning on and off are controlled based upon the frequency response
of the music being played. A short segment of the music is analyzed via FFT to get the
frequency response across 8 channels in the audio range. Each light channel is then turned
on or off based upon whether the amplitude of the frequency response in the corresponding
channel has crossed a dynamic threshold.
The threshold for each channel is "dynamic" in that it is adjusted upwards and downwards
during the song play back based upon the frequency response amplitude of the song. This ensures
that soft songs, or even soft portions of songs will still turn all 8 channels on and off
during the song.
FFT calculation is quite CPU intensive and can adversely affect playback of songs (especially if
attempting to decode the song as well, as is the case for an mp3). For this reason, the timing
values of the lights turning on and off is cached after it is calculated upon the first time a
new song is played. The values are cached in a gzip'd text file in the same location as
the song itself. Subsequent requests to play the same song will use the cached information and not
recompute the FFT, thus reducing CPU utilization dramatically and allowing for clear music
playback of all audio file types.
Sample usage:
sudo python synchronized_lights.py --playlist=/home/pi/music/.playlist
sudo python synchronized_lights.py --file=/home/pi/music/jingle_bells.mp3
Third party dependencies:
alsaaudio: for audio output - http://pyalsaaudio.sourceforge.net/
decoder.py: decoding mp3, ogg, wma, ... - https://pypi.python.org/pypi/decoder.py/1.5XB
numpy: for FFT calcuation - http://www.numpy.org/
"""
import argparse
import csv
import fcntl
import gzip
import logging
import os
import random
from struct import unpack
import sys
import time
import wave
import alsaaudio as aa
import configuration_manager as cm
import decoder
import hardware_controller as hc
import numpy as np
from time import sleep
from struct import unpack
import time
from neopixel import *
c = 0.0
columns = [1.0,1.0,1.0,1.0,1.0]
decay = .9
# this writes out light and color information to a continuous RGB LED
# strip that's been wrapped around into 5 columns.
# numbers comes in at 9-15 ish
# NeoPixel LED strip configuration:
LED_COUNT = 38 # Number of LED pixels.
LED_COUNT1 = 38 # Number of LED pixels.
LED_PIN = 13 # GPIO pin connected to the pixels (18 uses PWM!).
LED_PIN1 = 13 # was 19 GPIO pin connected to the pixels (18 uses PWM!).
LED_FREQ_HZ = 800000 # LED signal frequency in hertz (usually 800khz)
LED_DMA = 10 # DMA channel to use for generating signal (try 10)
LED_DMA1 = 10 #was 5 DMA channel touse for generating signal (try 10)
LED_BRIGHTNESS = 255 # Set to 0 for darkest and 255 for brightest
LED_INVERT = False # True to invert the signal (when using NPN transistor level shift)
LED_CHANNEL = 1 # set to '1' for GPIOs 13, 19, 41, 45 or 53
#strip = Adafruit_NeoPixel(LED_COUNT, LED_PIN, LED_FREQ_HZ, LED_DMA, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL)
strip1 = Adafruit_NeoPixel(LED_COUNT1, LED_PIN1, LED_FREQ_HZ, LED_DMA1, LED_INVERT, LED_BRIGHTNESS, LED_CHANNEL)
def wheel_color(position):
"""Get color from wheel value (0 - 384)."""
if position < 0:
position = 0
if position > 384:
position = 384
if position < 128:
r = 127 - position % 128
g = position % 128
b = 0
elif position < 256:
g = 127 - position % 128
b = position % 128
r = 0
else:
b = 127 - position % 128
r = position % 128
g = 0
return Color(r, g, b)
def display_column(col=0,height=0.0,color=Color(50,50,0)):
global c
global columns
color = wheel_color(int(c))
#print("col: height: c:", col, int(round(height)))
c = c + .1
if c > 384:
c = 0.0
height = height - 9.0
height = height / 5
if height < .05:
height = .05
elif height > 1.0:
height = 1.0
if height < columns[col]:
columns[col] = columns[col] * decay
height = columns[col]
else:
columns[col] = height
#print("col: height*25:", col, int(round(height*25)))
if col == 0:
for i in range(0, int(round(height*25))):
strip.setPixelColor(i, color)
elif col == 1:
for i in range(56 - int(round(height*25)), 56):
strip.setPixelColor(i, color)
elif col == 2:
for i in range(62, 62+int(round(height*25))):
strip.setPixelColor(i, color)
elif col == 3:
for i in range(118- int(round(height*25)), 118):
strip.setPixelColor(i, color)
elif col == 4:
for i in range(123, 123+int(round(height*25))):
strip.setPixelColor(i, color)
#display_column2 test fixed color and column vary brightness
def display_column2(col=0,height=0.0,color=Color(50,50,0)):
global c
global columns
color = wheel_color(int(c))
#print("col: height: c:", col, int(round(height)))
c = c + .1
if c > 384:
c = 0.0
height = height - 9.0
height = height / 5
if height < .05:
height = .05
elif height > 1.0:
height = 1.0
if height < columns[col]:
columns[col] = columns[col] * decay
height = columns[col]
else:
columns[col] = height
#print("col: height*25:", col, int(round(height*25)))
if col == 0:
color = wheel_color(15 * int(round(height*25)))
for i in range(0, 28):
strip.setPixelColor(i, color)
elif col == 1:
color = wheel_color(6 * int(round(height*25)))
for i in range(28, 57):
strip.setPixelColor(i, color)
elif col == 2:
color = wheel_color(8 * int(round(height*25)))
for i in range(57, 89):
strip.setPixelColor(i, color)
elif col == 3:
color = wheel_color(13 * int(round(height*25)))
for i in range(89, 118):
strip.setPixelColor(i, color)
elif col == 4:
color = wheel_color(16 * int(round(height*25)))
for i in range(118, 144):
strip.setPixelColor(i, color)
#display_column1 for backlit screen 38 LEDs
def display_column1(col=0,height=0.0,color=Color(0,0,0)):
global c
global columns
color = wheel_color(int(c))
#print("col: height:", col, int(round(height)))
c = c + .1
if c > 384:
c = 0.0
height = height - 9.0
height = height / 5
if height < .05:
height = .05
elif height > 1.0:
height = 1.0
if height < columns[col]:
columns[col] = columns[col] * decay
height = columns[col]
else:
columns[col] = height
#print("col: height*25:", col, int(round(height*25)))
if col == 0:
color = wheel_color(3 * int(round(height*25)))
for i in range(0, 38):
strip1.setPixelColor(i, color)
elif col == 1:
color = wheel_color(6 * int(round(height*25)))
for i in range(0, 38):
strip1.setPixelColor(i, color)
elif col == 2:
color = wheel_color(8 * int(round(height*25)))
for i in range(0, 38):
strip1.setPixelColor(i, color)
elif col == 3:
color = wheel_color(13 * int(round(height*25)))
for i in range(0, 38):
strip1.setPixelColor(i, color)
elif col == 4:
color = wheel_color(16 * int(round(height*25)))
for i in range(0, 38):
strip1.setPixelColor(i, color)
#strip1.show()
# Configurations - TODO(todd): Move more of this into configuration manager
_CONFIG = cm.CONFIG
#_LIMIT_LIST = [int(lim) for lim in _CONFIG.get('auto_tuning', 'limit_list').split(',')]
_LIMIT_LIST = []
#_LIMIT_THRESHOLD = _CONFIG.getfloat('auto_tuning', 'limit_threshold')
_LIMIT_THRESHOLD = 0
#_LIMIT_THRESHOLD_INCREASE = _CONFIG.getfloat('auto_tuning', 'limit_threshold_increase')
_LIMIT_THRESHOLD_INCREASE = 0
#_LIMIT_THRESHOLD_DECREASE = _CONFIG.getfloat('auto_tuning', 'limit_threshold_decrease')
_LIMIT_THRESHOLD_DECREASE = 0
#_MAX_OFF_CYCLES = _CONFIG.getfloat('auto_tuning', 'max_off_cycles')
_MAX_OFF_CYCLES = 0
_MIN_FREQUENCY = _CONFIG.getfloat('audio_processing', 'min_frequency')
_MAX_FREQUENCY = _CONFIG.getfloat('audio_processing', 'max_frequency')
_RANDOMIZE_PLAYLIST = _CONFIG.getboolean('lightshow', 'randomize_playlist')
try:
_CUSTOM_CHANNEL_MAPPING = [int(channel) for channel in
_CONFIG.get('audio_processing', 'custom_channel_mapping').split(',')]
except:
_CUSTOM_CHANNEL_MAPPING = 0
try:
_CUSTOM_CHANNEL_FREQUENCIES = [int(channel) for channel in
_CONFIG.get('audio_processing',
'custom_channel_frequencies').split(',')]
except:
_CUSTOM_CHANNEL_FREQUENCIES = 0
try:
_PLAYLIST_PATH = _CONFIG.get('lightshow', 'playlist_path').replace('$SYNCHRONIZED_LIGHTS_HOME',
cm.HOME_DIR)
except:
_PLAYLIST_PATH = "/home/pi/music/.playlist"
#CHUNK_SIZE = 2048 # Use a multiple of 8
CHUNK_SIZE = 1024 # Use a multiple of 8
def calculate_channel_frequency(min_frequency, max_frequency, custom_channel_mapping,
custom_channel_frequencies):
'''Calculate frequency values for each channel, taking into account custom settings.'''
# How many channels do we need to calculate the frequency for
if custom_channel_mapping != 0 and len(custom_channel_mapping) == hc.GPIOLEN:
logging.debug("Custom Channel Mapping is being used: %s", str(custom_channel_mapping))
channel_length = max(custom_channel_mapping)
else:
logging.debug("Normal Channel Mapping is being used.")
channel_length = hc.GPIOLEN
logging.debug("Calculating frequencies for %d channels.", channel_length)
octaves = (np.log(max_frequency / min_frequency)) / np.log(2)
logging.debug("octaves in selected frequency range ... %s", octaves)
octaves_per_channel = octaves / channel_length
frequency_limits = []
frequency_store = []
frequency_limits.append(min_frequency)
if custom_channel_frequencies != 0 and (len(custom_channel_frequencies) >= channel_length + 1):
logging.debug("Custom channel frequencies are being used")
frequency_limits = custom_channel_frequencies
else:
logging.debug("Custom channel frequencies are not being used")
for i in range(1, hc.GPIOLEN + 1):
frequency_limits.append(frequency_limits[-1]*2**octaves_per_channel)
#frequency_limits.append(frequency_limits[-1]
# * 10 ** (3 / (10 * (1 / octaves_per_channel))))
for i in range(0, channel_length):
frequency_store.append((frequency_limits[i], frequency_limits[i + 1]))
logging.debug("channel %d is %6.2f to %6.2f ", i, frequency_limits[i],
frequency_limits[i + 1])
# we have the frequencies now lets map them if custom mapping is defined
if custom_channel_mapping != 0 and len(custom_channel_mapping) == hc.GPIOLEN:
frequency_map = []
for i in range(0, hc.GPIOLEN):
mapped_channel = custom_channel_mapping[i] - 1
mapped_frequency_set = frequency_store[mapped_channel]
mapped_frequency_set_low = mapped_frequency_set[0]
mapped_frequency_set_high = mapped_frequency_set[1]
logging.debug("mapped channel: " + str(mapped_channel) + " will hold LOW: "
+ str(mapped_frequency_set_low) + " HIGH: "
+ str(mapped_frequency_set_high))
frequency_map.append(mapped_frequency_set)
return frequency_map
else:
return frequency_store
def piff(val, sample_rate):
'''Return the power array index corresponding to a particular frequency.'''
return int(CHUNK_SIZE * val / sample_rate)
# TODO(todd): Move FFT related code into separate file as a library
def calculate_levels(data, sample_rate, frequency_limits):
'''Calculate frequency response for each channel
Initial FFT code inspired from the code posted here:
http://www.raspberrypi.org/phpBB3/viewtopic.php?t=35838&p=454041
Optimizations from work by Scott Driscoll:
http://www.instructables.com/id/Raspberry-Pi-Spectrum-Analyzer-with-RGB-LED-Strip-/
'''
# create a numpy array. This won't work with a mono file, stereo only.
data_stereo = np.frombuffer(data, dtype=np.int16)
data = np.empty(len(data) / 4) # data has two channels and 2 bytes per channel
data[:] = data_stereo[::2] # pull out the even values, just using left channel
# if you take an FFT of a chunk of audio, the edges will look like
# super high frequency cutoffs. Applying a window tapers the edges
# of each end of the chunk down to zero.
window = np.hanning(len(data))
data = data * window
# Apply FFT - real data
fourier = np.fft.rfft(data)
# Remove last element in array to make it the same size as CHUNK_SIZE
fourier = np.delete(fourier, len(fourier) - 1)
# Calculate the power spectrum
power = np.abs(fourier) ** 2
matrix = [0 for i in range(hc.GPIOLEN)]
for i in range(hc.GPIOLEN):
# take the log10 of the resulting sum to approximate how human ears perceive sound levels
matrix[i] = np.log10(np.sum(power[piff(frequency_limits[i][0], sample_rate)
:piff(frequency_limits[i][1], sample_rate):1]))
return matrix
# TODO(todd): Refactor this to make it more readable / modular.
def main():
'''main'''
#strip.begin()
strip1.begin()
song_to_play = int(cm.get_state('song_to_play', 0))
play_now = int(cm.get_state('play_now', 0))
# Arguments
parser = argparse.ArgumentParser()
filegroup = parser.add_mutually_exclusive_group()
filegroup.add_argument('--playlist', default=_PLAYLIST_PATH,
help='Playlist to choose song from.')
filegroup.add_argument('--file', help='path to the song to play (required if no'
'playlist is designated)')
parser.add_argument('--readcache', type=int, default=1,
help='read light timing from cache if available. Default: true')
args = parser.parse_args()
# Log everything to our log file
# TODO(todd): Add logging configuration options.
logging.basicConfig(filename=cm.LOG_DIR + '/music_and_lights.play.dbg',
format='[%(asctime)s] %(levelname)s {%(pathname)s:%(lineno)d}'
' - %(message)s',
level=logging.DEBUG)
# Initialize Lights
hc.initialize()
# Initialize FFT stats
matrix = [0 for _ in range(hc.GPIOLEN)]
offct = [0 for _ in range(hc.GPIOLEN)]
# Build the limit list
if len(_LIMIT_LIST) == 1:
limit = [_LIMIT_LIST[0] for _ in range(hc.GPIOLEN)]
else:
limit = _LIMIT_LIST
# Set up audio
sample_rate = 44100
no_channels = 2
chunk = 1024 #was 512 Use a multiple of 8
data_in = aa.PCM(aa.PCM_CAPTURE, aa.PCM_NORMAL)
data_in.setchannels(no_channels)
data_in.setrate(sample_rate)
data_in.setformat(aa.PCM_FORMAT_S16_LE)
data_in.setperiodsize(CHUNK_SIZE)
cache = []
cache_found = False
# Process audio song_filename
row = 0
l,data = data_in.read()
print("l: len(data:", l, len(data))
frequency_limits = calculate_channel_frequency(_MIN_FREQUENCY,
_MAX_FREQUENCY,
_CUSTOM_CHANNEL_MAPPING,
_CUSTOM_CHANNEL_FREQUENCIES)
while data != '' and not play_now:
# Control lights with cached timing values if they exist
matrix = None
if matrix == None:
#print("calculate_levels", sample_rate)
# No cache - Compute FFT in this chunk, and cache results
matrix = calculate_levels(data, sample_rate, frequency_limits)
# blank out the display
#for i in range(0, LED_COUNT):
#strip.setPixelColor(i, 0)
color = Color(0, 0, 0)
for i in range(0, LED_COUNT1):
strip1.setPixelColor(i, color)
for i in range(0, hc.GPIOLEN):
if hc.is_pin_pwm(i):
# Output pwm, where off is at 0.5 std below the mean
# and full on is at 0.75 std above the mean.
#print("column:i matrix[i]:", i, matrix[i])
#display_column2(i,matrix[i])
display_column1(i,matrix[i])
#brightness = matrix[i] - mean[i] + 0.5 * std[i]
#brightness = brightness / (1.25 * std[i])
#if brightness > 1.0:
#brightness = 1.0
#if brightness < 0:
#brightness = 0
#hc.turn_on_light(i, True, int(brightness * 60))
else:
if limit[i] < matrix[i] * _LIMIT_THRESHOLD:
limit[i] = limit[i] * _LIMIT_THRESHOLD_INCREASE
logging.debug("++++ channel: {0}; limit: {1:.3f}".format(i, limit[i]))
# Amplitude has reached threshold
if matrix[i] > limit[i]:
hc.turn_on_light(i, True)
offct[i] = 0
else: # Amplitude did not reach threshold
offct[i] = offct[i] + 1
if offct[i] > _MAX_OFF_CYCLES:
offct[i] = 0
limit[i] = limit[i] * _LIMIT_THRESHOLD_DECREASE # old value 0.8
logging.debug("---- channel: {0}; limit: {1:.3f}".format(i, limit[i]))
hc.turn_off_light(i, True)
# send out data to RGB LED Strip
#strip.show()
strip1.show()
# Read next chunk of data from music song_filename
l,data = data_in.read()
row = row + 1
# Load new application state in case we've been interrupted
cm.load_state()
play_now = int(cm.get_state('play_now', 0))
# We're done, turn it all off ;)
#for i in range(0, LED_COUNT):
#strip.setPixelColor(i, 0)
color = Color(0, 0, 0)
for i in range(0, LED_COUNT1):
strip1.setPixelColor(i, color)
#strip._cleanup()
strip1._cleanup()
hc.clean_up()
if __name__ == "__main__":
main()