OgreWorkQueue.h
Go to the documentation of this file.
1 /*
2 -----------------------------------------------------------------------------
3 This source file is part of OGRE
4 (Object-oriented Graphics Rendering Engine)
5 For the latest info, see http://www.ogre3d.org/
6 
7 Copyright (c) 2000-2013 Torus Knot Software Ltd
8 
9 Permission is hereby granted, free of charge, to any person obtaining a copy
10 of this software and associated documentation files (the "Software"), to deal
11 in the Software without restriction, including without limitation the rights
12 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
13 copies of the Software, and to permit persons to whom the Software is
14 furnished to do so, subject to the following conditions:
15 
16 The above copyright notice and this permission notice shall be included in
17 all copies or substantial portions of the Software.
18 
19 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
24 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
25 THE SOFTWARE.
26 -----------------------------------------------------------------------------
27 */
28 #ifndef __OgreWorkQueue_H__
29 #define __OgreWorkQueue_H__
30 
31 #include "OgrePrerequisites.h"
32 #include "OgreAny.h"
33 #include "OgreSharedPtr.h"
35 #include "OgreHeaderPrefix.h"
36 
37 namespace Ogre
38 {
71  {
72  protected:
76  OGRE_MUTEX(mChannelMapMutex);
77  public:
79  typedef unsigned long long int RequestID;
80 
84  {
85  friend class WorkQueue;
86  protected:
98  mutable bool mAborted;
99 
100  public:
102  Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid);
105  void abortRequest() const { mAborted = true; }
107  uint16 getChannel() const { return mChannel; }
109  uint16 getType() const { return mType; }
111  const Any& getData() const { return mData; }
113  uint8 getRetryCount() const { return mRetryCount; }
115  RequestID getID() const { return mID; }
117  bool getAborted() const { return mAborted; }
118  };
119 
123  {
127  bool mSuccess;
132 
133  public:
134  Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK);
137  const Request* getRequest() const { return mRequest; }
139  bool succeeded() const { return mSuccess; }
141  const String& getMessages() const { return mMessages; }
143  const Any& getData() const { return mData; }
145  void abortRequest() { mRequest->abortRequest(); mData.destroy(); }
146  };
147 
162  {
163  public:
165  virtual ~RequestHandler() {}
166 
173  virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ)
174  { (void)srcQ; return !req->getAborted(); }
175 
186  virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0;
187  };
188 
197  {
198  public:
200  virtual ~ResponseHandler() {}
201 
208  virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ)
209  { (void)srcQ; return !res->getRequest()->getAborted(); }
210 
218  virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0;
219  };
220 
221  WorkQueue() : mNextChannel(0) {}
222  virtual ~WorkQueue() {}
223 
228  virtual void startup(bool forceRestart = true) = 0;
238  virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0;
240  virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0;
241 
251  virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
253  virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
254 
272  virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0,
273  bool forceSynchronous = false, bool idleThread = false) = 0;
274 
280  virtual void abortRequest(RequestID id) = 0;
281 
288  virtual void abortRequestsByChannel(uint16 channel) = 0;
289 
296  virtual void abortPendingRequestsByChannel(uint16 channel) = 0;
297 
302  virtual void abortAllRequests() = 0;
303 
309  virtual void setPaused(bool pause) = 0;
311  virtual bool isPaused() const = 0;
312 
317  virtual void setRequestsAccepted(bool accept) = 0;
319  virtual bool getRequestsAccepted() const = 0;
320 
329  virtual void processResponses() = 0;
330 
334  virtual unsigned long getResponseProcessingTimeLimit() const = 0;
335 
341  virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0;
342 
345  virtual void shutdown() = 0;
346 
354  virtual uint16 getChannel(const String& channelName);
355 
356  };
357 
361  {
362  public:
363 
371  const String& getName() const;
375  virtual size_t getWorkerThreadCount() const;
376 
382  virtual void setWorkerThreadCount(size_t c);
383 
393  virtual bool getWorkersCanAccessRenderSystem() const;
394 
395 
407  virtual void setWorkersCanAccessRenderSystem(bool access);
408 
416  virtual void _processNextRequest();
417 
419  virtual void _threadMain() = 0;
420 
422  virtual bool isShuttingDown() const { return mShuttingDown; }
423 
425  virtual void addRequestHandler(uint16 channel, RequestHandler* rh);
427  virtual void removeRequestHandler(uint16 channel, RequestHandler* rh);
429  virtual void addResponseHandler(uint16 channel, ResponseHandler* rh);
431  virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh);
432 
434  virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0,
435  bool forceSynchronous = false, bool idleThread = false);
437  virtual void abortRequest(RequestID id);
439  virtual void abortRequestsByChannel(uint16 channel);
441  virtual void abortPendingRequestsByChannel(uint16 channel);
443  virtual void abortAllRequests();
445  virtual void setPaused(bool pause);
447  virtual bool isPaused() const;
449  virtual void setRequestsAccepted(bool accept);
451  virtual bool getRequestsAccepted() const;
453  virtual void processResponses();
455  virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; }
457  virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; }
458  protected:
463  unsigned long mResposeTimeLimitMS;
464 
467  RequestQueue mRequestQueue; // Guarded by mRequestMutex
468  RequestQueue mProcessQueue; // Guarded by mProcessMutex
469  ResponseQueue mResponseQueue; // Guarded by mResponseMutex
470 
473  {
475 
477  : mQueue(q) {}
478 
479  void operator()();
480 
481  void operator()() const;
482 
483  void run();
484  };
485  WorkerFunc* mWorkerFunc;
486 
492  {
493  protected:
494  OGRE_RW_MUTEX(mRWMutex);
496  public:
498  : mHandler(handler) {}
499 
500  // Disconnect the handler to allow it to be destroyed
502  {
503  // write lock - must wait for all requests to finish
504  OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex);
505  mHandler = 0;
506  }
507 
511  RequestHandler* getHandler() { return mHandler; }
512 
516  Response* handleRequest(const Request* req, const WorkQueue* srcQ)
517  {
518  // Read mutex so that multiple requests can be processed by the
519  // same handler in parallel if required
520  OGRE_LOCK_RW_MUTEX_READ(mRWMutex);
521  Response* response = 0;
522  if (mHandler)
523  {
524  if (mHandler->canHandleRequest(req, srcQ))
525  {
526  response = mHandler->handleRequest(req, srcQ);
527  }
528  }
529  return response;
530  }
531 
532  };
533  // Hold these by shared pointer so they can be copied keeping same instance
535 
540 
543  RequestID mRequestCount; // Guarded by mRequestMutex
544  bool mPaused;
547 
548  //NOTE: If you lock multiple mutexes at the same time, the order is important!
549  // For example if threadA locks mIdleMutex first then tries to lock mProcessMutex,
550  // and threadB locks mProcessMutex first, then mIdleMutex. In this case you can get livelock and the system is dead!
551  //RULE: Lock mProcessMutex before other mutex, to prevent livelocks
552  OGRE_MUTEX(mIdleMutex);
553  OGRE_MUTEX(mRequestMutex);
554  OGRE_MUTEX(mProcessMutex);
555  OGRE_MUTEX(mResponseMutex);
556  OGRE_RW_MUTEX(mRequestHandlerMutex);
557 
558 
559  void processRequestResponse(Request* r, bool synchronous);
563  virtual void notifyWorkers() = 0;
565  void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount);
566 
567  RequestQueue mIdleRequestQueue; // Guarded by mIdleMutex
568  bool mIdleThreadRunning; // Guarded by mIdleMutex
569  Request* mIdleProcessed; // Guarded by mProcessMutex
570 
571 
573  };
574 
575 
576 
577 
578 
582 }
583 
584 #include "OgreHeaderSuffix.h"
585 
586 #endif
587 
#define _OgreExport
Definition: OgrePlatform.h:257
#define OGRE_LOCK_RW_MUTEX_WRITE(name)
#define OGRE_LOCK_RW_MUTEX_READ(name)
Superclass for all objects that wish to use custom memory allocators when their new / delete operator...
Variant type that can hold Any other type.
Definition: OgreAny.h:57
void destroy()
Definition: OgreAny.h:122
Intermediate structure to hold a pointer to a request handler which provides insurance against the ha...
RequestHandler * getHandler()
Get handler pointer - note, only use this for == comparison or similar, do not attempt to call it as ...
Response * handleRequest(const Request *req, const WorkQueue *srcQ)
Process a request if possible.
Base for a general purpose request / response style background work queue.
virtual void processResponses()
Process the responses in the queue.
virtual void abortAllRequests()
Abort all previously issued requests.
deque< Request * >::type RequestQueue
virtual bool isShuttingDown() const
Returns whether the queue is trying to shut down.
void processResponse(Response *r)
virtual void removeRequestHandler(uint16 channel, RequestHandler *rh)
Remove a request handler.
const String & getName() const
Get the name of the work queue.
virtual void setRequestsAccepted(bool accept)
Set whether to accept new requests or not.
void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any &rData, uint8 retryCount)
Put a Request on the queue with a specific RequestID.
deque< Response * >::type ResponseQueue
DefaultWorkQueueBase(const String &name=StringUtil::BLANK)
Constructor.
void processRequestResponse(Request *r, bool synchronous)
SharedPtr< RequestHandlerHolder > RequestHandlerHolderPtr
virtual void _threadMain()=0
Main function for each thread spawned.
virtual void setWorkersCanAccessRenderSystem(bool access)
Set whether worker threads will be allowed to access render system resources.
OGRE_MUTEX(mResponseMutex)
virtual void abortRequest(RequestID id)
Abort a previously issued request.
virtual void setResponseProcessingTimeLimit(unsigned long ms)
Set the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
map< uint16, ResponseHandlerList >::type ResponseHandlerListByChannel
virtual void setWorkerThreadCount(size_t c)
Set the number of worker threads that this queue will start when startup() is called (default 1).
virtual void setPaused(bool pause)
Set whether to pause further processing of any requests.
virtual void addRequestHandler(uint16 channel, RequestHandler *rh)
Add a request handler instance to the queue.
list< ResponseHandler * >::type ResponseHandlerList
list< RequestHandlerHolderPtr >::type RequestHandlerList
virtual unsigned long getResponseProcessingTimeLimit() const
Get the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
Response * processRequest(Request *r)
virtual void addResponseHandler(uint16 channel, ResponseHandler *rh)
Add a response handler instance to the queue.
virtual void notifyWorkers()=0
Notify workers about a new request.
virtual bool isPaused() const
Return whether the queue is paused ie not sending more work to workers.
virtual bool getRequestsAccepted() const
Returns whether requests are being accepted right now.
virtual bool getWorkersCanAccessRenderSystem() const
Get whether worker threads will be allowed to access render system resources.
ResponseHandlerListByChannel mResponseHandlers
virtual size_t getWorkerThreadCount() const
Get the number of worker threads that this queue will start when startup() is called.
virtual void abortPendingRequestsByChannel(uint16 channel)
Abort all previously issued requests in a given channel.
virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any &rData, uint8 retryCount=0, bool forceSynchronous=false, bool idleThread=false)
Add a new request to the queue.
RequestHandlerListByChannel mRequestHandlers
unsigned long mResposeTimeLimitMS
virtual void removeResponseHandler(uint16 channel, ResponseHandler *rh)
Remove a Response handler.
map< uint16, RequestHandlerList >::type RequestHandlerListByChannel
virtual void abortRequestsByChannel(uint16 channel)
Abort all previously issued requests in a given channel.
OGRE_RW_MUTEX(mRequestHandlerMutex)
virtual void _processNextRequest()
Process the next request on the queue.
Reference-counted shared pointer, used for objects where implicit destruction is required.
static const String BLANK
Constant blank string, useful for returning by ref where local does not exist.
Definition: OgreString.h:196
Interface definition for a handler of requests.
virtual bool canHandleRequest(const Request *req, const WorkQueue *srcQ)
Return whether this handler can process a given request.
virtual Response * handleRequest(const Request *req, const WorkQueue *srcQ)=0
The handler method every subclass must implement.
General purpose request structure.
Definition: OgreWorkQueue.h:84
uint16 getChannel() const
Get the request channel (top level categorisation)
const Any & getData() const
Get the user details of this request.
uint8 getRetryCount() const
Get the remaining retry count.
uint16 mType
The request type, as an integer within the channel (user can define enumerations on this)
Definition: OgreWorkQueue.h:90
RequestID mID
Identifier (assigned by the system)
Definition: OgreWorkQueue.h:96
uint16 getType() const
Get the type of this request within the given channel.
bool getAborted() const
Get the abort flag.
RequestID getID() const
Get the identifier of this request.
bool mAborted
Abort Flag.
Definition: OgreWorkQueue.h:98
void abortRequest() const
Set the abort flag.
uint16 mChannel
The request channel, as an integer.
Definition: OgreWorkQueue.h:88
uint8 mRetryCount
Retry count - set this to non-zero to have the request try again on failure.
Definition: OgreWorkQueue.h:94
Any mData
The details of the request (user defined)
Definition: OgreWorkQueue.h:92
Request(uint16 channel, uint16 rtype, const Any &rData, uint8 retry, RequestID rid)
Constructor.
Interface definition for a handler of responses.
virtual bool canHandleResponse(const Response *res, const WorkQueue *srcQ)
Return whether this handler can process a given response.
virtual void handleResponse(const Response *res, const WorkQueue *srcQ)=0
The handler method every subclass must implement.
Interface to a general purpose request / response style background work queue.
Definition: OgreWorkQueue.h:71
unsigned long long int RequestID
Numeric identifier for a request.
Definition: OgreWorkQueue.h:79
virtual void setRequestsAccepted(bool accept)=0
Set whether to accept new requests or not.
virtual ~WorkQueue()
map< String, uint16 >::type ChannelMap
Definition: OgreWorkQueue.h:73
virtual void setPaused(bool pause)=0
Set whether to pause further processing of any requests.
virtual void shutdown()=0
Shut down the queue.
ChannelMap mChannelMap
Definition: OgreWorkQueue.h:74
virtual void abortRequest(RequestID id)=0
Abort a previously issued request.
virtual bool isPaused() const =0
Return whether the queue is paused ie not sending more work to workers.
virtual uint16 getChannel(const String &channelName)
Get a channel ID for a given channel name.
virtual unsigned long getResponseProcessingTimeLimit() const =0
Get the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
virtual void setResponseProcessingTimeLimit(unsigned long ms)=0
Set the time limit imposed on the processing of responses in a single frame, in milliseconds (0 indic...
virtual void addRequestHandler(uint16 channel, RequestHandler *rh)=0
Add a request handler instance to the queue.
virtual void processResponses()=0
Process the responses in the queue.
OGRE_MUTEX(mChannelMapMutex)
virtual void removeRequestHandler(uint16 channel, RequestHandler *rh)=0
Remove a request handler.
virtual void abortAllRequests()=0
Abort all previously issued requests.
virtual void removeResponseHandler(uint16 channel, ResponseHandler *rh)=0
Remove a Response handler.
virtual bool getRequestsAccepted() const =0
Returns whether requests are being accepted right now.
virtual void addResponseHandler(uint16 channel, ResponseHandler *rh)=0
Add a response handler instance to the queue.
virtual void abortRequestsByChannel(uint16 channel)=0
Abort all previously issued requests in a given channel.
virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any &rData, uint8 retryCount=0, bool forceSynchronous=false, bool idleThread=false)=0
Add a new request to the queue.
virtual void startup(bool forceRestart=true)=0
Start up the queue with the options that have been set.
virtual void abortPendingRequestsByChannel(uint16 channel)=0
Abort all previously issued requests in a given channel.
unsigned char uint8
Definition: OgrePlatform.h:361
unsigned short uint16
Definition: OgrePlatform.h:360
_StringBase String
General purpose response structure.
Any mData
Data associated with the result of the process.
Response(const Request *rq, bool success, const Any &data, const String &msg=StringUtil::BLANK)
void abortRequest()
Abort the request.
const Request * getRequest() const
Get the request that this is a response to (NB destruction destroys this)
const String & getMessages() const
Get any diagnostic messages about the process.
const Any & getData() const
Return the response data (user defined, only valid on success)
String mMessages
Any diagnostic messages.
bool mSuccess
Whether the work item succeeded or not.
bool succeeded() const
Return whether this is a successful response.
const Request * mRequest
Pointer to the request that this response is in relation to.
std::list< T, A > type

Copyright © 2012 Torus Knot Software Ltd
Creative Commons License
This work is licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License.