DEV Community

Anze
Anze

Posted on

Making a performant audio capture system⚡🚀💨

Ever wanted to build a performant audio capturing system, but don't know where to start? It definitely happened to me.

Due to the lack of modern C++ implementations of this, I decided to share what I've learned while making it and give you a starting point based on what I've managed to build!

DISCLAIMER: Knowledge of OOP, basic C++ principles, general programming concepts, and gRPC is advised!

The motivation

The motivation is for a way larger project I'm working on. The source code for all of the code is going to be linked below at the end of the post! 😊

The CMake configuration

Every modern C/C++ application requires some type of build configuration; I chose CMake since it's perfect for cross-platform compilation!


# -------------------------------------------------------------------
# Project definition
# -------------------------------------------------------------------
cmake_minimum_required(VERSION 3.16)
project(audio_capture VERSION 1.0 LANGUAGES CXX)

# C++ standard, applied project-wide before any target is created.
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
set(CMAKE_EXPORT_COMPILE_COMMANDS ON)

# Common gRPC helper code; defines _PROTOBUF_PROTOC, _PROTOBUF_LIBPROTOBUF,
# _GRPC_CPP_PLUGIN_EXECUTABLE, _GRPC_GRPCPP and _REFLECTION.
include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/common.cmake)

# FetchContent pulls source dependencies (PortAudio, spdlog) at configure time.
include(FetchContent)

# -------------------------------------------------------------------
# Protocol buffer / gRPC code generation
# -------------------------------------------------------------------
# CONFIGURE_DEPENDS re-runs the glob at build time, so newly added .proto
# files are picked up without a manual re-configure.
file(GLOB_RECURSE PROTO_FILES CONFIGURE_DEPENDS
    "${PROJECT_SOURCE_DIR}/protos/*.proto")

# Lists of generated sources/headers, filled by the loop below.
set(PROTO_SRCS)
set(PROTO_HDRS)
set(GRPC_SRCS)
set(GRPC_HDRS)

# Generate protocol buffer and gRPC code for each .proto file.
foreach(proto_file IN LISTS PROTO_FILES)
    get_filename_component(proto_path "${proto_file}" PATH)
    get_filename_component(proto_name "${proto_file}" NAME_WE)

    list(APPEND PROTO_SRCS "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.pb.cc")
    list(APPEND PROTO_HDRS "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.pb.h")
    list(APPEND GRPC_SRCS "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.grpc.pb.cc")
    list(APPEND GRPC_HDRS "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.grpc.pb.h")

    # VERBATIM makes argument escaping platform-independent; the whole
    # --grpc_out argument is quoted so the generator sees it as one token.
    add_custom_command(
        OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.pb.cc"
               "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.pb.h"
               "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.grpc.pb.cc"
               "${CMAKE_CURRENT_BINARY_DIR}/${proto_name}.grpc.pb.h"
        COMMAND ${_PROTOBUF_PROTOC}
        ARGS "--grpc_out=generate_mock_code=false:${CMAKE_CURRENT_BINARY_DIR}"  # Disable mock stubs
             --cpp_out "${CMAKE_CURRENT_BINARY_DIR}"
             -I "${proto_path}"
             --plugin=protoc-gen-grpc="${_GRPC_CPP_PLUGIN_EXECUTABLE}"
             "${proto_file}"
        DEPENDS "${proto_file}"
        VERBATIM)
endforeach()

# Protocol buffer library shared by all consumers of the generated code.
add_library(audio_proto
    ${GRPC_SRCS}
    ${GRPC_HDRS}
    ${PROTO_SRCS}
    ${PROTO_HDRS})

# Generated headers live in the binary dir; PUBLIC so consumers inherit it.
target_include_directories(audio_proto PUBLIC "${CMAKE_CURRENT_BINARY_DIR}")

# PUBLIC: the generated headers #include gRPC/protobuf headers, so these
# usage requirements must propagate to consumers of audio_proto.
target_link_libraries(audio_proto PUBLIC
    absl::check
    ${_REFLECTION}
    ${_GRPC_GRPCPP}
    ${_PROTOBUF_LIBPROTOBUF})

