first commit

This commit is contained in:
sanya
2025-09-01 14:20:39 +00:00
committed by ExternPointer
commit 490fc11f6a
4328 changed files with 1796224 additions and 0 deletions

View File

@@ -0,0 +1,659 @@
//
// sio_client_impl.cpp
// SioChatDemo
//
// Created by Melo Yao on 4/3/15.
// Copyright (c) 2015 Melo Yao. All rights reserved.
//
#include "sio_client_impl.h"
#include <functional>
#include <sstream>
#include <chrono>
#include <mutex>
#include <cmath>
// Comment this out to disable handshake logging to stdout
#if (DEBUG || _DEBUG) && !defined(SIO_DISABLE_LOGGING)
#define LOG(x) std::cout << x
#else
#define LOG(x)
#endif
#if SIO_TLS
// If using Asio's SSL support, you will also need to add this #include.
// Source: http://think-async.com/Asio/asio-1.10.6/doc/asio/using.html
// #include <asio/ssl/impl/src.hpp>
#endif
using std::chrono::milliseconds;
using namespace std;
namespace sio
{
/*************************public:*************************/
client_impl::client_impl(client_options const& options) :
m_ping_interval(0),
m_ping_timeout(0),
m_network_thread(),
m_con_state(con_closed),
m_reconn_delay(5000),
m_reconn_delay_max(25000),
m_reconn_attempts(0xFFFFFFFF),
m_reconn_made(0)
{
using websocketpp::log::alevel;
#ifndef DEBUG
m_client.clear_access_channels(alevel::all);
m_client.set_access_channels(alevel::connect|alevel::disconnect|alevel::app);
#endif
// Initialize the Asio transport policy
if (options.io_context != nullptr) {
m_client.init_asio(options.io_context);
} else {
m_client.init_asio();
}
// Bind the clients we are using
using std::placeholders::_1;
using std::placeholders::_2;
m_client.set_open_handler(std::bind(&client_impl::on_open,this,_1));
m_client.set_close_handler(std::bind(&client_impl::on_close,this,_1));
m_client.set_fail_handler(std::bind(&client_impl::on_fail,this,_1));
m_client.set_message_handler(std::bind(&client_impl::on_message,this,_1,_2));
#if SIO_TLS
m_client.set_tls_init_handler(std::bind(&client_impl::on_tls_init,this,_1));
#endif
m_packet_mgr.set_decode_callback(std::bind(&client_impl::on_decode,this,_1));
m_packet_mgr.set_encode_callback(std::bind(&client_impl::on_encode,this,_1,_2));
}
client_impl::~client_impl()
{
this->sockets_invoke_void(&sio::socket::on_close);
sync_close();
}
void client_impl::set_proxy_basic_auth(const std::string& uri, const std::string& username, const std::string& password)
{
m_proxy_base_url = uri;
m_proxy_basic_username = username;
m_proxy_basic_password = password;
}
void client_impl::connect(const string& uri, const map<string,string>& query, const map<string, string>& headers, const message::ptr& auth)
{
if(m_reconn_timer)
{
m_reconn_timer->cancel();
m_reconn_timer.reset();
}
if(m_network_thread)
{
if(m_con_state == con_closing||m_con_state == con_closed)
{
//if client is closing, join to wait.
//if client is closed, still need to join,
//but in closed case,join will return immediately.
m_network_thread->join();
m_network_thread.reset();//defensive
}
else
{
//if we are connected, do nothing.
return;
}
}
m_con_state = con_opening;
m_base_url = uri;
m_reconn_made = 0;
string query_str;
for(map<string,string>::const_iterator it=query.begin();it!=query.end();++it){
query_str.append("&");
query_str.append(it->first);
query_str.append("=");
string query_str_value=encode_query_string(it->second);
query_str.append(query_str_value);
}
m_query_string=move(query_str);
m_http_headers = headers;
m_auth = auth;
this->reset_states();
m_abort_retries = false;
m_client.get_io_service().dispatch(std::bind(&client_impl::connect_impl,this,uri,m_query_string));
m_network_thread.reset(new thread(std::bind(&client_impl::run_loop,this)));//uri lifecycle?
}
socket::ptr const& client_impl::socket(string const& nsp)
{
lock_guard<mutex> guard(m_socket_mutex);
string aux;
if(nsp == "")
{
aux = "/";
}
else if( nsp[0] != '/')
{
aux.append("/",1);
aux.append(nsp);
}
else
{
aux = nsp;
}
auto it = m_sockets.find(aux);
if(it!= m_sockets.end())
{
return it->second;
}
else
{
pair<const string, socket::ptr> p(aux,shared_ptr<sio::socket>(new sio::socket(this,aux,m_auth)));
return (m_sockets.insert(p).first)->second;
}
}
void client_impl::close()
{
m_con_state = con_closing;
m_abort_retries = true;
this->sockets_invoke_void(&sio::socket::close);
m_client.get_io_service().dispatch(std::bind(&client_impl::close_impl, this,close::status::normal,"End by user"));
}
void client_impl::sync_close()
{
m_con_state = con_closing;
m_abort_retries = true;
this->sockets_invoke_void(&sio::socket::close);
m_client.get_io_service().dispatch(std::bind(&client_impl::close_impl, this,close::status::normal,"End by user"));
if(m_network_thread)
{
m_network_thread->join();
m_network_thread.reset();
}
}
void client_impl::set_logs_default()
{
m_client.clear_access_channels(websocketpp::log::alevel::all);
m_client.set_access_channels(websocketpp::log::alevel::connect | websocketpp::log::alevel::disconnect | websocketpp::log::alevel::app);
}
void client_impl::set_logs_quiet()
{
m_client.clear_access_channels(websocketpp::log::alevel::all);
}
void client_impl::set_logs_verbose()
{
m_client.set_access_channels(websocketpp::log::alevel::all);
}
/*************************protected:*************************/
void client_impl::send(packet& p)
{
m_packet_mgr.encode(p);
}
void client_impl::remove_socket(string const& nsp)
{
lock_guard<mutex> guard(m_socket_mutex);
auto it = m_sockets.find(nsp);
if(it!= m_sockets.end())
{
m_sockets.erase(it);
}
}
asio::io_service& client_impl::get_io_service()
{
return m_client.get_io_service();
}
void client_impl::on_socket_closed(string const& nsp)
{
if(m_socket_close_listener)m_socket_close_listener(nsp);
}
void client_impl::on_socket_opened(string const& nsp)
{
if(m_socket_open_listener)m_socket_open_listener(nsp);
}
/*************************private:*************************/
void client_impl::run_loop()
{
m_client.run();
m_client.reset();
m_client.get_alog().write(websocketpp::log::alevel::devel,
"run loop end");
}
void client_impl::connect_impl(const string& uri, const string& queryString)
{
do{
websocketpp::uri uo(uri);
ostringstream ss;
#if SIO_TLS
ss<<"wss://";
#else
ss<<"ws://";
#endif
const std::string host(uo.get_host());
// As per RFC2732, literal IPv6 address should be enclosed in "[" and "]".
if(host.find(':')!=std::string::npos){
ss<<"["<<uo.get_host()<<"]";
} else {
ss<<uo.get_host();
}
// If a resource path was included in the URI, use that, otherwise
// use the default /socket.io/.
const std::string path(uo.get_resource() == "/" ? "/socket.io/" : uo.get_resource());
ss<<":"<<uo.get_port()<<path<<"?EIO=4&transport=websocket";
if(m_sid.size()>0){
ss<<"&sid="<<m_sid;
}
ss<<"&t="<<time(NULL)<<queryString;
lib::error_code ec;
client_type::connection_ptr con = m_client.get_connection(ss.str(), ec);
if (ec) {
m_client.get_alog().write(websocketpp::log::alevel::app,
"Get Connection Error: "+ec.message());
break;
}
for( auto&& header: m_http_headers ) {
con->replace_header(header.first, header.second);
}
if (!m_proxy_base_url.empty()) {
con->set_proxy(m_proxy_base_url, ec);
if (ec) {
m_client.get_alog().write(websocketpp::log::alevel::app,
"Set Proxy Error: " + ec.message());
break;
}
if (!m_proxy_basic_username.empty()) {
con->set_proxy_basic_auth(m_proxy_basic_username, m_proxy_basic_password, ec);
if (ec) {
m_client.get_alog().write(websocketpp::log::alevel::app,
"Set Proxy Basic Auth Error: " + ec.message());
break;
}
}
}
m_client.connect(con);
return;
}
while(0);
if(m_fail_listener)
{
m_fail_listener();
}
}
void client_impl::close_impl(close::status::value const& code,string const& reason)
{
LOG("Close by reason:"<<reason << endl);
if(m_reconn_timer)
{
m_reconn_timer->cancel();
m_reconn_timer.reset();
}
if (m_con.expired())
{
cerr << "Error: No active session" << endl;
}
else
{
lib::error_code ec;
m_client.close(m_con, code, reason, ec);
}
}
void client_impl::send_impl(shared_ptr<const string> const& payload_ptr,frame::opcode::value opcode)
{
if(m_con_state == con_opened)
{
lib::error_code ec;
m_client.send(m_con,*payload_ptr,opcode,ec);
if(ec)
{
cerr<<"Send failed,reason:"<< ec.message()<<endl;
}
}
}
void client_impl::timeout_ping(const asio::error_code &ec)
{
if(ec)
{
return;
}
LOG("Ping timeout"<<endl);
m_client.get_io_service().dispatch(std::bind(&client_impl::close_impl, this,close::status::policy_violation,"Ping timeout"));
}
void client_impl::timeout_reconnect(asio::error_code const& ec)
{
if(ec)
{
return;
}
if(m_con_state == con_closed)
{
m_con_state = con_opening;
m_reconn_made++;
this->reset_states();
LOG("Reconnecting..."<<endl);
if(m_reconnecting_listener) m_reconnecting_listener();
m_client.get_io_service().dispatch(std::bind(&client_impl::connect_impl,this,m_base_url,m_query_string));
}
}
unsigned client_impl::next_delay() const
{
//no jitter, fixed power root.
unsigned reconn_made = min<unsigned>(m_reconn_made,32);//protect the pow result to be too big.
return static_cast<unsigned>(min<double>(m_reconn_delay * pow(1.5,reconn_made),m_reconn_delay_max));
}
socket::ptr client_impl::get_socket_locked(string const& nsp)
{
lock_guard<mutex> guard(m_socket_mutex);
auto it = m_sockets.find(nsp);
if(it != m_sockets.end())
{
return it->second;
}
else
{
return socket::ptr();
}
}
void client_impl::sockets_invoke_void(void (sio::socket::*fn)(void))
{
map<const string,socket::ptr> socks;
{
lock_guard<mutex> guard(m_socket_mutex);
socks.insert(m_sockets.begin(),m_sockets.end());
}
for (auto it = socks.begin(); it!=socks.end(); ++it) {
((*(it->second)).*fn)();
}
}
void client_impl::on_fail(connection_hdl)
{
if (m_con_state == con_closing) {
LOG("Connection failed while closing." << endl);
this->close();
return;
}
m_con.reset();
m_con_state = con_closed;
this->sockets_invoke_void(&sio::socket::on_disconnect);
LOG("Connection failed." << endl);
if(m_reconn_made<m_reconn_attempts && !m_abort_retries)
{
LOG("Reconnect for attempt:"<<m_reconn_made<<endl);
unsigned delay = this->next_delay();
if(m_reconnect_listener) m_reconnect_listener(m_reconn_made,delay);
m_reconn_timer.reset(new asio::steady_timer(m_client.get_io_service()));
asio::error_code ec;
m_reconn_timer->expires_from_now(milliseconds(delay), ec);
m_reconn_timer->async_wait(std::bind(&client_impl::timeout_reconnect,this, std::placeholders::_1));
}
else
{
if(m_fail_listener)m_fail_listener();
}
}
void client_impl::on_open(connection_hdl con)
{
if (m_con_state == con_closing) {
LOG("Connection opened while closing." << endl);
this->close();
return;
}
LOG("Connected." << endl);
m_con_state = con_opened;
m_con = con;
m_reconn_made = 0;
this->sockets_invoke_void(&sio::socket::on_open);
this->socket("");
if(m_open_listener)m_open_listener();
}
void client_impl::on_close(connection_hdl con)
{
LOG("Client Disconnected." << endl);
con_state m_con_state_was = m_con_state;
m_con_state = con_closed;
lib::error_code ec;
close::status::value code = close::status::normal;
client_type::connection_ptr conn_ptr = m_client.get_con_from_hdl(con, ec);
if (ec) {
LOG("OnClose get conn failed"<<ec<<endl);
}
else
{
code = conn_ptr->get_local_close_code();
}
m_con.reset();
this->clear_timers();
client::close_reason reason;
// If we initiated the close, no matter what the close status was,
// we'll consider it a normal close. (When using TLS, we can
// sometimes get a TLS Short Read error when closing.)
if(code == close::status::normal || m_con_state_was == con_closing)
{
this->sockets_invoke_void(&sio::socket::on_disconnect);
reason = client::close_reason_normal;
}
else
{
this->sockets_invoke_void(&sio::socket::on_disconnect);
if(m_reconn_made<m_reconn_attempts && !m_abort_retries)
{
LOG("Reconnect for attempt:"<<m_reconn_made<<endl);
unsigned delay = this->next_delay();
if(m_reconnect_listener) m_reconnect_listener(m_reconn_made,delay);
m_reconn_timer.reset(new asio::steady_timer(m_client.get_io_service()));
asio::error_code ec;
m_reconn_timer->expires_from_now(milliseconds(delay), ec);
m_reconn_timer->async_wait(std::bind(&client_impl::timeout_reconnect,this, std::placeholders::_1));
return;
}
reason = client::close_reason_drop;
}
if(m_close_listener)
{
m_close_listener(reason);
}
}
void client_impl::on_message(connection_hdl, client_type::message_ptr msg)
{
// Parse the incoming message according to socket.IO rules
m_packet_mgr.put_payload(msg->get_payload());
}
void client_impl::on_handshake(message::ptr const& message)
{
if(message && message->get_flag() == message::flag_object)
{
const object_message* obj_ptr =static_cast<object_message*>(message.get());
const map<string,message::ptr>* values = &(obj_ptr->get_map());
auto it = values->find("sid");
if (it!= values->end()) {
m_sid = static_pointer_cast<string_message>(it->second)->get_string();
}
else
{
goto failed;
}
it = values->find("pingInterval");
if (it!= values->end()&&it->second->get_flag() == message::flag_integer) {
m_ping_interval = (unsigned)static_pointer_cast<int_message>(it->second)->get_int();
}
else
{
m_ping_interval = 25000;
}
it = values->find("pingTimeout");
if (it!=values->end()&&it->second->get_flag() == message::flag_integer) {
m_ping_timeout = (unsigned) static_pointer_cast<int_message>(it->second)->get_int();
}
else
{
m_ping_timeout = 60000;
}
// Start ping timeout
update_ping_timeout_timer();
return;
}
failed:
//just close it.
m_client.get_io_service().dispatch(std::bind(&client_impl::close_impl, this,close::status::policy_violation,"Handshake error"));
}
void client_impl::on_ping()
{
// Reply with pong packet.
packet p(packet::frame_pong);
m_packet_mgr.encode(p, [&](bool /*isBin*/,shared_ptr<const string> payload)
{
this->m_client.send(this->m_con, *payload, frame::opcode::text);
});
// Reset the ping timeout.
update_ping_timeout_timer();
}
void client_impl::on_decode(packet const& p)
{
switch(p.get_frame())
{
case packet::frame_message:
{
// Special event for sid sync
if (p.get_type() == packet::type_connect) {
auto message = p.get_message();
if (message && message->get_flag() == message::flag_object)
{
const object_message* obj_ptr = static_cast<object_message*>(message.get());
const std::map<std::string, message::ptr>* values = &(obj_ptr->get_map());
auto it = values->find("sid");
if (it != values->end()) {
m_sid = std::static_pointer_cast<string_message>(it->second)->get_string();
}
}
}
socket::ptr so_ptr = get_socket_locked(p.get_nsp());
if(so_ptr)so_ptr->on_message_packet(p);
break;
}
case packet::frame_open:
this->on_handshake(p.get_message());
break;
case packet::frame_close:
//FIXME how to deal?
this->close_impl(close::status::abnormal_close, "End by server");
break;
case packet::frame_ping:
this->on_ping();
break;
default:
break;
}
}
void client_impl::on_encode(bool isBinary,shared_ptr<const string> const& payload)
{
LOG("encoded payload length:"<<payload->length()<<endl);
m_client.get_io_service().dispatch(std::bind(&client_impl::send_impl,this,payload,isBinary?frame::opcode::binary:frame::opcode::text));
}
void client_impl::clear_timers()
{
LOG("clear timers"<<endl);
asio::error_code ec;
if(m_ping_timeout_timer)
{
m_ping_timeout_timer->cancel(ec);
m_ping_timeout_timer.reset();
}
}
void client_impl::update_ping_timeout_timer() {
if (!m_ping_timeout_timer) {
m_ping_timeout_timer = std::unique_ptr<asio::steady_timer>(new asio::steady_timer(get_io_service()));
}
asio::error_code ec;
m_ping_timeout_timer->expires_from_now(milliseconds(m_ping_interval + m_ping_timeout), ec);
m_ping_timeout_timer->async_wait(std::bind(&client_impl::timeout_ping, this, std::placeholders::_1));
}
void client_impl::reset_states()
{
m_client.reset();
m_sid.clear();
m_packet_mgr.reset();
}
#if SIO_TLS
client_impl::context_ptr client_impl::on_tls_init(connection_hdl conn)
{
context_ptr ctx = context_ptr(new asio::ssl::context(asio::ssl::context::tls));
asio::error_code ec;
ctx->set_options(asio::ssl::context::default_workarounds |
asio::ssl::context::no_tlsv1 |
asio::ssl::context::no_tlsv1_1 |
asio::ssl::context::single_dh_use,ec);
if(ec)
{
cerr<<"Init tls failed,reason:"<< ec.message()<<endl;
}
return ctx;
}
#endif
std::string client_impl::encode_query_string(const std::string &query){
ostringstream ss;
ss << std::hex;
// Percent-encode (RFC3986) non-alphanumeric characters.
for(const char c : query){
if((c >= 'a' && c <= 'z') || (c>= 'A' && c<= 'Z') || (c >= '0' && c<= '9')){
ss << c;
} else {
ss << '%' << std::uppercase << std::setw(2) << int((unsigned char) c) << std::nouppercase;
}
}
ss << std::dec;
return ss.str();
}
}

