/*******************************************************************************
* libretroshare/src/pqi: p3historymgr.cc *
* *
* libretroshare: retroshare core library *
* *
* Copyright 2011 by Thunder. *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see . *
* *
*******************************************************************************/
#include "util/rstime.h"
#include "p3historymgr.h"
#include "rsitems/rshistoryitems.h"
#include "rsitems/rsconfigitems.h"
#include "retroshare/rsiface.h"
#include "retroshare/rspeers.h"
#include "rsitems/rsmsgitems.h"
#include "rsserver/p3face.h"
#include "util/rsstring.h"
/****
* #define HISTMGR_DEBUG 1
***/
// clean too old messages every 5 minutes
//
#define MSG_HISTORY_CLEANING_PERIOD 300
RsHistory *rsHistory = NULL;
p3HistoryMgr::p3HistoryMgr()
: p3Config()
, nextMsgId(1)
, mPublicEnable(false), mLobbyEnable(true), mPrivateEnable(true), mDistantEnable(true)
, mPublicSaveCount(0), mLobbySaveCount(0), mPrivateSaveCount(0), mDistantSaveCount(0)
, mMaxStorageDurationSeconds(10*86400) // store for 10 days at most.
, mLastCleanTime(0)
, mHistoryMtx("p3HistoryMgr")
{
}
p3HistoryMgr::~p3HistoryMgr()
{
}
/***** p3HistoryMgr *****/
void p3HistoryMgr::addMessage(const ChatMessage& cm)
{
uint32_t addMsgId = 0;
rstime_t now = time(NULL) ;
if(mLastCleanTime + MSG_HISTORY_CLEANING_PERIOD < now)
{
cleanOldMessages() ;
mLastCleanTime = now ;
}
{
RsStackMutex stack(mHistoryMtx); /********** STACK LOCKED MTX ******/
RsPeerId msgPeerId; // id of sending peer
RsPeerId chatPeerId; // id of chat endpoint
std::string peerName; //name of sending peer
if (cm.chat_id.isBroadcast() && mPublicEnable == true) {
peerName = rsPeers->getPeerName(cm.broadcast_peer_id);
}
else if (cm.chat_id.isPeerId() && mPrivateEnable == true) {
msgPeerId = cm.incoming ? cm.chat_id.toPeerId() : rsPeers->getOwnId();
peerName = rsPeers->getPeerName(msgPeerId);
}
else if (cm.chat_id.isLobbyId() && mLobbyEnable == true) {
msgPeerId = RsPeerId(cm.lobby_peer_gxs_id);
RsIdentityDetails details;
if (rsIdentity->getIdDetails(cm.lobby_peer_gxs_id, details))
peerName = details.mNickname;
else
peerName = cm.lobby_peer_gxs_id.toStdString();
}
else if(cm.chat_id.isDistantChatId()&& mDistantEnable == true)
{
DistantChatPeerInfo dcpinfo;
if (rsMsgs->getDistantChatStatus(cm.chat_id.toDistantChatId(), dcpinfo))
{
RsIdentityDetails det;
RsGxsId writer_id = cm.incoming?(dcpinfo.to_id):(dcpinfo.own_id);
if(rsIdentity->getIdDetails(writer_id,det))
peerName = det.mNickname;
else
peerName = writer_id.toStdString();
msgPeerId = cm.incoming ? RsPeerId(dcpinfo.to_id) : RsPeerId(dcpinfo.own_id);
}
else
{
RS_ERR( "Cannot retrieve friend name for distant chat ", cm.chat_id.toDistantChatId() );
peerName = "";
}
}
else
return;
if(!chatIdToVirtualPeerId(cm.chat_id, chatPeerId))
return;
RsHistoryMsgItem* item = new RsHistoryMsgItem;
item->chatPeerId = chatPeerId;
item->incoming = cm.incoming;
item->msgPeerId = msgPeerId;
item->peerName = peerName;
item->sendTime = cm.sendTime;
item->recvTime = cm.recvTime;
item->message = cm.msg ;
//librs::util::ConvertUtf16ToUtf8(chatItem->message, item->message);
std::map >::iterator mit = mMessages.find(item->chatPeerId);
if (mit != mMessages.end()) {
item->msgId = nextMsgId++;
mit->second.insert(std::make_pair(item->msgId, item));
addMsgId = item->msgId;
// check the limit
uint32_t limit;
if (chatPeerId.isNull())
limit = mPublicSaveCount;
else if (cm.chat_id.isLobbyId())
limit = mLobbySaveCount;
else
limit = mPrivateSaveCount;
if (limit) {
while (mit->second.size() > limit) {
delete(mit->second.begin()->second);
mit->second.erase(mit->second.begin());
}
}
} else {
std::map msgs;
item->msgId = nextMsgId++;
msgs.insert(std::make_pair(item->msgId, item));
mMessages.insert(std::make_pair(item->chatPeerId, msgs));
addMsgId = item->msgId;
// no need to check the limit
}
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_OFTEN);
}
if (addMsgId) {
RsServer::notify()->notifyHistoryChanged(addMsgId, NOTIFY_TYPE_ADD);
}
}
void p3HistoryMgr::cleanOldMessages()
{
RsStackMutex stack(mHistoryMtx); /********** STACK LOCKED MTX ******/
#ifdef HISTMGR_DEBUG
std::cerr << "****** cleaning old messages." << std::endl;
#endif
rstime_t now = time(NULL) ;
bool changed = false ;
for(std::map >::iterator mit = mMessages.begin(); mit != mMessages.end();)
{
if (mMaxStorageDurationSeconds > 0)
{
for(std::map::iterator lit = mit->second.begin();lit!=mit->second.end();)
if(lit->second->recvTime + mMaxStorageDurationSeconds < now)
{
std::map::iterator lit2 = lit ;
++lit2 ;
#ifdef HISTMGR_DEBUG
std::cerr << " removing msg id " << lit->first << ", for peer id " << mit->first << std::endl;
#endif
delete lit->second ;
mit->second.erase(lit) ;
lit = lit2 ;
changed = true ;
}
else
++lit ;
}
if(mit->second.empty())
{
std::map >::iterator mit2 = mit ;
++mit2 ;
#ifdef HISTMGR_DEBUG
std::cerr << " removing peer id " << mit->first << ", since it has no messages" << std::endl;
#endif
mMessages.erase(mit) ;
mit = mit2 ;
changed = true ;
}
else
++mit ;
}
if(changed)
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_OFTEN);
}
/***** p3Config *****/
RsSerialiser* p3HistoryMgr::setupSerialiser()
{
RsSerialiser *rss = new RsSerialiser;
rss->addSerialType(new RsHistorySerialiser);
rss->addSerialType(new RsGeneralConfigSerialiser());
return rss;
}
bool p3HistoryMgr::saveList(bool& cleanup, std::list& saveData)
{
cleanup = false;
mHistoryMtx.lock(); /********** STACK LOCKED MTX ******/
std::map >::iterator mit;
std::map::iterator lit;
for (mit = mMessages.begin(); mit != mMessages.end(); ++mit) {
for (lit = mit->second.begin(); lit != mit->second.end(); ++lit) {
if (lit->second->saveToDisc) {
saveData.push_back(lit->second);
}
}
}
RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet;
RsTlvKeyValue kv;
kv.key = "PUBLIC_ENABLE";
kv.value = mPublicEnable ? "TRUE" : "FALSE";
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "PRIVATE_ENABLE";
kv.value = mPrivateEnable ? "TRUE" : "FALSE";
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "LOBBY_ENABLE";
kv.value = mLobbyEnable ? "TRUE" : "FALSE";
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "DISTANT_ENABLE";
kv.value = mDistantEnable ? "TRUE" : "FALSE";
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "MAX_STORAGE_TIME";
rs_sprintf(kv.value,"%d",mMaxStorageDurationSeconds) ;
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "LOBBY_SAVECOUNT";
rs_sprintf(kv.value, "%lu", mLobbySaveCount);
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "PUBLIC_SAVECOUNT";
rs_sprintf(kv.value, "%lu", mPublicSaveCount);
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "PRIVATE_SAVECOUNT";
rs_sprintf(kv.value, "%lu", mPrivateSaveCount);
vitem->tlvkvs.pairs.push_back(kv);
kv.key = "DISTANT_SAVECOUNT";
rs_sprintf(kv.value, "%lu", mDistantSaveCount);
vitem->tlvkvs.pairs.push_back(kv);
saveData.push_back(vitem);
saveCleanupList.push_back(vitem);
return true;
}
void p3HistoryMgr::saveDone()
{
/* clean up the save List */
std::list::iterator it;
for (it = saveCleanupList.begin(); it != saveCleanupList.end(); ++it) {
delete (*it);
}
saveCleanupList.clear();
/* unlock mutex */
mHistoryMtx.unlock(); /****** MUTEX UNLOCKED *******/
}
bool p3HistoryMgr::loadList(std::list& load)
{
RsStackMutex stack(mHistoryMtx); /********** STACK LOCKED MTX ******/
RsHistoryMsgItem *msgItem;
std::list::iterator it;
for (it = load.begin(); it != load.end(); ++it)
{
if (NULL != (msgItem = dynamic_cast(*it))) {
std::map >::iterator mit = mMessages.find(msgItem->chatPeerId);
msgItem->msgId = nextMsgId++;
#ifdef HISTMGR_DEBUG
std::cerr << "Loading msg history item: peer id=" << msgItem->chatPeerId << "), msg id =" << msgItem->msgId << std::endl;
#endif
if (mit != mMessages.end()) {
mit->second.insert(std::make_pair(msgItem->msgId, msgItem));
} else {
std::map msgs;
msgs.insert(std::make_pair(msgItem->msgId, msgItem));
mMessages.insert(std::make_pair(msgItem->chatPeerId, msgs));
}
// don't delete the item !!
continue;
}
RsConfigKeyValueSet *rskv ;
if (NULL != (rskv = dynamic_cast(*it))) {
for (std::list::const_iterator kit = rskv->tlvkvs.pairs.begin(); kit != rskv->tlvkvs.pairs.end(); ++kit) {
if (kit->key == "PUBLIC_ENABLE") {
mPublicEnable = (kit->value == "TRUE") ? true : false;
continue;
}
if (kit->key == "PRIVATE_ENABLE") {
mPrivateEnable = (kit->value == "TRUE") ? true : false;
continue;
}
if (kit->key == "LOBBY_ENABLE") {
mLobbyEnable = (kit->value == "TRUE") ? true : false;
continue;
}
if (kit->key == "DISTANT_ENABLE") {
mDistantEnable = (kit->value == "TRUE") ? true : false;
continue;
}
if (kit->key == "MAX_STORAGE_TIME") {
uint32_t val ;
if (sscanf(kit->value.c_str(), "%u", &val) == 1)
mMaxStorageDurationSeconds = val ;
#ifdef HISTMGR_DEBUG
std::cerr << "Loaded max storage time for history = " << val << " seconds" << std::endl;
#endif
continue;
}
if (kit->key == "PUBLIC_SAVECOUNT") {
mPublicSaveCount = atoi(kit->value.c_str());
continue;
}
if (kit->key == "PRIVATE_SAVECOUNT") {
mPrivateSaveCount = atoi(kit->value.c_str());
continue;
}
if (kit->key == "LOBBY_SAVECOUNT") {
mLobbySaveCount = atoi(kit->value.c_str());
continue;
}
if (kit->key == "DISTANT_SAVECOUNT") {
mDistantSaveCount = atoi(kit->value.c_str());
continue;
}
}
delete (*it);
continue;
}
// delete unknown items
delete (*it);
}
load.clear() ;
return true;
}
// have to convert to virtual peer id, to be able to use existing serialiser and file format
bool p3HistoryMgr::chatIdToVirtualPeerId(const ChatId& chat_id, RsPeerId &peer_id)
{
if (chat_id.isBroadcast()) {
peer_id = RsPeerId();
return true;
}
if (chat_id.isPeerId()) {
peer_id = chat_id.toPeerId();
return true;
}
if (chat_id.isLobbyId()) {
if(sizeof(ChatLobbyId) > RsPeerId::SIZE_IN_BYTES){
std::cerr << "p3HistoryMgr::chatIdToVirtualPeerId() ERROR: ChatLobbyId does not fit into virtual peer id. Please report this error." << std::endl;
return false;
}
uint8_t bytes[RsPeerId::SIZE_IN_BYTES] ;
memset(bytes,0,RsPeerId::SIZE_IN_BYTES) ;
ChatLobbyId lobby_id = chat_id.toLobbyId();
memcpy(bytes,&lobby_id,sizeof(ChatLobbyId));
peer_id = RsPeerId(bytes);
return true;
}
if (chat_id.isDistantChatId()) {
peer_id = RsPeerId(chat_id.toDistantChatId());
return true;
}
return false;
}
/***** p3History *****/
static void convertMsg(const RsHistoryMsgItem* item, HistoryMsg &msg)
{
msg.msgId = item->msgId;
msg.chatPeerId = item->chatPeerId;
msg.incoming = item->incoming;
msg.peerId = item->msgPeerId;
msg.peerName = item->peerName;
msg.sendTime = item->sendTime;
msg.recvTime = item->recvTime;
msg.message = item->message;
}
bool p3HistoryMgr::getMessages(const ChatId &chatId, std::list &msgs, uint32_t loadCount)
{
msgs.clear();
RsStackMutex stack(mHistoryMtx); /********** STACK LOCKED MTX ******/
RsPeerId chatPeerId;
bool enabled = false;
if (chatId.isBroadcast() && mPublicEnable == true) {
enabled = true;
}
if (chatId.isPeerId() && mPrivateEnable == true) {
enabled = true;
}
if (chatId.isLobbyId() && mLobbyEnable == true) {
enabled = true;
}
if (chatId.isDistantChatId() && mDistantEnable == true) {
enabled = true;
}
if(enabled == false)
return false;
if(!chatIdToVirtualPeerId(chatId, chatPeerId))
return false;
#ifdef HISTMGR_DEBUG
std::cerr << "Getting history for virtual peer " << chatPeerId << std::endl;
#endif
uint32_t foundCount = 0;
std::map >::iterator mit = mMessages.find(chatPeerId);
if (mit != mMessages.end())
{
std::map::reverse_iterator lit;
for (lit = mit->second.rbegin(); lit != mit->second.rend(); ++lit)
{
HistoryMsg msg;
convertMsg(lit->second, msg);
msgs.insert(msgs.begin(), msg);
foundCount++;
if (loadCount && foundCount >= loadCount) {
break;
}
}
}
#ifdef HISTMGR_DEBUG
std::cerr << msgs.size() << " messages added." << std::endl;
#endif
return true;
}
bool p3HistoryMgr::getMessage(uint32_t msgId, HistoryMsg &msg)
{
RsStackMutex stack(mHistoryMtx); /********** STACK LOCKED MTX ******/
std::map >::iterator mit;
for (mit = mMessages.begin(); mit != mMessages.end(); ++mit) {
std::map::iterator lit = mit->second.find(msgId);
if (lit != mit->second.end()) {
convertMsg(lit->second, msg);
return true;
}
}
return false;
}
void p3HistoryMgr::clear(const ChatId &chatId)
{
{
RsStackMutex stack(mHistoryMtx); /********** STACK LOCKED MTX ******/
RsPeerId chatPeerId;
if(!chatIdToVirtualPeerId(chatId, chatPeerId))
return;
#ifdef HISTMGR_DEBUG
std::cerr << "********** p3History::clear()called for virtual peer id " << chatPeerId << std::endl;
#endif
std::map >::iterator mit = mMessages.find(chatPeerId);
if (mit == mMessages.end()) {
return;
}
std::map::iterator lit;
for (lit = mit->second.begin(); lit != mit->second.end(); ++lit) {
delete(lit->second);
}
mit->second.clear();
mMessages.erase(mit);
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_OFTEN);
}
RsServer::notify()->notifyHistoryChanged(0, NOTIFY_TYPE_MOD);
}
void p3HistoryMgr::removeMessages(const std::list &msgIds)
{
std::list ids = msgIds;
std::list removedIds;
std::list::iterator iit;
#ifdef HISTMGR_DEBUG
std::cerr << "********** p3History::removeMessages called()" << std::endl;
#endif
{
RsStackMutex stack(mHistoryMtx); /********** STACK LOCKED MTX ******/
std::map >::iterator mit;
for (mit = mMessages.begin(); mit != mMessages.end(); ++mit)
{
iit = ids.begin();
while ( !ids.empty() || (iit != ids.end()) )
{
std::map::iterator lit = mit->second.find(*iit);
if (lit != mit->second.end())
{
#ifdef HISTMGR_DEBUG
std::cerr << "**** Removing " << mit->first << " msg id = " << lit->first << std::endl;
#endif
delete(lit->second);
mit->second.erase(lit);
removedIds.push_back(*iit);
iit = ids.erase(iit);
continue;
}
++iit;
}
}
}
if (!removedIds.empty())
{
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_OFTEN);
for (iit = removedIds.begin(); iit != removedIds.end(); ++iit)
RsServer::notify()->notifyHistoryChanged(*iit, NOTIFY_TYPE_DEL);
}
}
bool p3HistoryMgr::getEnable(uint32_t chat_type)
{
switch(chat_type)
{
case RS_HISTORY_TYPE_PUBLIC : return mPublicEnable ;
case RS_HISTORY_TYPE_LOBBY : return mLobbyEnable ;
case RS_HISTORY_TYPE_PRIVATE: return mPrivateEnable ;
case RS_HISTORY_TYPE_DISTANT: return mDistantEnable ;
default:
std::cerr << "Unexpected value " << chat_type<< " in p3HistoryMgr::getEnable(): this is a bug." << std::endl;
return 0 ;
}
}
uint32_t p3HistoryMgr::getMaxStorageDuration()
{
return mMaxStorageDurationSeconds ;
}
void p3HistoryMgr::setMaxStorageDuration(uint32_t seconds)
{
if(mMaxStorageDurationSeconds != seconds)
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_OFTEN);
mMaxStorageDurationSeconds = seconds ;
}
void p3HistoryMgr::setEnable(uint32_t chat_type, bool enable)
{
bool oldValue;
switch(chat_type)
{
case RS_HISTORY_TYPE_PUBLIC : oldValue = mPublicEnable ;
mPublicEnable = enable ;
break ;
case RS_HISTORY_TYPE_LOBBY : oldValue = mLobbyEnable ;
mLobbyEnable = enable;
break ;
case RS_HISTORY_TYPE_PRIVATE: oldValue = mPrivateEnable ;
mPrivateEnable = enable ;
break ;
case RS_HISTORY_TYPE_DISTANT: oldValue = mDistantEnable ;
mDistantEnable = enable ;
break ;
default:
return;
}
if (oldValue != enable)
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_OFTEN);
}
uint32_t p3HistoryMgr::getSaveCount(uint32_t chat_type)
{
switch(chat_type)
{
case RS_HISTORY_TYPE_PUBLIC : return mPublicSaveCount ;
case RS_HISTORY_TYPE_LOBBY : return mLobbySaveCount ;
case RS_HISTORY_TYPE_PRIVATE: return mPrivateSaveCount ;
default:
std::cerr << "Unexpected value " << chat_type<< " in p3HistoryMgr::getSaveCount(): this is a bug." << std::endl;
return 0 ;
}
}
void p3HistoryMgr::setSaveCount(uint32_t chat_type, uint32_t count)
{
uint32_t oldValue;
switch(chat_type)
{
case RS_HISTORY_TYPE_PUBLIC : oldValue = mPublicSaveCount ;
mPublicSaveCount = count ;
break ;
case RS_HISTORY_TYPE_LOBBY : oldValue = mLobbySaveCount ;
mLobbySaveCount = count;
break ;
case RS_HISTORY_TYPE_PRIVATE: oldValue = mPrivateSaveCount ;
mPrivateSaveCount = count ;
break ;
default:
return;
}
if (oldValue != count)
IndicateConfigChanged(RsConfigMgr::CheckPriority::SAVE_OFTEN);
}