include/boost/corosio/native/detail/select/select_socket_service.hpp

75.7% Lines (262/346) 93.1% Functions (27/29)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/native/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
/** State for select socket service.

    Bundles the scheduler reference with the impl registry so the
    service can hold it behind a single unique_ptr.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    // Shared scheduler that fds are registered with and ops posted to.
    select_scheduler& sched_;
    // Guards socket_list_ and socket_ptrs_ (see shutdown/construct/destroy).
    std::mutex mutex_;
    // Intrusive list of all live impls, iterated during shutdown().
    intrusive_list<select_socket> socket_list_;
    // Owning refs keyed by raw pointer for O(1) lookup and removal; see the
    // file comment on impl lifetime for why ownership lives here.
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;
};
98
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Close every live socket impl; ownership is retained until ~state_
    /// so in-flight ops can drain (see definition for details).
    void shutdown() override;

    /// Allocate a new impl, registering it in the service's list/map.
    io_object::implementation* construct() override;
    /// Close the impl and drop the service's owning reference.
    void destroy(io_object::implementation*) override;
    /// Close the underlying socket of the given handle.
    void close(io_object::handle&) override;
    /// Create a non-blocking, close-on-exec socket fd for the impl.
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// Access the scheduler shared by all impls of this service.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    /// Queue a completed op on the scheduler's completion queue.
    void post(select_op* op);
    void work_started() noexcept;
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
135
// Backward compatibility alias for code written against the old name.
using select_sockets = select_socket_service;
138
// Stop-token callback body: forwards to the op's virtual cancel(), which
// routes to the owning socket impl when one is attached.
inline void
select_op::canceller::operator()() const noexcept
{
    op->cancel();
}
144
145 inline void
146 select_connect_op::cancel() noexcept
147 {
148 if (socket_impl_)
149 socket_impl_->cancel_single_op(*this);
150 else
151 request_cancel();
152 }
153
154 inline void
155 98x select_read_op::cancel() noexcept
156 {
157 98x if (socket_impl_)
158 98x socket_impl_->cancel_single_op(*this);
159 else
160 request_cancel();
161 98x }
162
163 inline void
164 select_write_op::cancel() noexcept
165 {
166 if (socket_impl_)
167 socket_impl_->cancel_single_op(*this);
168 else
169 request_cancel();
170 }
171
// Completion handler for connect, run from the scheduler's completion
// queue. Caches the connected endpoints on success, publishes the result
// through ec_out/bytes_out, releases the keep-alive impl ref, and resumes
// the awaiting coroutine via its executor.
inline void
select_connect_op::operator()()
{
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation takes precedence over any saved errno.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    // Drop the keep-alive ref; the impl may be destroyed after this point.
    impl_ptr.reset();
    dispatch_coro(saved_ex, saved_h).resume();
}
212
// Bind this socket impl to its owning service; the fd is assigned later
// by open_socket().
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
217
// Begin an async connect to `ep`. The result is delivered through *ec
// when the op completes. Completion is always posted to the scheduler
// queue — this function always returns noop_coroutine().
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    // Non-blocking fd: try the syscall immediately (step 1 of the pattern
    // described in the file comment).
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // Keep the impl alive until the posted op runs (see file comment).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        // Connect is in flight: register for writability and wait.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            // Only the claimer (exchange winner) posts and balances work.
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate failure: post the completion carrying errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
309
// Begin an async scatter read into the buffers described by `param`.
// Bytes read are reported through *bytes_out, errors through *ec.
// Completion is always posted to the scheduler queue — this function
// always returns noop_coroutine().
inline std::coroutine_handle<>
select_socket::read_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = rd_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    // Copy the caller's buffer list into the op's iovec array.
    capy::mutable_buffer bufs[select_read_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));

    // Empty buffer: complete immediately with zero bytes, flagged so the
    // completion handler can distinguish it from EOF.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.empty_buffer_read = true;
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // Non-blocking fd: try the syscall immediately.
    ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        // Keep the impl alive until the posted op runs (see file comment).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    // n == 0 with a non-empty buffer: peer closed — complete with zero
    // bytes and no error.
    if (n == 0)
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // No data yet: register for readability and wait.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            // Only the claimer (exchange winner) posts and balances work.
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_read);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Real error: post the completion carrying errno.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
412
// Begin an async gather write from the buffers described by `param`.
// Bytes written are reported through *bytes_out, errors through *ec.
// Completion is always posted to the scheduler queue — this function
// always returns noop_coroutine().
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    // Copy the caller's buffer list into the op's iovec array.
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty buffer: complete immediately with zero bytes.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    // sendmsg with MSG_NOSIGNAL so a closed peer yields EPIPE instead of
    // raising SIGPIPE.
    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // Non-blocking fd: try the syscall immediately.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        // Keep the impl alive until the posted op runs (see file comment).
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Send buffer full: register for writability and wait.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            // Only the claimer (exchange winner) posts and balances work.
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Real error — or n <= 0 with errno still 0 (presumably a zero-byte
    // sendmsg result), in which case fall back to EIO so the op never
    // reports spurious success.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
510
511 inline std::error_code
512 3x select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
513 {
514 int how;
515 3x switch (what)
516 {
517 1x case tcp_socket::shutdown_receive:
518 1x how = SHUT_RD;
519 1x break;
520 1x case tcp_socket::shutdown_send:
521 1x how = SHUT_WR;
522 1x break;
523 1x case tcp_socket::shutdown_both:
524 1x how = SHUT_RDWR;
525 1x break;
526 default:
527 return make_err(EINVAL);
528 }
529 3x if (::shutdown(fd_, how) != 0)
530 return make_err(errno);
531 3x return {};
532 }
533
534 inline std::error_code
535 28x select_socket::set_option(
536 int level, int optname, void const* data, std::size_t size) noexcept
537 {
538 28x if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
539 0)
540 return make_err(errno);
541 28x return {};
542 }
543
544 inline std::error_code
545 31x select_socket::get_option(
546 int level, int optname, void* data, std::size_t* size) const noexcept
547 {
548 31x socklen_t len = static_cast<socklen_t>(*size);
549 31x if (::getsockopt(fd_, level, optname, data, &len) != 0)
550 return make_err(errno);
551 31x *size = static_cast<std::size_t>(len);
552 31x return {};
553 }
554
// Cancel all three pending ops (connect/read/write). Each op is claimed
// via an atomic exchange; only the winner deregisters the fd, posts the
// op (keeping the impl alive through impl_ptr), and balances the
// outstanding work count.
inline void
select_socket::cancel() noexcept
{
    // If no shared_ptr exists the impl is already being torn down.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Claim the op: whoever transitions it to unregistered owns
        // completion of the pending operation.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
579
// Cancel one specific pending op (invoked from the op's stop-token
// callback). Mirrors cancel(): claim via atomic exchange, then the
// winner deregisters, posts, and balances the work count.
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // If no shared_ptr exists the impl is already being torn down.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep the impl alive until the posted op runs (see file comment).
        op.impl_ptr = self;
        svc_.post(&op);
        svc_.work_finished();
    }
}
608
// Close the underlying fd, first cancelling any pending ops so waiting
// coroutines resume (see the cancellation notes in the file comment).
// Safe to call repeatedly; resets cached endpoints.
inline void
select_socket::close_socket() noexcept
{
    // During teardown weak_from_this() may fail; in that case there can be
    // no live ops holding the impl, so skip cancellation.
    auto self = weak_from_this().lock();
    if (self)
    {
        auto cancel_op = [this, &self](select_op& op, int events) {
            // Claim the op; only the winner posts it and balances work.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            op.request_cancel();
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, events);
                op.impl_ptr = self;
                svc_.post(&op);
                svc_.work_finished();
            }
        };

        cancel_op(conn_, select_scheduler::event_write);
        cancel_op(rd_, select_scheduler::event_read);
        cancel_op(wr_, select_scheduler::event_write);
    }

    if (fd_ >= 0)
    {
        // Remove any remaining interest for both event types, then close.
        svc_.scheduler().deregister_fd(
            fd_, select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
645
// Obtain (or lazily create) the shared select_scheduler from the
// execution context and allocate the service state around it.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
653
// Destroying state_ releases the owning socket shared_ptrs; this runs
// after scheduler shutdown (see the note in shutdown() below).
inline select_socket_service::~select_socket_service() {}
655
656 inline void
657 168x select_socket_service::shutdown()
658 {
659 168x std::lock_guard lock(state_->mutex_);
660
661 168x while (auto* impl = state_->socket_list_.pop_front())
662 impl->close_socket();
663
664 // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
665 // drains completed_ops_, calling destroy() on each queued op. Letting
666 // ~state_ release the ptrs (during service destruction, after scheduler
667 // shutdown) keeps every impl alive until all ops have been drained.
668 168x }
669
670 inline io_object::implementation*
671 6587x select_socket_service::construct()
672 {
673 6587x auto impl = std::make_shared<select_socket>(*this);
674 6587x auto* raw = impl.get();
675
676 {
677 6587x std::lock_guard lock(state_->mutex_);
678 6587x state_->socket_list_.push_back(raw);
679 6587x state_->socket_ptrs_.emplace(raw, std::move(impl));
680 6587x }
681
682 6587x return raw;
683 6587x }
684
685 inline void
686 6587x select_socket_service::destroy(io_object::implementation* impl)
687 {
688 6587x auto* select_impl = static_cast<select_socket*>(impl);
689 6587x select_impl->close_socket();
690 6587x std::lock_guard lock(state_->mutex_);
691 6587x state_->socket_list_.remove(select_impl);
692 6587x state_->socket_ptrs_.erase(select_impl);
693 6587x }
694
695 inline std::error_code
696 2204x select_socket_service::open_socket(
697 tcp_socket::implementation& impl, int family, int type, int protocol)
698 {
699 2204x auto* select_impl = static_cast<select_socket*>(&impl);
700 2204x select_impl->close_socket();
701
702 2204x int fd = ::socket(family, type, protocol);
703 2204x if (fd < 0)
704 return make_err(errno);
705
706 2204x if (family == AF_INET6)
707 {
708 5x int one = 1;
709 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710 }
711
712 // Set non-blocking and close-on-exec
713 2204x int flags = ::fcntl(fd, F_GETFL, 0);
714 2204x if (flags == -1)
715 {
716 int errn = errno;
717 ::close(fd);
718 return make_err(errn);
719 }
720 2204x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721 {
722 int errn = errno;
723 ::close(fd);
724 return make_err(errn);
725 }
726 2204x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727 {
728 int errn = errno;
729 ::close(fd);
730 return make_err(errn);
731 }
732
733 // Check fd is within select() limits
734 2204x if (fd >= FD_SETSIZE)
735 {
736 ::close(fd);
737 return make_err(EMFILE); // Too many open files
738 }
739
740 2204x select_impl->fd_ = fd;
741 2204x return {};
742 }
743
744 inline void
745 10978x select_socket_service::close(io_object::handle& h)
746 {
747 10978x static_cast<select_socket*>(h.get())->close_socket();
748 10978x }
749
// Queue a completed op on the scheduler's completion queue.
inline void
select_socket_service::post(select_op* op)
{
    state_->sched_.post(op);
}

// Note an outstanding async operation (keeps the scheduler running).
inline void
select_socket_service::work_started() noexcept
{
    state_->sched_.work_started();
}

// Balance a prior work_started() once the operation is accounted for.
inline void
select_socket_service::work_finished() noexcept
{
    state_->sched_.work_finished();
}
767
768 } // namespace boost::corosio::detail
769
770 #endif // BOOST_COROSIO_HAS_SELECT
771
772 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
773