Rust Concurrent Execution with Futures and Tokio












I've got some Rust code that currently looks like this:

    use chrono::Local;
    use futures::{future, sync::mpsc, Future, Stream};
    use std::{thread, time};

    fn read_stdin(mut tx: mpsc::Sender<String>) {
        loop {
            // read from stdin and send value over tx.
        }
    }

    fn sleep_for(n: u64) -> impl Future<Item = (), Error = ()> {
        thread::sleep(time::Duration::from_millis(n));
        println!("[{}] slept for {} ms", Local::now().format("%T%.3f"), n);
        future::ok(())
    }

    fn main() {
        let (stdin_tx, stdin_rx) = mpsc::channel(0);
        thread::spawn(move || read_stdin(stdin_tx));

        let server = stdin_rx
            .map(|data| data.trim().parse::<u64>().unwrap_or(0))
            .for_each(|n| tokio::spawn(sleep_for(n * 100)));
        tokio::run(server);
    }


It uses tokio and futures to run some CPU-heavy work (emulated by the sleep_for function) and then print the result to stdout.



When I run it, things seem to work fine and I get this output:



2
[00:00:00.800] slept for 200 ms
10
1
[00:00:01.800] slept for 1000 ms
[00:00:01.900] slept for 100 ms


The first output, for the input 2, is exactly as expected: the timestamp is printed after 200 ms. For the next inputs, however, it becomes clear that sleep_for is being executed sequentially rather than concurrently, because the 100 ms sleep only finishes after the 1000 ms one.



The output that I want to see is:



2
[00:00:00.800] slept for 200 ms
10
1
[00:00:00.900] slept for 100 ms
[00:00:01.900] slept for 1000 ms


It seems that, to get the output I'm looking for, the sleep_for calls for the inputs 10 and 1 (that is, sleep_for(1000) and sleep_for(100)) need to execute concurrently. How would I go about doing this in Rust with futures and tokio?



(Note: the actual values of the timestamps aren't important; I'm only using them to show the order of execution within the program.)










      rust future rust-tokio














edited Nov 18 '18 at 5:52
Josh Leeb-du Toit

asked Nov 18 '18 at 5:47
Josh Leeb-du Toit
























1 Answer














I found a solution using the futures-timer crate:



    use chrono::Local;
    use futures::{future, sync::mpsc, Future, Sink, Stream};
    use futures_timer::Delay;
    use std::{io::stdin, thread, time::Duration};

    // Runs on its own thread: forwards each line from stdin into the channel.
    fn read_stdin(mut tx: mpsc::Sender<String>) {
        let stdin = stdin();
        loop {
            let mut buf = String::new();
            // read_line returns Ok(0) at end of input; stop instead of spinning.
            if stdin.read_line(&mut buf).unwrap() == 0 {
                break;
            }
            // send consumes the sender and hands it back once the item is accepted.
            tx = tx.send(buf).wait().unwrap();
        }
    }

    fn main() {
        let (stdin_tx, stdin_rx) = mpsc::channel(0);
        thread::spawn(move || read_stdin(stdin_tx));

        let server = stdin_rx
            .map(|data| data.trim().parse::<u64>().unwrap_or(0) * 100)
            .for_each(|delay| {
                println!("[{}] {} ms -> start", Local::now().format("%T%.3f"), delay);
                // Delay is a non-blocking timer, so each spawned task yields while it waits.
                tokio::spawn(
                    Delay::new(Duration::from_millis(delay))
                        .and_then(move |_| {
                            println!("[{}] {} ms -> done", Local::now().format("%T%.3f"), delay);
                            future::ok(())
                        })
                        .map_err(|e| panic!("timer error: {}", e)),
                )
            });

        tokio::run(server);
    }


The issue is that, rather than letting the future become parked and notifying its task once the delay has elapsed, the code in the question simply put the thread to sleep, so nothing else could make progress on it in the meantime.
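
To make the effect of a blocking sleep concrete, here is a minimal sketch that uses only the standard library instead of tokio. The helper names (blocking_work, run_sequential, run_concurrent) are hypothetical and exist only for this illustration, and plain OS threads stand in for executor tasks, so this is an analogy to the fix above rather than the same mechanism.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Stand-in for blocking work: sleeps the calling thread, then returns `ms`.
fn blocking_work(ms: u64) -> u64 {
    thread::sleep(Duration::from_millis(ms));
    ms
}

/// Runs every job on the calling thread, so results come back in input order.
fn run_sequential(jobs: &[u64]) -> Vec<u64> {
    jobs.iter().map(|&ms| blocking_work(ms)).collect()
}

/// Runs every job on its own thread and collects results in completion order.
fn run_concurrent(jobs: &[u64]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for &ms in jobs {
        let tx = tx.clone();
        thread::spawn(move || {
            // The receiver is still alive while workers run, so send succeeds.
            tx.send(blocking_work(ms)).unwrap();
        });
    }
    // Drop the original sender so the receiver's iterator ends once
    // every worker has finished and dropped its clone.
    drop(tx);
    rx.iter().collect()
}

fn main() {
    let jobs = [300, 50];
    println!("sequential: {:?}", run_sequential(&jobs));
    println!("concurrent: {:?}", run_concurrent(&jobs));
}
```

The sequential run always reports the jobs in input order, which mirrors the output in the question. In the concurrent run the 50 ms job should normally be reported before the 300 ms one, although the exact order depends on thread scheduling.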



Update: I've since come across tokio-timer, which seems to be the standard way of doing this.














edited Nov 19 '18 at 10:03

answered Nov 18 '18 at 9:48
Josh Leeb-du Toit





























