Initiative for enhancing Transfer Accelerator Plugin

Hi @alainmazy ,

I am using the Transfer Accelerator plugin to transfer DICOM files from one Orthanc, A (acting as a gateway), to another Orthanc, B (acting as long-term storage). Whenever a stable-series event is triggered, Orthanc A sends the DICOM series to Orthanc B. Because the modality sends a large number of DICOM files to Orthanc A, Orthanc A cannot transfer them to Orthanc B fast enough. As a result, Orthanc A quickly becomes stuck because of the many PushTransfer jobs that are pending or running.

After analyzing the source code, I noticed that once Orthanc A has sent the buckets to Orthanc B, it sends the final “/commit” message to Orthanc B and waits for Orthanc B to index those buckets in its storage. If Orthanc B is busy handling many incoming HTTP requests, the final “/commit” request times out and the transfer fails.

I therefore suggest adding a new option that makes the “/commit” route asynchronous, in addition to the existing synchronous behavior. In more detail: when the receiver processes the commit message, it creates a job to process the uploaded buckets and returns immediately to the sender. That way, the sender does not need to wait for the receiver to finish processing the DICOM files. I have successfully implemented this idea; here is a snippet of my code.

ActivePushTransactions.cpp

void ActivePushTransactions::FinalizeTransaction(const std::string& transactionUuid,
                                                   bool commit)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    Content::iterator found = content_.find(transactionUuid);
    if (found == content_.end())
    {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
    }

    assert(found->second != NULL);
    if (commit)
    {
      if (!PluginContext::GetInstance().IsCommitAsync())
      {
        found->second->GetDownloadArea()->Commit();
      }
      else // Submit Job
      {
        std::string id = OrthancPlugins::OrthancJob::Submit(new OrthancPlugins::UploadDicomJob("UploadDicomJob", found->second->GetDownloadArea().release()), 1);
        LOG(INFO) << "ActivePushTransactions::FinalizeTransaction submit jobID=" << id;
      }
    }

    delete found->second;
    content_.erase(found);
    index_.Invalidate(transactionUuid);
    Saola::TransactionDatabase::Instance().DeleteById(transactionUuid);
  }

UploadDicomJob.cpp

#include "UploadDicomJob.h"
#include <Compatibility.h>  // For std::unique_ptr
#include <SystemToolbox.h>
#include <Logging.h>
#include <boost/filesystem.hpp>

namespace OrthancPlugins
{
  UploadDicomJob::UploadDicomJob(const std::string& jobType, DownloadArea* area) :
    OrthancJob(jobType)
  {
    area_.reset(area);
    totalInstances_ = area->GetInstancesCount();
    totalSize_ = area->GetTotalSize();

    Json::Value content;
    content["TotalInstances"] = totalInstances_;
    content["TotalSizeMB"] = ConvertToMegabytes(totalSize_);
    this->UpdateContent(content);
  }

  OrthancPluginJobStepStatus UploadDicomJob::Step()
  {
    size_t remaining = this->area_->StepCommit();
    Json::Value content;
    content["TotalInstances"] = totalInstances_;
    content["TotalSizeMB"] = ConvertToMegabytes(totalSize_);
    if (remaining > 0)
    {
      content["UploadedSizeMB"] = ConvertToMegabytes(totalSize_ - this->area_->GetSize());
      content["UploadedInstances"] = totalInstances_ - remaining;
      this->UpdateProgress(1 - (static_cast<float>(remaining) / static_cast<float>(this->totalInstances_)));
      this->UpdateContent(content);
      return OrthancPluginJobStepStatus_Continue;
    }
    content["UploadedSizeMB"] = ConvertToMegabytes(totalSize_);
    content["UploadedInstances"] = totalInstances_;
    this->UpdateContent(content);
    this->UpdateProgress(1.0f);

    return OrthancPluginJobStepStatus_Success;
  }

  
  void UploadDicomJob::Stop(OrthancPluginJobStopReason reason)
  {
    LOG(INFO) << "[UploadDicomJob::Stop] reason=" << reason;
  }

  
  void UploadDicomJob::Reset()
  {
    LOG(INFO) << "[UploadDicomJob::Reset]";
  }
}

Hi @Christophe

Interesting! However, you may lose the “atomicity” of the transfer: the sender's job might succeed while the commit actually fails on the receiver side.
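
To make the atomicity concern concrete, here is a minimal sketch that does not use the plugin's code at all. `ReceiverModel` and `SenderJobStatus` are hypothetical names invented for this illustration, and the "job engine" is just a queue that is drained by hand. It only shows that, with an asynchronous commit, the answer to “/commit” no longer carries the outcome of the commit.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>

// Hypothetical model, NOT the plugin's API: the receiver either commits
// inline, or queues the commit as a job and acknowledges immediately.
class ReceiverModel
{
private:
  bool                               storageHealthy_;
  std::size_t                        committed_;
  std::size_t                        failed_;
  std::deque<std::function<bool()> > pending_;

  // Stands in for indexing the uploaded buckets into the storage.
  bool DoCommit()
  {
    if (storageHealthy_)
    {
      ++committed_;
      return true;
    }
    ++failed_;
    return false;
  }

public:
  explicit ReceiverModel(bool storageHealthy) :
    storageHealthy_(storageHealthy), committed_(0), failed_(0)
  {
  }

  // Synchronous commit: the answer reflects the real outcome.
  bool CommitSync()
  {
    return DoCommit();
  }

  // Asynchronous commit: always acknowledged, outcome only known later.
  bool CommitAsync()
  {
    pending_.push_back([this]() { return DoCommit(); });
    return true;
  }

  // Stands in for the receiver's job engine running the queued commits.
  void RunPendingJobs()
  {
    while (!pending_.empty())
    {
      pending_.front()();
      pending_.pop_front();
    }
  }

  std::size_t GetCommittedCount() const { return committed_; }
  std::size_t GetFailedCount() const { return failed_; }
};

// What the sender's job would report, based only on the "/commit" answer.
inline std::string SenderJobStatus(bool commitAnswer)
{
  return commitAnswer ? "Success" : "Failure";
}
```

With an unhealthy storage, `SenderJobStatus(receiver.CommitSync())` yields "Failure", whereas `SenderJobStatus(receiver.CommitAsync())` yields "Success" even though `RunPendingJobs()` later records a failed commit. Closing that gap would require the sender to look up the receiver's job afterwards, which this sketch does not attempt.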

I’m wondering whether another option would simply be to use a larger timeout for the commit phase?

BTW, I’ll have a deeper look at it … could you share the name of the configuration option that enables this mode? (PluginContext code + the configuration reading code)

Thanks,

Alain.

Hi @alainmazy ,
Thanks a lot for taking the time to read this thread. The idea came from what I observed while operating Orthanc in one of our hospitals: many transfers failed because of heavy DICOM traffic.
A larger timeout for the commit phase only prevents the transfer from failing. Another issue remains: the number of concurrent Orthanc jobs on the sender is limited, so many jobs stay pending until the running ones finish. And yes, we may lose the “atomicity” of the transfer, but this is a trade-off we need to make: atomicity versus speed.
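
As a rough, back-of-the-envelope illustration of this trade-off, the sketch below estimates how long a backlog of sender jobs takes to drain when every job holds one job slot for a fixed time. The function name and all numbers are hypothetical; real jobs do not run in neat waves, so this is only an order-of-magnitude model.

```cpp
#include <cassert>
#include <cstddef>

// Rough model: jobs run in "waves" of at most `slots` concurrent jobs,
// and every job holds its slot for `secondsPerJob` seconds.
inline std::size_t EstimateDrainSeconds(std::size_t jobs,
                                        std::size_t slots,
                                        std::size_t secondsPerJob)
{
  assert(slots > 0);
  const std::size_t waves = (jobs + slots - 1) / slots;  // ceiling division
  return waves * secondsPerJob;
}
```

For example, assume 100 pending jobs, 2 concurrent job slots, 10 seconds to send the buckets and 50 seconds waiting for the commit. With a synchronous commit each job holds its slot for 60 seconds, so `EstimateDrainSeconds(100, 2, 60)` gives 3000 seconds. If the sender only waits for the buckets to be sent, `EstimateDrainSeconds(100, 2, 10)` gives 500 seconds. The commit work still has to happen on the receiver; it is just no longer holding a sender slot.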

The configuration is very simple: the option is named “CommitAsync”.

  "Transfers": {
        "CommitAsync": true,
        "Threads": 6, // Number of worker threads for one transfer
        "BucketSize": 4096, // Optimal size for a bucket (in KB)
        "CacheSize": 512, // Size of the memory cache to process DICOM files (in MB)
        "MaxPushTransactions": 20, // Maximum number of simultaneous receptions in push mode
        "MaxHttpRetries": 2, // Maximum number of HTTP retries for one bucket
        "PeerConnectivityTimeout": 120 // HTTP Timeout (in seconds) used when checking if a remote peer has the transfer plugin enabled in /transfers/peers GET route
    },

PluginContext.h

/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018-2023 Osimis S.A., Belgium
 * Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
 * Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#pragma once

#include "../Framework/OrthancInstancesCache.h"
#include "../Framework/PushMode/ActivePushTransactions.h"

#include <Compatibility.h>  // For std::unique_ptr
#include <MultiThreading/Semaphore.h>

#include <map>

namespace OrthancPlugins
{
  class PluginContext : public boost::noncopyable
  {
  private:
    // Runtime structures
    OrthancInstancesCache    cache_;
    ActivePushTransactions   pushTransactions_;
    Orthanc::Semaphore       semaphore_;
    std::string              pluginUuid_;

    // Configuration
    size_t                   threadsCount_;
    size_t                   targetBucketSize_;
    unsigned int             maxHttpRetries_;
    unsigned int             peerConnectivityTimeout_;
    bool                     commitAsync_;
  
    PluginContext(size_t threadsCount,
                  size_t targetBucketSize,
                  size_t maxPushTransactions,
                  size_t memoryCacheSize,
                  unsigned int maxHttpRetries,
                  unsigned int peerConnectivityTimeout,
                  bool commitAsync);

    static std::unique_ptr<PluginContext>& GetSingleton();
  
  public:
    OrthancInstancesCache& GetCache()
    {
      return cache_;
    }

    ActivePushTransactions& GetActivePushTransactions()
    {
      return pushTransactions_;
    }

    Orthanc::Semaphore& GetSemaphore()
    {
      return semaphore_;
    }

    const std::string& GetPluginUuid() const
    {
      return pluginUuid_;
    }

    size_t GetThreadsCount() const
    {
      return threadsCount_;
    }

    size_t GetTargetBucketSize() const
    {
      return targetBucketSize_;
    }

    unsigned int GetMaxHttpRetries() const
    {
      return maxHttpRetries_;
    }

    unsigned int GetPeerConnectivityTimeout() const
    {
      return peerConnectivityTimeout_;
    }

    bool IsCommitAsync() const
    {
      return commitAsync_;
    }

    static void Initialize(size_t threadsCount,
                           size_t targetBucketSize,
                           size_t maxPushTransactions,
                           size_t memoryCacheSize,
                           unsigned int maxHttpRetries,
                           unsigned int peerConnectivityTimeout,
                           bool commitAsync);
  
    static PluginContext& GetInstance();

    static void Finalize();
  };
}

Plugin.cpp


extern "C"
{
  ORTHANC_PLUGINS_API int32_t OrthancPluginInitialize(OrthancPluginContext* context)
  {
#if ORTHANC_FRAMEWORK_VERSION_IS_ABOVE(1, 7, 2)
    Orthanc::Logging::InitializePluginContext(context);
#else
    Orthanc::Logging::Initialize(context);
#endif
    
    assert(DisplayPerformanceWarning());

    OrthancPlugins::SetGlobalContext(context);
    
    /* Check the version of the Orthanc core */
    if (OrthancPluginCheckVersion(context) == 0)
    {
      LOG(ERROR) << "Your version of Orthanc (" 
                 << context->orthancVersion << ") must be above "
                 << ORTHANC_PLUGINS_MINIMAL_MAJOR_NUMBER << "."
                 << ORTHANC_PLUGINS_MINIMAL_MINOR_NUMBER << "."
                 << ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER
                 << " to run this plugin";
      return -1;
    }

    OrthancPlugins::SetDescription(PLUGIN_NAME, "Accelerates transfers and provides "
                                   "storage commitment between Orthanc peers");

    try
    {
      size_t threadsCount = 4;
      size_t targetBucketSize = 4096;  // In KB
      size_t maxPushTransactions = 4;
      size_t memoryCacheSize = 512;    // In MB
      unsigned int maxHttpRetries = 0;
      unsigned int peerConnectivityTimeout = 2;

      bool commitAsync = true;
    
      {
        OrthancPlugins::OrthancConfiguration config;

        if (config.IsSection(KEY_PLUGIN_CONFIGURATION))
        {
          OrthancPlugins::OrthancConfiguration plugin;
          config.GetSection(plugin, KEY_PLUGIN_CONFIGURATION);

          threadsCount = plugin.GetUnsignedIntegerValue("Threads", threadsCount);
          targetBucketSize = plugin.GetUnsignedIntegerValue("BucketSize", targetBucketSize);
          memoryCacheSize = plugin.GetUnsignedIntegerValue("CacheSize", memoryCacheSize);
          maxPushTransactions = plugin.GetUnsignedIntegerValue("MaxPushTransactions", maxPushTransactions);
          maxHttpRetries = plugin.GetUnsignedIntegerValue("MaxHttpRetries", maxHttpRetries);
          peerConnectivityTimeout = plugin.GetUnsignedIntegerValue("PeerConnectivityTimeout", peerConnectivityTimeout);
          commitAsync = plugin.GetBooleanValue("CommitAsync", true);
        }
      }

      OrthancPlugins::PluginContext::Initialize(threadsCount, targetBucketSize * KB, maxPushTransactions,   memoryCacheSize * MB, maxHttpRetries, peerConnectivityTimeout,  commitAsync);

One last point I want to mention: for the Transfer plugin running in a distributed environment (Docker Swarm/Kubernetes), the idea is to have the Orthanc plugin communicate with a third-party distributed cache or database (such as Redis or Memcached). I use RQLite myself. I am not sure whether this point belongs in this topic, so please ignore it if it is not relevant to the title.

Hi @Christophe

I thought about your patch proposal again and, although I understand the benefit in your situation, I consider that it should not make it into the plugin repository.

In all our other APIs to transfer a study (DICOM, DicomWeb, peering), when the job succeeds, it means that the study has been stored on the remote destination, which would not be the case with this asynchronous commit.

In a worst-case scenario, when duplicating the content of 2 very large Orthanc instances with slow storage on the destination, you might have to wait hours or days before all instances are actually committed, with a high risk of disk starvation either for the main Orthanc storage or for the temporary storage used by the transfer plugin.

I understand that you have your own fork of the plugin anyway so I hope this is not a big issue for you.

Thanks anyway for your contribution and do not hesitate to discuss if you feel I’m wrong.

Note that I have also implemented a PeerCommitTimeout configuration defaulting to 600 seconds to reduce the timeouts during the commit phase.

Hi @alainmazy ,

I would like to revisit this discussion and propose another initiative.

In my current setup, I use gRPC as the communication protocol between Orthanc peers for transferring DICOM files. One advantage of gRPC is that it can multiplex multiple requests over a single TCP connection, whereas REST APIs typically rely on multiple connections.

In my Orthanc infrastructure, I have observed that a large number of request/response exchanges between peers can cause Orthanc to become unresponsive during DICOM transfers. I suspect this may be related to limitations in the underlying HTTP stack (libc/mongoose-based implementation) used by Orthanc.

This article provides a comparison of gRPC and REST APIs and explains some of the potential advantages of gRPC:
https://blog.logrocket.com/grpc-vs-rest/

I created a quick proof of concept to demonstrate that gRPC can work together with the existing Transfer Accelerator plugin. The patch is attached, and the proto file is included in the project, as shown in the following project structure:

➜  orthanc-transfers ls
AUTHORS  build  CITATION.cff  CMakeLists.txt  CMakeScripts  COPYING  Framework  NEWS  plans  Plugin  Proto  README  Resources  ThirdPartyDownloads  TODO  UnitTests

For testing, I started two Orthanc instances:

  • orthanc1

  • orthanc2

orthanc1.json
{
  "RemoteAccessAllowed": true,
  "AuthenticationEnabled": false,
  "Plugins": [
    "./libOrthancExplorer2.so",
    "/home/phtran/workingspace/orthanc-transfers/build/libOrthancTransfers.so"
  ],
  "OrthancExplorer2": {
    "Enable": true,
    "IsDefaultOrthancUI": true
  },
  "Transfers": {
    "UseGrpcPush": true,
    "GrpcPort": 9042
  },
  "OrthancPeers": {
    "orthanc2": {
      "Url": "http://127.0.0.1:8043/",
      "GrpcPort": 9043
    }
  }
}

orthanc2.json
{
  "DicomPort": 4243,
  "HttpPort": 8043,
  "RemoteAccessAllowed": true,
  "AuthenticationEnabled": false,
  "Plugins": [
    "/home/phtran/workingspace/orthanc-transfers/build/libOrthancTransfers.so"
  ],
  "Transfers": {
    "UseGrpcPush": true,
    "GrpcPort": 9043
  },
  "OrthancPeers": {
    "orthanc1": [
      "http://127.0.0.1:8042/"
    ]
  }
}

I started both Orthanc instances using:

./Orthanc ./orthanc.json --verbose

The transfer performance was quite good and was comparable to the current REST API implementation.

Please note that this patch is only a proof of concept. The current implementation only covers the PUSH mode, not the PULL mode. There are still many areas where the performance and design can be improved, but this experiment shows that gRPC could be another possible direction for improving the Transfer Accelerator plugin.

I would appreciate any feedback or review from you.

grpc-modified.patch.txt (26.8 KB)

transfers.proto.txt (1.2 KB)

Hi @Christophe

I had a look at the patch (I have not tried to integrate or run the code). It seems transfers.proto is missing from the patch.

I’m not opposed to integrating gRPC provided it shows a significant performance boost in PUSH mode, which is the mode used most of the time (so no need to spend time on the PULL mode right now).

My knowledge of gRPC and HTTP/2 is very weak but, from my perspective, by creating multiple HTTP/1.1 connections we are already using as much of the available bandwidth as we can. I’m not sure a new protocol would magically improve the throughput by a factor of 2, but I might be wrong, so if you make new findings, do not hesitate to share an updated patch.

Best regards,

Alain.

Hi @alainmazy ,

Thank you very much for taking the time to review my proposal.

I have attached the transfers.proto file again to this post; it is the same file as in my previous post.

I would like to clarify that my initiative is not primarily about improving the DICOM transfer speed. The main goal is to address the limitation caused by the number of HTTP requests in the current REST API-based implementation.

As mentioned in my previous post, when too many transfer requests are sent to the Orthanc server, all available HTTP threads can become occupied, leaving none to handle other requests. While Orthanc is processing transfers, other HTTP API calls may become blocked or unavailable.

This is the main point where I believe gRPC could improve the current Orthanc Transfer Plugin design. By multiplexing multiple requests over a single connection, gRPC can reduce the pressure caused by creating and handling too many HTTP requests.
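
The thread-exhaustion argument can be sketched with a toy model of a fixed pool of HTTP handler threads. `HttpThreadPoolModel` and `StartRestUploads` are hypothetical names, not Orthanc or plugin code. The gRPC case rests on an assumption taken from the proof-of-concept configuration above: transfer traffic is served by a separate listener (the `GrpcPort`), so it does not take threads from the HTTP pool.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical model of a fixed pool of HTTP handler threads.
class HttpThreadPoolModel
{
private:
  std::size_t capacity_;
  std::size_t busy_;

public:
  explicit HttpThreadPoolModel(std::size_t capacity) :
    capacity_(capacity), busy_(0)
  {
  }

  // Returns false when every handler thread is already busy.
  bool TryAcquire()
  {
    if (busy_ >= capacity_)
    {
      return false;
    }
    ++busy_;
    return true;
  }

  // Called when a request is finished and its thread becomes free again.
  void Release()
  {
    assert(busy_ > 0);
    --busy_;
  }

  std::size_t GetFreeCount() const
  {
    return capacity_ - busy_;
  }
};

// REST push: each in-flight bucket upload occupies one HTTP thread.
// Returns how many of the requested uploads obtained a thread.
inline std::size_t StartRestUploads(HttpThreadPoolModel& pool,
                                    std::size_t uploads)
{
  std::size_t started = 0;
  for (std::size_t i = 0; i < uploads; i++)
  {
    if (pool.TryAcquire())
    {
      ++started;
    }
  }
  return started;
}
```

With a pool of 4 threads and 6 concurrent bucket uploads over REST, `StartRestUploads(pool, 6)` returns 4 and a following `pool.TryAcquire()` for an unrelated API call fails until an upload releases its thread. Under the separate-listener assumption, the same uploads never touch the pool, so the unrelated call gets a thread immediately. Whether the gRPC server's own worker threads become the next bottleneck is outside this model.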

I hope this clarifies the motivation behind my proposal.

Thank you again for your feedback.

transfers.proto.txt (1.2 KB)