I have implemented a module-based Active Object design pattern. It is a very simple implementation. I have a Scheduler, an ActivationList, Requests, and Futures to get the response. My requirements were as follows:
Now the question: is it possible to use boost::asio and still fulfill all my requirements? My implementation works, but I would like to use something that is probably implemented much better than what I have done. I would also like to know this for the future, so that I do not "reinvent the wheel" again.
Boost.Asio can be used to encompass the intention of Active Object: decouple method execution from method invocation. Additional requirements will need to be handled at a higher level, but this is not overly complex when using Boost.Asio in conjunction with other Boost libraries.
Scheduler could use:
- boost::thread for thread abstraction.
- boost::thread_group to manage the lifetime of threads.
- boost::asio::io_service to provide a thread pool. You will likely want to use boost::asio::io_service::work to keep threads alive when no work is pending.
ActivationList could be implemented as:
- A Boost.MultiIndex container ordered by priority. With a hinted insert(), the insertion order is preserved for requests with the same priority.
- std::multiset or std::multimap can be used. However, in C++03 the order of requests with the same key (priority) is unspecified.
- If a Request does not need a guard method, then std::priority_queue could be used.
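The ordering concern above can be checked with a small sketch. It does not use the Boost.MultiIndex container; it assumes a C++11 or later compiler, where std::multimap::insert is specified to place a new element at the upper bound of its equal-key range, so requests that share a priority stay in insertion order. The names activation_list, drain_in_order, and make_example_list are hypothetical.

```cpp
#include <functional>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical activation list: highest priority first (std::greater).
typedef std::multimap<unsigned int, std::string,
                      std::greater<unsigned int> > activation_list;

// Returns the request names in the order a dispatcher would visit them.
std::vector<std::string> drain_in_order(const activation_list& list)
{
  std::vector<std::string> order;
  for (activation_list::const_iterator it = list.begin();
       it != list.end(); ++it)
    order.push_back(it->second);
  return order;
}

// Builds a small list in which two requests share priority 5.
activation_list make_example_list()
{
  activation_list list;
  list.insert(std::make_pair(5u, std::string("first@5")));
  list.insert(std::make_pair(99u, std::string("urgent@99")));
  // Since C++11, an equal key is inserted at the upper bound of its range,
  // so this element lands after "first@5".
  list.insert(std::make_pair(5u, std::string("second@5")));
  return list;
}
```

Under C++03 the relative position of the two priority-5 entries would be unspecified, which is why a hinted insert into a Boost.MultiIndex ordered index is suggested instead.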
Request could be an unspecified type:
- boost::function and boost::bind could be used to provide type erasure while binding to callable types, without introducing a Request hierarchy.
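As a rough illustration of the type-erasure idea, the sketch below stores two unrelated callables behind one request type. To stay free of external dependencies it uses std::function and std::bind, the C++11 standard counterparts of boost::function and boost::bind, rather than the Boost versions. The names request, append_sum, greeter, and run_bound_requests are hypothetical.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical type-erased request: any callable taking no arguments.
typedef std::function<void()> request;

// A free function; it does not derive from any common Request base.
void append_sum(std::vector<std::string>& log, int a, int b)
{
  log.push_back("sum=" + std::to_string(a + b));
}

// An unrelated class with a member function.
struct greeter
{
  void greet(std::vector<std::string>& log, const std::string& name) const
  {
    log.push_back("hello " + name);
  }
};

// Binds the arguments up front, stores both callables as the same type,
// then runs them in order and returns what they logged.
std::vector<std::string> run_bound_requests()
{
  std::vector<std::string> log;
  greeter g;
  std::vector<request> requests;
  requests.push_back(std::bind(&append_sum, std::ref(log), 1, 100));
  requests.push_back(
      std::bind(&greeter::greet, &g, std::ref(log), std::string("world")));
  for (std::size_t i = 0; i < requests.size(); ++i)
    requests[i]();
  return log;
}
```

Because the arguments are bound when the request is created, the code that later runs the request only needs to know how to call it with no arguments.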
Futures could use Boost.Thread's Futures support.
- future.valid() will return true if the Request has been added to the ActivationList.
- future.wait() will block waiting for a result to become available.
- future.get() will block waiting for the result.
- If the caller does nothing with the future, then the caller will not be blocked.
- Exceptions thrown by a Request will be passed to the Future.
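The exception and validity behavior listed above can be sketched with the standard library. This is an analogy, not the Boost API: it assumes std::future and std::packaged_task (C++11) behave like boost::unique_future and boost::packaged_task for these two points. The function names are hypothetical, and the task is run on the calling thread only to keep the sketch short.

```cpp
#include <future>
#include <stdexcept>
#include <string>

// Hypothetical request body that always fails.
int failing_request()
{
  throw std::runtime_error("request failed");
}

// Runs the task on the calling thread for simplicity; a scheduler thread
// would normally invoke it. Returns the message carried by the exception
// that get() rethrows, or an empty string if nothing was thrown.
std::string message_from_failed_request()
{
  std::packaged_task<int()> task(&failing_request);
  std::future<int> future = task.get_future();
  task();  // The exception is stored in the shared state, not thrown here.
  try
  {
    future.get();
  }
  catch (const std::runtime_error& error)
  {
    return error.what();
  }
  return std::string();
}

// A default-constructed future has no shared state. This mirrors the
// invalid future returned when a request is rejected.
bool rejected_future_is_valid()
{
  std::future<int> rejected;
  return rejected.valid();
}
```

The caller therefore sees a failed request only when it asks for the result, and can detect a rejected request by checking valid() before waiting.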
Here is a complete example that leverages various Boost libraries and should meet the requirements:
// Standard includes
#include <algorithm> // std::find_if
#include <iostream>
#include <string>
// 3rd party includes
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/utility/result_of.hpp>
/// @brief scheduler that provides limits with prioritized jobs.
template <typename Priority,
          typename Compare = std::less<Priority> >
class scheduler
{
public:
  typedef Priority priority_type;
private:

  /// @brief method_request is used to couple the guard and call
  ///        functions for a given method.
  struct method_request
  {
    typedef boost::function<bool()> ready_func_type;
    typedef boost::function<void()> run_func_type;

    template <typename ReadyFunctor,
              typename RunFunctor>
    method_request(ReadyFunctor ready,
                   RunFunctor run)
      : ready(ready),
        run(run)
    {}

    ready_func_type ready;
    run_func_type run;
  };

  /// @brief Pair type used to associate a request with its priority.
  typedef std::pair<priority_type,
                    boost::shared_ptr<method_request> > pair_type;

  static bool is_method_ready(const pair_type& pair)
  {
    return pair.second->ready();
  }

public:

  /// @brief Construct scheduler.
  ///
  /// @param max_threads Maximum amount of concurrent task.
  /// @param max_request Maximum amount of request.
  scheduler(std::size_t max_threads,
            std::size_t max_request)
    : work_(io_service_),
      max_request_(max_request),
      request_count_(0)
  {
    // Spawn threads, dedicating them to the io_service.
    for (std::size_t i = 0; i < max_threads; ++i)
      threads_.create_thread(
        boost::bind(&boost::asio::io_service::run, &io_service_));
  }

  /// @brief Destructor.
  ~scheduler()
  {
    // Release threads from the io_service.
    io_service_.stop();
    // Cleanup.
    threads_.join_all();
  }

  /// @brief Insert a method request into the scheduler.
  ///
  /// @param priority Priority of job.
  /// @param ready_func Invoked to check if method is ready to run.
  /// @param run_func Invoked when ready to run.
  ///
  /// @return future associated with the method.
  template <typename ReadyFunctor,
            typename RunFunctor>
  boost::unique_future<typename boost::result_of<RunFunctor()>::type>
  insert(priority_type priority,
         const ReadyFunctor& ready_func,
         const RunFunctor& run_func)
  {
    typedef typename boost::result_of<RunFunctor()>::type result_type;
    typedef boost::unique_future<result_type> future_type;

    boost::unique_lock<mutex_type> lock(mutex_);

    // If max request has been reached, then return an invalid future.
    if (max_request_ &&
        (request_count_ == max_request_))
      return future_type();

    ++request_count_;

    // Use a packaged task to handle populating promise and future.
    typedef boost::packaged_task<result_type> task_type;

    // Bind does not work with rvalue, and packaged_task is only moveable,
    // so allocate a shared pointer.
    boost::shared_ptr<task_type> task =
      boost::make_shared<task_type>(run_func);

    // Create method request.
    boost::shared_ptr<method_request> request =
      boost::make_shared<method_request>(
        ready_func,
        boost::bind(&task_type::operator(), task));

    // Insert into priority. Hint to inserting as close to the end as
    // possible to preserve insertion order for request with same priority.
    activation_list_.insert(activation_list_.end(),
                            pair_type(priority, request));

    // There is now an outstanding request, so post to dispatch.
    io_service_.post(boost::bind(&scheduler::dispatch, this));

    return task->get_future();
  }

  /// @brief Insert a method request with default priority into the
  ///        scheduler.
  ///
  /// @param ready_func Invoked to check if method is ready to run.
  /// @param run_func Invoked when ready to run.
  ///
  /// @return future associated with the method.
  template <typename ReadyFunctor,
            typename RunFunctor>
  boost::unique_future<typename boost::result_of<RunFunctor()>::type>
  insert(const ReadyFunctor& ready_func,
         const RunFunctor& run_func)
  {
    return insert(priority_type(), ready_func, run_func);
  }

  /// @brief Insert an unguarded method request into the scheduler.
  ///
  /// @param priority Priority of job.
  /// @param run_func Invoked when ready to run.
  ///
  /// @return future associated with the method.
  template <typename RunFunctor>
  boost::unique_future<typename boost::result_of<RunFunctor()>::type>
  insert(priority_type priority,
         const RunFunctor& run_func)
  {
    return insert(priority, &always_ready, run_func);
  }

  /// @brief Insert an unguarded method request with default priority
  ///        into the scheduler.
  ///
  /// @param run_func Invoked when ready to run.
  ///
  /// @return future associated with the job.
  template <typename RunFunc>
  boost::unique_future<typename boost::result_of<RunFunc()>::type>
  insert(const RunFunc& run_func)
  {
    return insert(&always_ready, run_func);
  }

  /// @brief Cancel all outstanding request.
  void cancel()
  {
    boost::unique_lock<mutex_type> lock(mutex_);
    activation_list_.clear();
    request_count_ = 0;
  }

private:

  /// @brief Dispatch a request.
  void dispatch()
  {
    // Get the current highest priority request ready to run from the queue.
    boost::unique_lock<mutex_type> lock(mutex_);
    if (activation_list_.empty()) return;

    // Find the highest priority method ready to run.
    typedef typename activation_list_type::iterator iterator;
    iterator end = activation_list_.end();
    iterator result = std::find_if(
      activation_list_.begin(), end, &is_method_ready);

    // If no methods are ready, then post into dispatch, as the
    // method may have become ready.
    if (end == result)
    {
      io_service_.post(boost::bind(&scheduler::dispatch, this));
      return;
    }

    // Take ownership of request.
    boost::shared_ptr<method_request> method = result->second;
    activation_list_.erase(result);

    // Run method without mutex.
    lock.unlock();
    method->run();
    lock.lock();

    // Perform bookkeeping.
    --request_count_;
  }

  static bool always_ready() { return true; }

private:

  /// @brief List of outstanding request.
  typedef boost::multi_index_container<
    pair_type,
    boost::multi_index::indexed_by<
      boost::multi_index::ordered_non_unique<
        boost::multi_index::member<pair_type,
                                   typename pair_type::first_type,
                                   &pair_type::first>,
        Compare
      >
    >
  > activation_list_type;
  activation_list_type activation_list_;

  /// @brief Thread group managing threads servicing pool.
  boost::thread_group threads_;

  /// @brief io_service used to function as a thread pool.
  boost::asio::io_service io_service_;

  /// @brief Work is used to keep threads servicing io_service.
  boost::asio::io_service::work work_;

  /// @brief Maximum amount of request.
  const std::size_t max_request_;

  /// @brief Count of outstanding request.
  std::size_t request_count_;

  /// @brief Synchronize access to the activation list.
  typedef boost::mutex mutex_type;
  mutex_type mutex_;
};

typedef scheduler<unsigned int,
                  std::greater<unsigned int> > high_priority_scheduler;

/// @brief adder is a simple proxy that will delegate work to
///        the scheduler.
class adder
{
public:
  adder(high_priority_scheduler& scheduler)
    : scheduler_(scheduler)
  {}

  /// @brief Add a and b with a priority.
  ///
  /// @return Return future result.
  template <typename T>
  boost::unique_future<T> add(
    high_priority_scheduler::priority_type priority,
    const T& a, const T& b)
  {
    // Insert method request
    return scheduler_.insert(
      priority,
      boost::bind(&adder::do_add<T>, a, b));
  }

  /// @brief Add a and b.
  ///
  /// @return Return future result.
  template <typename T>
  boost::unique_future<T> add(const T& a, const T& b)
  {
    return add(high_priority_scheduler::priority_type(), a, b);
  }

private:

  /// @brief Actual add a and b.
  template <typename T>
  static T do_add(const T& a, const T& b)
  {
    std::cout << "Starting addition of '" << a
              << "' and '" << b << "'" << std::endl;
    // Mimic busy work.
    boost::this_thread::sleep_for(boost::chrono::seconds(2));
    std::cout << "Finished addition" << std::endl;
    return a + b;
  }

private:
  high_priority_scheduler& scheduler_;
};

bool get(bool& value) { return value; }

void guarded_call()
{
  std::cout << "guarded_call" << std::endl;
}

int main()
{
  const unsigned int max_threads = 1;
  const unsigned int max_request = 4;

  // Scheduler
  high_priority_scheduler scheduler(max_threads, max_request);

  // Proxy
  adder adder(scheduler);

  // Client

  // Add guarded method to scheduler.
  bool ready = false;
  std::cout << "Add guarded method." << std::endl;
  boost::unique_future<void> future1 = scheduler.insert(
    boost::bind(&get, boost::ref(ready)),
    &guarded_call);

  // Add 1 + 100 with default priority.
  boost::unique_future<int> future2 = adder.add(1, 100);

  // Force sleep to try to get scheduler to run request 2 first.
  boost::this_thread::sleep_for(boost::chrono::seconds(1));

  // Add:
  //   2 + 200 with low priority (5)
  //   "test" + "this" with high priority (99)
  boost::unique_future<int> future3 = adder.add(5, 2, 200);
  boost::unique_future<std::string> future4 = adder.add(99,
    std::string("test"), std::string("this"));

  // Max request should have been reached, so add another.
  boost::unique_future<int> future5 = adder.add(3, 300);

  // Check if request was added.
  std::cout << "future1 is valid: " << future1.valid()
            << "\nfuture2 is valid: " << future2.valid()
            << "\nfuture3 is valid: " << future3.valid()
            << "\nfuture4 is valid: " << future4.valid()
            << "\nfuture5 is valid: " << future5.valid()
            << std::endl;

  // Get results for future2 and future3. Do nothing with future4's results.
  std::cout << "future2 result: " << future2.get()
            << "\nfuture3 result: " << future3.get()
            << std::endl;

  std::cout << "Unguarding method." << std::endl;
  ready = true;
  future1.wait();
}
The execution uses a thread pool of 1 thread with a maximum of 4 requests.
The output is as follows:
Add guarded method.
Starting addition of '1' and '100'
future1 is valid: 1
future2 is valid: 1
future3 is valid: 1
future4 is valid: 1
future5 is valid: 0
Finished addition
Starting addition of 'test' and 'this'
Finished addition
Starting addition of '2' and '200'
Finished addition
future2 result: 101
future3 result: 202
Unguarding method.
guarded_call