#ifndef __M_BROKER_H__
#define __M_BROKER_H__
#include "muduo/proto/codec.h"
#include "muduo/proto/dispatcher.h"
#include "muduo/base/Logging.h"
#include "muduo/base/Mutex.h"
#include "muduo/net/EventLoop.h"
#include "muduo/net/TcpServer.h"
#include "../mqcommon/threadpool.hpp"
#include "../mqcommon/msg.pb.h"
#include "../mqcommon/proto.pb.h"
#include "../mqcommon/logger.hpp"
#include "connection.hpp"
#include "consumer.hpp"
#include "host.hpp"
namespace rabbitmq {
// Suffix appended to the base directory (basedir + DBFILE) to form the path
// passed as the third argument of the VirtualHost constructor in Server.
// NOTE(review): presumably the metadata database file — confirm in host.hpp.
#define DBFILE "/meta.db"
// Name handed to the single VirtualHost that Server creates.
#define HOSTNAME "MyVirtualHost"
/// Broker-side network front end.
///
/// Owns a muduo TcpServer plus the protobuf codec/dispatcher pair, and routes
/// every decoded request to the Connection (and, for most requests, the
/// Channel) associated with the TCP connection it arrived on.
class Server {
public:
    using MessagePtr = std::shared_ptr<google::protobuf::Message>;

    /// Constructs all collaborators and installs every callback. Nothing is
    /// started here; call start() to begin serving.
    ///
    /// @param port    Port to listen on; the listen address is "0.0.0.0".
    /// @param basedir Directory given to the virtual host. The third
    ///                VirtualHost argument is basedir + DBFILE.
    Server(int port, const std::string &basedir)
        : _server(&_baseloop,
                  muduo::net::InetAddress("0.0.0.0", port),
                  "Server",
                  muduo::net::TcpServer::kReusePort),
          _dispatcher(std::bind(&Server::onUnknownMessage, this,
                                std::placeholders::_1,
                                std::placeholders::_2,
                                std::placeholders::_3)),
          _codec(std::make_shared<ProtobufCodec>(
              std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,
                        std::placeholders::_1,
                        std::placeholders::_2,
                        std::placeholders::_3))),
          _virtual_host(std::make_shared<VirtualHost>(HOSTNAME, basedir, basedir + DBFILE)),
          _consumer_manager(std::make_shared<ConsumerManager>()),
          _connection_manager(std::make_shared<ConnectionManager>()),
          _threadpool(std::make_shared<threadpool>())
    {
        // Set up consumer bookkeeping for every queue the virtual host reports.
        QueueMap known_queues = _virtual_host->allQueues();
        for (auto &entry : known_queues) {
            _consumer_manager->initQueueConsumer(entry.first);
        }

        // One dispatcher route per request type.
        registerHandler<rabbitmq::openChannelRequest>(&Server::onOpenChannel);
        registerHandler<rabbitmq::closeChannelRequest>(&Server::onCloseChannel);
        registerHandler<rabbitmq::declareExchangeRequest>(&Server::onDeclareExchange);
        registerHandler<rabbitmq::deleteExchangeRequest>(&Server::onDeleteExchange);
        registerHandler<rabbitmq::declareQueueRequest>(&Server::onDeclareQueue);
        registerHandler<rabbitmq::deleteQueueRequest>(&Server::onDeleteQueue);
        registerHandler<rabbitmq::queueBindRequest>(&Server::onQueueBind);
        registerHandler<rabbitmq::queueUnBindRequest>(&Server::onQueueUnBind);
        registerHandler<rabbitmq::basicPublishRequest>(&Server::onBasicPublish);
        registerHandler<rabbitmq::basicAckRequest>(&Server::onBasicAck);
        registerHandler<rabbitmq::basicConsumeRequest>(&Server::onBasicConsume);
        registerHandler<rabbitmq::basicCancelRequest>(&Server::onBasicCancel);

        // Raw bytes go to the codec; connect/disconnect events come to us.
        namespace ph = std::placeholders;
        _server.setMessageCallback(
            std::bind(&ProtobufCodec::onMessage, _codec.get(), ph::_1, ph::_2, ph::_3));
        _server.setConnectionCallback(
            std::bind(&Server::onConnection, this, ph::_1));
    }

    /// Starts the TCP server, then runs the base event loop on the calling thread.
    void start() {
        _server.start();
        _baseloop.loop();
    }

private:
    /// Binds a Server member function to `this` and registers it with the
    /// dispatcher as the callback for messages of type Request.
    template <typename Request, typename Handler>
    void registerHandler(Handler handler) {
        namespace ph = std::placeholders;
        _dispatcher.registerMessageCallback<Request>(
            std::bind(handler, this, ph::_1, ph::_2, ph::_3));
    }

    // ---------------------------------------------------------------------
    // Request handlers.
    //
    // All of them first resolve the Connection object for the TCP connection;
    // if there is none, the problem is logged and the TCP connection is shut
    // down. Handlers that act on a channel then resolve it via message->cid();
    // if that lookup fails, the request is only logged and dropped here.
    // ---------------------------------------------------------------------