# -------------------------------------------------------------------
# PortAudio (static, no tests/examples)
# -------------------------------------------------------------------
FetchContent_Declare(
    portaudio
    GIT_REPOSITORY https://github.com/PortAudio/portaudio.git
    GIT_TAG v19.7.0
)
set(PA_BUILD_SHARED OFF CACHE BOOL "")
set(PA_BUILD_TESTS OFF CACHE BOOL "")
set(PA_BUILD_EXAMPLES OFF CACHE BOOL "")
FetchContent_MakeAvailable(portaudio)

# -------------------------------------------------------------------
# spdlog
# -------------------------------------------------------------------
FetchContent_Declare(
    spdlog
    GIT_REPOSITORY https://github.com/gabime/spdlog.git
    GIT_TAG v1.15.1
)
FetchContent_MakeAvailable(spdlog)

# -------------------------------------------------------------------
# FFTW3 via pkg-config
# -------------------------------------------------------------------
# IMPORTED_TARGET provides PkgConfig::FFTW3, which carries its include
# dirs, link dirs and libraries — this replaces the manual
# include_directories/link_directories wiring (the old trailing
# link_directories() ran AFTER target creation and had no effect).
find_package(PkgConfig REQUIRED)
pkg_check_modules(FFTW3 REQUIRED IMPORTED_TARGET fftw3)

# -------------------------------------------------------------------
# Main executable
# -------------------------------------------------------------------
# Sources listed explicitly (matches the project tree) so file additions
# show up in diffs and incremental builds stay reliable.
add_executable(audio_capture
    src/audio_capture.cc
    src/logger.cc
    src/main.cc)

target_include_directories(audio_capture PRIVATE
    "${PROJECT_SOURCE_DIR}/includes"
    "${portaudio_SOURCE_DIR}/source"  # kept from original; portaudio_static already exports its public include dir
)

target_link_libraries(audio_capture PRIVATE
    audio_proto
    portaudio_static
    spdlog::spdlog
    PkgConfig::FFTW3
    absl::check
    absl::flags
    absl::flags_parse
    absl::log
    ${_REFLECTION}
    ${_GRPC_GRPCPP}
    ${_PROTOBUF_LIBPROTOBUF}
)

# Platform-specific configuration.
if(WIN32)
    target_link_libraries(audio_capture PRIVATE winmm)
elseif(UNIX AND NOT APPLE)
    # Threads::Threads is the portable spelling of -lpthread; the extra
    # bare "fftw3" link is now covered by PkgConfig::FFTW3 above.
    find_package(Threads REQUIRED)
    target_link_libraries(audio_capture PRIVATE Threads::Threads)
endif()
Enter fullscreen mode Exit fullscreen mode

The project architecture

The whole project architecture is quite simple, maintainable and made to scale.

