/*******************************************************************************
* libretroshare/src/gxs: rsgxsdataaccess.cc *
* *
* libretroshare: retroshare core library *
* *
* Copyright 2012-2013 by Christopher Evi-Parker, Robert Fernie *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see . *
* *
*******************************************************************************/
#include "util/rstime.h"
#include "rsgxsutil.h"
#include "rsgxsdataaccess.h"
#include "retroshare/rsgxsflags.h"
/***********
* #define DATA_DEBUG 1
**********/
// Debug system to allow to print only for some services (group, Peer, etc)
#if defined(DATA_DEBUG)
static const uint32_t service_to_print = RS_SERVICE_GXS_TYPE_FORUMS;// use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {};
static std::string nice_time_stamp(rstime_t now,rstime_t TS)
{
if(TS == 0)
return "Never" ;
else
{
std::ostringstream s;
s << now - TS << " secs ago" ;
return s.str() ;
}
}
static std::ostream& gxsdatadebug(uint32_t service_type)
{
static nullstream null ;
if (service_to_print==0 || service_type == 0 || (service_type == service_to_print))
return std::cerr << time(NULL) << ":GXSDATASERVICE: " ;
else
return null ;
}
#define GXSDATADEBUG gxsdatadebug(mDataStore->serviceType())
static const std::vector tokenStatusString( {
std::string("FAILED"),
std::string("PENDING"),
std::string("PARTIAL"),
std::string("COMPLETE"),
std::string("DONE"),
std::string("CANCELLED"),
});
#endif
bool operator<(const std::pair& p1,const std::pair& p2)
{
return p1.second->Options.mPriority <= p2.second->Options.mPriority ; // <= so that new elements with same priority are inserted before
}
RsGxsDataAccess::RsGxsDataAccess(RsGeneralDataService* ds) :
mDataStore(ds), mDataMutex("RsGxsDataAccess"), mNextToken(0) {}
RsGxsDataAccess::~RsGxsDataAccess()
{
for(auto& it:mRequestQueue)
delete it.second;
}
bool RsGxsDataAccess::requestGroupInfo( uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list &groupIds )
{
if(groupIds.empty())
{
std::cerr << __PRETTY_FUNCTION__ << " (WW) Group Id list is empty!" << std::endl;
return false;
}
GxsRequest* req = nullptr;
uint32_t reqType = opts.mReqType;
if(reqType & GXS_REQUEST_TYPE_GROUP_META)
{
GroupMetaReq* gmr = new GroupMetaReq();
gmr->mGroupIds = groupIds;
req = gmr;
}
else if(reqType & GXS_REQUEST_TYPE_GROUP_DATA)
{
GroupDataReq* gdr = new GroupDataReq();
gdr->mGroupIds = groupIds;
req = gdr;
}
else if(reqType & GXS_REQUEST_TYPE_GROUP_IDS)
{
GroupIdReq* gir = new GroupIdReq();
gir->mGroupIds = groupIds;
req = gir;
}
else if(reqType & GXS_REQUEST_TYPE_GROUP_SERIALIZED_DATA)
{
GroupSerializedDataReq* gir = new GroupSerializedDataReq();
gir->mGroupIds = groupIds;
req = gir;
}
if(!req)
{
RsErr() << __PRETTY_FUNCTION__ << " request type not recognised, " << "reqType: " << reqType << std::endl;
return false;
}
generateToken(token);
#ifdef DATA_DEBUG
GXSDATADEBUG << "RsGxsDataAccess::requestGroupInfo() gets token: " << token << std::endl;
#endif
setReq(req, token, ansType, opts);
storeRequest(req);
return true;
}
bool RsGxsDataAccess::requestGroupInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts)
{
GxsRequest* req = nullptr;
uint32_t reqType = opts.mReqType;
if(reqType & GXS_REQUEST_TYPE_GROUP_META)
req = new GroupMetaReq();
else if(reqType & GXS_REQUEST_TYPE_GROUP_DATA)
req = new GroupDataReq();
else if(reqType & GXS_REQUEST_TYPE_GROUP_IDS)
req = new GroupIdReq();
else if(reqType & GXS_REQUEST_TYPE_GROUP_SERIALIZED_DATA)
req = new GroupSerializedDataReq();
else
{
RsErr() << "RsGxsDataAccess::requestGroupInfo() request type not recognised, type " << reqType << std::endl;
return false;
}
generateToken(token);
#ifdef DATA_DEBUG
GXSDATADEBUG << "RsGxsDataAccess::requestGroupInfo() gets Token: " << token << std::endl;
#endif
setReq(req, token, ansType, opts);
storeRequest(req);
return true;
}
void RsGxsDataAccess::generateToken(uint32_t &token)
{
RS_STACK_MUTEX(mDataMutex);
token = mNextToken++;
}
bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const GxsMsgReq &msgIds)
{
GxsRequest* req = nullptr;
uint32_t reqType = opts.mReqType;
// remove all empty grpId entries
GxsMsgReq::const_iterator mit = msgIds.begin();
std::vector toRemove;
for(; mit != msgIds.end(); ++mit)
{
if(mit->second.empty())
toRemove.push_back(mit->first);
}
std::vector::const_iterator vit = toRemove.begin();
GxsMsgReq filteredMsgIds = msgIds;
for(; vit != toRemove.end(); ++vit)
filteredMsgIds.erase(*vit);
if(reqType & GXS_REQUEST_TYPE_MSG_META)
{
MsgMetaReq* mmr = new MsgMetaReq();
mmr->mMsgIds = filteredMsgIds;
req = mmr;
}else if(reqType & GXS_REQUEST_TYPE_MSG_DATA)
{
MsgDataReq* mdr = new MsgDataReq();
mdr->mMsgIds = filteredMsgIds;
req = mdr;
}else if(reqType & GXS_REQUEST_TYPE_MSG_IDS)
{
MsgIdReq* mir = new MsgIdReq();
mir->mMsgIds = filteredMsgIds;
req = mir;
}
if(req == nullptr)
{
RsErr() << "RsGxsDataAccess::requestMsgInfo() request type not recognised, type " << reqType << std::endl;
return false;
}else
{
generateToken(token);
#ifdef DATA_DEBUG
GXSDATADEBUG << "RsGxsDataAccess::requestMsgInfo() gets Token: " << token << std::endl;
#endif
}
setReq(req, token, ansType, opts);
storeRequest(req);
return true;
}
bool RsGxsDataAccess::requestMsgInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::list& grpIds)
{
GxsRequest* req = nullptr;
uint32_t reqType = opts.mReqType;
std::list::const_iterator lit = grpIds.begin();
if(reqType & GXS_REQUEST_TYPE_MSG_META)
{
MsgMetaReq* mmr = new MsgMetaReq();
for(; lit != grpIds.end(); ++lit)
mmr->mMsgIds[*lit] = std::set();
req = mmr;
}
else if(reqType & GXS_REQUEST_TYPE_MSG_DATA)
{
MsgDataReq* mdr = new MsgDataReq();
for(; lit != grpIds.end(); ++lit)
mdr->mMsgIds[*lit] = std::set();
req = mdr;
}
else if(reqType & GXS_REQUEST_TYPE_MSG_IDS)
{
MsgIdReq* mir = new MsgIdReq();
for(; lit != grpIds.end(); ++lit)
mir->mMsgIds[*lit] = std::set();
req = mir;
}
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " request type not recognised, type " << reqType << std::endl;
return false;
}else
{
generateToken(token);
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__ << " gets Token: " << token << std::endl;
#endif
}
setReq(req, token, ansType, opts);
storeRequest(req);
return true;
}
void RsGxsDataAccess::requestServiceStatistic(uint32_t& token,const RsTokReqOptions& opts)
{
ServiceStatisticRequest* req = new ServiceStatisticRequest();
generateToken(token);
if(opts.mReqType != GXS_REQUEST_TYPE_SERVICE_STATS)
{
RsErr() << "Expected opts.mReqType to be GXS_REQUEST_TYPE_SERVICE_STATS requestServiceStatistic()" << std::endl;
return;
}
setReq(req, token, 0, opts);
storeRequest(req);
}
void RsGxsDataAccess::requestGroupStatistic(uint32_t& token, const RsGxsGroupId& grpId,const RsTokReqOptions& opts)
{
GroupStatisticRequest* req = new GroupStatisticRequest();
req->mGrpId = grpId;
if(opts.mReqType != GXS_REQUEST_TYPE_GROUP_STATS)
{
RsErr() << "Expected opts.mReqType to be GXS_REQUEST_TYPE_SERVICE_STATS requestServiceStatistic()" << std::endl;
return;
}
generateToken(token);
setReq(req, token,0, opts);
storeRequest(req);
}
bool RsGxsDataAccess::requestMsgRelatedInfo(uint32_t &token, uint32_t ansType, const RsTokReqOptions &opts, const std::vector &msgIds)
{
MsgRelatedInfoReq* req = new MsgRelatedInfoReq();
req->mMsgIds = msgIds;
generateToken(token);
setReq(req, token, ansType, opts);
storeRequest(req);
return true;
}
void RsGxsDataAccess::setReq(GxsRequest* req, uint32_t token, uint32_t ansType, const RsTokReqOptions& opts) const
{
req->token = token;
req->clientAnswerType = ansType;
req->Options = opts;
return;
}
void RsGxsDataAccess::storeRequest(GxsRequest* req)
{
RS_STACK_MUTEX(mDataMutex);
req->status = PENDING;
req->reqTime = time(nullptr);
mRequestQueue.insert(std::make_pair(req->token,req));
mPublicToken[req->token] = PENDING;
#ifdef DATA_DEBUG
GXSDATADEBUG << "Stored request token=" << req->token << " priority = " << static_cast(req->Options.mPriority) << " Current request Queue is:" ;
for(auto it(mRequestQueue.begin());it!=mRequestQueue.end();++it)
GXSDATADEBUG << it->first << " (p=" << static_cast(req->Options.mPriority) << ") ";
GXSDATADEBUG << std::endl;
GXSDATADEBUG << "PublicToken size: " << mPublicToken.size() << " Completed requests waiting for client: " << mCompletedRequests.size() << std::endl;
#endif
}
RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token)
{
RsTokenService::GxsRequestStatus status;
uint32_t reqtype;
uint32_t anstype;
rstime_t ts;
if (!checkRequestStatus(token, status, reqtype, anstype, ts))
return RsTokenService::FAILED;
return status;
}
bool RsGxsDataAccess::cancelRequest(const uint32_t& token)
{
RsStackMutex stack(mDataMutex); /****** LOCKED *****/
GxsRequest* req = locked_retrieveCompletedRequest(token);
if (!req)
{
return false;
}
req->status = CANCELLED;
return true;
}
bool RsGxsDataAccess::clearRequest(const uint32_t& token)
{
RS_STACK_MUTEX(mDataMutex);
return locked_clearRequest(token);
}
bool RsGxsDataAccess::locked_clearRequest(const uint32_t& token)
{
auto it = mCompletedRequests.find(token);
if(it == mCompletedRequests.end())
return false;
delete it->second;
mCompletedRequests.erase(it);
auto it2 = mPublicToken.find(token);
if(it2 != mPublicToken.end())
mPublicToken.erase(it2);
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Removing public token " << token << ". Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl;
#endif
return true;
}
bool RsGxsDataAccess::getGroupSummary(const uint32_t& token, std::list >& groupInfo)
{
RS_STACK_MUTEX(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group summary" << std::endl;
return false;
}
GroupMetaReq* gmreq = dynamic_cast(req);
if(gmreq)
{
groupInfo = gmreq->mGroupMetaData;
gmreq->mGroupMetaData.clear();
}
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed cast" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getGroupData(const uint32_t& token, std::list& grpData)
{
RS_STACK_MUTEX(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << "RsGxsDataAccess::getGroupData() Unable to retrieve group data" << std::endl;
return false;
}
GroupDataReq* gmreq = dynamic_cast(req);
GroupSerializedDataReq* gsreq = dynamic_cast(req);
if(gsreq)
{
grpData.swap(gsreq->mGroupData);
gsreq->mGroupData.clear();
}
else if(gmreq)
{
grpData.swap(gmreq->mGroupData);
gmreq->mGroupData.clear();
}
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed cast req->reqType: " << req->reqType << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getMsgData(const uint32_t& token, NxsMsgDataResult& msgData)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group data" << std::endl;
return false;
}
MsgDataReq* mdreq = dynamic_cast(req);
if(mdreq)
{
msgData.swap(mdreq->mMsgData);
mdreq->mMsgData.clear();
}
else
{
RsErr() << "RsGxsDataAccess::getMsgData() Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getMsgRelatedData(const uint32_t& token, NxsMsgRelatedDataResult& msgData)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group data" << std::endl;
return false;
}
MsgRelatedInfoReq* mrireq = dynamic_cast(req);
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_DATA)
return false;
if(mrireq)
{
msgData.swap(mrireq->mMsgDataResult);
mrireq->mMsgDataResult.clear();
}
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getMsgSummary(const uint32_t& token, GxsMsgMetaResult& msgInfo)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group data" << std::endl;
return false;
}
MsgMetaReq* mmreq = dynamic_cast(req);
if(mmreq)
{
msgInfo.swap(mmreq->mMsgMetaData);
mmreq->mMsgMetaData.clear();
}
else
{
RsErr() << " Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getMsgRelatedSummary(const uint32_t &token, MsgRelatedMetaResult &msgMeta)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve message summary" << std::endl;
return false;
}
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_META)
return false;
MsgRelatedInfoReq* mrireq = dynamic_cast(req);
if(mrireq)
{
msgMeta.swap(mrireq->mMsgMetaResult);
mrireq->mMsgMetaResult.clear();
}
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getMsgRelatedList(const uint32_t &token, MsgRelatedIdResult &msgIds)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve message data" << std::endl;
return false;
}
if(req->Options.mReqType != GXS_REQUEST_TYPE_MSG_RELATED_IDS)
return false;
MsgRelatedInfoReq* mrireq = dynamic_cast(req);
if(mrireq)
{
msgIds.swap(mrireq->mMsgIdResult);
mrireq->mMsgIdResult.clear();
}
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getMsgIdList(const uint32_t& token, GxsMsgIdResult& msgIds)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve msg Ids" << std::endl;
return false;
}
MsgIdReq* mireq = dynamic_cast(req);
if(mireq)
{
msgIds.swap(mireq->mMsgIdResult);
mireq->mMsgIdResult.clear();
}
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed cast" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getGroupList(const uint32_t& token, std::list& groupIds)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve group Ids, Request does not exist" << std::endl;
return false;
}
GroupIdReq* gireq = dynamic_cast(req);
if(gireq)
{
groupIds.swap(gireq->mGroupIdResult);
gireq->mGroupIdResult.clear();
}
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getGroupStatistic(const uint32_t &token, GxsGroupStatistic &grpStatistic)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << "RsGxsDataAccess::getGroupStatistic() Unable to retrieve grp stats" << std::endl;
return false;
}
GroupStatisticRequest* gsreq = dynamic_cast(req);
if(gsreq)
grpStatistic = gsreq->mGroupStatistic;
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
bool RsGxsDataAccess::getServiceStatistic(const uint32_t &token, GxsServiceStatistic &servStatistic)
{
RsStackMutex stack(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req == nullptr)
{
RsErr() << __PRETTY_FUNCTION__ << " Unable to retrieve service stats" << std::endl;
return false;
}
ServiceStatisticRequest* ssreq = dynamic_cast(req);
if(ssreq)
servStatistic = ssreq->mServiceStatistic;
else
{
RsErr() << __PRETTY_FUNCTION__ << " Req found, failed caste" << std::endl;
return false;
}
locked_clearRequest(token);
return true;
}
GxsRequest* RsGxsDataAccess::locked_retrieveCompletedRequest(const uint32_t& token)
{
auto it = mCompletedRequests.find(token) ;
if(it == mCompletedRequests.end())
return nullptr;
return it->second;
}
#define MAX_REQUEST_AGE 120 // 2 minutes
void RsGxsDataAccess::processRequests()
{
// process requests
while (!mRequestQueue.empty())
{
#ifdef DATA_DEBUG
dumpTokenQueues();
#endif
// Extract the first elements from the request queue. cleanup all other elements marked at terminated.
GxsRequest* req = nullptr;
{
RsStackMutex stack(mDataMutex); /******* LOCKED *******/
rstime_t now = time(nullptr); // this is ok while in the loop below
while(!mRequestQueue.empty() && req == nullptr)
{
if(now > mRequestQueue.begin()->second->reqTime + MAX_REQUEST_AGE)
{
mPublicToken[mRequestQueue.begin()->second->token] = CANCELLED;
delete mRequestQueue.begin()->second;
mRequestQueue.erase(mRequestQueue.begin());
continue;
}
switch( mRequestQueue.begin()->second->status )
{
case PARTIAL:
RsErr() << "Found partial request in mRequestQueue. This is a bug." << std::endl; // fallthrough
case COMPLETE:
case DONE:
case FAILED:
case CANCELLED:
#ifdef DATA_DEBUG
GXSDATADEBUG << " Service " << std::hex << mDataStore->serviceType() << std::dec << ": request " << mRequestQueue.begin()->second->token << ": status = " << mRequestQueue.begin()->second->status << ": removing from the RequestQueue" << std::endl;
#endif
delete mRequestQueue.begin()->second;
mRequestQueue.erase(mRequestQueue.begin());
continue;
break;
case PENDING:
req = mRequestQueue.begin()->second;
req->status = PARTIAL;
mRequestQueue.erase(mRequestQueue.begin()); // remove it right away from the waiting queue.
break;
}
}
} // END OF MUTEX.
if (!req)
break;
GroupMetaReq* gmr;
GroupDataReq* gdr;
GroupIdReq* gir;
MsgMetaReq* mmr;
MsgDataReq* mdr;
MsgIdReq* mir;
MsgRelatedInfoReq* mri;
GroupStatisticRequest* gsr;
GroupSerializedDataReq* grr;
ServiceStatisticRequest* ssr;
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Processing request: " << req->token << " Status: " << req->status << " ReqType: " << req->reqType << " Age: " << time(nullptr) - req->reqTime << std::endl;
#endif
/* PROCESS REQUEST! */
bool ok = false;
if((gmr = dynamic_cast(req)) != nullptr)
{
ok = getGroupSummary(gmr);
}
else if((gdr = dynamic_cast(req)) != nullptr)
{
ok = getGroupData(gdr);
}
else if((gir = dynamic_cast(req)) != nullptr)
{
ok = getGroupList(gir);
}
else if((mmr = dynamic_cast(req)) != nullptr)
{
ok = getMsgSummary(mmr);
}
else if((mdr = dynamic_cast(req)) != nullptr)
{
ok = getMsgData(mdr);
}
else if((mir = dynamic_cast(req)) != nullptr)
{
ok = getMsgIdList(mir);
}
else if((mri = dynamic_cast(req)) != nullptr)
{
ok = getMsgRelatedInfo(mri);
}
else if((gsr = dynamic_cast(req)) != nullptr)
{
ok = getGroupStatistic(gsr);
}
else if((ssr = dynamic_cast(req)) != nullptr)
{
ok = getServiceStatistic(ssr);
}
else if((grr = dynamic_cast(req)) != nullptr)
{
ok = getGroupSerializedData(grr);
}
else
RsErr() << __PRETTY_FUNCTION__ << " Failed to process request, token: " << req->token << std::endl;
// We cannot easily remove the request here because the queue may have more elements now and mRequestQueue.begin() is not necessarily the same element.
// but we mark it as COMPLETE/FAILED so that it will be removed in the next loop.
{
RsStackMutex stack(mDataMutex); /******* LOCKED *******/
if(ok)
{
// When the request is complete, we move it to the complete list, so that the caller can easily retrieve the request data
#ifdef DATA_DEBUG
GXSDATADEBUG << " Service " << std::hex << mDataStore->serviceType() << std::dec << ": Request completed successfully. Marking as COMPLETE." << std::endl;
#endif
req->status = COMPLETE ;
mCompletedRequests[req->token] = req;
mPublicToken[req->token] = COMPLETE;
}
else
{
mPublicToken[req->token] = FAILED;
delete req;//req belongs to no one now
#ifdef DATA_DEBUG
GXSDATADEBUG << " Service " << std::hex << mDataStore->serviceType() << std::dec << ": Request failed. Marking as FAILED." << std::endl;
#endif
}
} // END OF MUTEX.
}
}
bool RsGxsDataAccess::getGroupSerializedData(GroupSerializedDataReq* req)
{
std::map grpData;
std::list grpIdsOut;
getGroupList(req->mGroupIds, req->Options, grpIdsOut);
if(grpIdsOut.empty())
return true;
for(std::list::iterator lit = grpIdsOut.begin();lit != grpIdsOut.end();++lit)
grpData[*lit] = nullptr;
bool ok = mDataStore->retrieveNxsGrps(grpData, true);
req->mGroupData.clear();
std::map::iterator mit = grpData.begin();
for(; mit != grpData.end(); ++mit)
req->mGroupData.push_back(mit->second) ;
return ok;
}
bool RsGxsDataAccess::getGroupData(GroupDataReq* req)
{
std::map grpData;
std::list grpIdsOut;
getGroupList(req->mGroupIds, req->Options, grpIdsOut);
if(grpIdsOut.empty())
return true;
std::list::iterator lit = grpIdsOut.begin(),
lit_end = grpIdsOut.end();
for(; lit != lit_end; ++lit)
{
grpData[*lit] = nullptr;
}
bool ok = mDataStore->retrieveNxsGrps(grpData, true);
std::map::iterator mit = grpData.begin();
for(; mit != grpData.end(); ++mit)
req->mGroupData.push_back(mit->second);
return ok;
}
bool RsGxsDataAccess::getGroupSummary(GroupMetaReq* req)
{
RsGxsGrpMetaTemporaryMap grpMeta;
std::list grpIdsOut;
getGroupList(req->mGroupIds, req->Options, grpIdsOut);
if(grpIdsOut.empty())
return true;
for(auto lit = grpIdsOut.begin();lit != grpIdsOut.end(); ++lit)
grpMeta[*lit] = nullptr;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
for(auto mit = grpMeta.begin(); mit != grpMeta.end(); ++mit)
req->mGroupMetaData.push_back(mit->second);
return true;
}
bool RsGxsDataAccess::getGroupList(GroupIdReq* req)
{
return getGroupList(req->mGroupIds, req->Options, req->mGroupIdResult);
}
bool RsGxsDataAccess::getGroupList(const std::list& grpIdsIn, const RsTokReqOptions& opts, std::list& grpIdsOut)
{
RsGxsGrpMetaTemporaryMap grpMeta;
for(auto lit = grpIdsIn.begin(); lit != grpIdsIn.end(); ++lit)
grpMeta[*lit] = nullptr;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
for(auto mit = grpMeta.begin() ; mit != grpMeta.end(); ++mit)
grpIdsOut.push_back(mit->first);
filterGrpList(grpIdsOut, opts, grpMeta);
return true;
}
bool RsGxsDataAccess::getMsgData(MsgDataReq* req)
{
GxsMsgReq msgIdOut;
const RsTokReqOptions& opts(req->Options);
// filter based on options
getMsgIdList(req->mMsgIds, opts, msgIdOut);
// If the list is empty because of filtering do not retrieve from DB
if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty())
return true;
mDataStore->retrieveNxsMsgs(msgIdOut, req->mMsgData, true);
return true;
}
bool RsGxsDataAccess::getMsgSummary(MsgMetaReq* req)
{
GxsMsgReq msgIdOut;
const RsTokReqOptions& opts(req->Options);
// filter based on options
getMsgMetaDataList(req->mMsgIds, opts, req->mMsgMetaData);
// // If the list is empty because of filtering do not retrieve from DB
// if((opts.mMsgFlagMask || opts.mStatusMask) && msgIdOut.empty())
// return true;
//
// mDataStore->retrieveGxsMsgMetaData(msgIdOut, req->mMsgMetaData);
return true;
}
bool RsGxsDataAccess::getMsgMetaDataList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgMetaResult& result )
{
// First get all message metas, then filter out the ones we want to keep.
result.clear();
mDataStore->retrieveGxsMsgMetaData(msgIds, result);
/* CASEs this handles.
* Input is groupList + Flags.
* 1) No Flags => All Messages in those Groups.
*
*/
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList()" << std::endl;
#endif
bool onlyOrigMsgs = false;
bool onlyLatestMsgs = false;
bool onlyThreadHeadMsgs = false;
// Can only choose one of these two.
if (opts.mOptions & RS_TOKREQOPT_MSG_ORIGMSG)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList() MSG_ORIGMSG" << std::endl;
#endif
onlyOrigMsgs = true;
}
else if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList() MSG_LATEST" << std::endl;
#endif
onlyLatestMsgs = true;
}
if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgList() MSG_THREAD" << std::endl;
#endif
onlyThreadHeadMsgs = true;
}
GxsMsgMetaResult::iterator meta_it;
for(meta_it = result.begin(); meta_it != result.end(); ++meta_it)
{
//const RsGxsGroupId& grpId = meta_it->first;
//auto& filter( metaFilter[grpId] ); // does the initialization of metaFilter[grpId] and avoids further O(log(n)) calls
auto& metaV = meta_it->second;
if (onlyLatestMsgs) // if we only consider latest messages, we need to first filter out messages with "children"
{
// The strategy is the following: for each msg we only know its direct ancestor. So we build a map to be able to find for a given message
// which messages derive from it.
// Then when this map is fuly build, we follow this map and every message that has no direct follow up will be kept.
// Because msgs are stored in a std::vector we build a map to convert each vector to its position in metaV.
std::vector keep(metaV.size(),true); // this vector will tell wether we keep or not a given Meta
std::map index_in_metaV; // holds the index of each group Id in metaV
for(uint32_t i=0;imMsgId] = i;
// Now loop once over message Metas and see if they have a parent. If yes, then mark the parent to be discarded.
for(uint32_t i=0;imOrigMsgId.isNull() && metaV[i]->mOrigMsgId != metaV[i]->mMsgId) // this one is a follow up
{
auto it = index_in_metaV.find(metaV[i]->mOrigMsgId);
if(it != index_in_metaV.end())
keep[it->second] = false;
else
std::cerr << "Found a msg that has a parent that is not locally known. Not an error anyway." << std::endl;
}
// Finally we just discard the messages for which the keep flag has been set to false.
for(uint32_t i=0;i TS.
std::map > origMsgTs;
for(uint32_t i=0;imParentId.isNull()))
{
delete msgMeta;
metaV[i] = nullptr;
continue;
}
auto oit = origMsgTs.find(msgMeta->mOrigMsgId);
bool addMsg = false;
if (oit != origMsgTs.end())
{
if(oit->second.second > msgMeta->mPublishTs)
{
#ifdef DATA_DEBUG
std::cerr << "RsGxsDataAccess::getMsgList() Found New OrigMsgId: ";
std::cerr << msgMeta->mOrigMsgId;
std::cerr << " MsgId: " << msgMeta->mMsgId;
std::cerr << " TS: " << msgMeta->mPublishTs;
std::cerr << std::endl;
#endif
origMsgTs[msgMeta->mOrigMsgId] = std::make_pair(msgMeta->mMsgId, msgMeta->mPublishTs); // add as latest. (overwriting if necessary)
}
}
else
{
delete msgMeta;
metaV[i] = nullptr;
continue;
}
}
// Add the discovered Latest Msgs.
for(auto oit = origMsgTs.begin(); oit != origMsgTs.end(); ++oit)
{
msgIdsOut[grpId].insert(oit->second.first);
}
#endif
for(uint32_t i=0;imParentId.isNull())
{
//delete msgMeta;
metaV[i] = nullptr;
continue;
}
if (onlyOrigMsgs && !msgMeta->mOrigMsgId.isNull() && msgMeta->mMsgId != msgMeta->mOrigMsgId)
{
//delete msgMeta;
metaV[i] = nullptr;
continue;
}
}
}
// collapse results while keeping the order, eliminating empty slots
for(auto it(result.begin());it!=result.end();++it)
{
uint32_t j=0; // j is the end of the cleaned-up tab, at the first available place
for(uint32_t i=0;isecond.size();++i) // i is the index in the tab possibly containing nullptr's
if(it->second[i] != nullptr)
{
it->second[j] = it->second[i]; // move the pointer to the first available place
++j;
}
it->second.resize(j); // normally all pointers have been moved forward so there is nothing to delete here.
}
// filterMsgIdList(msgIdsOut, opts, metaFilter); // this call is absurd: we already have in metaFilter the content we want.
//metaFilter.clear();
// delete meta data
//cleanseMsgMetaMap(result);
return true;
}
bool RsGxsDataAccess::getMsgIdList( const GxsMsgReq& msgIds, const RsTokReqOptions& opts, GxsMsgReq& msgIdsOut )
{
GxsMsgMetaResult result;
getMsgMetaDataList( msgIds, opts, result );
// extract MessageIds
msgIdsOut.clear();
for(auto it(result.begin());it!=result.end();++it)
{
auto& id_set(msgIdsOut[it->first]);
for(uint32_t i=0;isecond.size();++i)
id_set.insert(it->second[i]->mMsgId);
}
// delete meta data
//cleanseMsgMetaMap(result);
return true;
}
bool RsGxsDataAccess::getMsgRelatedInfo(MsgRelatedInfoReq *req)
{
/* CASEs this handles.
* Input is msgList + Flags.
* 1) No Flags => return nothing
*/
#ifdef DATA_DEBUG
GXSDATADEBUG << "RsGxsDataAccess::getMsgRelatedList()" << std::endl;
#endif
const RsTokReqOptions& opts = req->Options;
bool onlyLatestMsgs = false;
bool onlyAllVersions = false;
bool onlyChildMsgs = false;
bool onlyThreadMsgs = false;
if (opts.mOptions & RS_TOKREQOPT_MSG_LATEST)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_LATEST" << std::endl;
#endif
onlyLatestMsgs = true;
}
else if (opts.mOptions & RS_TOKREQOPT_MSG_VERSIONS)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_VERSIONS" << std::endl;
#endif
onlyAllVersions = true;
}
if (opts.mOptions & RS_TOKREQOPT_MSG_PARENT)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_PARENTS" << std::endl;
#endif
onlyChildMsgs = true;
}
if (opts.mOptions & RS_TOKREQOPT_MSG_THREAD)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() MSG_THREAD" << std::endl;
#endif
onlyThreadMsgs = true;
}
if(onlyAllVersions && onlyChildMsgs)
{
RS_ERR("Incompatible FLAGS (VERSIONS & PARENT)");
return false;
}
if(onlyAllVersions && onlyThreadMsgs)
{
RS_ERR("Incompatible FLAGS (VERSIONS & THREAD)");
return false;
}
if((!onlyLatestMsgs) && onlyChildMsgs)
{
RS_ERR("Incompatible FLAGS (!LATEST & PARENT)");
return false;
}
if((!onlyLatestMsgs) && onlyThreadMsgs)
{
RS_ERR("Incompatible FLAGS (!LATEST & THREAD)");
return false;
}
if(onlyChildMsgs && onlyThreadMsgs)
{
RS_ERR("Incompatible FLAGS (PARENT & THREAD)");
return false;
}
if( (!onlyLatestMsgs) && (!onlyAllVersions) && (!onlyChildMsgs) &&
(!onlyThreadMsgs) )
{
RS_WARN("NO FLAGS -> SIMPLY RETURN nothing");
return true;
}
for(auto vit_msgIds(req->mMsgIds.begin()); vit_msgIds != req->mMsgIds.end(); ++vit_msgIds)
{
MsgMetaFilter filterMap;
const RsGxsGrpMsgIdPair& grpMsgIdPair = *vit_msgIds;
// get meta data for all in group
GxsMsgMetaResult result;
GxsMsgReq msgIds;
msgIds.insert(std::make_pair(grpMsgIdPair.first, std::set()));
mDataStore->retrieveGxsMsgMetaData(msgIds, result);
auto& metaV = result[grpMsgIdPair.first];
// msg id to relate to
const RsGxsMessageId& msgId = grpMsgIdPair.second;
const RsGxsGroupId& grpId = grpMsgIdPair.first;
std::set outMsgIds;
std::shared_ptr origMeta;
for(auto vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
if(msgId == (*vit_meta)->mMsgId)
{
origMeta = *vit_meta;
break;
}
if(!origMeta)
{
RS_ERR("Cannot find meta of msgId: ", msgId, " to relate to");
continue;
}
const RsGxsMessageId& origMsgId = origMeta->mOrigMsgId;
auto& metaMap = filterMap[grpId];
if (onlyLatestMsgs)
{
if (onlyChildMsgs || onlyThreadMsgs)
{
// RUN THROUGH ALL MSGS... in map origId -> TS.
std::map > origMsgTs;
std::map >::iterator oit;
for(auto vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
{
auto meta = *vit_meta;
// skip msgs that aren't children.
if (onlyChildMsgs)
{
if (meta->mParentId != origMsgId)
{
continue;
}
}
else /* onlyThreadMsgs */
{
if (meta->mThreadId != msgId)
{
continue;
}
}
oit = origMsgTs.find(meta->mOrigMsgId);
bool addMsg = false;
if (oit == origMsgTs.end())
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "RsGxsDataAccess::getMsgRelatedList() Found New OrigMsgId: "
<< meta->mOrigMsgId
<< " MsgId: " << meta->mMsgId
<< " TS: " << meta->mPublishTs
<< std::endl;
#endif
addMsg = true;
}
// check timestamps.
else if (oit->second.second < meta->mPublishTs)
{
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": RsGxsDataAccess::getMsgRelatedList() Found Later Msg. OrigMsgId: "
<< meta->mOrigMsgId
<< " MsgId: " << meta->mMsgId
<< " TS: " << meta->mPublishTs
<< std::endl;
#endif
addMsg = true;
}
if (addMsg)
{
// add as latest. (overwriting if necessary)
origMsgTs[meta->mOrigMsgId] = std::make_pair(meta->mMsgId, meta->mPublishTs);
metaMap.insert(std::make_pair(meta->mMsgId, meta));
}
}
// Add the discovered Latest Msgs.
for(oit = origMsgTs.begin(); oit != origMsgTs.end(); ++oit)
{
outMsgIds.insert(oit->second.first);
}
}
else
{
/* first guess is potentially better than Orig (can't be worse!) */
rstime_t latestTs = 0;
RsGxsMessageId latestMsgId;
std::shared_ptr latestMeta;
for(auto vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
if ((*vit_meta)->mOrigMsgId == origMsgId)
{
if ((*vit_meta)->mPublishTs > latestTs)
{
latestTs = (*vit_meta)->mPublishTs;
latestMsgId = (*vit_meta)->mMsgId;
latestMeta = (*vit_meta);
}
}
outMsgIds.insert(latestMsgId);
metaMap.insert(std::make_pair(latestMsgId, latestMeta));
}
}
else if (onlyAllVersions)
{
for(auto vit_meta = metaV.begin(); vit_meta != metaV.end(); ++vit_meta)
if ((*vit_meta)->mOrigMsgId == origMsgId)
{
outMsgIds.insert((*vit_meta)->mMsgId);
metaMap.insert(std::make_pair((*vit_meta)->mMsgId, (*vit_meta)));
}
}
GxsMsgIdResult filteredOutMsgIds;
filteredOutMsgIds[grpId] = outMsgIds;
filterMsgIdList(filteredOutMsgIds, opts, filterMap);
if(!filteredOutMsgIds[grpId].empty())
{
if(req->Options.mReqType == GXS_REQUEST_TYPE_MSG_RELATED_IDS)
{
req->mMsgIdResult[grpMsgIdPair] = filteredOutMsgIds[grpId];
}
else if(req->Options.mReqType == GXS_REQUEST_TYPE_MSG_RELATED_META)
{
GxsMsgMetaResult metaResult;
mDataStore->retrieveGxsMsgMetaData(filteredOutMsgIds, metaResult);
req->mMsgMetaResult[grpMsgIdPair] = metaResult[grpId];
}
else if(req->Options.mReqType == GXS_REQUEST_TYPE_MSG_RELATED_DATA)
{
GxsMsgResult msgResult;
mDataStore->retrieveNxsMsgs(filteredOutMsgIds, msgResult, true);
req->mMsgDataResult[grpMsgIdPair] = msgResult[grpId];
}
}
outMsgIds.clear();
filteredOutMsgIds.clear();
}
return true;
}
bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req)
{
// filter based on options
GxsMsgIdResult metaReq;
metaReq[req->mGrpId] = std::set();
GxsMsgMetaResult metaResult;
mDataStore->retrieveGxsMsgMetaData(metaReq, metaResult);
const auto& msgMetaV_it = metaResult.find(req->mGrpId);
if(msgMetaV_it == metaResult.end())
return false;
const auto& msgMetaV(msgMetaV_it->second);
req->mGroupStatistic.mGrpId = req->mGrpId;
req->mGroupStatistic.mNumMsgs = msgMetaV.size();
req->mGroupStatistic.mTotalSizeOfMsgs = 0;
req->mGroupStatistic.mNumThreadMsgsNew = 0;
req->mGroupStatistic.mNumThreadMsgsUnread = 0;
req->mGroupStatistic.mNumChildMsgsNew = 0;
req->mGroupStatistic.mNumChildMsgsUnread = 0;
std::set obsolete_msgs ; // stored message ids that are referred to as older versions of an existing message
for(uint32_t i = 0; i < msgMetaV.size(); ++i)
if(!msgMetaV[i]->mOrigMsgId.isNull() && msgMetaV[i]->mOrigMsgId!=msgMetaV[i]->mMsgId)
obsolete_msgs.insert(msgMetaV[i]->mOrigMsgId);
for(uint32_t i = 0; i < msgMetaV.size(); ++i)
{
const auto& m = msgMetaV[i];
req->mGroupStatistic.mTotalSizeOfMsgs += m->mMsgSize + m->serial_size();
if(obsolete_msgs.find(m->mMsgId) != obsolete_msgs.end()) // skip obsolete messages.
continue;
if (IS_MSG_NEW(m->mMsgStatus))
{
if (m->mParentId.isNull())
++req->mGroupStatistic.mNumThreadMsgsNew;
else
++req->mGroupStatistic.mNumChildMsgsNew;
}
if (IS_MSG_UNREAD(m->mMsgStatus))
{
if (m->mParentId.isNull())
++req->mGroupStatistic.mNumThreadMsgsUnread;
else
++req->mGroupStatistic.mNumChildMsgsUnread;
}
}
return true;
}
// potentially very expensive!
bool RsGxsDataAccess::getServiceStatistic(ServiceStatisticRequest *req)
{
RsGxsGrpMetaTemporaryMap grpMeta ;
mDataStore->retrieveGxsGrpMetaData(grpMeta);
req->mServiceStatistic.mNumGrps = grpMeta.size();
req->mServiceStatistic.mNumMsgs = 0;
req->mServiceStatistic.mSizeOfGrps = 0;
req->mServiceStatistic.mSizeOfMsgs = 0;
req->mServiceStatistic.mNumGrpsSubscribed = 0;
req->mServiceStatistic.mNumThreadMsgsNew = 0;
req->mServiceStatistic.mNumThreadMsgsUnread = 0;
req->mServiceStatistic.mNumChildMsgsNew = 0;
req->mServiceStatistic.mNumChildMsgsUnread = 0;
for(auto mit = grpMeta.begin(); mit != grpMeta.end(); ++mit)
{
const auto& m = mit->second;
req->mServiceStatistic.mSizeOfGrps += m->mGrpSize + m->serial_size(RS_GXS_GRP_META_DATA_CURRENT_API_VERSION);
if (IS_GROUP_SUBSCRIBED(m->mSubscribeFlags))
{
++req->mServiceStatistic.mNumGrpsSubscribed;
GroupStatisticRequest gr;
gr.mGrpId = m->mGroupId;
getGroupStatistic(&gr);
req->mServiceStatistic.mNumMsgs += gr.mGroupStatistic.mNumMsgs;
req->mServiceStatistic.mSizeOfMsgs += gr.mGroupStatistic.mTotalSizeOfMsgs;
req->mServiceStatistic.mNumThreadMsgsNew += gr.mGroupStatistic.mNumThreadMsgsNew;
req->mServiceStatistic.mNumThreadMsgsUnread += gr.mGroupStatistic.mNumThreadMsgsUnread;
req->mServiceStatistic.mNumChildMsgsNew += gr.mGroupStatistic.mNumChildMsgsNew;
req->mServiceStatistic.mNumChildMsgsUnread += gr.mGroupStatistic.mNumChildMsgsUnread;
}
}
req->mServiceStatistic.mSizeStore = req->mServiceStatistic.mSizeOfGrps + req->mServiceStatistic.mSizeOfMsgs;
return true;
}
bool RsGxsDataAccess::getMsgIdList(MsgIdReq* req)
{
GxsMsgMetaResult result;
mDataStore->retrieveGxsMsgMetaData(req->mMsgIds, result);
for(auto mit = result.begin(); mit != result.end(); ++mit)
{
const RsGxsGroupId grpId = mit->first;
auto& metaV = mit->second;
for(auto vit=metaV.begin(); vit != metaV.end(); ++vit)
req->mMsgIdResult[grpId].insert((*vit)->mMsgId);
}
GxsMsgReq msgIdOut;
// filter based on options
getMsgIdList(req->mMsgIdResult, req->Options, msgIdOut);
req->mMsgIdResult = msgIdOut;
return true;
}
void RsGxsDataAccess::filterMsgIdList( GxsMsgIdResult& resultsMap, const RsTokReqOptions& opts, const MsgMetaFilter& msgMetas ) const
{
for( GxsMsgIdResult::iterator grpIt = resultsMap.begin(); grpIt != resultsMap.end(); ++grpIt )
{
const RsGxsGroupId& groupId(grpIt->first);
std::set& msgsIdSet(grpIt->second);
MsgMetaFilter::const_iterator cit = msgMetas.find(groupId);
if(cit == msgMetas.end()) continue;
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
<< " for group: " << groupId << " before filtering"
<< std::endl;
#endif
for( std::set::iterator msgIdIt = msgsIdSet.begin(); msgIdIt != msgsIdSet.end(); )
{
const RsGxsMessageId& msgId(*msgIdIt);
const auto& msgsMetaMap = cit->second;
bool keep = false;
auto msgsMetaMapIt = msgsMetaMap.find(msgId);
if( msgsMetaMapIt != msgsMetaMap.end() )
keep = checkMsgFilter(opts, msgsMetaMapIt->second);
if(keep)
++msgIdIt;
else
msgIdIt = msgsIdSet.erase(msgIdIt);
}
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__ << " " << msgsIdSet.size()
<< " for group: " << groupId << " after filtering"
<< std::endl;
#endif
}
}
void RsGxsDataAccess::filterGrpList(std::list &grpIds, const RsTokReqOptions &opts, const GrpMetaFilter& meta) const
{
for(auto lit = grpIds.begin(); lit != grpIds.end(); )
{
auto cit = meta.find(*lit);
bool keep = false;
if(cit != meta.end())
keep = checkGrpFilter(opts, cit->second);
if(keep)
++lit;
else
lit = grpIds.erase(lit);
}
}
bool RsGxsDataAccess::checkRequestStatus( uint32_t token, GxsRequestStatus& status, uint32_t& reqtype, uint32_t& anstype, rstime_t& ts )
{
RS_STACK_MUTEX(mDataMutex);
GxsRequest* req = locked_retrieveCompletedRequest(token);
#ifdef DATA_DEBUG
GXSDATADEBUG << "CheckRequestStatus: token=" << token << std::endl ;
#endif
if(req != nullptr)
{
anstype = req->clientAnswerType;
reqtype = req->reqType;
status = COMPLETE;
ts = req->reqTime;
#ifdef DATA_DEBUG
GXSDATADEBUG << " Returning status = COMPLETE" << std::endl;
#endif
return true;
}
auto it = mPublicToken.find(token);
if(it != mPublicToken.end())
{
status = it->second;
#ifdef DATA_DEBUG
GXSDATADEBUG << " Returning status = " << status << std::endl;
#endif
return true;
}
status = FAILED;
#ifdef DATA_DEBUG
GXSDATADEBUG << " Token not found. Returning FAILED" << std::endl;
#endif
return false;
}
bool RsGxsDataAccess::addGroupData(RsNxsGrp* grp) {
RsStackMutex stack(mDataMutex);
std::list grpM;
grpM.push_back(grp);
return mDataStore->storeGroup(grpM);
}
bool RsGxsDataAccess::updateGroupData(RsNxsGrp* grp) {
RsStackMutex stack(mDataMutex);
std::list grpM;
grpM.push_back(grp);
return mDataStore->updateGroup(grpM);
}
bool RsGxsDataAccess::getGroupData(const RsGxsGroupId& grpId, RsNxsGrp *& grp_data)
{
RsStackMutex stack(mDataMutex);
std::map grps ;
grps[grpId] = nullptr ;
if(mDataStore->retrieveNxsGrps(grps, false)) // the false here is very important: it removes the private key parts.
{
grp_data = grps.begin()->second;
return true;
}
else
return false ;
}
bool RsGxsDataAccess::addMsgData(RsNxsMsg* msg) {
RsStackMutex stack(mDataMutex);
std::list msgM;
msgM.push_back(msg);
return mDataStore->storeMessage(msgM);
}
void RsGxsDataAccess::tokenList(std::list& tokens)
{
RsStackMutex stack(mDataMutex);
for(auto& it:mRequestQueue)
tokens.push_back(it.second->token);
for(auto& it:mCompletedRequests)
tokens.push_back(it.first);
}
bool RsGxsDataAccess::locked_updateRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status )
{
GxsRequest* req = locked_retrieveCompletedRequest(token);
if(req) req->status = status;
else return false;
return true;
}
uint32_t RsGxsDataAccess::generatePublicToken()
{
uint32_t token;
generateToken(token);
{
RS_STACK_MUTEX(mDataMutex);
mPublicToken[token] = PENDING ;
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Adding new public token " << token << " in PENDING state. Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl;
if(mDataStore->serviceType() == 0x218 && token==19)
print_stacktrace();
#endif
}
return token;
}
bool RsGxsDataAccess::updatePublicRequestStatus( uint32_t token, RsTokenService::GxsRequestStatus status )
{
RS_STACK_MUTEX(mDataMutex);
auto mit = mPublicToken.find(token);
if(mit != mPublicToken.end())
{
mit->second = status;
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": updating public token " << token << " to state " << tokenStatusString[status] << std::endl;
#endif
return true;
}
else
return false;
}
bool RsGxsDataAccess::disposeOfPublicToken(uint32_t token)
{
RS_STACK_MUTEX(mDataMutex);
auto mit = mPublicToken.find(token);
if(mit != mPublicToken.end())
{
mPublicToken.erase(mit);
#ifdef DATA_DEBUG
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": Deleting public token " << token << ". Completed tokens: " << mCompletedRequests.size() << " Size of mPublicToken: " << mPublicToken.size() << std::endl;
#endif
return true;
}
else
return false;
}
#ifdef DATA_DEBUG
void RsGxsDataAccess::dumpTokenQueues()
{
RS_STACK_MUTEX(mDataMutex);
GXSDATADEBUG << "Service " << std::hex << mDataStore->serviceType() << std::dec << ": dumping token list."<< std::endl;
for(auto tokenpair:mPublicToken)
GXSDATADEBUG << " Public Token " << tokenpair.first << " : " << tokenStatusString[tokenpair.second] << std::endl;
for(auto tokenpair:mCompletedRequests)
GXSDATADEBUG << " Completed Tokens: " << tokenpair.first << std::endl;
for(auto tokenpair:mRequestQueue)
GXSDATADEBUG << " RequestQueue: " << tokenpair.first << " status " << tokenStatusString[tokenpair.second->status] << std::endl;
GXSDATADEBUG << std::endl;
}
#endif
bool RsGxsDataAccess::checkGrpFilter(const RsTokReqOptions &opts, const std::shared_ptr& meta) const
{
bool subscribeMatch = false;
if(opts.mSubscribeMask)
{
// Exact Flags match required.
if ((opts.mSubscribeMask & opts.mSubscribeFilter) == (opts.mSubscribeMask & meta->mSubscribeFlags))
{
subscribeMatch = true;
}
}
else
{
subscribeMatch = true;
}
return subscribeMatch;
}
bool RsGxsDataAccess::checkMsgFilter(const RsTokReqOptions& opts, const std::shared_ptr &meta ) const
{
if (opts.mStatusMask)
{
// Exact Flags match required.
if ( (opts.mStatusMask & opts.mStatusFilter) ==
(opts.mStatusMask & meta->mMsgStatus) )
{
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__
<< " Continue checking Msg as StatusMatches: "
<< " Mask: " << opts.mStatusMask
<< " StatusFilter: " << opts.mStatusFilter
<< " MsgStatus: " << meta->mMsgStatus
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
}
else
{
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__
<< " Dropping Msg due to !StatusMatch "
<< " Mask: " << opts.mStatusMask
<< " StatusFilter: " << opts.mStatusFilter
<< " MsgStatus: " << meta->mMsgStatus
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
return false;
}
}
else
{
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__
<< " Status check not requested"
<< " mStatusMask: " << opts.mStatusMask
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
}
if(opts.mMsgFlagMask)
{
// Exact Flags match required.
if ( (opts.mMsgFlagMask & opts.mMsgFlagFilter) ==
(opts.mMsgFlagMask & meta->mMsgFlags) )
{
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__
<< " Accepting Msg as FlagMatches: "
<< " Mask: " << opts.mMsgFlagMask
<< " FlagFilter: " << opts.mMsgFlagFilter
<< " MsgFlag: " << meta->mMsgFlags
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
}
else
{
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__
<< " Dropping Msg due to !FlagMatch "
<< " Mask: " << opts.mMsgFlagMask
<< " FlagFilter: " << opts.mMsgFlagFilter
<< " MsgFlag: " << meta->mMsgFlags
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
return false;
}
}
else
{
#ifdef DATA_DEBUG
GXSDATADEBUG << __PRETTY_FUNCTION__
<< " Flags check not requested"
<< " mMsgFlagMask: " << opts.mMsgFlagMask
<< " MsgId: " << meta->mMsgId << std::endl;
#endif
}
return true;
}
GxsGroupStatistic::~GxsGroupStatistic() = default;
GxsServiceStatistic::~GxsServiceStatistic() = default;