Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/about-conditional.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ public:
static WFConditional *create_conditional(const std::string& cond_name, SubTask *task, void **msgbuf);
static int signal_by_name(const std::string& cond_name, void *msg);
static int signal_by_name(const std::string& cond_name, void *msg, size_t max);
static int signal_by_name(const std::string& cond_name, void *const msg[], size_t max);
template<typename T>
static int signal_by_name(const std::string& cond_name, T *const msg[], size_t max);
};
~~~
我们看到,与普通条件任务唯一区别是,命名条件任务创建时,需要传入一个cond_name。
而signal_by_name()接口,默认将msg发送到所有在这个名称上等待的条件任务,将它们全部唤醒。
也可以通过max参数指定唤醒的最大任务数。此时,msg还可以是一个数组,可给不同的条件任务发送不同的消息。
也可以通过max参数指定唤醒的最大任务数。此时,msg还可以是一个指针数组,可给不同的条件任务发送不同的消息。
任何一个signal_by_name的重载函数,其返回值都是表示实际唤醒的条件任务个数。
这就相当于实现了观察者模式。

Expand Down
2 changes: 1 addition & 1 deletion src/client/WFKafkaClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task)
snprintf(name, 64, "%p.cgroup", member);
member->mutex.unlock();

WFTaskFactory::signal_by_name(name, (void *)NULL, max);
WFTaskFactory::signal_by_name(name, NULL, max);
}
else
{
Expand Down
2 changes: 2 additions & 0 deletions src/factory/WFTaskFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,7 @@ int WFTaskFactory::send_by_name(const std::string& name, void *msg,
return __mailbox_map.send(name, &msg, max, 0);
}

template<>
int WFTaskFactory::send_by_name(const std::string& name, void *const msg[],
size_t max)
{
Expand Down Expand Up @@ -903,6 +904,7 @@ int WFTaskFactory::signal_by_name(const std::string& name, void *msg,
return __conditional_map.signal(name, &msg, max, 0);
}

template<>
int WFTaskFactory::signal_by_name(const std::string& name, void *const msg[],
size_t max)
{
Expand Down
41 changes: 5 additions & 36 deletions src/factory/WFTaskFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <sys/types.h>
#include <sys/uio.h>
#include <time.h>
#include <stdint.h>
#include <utility>
#include <functional>
#include <openssl/ssl.h>
Expand Down Expand Up @@ -254,8 +253,7 @@ class WFTaskFactory

/* Count by a counter's name. When count_by_name(), it's safe to count
* exceeding target_value. When multiple counters share a same name,
* this operation will be performed on the first created. If no counter
* matches the name, nothing is performed. */
* this operation will be performed on the first created. */
static int count_by_name(const std::string& counter_name)
{
return WFTaskFactory::count_by_name(counter_name, 1);
Expand Down Expand Up @@ -296,7 +294,8 @@ class WFTaskFactory
static int send_by_name(const std::string& mailbox_name, void *msg,
size_t max);

static int send_by_name(const std::string& mailbox_name, void *const msg[],
template<typename T>
static int send_by_name(const std::string& mailbox_name, T *const msg[],
size_t max);

public:
Expand Down Expand Up @@ -331,7 +330,8 @@ class WFTaskFactory
static int signal_by_name(const std::string& cond_name, void *msg,
size_t max);

static int signal_by_name(const std::string& cond_name, void *const msg[],
template<typename T>
static int signal_by_name(const std::string& cond_name, T *const msg[],
size_t max);

public:
Expand Down Expand Up @@ -409,37 +409,6 @@ class WFTaskFactory
task->sub_series()->set_last_task(last);
return task;
}

private:
/* Some compilers don't declare 'nullptr_t' although required by C++11. */
using nullptr_t = std::nullptr_t;

public:
/* The following functions are for overload resolution only. */

static int send_by_name(const std::string& mailbox_name, intptr_t msg,
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *)msg, max);
}

static int send_by_name(const std::string& mailbox_name, nullptr_t msg,
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *)0, max);
}

static int signal_by_name(const std::string& cond_name, intptr_t msg,
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *)msg, max);
}

static int signal_by_name(const std::string& cond_name, nullptr_t msg,
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *)0, max);
}
};

template<class REQ, class RESP>
Expand Down
30 changes: 25 additions & 5 deletions src/factory/WFTaskFactory.inl
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,26 @@ WFTaskFactory::create_dynamic_task(dynamic_create_t create)
return new __WFDynamicTask(std::move(create));
}

template<>
int WFTaskFactory::send_by_name(const std::string&, void *const *, size_t);

template<typename T>
int WFTaskFactory::send_by_name(const std::string& mailbox_name, T *const msg[],
size_t max)
{
return WFTaskFactory::send_by_name(mailbox_name, (void *const *)msg, max);
}

template<>
int WFTaskFactory::signal_by_name(const std::string&, void *const *, size_t);

template<typename T>
int WFTaskFactory::signal_by_name(const std::string& cond_name, T *const msg[],
size_t max)
{
return WFTaskFactory::signal_by_name(cond_name, (void *const *)msg, max);
}

template<class REQ, class RESP, typename CTX = bool>
class WFComplexClientTask : public WFClientTask<REQ, RESP>
{
Expand Down Expand Up @@ -709,7 +729,7 @@ void WFTaskFactory::reset_go_task(WFGoTask *task, FUNC&& func, ARGS&&... args)

template<> inline
WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFGoTask(WFGlobal::get_exec_queue(queue_name),
WFGlobal::get_compute_executor(),
Expand All @@ -719,7 +739,7 @@ WFGoTask *WFTaskFactory::create_go_task(const std::string& queue_name,
template<> inline
WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
const std::string& queue_name,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFTimedGoTask(seconds, nanoseconds,
WFGlobal::get_exec_queue(queue_name),
Expand All @@ -729,21 +749,21 @@ WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,

template<> inline
WFGoTask *WFTaskFactory::create_go_task(ExecQueue *queue, Executor *executor,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFGoTask(queue, executor, nullptr);
}

template<> inline
WFGoTask *WFTaskFactory::create_timedgo_task(time_t seconds, long nanoseconds,
ExecQueue *queue, Executor *executor,
nullptr_t&& func)
std::nullptr_t&&)
{
return new __WFTimedGoTask(seconds, nanoseconds, queue, executor, nullptr);
}

template<> inline
void WFTaskFactory::reset_go_task(WFGoTask *task, nullptr_t&& func)
void WFTaskFactory::reset_go_task(WFGoTask *task, std::nullptr_t&&)
{
((__WFGoTask *)task)->set_go_func(nullptr);
}
Expand Down
Loading