|-- CMakeLists.txt
|-- Makefile
|-- cmake
|   `-- common.cmake
|-- includes
|   |-- audio_capture.h
|   `-- logger.h
|-- protos
|   `-- device_stream.proto
`-- src
    |-- audio_capture.cc
    |-- logger.cc
    `-- main.cc
Enter fullscreen mode Exit fullscreen mode

The logger

I've used the spdlog for beautiful logging messages that come with it!

Here is the .h definition (all of the source code is on github)


#pragma once

#include <cstddef>
#include <memory>
#include <spdlog/sinks/basic_file_sink.h>
#include <spdlog/spdlog.h>
#include <string>

// Static facade over a process-wide asynchronous spdlog logger.
// Usage: call Logger::init() once at startup, then use the static helpers.
class Logger {
public:
  // One-time setup of the async console + rotating-file logger.
  static void init();
  // Returns a shared Logger handle; all handles log through the same
  // static spdlog logger set up by init().
  static std::shared_ptr<Logger> get();

  // Async queue capacity (number of queued messages).
  static constexpr size_t queue_items_max = 8192;
  // Number of worker threads draining the async queue.
  static constexpr size_t backing_thread_count = 1;
  // Rotate the log file once it reaches 10 MiB.
  static constexpr size_t max_file_size = 10 * 1024 * 1024;
  // Keep at most this many rotated files.
  static constexpr size_t max_files = 5;
  // BUG FIX: `static constexpr std::string` is ill-formed — a std::string
  // cannot be a constant-initialized static data member (its allocation is
  // not a constant expression). `inline static const` gives one definition
  // across all translation units and still works with spdlog's filename
  // parameter.
  inline static const std::string logs_path = "logs/app.log";

  // Log at info level. Precondition: init() has been called
  // (logger_ is dereferenced without a null check).
  static void info(const std::string &message) {
    logger_->info(message);
  }

  // Log at warning level. Same precondition as info().
  static void warn(const std::string &message) {
    logger_->warn(message);
  }

  // Log at error level. Same precondition as info().
  static void error(const std::string &message) {
    logger_->error(message);
  }

private:
  // Shared spdlog logger; null until init() runs.
  static std::shared_ptr<spdlog::logger> logger_;
};
Enter fullscreen mode Exit fullscreen mode

The logger implementation:

#include "logger.h"
#include "spdlog/async.h"
#include "spdlog/common.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "spdlog/sinks/stdout_color_sinks.h"
#include "spdlog/spdlog.h"
#include <iostream>
#include <memory>

// Backing spdlog logger shared by all Logger facades; stays null until
// Logger::init() runs, so the static logging helpers must not be called
// before init().
std::shared_ptr<spdlog::logger> Logger::logger_ = nullptr;

// One-time logger setup: builds an async logger with a colored console sink
// (info and above) and a rotating file sink (debug and above), then installs
// it as spdlog's default logger. On failure the process exits.
void Logger::init() {
  try {
    // Thread pool backing all async loggers: queue_items_max message slots
    // drained by backing_thread_count worker thread(s).
    spdlog::init_thread_pool(queue_items_max, backing_thread_count);

    auto console_sink = std::make_shared<spdlog::sinks::stdout_color_sink_mt>();
    auto file_rotating_sink =
        std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
            logs_path, max_file_size, max_files);

    // Per-sink thresholds: console stays quiet below info; file keeps debug.
    console_sink->set_level(spdlog::level::info);
    file_rotating_sink->set_level(spdlog::level::debug);

    // `block` overflow policy: producers wait when the async queue is full
    // instead of dropping messages.
    logger_ = std::make_shared<spdlog::async_logger>(
        "main_logger",
        spdlog::sinks_init_list{console_sink, file_rotating_sink},
        spdlog::thread_pool(), spdlog::async_overflow_policy::block);

    // Install as the default so bare spdlog::info/warn/... also route here.
    spdlog::set_default_logger(logger_);
    spdlog::set_pattern("[%Y-%m-%d %H:%M:%S] [%^%l%$] [thread %t] %v");

    // Global logger level: info in release builds (NDEBUG), debug otherwise.
#ifdef NDEBUG
    spdlog::set_level(spdlog::level::info);
#else
    spdlog::set_level(spdlog::level::debug);
#endif

    spdlog::info("Logger initialized successfully!");
  } catch (const spdlog::spdlog_ex &ex) {
    // NOTE(review): exiting from a library-style init is heavy-handed —
    // callers cannot recover. Consider rethrowing instead.
    std::cerr << "Log initialization failed: " << ex.what() << std::endl;
    exit(EXIT_FAILURE);
  }
}

std::shared_ptr<Logger> Logger::get() {
  static std::shared_ptr<Logger> instance = std::make_shared<Logger>();
  return instance;
}
Enter fullscreen mode Exit fullscreen mode

The protobuf GRPC messages & RPCs

The current implementation of this is very simple and straightforward. Both the server and the client share the same gRPC file and structure to communicate with streams.

syntax = "proto3";

package audio_stream;

// Client-streaming service: the client pushes a stream of AudioChunk
// messages; the server replies once with a StreamResponse after the
// client finishes writing.
service AudioStream {
    rpc StreamAudio(stream AudioChunk) returns (StreamResponse);
}

// One captured audio buffer plus per-chunk features.
message AudioChunk {
    bytes audio_data = 1;               // raw sample bytes (the client sends float samples reinterpreted as bytes)
    repeated double spectral_data = 2;  // spectral feature values for this chunk
    double energy = 3;                  // chunk energy
    double zero_crossings = 4;          // zero-crossing measure
    double speech_band_energy = 5;      // energy within the speech band
    bool voice_detected = 6;            // voice-activity flag
    int64 timestamp = 7;                // chunk timestamp
}

// Server's single reply, sent when the audio stream ends.
message StreamResponse {
    bool success = 1;    // whether the stream was processed successfully
    string response = 2; // human-readable status message
}
Enter fullscreen mode Exit fullscreen mode

The audio capture system

The audio capture system is implemented using portaudio.h which is a library used for cross platform audio development!

Here is my implementation of the .h file (header):

#pragma once

#include "portaudio.h"
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_map>
#include <vector>
#include "device_stream.grpc.pb.h"  // gRPC generated header

// Forward declaration of Logger
class Logger;

// AudioCapture class for handling audio streaming
// AudioCapture: captures audio through PortAudio and hands float chunks to
// consumers via a mutex-protected queue; chunks can then be streamed to a
// gRPC server with sendAudioStream().
class AudioCapture {
public:
  AudioCapture();   // Initializes the PortAudio library
  ~AudioCapture();  // Terminates the PortAudio library

  static constexpr size_t sample_rate = 44100;           // Audio sample rate (Hz) used by start()
  static constexpr size_t frames_per_buffer = 512;        // Frames per buffer
  static constexpr int numInputChannels = 1;              // Number of input channels (mono)
  static constexpr int numOutputChannels = 0;             // Number of output channels (no output)

  // Opens (if needed) and starts the default input stream.
  // Returns false if already running or if the stream fails to start.
  bool start();
  // Stops the audio stream.
  void stop();
  // Checks if the stream is running.
  bool isRunning() const;

  // Retrieves available audio devices (index -> copied PaDeviceInfo).
  std::unordered_map<size_t, std::shared_ptr<const PaDeviceInfo>> getDevices();

  // Opens (and starts) an audio stream on the specified device index,
  // closing any previously opened stream first.
  void openAudioStream(int deviceIndex);

  // Gets the total number of audio devices.
  int getDeviceCount() const;

  // Creates stream parameters for a given device and number of channels.
  std::unique_ptr<PaStreamParameters> createStreamParameters(int device, int numChannels);

  // Sends the audio stream to the gRPC server; loops until isRunning()
  // becomes false or a write fails.
  void sendAudioStream(std::shared_ptr<audio_stream::AudioStream::Stub> stub);

  // Closes the audio stream.
  void closeAudioStream();

  // Retrieves the next audio chunk from the queue; BLOCKS until a chunk
  // arrives or capture stops (then returns an empty vector).
  std::vector<float> getNextAudioChunk();

private:
  // Audio callback function for PortAudio.
  // NOTE(review): PortAudio's PaStreamCallback declares the frame-count
  // parameter as `unsigned long`; `size_t` matches on LP64 platforms but
  // confirm the signature on Windows/ILP32 targets.
  static int audioCallback(const void *input, void *, size_t, 
                           const PaStreamCallbackTimeInfo *, 
                           PaStreamCallbackFlags, void *userData);

  // Queue of captured chunks, filled by the callback, drained by
  // getNextAudioChunk().
  std::queue<std::vector<float>> audioQueue;
  // Mutex and condition variable guarding/signalling audioQueue.
  std::mutex queueMutex;
  std::condition_variable queueCondition;
  // Atomic flag to manage the running state of the capture.
  std::atomic<bool> running_;

  // Stream parameters for input and output (set by openAudioStream).
  std::unique_ptr<PaStreamParameters> inputParameters;
  std::unique_ptr<PaStreamParameters> outputParameters;

  // Logger for logging events.
  std::shared_ptr<Logger> logger_;
  // Pointer to the PortAudio stream (null when no stream is open).
  PaStream *stream_;
};
Enter fullscreen mode Exit fullscreen mode

The implementation of audio capture:

// AudioCapture.cpp
#include "audio_capture.h"
#include "device_stream.pb.h"
#include "logger.h"
#include "portaudio.h"
#include <algorithm>
#include <chrono>
#include <cstddef>
#include <grpcpp/client_context.h>
#include <grpcpp/support/sync_stream.h>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <utility>
#include <vector>

static void handleAudioError(PaError err, std::shared_ptr<Logger> logger) {
  if (err != paNoError) {
    logger->error("PortAudio error: " + std::string(Pa_GetErrorText(err)));
  }
}

// Initializes the PortAudio library for the lifetime of this object.
// Member initializers are listed in declaration order (running_, logger_,
// stream_) — members are constructed in declaration order regardless, and
// the original out-of-order list triggers -Wreorder.
AudioCapture::AudioCapture()
    : running_(false), logger_(std::make_shared<Logger>()), stream_(nullptr) {
  PaError err = Pa_Initialize();
  handleAudioError(err, logger_);
}

// Shuts down the PortAudio library.
// NOTE(review): any stream still open at this point is left to
// Pa_Terminate(); confirm against the PortAudio docs whether open streams
// are cleaned up, or close stream_ explicitly before terminating.
AudioCapture::~AudioCapture() {
  PaError err = Pa_Terminate();
  handleAudioError(err, logger_);
}

bool AudioCapture::start() {
  if (running_)
    return false;

  handleAudioError(Pa_OpenDefaultStream(
                       &stream_, numInputChannels, numOutputChannels, paFloat32,
                       sample_rate, frames_per_buffer, audioCallback, this),
                   logger_);

  PaError err = Pa_StartStream(stream_);
  if (err != paNoError) {
    logger_->error("Failed to start audio stream: " +
                   std::string(Pa_GetErrorText(err)));
    return false;
  }

  running_ = true;

  return true;
}

void AudioCapture::stop() {
  if (!running_) {
    return;
  }

  handleAudioError(Pa_StopStream(stream_), logger_);
  Pa_CloseStream(stream_);
  running_ = false;
}

// Returns whether capture is currently active (atomic read, thread-safe).
bool AudioCapture::isRunning() const { return running_; }

// Returns the number of devices PortAudio can see; negative PaError codes
// from Pa_GetDeviceCount() are passed through unchanged as negative ints.
int AudioCapture::getDeviceCount() const {
  PaDeviceIndex deviceCount = Pa_GetDeviceCount();
  return static_cast<int>(deviceCount);
}

// Blocks until a chunk is available or capture stops. Returns the oldest
// queued chunk, or an empty vector once capture has stopped and the queue
// is drained.
std::vector<float> AudioCapture::getNextAudioChunk() {
  std::unique_lock<std::mutex> guard(queueMutex);
  queueCondition.wait(guard,
                      [this] { return !audioQueue.empty() || !running_; });

  if (audioQueue.empty()) {
    return std::vector<float>();
  }

  std::vector<float> next = std::move(audioQueue.front());
  audioQueue.pop();
  return next;
}

// Stops (if active) and closes the current stream; safe to call when no
// stream is open.
void AudioCapture::closeAudioStream() {
  // BUG FIX: guard against a null handle — the original called
  // Pa_IsStreamActive/Pa_CloseStream on stream_ unconditionally, which is
  // invalid when no stream has been opened yet (or it was already closed).
  if (!stream_) {
    return;
  }

  if (Pa_IsStreamActive(stream_) == 1) {
    Pa_StopStream(stream_);
  }

  Pa_CloseStream(stream_);

  stream_ = nullptr;
}

// Opens (and starts) a stream on the given device, closing any previously
// opened stream first. Logs and returns on invalid indices or query failure.
void AudioCapture::openAudioStream(int deviceIndex) {
  // Tear down any previously opened stream before opening a new one.
  if (stream_) {
    if (Pa_IsStreamActive(stream_) == 1) {
      handleAudioError(Pa_StopStream(stream_), logger_);
    }

    Pa_CloseStream(stream_);
    stream_ = nullptr;
  }

  // Reset parameters cached from a previous open.
  inputParameters.reset();
  outputParameters.reset();

  // BUG FIX: bail out on an invalid index. The original only logged and
  // fell through, then dereferenced whatever Pa_GetDeviceInfo returned.
  if (deviceIndex < 0 || deviceIndex >= getDeviceCount()) {
    logger_->error("Invalid device index");
    return;
  }

  const PaDeviceInfo *deviceInfo = Pa_GetDeviceInfo(deviceIndex);

  // BUG FIX: the original tested `!deviceIndex` (true only for index 0)
  // instead of the pointer returned by Pa_GetDeviceInfo.
  if (!deviceInfo) {
    logger_->error("Failed to get device info");
    return;
  }

  logger_->info("Device Info: ");
  logger_->info("Name: " + std::string(deviceInfo->name));
  logger_->info("Max Input Channels: " +
                std::to_string(deviceInfo->maxInputChannels));
  logger_->info("Max Output Channels: " +
                std::to_string(deviceInfo->maxOutputChannels));
  logger_->info("Default Sample Rate: " +
                std::to_string(deviceInfo->defaultSampleRate));

  std::optional<PaStreamParameters> inputParams;
  std::optional<PaStreamParameters> outputParams;

  if (deviceInfo->maxInputChannels > 0) {
    inputParameters =
        createStreamParameters(deviceIndex, deviceInfo->maxInputChannels);
    inputParams = *inputParameters;
  }

  if (deviceInfo->maxOutputChannels > 0) {
    outputParameters =
        createStreamParameters(deviceIndex, deviceInfo->maxOutputChannels);
    outputParams = *outputParameters;
  }

  // Use the device's native sample rate rather than the class-wide default.
  double deviceSampleRate = deviceInfo->defaultSampleRate;

  handleAudioError(
      Pa_OpenStream(&stream_, inputParams ? &*inputParams : nullptr,
                    outputParams ? &*outputParams : nullptr, deviceSampleRate,
                    frames_per_buffer, paClipOff, audioCallback, this),
      logger_);

  // (Message fix: the original was missing the space before the name.)
  std::string deviceName = std::string(deviceInfo->name);
  logger_->info("Stream successfully opened for device " + deviceName);

  handleAudioError(Pa_StartStream(stream_), logger_);
}

std::unique_ptr<PaStreamParameters>
AudioCapture::createStreamParameters(int device, int numChannels) {
  auto parameters = std::make_unique<PaStreamParameters>();
  const PaDeviceInfo *deviceInfo = Pa_GetDeviceInfo(device);

  if (!deviceInfo) {
    logger_->error("Failed to get device information for device: " +
                   std::to_string(device));
    return nullptr;
  }

  int channels = std::min(numChannels, deviceInfo->maxInputChannels);

  parameters->device = device;
  parameters->channelCount = channels;
  parameters->sampleFormat = paFloat32;
  parameters->suggestedLatency = deviceInfo->defaultHighInputLatency;
  parameters->hostApiSpecificStreamInfo = nullptr;

  return parameters;
}

// Enumerates all PortAudio devices, returning a map from device index to a
// copied PaDeviceInfo. Devices that fail to query are skipped with a warning;
// an empty map is returned when the device count itself cannot be read.
std::unordered_map<size_t, std::shared_ptr<const PaDeviceInfo>>
AudioCapture::getDevices() {
  std::unordered_map<size_t, std::shared_ptr<const PaDeviceInfo>> result;

  const PaDeviceIndex count = Pa_GetDeviceCount();
  if (count < 0) {
    logger_->error("Failed to retrieve PortAudio device count: " +
                   std::to_string(count));
    return result;
  }

  for (PaDeviceIndex idx = 0; idx < count; ++idx) {
    if (const PaDeviceInfo *info = Pa_GetDeviceInfo(idx)) {
      result.emplace(static_cast<size_t>(idx),
                     std::make_shared<const PaDeviceInfo>(*info));
    } else {
      logger_->warn("Warning: Failed to get device info for device " +
                    std::to_string(idx));
    }
  }

  return result;
}

// PortAudio callback: copies the delivered samples into the shared queue.
// Runs on PortAudio's audio thread.
// NOTE(review): locking a mutex and allocating a vector inside a real-time
// audio callback can cause dropouts; a lock-free ring buffer is the usual
// fix. Left as-is to keep the design unchanged.
int AudioCapture::audioCallback(const void *input, void *, size_t frameCount,
                                const PaStreamCallbackTimeInfo *,
                                PaStreamCallbackFlags, void *userData) {
  if (!input) {
    return paContinue;
  }

  auto *self = static_cast<AudioCapture *>(userData);
  // BUG FIX: copy the number of frames PortAudio actually delivered.
  // The original hard-coded frames_per_buffer, which over-/under-reads
  // whenever the host supplies a different buffer size (possible with
  // streams opened via Pa_OpenStream).
  // NOTE(review): for multi-channel streams the buffer holds
  // frameCount * channelCount interleaved samples — confirm the channel
  // count for streams opened by openAudioStream().
  const float *samples = static_cast<const float *>(input);
  std::vector<float> buffer(samples, samples + frameCount);

  {
    std::lock_guard<std::mutex> lock(self->queueMutex);
    // Move instead of copy: the original pushed a copy of `buffer`.
    self->audioQueue.push(std::move(buffer));
  }

  self->queueCondition.notify_one();
  return paContinue;
}

// Streams captured audio chunks to the gRPC server until capture stops or a
// write fails, then half-closes the stream and reads the server's response.
// NOTE(review): std::this_thread::sleep_for / std::chrono require <thread>
// and <chrono>, which are not in this file's include list — confirm they
// are pulled in transitively or add them.
void AudioCapture::sendAudioStream(
    std::shared_ptr<audio_stream::AudioStream::Stub> stub) {
  grpc::ClientContext context;
  audio_stream::StreamResponse response; // Filled by the server on Finish()

  // Client-streaming call: we write AudioChunk messages; the server's
  // single StreamResponse lands in `response` after Finish().
  std::unique_ptr<grpc::ClientWriter<audio_stream::AudioChunk>> writer(
      stub->StreamAudio(&context, &response));

  if (!writer) {
    logger_->error("Failed to create gRPC writer.");
    return;
  }

  // Sending audio chunks in a stream; getNextAudioChunk() blocks until a
  // chunk arrives or capture stops (empty vector).
  while (isRunning()) {
    std::vector<float> chunk = getNextAudioChunk();
    if (!chunk.empty()) {
      audio_stream::AudioChunk audioChunk;

      // Reinterpret the float samples as raw bytes for the wire.
      std::string audioData(reinterpret_cast<const char *>(chunk.data()),
                            chunk.size() * sizeof(float));
      audioChunk.set_audio_data(audioData);

      // Feature fields are placeholders for now — real spectral analysis
      // (FFTW) is not wired in yet.
      audioChunk.add_spectral_data(0.0);  // Example spectral data
      audioChunk.set_energy(0.0);         // Example energy
      audioChunk.set_zero_crossings(0.0); // Example zero crossings
      audioChunk.set_speech_band_energy(0.0); // Example speech band energy
      audioChunk.set_voice_detected(false); // Example voice detection
      audioChunk.set_timestamp(0);          // Example timestamp

      if (!writer->Write(audioChunk)) {
        // Write() returning false means the stream is broken; stop sending.
        logger_->error("Failed to send audio chunk.");
        break;
      }
    }

    // Optional: sleep to avoid overloading the server.
    std::this_thread::sleep_for(std::chrono::milliseconds(10));  // Adjust as necessary
  }

  // Half-close: tell the server no more chunks are coming.
  writer->WritesDone();

  // Finish() blocks for the server's status (and fills `response`).
  grpc::Status status = writer->Finish();

  if (!status.ok()) {
    logger_->error("gRPC failed: " + status.error_message());
  } else {
    // Optionally handle the StreamResponse from the server.
    logger_->info("Audio stream successfully sent and received response.");
    // If you want to log response details, you can use `response` here.
  }
}
Enter fullscreen mode Exit fullscreen mode

The main

The main executable then connects to a gRPC server (which is written in Golang); the server itself is out of the scope of this blog post, so I will include its source code in the GitHub repository.

#include "audio_capture.h"
#include "device_stream.grpc.pb.h"
#include "logger.h"
#include <cstdlib>
#include <grpcpp/grpcpp.h>

// Entry point: sets up logging, enumerates audio devices, opens a capture
// stream, and streams the captured audio to a local gRPC server.
int main() {
  Logger::init();

  auto logger = Logger::get();
  AudioCapture audioCapture;  // Single instance owns PortAudio init/terminate

  auto devices = audioCapture.getDevices();

  if (devices.empty()) {
    logger->error("No audio devices found.");
    return EXIT_FAILURE;
  }

  logger->info("Available Audio Devices");

  // NOTE(review): this opens a stream on EVERY device; each call closes the
  // previous stream, so only the last-iterated device's stream survives the
  // loop (unordered_map order is unspecified). Confirm whether selecting a
  // single device was intended. Also note `index` is size_t and narrows to
  // int in the openAudioStream call.
  for (const auto &[index, deviceInfo] : devices) {
    std::string deviceName =
        "Device " + std::to_string(index) + ": " + deviceInfo->name + "\n";

    logger->info(deviceName);
    logger->warn("Opening audio stream on device " + std::to_string(index));
    audioCapture.openAudioStream(index);
  }

  // Create a gRPC channel to the server (plaintext, local development).
  std::shared_ptr<grpc::Channel> channel = grpc::CreateChannel(
      "localhost:50051", grpc::InsecureChannelCredentials());

  // Create the gRPC client stub
  std::shared_ptr<audio_stream::AudioStream::Stub> stub =
      audio_stream::AudioStream::NewStub(channel);

  // Start capturing audio
  audioCapture.start();

  // Send audio stream to the external gRPC server.
  // NOTE(review): stop() is never called, so this loops until a gRPC write
  // fails; there is no clean shutdown path (e.g. a signal handler).
  audioCapture.sendAudioStream(stub);

  return 0;
}
Enter fullscreen mode Exit fullscreen mode

Running and building the code

For running and building the code I use a simple Makefile with a few commands:

# Build configuration: Ninja generator, optimized Release build.
CMAKE_FLAGS=-G Ninja -DCMAKE_BUILD_TYPE=Release
BUILD_DIR=build
# Parallel build jobs = CPU core count (GNU/Linux `nproc`).
NUM_CORES=$(shell nproc)

.PHONY: build run run_detailed rerun remove rebuild

# Configure + build out-of-source in $(BUILD_DIR).
# NOTE(review): Makefile recipe lines must start with a TAB character;
# verify the indentation below survived copy/paste as tabs, not spaces.
build:
    @mkdir -p $(BUILD_DIR)
    @cd $(BUILD_DIR) && cmake $(CMAKE_FLAGS) .. && cmake --build . -j$(NUM_CORES)

# Run the binary, discarding stderr (hides PortAudio/ALSA chatter).
run:
    @cd $(BUILD_DIR) && ./audio_capture 2>/dev/null

# Run with stderr visible, for debugging.
run_detailed:
    @cd $(BUILD_DIR) && ./audio_capture

# Clean rebuild followed by a run.
rerun: rebuild run

# Delete the build directory entirely.
remove:
    @rm -rf $(BUILD_DIR)

# Incremental clean rebuild; on failure, wipe and reconfigure from scratch.
rebuild:
    @cd $(BUILD_DIR) && cmake --build . --clean-first -j$(NUM_CORES) || { rm -rf $(BUILD_DIR) && make build; }
Enter fullscreen mode Exit fullscreen mode

Running the code should be as easy as make $COMMAND

Thank you for reading this far ⭐

I want to thank you, the reader, for reading this far! If you've made it this far, here is the link to the repository of all of the source code 😊!

https://github.com/LegationPro/audio_capture_system

Top comments (1)

Collapse
 
_itzretro_707570a12ab2ff profile image
- itzRetro

Sexy man