Skip to content

Commit

Permalink
Reorganize R2 thread bucket manager v0.5.1, add templated class for p…
Browse files Browse the repository at this point in the history
…ossibility continue parallilize tasks. testing (no any changes must be currently). need check compile under win. Ready for testing.

related to #1402 and some more old.
  • Loading branch information
rsa committed Sep 14, 2013
1 parent 8cc30f6 commit 7c1ee6c
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 396 deletions.
4 changes: 2 additions & 2 deletions src/game/MapManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,13 +211,13 @@ void MapManager::Update(uint32 diff)
}

if (m_updater.activated())
m_updater.wait();
m_updater.queue_wait();

UpdateLoadBalancer(false);

if (m_updater.IsBroken() || m_threadsCountPreferred != m_threadsCount)
{
m_updater.ReActivate(m_threadsCountPreferred);
m_updater.reactivate(m_threadsCountPreferred);
sLog.outDetail("MapManager::Update map virtual server threads pool reactivated, new threads count is %u", m_threadsCountPreferred);
m_threadsCount = m_threadsCountPreferred;
}
Expand Down
174 changes: 20 additions & 154 deletions src/game/MapUpdater.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
* Copyright (C) 2011-2013 /dev/rsa for MangosR2 <http://github.com/MangosR2>
* Copyright (C) 2005-2010 MaNGOS <http://getmangos.com/>
*
* This program is free software; you can redistribute it and/or modify
Expand All @@ -17,186 +18,51 @@
*/

#include "MapUpdater.h"
#include "DelayExecutor.h"
#include "ObjectUpdateTaskBase.h"
#include "Map.h"
#include "MapManager.h"
#include "World.h"
#include "Database/DatabaseEnv.h"
#include <ace/Guard_T.h>
#include <ace/Method_Request.h>

class MapUpdateRequest : public ACE_Method_Request
Map* MapUpdater::GetMapByThreadId(ACE_thread_t threadId)
{
private:

Map& m_map;
MapUpdater& m_updater;
ACE_UINT32 m_diff;

public:

MapUpdateRequest(Map& m, MapUpdater& u, ACE_UINT32 d)
: m_map(m), m_updater(u), m_diff(d)
{
}

virtual int call()
{
ACE_thread_t const threadId = ACE_OS::thr_self();
m_updater.register_thread(threadId, m_map.GetId(),m_map.GetInstanceId());
if (m_map.IsBroken())
{
m_map.ForcedUnload();
}
else
{
m_map.Update(m_diff);
}
m_updater.unregister_thread(threadId);
m_updater.update_finished ();
return 0;
}
};

MapUpdater::MapUpdater()
: m_mutex(), m_condition(m_mutex), m_executor(), pending_requests(0), m_broken(false)
{
}

MapUpdater::~MapUpdater()
{
deactivate();
}

int MapUpdater::activate(size_t num_threads)
{
return m_executor.activate((int)num_threads);
}

int MapUpdater::deactivate()
{
wait();

return m_executor.deactivate();
}

void MapUpdater::ReActivate(uint32 threads)
{
deactivate();
activate(threads);
SetBroken(false);
}

int MapUpdater::wait()
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, m_mutex, -1);

while (pending_requests > 0)
m_condition.wait();

return 0;
}

int MapUpdater::schedule_update(Map& map, ACE_UINT32 diff)
{
ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, m_mutex, -1);

++pending_requests;

if (m_executor.execute(new MapUpdateRequest(map, *this, diff)) == -1)
{
ACE_DEBUG((LM_ERROR, ACE_TEXT("(%t) \n"), ACE_TEXT("Failed to schedule Map Update")));

--pending_requests;
return -1;
}

return 0;
}

bool MapUpdater::activated()
{
return m_executor.activated();
}

void MapUpdater::update_finished()
{
ACE_GUARD(ACE_Thread_Mutex, guard, m_mutex);

if (pending_requests == 0)
{
ACE_ERROR((LM_ERROR, ACE_TEXT("(%t)\n"), ACE_TEXT("MapUpdater::update_finished BUG, report to devs")));
return;
}

--pending_requests;

m_condition.broadcast();
}

void MapUpdater::register_thread(ACE_thread_t const threadId, uint32 mapId, uint32 instanceId)
{
ACE_GUARD(ACE_Thread_Mutex, guard, m_mutex);
MapID pair = MapID(mapId, instanceId);
m_threads.insert(std::make_pair(threadId, pair));
m_starttime.insert(std::make_pair(threadId, WorldTimer::getMSTime()));
}

void MapUpdater::unregister_thread(ACE_thread_t const threadId)
{
ACE_GUARD(ACE_Thread_Mutex, guard, m_mutex);
m_threads.erase(threadId);
m_starttime.erase(threadId);
}

MapID const* MapUpdater::GetMapPairByThreadId(ACE_thread_t const threadId)
{
if (!m_threads.empty())
{
ThreadMapMap::const_iterator itr = m_threads.find(threadId);
if (itr != m_threads.end())
return &itr->second;
}
return NULL;
return getObject(threadId);
}

