1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
use std::{panic::resume_unwind, thread};

use crate::{
    context::{internal::Env, Context, TaskContext},
    handle::Handle,
    result::{JsResult, NeonResult},
    sys::{async_work, raw},
    types::{Deferred, JsPromise, Value},
};

/// Node asynchronous task builder
///
/// Pairs a borrowed context with an `execute` callback. Nothing is scheduled
/// until the builder is consumed by `and_then` or `promise`.
///
/// ```
/// # use neon::prelude::*;
/// fn greet(mut cx: FunctionContext) -> JsResult<JsPromise> {
///     let name = cx.argument::<JsString>(0)?.value(&mut cx);
///
///     let promise = cx
///         .task(move || format!("Hello, {}!", name))
///         .promise(move |mut cx, greeting| Ok(cx.string(greeting)));
///
///     Ok(promise)
/// }
/// ```
pub struct TaskBuilder<'cx, C, E> {
    // Context borrowed for as long as the builder lives; it supplies the
    // environment used for scheduling and is used to create the promise
    cx: &'cx mut C,
    // Callback that produces the task's output; its return value is what
    // the completion callback receives
    execute: E,
}

impl<'a: 'cx, 'cx, C, O, E> TaskBuilder<'cx, C, E>
where
    C: Context<'a>,
    O: Send + 'static,
    E: FnOnce() -> O + Send + 'static,
{
    /// Construct a new task builder from an `execute` callback that can be
    /// scheduled to execute on the Node worker pool
    pub fn new(cx: &'cx mut C, execute: E) -> Self {
        Self { cx, execute }
    }

    /// Schedules a task to execute on the Node worker pool, executing the
    /// `complete` callback on the JavaScript main thread with the result
    /// of the `execute` callback
    pub fn and_then<F>(self, complete: F)
    where
        F: FnOnce(TaskContext, O) -> NeonResult<()> + 'static,
    {
        let env = self.cx.env();
        let execute = self.execute;

        schedule(env, execute, complete);
    }

    /// Schedules a task to execute on the Node worker pool and returns a
    /// promise that is resolved with the value from the `complete` callback.
    ///
    /// The `complete` callback will execute on the JavaScript main thread and
    /// is passed the return value from `execute`. If the `complete` callback
    /// throws, the promise will be rejected with the exception
    pub fn promise<V, F>(self, complete: F) -> Handle<'a, JsPromise>
    where
        V: Value,
        F: FnOnce(TaskContext, O) -> JsResult<V> + 'static,
    {
        let env = self.cx.env();
        let (deferred, promise) = JsPromise::new(self.cx);
        let execute = self.execute;

        schedule_promise(env, execute, complete, deferred);

        promise
    }
}

// Queue `input` to run on the Node worker pool; `data` is the completion
// callback that `complete` invokes with the output of `input`
fn schedule<I, O, D>(env: Env, input: I, data: D)
where
    I: FnOnce() -> O + Send + 'static,
    O: Send + 'static,
    D: FnOnce(TaskContext, O) -> NeonResult<()> + 'static,
{
    // NOTE(review): the safety contract of `async_work::schedule` is not
    // visible here; presumably it needs `env` to be a valid environment on
    // the JavaScript thread -- TODO confirm against its `# Safety` docs
    unsafe {
        let raw_env = env.to_raw();

        async_work::schedule(raw_env, input, execute::<I, O>, complete::<O, D>, data);
    }
}

// Run the task's `input` closure and return its output. `schedule` and
// `schedule_promise` pass this function to `async_work::schedule` as the
// execute step.
fn execute<I, O>(input: I) -> O
where
    I: FnOnce() -> O + Send + 'static,
    O: Send + 'static,
{
    input()
}

// Completion step used by `schedule`: unwraps the task's output and passes it
// to `callback` inside a fresh `TaskContext`
fn complete<O, D>(env: raw::Env, output: thread::Result<O>, callback: D)
where
    O: Send + 'static,
    D: FnOnce(TaskContext, O) -> NeonResult<()> + 'static,
{
    let value = match output {
        Ok(value) => value,
        // A panic caught while the task ran on the Node worker pool is
        // re-raised here, on the main JavaScript thread, before the callback
        // is ever invoked
        Err(payload) => resume_unwind(payload),
    };

    TaskContext::with_context(env.into(), move |cx| {
        // The `NeonResult` returned by the callback is deliberately discarded
        let _ = callback(cx, value);
    });
}

// Queue `input` to run on the Node worker pool and settle the `Promise` behind
// `deferred` with whatever the `complete` callback produces
fn schedule_promise<I, O, D, V>(env: Env, input: I, complete: D, deferred: Deferred)
where
    I: FnOnce() -> O + Send + 'static,
    O: Send + 'static,
    D: FnOnce(TaskContext, O) -> JsResult<V> + 'static,
    V: Value,
{
    // NOTE(review): the safety contract of `async_work::schedule` is not
    // visible here; presumably it needs `env` to be a valid environment on
    // the JavaScript thread -- TODO confirm against its `# Safety` docs
    unsafe {
        let raw_env = env.to_raw();

        // The callback and the deferred travel together as the task's data
        // and are unpacked again by `complete_promise`
        async_work::schedule(
            raw_env,
            input,
            execute::<I, O>,
            complete_promise::<O, D, V>,
            (complete, deferred),
        );
    }
}

// Completion step used by `schedule_promise`: runs `complete` with the task's
// output and settles `deferred` with the outcome
fn complete_promise<O, D, V>(
    env: raw::Env,
    output: thread::Result<O>,
    (complete, deferred): (D, Deferred),
) where
    O: Send + 'static,
    D: FnOnce(TaskContext, O) -> JsResult<V> + 'static,
    V: Value,
{
    TaskContext::with_context(env.into(), move |cx| {
        deferred.try_catch_settle(cx, move |cx| match output {
            Ok(value) => complete(cx, value),
            // A panic captured while the task executed is resumed from
            // inside the settle closure rather than before it
            Err(payload) => resume_unwind(payload),
        })
    });
}