My Project
p2pcommunicator.hh
1 /*
2  Copyright 2015 IRIS AS
3 
4  This file is part of the Open Porous Media project (OPM).
5 
6  OPM is free software: you can redistribute it and/or modify
7  it under the terms of the GNU General Public License as published by
8  the Free Software Foundation, either version 3 of the License, or
9  (at your option) any later version.
10 
11  OPM is distributed in the hope that it will be useful,
12  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  GNU General Public License for more details.
15 
16  You should have received a copy of the GNU General Public License
17  along with OPM. If not, see <http://www.gnu.org/licenses/>.
18 */
19 #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
20 #define DUNE_COMMUNICATOR_HEADER_INCLUDED
21 
22 #include <cassert>
23 #include <algorithm>
24 #include <vector>
25 #include <set>
26 #include <map>
27 
28 #include <dune/common/version.hh>
29 
30 #include <dune/common/parallel/mpihelper.hh>
31 #if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
32 #include <dune/common/parallel/communication.hh>
33 #else
34 #include <dune/common/parallel/collectivecommunication.hh>
35 #endif
36 
37 // the following implementation is only available in case MPI is available
38 #if HAVE_MPI
39 #if DUNE_VERSION_NEWER(DUNE_COMMON, 2, 7)
40 #include <dune/common/parallel/mpicommunication.hh>
41 #else
42 #include <dune/common/parallel/mpicollectivecommunication.hh>
43 #endif
44 #endif
45 
46 
47 namespace Dune
48 {
/** \brief Growable byte buffer with append-only writes and sequential reads.
 *
 *  write() appends the raw bytes of a value to the end of the buffer,
 *  read() copies bytes back out starting at an internal read position and
 *  advances that position. The read position is independent of the write
 *  position (which is always the end of the buffer) and is rewound with
 *  resetReadPosition() or clear().
 *
 *  NOTE(review): the declaration line of this type is absent from the listing
 *  this block was taken from; it is restored here as a struct so that every
 *  member stays reachable for existing callers -- confirm against the
 *  original header.
 */
struct SimpleMessageBuffer
{
  typedef std::vector< char > BufferType;

  // storage and read position are mutable because read() and buffer() are
  // const members that still need to advance pos_ / hand out a writable pointer
  mutable BufferType buffer_;
  const double factor_;
  mutable size_t pos_;
public:
  /** \brief constructor taking memory reserve estimation factor
   *
   *  \param factor  multiplier applied to the required size whenever the
   *                 buffer has to grow (default is 1.1, i.e. 10% overestimation)
   */
  SimpleMessageBuffer( const double factor = 1.1 )
    : buffer_(), factor_( factor ), pos_( 0 ) // start reading at the beginning
  {
  }

  /** \brief clear the buffer and rewind the read position */
  void clear() { buffer_.clear(); resetReadPosition(); }

  /** \brief reset read position of buffer to beginning */
  void resetReadPosition() { pos_ = 0 ; }

  /** \brief return size of buffer in bytes */
  size_t size() const { return buffer_.size(); }

  /** \brief reserve memory for 'size' entries */
  void reserve( const size_t size )
  {
    buffer_.reserve( size );
  }

  /** \brief resize buffer to 'size' entries */
  void resize( const size_t size )
  {
    buffer_.resize( size );
  }

  /** \brief write value to buffer
   *
   *  The object representation of \a value (sizeof(T) bytes) is appended
   *  verbatim, so T must be a type for which a byte-wise copy is a valid
   *  copy (e.g. no internal pointers).
   */
  template <class T>
  void write( const T& value )
  {
    appendBytes( reinterpret_cast<const char *> (&value), sizeof( T ) );
  }

  /** \brief write a string as an int length followed by its characters */
  void write( const std::string& str)
  {
    // the length prefix is an int; make sure the string length survives
    // the conversion instead of silently truncating it
    const int size = static_cast<int>( str.size() );
    assert( size >= 0 && static_cast<size_t>( size ) == str.size() );
    write(size);
    appendBytes( str.data(), str.size() );
  }

  /** \brief read value from buffer
   *
   *  Copies sizeof(T) bytes from the current read position into \a value
   *  and advances the read position. T must be safe to fill byte-wise.
   */
  template <class T>
  void read( T& value ) const
  {
    const size_t tsize = sizeof( T );
    // reading past the written data is a caller error
    assert( pos_ + tsize <= buffer_.size() );
    std::copy_n( buffer_.data()+pos_, tsize, reinterpret_cast<char *> (&value) );
    pos_ += tsize;
  }

  /** \brief read a string stored as an int length followed by its characters */
  void read( std::string& str) const
  {
    int size = 0;
    read(size);
    // a negative or oversized length means the buffer content is not a string
    assert( size >= 0 );
    const size_t count = static_cast<size_t>( size );
    assert( pos_ + count <= buffer_.size() );
    str.assign( buffer_.data()+pos_, count );
    pos_ += count;
  }

  /** \brief return pointer to buffer and size for use with MPI functions */
  std::pair< char* , int > buffer() const
  {
    return std::make_pair( buffer_.data(), int(buffer_.size()) );
  }

private:
  // append 'count' raw bytes, growing the storage with the reserve factor
  void appendBytes( const char* bytes, const size_t count )
  {
    const size_t pos = buffer_.size();
    const size_t sizeNeeded = pos + count ;
    // over-reserve by factor_ so that a run of small writes does not
    // reallocate on every call
    if( buffer_.capacity() < sizeNeeded )
    {
      reserve( size_t(factor_ * sizeNeeded) ) ;
    }
    // resize to the size needed to store the new bytes
    buffer_.resize( sizeNeeded );
    std::copy_n( bytes, count, buffer_.data()+pos );
  }
};
139 
141  template < class MsgBuffer >
142  class Point2PointCommunicator : public CollectiveCommunication< MPIHelper::MPICommunicator >
143  {
144  public:
146  typedef MPIHelper::MPICommunicator MPICommunicator ;
147 
149  typedef MsgBuffer MessageBufferType ;
150 
151  protected:
152  typedef CollectiveCommunication< MPICommunicator > BaseType;
154 
155  // starting message tag
156  static const int messagetag = 234;
157 
158  typedef std::map< int, int > linkage_t;
159  typedef std::vector< int > vector_t;
160 
161  linkage_t sendLinkage_ ;
162  linkage_t recvLinkage_ ;
163 
164  vector_t sendDest_ ;
165  vector_t recvSource_ ;
166 
167  mutable vector_t _recvBufferSizes;
168  mutable bool _recvBufferSizesComputed;
169 
170  public :
171  using BaseType :: rank;
172  using BaseType :: size;
173 
174  /* \brief data handle interface that needs to be implemented for use with some of
175  * the exchange methods */
177  {
178  protected:
179  DataHandleInterface () {}
180  public:
181  virtual ~DataHandleInterface () {}
182  virtual void pack( const int link, MessageBufferType& os ) = 0 ;
183  virtual void unpack( const int link, MessageBufferType& os ) = 0 ;
184  // should contain work that could be done between send and receive
185  virtual void localComputation () {}
186  };
187 
188  public:
190  Point2PointCommunicator( const MPICommunicator& mpiComm = MPIHelper::getCommunicator() )
191  : BaseType( mpiComm ) { removeLinkage(); }
192 
194  Point2PointCommunicator( const BaseType& comm ) : BaseType( comm ) { removeLinkage(); }
195 
196 
198  inline void insertRequest( const std::set< int >& sendLinks, const std::set< int >& recvLinks );
199 
201  inline int sendLinks () const { return sendLinkage_.size(); }
202 
204  inline int recvLinks () const { return recvLinkage_.size(); }
205 
207  const vector_t& recvBufferSizes() const { return _recvBufferSizes; }
208 
210  inline int sendLink (const int rank) const
211  {
212  assert (sendLinkage_.end () != sendLinkage_.find (rank)) ;
213  return (* sendLinkage_.find (rank)).second ;
214  }
215 
217  inline int recvLink (const int rank) const
218  {
219  assert (recvLinkage_.end () != recvLinkage_.find (rank)) ;
220  return (* recvLinkage_.find (rank)).second ;
221  }
222 
224  const std::vector< int > &sendDest () const { return sendDest_; }
226  const std::vector< int > &recvSource () const { return recvSource_; }
227 
229  inline void removeLinkage () ;
230 
232  virtual std::vector< MessageBufferType > exchange (const std::vector< MessageBufferType > &) const;
233 
235  virtual void exchange ( DataHandleInterface& ) const;
236 
240  virtual void exchangeCached ( DataHandleInterface& ) const;
241 
242  protected:
243  inline void computeDestinations( const linkage_t& linkage, vector_t& dest );
244 
245  // return new tag number for the exchange messages
246  static int getMessageTag( const unsigned int increment )
247  {
248  static int tag = messagetag + 2 ;
249  // increase tag counter
250  const int retTag = tag;
251  tag += increment ;
252  // the MPI standard guaratees only up to 2^15-1
253  if( tag >= 32767 )
254  {
255  // reset tag to initial value
256  tag = messagetag + 2 ;
257  }
258  return retTag;
259  }
260 
261  // return new tag number for the exchange messages
262  static int getMessageTag()
263  {
264  return getMessageTag( 1 );
265  }
266  };
267 
268 } // namespace Dune
269 
270 // include inline implementation
271 #include "p2pcommunicator_impl.hh"
272 
273 #endif // #ifndef DUNE_COMMUNICATOR_HEADER_INCLUDED
Definition: p2pcommunicator.hh:177
Point-2-Point communicator for exchanging messages between processes.
Definition: p2pcommunicator.hh:143
MsgBuffer MessageBufferType
type of message buffer used
Definition: p2pcommunicator.hh:149
int recvLinks() const
return number of processes we will receive data from
Definition: p2pcommunicator.hh:204
int recvLink(const int rank) const
return recv link number for a given recv rank number
Definition: p2pcommunicator.hh:217
MPIHelper::MPICommunicator MPICommunicator
type of MPI communicator, either MPI_Comm or NoComm as defined in MPIHelper
Definition: p2pcommunicator.hh:146
int sendLink(const int rank) const
return send link number for a given send rank number
Definition: p2pcommunicator.hh:210
virtual void exchangeCached(DataHandleInterface &) const
exchange data with peers, handle defines pack and unpack of data, if receive buffers are known from p...
Definition: p2pcommunicator_impl.hh:619
int sendLinks() const
return number of processes we will send data to
Definition: p2pcommunicator.hh:201
virtual std::vector< MessageBufferType > exchange(const std::vector< MessageBufferType > &) const
exchange message buffers with peers defined by inserted linkage
Definition: p2pcommunicator_impl.hh:599
Point2PointCommunicator(const MPICommunicator &mpiComm=MPIHelper::getCommunicator())
constructor taking mpi communicator
Definition: p2pcommunicator.hh:190
const std::vector< int > & recvSource() const
return vector containing all process numbers we will receive from
Definition: p2pcommunicator.hh:226
const std::vector< int > & sendDest() const
return vector containing all process numbers we will send to
Definition: p2pcommunicator.hh:224
void insertRequest(const std::set< int > &sendLinks, const std::set< int > &recvLinks)
insert communication request with a set of ranks to send to and a set of ranks to receive from
Definition: p2pcommunicator_impl.hh:59
void removeLinkage()
remove stored linkage
Definition: p2pcommunicator_impl.hh:30
const vector_t & recvBufferSizes() const
return vector containing possible recv buffer sizes
Definition: p2pcommunicator.hh:207
Point2PointCommunicator(const BaseType &comm)
constructor taking collective communication
Definition: p2pcommunicator.hh:194
Definition: p2pcommunicator.hh:50
void resetReadPosition()
reset read position of buffer to beginning
Definition: p2pcommunicator.hh:68
size_t size() const
return size of buffer
Definition: p2pcommunicator.hh:70
void clear()
clear the buffer
Definition: p2pcommunicator.hh:66
void reserve(const size_t size)
reserve memory for 'size' entries
Definition: p2pcommunicator.hh:73
void resize(const size_t size)
resize buffer to 'size' entries
Definition: p2pcommunicator.hh:79
SimpleMessageBuffer(const double factor=1.1)
constructor taking memory reserve estimation factor (default is 1.1, i.e. 10% overestimation)
Definition: p2pcommunicator.hh:59
void read(T &value) const
read value from buffer, value must implement the operator= correctly (i.e. it must be safe to fill by copying raw bytes)
Definition: p2pcommunicator.hh:114
void write(const T &value)
write value to buffer, value must implement the operator= correctly (i.e. it must be safe to copy as raw bytes)
Definition: p2pcommunicator.hh:86
std::pair< char *, int > buffer() const
return pointer to buffer and size for use with MPI functions
Definition: p2pcommunicator.hh:134
Copyright 2019 Equinor AS.
Definition: CartesianIndexMapper.hpp:10