LCOV - code coverage report
Current view: top level - include/utils - ReceiveBuffer.h (source / functions) Hit Total Coverage
Test: idaholab/moose ray_tracing: #31405 (292dce) with base fef103 Lines: 61 62 98.4 %
Date: 2025-09-04 07:56:07 Functions: 7 7 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : //* This file is part of the MOOSE framework
       2             : //* https://mooseframework.inl.gov
       3             : //*
       4             : //* All rights reserved, see COPYRIGHT for full restrictions
       5             : //* https://github.com/idaholab/moose/blob/master/COPYRIGHT
       6             : //*
       7             : //* Licensed under LGPL 2.1, please see LICENSE for details
       8             : //* https://www.gnu.org/licenses/lgpl-2.1.html
       9             : 
      10             : #pragma once
      11             : 
      12             : // Moose Includes
      13             : #include "LIFOBuffer.h"
      14             : #include "SharedPool.h"
      15             : 
      16             : // Local includes
      17             : #include "ParallelStudyMethod.h"
      18             : 
      19             : // libMesh Includes
      20             : #include "libmesh/parallel.h"
      21             : #include "libmesh/parallel_object.h"
      22             : 
      23             : // System Includes
      24             : #include <list>
      25             : #include <chrono>
      26             : 
      27             : template <typename Object, typename Context>
      28             : class ReceiveBuffer : public libMesh::ParallelObject
      29             : {
      30             : public:
      31             :   ReceiveBuffer(const libMesh::Parallel::Communicator & comm,
      32             :                 Context * const context,
      33             :                 const ParallelStudyMethod method,
      34             :                 const unsigned int clicks_per_receive,
      35             :                 const Parallel::MessageTag object_buffer_tag);
      36             : 
      37             :   /**
      38             :    * Destructor: errors if any receive requests have not been serviced
      39             :    */
      40             :   ~ReceiveBuffer() override;
      41             : 
      42             :   /**
      43             :    * Whether or not there are messages that are being currently received
      44             :    */
      45          75 :   bool currentlyReceiving() const { return _requests.size(); }
      46             : 
      47             :   /**
      48             :    * The number of objects received since the last reset
      49             :    */
      50          14 :   unsigned long int objectsReceived() const { return _objects_received; }
      51             : 
      52             :   /**
      53             :    * The number of buffers received since the last reset
      54             :    */
      55          14 :   unsigned long int buffersReceived() const { return _buffers_received; }
      56             : 
      57             :   /**
      58             :    * The total number of times we've polled for messages
      59             :    */
      60          14 :   unsigned long int numProbes() const { return _num_probes; }
      61             : 
      62             :   /**
      63             :    * Number of buffers created in the object buffer pool
      64             :    */
      65             :   unsigned long int objectPoolCreated() const { return _object_buffer_pool.num_created(); }
      66             : 
      67             :   /**
      68             :    * Number of buffers created in the buffer pool
      69             :    */
      70             :   unsigned long int bufferPoolCreated() const { return _buffer_pool.num_created(); }
      71             : 
      72             :   /**
      73             :    * Start receives for all currently available messages
      74             :    *
      75             :    * Unless start_receives_only is set, also adds the objects from finished requests to the buffer
      76             :    */
      77             :   void receive(const bool start_receives_only = false);
      78             : 
      79             :   /**
      80             :    * Clear all existing data
      81             :    */
      82             :   void clear();
      83             : 
      84             :   /**
      85             :    * Amount of time (in seconds) spent in the loop checking for messages and creating Receives
      86             :    */
      87             :   Real receiveLoopTime() const { return std::chrono::duration<Real>(_receive_loop_time).count(); }
      88             : 
      89             :   /**
      90             :    * Amount of time (in seconds) spent finishing receives and collecting objects
      91             :    */
      92             :   Real cleanupRequestsTime() const
      93             :   {
      94             :     return std::chrono::duration<Real>(_cleanup_requests_time).count();
      95             :   }
      96             : 
      97             :   /**
      98             :    * Checks to see if any Requests can be finished
      99             :    */
     100             :   void cleanupRequests();
     101             : 
     102             :   /**
     103             :    * Gets the buffer that the received objects are filled into after the requests are finished.
     104             :    */
     105             :   MooseUtils::LIFOBuffer<std::shared_ptr<Object>> & buffer() { return _buffer; }
     106             : 
     107             : private:
     108             :   /// The context
     109             :   Context * const _context;
     110             :   /// The buffer that finished requests are filled into
     111             :   MooseUtils::LIFOBuffer<std::shared_ptr<Object>> _buffer;
     112             : 
     113             :   /// The method
     114             :   const ParallelStudyMethod _method;
     115             :   /// Iterations to wait before checking for new objects
     116             :   const unsigned int _clicks_per_receive;
     117             : 
     118             :   /// List of Requests and buffers for each request
     119             :   std::list<std::pair<std::shared_ptr<Parallel::Request>,
     120             :                       std::shared_ptr<std::vector<std::shared_ptr<Object>>>>>
     121             :       _requests;
     122             : 
     123             :   /// MessageTag used to probe for and receive object buffers
     124             :   const Parallel::MessageTag _object_buffer_tag;
     125             : 
     126             :   /// Receive loop time
     127             :   std::chrono::steady_clock::duration _receive_loop_time;
     128             :   /// Time cleaning up requests
     129             :   std::chrono::steady_clock::duration _cleanup_requests_time;
     130             : 
     131             :   /// Total objects received
     132             :   unsigned long int _objects_received;
     133             :   /// Total object buffers received
     134             :   unsigned long int _buffers_received;
     135             :   /// Total number of times we've polled for messages
     136             :   unsigned long int _num_probes;
     137             : 
     138             :   /// Shared pool of object buffers for incoming messages
     139             :   MooseUtils::SharedPool<std::vector<std::shared_ptr<Object>>> _object_buffer_pool;
     140             :   /// Shared pool of packed buffers used by the nonblocking receives
     141             :   MooseUtils::SharedPool<
     142             :       std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
     143             :       _buffer_pool;
     144             : 
     145             :   template <typename C, typename OutputIter, typename T>
     146          20 :   inline void blocking_receive_packed_range(const Parallel::Communicator & comm,
     147             :                                             const processor_id_type src_processor_id,
     148             :                                             C * context,
     149             :                                             OutputIter out,
     150             :                                             const T * /* output_type */,
     151             :                                             Parallel::Request & req,
     152             :                                             Parallel::Status & stat,
     153             :                                             const Parallel::MessageTag & tag) const
     154             :   {
     155             :     libmesh_experimental();
     156             : 
     157             :     typedef typename Parallel::Packing<T>::buffer_type buffer_t;
     158             : 
     159             :     // Receive serialized variable size objects as a sequence of
     160             :     // buffer_t.
     161             :     // Allocate a buffer on the heap so we don't have to free it until
     162             :     // after the Request::wait()
     163          20 :     std::vector<buffer_t> * buffer = new std::vector<buffer_t>(stat.size());
     164          20 :     comm.receive(src_processor_id, *buffer, tag);
     165             : 
     166             :     // Make the Request::wait() handle unpacking the buffer
     167          20 :     req.add_post_wait_work(
     168          20 :         new libMesh::Parallel::PostWaitUnpackBuffer<std::vector<buffer_t>, C, OutputIter, T>(
     169             :             *buffer, context, out));
     170             : 
     171             :     // Make the Request::wait() handle deleting the buffer
     172          20 :     req.add_post_wait_work(
     173          20 :         new libMesh::Parallel::PostWaitDeleteBuffer<std::vector<buffer_t>>(buffer));
     174          20 :   }
     175             : };
     176             : 
     177             : template <typename Object, typename Context>
     178        3827 : ReceiveBuffer<Object, Context>::ReceiveBuffer(const libMesh::Parallel::Communicator & comm,
     179             :                                               Context * const context,
     180             :                                               const ParallelStudyMethod method,
     181             :                                               const unsigned int clicks_per_receive,
     182             :                                               const Parallel::MessageTag object_buffer_tag)
     183             :   : ParallelObject(comm),
     184        3827 :     _context(context),
     185        3827 :     _method(method),
     186        3827 :     _clicks_per_receive(clicks_per_receive),
     187        3827 :     _object_buffer_tag(object_buffer_tag)
     188             : {
     189        3827 : }
     190             : 
     191             : template <typename Object, typename Context>
     192        7134 : ReceiveBuffer<Object, Context>::~ReceiveBuffer()
     193             : {
     194        3567 :   if (!_requests.empty())
     195           0 :     mooseError("Some requests not serviced!");
     196        7134 : }
     197             : 
     198             : template <typename Object, typename Context>
     199             : void
     200     4465854 : ReceiveBuffer<Object, Context>::receive(const bool start_receives_only /* = false */)
     201             : {
     202     4465854 :   bool flag = false;
     203             :   Parallel::Status stat;
     204             : 
     205             :   static unsigned int current_clicks = 0;
     206             : 
     207     4465854 :   if (current_clicks % _clicks_per_receive == 0)
     208             :   {
     209     4465854 :     current_clicks = 0;
     210             : 
     211             :     // Receive and process a bunch of objects
     212     4482813 :     do
     213             :     {
     214     4482813 :       stat = _communicator.template packed_range_probe<std::shared_ptr<Object>>(
     215     4482813 :           Parallel::any_source, _object_buffer_tag, flag);
     216             : 
     217     4482813 :       _num_probes++;
     218             : 
     219     4482813 :       if (flag)
     220             :       {
     221             :         auto req = std::make_shared<Parallel::Request>();
     222       33918 :         std::shared_ptr<std::vector<std::shared_ptr<Object>>> objects =
     223             :             _object_buffer_pool.acquire();
     224             : 
     225             :         // Make sure the buffer is clear - this shouldn't resize the storage though.
     226       16959 :         objects->clear();
     227             : 
     228       16959 :         if (_method == ParallelStudyMethod::HARM || _method == ParallelStudyMethod::BS)
     229          20 :           blocking_receive_packed_range(comm(),
     230             :                                         stat.source(),
     231          20 :                                         _context,
     232             :                                         std::back_inserter(*objects),
     233             :                                         (std::shared_ptr<Object> *)(libmesh_nullptr),
     234             :                                         *req,
     235             :                                         stat,
     236             :                                         _object_buffer_tag);
     237             :         else
     238             :         {
     239             :           std::shared_ptr<
     240             :               std::vector<typename Parallel::Packing<std::shared_ptr<Object>>::buffer_type>>
     241       33878 :               buffer = _buffer_pool.acquire();
     242             : 
     243       16939 :           _communicator.nonblocking_receive_packed_range(
     244             :               stat.source(),
     245       16939 :               _context,
     246             :               std::back_inserter(*objects),
     247             :               (std::shared_ptr<Object> *)(libmesh_nullptr),
     248             :               *req,
     249             :               stat,
     250             :               buffer,
     251             :               _object_buffer_tag);
     252             :         }
     253             : 
     254       16959 :         _requests.emplace_back(req, objects);
     255             :       }
     256             :     } while (flag);
     257             :   }
     258             : 
     259     4465854 :   current_clicks++;
     260             : 
     261     4465854 :   if (!start_receives_only)
     262             :     cleanupRequests();
     263     4465854 : }
     264             : 
     265             : template <typename Object, typename Context>
     266             : void
     267        8023 : ReceiveBuffer<Object, Context>::clear()
     268             : {
     269             :   _requests.clear();
     270             : 
     271        8023 :   _receive_loop_time = std::chrono::steady_clock::duration::zero();
     272        8023 :   _cleanup_requests_time = std::chrono::steady_clock::duration::zero();
     273             : 
     274        8023 :   _objects_received = 0;
     275        8023 :   _buffers_received = 0;
     276        8023 :   _num_probes = 0;
     277        8023 : }
     278             : 
     279             : template <typename Object, typename Context>
     280             : void
     281             : ReceiveBuffer<Object, Context>::cleanupRequests()
     282             : {
     283     4417935 :   _requests.remove_if(
     284       16959 :       [&](std::pair<std::shared_ptr<Parallel::Request>,
     285             :                     std::shared_ptr<std::vector<std::shared_ptr<Object>>>> & request_pair)
     286             :       {
     287             :         auto req = request_pair.first;
     288             :         auto objects = request_pair.second;
     289             : 
     290       16959 :         if (req->test()) // See if the receive has completed
     291             :         {
     292       16959 :           req->wait(); // MUST call wait() to do post_wait_work which actually fills the object
     293             :                        // buffer
     294             : 
     295       16959 :           _buffers_received++;
     296       16959 :           _objects_received += objects->size();
     297             : 
     298       16959 :           if (_buffer.capacity() < _buffer.size() + objects->size())
     299             :             _buffer.setCapacity(_buffer.size() + objects->size());
     300             : 
     301     1276285 :           for (auto & object : *objects)
     302     1259326 :             _buffer.move(object);
     303             : 
     304       16959 :           objects->clear();
     305             : 
     306       16959 :           return true;
     307             :         }
     308             :         else
     309             :           return false;
     310             :       });
     311     4417935 : }

Generated by: LCOV version 1.14