void MapUpdater::FreezeDetect()
{
ACE_GUARD(ACE_Thread_Mutex, guard, m_mutex);
if (!m_starttime.empty())
// FIXME - Need rewrite on base timed mutexes
for (ThreadsMap::const_iterator itr = m_threadsMap.begin(); itr != m_threadsMap.end(); ++itr)
{
for (ThreadStartTimeMap::const_iterator itr = m_starttime.begin(); itr != m_starttime.end(); ++itr)
if (WorldTimer::getMSTime() - itr->second > sWorld.getConfig(CONFIG_UINT32_VMSS_FREEZEDETECTTIME))
if (WorldTimer::getMSTime() - itr->second.second > sWorld.getConfig(CONFIG_UINT32_VMSS_FREEZEDETECTTIME))
{
if (MapID const* mapPair = GetMapPairByThreadId(itr->first))
if (Map* map = itr->second.first)
{
bool b_needKill = false;
if (Map* map = sMapMgr.FindMap(mapPair->nMapId, mapPair->nInstanceId))
if (map->IsBroken())
{
if (map->IsBroken())
{
if (WorldTimer::getMSTime() - itr->second - sWorld.getConfig(CONFIG_UINT32_VMSS_FREEZEDETECTTIME) > sWorld.getConfig(CONFIG_UINT32_VMSS_FORCEUNLOADDELAY))
b_needKill = true;
}
else
if (WorldTimer::getMSTime() - itr->second.second - sWorld.getConfig(CONFIG_UINT32_VMSS_FREEZEDETECTTIME) > sWorld.getConfig(CONFIG_UINT32_VMSS_FORCEUNLOADDELAY))
b_needKill = true;
}
else
b_needKill = true;

if (b_needKill)
{
DEBUG_LOG("VMSS::MapUpdater::FreezeDetect thread "I64FMT" possible freezed (is update map %u instance %u). Killing.",itr->first,mapPair->nMapId, mapPair->nInstanceId);
ACE_OS::thr_kill(itr->first, SIGABRT);
DEBUG_LOG("VMSS::MapUpdater::FreezeDetect thread "I64FMT" possible freezed (is update map %u instance %u). Killing.",itr->first,map->GetId(), map->GetInstanceId());
//thr_mgr()->kill(itr->first, SIGABRT);
thr_mgr()->cancel(itr->first, 0);
}
}
}
}
}

void MapUpdater::MapBrokenEvent(MapID const* mapPair)
void MapUpdater::MapBrokenEvent(Map* map)
{
if (!m_brokendata.empty())
{
MapBrokenDataMap::iterator itr = m_brokendata.find(*mapPair);
MapBrokenDataMap::iterator itr = m_brokendata.find(map);
if (itr != m_brokendata.end())
{
if ((time(NULL) - itr->second.lastErrorTime) > sWorld.getConfig(CONFIG_UINT32_VMSS_TBREMTIME))
Expand All @@ -206,14 +72,14 @@ void MapUpdater::MapBrokenEvent(MapID const* mapPair)
return;
}
}
m_brokendata.insert(std::make_pair(*mapPair, MapBrokenData()));
m_brokendata.insert(std::make_pair(map, MapBrokenData()));
}

