DEV Community

amaendeepm

Building a Robust Peek-Dequeue Engine in Rust for a General Message Portal

In modern distributed systems, reliable message handling is crucial. Rust, with its emphasis on safety and concurrency, is an excellent choice for building robust systems. This post walks through an example Peek-Dequeue engine implemented in Rust that interacts with a general message portal, explaining each component along the way.

Introduction
Message portals are essential components in distributed architectures, acting as intermediaries for message exchange between different services. Building a reliable engine to interact with such portals requires careful handling of network communication, error management, and asynchronous operations.

The code below presents a Rust implementation of a Peek-Dequeue engine that applies good practices in asynchronous programming, error handling and, most importantly, system resilience.

Overview of the Peek-Dequeue Engine
The engine's primary responsibilities are:

  1. Peeking Messages: Regularly checking the message portal for new messages.
  2. Handling Responses: Parsing and validating the responses, including error handling.
  3. Processing Messages: Transmuting and persisting valid messages.
  4. Dequeueing Messages: Removing processed messages from the portal.

Here's the high-level structure of the MessagePortalConnection:

pub struct MessagePortalConnection {
    interval: u64,
}

impl Default for MessagePortalConnection {
    fn default() -> Self {
        Self { interval: 60 } // Seconds between peeks
    }
}

impl MessagePortalConnection {
    pub async fn run(&self) {
        // Implementation details...
    }
}

Setting Up the Connection

The MessagePortalConnection struct holds the configuration for the connection, such as the peeking interval.

pub struct MessagePortalConnection {
    interval: u64,
}

impl Default for MessagePortalConnection {
    fn default() -> Self {
        Self { interval: 60 } // Default peeking every minute
    }
}

Using the Default trait allows for easy instantiation with default settings while providing flexibility for customization.
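
Because the interval field is private, customization needs some entry point. One way it could look is a builder-style setter, sketched below. The with_interval and interval_secs methods are assumptions added for illustration and are not part of the original code; the clamp to one second is there because tokio::time::interval panics on a zero period.

```rust
/// Mirrors the struct from the article; `interval` is in seconds.
#[derive(Debug, Clone, PartialEq)]
pub struct MessagePortalConnection {
    interval: u64,
}

impl Default for MessagePortalConnection {
    fn default() -> Self {
        Self { interval: 60 } // Default peeking every minute
    }
}

impl MessagePortalConnection {
    /// Hypothetical setter (not in the original): overrides the peek interval.
    pub fn with_interval(mut self, secs: u64) -> Self {
        // Clamp to at least 1 second so a zero period never reaches the timer.
        self.interval = secs.max(1);
        self
    }

    /// Hypothetical accessor (not in the original): interval in seconds.
    pub fn interval_secs(&self) -> u64 {
        self.interval
    }
}
```

A caller could then write `MessagePortalConnection::default().with_interval(5)` to peek every five seconds while keeping every other default.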

The Main Run Loop
The run method contains the main loop that continuously peeks for new messages.

use std::{env, sync::Arc};

impl MessagePortalConnection {
    pub async fn run(&self) {
        // Portal URL from the environment, with a placeholder fallback
        let portal_endpoint: Arc<str> = env::var("PORTAL_URL")
            .unwrap_or_else(|_| "https://portal.example.com/".to_owned())
            .into();

        // Ticks every `self.interval` seconds; the first tick completes immediately
        let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(self.interval));

        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // Peeking logic...
                }
            }
        }
    }
}

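Interval Setup: Uses tokio::time::interval for periodic execution without blocking the runtime. The first tick completes immediately, so the first peek happens at startup. The interval must be non-zero, because tokio::time::interval panics on a zero period.

Asynchronous Loop: The tokio::select! macro lets the loop wait on several asynchronous operations at once. Only the interval tick is awaited here, but further branches (a shutdown signal, for example) can be added later.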

Peeking Messages
Within the loop, the engine attempts to peek messages from the portal.

let peek_trace_id = Ulid::from_datetime(SystemTime::now()).to_string(); // Unique ID for tracing

tracing::debug!(event="peek-portal", trace=peek_trace_id);

let peek_result = retry_with_backoff(
    || async {
        peek_from_portal(portal_endpoint.clone().into()).await.map(|arc_str| arc_str.to_string())
    },
    5,  // Max retries
    2,  // Initial delay (seconds)
    1.5 // Backoff factor
).await;

Unique Trace ID: Generates a ULID based on the current time for traceability.

Retry Logic: Wraps the call in a custom retry_with_backoff function to absorb transient failures, here with up to 5 retries, a 2-second initial delay, and a backoff factor of 1.5.

Handling Responses
After attempting to peek, the engine processes the response.

// Unwrap the peek result with `match` instead of `is_err()` + `unwrap()`
let peek_response = match peek_result {
    Ok(response) => response,
    Err(err) => {
        tracing::error!(event="portal_peek_result_error", trace=peek_trace_id, err=?err);
        continue;
    }
};

let doc_result = Document::parse(&peek_response);

if let Err(err) = &doc_result {
    tracing::error!(event="peeked-invalid-response", trace=peek_trace_id, err=?err, msg="peek-response-error");
    continue;
}

Error Handling: Checks for errors at each step and logs them appropriately.

XML Parsing: Uses roxmltree::Document::parse to parse the XML response.

Processing Valid Messages
Once a valid XML response is confirmed, the engine processes the message.

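// Despite its name, `root_node` holds the whole parsed `Document`
let root_node = doc_result.unwrap();
// `and_then` flattens the nested Option that `map` would produce
let fault_string = root_node
    .descendants()
    .find(|node| node.has_tag_name("faultstring"))
    .and_then(|node| node.text());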

if fault_string.is_some() {
    tracing::error!(event="peeked-fault-string", msg="peek-SOAP-fault", trace=peek_trace_id, response=?peek_response);
    continue;
}

// Determine the document type
let doc_type_opt = find_document_type_from_peek_response(peek_response.clone());

if doc_type_opt.is_none() {
    tracing::warn!(event="peek", msg="nothing-to-peek", trace=peek_trace_id);
    continue;
}

let doc_type = doc_type_opt.unwrap();
tracing::info!(event="peek", msg="returned document", doc_type=?doc_type, trace=peek_trace_id);

SOAP Fault Handling: Looks for a faultstring element in the response; if one is present, the fault is logged and the message is skipped for this cycle.
Document Type Identification: Determines the type of the received document for appropriate processing.
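
To make the fault check concrete without pulling in an XML crate, here is a minimal, dependency-free sketch of the same idea. It is a deliberate simplification: extract_fault_string is a hypothetical helper that uses plain substring search, so it ignores namespaces, attributes, and CDATA, whereas the article's code walks the parsed roxmltree document. The sample envelopes in the usage note are invented for illustration.

```rust
/// Hypothetical helper (not in the original): returns the text inside the
/// first `<faultstring>` element, if any.
/// Simplification: plain substring search instead of real XML parsing.
fn extract_fault_string(xml: &str) -> Option<&str> {
    const OPEN: &str = "<faultstring>";
    const CLOSE: &str = "</faultstring>";

    // Position right after the opening tag, or None if the tag is absent.
    let start = xml.find(OPEN)? + OPEN.len();
    // Position of the closing tag, searched only after the opening tag.
    let end = start + xml[start..].find(CLOSE)?;

    let text = &xml[start..end];
    // An empty element carries no fault text, so report it as "no fault".
    if text.is_empty() {
        None
    } else {
        Some(text)
    }
}
```

For a response such as `<Fault><faultstring>Queue unavailable</faultstring></Fault>` the helper yields the fault text, and for a response without a faultstring element it yields None, which corresponds to the branch where the engine carries on with document type detection.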

Dequeueing Messages
After processing, the engine attempts to dequeue the message from the portal.

let msg_ref = extract_message_reference(peek_response.clone().into());

if msg_ref.is_none() {
    tracing::error!(event="peek", msg="xml has no message reference included", trace=peek_trace_id);
    continue;
}

let msg_ref_id = msg_ref.unwrap();
tracing::info!(event="peek", msg="extracted message reference", reference_id=?msg_ref_id, trace=peek_trace_id);

let incoming_raw_xml = peek_response.clone();

// Extract the payload once and stop early if there is none, instead of
// unwrapping it first and checking `is_some()` afterwards (which could panic).
// Assumption: a message without a payload is left on the portal (no dequeue).
let Some(payload) = extract_payload_json(incoming_raw_xml.clone().into()) else {
    tracing::error!(event="peek", msg="no payload extracted from response; not dequeue or proceed", trace=peek_trace_id);
    continue;
};

let handling_status = handle_incoming(
    peek_trace_id.clone().into(),
    payload.clone(),
    incoming_raw_xml.clone().into(),
).await;

if !handling_status {
    tracing::error!(event="peek", msg="response not handled internally; not dequeue or proceed", trace=peek_trace_id);
    continue;
}

// NOTE: this `unwrap()` still panics if the response cannot be transmuted
let transmuted_message = transmute_response(incoming_raw_xml.clone().into()).unwrap();
let persist_status = persist_identifiable_message(peek_trace_id.clone().into(), payload, transmuted_message.clone()).await;
if let Err(error) = persist_status {
    tracing::error!(event="Persist_Transmuted_Message", msg="Could not persist Transmuted JSON Message", error=?error, trace=peek_trace_id);
} else {
    tracing::debug!(event="Persist_Transmuted_Message", msg="Successfully Persisted Transmuted JSON Message", trace=peek_trace_id);
}

let msg_ref_str = msg_ref_id.to_string();
tracing::info!(event="dequeue", msg="trying", msg_id=msg_ref_str, trace=peek_trace_id);

// `retry_with_backoff` already retries internally, so no outer loop is needed
let dequeue_result = retry_with_backoff(
    || async {
        request_message_dequeue_portal(msg_ref_str.clone().into())
            .await
            .map(|arc_str| arc_str.to_string())
    },
    10, // Max retries
    2,  // Initial delay (seconds)
    1.5 // Backoff factor
).await;

if dequeue_result.is_err() {
    // All retries are used up; the message stays on the portal
    tracing::error!(event="portal_dequeue", msg="failed; retries exhausted", trace=peek_trace_id);
    notify_error(format!("Dequeue Attempts exceeded retries for Message {msg_ref_id} :: Peek is Stuck, PeekTraceID {peek_trace_id}").into());
} else {
    tracing::info!(event="dequeue", msg="ok", msg_id=msg_ref_str, trace=peek_trace_id);
}

Message Reference Extraction: Retrieves a unique identifier for the message.

Handling Status Check: Ensures the message was handled correctly before proceeding.

Persistence: Attempts to persist the processed message, logging any errors.

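Dequeue Retries: Retries the dequeue through retry_with_backoff (up to 10 retries). If every attempt fails, the message stays on the portal and would be peeked again, so an error notification is sent to flag the stuck peek.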
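
The branches in the snippets above boil down to one rule: a message is dequeued only when every earlier stage succeeded and handle_incoming returned true. A minimal sketch of that decision follows, covering the main outcomes. The PeekOutcome enum and the should_dequeue function are hypothetical names introduced here for illustration and do not appear in the original code.

```rust
/// Hypothetical summary of how one peek cycle can end (illustrative names).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PeekOutcome {
    /// The peek call failed even after its retries.
    TransportError,
    /// The response could not be parsed as XML.
    InvalidXml,
    /// The response contained a `faultstring` element.
    SoapFault,
    /// No document type was found: nothing to peek.
    NothingToPeek,
    /// The XML carried no message reference.
    MissingReference,
    /// `handle_incoming` returned false.
    NotHandled,
    /// The message was handled; persistence errors are only logged.
    Handled,
}

/// Only a handled message is removed from the portal. Every other outcome
/// corresponds to a `continue` in the loop, which leaves the message in place.
fn should_dequeue(outcome: PeekOutcome) -> bool {
    matches!(outcome, PeekOutcome::Handled)
}
```

Note that a persistence failure still counts as Handled here, because the original code logs that error and proceeds to the dequeue step anyway.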

Retry Logic with Exponential Backoff

The retry_with_backoff function implements retry logic with exponential backoff, which is crucial for handling transient errors without overwhelming the system or the portal.

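use std::time::Duration;

/// Retries `operation` until it succeeds or `max_retries` retries are used up,
/// multiplying the delay by `backoff_factor` after each failed attempt.
async fn retry_with_backoff<F, Fut, T, E>(
    operation: F,
    max_retries: u32,
    delay_secs: u64,
    backoff_factor: f64,
) -> Result<T, E>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut retries = 0;
    let mut delay = Duration::from_secs(delay_secs);

    loop {
        let result = operation().await;
        // Return on success, or hand back the last error once retries run out
        if result.is_ok() || retries >= max_retries {
            return result;
        }
        tokio::time::sleep(delay).await;
        retries += 1;
        delay = Duration::from_secs_f64(delay.as_secs_f64() * backoff_factor);
    }
}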

Generic Function: Works with any asynchronous operation that returns a Result.

Exponential Backoff: Increases the delay between retries exponentially, reducing load during failures.
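
To see what the backoff parameters mean in practice, the delay schedule can be computed up front. The sketch below reuses the same update rule as retry_with_backoff; backoff_delays and total_wait are hypothetical helper names added for illustration, not part of the original code.

```rust
use std::time::Duration;

/// Hypothetical helper: lists the delay slept before each retry, using the
/// same rule as `retry_with_backoff` (delay *= backoff_factor per retry).
fn backoff_delays(max_retries: u32, delay_secs: u64, backoff_factor: f64) -> Vec<Duration> {
    let mut delay = Duration::from_secs(delay_secs);
    let mut delays = Vec::with_capacity(max_retries as usize);
    for _ in 0..max_retries {
        delays.push(delay);
        delay = Duration::from_secs_f64(delay.as_secs_f64() * backoff_factor);
    }
    delays
}

/// Hypothetical helper: total time spent sleeping if every attempt fails.
fn total_wait(delays: &[Duration]) -> Duration {
    delays.iter().sum()
}
```

With the peek settings used earlier (5 retries, 2 seconds, factor 1.5), the delays are 2, 3, 4.5, 6.75 and 10.125 seconds, so a peek that keeps failing spends about 26 seconds sleeping before the error is returned. The time taken by the attempts themselves comes on top of that.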

Summary

This Peek-Dequeue engine shows how Rust's strengths can be used to build a reliable, high-performance system.
The code rests on the following principles:

Asynchronous Efficiency: Leveraging Tokio for non-blocking operations.

Robust Error Handling: Comprehensive checks and logging at each stage.

Resilience: Retry mechanisms with exponential backoff to handle transient failures.

Maintainability: Modular design with clear separation of concerns.
