/*******************************************************************************
* libretroshare/src/pqi: pqihandler.cc *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2004-2006 Robert Fernie *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see . *
* *
*******************************************************************************/
#include "pqi/pqihandler.h"
#include // for NULL
#include "util/rstime.h" // for time, rstime_t
#include // for sort
#include // for dec
#include // for string, char_traits, operator+, bas...
#include // for pair
#include "pqi/pqi_base.h" // for PQInterface, RsBwRates
#include "retroshare/rsconfig.h" // for RSTrafficClue
#include "retroshare/rsids.h" // for t_RsGenericIdType
#include "retroshare/rspeers.h" // for RsPeers, rsPeers
#include "serialiser/rsserial.h" // for RsItem, RsRawItem
#include "util/rsdebug.h" // for pqioutput, PQL_DEBUG_BASIC, PQL_ALERT
#include "util/rsstring.h" // for rs_sprintf_append
using std::dec;
#ifdef WINDOWS_SYS
#include
#endif
struct RsLog::logInfo pqihandlerzoneInfo = {RsLog::Default, "pqihandler"};
#define pqihandlerzone &pqihandlerzoneInfo
//static const int PQI_HANDLER_NB_PRIORITY_LEVELS = 10 ;
//static const float PQI_HANDLER_NB_PRIORITY_RATIO = 2 ;
//#define UPDATE_RATES_DEBUG 1
// #define DEBUG_TICK 1
// #define RSITEM_DEBUG 1
pqihandler::pqihandler() : coreMtx("pqihandler")
{
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
// setup minimal total+individual rates.
rateIndiv_out = 0.01;
rateIndiv_in = 0.01;
rateMax_out = 0.01;
rateMax_in = 0.01;
rateTotal_in = 0.0 ;
rateTotal_out = 0.0 ;
traffInSum = 0;
traffOutSum = 0;
last_m = time(NULL) ;
nb_ticks = 0 ;
mLastRateCapUpdate = 0 ;
ticks_per_sec = 5 ; // initial guess
return;
}
int pqihandler::tick()
{
int moreToTick = 0;
{
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
// tick all interfaces...
std::map::iterator it;
for(it = mods.begin(); it != mods.end(); ++it)
{
if (0 < ((it -> second) -> pqi) -> tick())
{
#ifdef DEBUG_TICK
std::cerr << "pqihandler::tick() moreToTick from mod()" << std::endl;
#endif
moreToTick = 1;
}
}
#ifdef TO_BE_REMOVED
// get the items, and queue them correctly
if (0 < locked_GetItems())
{
#ifdef DEBUG_TICK
std::cerr << "pqihandler::tick() moreToTick from GetItems()" << std::endl;
#endif
moreToTick = 1;
}
#endif
}
rstime_t now = time(NULL) ;
if(now > mLastRateCapUpdate + 5)
{
std::map rateMap;
std::map::iterator it;
// every 5 secs, update the max rates for all modules
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
for(std::map::iterator it = mods.begin(); it != mods.end(); ++it)
{
// This is rather inelegant, but pqihandler has searchModules that are dynamically allocated, so the max rates
// need to be updated from inside.
uint32_t maxUp = 0,maxDn =0 ;
if (rsPeers->getPeerMaximumRates(it->first,maxUp,maxDn) )
it->second->pqi->setRateCap(maxDn,maxUp);// mind the order! Dn first, than Up.
}
mLastRateCapUpdate = now ;
}
UpdateRates();
return moreToTick;
}
bool pqihandler::queueOutRsItem(RsItem *item)
{
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
uint32_t size ;
locked_HandleRsItem(item, size);
#ifdef DEBUG_QOS
if(item->priority_level() == QOS_PRIORITY_UNKNOWN)
std::cerr << "Caught an unprioritized item !" << std::endl;
print() ;
#endif
return true ;
}
int pqihandler::status()
{
std::map::iterator it;
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
{ // for output
std::string out = "pqihandler::status() Active Modules:\n";
// display all interfaces...
for(it = mods.begin(); it != mods.end(); ++it)
{
rs_sprintf_append(out, "\tModule [%s] Pointer <%p>", it -> first.toStdString().c_str(), (void *) ((it -> second) -> pqi));
}
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, out);
} // end of output.
// status all interfaces...
for(it = mods.begin(); it != mods.end(); ++it)
{
((it -> second) -> pqi) -> status();
}
return 1;
}
bool pqihandler::AddSearchModule(SearchModule *mod)
{
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
// if peerid used -> error.
//std::map::iterator it;
if (mod->peerid != mod->pqi->PeerId())
{
// ERROR!
pqioutput(PQL_ALERT, pqihandlerzone, "ERROR peerid != PeerId!");
return false;
}
if (mod->peerid.isNull())
{
// ERROR!
pqioutput(PQL_ALERT, pqihandlerzone, "ERROR peerid == NULL");
return false;
}
if (mods.find(mod->peerid) != mods.end())
{
// ERROR!
pqioutput(PQL_ALERT, pqihandlerzone, "ERROR PeerId Module already exists!");
return false;
}
// store.
mods[mod->peerid] = mod;
return true;
}
bool pqihandler::RemoveSearchModule(SearchModule *mod)
{
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
std::map::iterator it;
for(it = mods.begin(); it != mods.end(); ++it)
{
if (mod == it -> second)
{
mods.erase(it);
return true;
}
}
return false;
}
// generalised output
int pqihandler::locked_HandleRsItem(RsItem *item, uint32_t& computed_size)
{
computed_size = 0 ;
std::map::iterator it;
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
"pqihandler::HandleRsItem()");
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone,
"pqihandler::HandleRsItem() Sending to One Channel");
#ifdef DEBUG_TICK
std::cerr << "pqihandler::HandleRsItem() Sending to One Channel" << std::endl;
#endif
// find module.
if ((it = mods.find(item->PeerId())) == mods.end())
{
std::string out = "pqihandler::HandleRsItem() Invalid chan!";
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, out);
#ifdef DEBUG_TICK
std::cerr << out << std::endl;
#endif
delete item;
return -1;
}
std::string out = "pqihandler::HandleRsItem() sending to chan: " + it -> first.toStdString();
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, out);
#ifdef DEBUG_TICK
std::cerr << out << std::endl;
#endif
// if yes send on item.
((it -> second) -> pqi) -> SendItem(item,computed_size);
return 1;
}
int pqihandler::SendRsRawItem(RsRawItem *ns)
{
pqioutput(PQL_DEBUG_BASIC, pqihandlerzone, "pqihandler::SendRsRawItem()");
// directly send item to streamers
return queueOutRsItem(ns) ;
}
int pqihandler::ExtractTrafficInfo(std::list& out_lst,std::list& in_lst)
{
in_lst.clear() ;
out_lst.clear() ;
for( std::map::iterator it = mods.begin(); it != mods.end(); ++it)
{
std::list ilst,olst ;
(it -> second)->pqi->gatherStatistics(olst,ilst) ;
for(std::list::const_iterator it(ilst.begin());it!=ilst.end();++it) in_lst.push_back(*it) ;
for(std::list::const_iterator it(olst.begin());it!=olst.end();++it) out_lst.push_back(*it) ;
}
return 1 ;
}
// NEW extern fn to extract rates.
int pqihandler::ExtractRates(std::map &ratemap, RsBwRates &total)
{
total.mMaxRateIn = getMaxRate(true);
total.mMaxRateOut = getMaxRate(false);
total.mRateIn = 0;
total.mRateOut = 0;
total.mQueueIn = 0;
total.mQueueOut = 0;
/* Lock once rates have been retrieved */
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
std::map::iterator it;
for(it = mods.begin(); it != mods.end(); ++it)
{
SearchModule *mod = (it -> second);
RsBwRates peerRates;
mod -> pqi -> getRates(peerRates);
total.mRateIn += peerRates.mRateIn;
total.mRateOut += peerRates.mRateOut;
total.mQueueIn += peerRates.mQueueIn;
total.mQueueOut += peerRates.mQueueOut;
ratemap[it->first] = peerRates;
}
return 1;
}
// internal fn to send updates
int pqihandler::UpdateRates()
{
std::map::iterator it;
float avail_in = getMaxRate(true);
float avail_out = getMaxRate(false);
float used_bw_in = 0;
float used_bw_out = 0;
/* Lock once rates have been retrieved */
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
int num_sm = mods.size();
float used_bw_in_table[num_sm]; /* table of in bandwidth currently used by each module */
float used_bw_out_table[num_sm]; /* table of out bandwidth currently used by each module */
// loop through modules to get the used bandwidth
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates Looping through modules" << std::endl;
#endif
int index = 0;
for(it = mods.begin(); it != mods.end(); ++it)
{
SearchModule *mod = (it -> second);
traffInSum += mod -> pqi -> getTraffic(true);
traffOutSum += mod -> pqi -> getTraffic(false);
float crate_in = mod -> pqi -> getRate(true);
float crate_out = mod -> pqi -> getRate(false);
used_bw_in += crate_in;
used_bw_out += crate_out;
/* fill the table of used bandwidths */
used_bw_in_table[index] = crate_in;
used_bw_out_table[index] = crate_out;
++index;
}
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates Sorting used_bw_out_table: " << num_sm << " entries" << std::endl;
#endif
/* Sort the used bw in/out table in ascending order */
std::sort(used_bw_in_table, used_bw_in_table + num_sm);
std::sort(used_bw_out_table, used_bw_out_table + num_sm);
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates used_bw_out " << used_bw_out << std::endl;
#endif
/* Calculate the optimal out_max value, taking into account avail_out and the out bw requested by modules */
float out_remaining_bw = avail_out;
float out_max_bw = 0;
bool keep_going = true;
int mod_index = 0;
while (keep_going && (mod_index < num_sm)) {
float result = (num_sm - mod_index) * (used_bw_out_table[mod_index] - out_max_bw);
if (result > out_remaining_bw) {
/* There is not enough remaining out bw to satisfy all modules,
distribute the remaining out bw among modules, then exit */
out_max_bw += out_remaining_bw / (num_sm - mod_index);
out_remaining_bw = 0;
keep_going = false;
} else {
/* Grant the requested out bandwidth to all modules,
then recalculate the remaining out bandwidth */
out_remaining_bw -= result;
out_max_bw = used_bw_out_table[mod_index];
++mod_index;
}
}
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " out_max_bw " << out_max_bw << " remaining out bw " << out_remaining_bw << std::endl;
#endif
/* Allocate only 50 pct the remaining out bw, if any, to make the transition more smooth */
out_max_bw = out_max_bw + 0.5 * out_remaining_bw;
/* Calculate the optimal in_max value, taking into account avail_in and the in bw requested by modules */
float in_remaining_bw = avail_in;
float in_max_bw = 0;
keep_going = true;
mod_index = 0;
while (keep_going && mod_index < num_sm) {
float result = (num_sm - mod_index) * (used_bw_in_table[mod_index] - in_max_bw);
if (result > in_remaining_bw) {
/* There is not enough remaining in bw to satisfy all modules,
distribute the remaining in bw among modules, then exit */
in_max_bw += in_remaining_bw / (num_sm - mod_index);
in_remaining_bw = 0;
keep_going = false;
} else {
/* Grant the requested in bandwidth to all modules,
then recalculate the remaining in bandwidth */
in_remaining_bw -= result;
in_max_bw = used_bw_in_table[mod_index];
++mod_index;
}
}
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates mod_index " << mod_index << " in_max_bw " << in_max_bw << " remaining in bw " << in_remaining_bw << std::endl;
#endif
// allocate only 75 pct of the remaining in bw, to make the transition more smooth
in_max_bw = in_max_bw + 0.75 * in_remaining_bw;
// store current total in and out used bw
locked_StoreCurrentRates(used_bw_in, used_bw_out);
#ifdef UPDATE_RATES_DEBUG
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates setting new out_max " << out_max_bw << " in_max " << in_max_bw << std::endl;
#endif
// retrieve the bandwidth limits provided by peers via BwCtrl
std::map rateMap;
rsConfig->getAllBandwidthRates(rateMap);
std::map::iterator rateMap_it;
#ifdef UPDATE_RATES_DEBUG
// dump RsConfigurationDataRates
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates RsConfigDataRates dump" << std::endl;
for (rateMap_it = rateMap.begin(); rateMap_it != rateMap.end(); rateMap_it++)
RsDbg () << "UPDATE_RATES pqihandler::UpdateRates PeerId " << rateMap_it->first.toStdString() << " mAllowedOut " << rateMap_it->second.mAllowedOut << std::endl;
#endif
// update max rates
for(it = mods.begin(); it != mods.end(); ++it)
{
SearchModule *mod = (it -> second);
// for our down bandwidth we use the calculated value without taking into account the max up provided by peers via BwCtrl
// this is harmless as they will control their up bw on their side
mod -> pqi -> setMaxRate(true, in_max_bw);
// for our up bandwidth we take into account the max down provided by peers via BwCtrl
// because we don't want to clog our outqueues, the TCP buffers, and the peers inbound queues
mod -> pqi -> setMaxRate(false, out_max_bw);
if ((rateMap_it = rateMap.find(mod->pqi->PeerId())) != rateMap.end())
if (rateMap_it->second.mAllowedOut > 0)
if (out_max_bw > rateMap_it->second.mAllowedOut)
mod -> pqi -> setMaxRate(false, rateMap_it->second.mAllowedOut);
}
#ifdef UPDATE_RATES_DEBUG
// dump maxRates
for(it = mods.begin(); it != mods.end(); ++it)
{
SearchModule *mod = (it -> second);
RsDbg() << "UPDATE_RATES pqihandler::UpdateRates PeerID " << (mod ->pqi -> PeerId()).toStdString() << " new bandwidth limits up " << mod -> pqi -> getMaxRate(false) << " down " << mod -> pqi -> getMaxRate(true) << std::endl;
}
#endif
return 1;
}
void pqihandler::getCurrentRates(float &in, float &out)
{
RS_STACK_MUTEX(coreMtx); /**************** LOCKED MUTEX ****************/
in = rateTotal_in;
out = rateTotal_out;
}
void pqihandler::locked_StoreCurrentRates(float in, float out)
{
rateTotal_in = in;
rateTotal_out = out;
}
void pqihandler::GetTraffic(uint64_t &in, uint64_t &out)
{
in = traffInSum;
out = traffOutSum;
}