Initiative for enhancing the Transfer Accelerator Plugin

Hi @alainmazy ,

I am using the Transfer Accelerator Plugin to transfer DICOM from one Orthanc instance A (acting as a gateway) to another Orthanc instance B (acting as long-term storage). Whenever a stable series event is triggered, Orthanc A sends the DICOM series to Orthanc B. Since the modality sends a lot of DICOM instances to Orthanc A, it cannot transfer them to Orthanc B fast enough. As a result, Orthanc A quickly becomes stuck or hangs because of the many PushTransfer jobs pending or running. After analyzing the source, I noticed that after Orthanc A has sent the buckets to Orthanc B, it sends a final “/commit” message and waits for Orthanc B to index those buckets into its storage. If Orthanc B is busy with many incoming HTTP requests, the final “/commit” request times out and the transfer fails.

So I recommend adding a new option that makes the “/commit” route asynchronous in addition to synchronous: when the receiver processes the commit message, it creates a job to index the uploaded buckets and returns immediately to the sender. That way, the sender does not have to wait for the receiver to finish processing its DICOM files. I have successfully implemented this idea. Here is a snippet of my code.

ActivePushTransactions.cpp

  void ActivePushTransactions::FinalizeTransaction(const std::string& transactionUuid,
                                                   bool commit)
  {
    boost::mutex::scoped_lock  lock(mutex_);

    Content::iterator found = content_.find(transactionUuid);
    if (found == content_.end())
    {
      throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
    }

    assert(found->second != NULL);
    if (commit)
    {
      if (!PluginContext::GetInstance().IsCommitAsync())
      {
        // Synchronous commit (original behavior): the buckets are indexed
        // into the Orthanc storage before the HTTP answer is sent
        found->second->GetDownloadArea()->Commit();
      }
      else
      {
        // Asynchronous commit: hand the download area over to a job,
        // so that the "/commit" route answers immediately
        std::string id = OrthancPlugins::OrthancJob::Submit(
          new OrthancPlugins::UploadDicomJob("UploadDicomJob",
                                             found->second->GetDownloadArea().release()), 1);
        LOG(INFO) << "ActivePushTransactions::FinalizeTransaction submit jobID=" << id;
      }
    }

    delete found->second;
    content_.erase(found);
    index_.Invalidate(transactionUuid);
    Saola::TransactionDatabase::Instance().DeleteById(transactionUuid);
  }
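
For reference, the receiver’s “/commit” route is what ends up calling FinalizeTransaction(). Below is a minimal sketch of what that REST callback could look like (the route regex, the callback name and the registration are assumptions on my side), just to show where the synchronous/asynchronous dispatch happens:

static void CommitPushTransaction(OrthancPluginRestOutput* output,
                                  const char* url,
                                  const OrthancPluginHttpRequest* request)
{
  // The transaction UUID is captured by the route regex
  const std::string transactionUuid(request->groups[0]);

  // With "CommitAsync" enabled, this call returns as soon as the job is submitted
  OrthancPlugins::PluginContext::GetInstance().
    GetActivePushTransactions().FinalizeTransaction(transactionUuid, true /* commit */);

  OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output,
                            "{}", 2, "application/json");
}

// Registered during OrthancPluginInitialize(), e.g.:
// OrthancPlugins::RegisterRestCallback<CommitPushTransaction>
//   ("/transfers/push/([^/]*)/commit", true /* thread safe */);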

UploadDicomJob.cpp

#include "UploadDicomJob.h"
#include <Compatibility.h>  // For std::unique_ptr
#include <SystemToolbox.h>
#include <Logging.h>
#include <boost/filesystem.hpp>

namespace OrthancPlugins
{
  UploadDicomJob::UploadDicomJob(const std::string& jobType, DownloadArea* area) :
    OrthancJob(jobType)
  {
    area_.reset(area);  // Takes ownership of the area released by the push transaction
    totalInstances_ = area_->GetInstancesCount();
    totalSize_ = area_->GetTotalSize();

    Json::Value content;
    content["TotalInstances"] = static_cast<Json::UInt64>(totalInstances_);
    content["TotalSizeMB"] = ConvertToMegabytes(totalSize_);
    UpdateContent(content);
  }

  OrthancPluginJobStepStatus UploadDicomJob::Step()
  {
    // Commits one instance per step, so that progress can be reported
    size_t remaining = area_->StepCommit();

    Json::Value content;
    content["TotalInstances"] = static_cast<Json::UInt64>(totalInstances_);
    content["TotalSizeMB"] = ConvertToMegabytes(totalSize_);

    if (remaining > 0)
    {
      content["UploadedSizeMB"] = ConvertToMegabytes(totalSize_ - area_->GetSize());
      content["UploadedInstances"] = static_cast<Json::UInt64>(totalInstances_ - remaining);
      UpdateContent(content);
      UpdateProgress(1.0f - static_cast<float>(remaining) / static_cast<float>(totalInstances_));
      return OrthancPluginJobStepStatus_Continue;
    }

    content["UploadedSizeMB"] = ConvertToMegabytes(totalSize_);
    content["UploadedInstances"] = static_cast<Json::UInt64>(totalInstances_);
    UpdateContent(content);
    UpdateProgress(1.0f);

    return OrthancPluginJobStepStatus_Success;
  }

  
  void UploadDicomJob::Stop(OrthancPluginJobStopReason reason)
  {
    LOG(INFO) << "[UploadDicomJob::Stop] reason=" << reason;
  }

  
  void UploadDicomJob::Reset()
  {
    LOG(INFO) << "[UploadDicomJob::Reset]";
  }
}
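
For completeness, here is the header I would expect to match the .cpp above (a sketch reconstructed from the implementation; the exact member types and the location of DownloadArea.h are assumptions):

UploadDicomJob.h

#pragma once

#include "../Framework/DownloadArea.h"  // Assumed location of DownloadArea
#include <Compatibility.h>              // For std::unique_ptr
#include <OrthancPluginCppWrapper.h>    // For OrthancPlugins::OrthancJob

namespace OrthancPlugins
{
  class UploadDicomJob : public OrthancJob
  {
  private:
    std::unique_ptr<DownloadArea>  area_;            // Buckets received from the sender
    size_t                         totalInstances_;  // Number of instances to commit
    uint64_t                       totalSize_;       // Total size of the instances, in bytes

  public:
    // Takes ownership of "area", released by the push transaction
    UploadDicomJob(const std::string& jobType,
                   DownloadArea* area);

    virtual OrthancPluginJobStepStatus Step() ORTHANC_OVERRIDE;

    virtual void Stop(OrthancPluginJobStopReason reason) ORTHANC_OVERRIDE;

    virtual void Reset() ORTHANC_OVERRIDE;
  };
}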

Hi @Christophe

Interesting! However, you may lose the “atomicity” of the transfer: the sender’s job might succeed while the commit actually fails on the receiver side.

I’m wondering whether another option would not simply be to use a larger timeout for the commit phase?

BTW, I’ll have a deeper look at it… Could you share the name of the configuration option used to enable this mode (the PluginContext code + the configuration reading code)?

Thanks,

Alain.

Hi @alainmazy ,
Thanks a lot for taking the time to read this thread. The idea came from what I observed while operating Orthanc in one of our hospitals: a lot of transfers failed under heavy DICOM traffic.
Setting a larger timeout for the commit phase only keeps the transfers from failing. Another issue remains: the number of concurrent OrthancJobs on the sender is limited, so many jobs stay pending (waiting until the running jobs finish). And yes, we may lose the “atomicity” of the transfer, but that is a trade-off: atomicity versus speed.

The configuration is very simple. My configuration option is named “CommitAsync”:

  "Transfers": {
        "CommitAsync": true,
        "Threads": 6, // Number of worker threads for one transfer
        "BucketSize": 4096, // Optimal size for a bucket (in KB)
        "CacheSize": 512, // Size of the memory cache to process DICOM files (in MB)
        "MaxPushTransactions": 20, // Maximum number of simultaneous receptions in push mode
        "MaxHttpRetries": 2, // Maximum number of HTTP retries for one bucket
        "PeerConnectivityTimeout": 120 // HTTP Timeout (in seconds) used when checking if a remote peer has the transfer plugin enabled in /transfers/peers GET route
    },
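
With this enabled, the commit then appears on the receiver as a regular Orthanc job. Based on the UpdateContent() calls above, GET /jobs/{id} on the receiver would report something roughly like this (illustrative values, trimmed to the relevant fields):

{
  "Type": "UploadDicomJob",
  "State": "Running",
  "Progress": 40,
  "Content": {
    "TotalInstances": 250,
    "TotalSizeMB": 128,
    "UploadedInstances": 100,
    "UploadedSizeMB": 51
  }
}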

PluginContext.h

/**
 * Transfers accelerator plugin for Orthanc
 * Copyright (C) 2018-2023 Osimis S.A., Belgium
 * Copyright (C) 2024-2025 Orthanc Team SRL, Belgium
 * Copyright (C) 2021-2025 Sebastien Jodogne, ICTEAM UCLouvain, Belgium
 *
 * This program is free software: you can redistribute it and/or
 * modify it under the terms of the GNU Affero General Public License
 * as published by the Free Software Foundation, either version 3 of
 * the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * Affero General Public License for more details.
 * 
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 **/


#pragma once

#include "../Framework/OrthancInstancesCache.h"
#include "../Framework/PushMode/ActivePushTransactions.h"

#include <Compatibility.h>  // For std::unique_ptr
#include <MultiThreading/Semaphore.h>

#include <map>

namespace OrthancPlugins
{
  class PluginContext : public boost::noncopyable
  {
  private:
    // Runtime structures
    OrthancInstancesCache    cache_;
    ActivePushTransactions   pushTransactions_;
    Orthanc::Semaphore       semaphore_;
    std::string              pluginUuid_;

    // Configuration
    size_t                   threadsCount_;
    size_t                   targetBucketSize_;
    unsigned int             maxHttpRetries_;
    unsigned int             peerConnectivityTimeout_;
    bool                     commitAsync_;
  
    PluginContext(size_t threadsCount,
                  size_t targetBucketSize,
                  size_t maxPushTransactions,
                  size_t memoryCacheSize,
                  unsigned int maxHttpRetries,
                  unsigned int peerConnectivityTimeout,
                  bool commitAsync);

    static std::unique_ptr<PluginContext>& GetSingleton();
  
  public:
    OrthancInstancesCache& GetCache()
    {
      return cache_;
    }

    ActivePushTransactions& GetActivePushTransactions()
    {
      return pushTransactions_;
    }

    Orthanc::Semaphore& GetSemaphore()
    {
      return semaphore_;
    }

    const std::string& GetPluginUuid() const
    {
      return pluginUuid_;
    }

    size_t GetThreadsCount() const
    {
      return threadsCount_;
    }

    size_t GetTargetBucketSize() const
    {
      return targetBucketSize_;
    }

    unsigned int GetMaxHttpRetries() const
    {
      return maxHttpRetries_;
    }

    unsigned int GetPeerConnectivityTimeout() const
    {
      return peerConnectivityTimeout_;
    }

    bool IsCommitAsync() const
    {
      return commitAsync_;
    }

    static void Initialize(size_t threadsCount,
                           size_t targetBucketSize,
                           size_t maxPushTransactions,
                           size_t memoryCacheSize,
                           unsigned int maxHttpRetries,
                           unsigned int peerConnectivityTimeout,
                           bool commitAsync);
  
    static PluginContext& GetInstance();

    static void Finalize();
  };
}

Plugin.cpp


extern "C"
{
  ORTHANC_PLUGINS_API int32_t OrthancPluginInitialize(OrthancPluginContext* context)
  {
#if ORTHANC_FRAMEWORK_VERSION_IS_ABOVE(1, 7, 2)
    Orthanc::Logging::InitializePluginContext(context);
#else
    Orthanc::Logging::Initialize(context);
#endif
    
    assert(DisplayPerformanceWarning());

    OrthancPlugins::SetGlobalContext(context);
    
    /* Check the version of the Orthanc core */
    if (OrthancPluginCheckVersion(context) == 0)
    {
      LOG(ERROR) << "Your version of Orthanc (" 
                 << context->orthancVersion << ") must be above "
                 << ORTHANC_PLUGINS_MINIMAL_MAJOR_NUMBER << "."
                 << ORTHANC_PLUGINS_MINIMAL_MINOR_NUMBER << "."
                 << ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER
                 << " to run this plugin";
      return -1;
    }

    OrthancPlugins::SetDescription(PLUGIN_NAME, "Accelerates transfers and provides "
                                   "storage commitment between Orthanc peers");

    try
    {
      size_t threadsCount = 4;
      size_t targetBucketSize = 4096;  // In KB
      size_t maxPushTransactions = 4;
      size_t memoryCacheSize = 512;    // In MB
      unsigned int maxHttpRetries = 0;
      unsigned int peerConnectivityTimeout = 2;

      bool commitAsync = true;
    
      {
        OrthancPlugins::OrthancConfiguration config;

        if (config.IsSection(KEY_PLUGIN_CONFIGURATION))
        {
          OrthancPlugins::OrthancConfiguration plugin;
          config.GetSection(plugin, KEY_PLUGIN_CONFIGURATION);

          threadsCount = plugin.GetUnsignedIntegerValue("Threads", threadsCount);
          targetBucketSize = plugin.GetUnsignedIntegerValue("BucketSize", targetBucketSize);
          memoryCacheSize = plugin.GetUnsignedIntegerValue("CacheSize", memoryCacheSize);
          maxPushTransactions = plugin.GetUnsignedIntegerValue("MaxPushTransactions", maxPushTransactions);
          maxHttpRetries = plugin.GetUnsignedIntegerValue("MaxHttpRetries", maxHttpRetries);
          peerConnectivityTimeout = plugin.GetUnsignedIntegerValue("PeerConnectivityTimeout", peerConnectivityTimeout);
          commitAsync = plugin.GetBooleanValue("CommitAsync", commitAsync);
        }
      }

      OrthancPlugins::PluginContext::Initialize(threadsCount, targetBucketSize * KB,
                                                maxPushTransactions, memoryCacheSize * MB,
                                                maxHttpRetries, peerConnectivityTimeout,
                                                commitAsync);

One last point I want to mention: for the Transfer Plugin running in a distributed environment (Docker Swarm/Kubernetes), the idea is to have the plugin communicate with a third-party distributed cache/database (like Redis or Memcached), so that all replicas share the transaction state. I myself use RQLite. I am not sure whether this point can be included in this topic, so please ignore it if it is not relevant to the title.
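
To illustrate that idea, this is roughly the kind of interface I have in mind. It is purely hypothetical (only the Saola::TransactionDatabase name appears in the snippet above), and the backing store (RQLite, Redis, ...) would be hidden behind it:

#pragma once

#include <string>

namespace Saola
{
  // Hypothetical interface: push-transaction state is kept in a shared store
  // instead of in-process memory, so that any replica behind the load
  // balancer can serve the bucket uploads and the final commit.
  class TransactionDatabase
  {
  public:
    virtual ~TransactionDatabase()
    {
    }

    // Singleton accessor, as used in FinalizeTransaction() above
    static TransactionDatabase& Instance();

    // Registers a new push transaction together with its originating peer
    virtual void Create(const std::string& transactionUuid,
                        const std::string& senderPeer) = 0;

    // Tells whether the transaction is known to any of the replicas
    virtual bool Exists(const std::string& transactionUuid) = 0;

    // Removes the transaction once it has been committed or discarded
    virtual void DeleteById(const std::string& transactionUuid) = 0;
  };
}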

Hi @Christophe

I thought about your patch proposal again and, although I understand the benefit in your situation, I consider that it should not make it into the plugin repository.

In all our other APIs to transfer a study (DICOM, DicomWeb, peering), when the job succeeds, it means that the study has been stored on the remote destination, which would not be the case with this asynchronous commit.

In a worst-case scenario, when duplicating the content of two very large Orthanc instances with slow storage on the destination, you could have to wait hours or days before all instances are actually committed, with a high risk of disk starvation, either for the main Orthanc storage or for the temporary storage used by the transfer plugin.

I understand that you have your own fork of the plugin anyway, so I hope this is not a big issue for you.

Thanks anyway for your contribution, and do not hesitate to discuss further if you feel I’m wrong.

Note that I have also implemented a PeerCommitTimeout configuration defaulting to 600 seconds to reduce the timeouts during the commit phase.
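
For reference, this option sits in the same "Transfers" section as the other ones (600 seconds being the default, shown here only as an illustration):

  "Transfers": {
    "PeerCommitTimeout": 600   // HTTP timeout (in seconds) used during the commit phase
  }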
