libMesh
thread_buffered_syncbuf.C
Go to the documentation of this file.
1 // The libMesh Finite Element Library.
2 // Copyright (C) 2002-2025 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner
3 
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Lesser General Public
6 // License as published by the Free Software Foundation; either
7 // version 2.1 of the License, or (at your option) any later version.
8 
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Lesser General Public License for more details.
13 
14 // You should have received a copy of the GNU Lesser General Public
15 // License along with this library; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 
18 #include "libmesh/thread_buffered_syncbuf.h"
19 #include "libmesh/threads.h"
20 
21 namespace libMesh
22 {
23 ThreadBufferedSyncbuf::ThreadBufferedSyncbuf(std::streambuf & sink, bool flush_on_newline)
24  : _sink(sink), _flush_on_newline(flush_on_newline), _mu(std::make_unique<Threads::spin_mutex>())
25 {
26 }
27 
29 
30 ThreadBufferedSyncbuf::int_type ThreadBufferedSyncbuf::overflow(int_type ch)
31 {
32  if (traits_type::eq_int_type(ch, traits_type::eof()))
33  return traits_type::not_eof(ch);
34 
35  auto & t = this->thread_local_buffer();
36  t.buf.push_back(traits_type::to_char_type(ch));
37 
38  if (_flush_on_newline && ch == '\n')
39  this->emit_from_thread_local_buffer(t.buf, /*force_flush=*/false);
40 
41  return ch;
42 }
43 
44 std::streamsize ThreadBufferedSyncbuf::xsputn(const char * s, std::streamsize n)
45 {
46  auto & t = this->thread_local_buffer();
47  t.buf.append(s, static_cast<size_t>(n));
48 
49  if (_flush_on_newline && !t.buf.empty() && t.buf.back() == '\n')
50  this->emit_from_thread_local_buffer(t.buf, /*force_flush=*/false);
51 
52  return n;
53 }
54 
// NOTE(review): the signature line for this definition is absent from this
// listing (the embedded numbering skips 55).  The body presumably belongs to
// the streambuf sync() override -- confirm against the original file.
//
// Commits whatever the calling thread has buffered to the wrapped sink with
// force_flush=true, which also makes emit_from_thread_local_buffer() call
// pubsync() on the sink (even when the buffer is empty), then returns 0.
56 {
57  this->emit_from_thread_local_buffer(this->thread_local_buffer().buf, /*force_flush=*/true);
58  return 0;
59 }
60 
// NOTE(review): the signature line for this definition is absent from this
// listing (the embedded numbering skips 61); callers in this file invoke it as
// this->thread_local_buffer().
//
// Returns a reference to the calling thread's ThreadLocalBuffer, which is
// constructed with *this the first time the thread executes the declaration
// below.
//
// NOTE(review): a function-local `static thread_local` object is one per
// thread, not one per ThreadBufferedSyncbuf.  If more than one
// ThreadBufferedSyncbuf exists, they would share a single buffer on each
// thread, bound to whichever object called first -- confirm that is intended.
62 {
63  static thread_local ThreadLocalBuffer t(*this);
64  return t;
65 }
66 
67 void ThreadBufferedSyncbuf::emit_from_thread_local_buffer(std::string & b, bool force_flush)
68 {
69  if (b.empty())
70  {
71  if (force_flush)
72  {
73  Threads::spin_mutex::scoped_lock lk(*_mu);
74  _sink.pubsync();
75  }
76  return;
77  }
78 
79  {
80  Threads::spin_mutex::scoped_lock lk(*_mu);
81  _sink.sputn(b.data(), static_cast<std::streamsize>(b.size()));
82  if (force_flush)
83  _sink.pubsync();
84  }
85  b.clear();
86 }
87 
// NOTE(review): the signature line for this definition is absent from this
// listing (the embedded numbering skips 88).  The body presumably belongs to
// the ThreadLocalBuffer destructor -- confirm against the original file.
//
// Hands any text still held in buf to the owning syncbuf.  force_flush=false
// means the text is written to the sink but the sink is not asked to sync.
//
// NOTE(review): this dereferences _owner; presumably the owning
// ThreadBufferedSyncbuf is expected to still be alive when this runs -- verify.
89 {
90  // Runs at thread exit: commit any leftovers for this thread.
91  if (!buf.empty())
92  _owner.emit_from_thread_local_buffer(buf, /*force_flush=*/false);
93 }
94 
95 }
ThreadBufferedSyncbuf(std::streambuf &sink, bool flush_on_newline=true)
const bool _flush_on_newline
Whether to flush our sink to terminal/file on terminating new-line characters ('\n') ...
The libMesh namespace provides an interface to certain functionality in the library.
ThreadLocalBuffer & thread_local_buffer()
One ThreadLocalBuffer instance per thread, constructed on first use with *this.
void emit_from_thread_local_buffer(std::string &b, bool force_flush)
Emit from the thread local buffer to our wrapped _sink.
~ThreadLocalBuffer()
Ensures we write to our sink upon destruction if our buf is non-empty.
std::streambuf & _sink
Wrapped output sink.
std::unique_ptr< Threads::spin_mutex > _mu
Serialization for commits to the shared sink.
int_type overflow(int_type ch) override
ThreadBufferedSyncbuf & _owner
owning syncing stream buffer
~ThreadBufferedSyncbuf()
Defaulted destructor defined in implementation so we don't need to include threads.h.
std::streamsize xsputn(const char *s, std::streamsize n) override
tbb::spin_mutex spin_mutex
Spin mutex.
Definition: threads_tbb.h:167
A class that wraps a thread-local string.