Toolbox snapshot
The Reactive C++ Toolbox
Loading...
Searching...
No Matches
Timer.cpp
Go to the documentation of this file.
1// The Reactive C++ Toolbox.
2// Copyright (C) 2013-2019 Swirly Cloud Limited
3// Copyright (C) 2021 Reactive Markets Limited
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17#include "Timer.hpp"
18
19#include <toolbox/sys/Log.hpp>
20
21#include <string>
22
23namespace toolbox {
24inline namespace io {
25using namespace std;
26namespace {
27// Number of entries per 4K slab, assuming that malloc overhead is no more than 16 bytes.
28constexpr size_t Overhead = 16;
29constexpr size_t PageSize = 4096;
30constexpr size_t SlabSize = (PageSize - Overhead) / sizeof(Timer);
31
32bool is_after(const Timer& lhs, const Timer& rhs)
33{
34 return lhs.expiry() > rhs.expiry();
35}
36
37} // namespace
38
39Timer::Impl* TimerPool::allocate(MonoTime /*expiry*/, Duration /*interval*/, TimerSlot /*slot*/)
40{
42
43 if (free_) {
44
45 // Pop next free timer from stack.
46 impl = free_;
47 free_ = free_->next;
48
49 } else {
50
51 // Add new slab of timers to stack.
52 SlabPtr slab{new Timer::Impl[SlabSize]};
53 impl = &slab[0];
54
55 for (size_t i{1}; i < SlabSize; ++i) {
56 slab[i].next = free_;
57 free_ = &slab[i];
58 }
59 slabs_.push_back(std::move(slab));
60 }
61
62 return impl;
63}
64
66{
67 assert(slot);
68
69 heap_.reserve(heap_.size() + 1);
70 const auto tmr{allocate(expiry, interval, slot)};
71
72 // Cannot fail.
73 heap_.push_back(tmr);
74 push_heap(heap_.begin(), heap_.end(), is_after);
75
76 return tmr;
77}
78
80{
81 int work{};
82 while (!heap_.empty()) {
83
84 // If not pending, then must have been cancelled.
85 if (!heap_.front().pending()) {
86 pop();
87 --cancelled_;
88 assert(cancelled_ >= 0);
89 } else if (heap_.front().expiry() <= now.mono_time()) {
90 expire(now);
91 ++work;
92 } else {
93 break;
94 }
95 }
96 gc();
97 return work;
98}
99
100Timer TimerQueue::allocate(MonoTime expiry, Duration interval, TimerSlot slot)
101{
102 Timer::Impl* impl{pool_.allocate(expiry, interval, slot)};
103
104 impl->tq = this;
105 impl->ref_count = 1;
106 impl->id = ++max_id_;
107 impl->expiry = expiry;
108 impl->interval = interval;
109 impl->slot = slot;
110
111 return Timer{impl};
112}
113
114void TimerQueue::cancel() noexcept
115{
116 ++cancelled_;
117
118 // Ensure that a pending timer is at the front of the queue.
119 // If not pending, then must have been cancelled.
120 while (!heap_.empty() && !heap_.front().pending()) {
121 pop();
122 --cancelled_;
123 assert(cancelled_ >= 0);
124 }
125 gc();
126}
127
128void TimerQueue::expire(CyclTime now)
129{
130 // Pop timer.
131 auto tmr = pop();
132 assert(tmr.pending());
133 try {
134 // Notify user.
135 tmr.slot().invoke(now, tmr);
136 } catch (const std::exception& e) {
137 TOOLBOX_ERROR << "exception in i/o timer handler: " << e.what();
138 }
139
140 // If timer was not cancelled during the callback.
141 if (tmr.pending()) {
142
143 // If periodic timer.
144 if (tmr.interval().count() > 0) {
145
146 // Add interval to expiry, while ensuring that next expiry is always in the future.
147 tmr.set_expiry(max(tmr.expiry() + tmr.interval(), now.mono_time() + 1ns));
148
149 // Reschedule popped timer.
150 heap_.push_back(tmr);
151 push_heap(heap_.begin(), heap_.end(), is_after);
152
153 } else {
154
155 // Free handler for non-repeating timer.
156 tmr.slot().reset();
157 }
158 }
159}
160
161void TimerQueue::gc() noexcept
162{
163 // Garbage collect if more than half of the timers have been cancelled.
164 if (cancelled_ > static_cast<int>(heap_.size() >> 1)) {
165 const auto it
166 = remove_if(heap_.begin(), heap_.end(), [](const auto& tmr) { return !tmr.pending(); });
167 heap_.erase(it, heap_.end());
168 make_heap(heap_.begin(), heap_.end(), is_after);
169 cancelled_ = 0;
170 }
171}
172
173Timer TimerQueue::pop() noexcept
174{
175 auto tmr = heap_.front();
176 pop_heap(heap_.begin(), heap_.end(), is_after);
177 heap_.pop_back();
178 return tmr;
179}
180
182{
183 --impl->ref_count;
184 if (impl->ref_count == 1) {
185 // Cancel pending if only one reference remains. If only one reference remains after
186 // decrementing the counter, and the timer is still pending, then the final reference must
187 // be held within the internal timer queue, which means that no more references exist
188 // outside of the timer queue.
189 if (impl->slot) {
190 impl->slot.reset();
191 impl->tq->cancel();
192 }
193 } else if (impl->ref_count == 0) {
194 impl->tq->pool_.deallocate(impl);
195 }
196}
197
198} // namespace io
199} // namespace toolbox
#define TOOLBOX_ERROR
Definition Log.hpp:93
Timer::Impl * allocate(MonoTime expiry, Duration interval, TimerSlot slot)
Definition Timer.cpp:39
Timer insert(MonoTime expiry, Duration interval, TimerSlot slot)
Throws std::bad_alloc only.
Definition Timer.cpp:65
int dispatch(CyclTime now)
Definition Timer.cpp:79
MonoTime mono_time() const noexcept
Definition Time.hpp:140
STL namespace.
int64_t max(const Histogram &h) noexcept
Definition Utility.cpp:46
void intrusive_ptr_release(Timer::Impl *impl) noexcept
Definition Timer.cpp:181
MonoClock::time_point MonoTime
Definition Time.hpp:110
Nanos Duration
Definition Time.hpp:40
constexpr auto bind() noexcept
Definition Slot.hpp:92
Impl * next
Singly-linked free-list.
Definition Timer.hpp:43