Commit

Merge pull request #215 from shikokuchuo/mirai
Use mirai for Async Appender
daroczig authored Oct 9, 2024
2 parents 83329ad + b2f018b commit f4c89a2
Showing 7 changed files with 33 additions and 137 deletions.
3 changes: 1 addition & 2 deletions DESCRIPTION
@@ -22,13 +22,13 @@ Imports:
utils
Suggests:
botor,
callr,
covr,
crayon,
devtools,
glue,
jsonlite,
knitr,
mirai (>= 1.3.0),
pander,
parallel,
R.utils,
@@ -41,7 +41,6 @@ Suggests:
syslognet,
telegram,
testthat (>= 3.0.0),
txtq,
withr
Enhances:
futile.logger,
1 change: 1 addition & 0 deletions NEWS.md
@@ -6,6 +6,7 @@
* computing metadata lazily, so various expensive computations are only performed if you actually add them to the log (#105, @hadley)
* `log_appender()`, `log_layout()` and `log_formatter()` now check that you are calling them with a function, and return the previously set value (#170, @hadley)
* new function to return number of log indices (#194, @WurmPeter)
* `appender_async` is now using `mirai` instead of a custom background process and queue system (#214, @hadley @shikokuchuo)

## Fixes

129 changes: 23 additions & 106 deletions R/appenders.R
@@ -346,7 +346,6 @@ appender_kinesis <- function(stream) {
#' @param appender a [log_appender()] function with a `generator`
#' attribute (TODO note not required, all fn will be passed if
#' not)
#' @param batch number of records to process from the queue at once
#' @param namespace `logger` namespace to use for logging messages on
#' starting up the background process
#' @param init optional function to run in the background process that
@@ -355,10 +354,7 @@ appender_kinesis <- function(stream) {
#' loaded or some environment variables to be set etc
#' @return function taking `lines` argument
#' @export
#' @note This functionality depends on the \pkg{txtq} and \pkg{callr}
#' packages. The R session's temp folder is used for staging files
#' (message queue and other forms of communication between the
#' parent and child processes).
#' @note This functionality depends on the \pkg{mirai} package.
#' @family `log_appenders`
#' @examples \dontrun{
#' appender_file_slow <- function(file) {
@@ -376,127 +372,48 @@ appender_kinesis <- function(stream) {
#' ## start async appender
#' t <- tempfile()
#' log_info("Logging in the background to {t}")
#' my_appender <- appender_async(appender_file_slow(file = t))
#'
#' ## use async appender
#' log_appender(my_appender)
#' log_appender(appender_async(appender_file_slow(file = t)))
#' log_info("Was this slow?")
#' system.time(for (i in 1:25) log_info(i))
#'
#' readLines(t)
#' Sys.sleep(10)
#' readLines(t)
#'
#' ## check on the async appender (debugging, you will probably never need this)
#' attr(my_appender, "async_writer_queue")$count()
#' attr(my_appender, "async_writer_queue")$log()
#'
#' attr(my_appender, "async_writer_process")$get_pid()
#' attr(my_appender, "async_writer_process")$get_state()
#' attr(my_appender, "async_writer_process")$poll_process(1)
#' attr(my_appender, "async_writer_process")$read()
#'
#' attr(my_appender, "async_writer_process")$is_alive()
#' attr(my_appender, "async_writer_process")$read_error()
#' }
appender_async <- function(appender,
batch = 1,
namespace = "async_logger",
init = function() log_info("Background process started")) {
fail_on_missing_package("txtq")
fail_on_missing_package("callr")

fail_on_missing_package("mirai")
force(appender)
force(batch)

## create a storage for the message queue
async_writer_storage <- tempfile()
log_trace(paste("Async writer storage:", async_writer_storage), namespace = "async_logger")

## initialize the message queue
async_writer_queue <- txtq::txtq(async_writer_storage)

## start a background process for the async execution of the message queue
## TODO make it easy to create multiple/parallel background processes?
async_writer_process <- callr::r_session$new()
log_trace(paste("Async writer PID:", async_writer_process$get_pid()), namespace = "async_logger")

## load minimum required packages
async_writer_process$run(function() {
source(system.file(
"load-packages-in-background-process.R",
package = "logger"
))
})
async_writer_process$run(init)

## connect to the message queue
async_writer_process$run(assign, args = list(x = "async_writer_storage", value = async_writer_storage))
async_writer_process$run(function() async_writer_queue <<- txtq::txtq(async_writer_storage))

## pass arguments
async_writer_process$run(assign, args = list(x = "batch", value = batch))

## pass appender
async_writer_tempfile <- tempfile()
saveRDS(appender, async_writer_tempfile)
log_trace(paste("Async appender cached at:", async_writer_tempfile), namespace = "async_logger")
async_writer_process$run(assign, args = list(x = "async_writer_tempfile", value = async_writer_tempfile))
async_writer_process$run(assign, args = list(x = "appender", value = readRDS(async_writer_tempfile)))

## start infinite loop processing log records
async_writer_process$call(function() {
while (TRUE) {
items <- async_writer_queue$pop(batch)

if (nrow(items) == 0) {
## avoid burning CPU
Sys.sleep(.1)
} else {
## execute the actual appender for each log item
for (i in seq_len(nrow(items))) {
appender(items$message[i])
}

## remove processed log records
async_writer_queue$clean()
}
}
})
# Start one background process (hence dispatcher not required)
# force = FALSE allows multiple appenders to use same namespace logger
mirai::daemons(1L, dispatcher = "none", force = FALSE, .compute = namespace)
mirai::everywhere(
{
library(logger)
init()
},
appender = appender, # remains in .GlobalEnv on daemon
.args = list(init = init),
.compute = namespace
)

structure(
function(lines) {
## check if background process still works
if (!isTRUE(async_writer_process$is_alive())) {
stop("FATAL: Async writer process not found")
}
remote_error <- async_writer_process$read_error()
if (remote_error != "") {
stop(paste("FATAL: Async writer failed with", shQuote(remote_error)))
}
remote_event <- async_writer_process$read()
if (!is.null(remote_event) && !is.null(remote_event$error)) {
stop(paste(
"FATAL: Async writer error of",
shQuote(remote_event$error$message),
"in",
shQuote(paste(deparse(remote_event$error$call), collapse = " "))
))
}

## write to message queue
for (line in lines) {
async_writer_queue$push(title = as.character(as.numeric(Sys.time())), message = line)
}
mirai::mirai(
for (line in lines) {
appender(line)
},
.args = list(lines = lines),
.compute = namespace
)
},
generator = deparse(match.call()),
## share remote process and queue with parent for debugging purposes
async_writer_storage = async_writer_storage,
async_writer_queue = async_writer_queue,
async_writer_process = async_writer_process
generator = deparse(match.call())
)

## NOTE no need to clean up, all will go away with the current R session's temp folder
}

## TODO other appenders: graylog, datadog, cloudwatch, email via sendmailR, ES etc
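
The pattern the rewritten `appender_async` relies on (start one daemon, export the appender to it once, then submit each batch of lines as a non-blocking task) can be tried in isolation. The following is only a minimal sketch, not the package's code: the profile name `"demo"`, the helper `make_slow_appender()` and the file path are hypothetical, and `dispatcher = "none"` simply mirrors the call in the diff (the values accepted by `daemons()` may differ between `mirai` versions).

```r
library(mirai)

# "demo" is a hypothetical compute profile, standing in for `namespace`
profile <- "demo"

# Start a single background daemon, as the diff does
daemons(1L, dispatcher = "none", .compute = profile)

# Hypothetical slow appender: the target file lives in the closure's
# environment, so it travels to the daemon together with the function
make_slow_appender <- function(file) {
  force(file)
  function(lines) {
    Sys.sleep(0.2)
    cat(lines, file = file, append = TRUE, sep = "\n")
  }
}
log_file <- tempfile()
appender <- make_slow_appender(log_file)

# Export the appender once; objects passed via `...` are assigned to the
# daemon's global environment and persist across later evaluations
everywhere({}, appender = appender, .compute = profile)

# Submit a batch of lines; mirai() returns without waiting for the
# appender to finish
m <- mirai(
  for (line in lines) {
    appender(line)
  },
  .args = list(lines = c("first", "second")),
  .compute = profile
)

# For this demonstration only: block until the task is done, then inspect
call_mirai(m)
result <- readLines(log_file)

# Shut the daemon down again
daemons(0L, .compute = profile)
```

In `appender_async` itself nothing waits on the returned task, which is why the example and the test in this commit sleep before reading the file back.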
4 changes: 0 additions & 4 deletions inst/load-packages-in-background-process.R

This file was deleted.

22 changes: 2 additions & 20 deletions man/appender_async.Rd

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion tests/testthat/test-CRANSKIP-appenders.R
@@ -8,7 +8,8 @@ test_that("async logging", {
)

for (i in 1:5) log_info(i)
Sys.sleep(0.25)
Sys.sleep(1)

expect_equal(readLines(t)[1], "1")
expect_equal(length(readLines(t)), 5)
})
8 changes: 4 additions & 4 deletions vignettes/performance.Rmd
@@ -101,12 +101,12 @@ log_appender(appender_async(appender_file_slow(file = tempfile())))

async <- function() log_info('Was this slow?')
microbenchmark(async(), times = 1e3)
#> Unit: microseconds
#> expr min lq mean median uq max neval
#> async() 511.329 528.247 614.8694 558.2535 616.14 5018.731 1000
# Unit: microseconds
# expr min lq mean median uq max neval
# async() 298.275 315.5565 329.6235 322.219 333.371 894.579 1000
```

Please note that although this ~0.6 ms is significantly higher than the ~0.15 ms we achieved above with the `sprintf` formatter, but this time we are calling an appender that would take 1 full second to deliver the log message (and not just printing to the console), so bringing that down to less than 1 millisecond is not too bad. If you need even higher throughput, then a custom `appender_async` without checking on the background process and potentially a faster message queue can bring this even below to 200 µs.
Please note that although this ~0.3 ms is higher than the ~0.15 ms we achieved above with the `sprintf` formatter, but this time we are calling an appender that would take 1 full second to deliver the log message (and not just printing to the console), so bringing that down to less than 1 millisecond is not too bad.

```{r cleanup, include = FALSE}
logger:::namespaces_reset()
