maestro 0.7.0 introduces conditional pipelines | R bloggers


This article was first published on data-in-flight, and kindly contributed to R-bloggers.



The 0.7.0 release of maestro is out, and with it the ability to run pipelines conditionally. This is especially useful if you have DAG pipelines and want to branch, or to run parts of a DAG only when an upstream output meets a certain condition.

If you haven’t heard of maestro, it’s a script orchestrator for R. You can learn more about it here.

Get it from CRAN:

install.packages("maestro")

Conditional pipelines with @maestroRunIf

A pipeline can be made to run conditionally using the maestroRunIf tag. The value of the tag must be an R expression that returns a single TRUE or FALSE. What makes this tag special is that it can be combined with DAG pipelines.

Let’s say we have a two-step DAG pipeline for receiving weather bulletins and sending a notification in case of an active bulletin. The first step of the DAG is to extract the bulletins from an open data source. The second step is to send the notification. However, we want the second step to only run when there is an active bulletin.

The code below shows this conditional logic. The pipeline send_notification has a maestroRunIf tag that evaluates grepl("Alert", .input$bulletin_title) and runs only when the result is TRUE. Note the use of .input to access the value returned by the input pipeline.

# ./pipelines/bulletins.R

#' @maestroFrequency 15 minutes
#' @maestroStartTime 00:00:00
get_bulletins <- function() {

  req <- httr2::request("https://weather.gc.ca/rss/battleboard/ns10_e.xml")
  
  resp <- req |>
    httr2::req_perform() |>
    httr2::resp_body_xml() |>
    xml2::as_list()

  bulletin_title <- resp$feed$title[[1]]
  
  return(
    list(
      bulletin_title = bulletin_title
    )
  )
}

#' @maestroInputs get_bulletins
#' @maestroRunIf grepl("Alert", .input$bulletin_title)
send_notification <- function(.input) {

  # Pretend this code sends to email or whatever
  message(.input$bulletin_title)
}
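
Before scheduling anything, it can help to try the condition on its own in a plain R session. The sketch below is not part of maestro: the bulletin titles are made up, and should_notify and should_notify_safe are hypothetical helper names that simply wrap the same expression used in the maestroRunIf tag above.

```r
# Mock inputs shaped like the list returned by get_bulletins().
# The titles are invented for illustration.
alert_input <- list(bulletin_title = "Alert: wind warning in effect")
quiet_input <- list(bulletin_title = "No watches or warnings in effect")

# Hypothetical helper wrapping the expression from the maestroRunIf tag.
should_notify <- function(.input) {
  grepl("Alert", .input$bulletin_title)
}

should_notify(alert_input)  # TRUE: title contains "Alert"
should_notify(quiet_input)  # FALSE: title does not contain "Alert"

# If bulletin_title is missing, grepl() receives NULL and returns logical(0),
# which is not a single TRUE or FALSE. Wrapping the call in isTRUE() is one
# way to collapse that case to FALSE.
should_notify_safe <- function(.input) {
  isTRUE(grepl("Alert", .input$bulletin_title))
}

should_notify_safe(list())  # FALSE
```

Whether a defensive wrapper like isTRUE() is needed depends on how reliable the upstream output is; for a feed that always has a title, the bare grepl() call is enough.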

Now run the schedule. Whether the send_notification pipeline runs will depend on whether there is a weather alert.

library(maestro)

schedule <- build_schedule(quiet = TRUE)

invocation <- run_schedule(
  schedule, 
  orch_frequency = "15 minutes",
  log_to_console = TRUE
)
── [2025-11-03 08:57:59]
Running pipelines ▶ 
✔ get_bulletins [246ms]
✔ ? send_notification [12ms]
✔ |-send_notification [18ms]
── [2025-11-03 08:58:00]
Pipeline execution completed ■ | 0.294 sec elapsed 
✔ 2 successes | ! 0 warnings | ✖ 0 errors | ◼ 2 total
────────────────────────────────────────────────────────────────────────────────
── Next scheduled pipelines ❯ 
Pipe name | Next scheduled run
• get_bulletins | 2025-11-03 13:15:00

Conditional pipelines can also be used for more complex branching logic. For more examples, check out the vignette on conditional pipelines.
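
As a rough idea of what branching might look like, the sketch below gives get_bulletins two downstream pipelines with complementary conditions, so that one of the two runs on each invocation. It uses only the maestroInputs and maestroRunIf tags shown earlier; the pipeline names send_alert and log_all_clear are hypothetical, and it assumes that two pipelines may share the same input pipeline.

```r
# Could live in ./pipelines/bulletins.R alongside get_bulletins.
# send_alert and log_all_clear are hypothetical names for illustration.

#' @maestroInputs get_bulletins
#' @maestroRunIf grepl("Alert", .input$bulletin_title)
send_alert <- function(.input) {
  # Branch taken when the bulletin title mentions an alert
  paste("ALERT:", .input$bulletin_title)
}

#' @maestroInputs get_bulletins
#' @maestroRunIf !grepl("Alert", .input$bulletin_title)
log_all_clear <- function(.input) {
  # Branch taken when the bulletin title does not mention an alert
  paste("No active alert:", .input$bulletin_title)
}
```

Because the two conditions are exact negations of each other, the branches are mutually exclusive for any bulletin title that is present.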

Calling maestro users

maestro has now been on CRAN for over a year. Developing this package has been a rewarding experience, not to mention that it is an integral part of the tech stack where I work. I’m also excited about the future of maestro, which is why I am calling on maestro users out there. I’d love to hear how you use maestro, especially if you use it in production. I would also welcome any feedback and suggestions as we chart the path for maestro’s future. You can send me a message via LinkedIn or, if you have specific feature requests, I would welcome issues on GitHub here.

As always, happy orchestration!