    /// Forwards an open-channel request to the owning Connection.
    void onOpenChannel(const muduo::net::TcpConnectionPtr &conn, const openChannelRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("打开信道时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        session->openChannel(message);
    }

    /// Forwards a close-channel request to the owning Connection.
    void onCloseChannel(const muduo::net::TcpConnectionPtr &conn, const closeChannelRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("关闭信道时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        session->closeChannel(message);
    }

    /// Forwards a declare-exchange request to the channel named by message->cid().
    void onDeclareExchange(const muduo::net::TcpConnectionPtr &conn, const declareExchangeRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("声明交换机时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->declareExchange(message);
        } else {
            DLOG("声明交换机时,没有找到信道!");
        }
    }

    /// Forwards a delete-exchange request to the channel named by message->cid().
    void onDeleteExchange(const muduo::net::TcpConnectionPtr &conn, const deleteExchangeRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("删除交换机时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->deleteExchange(message);
        } else {
            DLOG("删除交换机时,没有找到信道!");
        }
    }

    /// Forwards a declare-queue request to the channel named by message->cid().
    void onDeclareQueue(const muduo::net::TcpConnectionPtr &conn, const declareQueueRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("声明队列时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->declareQueue(message);
        } else {
            DLOG("声明队列时,没有找到信道!");
        }
    }

    /// Forwards a delete-queue request to the channel named by message->cid().
    void onDeleteQueue(const muduo::net::TcpConnectionPtr &conn, const deleteQueueRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("删除队列时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->deleteQueue(message);
        } else {
            DLOG("删除队列时,没有找到信道!");
        }
    }

    /// Forwards a queue-bind request to the channel named by message->cid().
    void onQueueBind(const muduo::net::TcpConnectionPtr &conn, const queueBindRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("队列绑定时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->queueBind(message);
        } else {
            DLOG("队列绑定时,没有找到信道!");
        }
    }

    /// Forwards a queue-unbind request to the channel named by message->cid().
    void onQueueUnBind(const muduo::net::TcpConnectionPtr &conn, const queueUnBindRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("队列解绑时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->queueUnBind(message);
        } else {
            DLOG("队列解绑时,没有找到信道!");
        }
    }

    /// Forwards a publish request to the channel named by message->cid().
    void onBasicPublish(const muduo::net::TcpConnectionPtr &conn, const basicPublishRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("消息发布时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->basicPublish(message);
        } else {
            DLOG("消息发布时,没有找到信道!");
        }
    }

    /// Forwards an ack request to the channel named by message->cid().
    void onBasicAck(const muduo::net::TcpConnectionPtr &conn, const basicAckRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("消息确认时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->basicAck(message);
        } else {
            DLOG("消息确认时,没有找到信道!");
        }
    }

    /// Forwards a consume (subscribe) request to the channel named by message->cid().
    void onBasicConsume(const muduo::net::TcpConnectionPtr &conn, const basicConsumeRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("队列消息订阅时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->basicConsume(message);
        } else {
            DLOG("队列消息订阅时,没有找到信道!");
        }
    }

    /// Forwards a cancel (unsubscribe) request to the channel named by message->cid().
    void onBasicCancel(const muduo::net::TcpConnectionPtr &conn, const basicCancelRequestPtr &message, muduo::Timestamp) {
        Connection::ptr session = _connection_manager->getConnection(conn);
        if (!session) {
            DLOG("队列消息取消订阅时,没有找到连接对应的 Connection 对象!");
            conn->shutdown();
            return;
        }
        Channel::ptr channel = session->getChannel(message->cid());
        if (channel) {
            channel->basicCancel(message);
        } else {
            DLOG("队列消息取消订阅时,没有找到信道!");
        }
    }

    /// Dispatcher fallback for message types with no registered handler:
    /// logs the type name and shuts the TCP connection down.
    void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp) {
        LOG_INFO << "onUnknownMessage: " << message->GetTypeName();
        conn->shutdown();
    }

    /// Connection state callback: drops the Connection entry when the peer is
    /// no longer connected, otherwise creates one for the new TCP connection.
    void onConnection(const muduo::net::TcpConnectionPtr &conn) {
        if (!conn->connected()) {
            _connection_manager->delConnection(conn);
            return;
        }
        _connection_manager->newConnection(_virtual_host, _consumer_manager, _codec, conn, _threadpool);
    }

private:
    // Declaration order matters: _server is constructed from &_baseloop and
    // _codec's callback is bound to &_dispatcher.
    muduo::net::EventLoop _baseloop;            // event loop run by start()
    muduo::net::TcpServer _server;              // listening server attached to _baseloop
    ProtobufDispatcher _dispatcher;             // routes decoded messages by type
    ProtobufCodecPtr _codec;                    // receives raw bytes from _server, feeds _dispatcher
    VirtualHost::ptr _virtual_host;             // passed to every new Connection
    ConsumerManager::ptr _consumer_manager;     // passed to every new Connection
    ConnectionManager::ptr _connection_manager; // maps TCP connections to Connection objects
    threadpool::ptr _threadpool;                // passed to every new Connection
};
}
#endif