diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..c2fabe1 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,17 @@ +# To get started with Dependabot version updates, you'll need to specify which +# package ecosystems to update and where the package manifests are located. +# Please see the documentation for all configuration options: +# https://docs.github.com/github/administering-a-repository/configuration-options-for-dependency-updates + +version: 2 +updates: + # Maintain dependencies for Cargo + - package-ecosystem: "cargo" + directory: "/" # Location of package manifests + schedule: + interval: "weekly" + # Maintain dependencies for GitHub Actions + - package-ecosystem: github-actions + directory: "/" + schedule: + interval: weekly diff --git a/.github/workflows/dayly_build.yml b/.github/workflows/dayly_build.yml index 4486596..14f6d5a 100644 --- a/.github/workflows/dayly_build.yml +++ b/.github/workflows/dayly_build.yml @@ -1,9 +1,11 @@ name: Dayly Build on: + pull_request: push: branches: - dev + - main jobs: build-linux: diff --git a/audio_stuff/midi/fuer_elise.py b/audio_stuff/midi/fuer_elise.py index 6ec1ceb..4c80687 100644 --- a/audio_stuff/midi/fuer_elise.py +++ b/audio_stuff/midi/fuer_elise.py @@ -6,12 +6,13 @@ # Define the notes for "Für Elise" fur_elise = [ - ('E5', 0.5), ('D#5', 0.5), ('E5', 0.5), ('D#5', 0.5), ('E5', - 0.5), ('B4', 0.5), ('D5', 0.5), ('C5', 0.5), ('A4', 1.5), - ('C4', 0.5), ('E4', 0.5), ('A4', 0.5), ('B4', 1.5), ('E4', - 0.5), ('G#4', 0.5), ('B4', 0.5), ('C5', 1.5), - ('E4', 0.5), ('C4', 0.5), ('E4', 0.5), ('A4', 0.5), ('B4', - 1.5), ('E4', 0.5), ('G#4', 0.5), ('B4', 0.5), ('C5', 1.5) + ('E5', 0.5), ('D#5', 0.5), ('E5', 0.5), ('D#5', 0.5), + ('E5', 0.5), ('B4', 0.5), ('D5', 0.5), ('C5', 0.5), + ('A4', 1.5), ('C4', 0.5), ('E4', 0.5), ('A4', 0.5), + ('B4', 1.5), ('E4', 0.5), ('G#4', 0.5), ('B4', 0.5), + ('C5', 1.5), ('E4', 0.5), ('C4', 0.5), ('E4', 0.5), + ('A4', 0.5), ('B4', 1.5), ('E4', 0.5), ('G#4', 0.5), + ('B4', 0.5), ('C5', 1.5) ] # Define a function to convert note names to MIDI note numbers diff --git a/environment.yml b/environment.yml index c2a3197..963169d 100644 --- a/environment.yml +++ b/environment.yml @@ -1,22 +1,104 @@ channels: - defaults dependencies: - - python==3.11 - - pip + - bzip2=1.0.8 + - ca-certificates=2024.9.24 + - flake8=7.1.1 + - libffi=3.4.4 + - mccabe=0.7.0 + - openssl=1.1.1w + - pip=24.2 + - pycodestyle=2.12.1 + - pyflakes=3.2.0 + - python=3.11.0 + - setuptools=75.1.0 + - sqlite=3.45.3 + - tk=8.6.14 + - tzdata=2024a + - wheel=0.44.0 + - xz=5.4.6 + - zlib=1.2.13 - pip: - - numpy - - pygame - - scipy - - torch - - psutil - - numba - - pretty_midi - - sounddevice - - pyaudio - - matplotlib - - nuitka - - mkdocs - - mkdocs-material - - mkdocs-git-revision-date-localized-plugin - - mkdocs-git-authors-plugin - - mkdocstrings + - argparse==1.4.0 + - attrs==24.2.0 + - automat==24.8.1 + - babel==2.16.0 + - buildtools==1.0.6 + - certifi==2024.8.30 + - cffi==1.17.1 + - charset-normalizer==3.3.2 + - click==8.1.7 + - colorama==0.4.6 + - constantly==23.10.4 + - contourpy==1.3.0 + - cycler==0.12.1 + - cython==3.0.11 + - docopt==0.6.2 + - filelock==3.16.1 + - fonttools==4.54.0 + - fsspec==2024.9.0 + - furl==2.1.3 + - ghp-import==2.1.0 + - gitdb==4.0.11 + - gitpython==3.1.43 + - greenlet==3.1.1 + - hyperlink==21.0.0 + - idna==3.10 + - incremental==24.7.2 + - jinja2==3.1.4 + - keras==3.5.0 + - kiwisolver==1.4.7 + - llvmlite==0.43.0 + - markupsafe==2.1.5 + - matplotlib==3.9.2 + - mergedeep==1.3.4 + - mido==1.3.2 + - mkdocs==1.6.1 + - mkdocs-autorefs==1.2.0 + - mkdocs-get-deps==0.2.0 + - mkdocs-git-authors-plugin==0.9.0 + - mkdocs-git-revision-date-localized-plugin==1.2.9 + - mkdocs-material==9.5.39 + - mkdocs-material-extensions==1.3.1 + - mkdocstrings==0.26.1 + - mpmath==1.3.0 + - networkx==3.3 + - nuitka==2.4.8 + - numba==0.60.0 + - numpy==2.0.2 + - ordered-set==4.1.0 + - orderedmultidict==1.0.1 + - packaging==23.2 + - paginate==0.5.7 + - pathspec==0.12.1 + - pillow==10.4.0 + - platformdirs==4.3.6 + - psutil==6.0.0 + - pyaudio==0.2.14 + - pycparser==2.22 + - pygame==2.6.0 + - pygments==2.18.0 + - pymdown-extensions==10.11.2 + - pyparsing==3.1.4 + - python-dateutil==2.9.0.post0 + - python-rtmidi==1.5.8 + - pytz==2024.2 + - pyyaml==6.0.2 + - pyyaml-env-tag==0.1 + - redo==3.0.0 + - regex==2024.9.11 + - requests==2.32.3 + - scipy==1.14.1 + - simplejson==3.19.3 + - six==1.16.0 + - smmap==5.0.1 + - sounddevice==0.5.0 + - sqlalchemy==2.0.35 + - sympy==1.13.3 + - torch==2.4.1 + - torchvision==0.19.1 + - twisted==24.7.0 + - typing-extensions==4.12.2 + - urllib3==2.2.3 + - zope-interface==7.0.3 + - zstandard==0.23.0 diff --git a/main.py b/main.py index 8188263..91a5641 100644 --- a/main.py +++ b/main.py @@ -520,7 +520,7 @@ def main(): state = "running" try: # window.after(1000, lambda: window.graph.plot_function( - # predefined_functions_dict['nice'], overwrite=True)) + # predefined_functions_dict['funny2'], overwrite=True)) # window.after(2000, window.start_training) window.mainloop() except KeyboardInterrupt: diff --git a/src/music.py b/src/music.py index 645d1cc..0808efa 100644 --- a/src/music.py +++ b/src/music.py @@ -1,18 +1,10 @@ -import copy -import threading -import time -from typing import Callable import mido -import pygame import scipy import torch from src.fourier_neural_network import FourierNN from src import utils -from pygame import mixer, midi, sndarray import atexit -import multiprocessing from multiprocessing import Process, Queue, current_process -import os import sys from tkinter import filedialog import numpy as np @@ -73,205 +65,6 @@ def rescale_audio(audio): return audio / max_val -class Synth(): - - MIDI_NOTE_OF_FREQ_ONE = midi.frequency_to_midi(1) - - def __init__(self, fourier_nn, stdout: Queue = None, num_channels: int = 20): - self.stdout = stdout - self.pool = None - self.live_synth: Process = None - self.notes_ready = False - self.fourier_nn: FourierNN = fourier_nn - self.fs = 44100 # Sample rate - self.num_channels = num_channels - self.notes: dict = {} - self.effects: list[Callable] = [ - # apply_reverb, - # apply_echo, - # apply_chorus, - # apply_distortion, - ] - # pygame.init() - mixer.init(frequency=44100, size=-16, - channels=2, buffer=1024) - mixer.set_num_channels(self.num_channels) - - self.running_channels: dict[str, tuple[int, mixer.Channel]] = {} - self.free_channel_ids = list(range( - self.num_channels)) - # self.generate_sounds() - - def generate_sound_wrapper(self, x, offset): - midi_note, data = self.fourier_nn.synthesize_tuple(x) - return midi_note+offset, data - - def generate_sounds(self): - self.notes_ready = False - - timestep = 44100 - - t = np.arange(0, 1, step=1 / timestep) - - data = self.fourier_nn.predict((2 * np.pi*t)-np.pi) - - peak_frequency = utils.calculate_peak_frequency(data) - - print("peak_frequency", peak_frequency) - - midi_offset = midi.frequency_to_midi(peak_frequency) \ - - self.MIDI_NOTE_OF_FREQ_ONE - print("Midi Note offset:", midi_offset) - self.pool = multiprocessing.Pool(processes=os.cpu_count()) - atexit.register(self.pool.terminate) - atexit.register(self.pool.join) - result_async = self.pool.starmap_async(self.generate_sound_wrapper, - ((x-midi_offset, midi_offset) for x in range(128))) - utils.run_after_ms(1000, self.monitor_note_generation, result_async) - - def monitor_note_generation(self, result_async): - try: - self.note_list = result_async.get(0.1) - except multiprocessing.TimeoutError: - utils.run_after_ms( - 1000, self.monitor_note_generation, result_async) - else: - print("sounds Generated") - - self.notes_ready = True - self.play_init_sound() - self.pool.close() - self.pool.join() - atexit.unregister(self.pool.terminate) - atexit.unregister(self.pool.join) - - def play_init_sound(self): - f1 = 440 # Frequency of the "duuu" sound (in Hz) - f2 = 880 # Frequency of the "dib" sound (in Hz) - t1 = 0.8 # Duration of the "duuu" sound (in seconds) - t2 = 0.2 # Duration of the "dib" sound (in seconds) - t = np.arange(int(t1 * self.fs)) / self.fs - sound1 = 0.5 * np.sin(2 * np.pi * f1 * t) - - # Generate the "dib" sound - t = np.arange(int(t2 * self.fs)) / self.fs - sound2 = 0.5 * np.sin(2 * np.pi * f2 * t) - - # Concatenate the two sounds - audio = np.concatenate([sound1, sound2]) - output = np.array( - audio * 32767 / np.max(np.abs(audio)) / 2).astype(np.int16) - stereo_sine_wave = np.repeat(output.reshape(-1, 1), 2, axis=1) - sound = sndarray.make_sound(stereo_sine_wave) - - channel = mixer.Channel(0) - channel.play(sound) - while (channel.get_busy()): - pass - print("Ready") - - def apply_effects(self, sound): - for effect in self.effects: - print(effect.__name__) - sound[:] = effect(sound) - sound = sound-np.mean(sound) - sound[:] = rescale_audio(sound) - return sound - - def live_synth_loop(self): - midi.init() - mixer.init(frequency=44100, size=-16, - channels=2, buffer=1024) - mixer.set_num_channels(self.num_channels) - for note, sound in self.note_list: - # print(sound.shape) - stereo = np.repeat(rescale_audio(sound).reshape(-1, 1), 2, axis=1) - stereo = np.array(stereo, dtype=np.int16) - # np.savetxt(f"tmp/numpy/sound_note_{note}.numpy", stereo) - stereo_sound = pygame.sndarray.make_sound(stereo) - self.notes[note] = stereo_sound - - input_id = midi.get_default_input_id() - if input_id == -1: - print("No MIDI input device found.") - return - - midi_input = midi.Input(input_id) - print("Live Synth is running") - while True: - if midi_input.poll(): - midi_events = midi_input.read(10) - for midi_event, timestamp in midi_events: - if midi_event[0] == 144: - print("Note on", midi_event[1]) - try: - id = self.free_channel_ids.pop() - channel = mixer.Channel(id) - channel.set_volume(1) - channel.play( - self.notes[midi_event[1]], fade_ms=10, loops=-1) - self.running_channels[midi_event[1]] = ( - id, channel,) - except IndexError: - print("to many sounds playing") - elif midi_event[0] == 128: - print("Note off", midi_event[1]) - self.free_channel_ids.append( - self.running_channels[midi_event[1]][0]) - self.running_channels[midi_event[1]][1].stop() - - def pending_for_live_synth(self): - if not self.notes_ready: - # print("pending") - utils.run_after_ms(500, self.pending_for_live_synth) - else: - self.run_live_synth() - - def run_live_synth(self): - if not self.notes_ready: - print("\033[31;1;4mNOT READY YET\033[0m") - self.generate_sounds() - utils.run_after_ms(500, self.pending_for_live_synth) - return - if not self.live_synth: - print("spawning live synth") - self.live_synth = Process(target=self.live_synth_loop) - self.live_synth.start() - else: - print("killing live synth") - self.live_synth.terminate() - self.live_synth.join() - self.live_synth = None - print("live synth killed") - atexit.register(utils.DIE, self.live_synth, 0, 0) - - def play_sound(self, midi_note): - id = self.free_channel_ids.pop() - channel = mixer.Channel(id) - channel.play(self.notes[midi_note], 200) - - def __getstate__(self) -> object: - pool = self.pool - live_synth = self.live_synth - del self.pool - del self.live_synth - Synth_dict = self.__dict__.copy() - self.pool = pool - self.live_synth = live_synth - return Synth_dict - - def __setstate__(self, state): - # Load the model from a file after deserialization - self.__dict__.update(state) - if self.stdout is not None: - if current_process().name != 'MainProcess': - sys.stdout = utils.QueueSTD_OUT(queue=self.stdout) - - -def get_raw_audio(sound): - return pygame.sndarray.array(sound) - - def apply_reverb(audio, decay=0.5, delay=0.02, fs=44100): delay_samples = int(delay * fs) impulse_response = np.zeros((delay_samples, 2)) @@ -314,84 +107,156 @@ def apply_distortion(audio, gain=2.0, threshold=0.5): return distorted_audio -class ADSR(): +class ADSR: def __init__(self, attack_time, decay_time, sustain_level, release_time, sample_rate): self.attack_samples = int(attack_time * sample_rate) self.decay_samples = int(decay_time * sample_rate) + self.ad_samples = self.attack_samples + self.decay_samples self.sustain_level = sustain_level self.release_samples = int(release_time * sample_rate) - self.sample_rate = sample_rate - - def get_ads_envelope(self, start_frame, num_frames): - envelope = np.zeros(num_frames) - end_frame = start_frame + num_frames - - # Attack phase - if start_frame < self.attack_samples: - attack_end = min(end_frame, self.attack_samples) - envelope[:attack_end - - start_frame] = np.linspace(0, 1, attack_end - start_frame) - start_frame = attack_end - - # Decay phase - if start_frame < self.attack_samples + self.decay_samples: - decay_end = min(end_frame, self.attack_samples + - self.decay_samples) - envelope[start_frame - self.attack_samples:decay_end - - self.attack_samples] = np.linspace(1, self.sustain_level, decay_end - start_frame)[start_frame - self.attack_samples:decay_end - - self.attack_samples] - start_frame = decay_end - - # Sustain phase - if start_frame >= self.attack_samples + self.decay_samples: - sustain_start = max( - start_frame, self.attack_samples + self.decay_samples) - envelope[sustain_start - start_frame:] = self.sustain_level + + self.ads_envelope = np.concatenate(( + np.linspace(0, 1, self.attack_samples), + np.linspace(1, self.sustain_level, self.decay_samples) + )) + self.r_envelope = np.linspace( + self.sustain_level, 0, self.release_samples) + + self._r_counter = [0 for _ in range(128)] + self._ads_counter = [0 for _ in range(128)] + + def reset_note(self, note_num): + self._ads_counter[note_num] = 0 + self._r_counter[note_num] = 0 + + def has_note_ended(self, note_num) -> bool: + return self._r_counter[note_num] >= self.release_samples + + def get_ads_envelope(self, note_num, frame): + start = self._ads_counter[note_num] + end = start + frame + self._ads_counter[note_num] += frame + + if start > self.ad_samples: + return np.ones(frame) * self.sustain_level + + envelope = np.zeros(frame) + if end > self.ad_samples: + envelope[:self.ad_samples - + start] = self.ads_envelope[start:self.ad_samples] + envelope[self.ad_samples - + start:] = np.ones(end - self.ad_samples)*self.sustain_level + else: + envelope[:] = self.ads_envelope[start:end] return envelope - def get_r_envelope(self, start_frame, num_frames): - envelope = np.zeros(num_frames) - end_frame = start_frame + num_frames + def get_r_envelope(self, note_num, frame): + start = self._r_counter[note_num] + end = start + frame + self._r_counter[note_num] += frame - # Release phase - if start_frame < self.release_samples: - release_end = min(end_frame, self.release_samples) - envelope[:release_end - start_frame] = np.linspace( - self.sustain_level, 0, release_end - start_frame) + if start > self.release_samples: + return np.zeros(frame) + envelope = np.zeros(frame) + if end <= self.release_samples: + envelope[:] = self.r_envelope[start:end] + else: + envelope[:self.release_samples - + start] = self.r_envelope[start:self.release_samples] + # After release, the envelope is 0 + envelope[self.release_samples - start:] = 0 return envelope -class Note: - def __init__(self, amplitude): - self.amplitude = amplitude - self.on = True - self.adsr_state = 'attack' - self.adsr_position = 0 +class Echo_torch: + def __init__(self, sample_size, delay_time_ms, feedback, wet_dry_mix, device='cpu'): + self.sample_size = sample_size + # Convert ms to samples (assuming 44.1 kHz sample rate) + self.delay_time = int(delay_time_ms * 44.1) + self.feedback = feedback + self.wet_dry_mix = wet_dry_mix + self.device = device + + # Initialize the past buffer to store delayed samples + self.past = torch.zeros( + (self.delay_time + 1, sample_size), device=device) + + def apply(self, sound): + # Ensure sound is a tensor of shape (num_samples, sample_size) + if sound.dim() != 2 or sound.size(1) != self.sample_size: + raise ValueError( + "Input sound must be a 2D tensor with shape (num_samples, sample_size)") + + # Create an output tensor + output = torch.zeros_like(sound, device=self.device) + + for i in range(sound.size(0)): + # Get the current sample + current_sample = sound[i] + + # Get the delayed sample from the past buffer + delayed_sample = self.past[-self.delay_time] if i >= self.delay_time else torch.zeros( + self.sample_size, device=self.device) - def __str__(self): - return f"Note({self.amplitude}, {self.on}, {self.adsr_position}, {self.adsr_state})" + # Calculate the echo effect + echo_sample = (current_sample + delayed_sample * + self.feedback) * self.wet_dry_mix + # Store the current sample in the past buffer + self.past = torch.roll(self.past, shifts=-1, dims=0) + self.past[-1] = current_sample -def get_on_notes(notes): - return {note: note_data for note, note_data in notes.items() if note_data.on} + # Combine the original sound and the echo + output[i] = current_sample * (1 - self.wet_dry_mix) + echo_sample + return output -def get_off_notes(notes): - return {note: note_data for note, note_data in notes.items() if not note_data.on} +class Echo: + def __init__(self, sample_size, delay_time_ms, feedback, wet_dry_mix): + self.sample_size = sample_size + # Convert ms to samples (assuming 44.1 kHz sample rate) + # Convert ms to seconds + self.delay_time = int(delay_time_ms * 44.1 / 1000) + self.feedback = feedback + self.wet_dry_mix = wet_dry_mix -def piano_id(): - desired_name = "Digital Piano" - device_id = None + # Initialize the past buffer to store delayed samples + self.past = np.zeros((self.delay_time + 1, sample_size)) - for i in range(pygame.midi.get_count()): - info = pygame.midi.get_device_info(i) - if info[1].decode() == desired_name: - device_id = i - break - return device_id + def apply(self, sound): + # Ensure sound is a 2D array with shape (num_samples, sample_size) + if sound.ndim != 2 or sound.shape[1] != self.sample_size: + raise ValueError( + "Input sound must be a 2D array with shape (num_samples, sample_size)") + + # Create an output array + output = np.zeros_like(sound) + + for i in range(sound.shape[0]): + # Get the current sample + current_sample = sound[i] + + # Get the delayed sample from the past buffer + if i >= self.delay_time: + delayed_sample = self.past[-self.delay_time] + else: + delayed_sample = np.zeros(self.sample_size) + + # Calculate the echo effect + echo_sample = (current_sample + delayed_sample * + self.feedback) * self.wet_dry_mix + + # Store the current sample in the past buffer + self.past = np.roll(self.past, shift=-1, axis=0) + self.past[-1] = current_sample + + # Combine the original sound and the echo + output[i] = current_sample * (1 - self.wet_dry_mix) + echo_sample + + return output class Synth2(): @@ -400,12 +265,12 @@ def __init__(self, fourier_nn, stdout: Queue = None, port_name=None): self.live_synth: Process = None self.notes_ready = False self.fourier_nn: FourierNN = fourier_nn - self.fs = 44100 # Sample rate + self.sample_rate = 44100 # Sample rate self.max_parralel_notes = 3 self.current_frame = 0 t = np.array([ - midi.midi_to_frequency(f) * - np.linspace(0, 2*np.pi, self.fs) + utils.midi_to_freq(f) * + np.linspace(0, 2*np.pi, self.sample_rate) for f in range(128) ]) self.t_buffer = torch.tensor(t, dtype=torch.float32) @@ -426,19 +291,19 @@ def play_init_sound(self): t4 = 0.4 # Duration of the "dii" sound (in seconds) # Generate the "duuu" sound - t = np.arange(int(t1 * self.fs)) / self.fs + t = np.arange(int(t1 * self.sample_rate)) / self.sample_rate sound1 = 0.5 * np.sin(2 * np.pi * f1 * t) # Generate the "dl" sound - t = np.arange(int(t2 * self.fs)) / self.fs + t = np.arange(int(t2 * self.sample_rate)) / self.sample_rate sound2 = 0.5 * np.sin(2 * np.pi * f2 * t) # Generate the "diii" sound - t = np.arange(int(t3 * self.fs)) / self.fs + t = np.arange(int(t3 * self.sample_rate)) / self.sample_rate sound3 = 0.5 * np.sin(2 * np.pi * f3 * t) # Generate the "dub" sound - t = np.arange(int(t4 * self.fs)) / self.fs + t = np.arange(int(t4 * self.sample_rate)) / self.sample_rate sound4 = 0.5 * np.sin(2 * np.pi * f4 * t) # Concatenate the sounds to form "duuudldiiidub" @@ -458,60 +323,93 @@ def live_synth_loop(self): CHUNK = 2048 # Increased chunk size stream = p.open(format=pyaudio.paFloat32, channels=1, - rate=self.fs, + # frames_per_buffer=CHUNK, + rate=self.sample_rate, output=True) current_frame = 0 cycle_frame = 0 + self.adsr_envelope = ADSR( + 0.1, + 0.1, + 0.75, + 0.2, + self.sample_rate) notes = {} model = self.fourier_nn.current_model.to(self.fourier_nn.device) self.play_init_sound() + with mido.open_input(self.port_name) as midi_input: while True: # for _ in utils.timed_loop(True): available_buffer = stream.get_write_available() - # print(available_buffer, end=" |\t") if available_buffer == 0: continue midi_event = midi_input.poll() - # print(midi_event) if midi_event: - midi_event.type - midi_event.note - midi_event.velocity + print(midi_event) + # midi_event.type + # midi_event.note + # midi_event.velocity if midi_event.type == 'note_on': # Note on print("Note on", midi_event.note, - midi.midi_to_frequency(midi_event.note)) + utils.midi_to_freq(midi_event.note)) - notes[midi_event.note] = midi_event.velocity + notes[midi_event.note] = [True, midi_event.velocity] + self.adsr_envelope.reset_note(midi_event.note) elif midi_event.type == 'note_off': # Note off print("Note off", midi_event.note, - midi.midi_to_frequency(midi_event.note)) + utils.midi_to_freq(midi_event.note)) if midi_event.note in notes: - del notes[midi_event.note] + # del notes[midi_event.note] + notes[midi_event.note][0] = False if len(notes) > 0: + print(f"available_buffer: {available_buffer}") + # y = torch.zeros(size=(len(notes), available_buffer), + # device=self.fourier_nn.device) y = np.zeros((len(notes), available_buffer), dtype=np.float32) - for i, (note, amplitude) in enumerate(notes.items()): + to_delete = [] + for i, (note, data) in enumerate(notes.items()): with torch.no_grad(): x = utils.wrap_concat( self.t_buffer[note], cycle_frame, cycle_frame + available_buffer).to(self.fourier_nn.device) - - y[i, :] = model( - x.unsqueeze(1)).cpu().numpy().astype(np.float32) # * (amplitude/127) + if data[0]: + envelope = self.adsr_envelope.get_ads_envelope( + note, available_buffer) + else: + envelope = self.adsr_envelope.get_r_envelope( + note, available_buffer) + if self.adsr_envelope.has_note_ended(note): + to_delete.append(note) + + model_output = model(x.unsqueeze(1)) + y[i, :] = model_output.cpu().numpy().astype( + np.float32) * envelope + # y[i, :] = model( + # x.unsqueeze(1)).cpu().numpy().astype(np.float32) # * envelope # * (amplitude/127) + # y[i, :] = np.sin( + # x.numpy().astype(np.float32)) * envelope + + for note in to_delete: + del notes[note] audio_data = sum_signals(y) # print(np.max(np.abs(audio_data))) + audio_data = normalize(audio_data) + audio_data *= 1 # oscilating_amplitude audio_data = np.clip(audio_data, -1, 1) - audio_data *= 0.7 - stream.write(audio_data, + stream.write(audio_data.astype(np.float32), available_buffer, exception_on_underflow=True) + # if cycle_frame <= 100: + # print(f"available_buffer: {available_buffer}") current_frame = (current_frame + available_buffer) - cycle_frame = (cycle_frame + available_buffer) % self.fs + cycle_frame = (cycle_frame + available_buffer) %\ + self.sample_rate def run_live_synth(self): if not self.live_synth: @@ -542,6 +440,9 @@ def __setstate__(self, state): def normalize(signal): + max_val = np.max(np.abs(signal)) + if np.isnan(max_val) or max_val == 0: + return signal return signal / np.max(np.abs(signal)) diff --git a/src/std_redirect.py b/src/std_redirect.py index 3ba0996..8affcc2 100644 --- a/src/std_redirect.py +++ b/src/std_redirect.py @@ -165,18 +165,21 @@ def on_resize(self, event): def check_queue(self): try: # print("asdfasdf") - while not self.queue.empty(): - try: - msg = self.queue.get_nowait() - self.redirector(msg) - except queue.Empty: - break - try: - tk_after_errorless(self.master, 100, self.check_queue) - except: - print("ERROR") + if not self.queue.empty(): + for i in range(100): + try: + msg = self.queue.get_nowait() + self.redirector(msg) + # self.old_stdout.write(str(self.queue.qsize())+'\n') + except queue.Empty: + break except BrokenPipeError: - pass + return + # delay = 100 + delay = int(500 if self.queue.empty() + else min(5, 100/min(1, self.queue.qsize()/10))) + # self.old_stdout.write(f'Delay: {delay}\n') + tk_after_errorless(self.master, delay, self.check_queue) def __del__(self): sys.stdout = self.old_stdout diff --git a/src/utils.py b/src/utils.py index 42402f9..54dac13 100644 --- a/src/utils.py +++ b/src/utils.py @@ -378,11 +378,32 @@ def timed_generator(iterable): def timed_loop(condition): + """ + A generator that runs a timed loop while the condition is true. + + Args: + condition (bool): A condition that controls the loop. + + Yields: + None: Control is yielded back to the caller. + """ max_val = 0 + total_time = 0 + iteration_count = 0 + while condition: start_time = time.perf_counter_ns() - yield + yield # Yield control back to the caller current_time = time.perf_counter_ns() - start_time + max_val = max(current_time, max_val) - print( - f"Time taken for this iteration: {(current_time/1_000_000_000):10.9f}ms, max_val: {max_val/1_000_000_000:10.3}") + total_time += current_time + iteration_count += 1 + + if total_time >= 100_000_000: # 100 milliseconds + avg_time = (total_time / iteration_count) / \ + 1_000_000 # Convert to milliseconds + max_time = max_val / 1_000_000 # Convert to milliseconds + print( + f"Time taken for this iteration: {avg_time:10.9f} ms, max_val: {max_time:10.3f} ms") + total_time, iteration_count = 0, 0 # Reset for the next period