xrootd
XrdClParallelOperation.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
28 
29 #include "XrdCl/XrdClOperations.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClPostMaster.hh"
33 #include "XrdCl/XrdClJobManager.hh"
34 
35 #include <atomic>
36 #include <condition_variable>
37 #include <mutex>
38 
39 namespace XrdCl
40 {
41 
42  //----------------------------------------------------------------------------
43  // Interface for different execution policies:
44  // - all : all operations need to succeed in order for the parallel
45  // operation to be successful
46  // - any : just one of the operations needs to succeed in order for
47  // the parallel operation to be successful
48  // - some : n (user defined) operations need to succeed in order for
49  // the parallel operation to be successful
50  // - at least : at least n (user defined) operations need to succeed in
51  // order for the parallel operation to be successful (the
52  // user handler will be called only when all operations are
53  // resolved)
54  //
55  // @param status : status returned by one of the aggregated operations
56  //
57  // @return : true if the status should be passed to the user handler,
58  // false otherwise.
59  //----------------------------------------------------------------------------
61  {
62  virtual ~PolicyExecutor()
63  {
64  }
65 
66  virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
67 
68  virtual XRootDStatus Result() = 0;
69  };
70 
71  //----------------------------------------------------------------------------
77  //----------------------------------------------------------------------------
78  template<bool HasHndl>
79  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
80  {
81  template<bool> friend class ParallelOperation;
82 
83  public:
84 
85  //------------------------------------------------------------------------
87  //------------------------------------------------------------------------
88  template<bool from>
90  ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
91  pipelines( std::move( obj.pipelines ) ),
92  policy( std::move( obj.policy ) )
93  {
94  }
95 
96  //------------------------------------------------------------------------
102  //------------------------------------------------------------------------
103  template<class Container>
104  ParallelOperation( Container &&container )
105  {
106  static_assert( !HasHndl, "Constructor is available only operation without handler");
107 
108  pipelines.reserve( container.size() );
109  auto begin = std::make_move_iterator( container.begin() );
110  auto end = std::make_move_iterator( container.end() );
111  std::copy( begin, end, std::back_inserter( pipelines ) );
112  container.clear(); // there's junk inside so we clear it
113  }
114 
116  {
117  }
118 
119  //------------------------------------------------------------------------
121  //------------------------------------------------------------------------
122  std::string ToString()
123  {
124  std::ostringstream oss;
125  oss << "Parallel(";
126  for( size_t i = 0; i < pipelines.size(); i++ )
127  {
128  oss << pipelines[i]->ToString();
129  if( i + 1 != pipelines.size() )
130  {
131  oss << " && ";
132  }
133  }
134  oss << ")";
135  return oss.str();
136  }
137 
138  //------------------------------------------------------------------------
143  //------------------------------------------------------------------------
145  {
146  policy.reset( new AllPolicy() );
147  return std::move( *this );
148  }
149 
150  //------------------------------------------------------------------------
155  //------------------------------------------------------------------------
157  {
158  policy.reset( new AnyPolicy( pipelines.size() ) );
159  return std::move( *this );
160  }
161 
162  //------------------------------------------------------------------------
163  // Set policy to `Some`
167  //------------------------------------------------------------------------
168  ParallelOperation<HasHndl> Some( size_t threshold )
169  {
170  policy.reset( new SomePolicy( pipelines.size(), threshold ) );
171  return std::move( *this );
172  }
173 
174  //------------------------------------------------------------------------
180  //------------------------------------------------------------------------
182  {
183  policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
184  return std::move( *this );
185  }
186 
187  private:
188 
189  //------------------------------------------------------------------------
194  //------------------------------------------------------------------------
195  struct AllPolicy : public PolicyExecutor
196  {
197  bool Examine( const XrdCl::XRootDStatus &status )
198  {
199  // keep the status in case this is the final result
200  res = status;
201  if( status.IsOK() ) return false;
202  // we require all request to succeed
203  return true;
204  }
205 
207  {
208  return res;
209  }
210 
212  };
213 
214  //------------------------------------------------------------------------
219  //------------------------------------------------------------------------
220  struct AnyPolicy : public PolicyExecutor
221  {
222  AnyPolicy( size_t size) : cnt( size )
223  {
224  }
225 
226  bool Examine( const XrdCl::XRootDStatus &status )
227  {
228  // keep the status in case this is the final result
229  res = status;
230  // decrement the counter
231  size_t nb = cnt.fetch_sub( 1, std::memory_order_relaxed );
232  // we require just one operation to be successful
233  if( status.IsOK() ) return true;
234  // lets see if this is the last one?
235  if( nb == 1 ) return true;
236  // we still have a chance there will be one that is successful
237  return false;
238  }
239 
241  {
242  return res;
243  }
244 
245  private:
246  std::atomic<size_t> cnt;
248  };
249 
250  //------------------------------------------------------------------------
255  //------------------------------------------------------------------------
257  {
258  SomePolicy( size_t size, size_t threshold ) : failed( 0 ), succeeded( 0 ),
260  {
261  }
262 
263  bool Examine( const XrdCl::XRootDStatus &status )
264  {
265  // keep the status in case this is the final result
266  res = status;
267  if( status.IsOK() )
268  {
269  size_t s = succeeded.fetch_add( 1, std::memory_order_relaxed );
270  if( s + 1 == threshold ) return true; // we reached the threshold
271  // we are not yet there
272  return false;
273  }
274  size_t f = failed.fetch_add( 1, std::memory_order_relaxed );
275  // did we drop below the threshold
276  if( f == size - threshold ) return true;
277  // we still have a chance there will be enough of successful operations
278  return false;
279  }
280 
282  {
283  return res;
284  }
285 
286  private:
287  std::atomic<size_t> failed;
288  std::atomic<size_t> succeeded;
289  const size_t threshold;
290  const size_t size;
292  };
293 
294  //------------------------------------------------------------------------
300  //------------------------------------------------------------------------
302  {
303  AtLeastPolicy( size_t size, size_t threshold ) : pending_cnt( size ),
304  failed_cnt( 0 ),
305  failed_threshold( size - threshold )
306  {
307  }
308 
309  bool Examine( const XrdCl::XRootDStatus &status )
310  {
311  // update number of pending operations
312  size_t pending = pending_cnt.fetch_sub( 1, std::memory_order_relaxed ) - 1;
313  // although we might have the minimum to succeed we wait for the rest
314  if( status.IsOK() ) return ( pending == 0 );
315  size_t nb = failed_cnt.fetch_add( 1, std::memory_order_relaxed );
316  if( nb == failed_threshold ) res = status; // we dropped below the threshold
317  // if we still have to wait for pending operations return false,
318  // otherwise all is done, return true
319  return ( pending == 0 );
320  }
321 
323  {
324  return res;
325  }
326 
327  private:
328  std::atomic<size_t> pending_cnt;
329  std::atomic<size_t> failed_cnt;
330  const size_t failed_threshold;
332  };
333 
334  //------------------------------------------------------------------------
336  //------------------------------------------------------------------------
337  struct barrier_t
338  {
339  barrier_t() : on( true ) { }
340 
341  void wait()
342  {
343  std::unique_lock<std::mutex> lck( mtx );
344  if( on ) cv.wait( lck );
345  }
346 
347  void lift()
348  {
349  on = false;
350  cv.notify_all();
351  }
352 
353  private:
354  std::condition_variable cv;
355  std::mutex mtx;
356  bool on;
357  };
358 
359  //------------------------------------------------------------------------
364  //------------------------------------------------------------------------
365  struct Ctx
366  {
367  //----------------------------------------------------------------------
371  //----------------------------------------------------------------------
373  policy( policy )
374  {
375  }
376 
377  //----------------------------------------------------------------------
379  //----------------------------------------------------------------------
381  {
382  Handle( XRootDStatus() );
383  }
384 
385  //----------------------------------------------------------------------
390  //----------------------------------------------------------------------
391  inline void Examine( const XRootDStatus &st )
392  {
393  if( policy->Examine( st ) )
394  Handle( policy->Result() );
395  }
396 
397  //----------------------------------------------------------------------
402  //---------------------------------------------------------------------
403  inline void Handle( const XRootDStatus &st )
404  {
405  PipelineHandler* hdlr = handler.exchange( nullptr, std::memory_order_relaxed );
406  if( hdlr )
407  {
408  barrier.wait();
409  hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
410  }
411  }
412 
413  //----------------------------------------------------------------------
415  //----------------------------------------------------------------------
416  std::atomic<PipelineHandler*> handler;
417 
418  //----------------------------------------------------------------------
420  //----------------------------------------------------------------------
421  std::unique_ptr<PolicyExecutor> policy;
422 
423  //----------------------------------------------------------------------
426  //----------------------------------------------------------------------
428  };
429 
430  //------------------------------------------------------------------------
432  //------------------------------------------------------------------------
433  struct PipelineEnd : public Job
434  {
435  //----------------------------------------------------------------------
436  // Constructor
437  //----------------------------------------------------------------------
438  PipelineEnd( std::shared_ptr<Ctx> &ctx,
439  const XrdCl::XRootDStatus &st ) : ctx( ctx ), st( st )
440  {
441  }
442 
443  //----------------------------------------------------------------------
444  // Run Ctx::Examine in the thread-pool
445  //----------------------------------------------------------------------
446  void Run( void* )
447  {
448  ctx->Examine( st );
449  delete this;
450  }
451 
452  private:
453  std::shared_ptr<Ctx> ctx; //< ParallelOperaion context
454  XrdCl::XRootDStatus st; //< final status of the ParallelOperation
455  };
456 
457  //------------------------------------------------------------------------
459  //------------------------------------------------------------------------
460  inline static
461  void Schedule( std::shared_ptr<Ctx> &ctx, const XrdCl::XRootDStatus &st)
462  {
464  PipelineEnd *end = new PipelineEnd( ctx, st );
465  mgr->QueueJob( end, nullptr );
466  }
467 
468  //------------------------------------------------------------------------
474  //------------------------------------------------------------------------
475  XRootDStatus RunImpl( PipelineHandler *handler, uint16_t pipelineTimeout )
476  {
477  // make sure we have a valid policy for the parallel operation
478  if( !policy ) policy.reset( new AllPolicy() );
479 
480  std::shared_ptr<Ctx> ctx =
481  std::make_shared<Ctx>( handler, policy.release() );
482 
483  uint16_t timeout = pipelineTimeout < this->timeout ?
484  pipelineTimeout : this->timeout;
485 
486  for( size_t i = 0; i < pipelines.size(); ++i )
487  {
488  if( !pipelines[i] ) continue;
489  pipelines[i].Run( timeout,
490  [ctx]( const XRootDStatus &st ) mutable { Schedule( ctx, st ); } );
491  }
492 
493  ctx->barrier.lift();
494  return XRootDStatus();
495  }
496 
497  std::vector<Pipeline> pipelines;
498  std::unique_ptr<PolicyExecutor> policy;
499  };
500 
501  //----------------------------------------------------------------------------
503  //----------------------------------------------------------------------------
504  template<class Container>
505  inline ParallelOperation<false> Parallel( Container &&container )
506  {
507  return ParallelOperation<false>( container );
508  }
509 
510  //----------------------------------------------------------------------------
512  //----------------------------------------------------------------------------
513  inline void PipesToVec( std::vector<Pipeline>& )
514  {
515  // base case
516  }
517 
518  //----------------------------------------------------------------------------
519  // Declare PipesToVec (we need to do declare those functions ahead of
520  // definitions, as they may call each other.
521  //----------------------------------------------------------------------------
522  template<typename ... Others>
523  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
524  Others&... others );
525 
526  template<typename ... Others>
527  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
528  Others&... others );
529 
530  template<typename ... Others>
531  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
532  Others&... others );
533 
534  //----------------------------------------------------------------------------
535  // Define PipesToVec
536  //----------------------------------------------------------------------------
537  template<typename ... Others>
538  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
539  Others&... others )
540  {
541  v.emplace_back( operation );
542  PipesToVec( v, others... );
543  }
544 
545  template<typename ... Others>
546  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
547  Others&... others )
548  {
549  v.emplace_back( operation );
550  PipesToVec( v, others... );
551  }
552 
553  template<typename ... Others>
554  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
555  Others&... others )
556  {
557  v.emplace_back( std::move( pipeline ) );
558  PipesToVec( v, others... );
559  }
560 
561  //----------------------------------------------------------------------------
566  //----------------------------------------------------------------------------
567  template<typename ... Operations>
568  inline ParallelOperation<false> Parallel( Operations&& ... operations )
569  {
570  constexpr size_t size = sizeof...( operations );
571  std::vector<Pipeline> v;
572  v.reserve( size );
573  PipesToVec( v, operations... );
574  return Parallel( v );
575  }
576 }
577 
578 #endif // __XRD_CL_OPERATIONS_HH__
Definition: XrdClOperations.hh:542
uint16_t timeout
Operation timeout.
Definition: XrdClOperations.hh:766
static PostMaster * GetPostMaster()
Get default post master.
A synchronized queue.
Definition: XrdClJobManager.hh:51
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Definition: XrdClJobManager.hh:92
Interface for a job to be run by the job manager.
Definition: XrdClJobManager.hh:34
Definition: XrdClOperations.hh:179
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:304
Definition: XrdClParallelOperation.hh:80
std::vector< Pipeline > pipelines
Definition: XrdClParallelOperation.hh:497
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
Definition: XrdClParallelOperation.hh:89
ParallelOperation(Container &&container)
Definition: XrdClParallelOperation.hh:104
static void Schedule(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Schedule Ctx::Examine to be executed in the client thread-pool.
Definition: XrdClParallelOperation.hh:461
std::unique_ptr< PolicyExecutor > policy
Definition: XrdClParallelOperation.hh:498
std::string ToString()
Definition: XrdClParallelOperation.hh:122
ParallelOperation< HasHndl > Some(size_t threshold)
Definition: XrdClParallelOperation.hh:168
ParallelOperation< HasHndl > All()
Definition: XrdClParallelOperation.hh:144
XRootDStatus RunImpl(PipelineHandler *handler, uint16_t pipelineTimeout)
Definition: XrdClParallelOperation.hh:475
ParallelOperation< HasHndl > Any()
Definition: XrdClParallelOperation.hh:156
~ParallelOperation()
Definition: XrdClParallelOperation.hh:115
ParallelOperation< HasHndl > AtLeast(size_t threshold)
Definition: XrdClParallelOperation.hh:181
Definition: XrdClOperations.hh:59
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
Definition: XrdClOperations.hh:320
JobManager * GetJobManager()
Get the job manager object user by the post master.
Request status.
Definition: XrdClXRootDResponses.hh:219
Definition: XrdClAnyObject.hh:26
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition: XrdClParallelOperation.hh:513
ParallelOperation< false > Parallel(Container &&container)
Factory function for creating parallel operation from a vector.
Definition: XrdClParallelOperation.hh:505
Definition: XrdOucJson.hh:4517
Definition: XrdClParallelOperation.hh:196
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:206
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:197
XRootDStatus res
Definition: XrdClParallelOperation.hh:211
Definition: XrdClParallelOperation.hh:221
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:240
std::atomic< size_t > cnt
Definition: XrdClParallelOperation.hh:246
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:226
XRootDStatus res
Definition: XrdClParallelOperation.hh:247
AnyPolicy(size_t size)
Definition: XrdClParallelOperation.hh:222
Definition: XrdClParallelOperation.hh:302
std::atomic< size_t > failed_cnt
Definition: XrdClParallelOperation.hh:329
std::atomic< size_t > pending_cnt
Definition: XrdClParallelOperation.hh:328
const size_t failed_threshold
Definition: XrdClParallelOperation.hh:330
XRootDStatus res
Definition: XrdClParallelOperation.hh:331
AtLeastPolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:303
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:322
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:309
Definition: XrdClParallelOperation.hh:366
barrier_t barrier
Definition: XrdClParallelOperation.hh:427
void Handle(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:403
void Examine(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:391
~Ctx()
Destructor.
Definition: XrdClParallelOperation.hh:380
std::unique_ptr< PolicyExecutor > policy
Policy defining when the user handler should be called.
Definition: XrdClParallelOperation.hh:421
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition: XrdClParallelOperation.hh:416
Ctx(PipelineHandler *handler, PolicyExecutor *policy)
Definition: XrdClParallelOperation.hh:372
The thread-pool job for schedule Ctx::Examine.
Definition: XrdClParallelOperation.hh:434
std::shared_ptr< Ctx > ctx
Definition: XrdClParallelOperation.hh:453
XrdCl::XRootDStatus st
Definition: XrdClParallelOperation.hh:454
PipelineEnd(std::shared_ptr< Ctx > &ctx, const XrdCl::XRootDStatus &st)
Definition: XrdClParallelOperation.hh:438
void Run(void *)
The job logic.
Definition: XrdClParallelOperation.hh:446
Definition: XrdClParallelOperation.hh:257
SomePolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:258
XRootDStatus res
Definition: XrdClParallelOperation.hh:291
const size_t size
Definition: XrdClParallelOperation.hh:290
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:263
std::atomic< size_t > failed
Definition: XrdClParallelOperation.hh:287
const size_t threshold
Definition: XrdClParallelOperation.hh:289
std::atomic< size_t > succeeded
Definition: XrdClParallelOperation.hh:288
XRootDStatus Result()
Definition: XrdClParallelOperation.hh:281
A wait barrier helper class.
Definition: XrdClParallelOperation.hh:338
void lift()
Definition: XrdClParallelOperation.hh:347
bool on
Definition: XrdClParallelOperation.hh:356
void wait()
Definition: XrdClParallelOperation.hh:341
std::condition_variable cv
Definition: XrdClParallelOperation.hh:354
barrier_t()
Definition: XrdClParallelOperation.hh:339
std::mutex mtx
Definition: XrdClParallelOperation.hh:355
Definition: XrdClParallelOperation.hh:61
virtual ~PolicyExecutor()
Definition: XrdClParallelOperation.hh:62
virtual XRootDStatus Result()=0
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
Definition: XrdClOperationHandlers.hh:624
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:123