Skip to content

Commit

Permalink
Switch to the latest hyper/tokio/futures
Browse files Browse the repository at this point in the history
  • Loading branch information
operutka committed Jul 8, 2020
1 parent 4974945 commit c809df4
Show file tree
Hide file tree
Showing 8 changed files with 277 additions and 384 deletions.
26 changes: 11 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,22 @@ keywords = ["logging", "slog", "loggly"]

[features]
default = ["runtime"]
runtime = ["tokio"]
runtime = []

[dependencies]
bytes = "0.4"
bytes = "0.5"
chrono = "0.4"
futures = "0.1"
hyper = "0.12"
hyper-tls = "0.3"
futures = "0.3"
hyper = "0.13"
hyper-tls = "0.4"
serde = "1.0"
serde_json = "1.0"
slog = "2.3"
tokio-timer = "0.2.6"
slog = "2.5"

[dependencies.tokio]
version = "0.1"
optional = true
version = "0.2"
features = ["rt-core", "time"]

[dev-dependencies]
tokio = "0.1"

[[example]]
name = "log"
required-features = ["runtime"]
[dev-dependencies.tokio]
version = "0.2"
features = ["full"]
14 changes: 2 additions & 12 deletions examples/log.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,6 @@
extern crate futures;

extern crate slog_loggly;

#[macro_use]
extern crate slog;

use futures::Future;

use slog::{debug, error, info, o, warn, Drain, Logger};
use slog_loggly::LogglyDrain;

use slog::{Drain, Logger};

fn main() {
// Your Loggly token and tag.
let loggly_token = "your-loggly-token";
Expand All @@ -31,5 +21,5 @@ fn main() {
error!(logger, "error"; "key" => "value");

// flush all log messages
fhandle.flush().wait().unwrap();
fhandle.blocking_flush().unwrap();
}
23 changes: 13 additions & 10 deletions src/batch.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use futures::stream::Fuse;
use futures::{Async, Poll, Stream};
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures::stream::{Fuse, Stream, StreamExt};

/// A simple extension to the futures::Stream allowing to take elements in
/// batches of a given maximum size.
Expand Down Expand Up @@ -37,32 +41,31 @@ impl<S> Batch<S> {

impl<S> Stream for Batch<S>
where
S: Stream,
S: Stream + Unpin,
{
type Item = Vec<S::Item>;
type Error = S::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut res = Vec::new();

while res.len() < self.max_size {
if let Async::Ready(item) = self.stream.poll()? {
if let Poll::Ready(item) = self.stream.poll_next_unpin(cx) {
if let Some(item) = item {
res.push(item);
} else if res.is_empty() {
return Ok(Async::Ready(None));
return Poll::Ready(None);
} else {
return Ok(Async::Ready(Some(res)));
return Poll::Ready(Some(res));
}
} else {
break;
}
}

if res.is_empty() {
Ok(Async::NotReady)
Poll::Pending
} else {
Ok(Async::Ready(Some(res)))
Poll::Ready(Some(res))
}
}
}
Loading

0 comments on commit c809df4

Please sign in to comment.