Overslaan naar inhoud
CTech Digital
  • Startpagina
  • Odoo services
  • AI services
  • Contact
  • 0
  • Nederlands (BE) English (UK) Français
CTech Digital
  • 0
    • Startpagina
    • Odoo services
    • AI services
    • Contact
  • Nederlands (BE) English (UK) Français

27/7/25

Working on a message based proxy design
  • Alle blogs
  • Daily blog
  • 27/7/25
  • 27 mei 2025 in
    CTech Metrology, Luc Wens

    With the new message design working fine (see this article), we can move ahead and start refactoring commands/states/events in CTrack.

    Today also the WaitConnection was reworked using a more modern condition variable approach.

    We'll start off with our Proxy Driver to test the behavior:

    • Send an alarm from the proxy to the engine : receiving an alarm
    • Sending a request with a callback or wait for the result : config / hardware detect
    • Sending a sequence of requests : lasertracker example


    Responding to an alarm

    CProxyDevice has the next inheritance scheme

    In the above scheme we derived CProxyDevice from SubScribe, so we can subscribe our CProxyTestDevice for messages send by its proxy.

    Here is an example for registering an alarm:

    void CProxyTestDevice::Init()
    {
        SetICON("IDI_NODE_CAMERA");
        m_Name                 = "proxy_test";
        m_ProxyExe             = ".\\Proxy\\Template\\Template.exe";
        m_TCP_Port             = 0;
        m_bAllowMultpleProxies = false;
        Subscribe("warning", CTrack::MakeMemberHandler(this, &CProxyTestDevice::OnWarning));
    }

    Note the CTrack::MakeMemberHandler which allows to use 'this' inside the function that we provide, and in this case this represents the MessageResponder. 

    Subscribe is a dedicated subscription function for CProxyDevice so we can respond to messages send by the proxy program. The actual responder is inside the CCommunicationObject m_CommunicationObject as a shared_ptr.

    void CProxyDevice::Subscribe(const std::string &messageID, CTrack::Handler handler)
    {
        CTrack::Subscriber::Subscribe(*m_CommunicationObject.m_pMessageResponder, messageID, handler);
    }

    OnWarning is pretty straightforward

    CTrack::Reply CProxyTestDevice::OnWarning(const CTrack::Message &message)
    {
        std::string warningMessage = message.GetParams()["message"].get<std::string>();
        PrintWarning("Warning : {}", warningMessage);
        StateManager.SendWarning(warningMessage.c_str());
        return nullptr;
    }

    The warning gets escalated to the StateManager inside the engine.

    Some improvements here:

    • A check on the existence of the parameter "message", currently an exception is thrown and it is not caught until the main loop of the engine
    • An identification from where this warning message is coming, after all, when everything is going through a proxy device, we need to know from how the message came.

    Improvement:

    CTrack::Reply CProxyTestDevice::OnWarning(const CTrack::Message &message)
    {
        std::string warningMessage("No warning message available");
        if (message.GetParams().contains("message"))
            warningMessage = message.GetParams()["message"].get<std::string>();
        PrintWarning("{} : Warning : {}", m_Name, warningMessage);
        StateManager.SendWarning(warningMessage.c_str());
        return nullptr;
    }

    Sending a request

    Small recap, sending a request implies you expect a result, this can be done in 2 ways:

    • Provide a callback handler
    • Get a future and wait for that

    I suppose the first will happens most, since we'll need to do something once we get a result.

    We also need to send the request, this is through a MessageResponder derivation.

    This has two key functions

    • SendRequest with the send fuction set by SetSendFunction
    • RespondToMessage : wich calls the responder upon receiving a message

    For the sake of testing this (and we do have to do this conversion anyway), we'll start with converting the next function of CProxyDevice:

    SendCommand(LPCTSTR iCommand, bool StopProxy = true);
    SendCommandWaitReturn(LPCTSTR iCommand, bool StopProxy = true, bool CheckHandshake = true);
    SendCommand(std::unique_ptr<TiXmlElement> &XMLElement, bool StopProxy = true);
    SendCommandWaitReturn(std::unique_ptr<TiXmlElement> , bool = true, bool = true);

    Looking at CProxyDevice and CProxyTestDevice, we need to convert all the routines that use SendCommand from XML form to JSON format:

    • CProxyDevice
      • HandShake
      • SendCommand
      • SendCommandWaitReturn
      • Shutdown
    • CProxyTestDevice
      • Need to set additional responders in Init
      • HardwareDetect
      • ConfigDetect
      • CheckInitialize
    • Template Proxy
      • Driver::HardwareDetect
      • Driver::ConfigDetect
      • Driver::CheckInitialize
      • Driver::ShutDown
      • Main 
        • Need to set responder
        • TCPServer.GetReceivePackage
        • Line 100 and further 
        • Command generation after keyboard click


    First in the row is the handshake.

    The request side is pretty straightforward: we set the ID and the challenge as parameter, and fire a request, wait for the future:

    CTrack::Message message;
    message.SetID(TAG_HANDSHAKE);
    message.SetParams(ATTRIB_CHALLENGE, challengeBase64);
    std::future<CTrack::Message> future      = CTrack::Request(message);

    Not sure if we should make the 5 second wait a fix define that we use everywhere. 5 seconds sounds reasonable.

    Here we get the info from the request

    if (future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout)
    {
        PrintError("Timeout waiting for handshake response on port {}", m_TCP_Port);
        return false;
    }
    std::string     ChallengeReturnBase64;
    CTrack::Message response = future.get();
    if (response.GetID() != TAG_HANDSHAKE)
    {
        PrintError("Handshake failed on port {}: expected {}, got {}", m_TCP_Port, TAG_HANDSHAKE, response.GetID());
        return false;
    }
    if (!response.GetParam(ATTRIB_CHALLENGE, ChallengeReturnBase64))
    {
        PrintError("Handshake failed on port {}: missing attribute {}", m_TCP_Port, ATTRIB_CHALLENGE);
        return false;
    }

    Now we also need to implement this at the other side. This will happen by setting a responder to the message TAG_HANDSHAKE

    We have to set the responders where the messages enter, and to an object which is derived from CTrack::MessageResponder.

    Best place in in main.cpp, where there are already responders for connect/disconnect:

     std::unique_ptr<Driver> driver = std::make_unique<Driver>();
    CCommunicationObject    TCPServer;

    TCPServer.SetOnConnectFunction([](SOCKET, size_t numConnections) { PrintInfo("connected : {}", numConnections); });
    TCPServer.SetOnDisconnectFunction([](SOCKET, size_t numConnections) { PrintInfo("DISCONNNECTED : {}", numConnections); });

    TCPServer.Open(TCP_SERVER, PortNumber);
    PrintInfo("Server started on port {}", PortNumber);

    The actual responder is inside CCommunicationInterface::m_pMessageResponder. The reason is because we need the messageresponder to be of shared_ptr, because MessageResponder is derived from std::enable_shared_from_this, and this because our Subscription needs a weak pointer to the MessageResponder. 

    We could have derived CCommunicationInterface from MessageResponder, but this might have disrupted existing code too much, so hence the decision to go for composition/aggregation.


    Essence of subscribing, as this seems to confuse time and time over.

    CASE 1: subscribing a stand-alone function

    We have a MessageResponder MR and we want to execute a function F when the message with identity ID is received by MR.

    auto subscription = MR.Subscribe(ID,&F);

    If subscription gets destroyed, we automatically unsubscribe F. 
    If we have multiple subscriptions then we can store them in a array within the context of execution:

    std::vector<subscription> subscriptions;
    subscriptions.emplace_back(std::move(MR.Subscribe(ID,F)));

    Emplace and move are used to prevent creation of a new subscription, and consequently destroying the old which would lead to an unsubscription.Emplace and move are used to prevent creation of a new subscription, and consequently destroying the old which would lead to an unsubscription.

    CASE 2: subscribing a class member

    We have class C with member function F that needs to be called if message ID passes MR.

    class C : public CTrack::Subscribe
    {
    CTrack::Reply F(const CTrack::Message&) ;
    }

    By deriving from Subscribe we automatically unsubscribe when C gets destroyed.

    C c;
    MR.Subscribe(ID,CTrack::MakeMemberHandler(&c, &C::F));

    MakeMemberHandler combines a pointer to the instance with the function in a lambda.




    in Daily blog
    26/5/25
    Copyright © CTech
    Nederlands (BE) | English (UK) | Français
    Aangeboden door Odoo - De #1 Open source e-commerce