File: co-async.hpp

package info (click to toggle)
cppgir 2.0%2Bgit20250629.2a7d9ce-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 1,220 kB
  • sloc: cpp: 16,451; ansic: 355; python: 86; makefile: 13; sh: 9
file content (159 lines) | stat: -rw-r--r-- 3,653 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
#pragma once

#include <gi/gi.hpp>

#include <coroutine>
#include <future>

#ifdef CO_DEBUG
#include <iostream>
static auto &dout = std::cerr;
#else
#include <sstream>
static std::ostringstream dout;
#endif

template<typename T, typename SELF>
struct holder
{
  void return_value(T &&v)
  {
    dout << "return value " << std::endl;
    auto self = (SELF *)this;
    self->set_value(std::move(v));
  }
};

template<typename SELF>
struct holder<void, SELF>
{
  void return_void()
  {
    auto self = (SELF *)this;
    self->set_value();
  }
};

template<typename RESULT>
class promise_type_t : public holder<RESULT, promise_type_t<RESULT>>
{
protected:
  std::promise<RESULT> result_;
  std::coroutine_handle<> waiter_;

public:
  struct init
  {
    std::coroutine_handle<> handle;
    std::future<RESULT> f;
  };

  ~promise_type_t() { dout << "promise destruction" << std::endl; }

  auto get_return_object(bool refresh = false)
  {
    dout << "return obj " << std::endl;
    if (refresh)
      result_ = decltype(result_)();
    return init{std::coroutine_handle<promise_type_t>::from_promise(*this),
        result_.get_future()};
  }
  std::suspend_never initial_suspend() noexcept { return {}; }
  std::suspend_never final_suspend() noexcept { return {}; }

  bool resume()
  {
    auto w = waiter_;
    if (w) {
      // waiter takes care of itself again
      waiter_ = nullptr;
      w.resume();
    }
    return bool(w);
  }

  // use any dummy type to avoid reference to void below
  using arg_type = typename std::conditional<std::is_same<RESULT, void>::value,
      std::nullptr_t, RESULT>::type;

  void set_value(arg_type &&v)
  {
    result_.set_value(std::move(v));
    resume();
  }

  void set_value()
  {
    result_.set_value();
    resume();
  }

  void set_waiter(std::coroutine_handle<> handle)
  {
    // a task/promise represent a coroutine function (frame)
    // it should only be waited upon by one other task
    // (rather than handed around and waited in multiple locations)
    if (waiter_)
      gi::detail::try_throw(std::logic_error("already waited upon"));
    waiter_ = handle;
  }

  void unhandled_exception()
  {
#if GI_CONFIG_EXCEPTIONS
    result_.set_exception(std::current_exception());
    // if no-one waiting, deliver to caller
    // the latter likely is the original caller
    // (to which we have not yet returned, so it can yet await)
    // otherwise it might end up totally lost
    if (!resume())
      throw;
#endif
  }
};

template<typename RESULT, typename P = promise_type_t<RESULT>>
class task
{
public:
  using promise_type = P;

  // only 1 actually active
  std::coroutine_handle<promise_type> coro_;
  std::unique_ptr<promise_type> p_;
  // but always this
  std::future<RESULT> result_;

public:
  // NOTE if coroutine exits by co_return, then handle is not useful
  // but the future should have a value
  task(typename P::init i)
      : coro_(decltype(coro_)::from_address(i.handle.address())),
        result_(std::move(i.f))
  {
    dout << "init task" << std::endl;
  }

  task() : p_(new promise_type()) { result_ = p_->get_return_object().f; }

  // move-only
  task(task &&other) = default;
  task &operator=(task &&other) = delete;

  bool await_ready()
  {
    return result_.wait_for(std::chrono::seconds(0)) ==
           std::future_status::ready;
  }

  promise_type &promise() const { return coro_ ? coro_.promise() : *p_; }

  void await_suspend(std::coroutine_handle<> handle)
  {
    if (!coro_ && !p_)
      gi::detail::try_throw(std::logic_error("no routine to wait on"));
    promise().set_waiter(handle);
  }

  RESULT await_resume() { return result_.get(); }
};