LCOV - code coverage report
Current view: top level - include/utils - SendBuffer.h (source / functions) Hit Total Coverage
Test: idaholab/moose ray_tracing: #31405 (292dce) with base fef103 Lines: 61 64 95.3 %
Date: 2025-09-04 07:56:07 Functions: 8 8 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : //* This file is part of the MOOSE framework
       2             : //* https://mooseframework.inl.gov
       3             : //*
       4             : //* All rights reserved, see COPYRIGHT for full restrictions
       5             : //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
       6             : //*
       7             : //* Licensed under LGPL 2.1, please see LICENSE for details
       8             : //* https://www.gnu.org/licenses/lgpl-2.1.html
       9             : 
      10             : #pragma once
      11             : 
      12             : // Moose Includes
      13             : #include "Moose.h"
      14             : #include "SharedPool.h"
      15             : 
      16             : // libMesh Includes
      17             : #include "libmesh/parallel_object.h"
      18             : 
      19             : // Local includes
      20             : #include "ParallelStudyMethod.h"
      21             : 
      22             : // System Includes
      23             : #include <list>
      24             : 
      25             : template <typename Object, typename Context>
      26             : class SendBuffer : public libMesh::ParallelObject
      27             : {
      28             : public:
      29             :   SendBuffer(const libMesh::Parallel::Communicator & comm,
      30             :              const Context * const context,
      31             :              const processor_id_type pid,
      32             :              const ParallelStudyMethod & method,
      33             :              const unsigned int min_buffer_size,
      34             :              const unsigned int max_buffer_size,
      35             :              const Real buffer_growth_multiplier,
      36             :              const Real buffer_shrink_multiplier,
      37             :              const Parallel::MessageTag object_buffer_tag);
      38             : 
      39             :   /**
      40             :    * Destructor: ensures that all send requests have completed
      41             :    */
      42             :   ~SendBuffer() override;
      43             : 
      44             :   /**
      45             :    * Get the number of objects sent from this buffer
      46             :    */
      47          10 :   unsigned long int objectsSent() const { return _objects_sent; }
      48             :   /**
      49             :    * Get the number of buffers sent from this buffer
      50             :    */
      51          10 :   unsigned long int buffersSent() const { return _buffers_sent; }
      52             :   /**
      53             :    * Get the number of buffers created in the buffer pool
      54             :    */
      55             :   unsigned long int bufferPoolCreated() const { return _buffer_pool.num_created(); }
      56             : 
      57             :   /**
      58             :    * Whether or not messages are currently being sent
      59             :    */
      60             :   bool currentlySending() const { return _requests.size(); }
      61             :   /**
      62             :    * Whether or not objects are currently waiting to be sent
      63             :    */
      64             :   bool currentlyBuffered() const { return _buffer.size(); }
      65             : 
      66             :   /**
      67             :    * Move an object to the buffer.  May cause the buffer to be communicated.
      68             :    *
      69             :    * This DOES call std::move on the object
      70             :    */
      71             :   void moveObject(std::shared_ptr<Object> & object);
      72             : 
      73             :   /**
      74             :    * Forces a Send for all currently buffered objects
      75             :    */
      76             :   void forceSend(const bool shrink_current_buffer_size = true);
      77             : 
      78             :   /**
      79             :    * Wait for all requests to finish
      80             :    */
      81             :   void waitAll();
      82             : 
      83             :   /**
      84             :    * Clear all existing data
      85             :    */
      86             :   void clear();
      87             : 
      88             :   /**
      89             :    * Checks to see if any Requests can be finished
      90             :    */
      91             :   void cleanupRequests();
      92             : 
      93             : private:
      94             :   /// The context
      95             :   const Context * const _context;
      96             :   /// The processor ID this buffer will send to
      97             :   const processor_id_type _pid;
      98             : 
      99             :   const ParallelStudyMethod & _method;
     100             : 
     101             :   /// Minimum size of the buffer (in objects)
     102             :   const unsigned int _min_buffer_size;
     103             :   /// Maximum size of the buffer (in objects)
     104             :   const unsigned int _max_buffer_size;
     105             : 
     106             :   /// Multiplier for the buffer size for growing the buffer
     107             :   const Real _buffer_growth_multiplier;
     108             :   /// Multiplier for the buffer size for shrinking the buffer
     109             :   const Real _buffer_shrink_multiplier;
     110             : 
     111             :   /// MessageTag for sending objects
     112             :   const Parallel::MessageTag _object_buffer_tag;
     113             : 
     114             :   /// Current size of the buffer (in objects)
     115             :   unsigned int _current_buffer_size;
     116             :   /// Running buffer size
     117             :   Real _current_buffer_size_real;
     118             : 
     119             :   /// The buffer
     120             :   std::vector<std::shared_ptr<Object>> _buffer;
     121             :   /// The size of the objects in the buffer in bytes
     122             :   std::size_t _buffer_size_bytes;
     123             : 
     124             :   /// List of Requests
     125             :   std::list<std::shared_ptr<Parallel::Request>> _requests;
     126             : 
     127             :   /// Shared pool of buffers
     128             :   MooseUtils::SharedPool<
     129             :       std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
     130             :       _buffer_pool;
     131             : 
     132             :   /// Counter for objects sent
     133             :   unsigned long int _objects_sent;
     134             :   /// Counter for buffers sent
     135             :   unsigned long int _buffers_sent;
     136             : };
     137             : 
     138             : template <typename Object, typename Context>
     139        3653 : SendBuffer<Object, Context>::SendBuffer(const libMesh::Parallel::Communicator & comm,
     140             :                                         const Context * const context,
     141             :                                         const processor_id_type pid,
     142             :                                         const ParallelStudyMethod & method,
     143             :                                         const unsigned int min_buffer_size,
     144             :                                         const unsigned int max_buffer_size,
     145             :                                         const Real buffer_growth_multiplier,
     146             :                                         const Real buffer_shrink_multiplier,
     147             :                                         const Parallel::MessageTag object_buffer_tag)
     148             :   : ParallelObject(comm),
     149        3653 :     _context(context),
     150        3653 :     _pid(pid),
     151        3653 :     _method(method),
     152        3653 :     _min_buffer_size(min_buffer_size),
     153        3653 :     _max_buffer_size(max_buffer_size),
     154        3653 :     _buffer_growth_multiplier(buffer_growth_multiplier),
     155        3653 :     _buffer_shrink_multiplier(buffer_shrink_multiplier),
     156        3653 :     _object_buffer_tag(object_buffer_tag),
     157        3653 :     _current_buffer_size(_min_buffer_size),
     158        3653 :     _current_buffer_size_real(_min_buffer_size),
     159        3653 :     _buffer_size_bytes(0),
     160        3653 :     _objects_sent(0),
     161        3653 :     _buffers_sent(0)
     162             : {
     163        3653 :   _buffer.reserve(max_buffer_size);
     164        3653 : }
     165             : 
     166             : template <typename Object, typename Context>
     167        7306 : SendBuffer<Object, Context>::~SendBuffer()
     168             : {
     169        3653 :   waitAll();
     170             :   cleanupRequests();
     171       14612 : }
     172             : 
     173             : template <typename Object, typename Context>
     174             : void
     175     1259326 : SendBuffer<Object, Context>::moveObject(std::shared_ptr<Object> & object)
     176             : {
     177     1259326 :   _buffer_size_bytes +=
     178     1259326 :       Parallel::Packing<std::shared_ptr<Object>>::packable_size(object, _context) *
     179             :       sizeof(typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type);
     180     1259326 :   _buffer.emplace_back(std::move(object));
     181             : 
     182             :   // Force a send with SMART if we find it appropriate
     183     2517172 :   if ((_method == ParallelStudyMethod::SMART &&
     184     1259326 :        (_buffer.size() >= _current_buffer_size || _buffer.size() == _max_buffer_size)) ||
     185     1247695 :       (_buffer_size_bytes > 1048576)) // 1 MB
     186             :   {
     187       11631 :     _current_buffer_size_real =
     188       11631 :         std::min(_buffer_growth_multiplier * _current_buffer_size_real, (Real)_max_buffer_size);
     189             : 
     190       11631 :     if (_current_buffer_size != (unsigned int)_current_buffer_size_real)
     191           0 :       _current_buffer_size = _current_buffer_size_real;
     192             : 
     193       11631 :     forceSend(false);
     194             :   }
     195     1259326 : }
     196             : 
     197             : template <typename Object, typename Context>
     198             : void
     199      350148 : SendBuffer<Object, Context>::forceSend(const bool shrink_current_buffer_size)
     200             : {
     201      350148 :   if (!_buffer.empty())
     202             :   {
     203             :     auto req = std::make_shared<Parallel::Request>();
     204             : 
     205       16959 :     _requests.push_back(req);
     206             : 
     207       16959 :     _buffer_size_bytes = 0;
     208       16959 :     _objects_sent += _buffer.size();
     209       16959 :     _buffers_sent++;
     210             : 
     211             :     std::shared_ptr<std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
     212       33918 :         buffer = _buffer_pool.acquire();
     213             : 
     214       16959 :     comm().nonblocking_send_packed_range(
     215       16959 :         _pid, _context, _buffer.begin(), _buffer.end(), *req, buffer, _object_buffer_tag);
     216             : 
     217       16959 :     _buffer.clear();
     218       16959 :     _buffer.reserve(_max_buffer_size);
     219             : 
     220       16959 :     if (_method == ParallelStudyMethod::SMART && shrink_current_buffer_size)
     221             :     {
     222        5308 :       _current_buffer_size_real =
     223        5308 :           std::max((Real)_min_buffer_size, _current_buffer_size_real * _buffer_shrink_multiplier);
     224             : 
     225        5308 :       if (_current_buffer_size != (unsigned int)_current_buffer_size_real)
     226           0 :         _current_buffer_size = _current_buffer_size_real;
     227             :     }
     228             :   }
     229             : 
     230             :   cleanupRequests();
     231      350148 : }
     232             : 
     233             : template <typename Object, typename Context>
     234             : void
     235        1898 : SendBuffer<Object, Context>::clear()
     236             : {
     237        1898 :   _buffer.clear();
     238             :   _requests.clear();
     239             : 
     240        1898 :   _objects_sent = 0;
     241        1898 :   _buffers_sent = 0;
     242        1898 : }
     243             : 
     244             : template <typename Object, typename Context>
     245             : void
     246             : SendBuffer<Object, Context>::cleanupRequests()
     247             : {
     248      353801 :   _requests.remove_if(
     249       16959 :       [](std::shared_ptr<Parallel::Request> & req)
     250             :       {
     251       16959 :         if (req->test())
     252             :         {
     253       16959 :           req->wait(); // MUST call wait() to do post_wait_work
     254       16959 :           return true;
     255             :         }
     256             :         else
     257             :           return false;
     258             :       });
     259             : }
     260             : 
     261             : template <typename Object, typename Context>
     262             : void
     263        3653 : SendBuffer<Object, Context>::waitAll()
     264             : {
     265           0 :   std::for_each(_requests.begin(), _requests.end(), [](auto & request) { request->wait(); });
     266        3653 : }

Generated by: LCOV version 1.14