View File

@@ -0,0 +1,253 @@
#ifndef SIO_CLIENT_IMPL_H
#define SIO_CLIENT_IMPL_H
#include <cstdint>
#ifdef _WIN32
#define _WEBSOCKETPP_CPP11_THREAD_
//#define _WEBSOCKETPP_CPP11_RANDOM_DEVICE_
#define _WEBSOCKETPP_NO_CPP11_FUNCTIONAL_
#define INTIALIZER(__TYPE__)
#else
#define _WEBSOCKETPP_CPP11_STL_ 1
#define INTIALIZER(__TYPE__) (__TYPE__)
#endif
#include <websocketpp/client.hpp>
#if _DEBUG || DEBUG
#if SIO_TLS
#include <websocketpp/config/debug_asio.hpp>
typedef websocketpp::config::debug_asio_tls client_config;
#else
#include <websocketpp/config/debug_asio_no_tls.hpp>
typedef websocketpp::config::debug_asio client_config;
#endif //SIO_TLS
#else
#if SIO_TLS
#include <websocketpp/config/asio_client.hpp>
typedef websocketpp::config::asio_tls_client client_config;
#else
#include <websocketpp/config/asio_no_tls_client.hpp>
typedef websocketpp::config::asio_client client_config;
#endif //SIO_TLS
#endif //DEBUG
#if SIO_TLS
#include <asio/ssl/context.hpp>
#endif
#include <asio/steady_timer.hpp>
#include <asio/error_code.hpp>
#include <asio/io_service.hpp>
#include <atomic>
#include <memory>
#include <map>
#include <thread>
#include "../sio_client.h"
#include "sio_packet.h"
namespace sio
{
using namespace websocketpp;
typedef websocketpp::client<client_config> client_type;
class client_impl {
protected:
enum con_state
{
con_opening,
con_opened,
con_closing,
con_closed
};
client_impl(client_options const& options);
~client_impl();
//set listeners and event bindings.
#define SYNTHESIS_SETTER(__TYPE__,__FIELD__) \
void set_##__FIELD__(__TYPE__ const& l) \
{ m_##__FIELD__ = l;}
SYNTHESIS_SETTER(client::con_listener,open_listener)
SYNTHESIS_SETTER(client::con_listener,fail_listener)
SYNTHESIS_SETTER(client::reconnect_listener,reconnect_listener)
SYNTHESIS_SETTER(client::con_listener,reconnecting_listener)
SYNTHESIS_SETTER(client::close_listener,close_listener)
SYNTHESIS_SETTER(client::socket_listener,socket_open_listener)
SYNTHESIS_SETTER(client::socket_listener,socket_close_listener)
#undef SYNTHESIS_SETTER
void clear_con_listeners()
{
m_open_listener = nullptr;
m_close_listener = nullptr;
m_fail_listener = nullptr;
m_reconnect_listener = nullptr;
m_reconnecting_listener = nullptr;
}
void clear_socket_listeners()
{
m_socket_open_listener = nullptr;
m_socket_close_listener = nullptr;
}
// Client Functions - such as send, etc.
void connect(const std::string& uri, const std::map<std::string, std::string>& queryString,
const std::map<std::string, std::string>& httpExtraHeaders, const message::ptr& auth);
sio::socket::ptr const& socket(const std::string& nsp);
// Closes the connection
void close();
void sync_close();
bool opened() const { return m_con_state == con_opened; }
std::string const& get_sessionid() const { return m_sid; }
void set_reconnect_attempts(unsigned attempts) {m_reconn_attempts = attempts;}
void set_reconnect_delay(unsigned millis) {m_reconn_delay = millis;if(m_reconn_delay_max<millis) m_reconn_delay_max = millis;}
void set_reconnect_delay_max(unsigned millis) {m_reconn_delay_max = millis;if(m_reconn_delay>millis) m_reconn_delay = millis;}
void set_logs_default();
void set_logs_quiet();
void set_logs_verbose();
void set_proxy_basic_auth(const std::string& uri, const std::string& username, const std::string& password);
protected:
void send(packet& p);
void remove_socket(std::string const& nsp);
asio::io_service& get_io_service();
void on_socket_closed(std::string const& nsp);
void on_socket_opened(std::string const& nsp);
private:
void run_loop();
void connect_impl(const std::string& uri, const std::string& query);
void close_impl(close::status::value const& code,std::string const& reason);
void send_impl(std::shared_ptr<const std::string> const& payload_ptr,frame::opcode::value opcode);
void ping(const asio::error_code& ec);
void timeout_ping(const asio::error_code& ec);
void timeout_reconnect(asio::error_code const& ec);
unsigned next_delay() const;
socket::ptr get_socket_locked(std::string const& nsp);
void sockets_invoke_void(void (sio::socket::*fn)(void));
void on_decode(packet const& pack);
void on_encode(bool isBinary,shared_ptr<const string> const& payload);
//websocket callbacks
void on_fail(connection_hdl con);
void on_open(connection_hdl con);
void on_close(connection_hdl con);
void on_message(connection_hdl con, client_type::message_ptr msg);
//socketio callbacks
void on_handshake(message::ptr const& message);
void on_ping();
void reset_states();
void clear_timers();
void update_ping_timeout_timer();
#if SIO_TLS
typedef websocketpp::lib::shared_ptr<asio::ssl::context> context_ptr;
context_ptr on_tls_init(connection_hdl con);
#endif
// Percent encode query string
std::string encode_query_string(const std::string &query);
// Connection pointer for client functions.
connection_hdl m_con;
client_type m_client;
// Socket.IO server settings
std::string m_sid;
std::string m_base_url;
std::string m_query_string;
std::map<std::string, std::string> m_http_headers;
message::ptr m_auth;
std::string m_proxy_base_url;
std::string m_proxy_basic_username;
std::string m_proxy_basic_password;
unsigned int m_ping_interval;
unsigned int m_ping_timeout;
std::unique_ptr<std::thread> m_network_thread;
packet_manager m_packet_mgr;
std::unique_ptr<asio::steady_timer> m_ping_timeout_timer;
std::unique_ptr<asio::steady_timer> m_reconn_timer;
con_state m_con_state;
client::con_listener m_open_listener;
client::con_listener m_fail_listener;
client::con_listener m_reconnecting_listener;
client::reconnect_listener m_reconnect_listener;
client::close_listener m_close_listener;
client::socket_listener m_socket_open_listener;
client::socket_listener m_socket_close_listener;
std::map<const std::string,socket::ptr> m_sockets;
std::mutex m_socket_mutex;
unsigned m_reconn_delay;
unsigned m_reconn_delay_max;
unsigned m_reconn_attempts;
unsigned m_reconn_made;
std::atomic<bool> m_abort_retries { false };
friend class sio::client;
friend class sio::socket;
};
}
#endif // SIO_CLIENT_IMPL_H

View File

@@ -0,0 +1,510 @@
//
// sio_packet.cpp
//
// Created by Melo Yao on 3/22/15.
//
#include "sio_packet.h"
#include <rapidjson/document.h>
#include <rapidjson/encodedstream.h>
#include <rapidjson/writer.h>
#include <cassert>
#define kBIN_PLACE_HOLDER "_placeholder"
namespace sio
{
using namespace rapidjson;
using namespace std;
void accept_message(message const& msg,Value& val, Document& doc,vector<shared_ptr<const string> >& buffers);
void accept_bool_message(bool_message const& msg, Value& val)
{
val.SetBool(msg.get_bool());
}
void accept_null_message(Value& val)
{
val.SetNull();
}
void accept_int_message(int_message const& msg, Value& val)
{
val.SetInt64(msg.get_int());
}
void accept_double_message(double_message const& msg, Value& val)
{
val.SetDouble(msg.get_double());
}
void accept_string_message(string_message const& msg, Value& val)
{
val.SetString(msg.get_string().data(),(SizeType) msg.get_string().length());
}
void accept_binary_message(binary_message const& msg,Value& val,Document& doc,vector<shared_ptr<const string> >& buffers)
{
val.SetObject();
Value boolVal;
boolVal.SetBool(true);
val.AddMember(kBIN_PLACE_HOLDER, boolVal, doc.GetAllocator());
Value numVal;
numVal.SetInt((int)buffers.size());
val.AddMember("num", numVal, doc.GetAllocator());
buffers.push_back(msg.get_binary());
}
void accept_array_message(array_message const& msg,Value& val,Document& doc,vector<shared_ptr<const string> >& buffers)
{
val.SetArray();
for (vector<message::ptr>::const_iterator it = msg.get_vector().begin(); it!=msg.get_vector().end(); ++it) {
Value child;
accept_message(*(*it), child, doc,buffers);
val.PushBack(child, doc.GetAllocator());
}
}
void accept_object_message(object_message const& msg,Value& val,Document& doc,vector<shared_ptr<const string> >& buffers)
{
val.SetObject();
for (map<string,message::ptr>::const_iterator it = msg.get_map().begin(); it!= msg.get_map().end(); ++it) {
Value nameVal;
nameVal.SetString(it->first.data(), (SizeType)it->first.length(), doc.GetAllocator());
Value valueVal;
accept_message(*(it->second), valueVal, doc,buffers);
val.AddMember(nameVal, valueVal, doc.GetAllocator());
}
}
void accept_message(message const& msg,Value& val, Document& doc,vector<shared_ptr<const string> >& buffers)
{
const message* msg_ptr = &msg;
switch(msg.get_flag())
{
case message::flag_integer:
{
accept_int_message(*(static_cast<const int_message*>(msg_ptr)), val);
break;
}
case message::flag_double:
{
accept_double_message(*(static_cast<const double_message*>(msg_ptr)), val);
break;
}
case message::flag_string:
{
accept_string_message(*(static_cast<const string_message*>(msg_ptr)), val);
break;
}
case message::flag_boolean:
{
accept_bool_message(*(static_cast<const bool_message*>(msg_ptr)), val);
break;
}
case message::flag_null:
{
accept_null_message(val);
break;
}
case message::flag_binary:
{
accept_binary_message(*(static_cast<const binary_message*>(msg_ptr)), val,doc,buffers);
break;
}
case message::flag_array:
{
accept_array_message(*(static_cast<const array_message*>(msg_ptr)), val,doc,buffers);
break;
}
case message::flag_object:
{
accept_object_message(*(static_cast<const object_message*>(msg_ptr)), val,doc,buffers);
break;
}
default:
break;
}
}
message::ptr from_json(Value const& value, vector<shared_ptr<const string> > const& buffers)
{
if(value.IsInt64())
{
return int_message::create(value.GetInt64());
}
else if(value.IsDouble())
{
return double_message::create(value.GetDouble());
}
else if(value.IsString())
{
string str(value.GetString(),value.GetStringLength());
return string_message::create(str);
}
else if(value.IsArray())
{
message::ptr ptr = array_message::create();
for (SizeType i = 0; i< value.Size(); ++i) {
static_cast<array_message*>(ptr.get())->get_vector().push_back(from_json(value[i],buffers));
}
return ptr;
}
else if(value.IsObject())
{
//binary placeholder
auto mem_it = value.FindMember(kBIN_PLACE_HOLDER);
if (mem_it!=value.MemberEnd() && mem_it->value.GetBool()) {
int num = value["num"].GetInt();
if(num >= 0 && num < static_cast<int>(buffers.size()))
{
return binary_message::create(buffers[num]);
}
return message::ptr();
}
//real object message.
message::ptr ptr = object_message::create();
for (auto it = value.MemberBegin();it!=value.MemberEnd();++it)
{
if(it->name.IsString())
{
string key(it->name.GetString(),it->name.GetStringLength());
static_cast<object_message*>(ptr.get())->get_map()[key] = from_json(it->value,buffers);
}
}
return ptr;
}
else if(value.IsBool())
{
return bool_message::create(value.GetBool());
}
else if(value.IsNull())
{
return null_message::create();
}
return message::ptr();
}
packet::packet(string const& nsp,message::ptr const& msg,int pack_id, bool isAck):
_frame(frame_message),
_type((isAck?type_ack : type_event) | type_undetermined),
_nsp(nsp),
_pack_id(pack_id),
_message(msg),
_pending_buffers(0)
{
assert((!isAck
|| (isAck&&pack_id>=0)));
}
packet::packet(type type,string const& nsp, message::ptr const& msg):
_frame(frame_message),
_type(type),
_nsp(nsp),
_pack_id(-1),
_message(msg),
_pending_buffers(0)
{
}
packet::packet(packet::frame_type frame):
_frame(frame),
_type(type_undetermined),
_pack_id(-1),
_pending_buffers(0)
{
}
packet::packet():
_type(type_undetermined),
_pack_id(-1),
_pending_buffers(0)
{
}
bool packet::is_binary_message(string const& payload_ptr)
{
return payload_ptr.size()>0 && payload_ptr[0] == frame_message;
}
bool packet::is_text_message(string const& payload_ptr)
{
return payload_ptr.size()>0 && payload_ptr[0] == (frame_message + '0');
}
bool packet::is_message(string const& payload_ptr)
{
return is_binary_message(payload_ptr) || is_text_message(payload_ptr);
}
bool packet::parse_buffer(const string &buf_payload)
{
if (_pending_buffers > 0) {
assert(is_binary_message(buf_payload));//this is ensured by outside.
_buffers.push_back(std::make_shared<string>(buf_payload.data(),buf_payload.size()));
_pending_buffers--;
if (_pending_buffers == 0) {
Document doc;
doc.Parse<0>(_buffers.front()->data());
_buffers.erase(_buffers.begin());
_message = from_json(doc, _buffers);
_buffers.clear();
return false;
}
return true;
}
return false;
}
bool packet::parse(const string& payload_ptr)
{
assert(!is_binary_message(payload_ptr)); //this is ensured by outside
_frame = (packet::frame_type) (payload_ptr[0] - '0');
_message.reset();
_pack_id = -1;
_buffers.clear();
_pending_buffers = 0;
size_t pos = 1;
if (_frame == frame_message) {
_type = (packet::type)(payload_ptr[pos] - '0');
if(_type < type_min || _type > type_max)
{
return false;
}
pos++;
if (_type == type_binary_event || _type == type_binary_ack) {
size_t score_pos = payload_ptr.find('-');
_pending_buffers = static_cast<unsigned>(std::stoul(payload_ptr.substr(pos, score_pos - pos)));
pos = score_pos+1;
}
}
size_t nsp_json_pos = payload_ptr.find_first_of("{[\"/",pos,4);
if(nsp_json_pos==string::npos)//no namespace and no message,the end.
{
_nsp = "/";
return false;
}
size_t json_pos = nsp_json_pos;
if(payload_ptr[nsp_json_pos] == '/')//nsp_json_pos is start of nsp
{
size_t comma_pos = payload_ptr.find_first_of(",");//end of nsp
if(comma_pos == string::npos)//packet end with nsp
{
_nsp = payload_ptr.substr(nsp_json_pos);
return false;
}
else//we have a message, maybe the message have an id.
{
_nsp = payload_ptr.substr(nsp_json_pos,comma_pos - nsp_json_pos);
pos = comma_pos+1;//start of the message
json_pos = payload_ptr.find_first_of("\"[{", pos, 3);//start of the json part of message
if(json_pos == string::npos)
{
//no message,the end
//assume if there's no message, there's no message id.
return false;
}
}
}
else
{
_nsp = "/";
}
if(pos<json_pos)//we've got pack id.
{
_pack_id = std::stoi(payload_ptr.substr(pos,json_pos - pos));
}
if (_frame == frame_message && (_type == type_binary_event || _type == type_binary_ack)) {
//parse later when all buffers are arrived.
_buffers.push_back(make_shared<string>(payload_ptr.data() + json_pos, payload_ptr.length() - json_pos));
return true;
}
else
{
Document doc;
doc.Parse<0>(payload_ptr.data()+json_pos);
_message = from_json(doc, vector<shared_ptr<const string> >());
return false;
}
}
bool packet::accept(string& payload_ptr, vector<shared_ptr<const string> >&buffers)
{
char frame_char = _frame+'0';
payload_ptr.append(&frame_char,1);
if (_frame!=frame_message) {
return false;
}
bool hasMessage = false;
Document doc;
if (_message) {
accept_message(*_message, doc, doc, buffers);
hasMessage = true;
}
bool hasBinary = buffers.size()>0;
_type = _type&(~type_undetermined);
if(_type == type_event)
{
_type = hasBinary?type_binary_event:type_event;
}
else if(_type == type_ack)
{
_type = hasBinary? type_binary_ack : type_ack;
}
ostringstream ss;
ss.precision(8);
ss<<_type;
if (hasBinary) {
ss<<buffers.size()<<"-";
}
if(_nsp.size()>0 && _nsp!="/")
{
ss<<_nsp;
if (hasMessage || _pack_id>=0) {
ss<<",";
}
}
if(_pack_id>=0)
{
ss<<_pack_id;
}
payload_ptr.append(ss.str());
if (hasMessage)
{
StringBuffer buffer;
Writer<StringBuffer> writer(buffer);
doc.Accept(writer);
payload_ptr.append(buffer.GetString(),buffer.GetSize());
}
return hasBinary;
}
packet::frame_type packet::get_frame() const
{
return _frame;
}
packet::type packet::get_type() const
{
assert((_type & type_undetermined) == 0);
return (type)_type;
}
string const& packet::get_nsp() const
{
return _nsp;
}
message::ptr const& packet::get_message() const
{
return _message;
}
unsigned packet::get_pack_id() const
{
return _pack_id;
}
void packet_manager::set_decode_callback(function<void (packet const&)> const& decode_callback)
{
m_decode_callback = decode_callback;
}
void packet_manager::set_encode_callback(function<void (bool,shared_ptr<const string> const&)> const& encode_callback)
{
m_encode_callback = encode_callback;
}
void packet_manager::reset()
{
m_partial_packet.reset();
}
void packet_manager::encode(packet& pack,encode_callback_function const& override_encode_callback) const
{
shared_ptr<string> ptr = make_shared<string>();
vector<shared_ptr<const string> > buffers;
const encode_callback_function *cb_ptr = &m_encode_callback;
if(override_encode_callback)
{
cb_ptr = &override_encode_callback;
}
if(pack.accept(*ptr,buffers))
{
if((*cb_ptr))
{
(*cb_ptr)(false,ptr);
}
for(auto it = buffers.begin();it!=buffers.end();++it)
{
if((*cb_ptr))
{
(*cb_ptr)(true,*it);
}
}
}
else
{
if((*cb_ptr))
{
(*cb_ptr)(false,ptr);
}
}
}
void packet_manager::put_payload(string const& payload)
{
unique_ptr<packet> p;
do
{
if(packet::is_text_message(payload))
{
p.reset(new packet());
if(p->parse(payload))
{
m_partial_packet = std::move(p);
}
else
{
break;
}
}
else if(packet::is_binary_message(payload))
{
if(m_partial_packet)
{
if(!m_partial_packet->parse_buffer(payload))
{
p = std::move(m_partial_packet);
break;
}
}
}
else
{
p.reset(new packet());
p->parse(payload);
break;
}
return;
}while(0);
if(m_decode_callback)
{
m_decode_callback(*p);
}
}
}

View File

@@ -0,0 +1,106 @@
//
// sio_packet.h
//
// Created by Melo Yao on 3/19/15.
//
#ifndef SIO_PACKET_H
#define SIO_PACKET_H
#include <sstream>
#include "../sio_message.h"
#include <functional>
namespace sio
{
using namespace std;
class packet
{
public:
enum frame_type
{
frame_open = 0,
frame_close = 1,
frame_ping = 2,
frame_pong = 3,
frame_message = 4,
frame_upgrade = 5,
frame_noop = 6
};
enum type
{
type_min = 0,
type_connect = 0,
type_disconnect = 1,
type_event = 2,
type_ack = 3,
type_error = 4,
type_binary_event = 5,
type_binary_ack = 6,
type_max = 6,
type_undetermined = 0x10 //undetermined mask bit
};
private:
frame_type _frame;
int _type;
string _nsp;
int _pack_id;
message::ptr _message;
unsigned _pending_buffers;
vector<shared_ptr<const string> > _buffers;
public:
packet(string const& nsp,message::ptr const& msg,int pack_id = -1,bool isAck = false);//message type constructor.
packet(frame_type frame);
packet(type type,string const& nsp= string(),message::ptr const& msg = message::ptr());//other message types constructor.
//empty constructor for parse.
packet();
frame_type get_frame() const;
type get_type() const;
bool parse(string const& payload_ptr);//return true if need to parse buffer.
bool parse_buffer(string const& buf_payload);
bool accept(string& payload_ptr, vector<shared_ptr<const string> >&buffers); //return true if has binary buffers.
string const& get_nsp() const;
message::ptr const& get_message() const;
unsigned get_pack_id() const;
static bool is_message(string const& payload_ptr);
static bool is_text_message(string const& payload_ptr);
static bool is_binary_message(string const& payload_ptr);
};
class packet_manager
{
public:
typedef function<void (bool,shared_ptr<const string> const&)> encode_callback_function;
typedef function<void (packet const&)> decode_callback_function;
void set_decode_callback(decode_callback_function const& decode_callback);
void set_encode_callback(encode_callback_function const& encode_callback);
void encode(packet& pack,encode_callback_function const& override_encode_callback = encode_callback_function()) const;
void put_payload(string const& payload);
void reset();
private:
decode_callback_function m_decode_callback;
encode_callback_function m_encode_callback;
std::unique_ptr<packet> m_partial_packet;
};
}
#endif

View File

@@ -0,0 +1,165 @@
//
// sio_client.h
//
// Created by Melo Yao on 3/25/15.
//
#include "sio_client.h"
#include "internal/sio_client_impl.h"
using namespace websocketpp;
using std::stringstream;
namespace sio
{
client::client() : m_impl(new client_impl({})) {}
client::client(client_options const& options):
m_impl(new client_impl(options))
{
}
client::~client()
{
delete m_impl;
}
void client::set_open_listener(con_listener const& l)
{
m_impl->set_open_listener(l);
}
void client::set_fail_listener(con_listener const& l)
{
m_impl->set_fail_listener(l);
}
void client::set_close_listener(close_listener const& l)
{
m_impl->set_close_listener(l);
}
void client::set_socket_open_listener(socket_listener const& l)
{
m_impl->set_socket_open_listener(l);
}
void client::set_reconnect_listener(reconnect_listener const& l)
{
m_impl->set_reconnect_listener(l);
}
void client::set_reconnecting_listener(con_listener const& l)
{
m_impl->set_reconnecting_listener(l);
}
void client::set_socket_close_listener(socket_listener const& l)
{
m_impl->set_socket_close_listener(l);
}
void client::clear_con_listeners()
{
m_impl->clear_con_listeners();
}
void client::clear_socket_listeners()
{
m_impl->clear_socket_listeners();
}
void client::set_proxy_basic_auth(const std::string& uri, const std::string& username, const std::string& password)
{
m_impl->set_proxy_basic_auth(uri, username, password);
}
void client::connect(const std::string& uri)
{
m_impl->connect(uri, {}, {}, {});
}
void client::connect(const std::string& uri, const message::ptr& auth)
{
m_impl->connect(uri, {}, {}, auth);
}
void client::connect(const std::string& uri, const std::map<string,string>& query)
{
m_impl->connect(uri, query, {}, {});
}
void client::connect(const std::string& uri, const std::map<string,string>& query, const message::ptr& auth)
{
m_impl->connect(uri, query, {}, auth);
}
void client::connect(const std::string& uri, const std::map<std::string,std::string>& query,
const std::map<std::string,std::string>& http_extra_headers)
{
m_impl->connect(uri, query, http_extra_headers, {});
}
void client::connect(const std::string& uri, const std::map<std::string,std::string>& query,
const std::map<std::string,std::string>& http_extra_headers, const message::ptr& auth)
{
m_impl->connect(uri, query, http_extra_headers, auth);
}
socket::ptr const& client::socket(const std::string& nsp)
{
return m_impl->socket(nsp);
}
// Closes the connection
void client::close()
{
m_impl->close();
}
void client::sync_close()
{
m_impl->sync_close();
}
bool client::opened() const
{
return m_impl->opened();
}
std::string const& client::get_sessionid() const
{
return m_impl->get_sessionid();
}
void client::set_reconnect_attempts(int attempts)
{
m_impl->set_reconnect_attempts(attempts);
}
void client::set_reconnect_delay(unsigned millis)
{
m_impl->set_reconnect_delay(millis);
}
void client::set_reconnect_delay_max(unsigned millis)
{
m_impl->set_reconnect_delay_max(millis);
}
void client::set_logs_default()
{
m_impl->set_logs_default();
}
void client::set_logs_quiet()
{
m_impl->set_logs_quiet();
}
void client::set_logs_verbose()
{
m_impl->set_logs_verbose();
}
}

View File

@@ -0,0 +1,116 @@
//
// sio_client.h
//
// Created by Melo Yao on 3/25/15.
//
#ifndef SIO_CLIENT_H
#define SIO_CLIENT_H
#include <string>
#include <functional>
#include "sio_message.h"
#include "sio_socket.h"
namespace asio {
class io_context;
}
namespace sio
{
class client_impl;
struct client_options {
asio::io_context* io_context = nullptr;
};
class client {
public:
enum close_reason
{
close_reason_normal,
close_reason_drop
};
typedef std::function<void(void)> con_listener;
typedef std::function<void(close_reason const& reason)> close_listener;
typedef std::function<void(unsigned, unsigned)> reconnect_listener;
typedef std::function<void(std::string const& nsp)> socket_listener;
client();
client(client_options const& options);
~client();
//set listeners and event bindings.
void set_open_listener(con_listener const& l);
void set_fail_listener(con_listener const& l);
void set_reconnecting_listener(con_listener const& l);
void set_reconnect_listener(reconnect_listener const& l);
void set_close_listener(close_listener const& l);
void set_socket_open_listener(socket_listener const& l);
void set_socket_close_listener(socket_listener const& l);
void clear_con_listeners();
void clear_socket_listeners();
// Client Functions - such as send, etc.
void connect(const std::string& uri);
void connect(const std::string& uri, const message::ptr& auth);
void connect(const std::string& uri, const std::map<std::string,std::string>& query);
void connect(const std::string& uri, const std::map<std::string,std::string>& query, const message::ptr& auth);
void connect(const std::string& uri, const std::map<std::string,std::string>& query,
const std::map<std::string,std::string>& http_extra_headers);
void connect(const std::string& uri, const std::map<std::string,std::string>& query,
const std::map<std::string,std::string>& http_extra_headers, const message::ptr& auth);
void set_reconnect_attempts(int attempts);
void set_reconnect_delay(unsigned millis);
void set_reconnect_delay_max(unsigned millis);
void set_logs_default();
void set_logs_quiet();
void set_logs_verbose();
sio::socket::ptr const& socket(const std::string& nsp = "");
// Closes the connection
void close();
void sync_close();
void set_proxy_basic_auth(const std::string& uri, const std::string& username, const std::string& password);
bool opened() const;
std::string const& get_sessionid() const;
private:
//disable copy constructor and assign operator.
client(client const&){}
void operator=(client const&){}
client_impl* m_impl;
};
}
#endif // __SIO_CLIENT__H__

View File

@@ -0,0 +1,569 @@
//
// sio_message.h
//
// Created by Melo Yao on 3/25/15.
//
#ifndef __SIO_MESSAGE_H__
#define __SIO_MESSAGE_H__
#include <string>
#include <memory>
#include <vector>
#include <map>
#include <cassert>
#include <type_traits>
namespace sio
{
class message
{
public:
enum flag
{
flag_integer,
flag_double,
flag_string,
flag_binary,
flag_array,
flag_object,
flag_boolean,
flag_null
};
virtual ~message(){};
class list;
flag get_flag() const
{
return _flag;
}
typedef std::shared_ptr<message> ptr;
virtual bool get_bool() const
{
assert(false);
return false;
}
virtual int64_t get_int() const
{
assert(false);
return 0;
}
virtual double get_double() const
{
assert(false);
return 0;
}
virtual std::string const& get_string() const
{
assert(false);
static std::string s_empty_string;
s_empty_string.clear();
return s_empty_string;
}
virtual std::shared_ptr<const std::string> const& get_binary() const
{
assert(false);
static std::shared_ptr<const std::string> s_empty_binary;
s_empty_binary = nullptr;
return s_empty_binary;
}
virtual const std::vector<ptr>& get_vector() const
{
assert(false);
static std::vector<ptr> s_empty_vector;
s_empty_vector.clear();
return s_empty_vector;
}
virtual std::vector<ptr>& get_vector()
{
assert(false);
static std::vector<ptr> s_empty_vector;
s_empty_vector.clear();
return s_empty_vector;
}
virtual const std::map<std::string,message::ptr>& get_map() const
{
assert(false);
static std::map<std::string,message::ptr> s_empty_map;
s_empty_map.clear();
return s_empty_map;
}
virtual std::map<std::string,message::ptr>& get_map()
{
assert(false);
static std::map<std::string,message::ptr> s_empty_map;
s_empty_map.clear();
return s_empty_map;
}
private:
flag _flag;
protected:
message(flag f):_flag(f){}
};
class null_message : public message
{
protected:
null_message()
:message(flag_null)
{
}
public:
static message::ptr create()
{
return ptr(new null_message());
}
};
class bool_message : public message
{
bool _v;
protected:
bool_message(bool v)
:message(flag_boolean),_v(v)
{
}
public:
static message::ptr create(bool v)
{
return ptr(new bool_message(v));
}
bool get_bool() const
{
return _v;
}
};
class int_message : public message
{
int64_t _v;
protected:
int_message(int64_t v)
:message(flag_integer),_v(v)
{
}
public:
static message::ptr create(int64_t v)
{
return ptr(new int_message(v));
}
int64_t get_int() const
{
return _v;
}
double get_double() const//add double accessor for integer.
{
return static_cast<double>(_v);
}
};
class double_message : public message
{
double _v;
double_message(double v)
:message(flag_double),_v(v)
{
}
public:
static message::ptr create(double v)
{
return ptr(new double_message(v));
}
double get_double() const
{
return _v;
}
};
class string_message : public message
{
std::string _v;
string_message(std::string const& v)
:message(flag_string),_v(v)
{
}
string_message(std::string&& v)
:message(flag_string),_v(std::move(v))
{
}
public:
static message::ptr create(std::string const& v)
{
return ptr(new string_message(v));
}
static message::ptr create(std::string&& v)
{
return ptr(new string_message(std::move(v)));
}
std::string const& get_string() const
{
return _v;
}
};
class binary_message : public message
{
std::shared_ptr<const std::string> _v;
binary_message(std::shared_ptr<const std::string> const& v)
:message(flag_binary),_v(v)
{
}
public:
static message::ptr create(std::shared_ptr<const std::string> const& v)
{
return ptr(new binary_message(v));
}
std::shared_ptr<const std::string> const& get_binary() const
{
return _v;
}
};
class array_message : public message
{
std::vector<message::ptr> _v;
array_message():message(flag_array)
{
}
public:
static message::ptr create()
{
return ptr(new array_message());
}
void push(message::ptr const& message)
{
if(message)
_v.push_back(message);
}
void push(const std::string& text)
{
_v.push_back(string_message::create(text));
}
void push(std::string&& text)
{
_v.push_back(string_message::create(std::move(text)));
}
void push(std::shared_ptr<std::string> const& binary)
{
if(binary)
_v.push_back(binary_message::create(binary));
}
void push(std::shared_ptr<const std::string> const& binary)
{
if(binary)
_v.push_back(binary_message::create(binary));
}
void insert(size_t pos,message::ptr const& message)
{
_v.insert(_v.begin()+pos, message);
}
void insert(size_t pos,const std::string& text)
{
_v.insert(_v.begin()+pos, string_message::create(text));
}
void insert(size_t pos,std::string&& text)
{
_v.insert(_v.begin()+pos, string_message::create(std::move(text)));
}
void insert(size_t pos,std::shared_ptr<std::string> const& binary)
{
if(binary)
_v.insert(_v.begin()+pos, binary_message::create(binary));
}
void insert(size_t pos,std::shared_ptr<const std::string> const& binary)
{
if(binary)
_v.insert(_v.begin()+pos, binary_message::create(binary));
}
size_t size() const
{
return _v.size();
}
const message::ptr& at(size_t i) const
{
return _v[i];
}
const message::ptr& operator[] (size_t i) const
{
return _v[i];
}
std::vector<ptr>& get_vector()
{
return _v;
}
const std::vector<ptr>& get_vector() const
{
return _v;
}
};
class object_message : public message
{
std::map<std::string,message::ptr> _v;
object_message() : message(flag_object)
{
}
public:
static message::ptr create()
{
return ptr(new object_message());
}
void insert(const std::string & key,message::ptr const& message)
{
_v[key] = message;
}
void insert(const std::string & key,const std::string& text)
{
_v[key] = string_message::create(text);
}
void insert(const std::string & key,std::string&& text)
{
_v[key] = string_message::create(std::move(text));
}
void insert(const std::string & key,std::shared_ptr<std::string> const& binary)
{
if(binary)
_v[key] = binary_message::create(binary);
}
void insert(const std::string & key,std::shared_ptr<const std::string> const& binary)
{
if(binary)
_v[key] = binary_message::create(binary);
}
bool has(const std::string & key)
{
return _v.find(key) != _v.end();
}
const message::ptr& at(const std::string & key) const
{
static std::shared_ptr<message> not_found;
std::map<std::string,message::ptr>::const_iterator it = _v.find(key);
if (it != _v.cend()) return it->second;
return not_found;
}
const message::ptr& operator[] (const std::string & key) const
{
return at(key);
}
bool has(const std::string & key) const
{
return _v.find(key) != _v.end();
}
std::map<std::string,message::ptr>& get_map()
{
return _v;
}
const std::map<std::string,message::ptr>& get_map() const
{
return _v;
}
};
class message::list
{
public:
list()
{
}
list(std::nullptr_t)
{
}
list(message::list&& rhs):
m_vector(std::move(rhs.m_vector))
{
}
list & operator= (const message::list && rhs)
{
m_vector = std::move(rhs.m_vector);
return *this;
}
template <typename T>
list(T&& content,
typename std::enable_if<std::is_same<std::vector<message::ptr>,typename std::remove_reference<T>::type>::value>::type* = 0):
m_vector(std::forward<T>(content))
{
}
list(message::list const& rhs):
m_vector(rhs.m_vector)
{
}
list(message::ptr const& message)
{
if(message)
m_vector.push_back(message);
}
list(const std::string& text)
{
m_vector.push_back(string_message::create(text));
}
list(std::string&& text)
{
m_vector.push_back(string_message::create(std::move(text)));
}
list(std::shared_ptr<std::string> const& binary)
{
if(binary)
m_vector.push_back(binary_message::create(binary));
}
list(std::shared_ptr<const std::string> const& binary)
{
if(binary)
m_vector.push_back(binary_message::create(binary));
}
void push(message::ptr const& message)
{
if(message)
m_vector.push_back(message);
}
void push(const std::string& text)
{
m_vector.push_back(string_message::create(text));
}
void push(std::string&& text)
{
m_vector.push_back(string_message::create(std::move(text)));
}
void push(std::shared_ptr<std::string> const& binary)
{
if(binary)
m_vector.push_back(binary_message::create(binary));
}
void push(std::shared_ptr<const std::string> const& binary)
{
if(binary)
m_vector.push_back(binary_message::create(binary));
}
void insert(size_t pos,message::ptr const& message)
{
m_vector.insert(m_vector.begin()+pos, message);
}
void insert(size_t pos,const std::string& text)
{
m_vector.insert(m_vector.begin()+pos, string_message::create(text));
}
void insert(size_t pos,std::string&& text)
{
m_vector.insert(m_vector.begin()+pos, string_message::create(std::move(text)));
}
void insert(size_t pos,std::shared_ptr<std::string> const& binary)
{
if(binary)
m_vector.insert(m_vector.begin()+pos, binary_message::create(binary));
}
void insert(size_t pos,std::shared_ptr<const std::string> const& binary)
{
if(binary)
m_vector.insert(m_vector.begin()+pos, binary_message::create(binary));
}
size_t size() const
{
return m_vector.size();
}
const message::ptr& at(size_t i) const
{
return m_vector[i];
}
const message::ptr& operator[] (size_t i) const
{
return m_vector[i];
}
message::ptr to_array_message(std::string const& event_name) const
{
message::ptr arr = array_message::create();
arr->get_vector().push_back(string_message::create(event_name));
arr->get_vector().insert(arr->get_vector().end(),m_vector.begin(),m_vector.end());
return arr;
}
message::ptr to_array_message() const
{
message::ptr arr = array_message::create();
arr->get_vector().insert(arr->get_vector().end(),m_vector.begin(),m_vector.end());
return arr;
}
private:
std::vector<message::ptr> m_vector;
};
}
#endif

View File

@@ -0,0 +1,638 @@
#include "sio_socket.h"
#include "internal/sio_packet.h"
#include "internal/sio_client_impl.h"
#include <asio/steady_timer.hpp>
#include <asio/error_code.hpp>
#include <queue>
#include <chrono>
#include <cstdarg>
#include <functional>
#if (DEBUG || _DEBUG) && !defined(SIO_DISABLE_LOGGING)
#define LOG(x) std::cout << x
#else
#define LOG(x)
#endif
#define NULL_GUARD(_x_) \
if(_x_ == NULL) return
namespace sio
{
class event_adapter
{
public:
static void adapt_func(socket::event_listener_aux const& func, event& event)
{
func(event.get_name(),event.get_message(),event.need_ack(),event.get_ack_message_impl());
}
static inline socket::event_listener do_adapt(socket::event_listener_aux const& func)
{
return std::bind(&event_adapter::adapt_func, func,std::placeholders::_1);
}
static inline event create_event(std::string const& nsp,std::string const& name,message::list&& message,bool need_ack)
{
return event(nsp,name,message,need_ack);
}
};
const std::string& event::get_nsp() const
{
return m_nsp;
}
const std::string& event::get_name() const
{
return m_name;
}
const message::ptr& event::get_message() const
{
if(m_messages.size()>0)
return m_messages[0];
else
{
static message::ptr null_ptr;
return null_ptr;
}
}
const message::list& event::get_messages() const
{
return m_messages;
}
bool event::need_ack() const
{
return m_need_ack;
}
void event::put_ack_message(message::list const& ack_message)
{
if(m_need_ack)
m_ack_message = std::move(ack_message);
}
inline
event::event(std::string const& nsp,std::string const& name,message::list&& messages,bool need_ack):
m_nsp(nsp),
m_name(name),
m_messages(std::move(messages)),
m_need_ack(need_ack)
{
}
inline
event::event(std::string const& nsp,std::string const& name,message::list const& messages,bool need_ack):
m_nsp(nsp),
m_name(name),
m_messages(messages),
m_need_ack(need_ack)
{
}
message::list const& event::get_ack_message() const
{
return m_ack_message;
}
inline
message::list& event::get_ack_message_impl()
{
return m_ack_message;
}
class socket::impl
{
public:
impl(client_impl *, std::string const&, message::ptr const&);
~impl();
void on(std::string const& event_name,event_listener_aux const& func);
void on(std::string const& event_name,event_listener const& func);
void on_any(event_listener_aux const& func);
void on_any(event_listener const& func);
void off(std::string const& event_name);
void off_all();
#define SYNTHESIS_SETTER(__TYPE__,__FIELD__) \
void set_##__FIELD__(__TYPE__ const& l) \
{ m_##__FIELD__ = l;}
SYNTHESIS_SETTER(error_listener, error_listener) //socket io errors
#undef SYNTHESIS_SETTER
void on_error(error_listener const& l);
void off_error();
void close();
void emit(std::string const& name, message::list const& msglist, std::function<void (message::list const&)> const& ack);
std::string const& get_namespace() const {return m_nsp;}
protected:
void on_connected();
void on_close();
void on_open();
void on_message_packet(packet const& packet);
void on_disconnect();
private:
// Message Parsing callbacks.
void on_socketio_event(const std::string& nsp, int msgId,const std::string& name, message::list&& message);
void on_socketio_ack(int msgId, message::list const& message);
void on_socketio_error(message::ptr const& err_message);
event_listener get_bind_listener_locked(string const& event);
void ack(int msgId,string const& name,message::list const& ack_message);
void timeout_connection(const asio::error_code &ec);
void send_connect();
void send_packet(packet& p);
static event_listener s_null_event_listener;
static unsigned int s_global_event_id;
sio::client_impl *m_client;
bool m_connected;
std::string m_nsp;
message::ptr m_auth;
std::map<unsigned int, std::function<void (message::list const&)> > m_acks;
std::map<std::string, event_listener> m_event_binding;
event_listener m_event_listener;
error_listener m_error_listener;
std::unique_ptr<asio::steady_timer> m_connection_timer;
std::queue<packet> m_packet_queue;
std::mutex m_event_mutex;
std::mutex m_packet_mutex;
friend class socket;
};
void socket::impl::on(std::string const& event_name,event_listener_aux const& func)
{
this->on(event_name,event_adapter::do_adapt(func));
}
void socket::impl::on(std::string const& event_name,event_listener const& func)
{
std::lock_guard<std::mutex> guard(m_event_mutex);
m_event_binding[event_name] = func;
}
void socket::impl::on_any(event_listener_aux const& func)
{
m_event_listener = event_adapter::do_adapt(func);
}
void socket::impl::on_any(event_listener const& func)
{
m_event_listener = func;
}
void socket::impl::off(std::string const& event_name)
{
std::lock_guard<std::mutex> guard(m_event_mutex);
auto it = m_event_binding.find(event_name);
if(it!=m_event_binding.end())
{
m_event_binding.erase(it);
}
}
void socket::impl::off_all()
{
std::lock_guard<std::mutex> guard(m_event_mutex);
m_event_binding.clear();
}
void socket::impl::on_error(error_listener const& l)
{
m_error_listener = l;
}
void socket::impl::off_error()
{
m_error_listener = nullptr;
}
socket::impl::impl(client_impl *client, std::string const& nsp, message::ptr const& auth):
m_client(client),
m_connected(false),
m_nsp(nsp),
m_auth(auth)
{
NULL_GUARD(client);
if(m_client->opened())
{
send_connect();
}
}
socket::impl::~impl()
{
}
unsigned int socket::impl::s_global_event_id = 1;
void socket::impl::emit(std::string const& name, message::list const& msglist, std::function<void (message::list const&)> const& ack)
{
NULL_GUARD(m_client);
message::ptr msg_ptr = msglist.to_array_message(name);
int pack_id;
if(ack)
{
pack_id = s_global_event_id++;
std::lock_guard<std::mutex> guard(m_event_mutex);
m_acks[pack_id] = ack;
}
else
{
pack_id = -1;
}
packet p(m_nsp, msg_ptr,pack_id);
send_packet(p);
}
void socket::impl::send_connect()
{
NULL_GUARD(m_client);
packet p(packet::type_connect, m_nsp, m_auth);
m_client->send(p);
m_connection_timer.reset(new asio::steady_timer(m_client->get_io_service()));
asio::error_code ec;
m_connection_timer->expires_from_now(std::chrono::milliseconds(20000), ec);
m_connection_timer->async_wait(std::bind(&socket::impl::timeout_connection,this, std::placeholders::_1));
}
void socket::impl::close()
{
NULL_GUARD(m_client);
if(m_connected)
{
packet p(packet::type_disconnect,m_nsp);
send_packet(p);
if(!m_connection_timer)
{
m_connection_timer.reset(new asio::steady_timer(m_client->get_io_service()));
}
asio::error_code ec;
m_connection_timer->expires_from_now(std::chrono::milliseconds(3000), ec);
m_connection_timer->async_wait(std::bind(&socket::impl::on_close, this));
}
}
void socket::impl::on_connected()
{
if(m_connection_timer)
{
m_connection_timer->cancel();
m_connection_timer.reset();
}
if(!m_connected)
{
m_connected = true;
m_client->on_socket_opened(m_nsp);
while (true) {
m_packet_mutex.lock();
if(m_packet_queue.empty())
{
m_packet_mutex.unlock();
return;
}
sio::packet front_pack = std::move(m_packet_queue.front());
m_packet_queue.pop();
m_packet_mutex.unlock();
m_client->send(front_pack);
}
}
}
void socket::impl::on_close()
{
NULL_GUARD(m_client);
sio::client_impl *client = m_client;
m_client = NULL;
if(m_connection_timer)
{
m_connection_timer->cancel();
m_connection_timer.reset();
}
m_connected = false;
{
std::lock_guard<std::mutex> guard(m_packet_mutex);
while (!m_packet_queue.empty()) {
m_packet_queue.pop();
}
}
client->on_socket_closed(m_nsp);
client->remove_socket(m_nsp);
}
void socket::impl::on_open()
{
send_connect();
}
void socket::impl::on_disconnect()
{
NULL_GUARD(m_client);
if(m_connected)
{
m_connected = false;
std::lock_guard<std::mutex> guard(m_packet_mutex);
while (!m_packet_queue.empty()) {
m_packet_queue.pop();
}
}
}
void socket::impl::on_message_packet(packet const& p)
{
NULL_GUARD(m_client);
if(p.get_nsp() == m_nsp)
{
switch (p.get_type())
{
// Connect open
case packet::type_connect:
{
LOG("Received Message type (Connect)"<<std::endl);
this->on_connected();
break;
}
case packet::type_disconnect:
{
LOG("Received Message type (Disconnect)"<<std::endl);
this->on_close();
break;
}
case packet::type_event:
case packet::type_binary_event:
{
LOG("Received Message type (Event)"<<std::endl);
const message::ptr ptr = p.get_message();
if(ptr->get_flag() == message::flag_array)
{
const array_message* array_ptr = static_cast<const array_message*>(ptr.get());
if(array_ptr->get_vector().size() >= 1&&array_ptr->get_vector()[0]->get_flag() == message::flag_string)
{
const string_message* name_ptr = static_cast<const string_message*>(array_ptr->get_vector()[0].get());
message::list mlist;
for(size_t i = 1;i<array_ptr->get_vector().size();++i)
{
mlist.push(array_ptr->get_vector()[i]);
}
this->on_socketio_event(p.get_nsp(), p.get_pack_id(),name_ptr->get_string(), std::move(mlist));
}
}
break;
}
// Ack
case packet::type_ack:
case packet::type_binary_ack:
{
LOG("Received Message type (ACK)"<<std::endl);
const message::ptr ptr = p.get_message();
if(ptr->get_flag() == message::flag_array)
{
message::list msglist(ptr->get_vector());
this->on_socketio_ack(p.get_pack_id(),msglist);
}
else
{
this->on_socketio_ack(p.get_pack_id(),message::list(ptr));
}
break;
}
// Error
case packet::type_error:
{
LOG("Received Message type (ERROR)"<<std::endl);
this->on_socketio_error(p.get_message());
break;
}
default:
break;
}
}
}
void socket::impl::on_socketio_event(const std::string& nsp,int msgId,const std::string& name, message::list && message)
{
bool needAck = msgId >= 0;
event ev = event_adapter::create_event(nsp,name, std::move(message),needAck);
event_listener func = this->get_bind_listener_locked(name);
if(func)func(ev);
if (m_event_listener) m_event_listener(ev);
if(needAck)
{
this->ack(msgId, name, ev.get_ack_message());
}
}
void socket::impl::ack(int msgId, const string &, const message::list &ack_message)
{
packet p(m_nsp, ack_message.to_array_message(),msgId,true);
send_packet(p);
}
void socket::impl::on_socketio_ack(int msgId, message::list const& message)
{
std::function<void (message::list const&)> l;
{
std::lock_guard<std::mutex> guard(m_event_mutex);
auto it = m_acks.find(msgId);
if(it!=m_acks.end())
{
l = it->second;
m_acks.erase(it);
}
}
if(l)l(message);
}
void socket::impl::on_socketio_error(message::ptr const& err_message)
{
if(m_error_listener)m_error_listener(err_message);
}
void socket::impl::timeout_connection(const asio::error_code &ec)
{
NULL_GUARD(m_client);
if(ec)
{
return;
}
m_connection_timer.reset();
LOG("Connection timeout,close socket."<<std::endl);
//Should close socket if no connected message arrive.Otherwise we'll never ask for open again.
this->on_close();
}
void socket::impl::send_packet(sio::packet &p)
{
NULL_GUARD(m_client);
if(m_connected)
{
while (true) {
m_packet_mutex.lock();
if(m_packet_queue.empty())
{
m_packet_mutex.unlock();
break;
}
sio::packet front_pack = std::move(m_packet_queue.front());
m_packet_queue.pop();
m_packet_mutex.unlock();
m_client->send(front_pack);
}
m_client->send(p);
}
else
{
std::lock_guard<std::mutex> guard(m_packet_mutex);
m_packet_queue.push(p);
}
}
socket::event_listener socket::impl::get_bind_listener_locked(const string &event)
{
std::lock_guard<std::mutex> guard(m_event_mutex);
auto it = m_event_binding.find(event);
if(it!=m_event_binding.end())
{
return it->second;
}
return socket::event_listener();
}
socket::socket(client_impl* client,std::string const& nsp,message::ptr const& auth):
m_impl(new impl(client,nsp,auth))
{
}
socket::~socket()
{
delete m_impl;
}
void socket::on(std::string const& event_name,event_listener const& func)
{
m_impl->on(event_name, func);
}
void socket::on(std::string const& event_name,event_listener_aux const& func)
{
m_impl->on(event_name, func);
}
void socket::on_any(event_listener_aux const& func)
{
m_impl->on_any(func);
}
void socket::on_any(event_listener const& func)
{
m_impl->on_any(func);
}
void socket::off(std::string const& event_name)
{
m_impl->off(event_name);
}
void socket::off_all()
{
m_impl->off_all();
}
void socket::close()
{
m_impl->close();
}
void socket::on_error(error_listener const& l)
{
m_impl->on_error(l);
}
void socket::off_error()
{
m_impl->off_error();
}
void socket::emit(std::string const& name, message::list const& msglist, std::function<void (message::list const&)> const& ack)
{
m_impl->emit(name, msglist,ack);
}
std::string const& socket::get_namespace() const
{
return m_impl->get_namespace();
}
void socket::on_connected()
{
m_impl->on_connected();
}
void socket::on_close()
{
m_impl->on_close();
}
void socket::on_open()
{
m_impl->on_open();
}
void socket::on_message_packet(packet const& p)
{
m_impl->on_message_packet(p);
}
void socket::on_disconnect()
{
m_impl->on_disconnect();
}
}

View File

@@ -0,0 +1,105 @@
#ifndef SIO_SOCKET_H
#define SIO_SOCKET_H
#include "sio_message.h"
#include <functional>
namespace sio
{
class event_adapter;
class event
{
public:
const std::string& get_nsp() const;
const std::string& get_name() const;
const message::ptr& get_message() const;
const message::list& get_messages() const;
bool need_ack() const;
void put_ack_message(message::list const& ack_message);
message::list const& get_ack_message() const;
protected:
event(std::string const& nsp,std::string const& name,message::list const& messages,bool need_ack);
event(std::string const& nsp,std::string const& name,message::list&& messages,bool need_ack);
message::list& get_ack_message_impl();
private:
const std::string m_nsp;
const std::string m_name;
const message::list m_messages;
const bool m_need_ack;
message::list m_ack_message;
friend class event_adapter;
};
class client_impl;
class packet;
//The name 'socket' is taken from concept of official socket.io.
class socket
{
public:
typedef std::function<void(const std::string& name,message::ptr const& message,bool need_ack, message::list& ack_message)> event_listener_aux;
typedef std::function<void(event& event)> event_listener;
typedef std::function<void(message::ptr const& message)> error_listener;
typedef std::shared_ptr<socket> ptr;
~socket();
void on(std::string const& event_name,event_listener const& func);
void on(std::string const& event_name,event_listener_aux const& func);
void off(std::string const& event_name);
void on_any(event_listener const& func);
void on_any(event_listener_aux const& func);
void off_all();
void close();
void on_error(error_listener const& l);
void off_error();
void emit(std::string const& name, message::list const& msglist = nullptr, std::function<void (message::list const&)> const& ack = nullptr);
std::string const& get_namespace() const;
protected:
socket(client_impl*,std::string const&,message::ptr const&);
void on_connected();
void on_close();
void on_open();
void on_disconnect();
void on_message_packet(packet const& p);
friend class client_impl;
private:
//disable copy constructor and assign operator.
socket(socket const&){}
void operator=(socket const&){}
class impl;
impl *m_impl;
};
}
#endif // SIO_SOCKET_H