MapBrokenData const* MapUpdater::GetMapBrokenData(MapID const* mapPair)
MapBrokenData const* MapUpdater::GetMapBrokenData(Map* map)
{
if (!m_brokendata.empty())
{
MapBrokenDataMap::const_iterator itr = m_brokendata.find(*mapPair);
MapBrokenDataMap::const_iterator itr = m_brokendata.find(map);
if (itr != m_brokendata.end())
return &itr->second;
}
Expand Down
51 changes: 11 additions & 40 deletions src/game/MapUpdater.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
* Copyright (C) 2011-2013 /dev/rsa for MangosR2 <http://github.com/MangosR2>
* Copyright (C) 2005-2010 MaNGOS <http://getmangos.com/>
*
* This program is free software; you can redistribute it and/or modify
Expand All @@ -19,14 +20,10 @@
#ifndef _MAP_UPDATER_H_INCLUDED
#define _MAP_UPDATER_H_INCLUDED

#include <ace/Thread_Mutex.h>
#include <ace/Condition_Thread_Mutex.h>

#include "DelayExecutor.h"
#include "ObjectUpdateTaskBase.h"
#include "Common.h"

class Map;
struct MapID;

struct MapBrokenData
{
Expand All @@ -46,55 +43,29 @@ struct MapBrokenData
time_t lastErrorTime;
};

typedef std::map<ACE_thread_t const, MapID> ThreadMapMap;
typedef std::map<ACE_thread_t const, uint32/*MSTime*/> ThreadStartTimeMap;
typedef std::map<MapID,MapBrokenData> MapBrokenDataMap;
typedef std::map<Map*,MapBrokenData> MapBrokenDataMap;

class MapUpdater
class MapUpdater : public ObjectUpdateTaskBase<class Map>
{
public:

MapUpdater();
virtual ~MapUpdater();

friend class MapUpdateRequest;

int schedule_update(Map& map, ACE_UINT32 diff);

int wait();

int activate(size_t num_threads);
MapUpdater() : ObjectUpdateTaskBase(), m_broken(false)
{}

int deactivate();
virtual ~MapUpdater() {};

bool activated();

void update_finished();

void register_thread(ACE_thread_t const threadId, uint32 mapId, uint32 instanceId);
void unregister_thread(ACE_thread_t const threadId);

MapID const* GetMapPairByThreadId(ACE_thread_t const threadId);
Map* GetMapByThreadId(ACE_thread_t const threadId);
void FreezeDetect();

void SetBroken( bool value = false) { m_broken = value; };
bool IsBroken() { return m_broken; };
void ReActivate( uint32 threads);

void MapBrokenEvent(MapID const* mapPair);
MapBrokenData const* GetMapBrokenData(MapID const* mapPair);
void MapBrokenEvent(Map* map);
MapBrokenData const* GetMapBrokenData(Map* map);

private:

ACE_Thread_Mutex m_mutex;
ACE_Condition_Thread_Mutex m_condition;
DelayExecutor m_executor;
size_t pending_requests;

ThreadMapMap m_threads;
ThreadStartTimeMap m_starttime;
MapBrokenDataMap m_brokendata;
bool m_broken;
bool m_broken;
};

#endif //_MAP_UPDATER_H_INCLUDED
26 changes: 12 additions & 14 deletions src/mangosd/Master.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,43 +609,41 @@ void Master::_OnSignal(int s)
sLog.outError("VMSS:: Signal %.2u received from thread "I64FMT".\r\n",s,threadId);
ACE_Stack_Trace _StackTrace;
std::string StackTrace = _StackTrace.c_str();
if (MapID const* mapPair = sMapMgr.GetMapUpdater()->GetMapPairByThreadId(threadId))
if (Map* map = sMapMgr.GetMapUpdater()->GetMapByThreadId(threadId))
{
MapBrokenData const* pMBData = sMapMgr.GetMapUpdater()->GetMapBrokenData(mapPair);
MapBrokenData const* pMBData = sMapMgr.GetMapUpdater()->GetMapBrokenData(map);
uint32 counter = pMBData ? pMBData->count : 0;

sLog.outError("VMSS:: crushed thread is update map %u, instance %u, counter %u",mapPair->nMapId, mapPair->nInstanceId, counter);
sLog.outError("VMSS:: BackTrace for map %u: ",mapPair->nMapId);
sLog.outError("VMSS:: crushed thread is update map %u, instance %u, counter %u",map->GetId(), map->GetInstanceId(), counter);
sLog.outError("VMSS:: BackTrace for map %u: ",map->GetId());

size_t found = 0;
while (found < StackTrace.size())
{
size_t next = StackTrace.find_first_of("\n",found);
std::string to_log = StackTrace.substr(found, (next - found));
if (to_log.size() > 1)
sLog.outError("VMSS:%u: %s",mapPair->nMapId,to_log.c_str());
sLog.outError("VMSS:%u: %s",map->GetId(), to_log.c_str());
found = next+1;
}
sLog.outError("VMSS:: /BackTrace for map %u: ",mapPair->nMapId);
sLog.outError("VMSS:: /BackTrace for map %u: ",map->GetId());

if (Map* map = sMapMgr.FindMap(mapPair->nMapId, mapPair->nInstanceId))
if (!sWorld.getConfig(CONFIG_BOOL_VMSS_TRYSKIPFIRST) || counter > 0)
map->SetBroken(true);
if (!sWorld.getConfig(CONFIG_BOOL_VMSS_TRYSKIPFIRST) || counter > 0)
map->SetBroken(true);

sMapMgr.GetMapUpdater()->MapBrokenEvent(mapPair);
sMapMgr.GetMapUpdater()->MapBrokenEvent(map);

if (counter > sWorld.getConfig(CONFIG_UINT32_VMSS_MAXTHREADBREAKS))
{
sLog.outError("VMSS:: Limit of map restarting (map %u instance %u) exceeded. Stopping world!",mapPair->nMapId, mapPair->nInstanceId);
sLog.outError("VMSS:: Limit of map restarting (map %u instance %u) exceeded. Stopping world!",map->GetId(), map->GetInstanceId());
signal(s, SIG_DFL);
ACE_OS::kill(getpid(), s);
}
else
{
sLog.outError("VMSS:: Restarting virtual map server (map %u instance %u). Count of restarts: %u",mapPair->nMapId, mapPair->nInstanceId, sMapMgr.GetMapUpdater()->GetMapBrokenData(mapPair)->count);
sMapMgr.GetMapUpdater()->unregister_thread(threadId);
sMapMgr.GetMapUpdater()->update_finished();
sLog.outError("VMSS:: Restarting virtual map server (map %u instance %u). Count of restarts: %u",map->GetId(), map->GetInstanceId(), sMapMgr.GetMapUpdater()->GetMapBrokenData(map)->count);
sMapMgr.GetMapUpdater()->SetBroken(true);
sMapMgr.GetMapUpdater()->decreasePendingRequestsCount();
ACE_OS::thr_exit();
}
}
Expand Down
Loading

0 comments on commit 7c1ee6c

Please sign in to comment.