#include "../Resources/Orthanc/Plugins/OrthancPluginCppWrapper.h"

// NOTE(review): the header names of the remaining includes were lost in the
// copy of this file that was reviewed (only bare "#include" tokens were
// left). The list below is reconstructed from the identifiers used in this
// translation unit; confirm the Orthanc framework header paths against the
// build configuration.
#include <Logging.h>
#include <OrthancException.h>

#include <boost/lexical_cast.hpp>
#include <boost/noncopyable.hpp>
#include <boost/thread.hpp>
#include <json/value.h>

#include <cassert>
#include <exception>
#include <iostream>
#include <list>
#include <memory>
#include <string>
#include <vector>


namespace
{
  // Plugin-wide settings. These are the built-in values; when the plugin is
  // enabled they are overwritten in "OrthancPluginInitialize()" from the
  // "MyConfig" configuration section.
  std::string URL = "";       // Target of the HTTP POST issued by "MyClientJob"
  int HTTP_TIMEOUT = 120;     // Passed to "HttpClient::SetTimeout()" (presumably seconds - TODO confirm)
  int PRIORITY = 0;           // Priority given to "OrthancJob::Submit()"
  bool ANONYMIZED = false;    // If true, instances go through "/instances/{id}/anonymize" before upload


  /**
   * Orthanc job that runs one single function in a dedicated worker
   * thread. The function is created through an "IFunctionFactory",
   * which is also notified when the job is paused or canceled.
   *
   * Each call to "Step()" starts the worker if necessary, then polls
   * the outcome that the worker publishes in "functionResult_".
   *
   * WARNING: Classes deriving from this class must explicitly call
   * "Finalize()" in their destructor, so that the worker is joined
   * while the factory is still alive.
   **/
  class SingleFunctionJob : public OrthancPlugins::OrthancJob
  {
  public:
    /**
     * Handle given to the running function, allowing it to publish
     * content and progress into the owning job.
     **/
    class JobContext : public boost::noncopyable
    {
    private:
      SingleFunctionJob& that_;

    public:
      explicit JobContext(SingleFunctionJob& that) :
        that_(that)
      {
      }

      // Stores "value" under "key" in the public content of the job
      void SetContent(const std::string& key,
                      const std::string& value)
      {
        that_.SetContent(key, value);
      }

      // Reports "position / maxPosition" as the progress of the job.
      // Degenerate input (zero maximum, or position past the maximum)
      // is reported as a progress of 1.
      void SetProgress(unsigned int position,
                       unsigned int maxPosition)
      {
        boost::mutex::scoped_lock lock(that_.mutex_);

        if (maxPosition == 0 ||
            position > maxPosition)
        {
          that_.UpdateProgress(1);
        }
        else
        {
          that_.UpdateProgress(static_cast<float>(position) /
                               static_cast<float>(maxPosition));
        }
      }
    };


    // The function that is executed by the worker thread
    class IFunction : public boost::noncopyable
    {
    public:
      virtual ~IFunction()
      {
      }

      virtual void Execute(JobContext& context) = 0;
    };


    class IFunctionFactory : public boost::noncopyable
    {
    public:
      virtual ~IFunctionFactory()
      {
      }

      // Called when the job is paused or canceled. WARNING:
      // "CancelFunction()" will be invoked while "Execute()" is
      // running. Mutex is probably necessary.
      virtual void CancelFunction() = 0;

      virtual void PauseFunction() = 0;

      // Called from the worker thread. The returned object is owned
      // and deleted by the worker.
      virtual IFunction* CreateFunction() = 0;
    };


  protected:
    // Registers the factory; can only be called once, otherwise
    // "ErrorCode_BadSequenceOfCalls" is thrown. Only the address of
    // the factory is stored: It must outlive the worker thread.
    void SetFactory(IFunctionFactory& factory)
    {
      boost::mutex::scoped_lock lock(mutex_);

      if (factory_ != NULL)
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
      }
      else
      {
        factory_ = &factory;
      }
    }


  private:
    enum FunctionResult
    {
      FunctionResult_Running,
      FunctionResult_Done,
      FunctionResult_Failure
    };

    boost::mutex                    mutex_;          // Protects "functionResult_", "content_" and "stopping_"
    FunctionResult                  functionResult_; // Written by "Worker()" while the thread runs
    std::unique_ptr<boost::thread>  worker_;
    Json::Value                     content_;
    IFunctionFactory*               factory_;
    bool                            stopping_;       // True once "Stop()" has asked the function to stop

    // Waits for the end of the worker thread (if any), then drops it
    void JoinWorker()
    {
      assert(factory_ != NULL);

      if (worker_.get() != NULL)
      {
        if (worker_->joinable())
        {
          worker_->join();
        }

        worker_.reset();
      }
    }

    // Launches the worker thread, unless it is already there
    void StartWorker()
    {
      assert(factory_ != NULL);

      if (worker_.get() == NULL)
      {
        // No lock needed: No worker exists at this point
        stopping_ = false;
        worker_.reset(new boost::thread(Worker, this, factory_));
      }
    }

    void SetContent(const std::string& key,
                    const std::string& value)
    {
      boost::mutex::scoped_lock lock(mutex_);
      content_[key] = value;
      UpdateContent(content_);
    }

    // Records that the function has failed, without any error details
    static void MarkFailure(SingleFunctionJob& job)
    {
      boost::mutex::scoped_lock lock(job.mutex_);
      job.functionResult_ = FunctionResult_Failure;
    }

    // Entry point of the worker thread: Creates the function, runs it,
    // and publishes the outcome in "functionResult_". No exception is
    // allowed to escape, as this would terminate the whole process.
    static void Worker(SingleFunctionJob* job,
                       IFunctionFactory* factory)
    {
      assert(job != NULL && factory != NULL);

      try
      {
        JobContext context(*job);

        std::unique_ptr<IFunction> function(factory->CreateFunction());
        function->Execute(context);

        {
          boost::mutex::scoped_lock lock(job->mutex_);
          job->functionResult_ = FunctionResult_Done;
        }
      }
      catch (Orthanc::OrthancException& e)
      {
        LOG(ERROR) << "Error in a job: " << e.What();

        {
          boost::mutex::scoped_lock lock(job->mutex_);

          job->functionResult_ = FunctionResult_Failure;

          if (!job->stopping_)
          {
            // Don't report exceptions that are a consequence of stopping the function
            job->content_["FunctionErrorCode"] = e.GetErrorCode();
            job->content_["FunctionErrorDescription"] = e.What();
            if (e.HasDetails())
            {
              job->content_["FunctionErrorDetails"] = e.GetDetails();
            }
            job->UpdateContent(job->content_);
          }
        }
      }
      catch (const std::exception& e)
      {
        LOG(ERROR) << "Unexpected exception in a job: " << e.what();
        MarkFailure(*job);
      }
      catch (...)
      {
        LOG(ERROR) << "Unknown exception in a job";
        MarkFailure(*job);
      }
    }


  public:
    explicit SingleFunctionJob(const std::string& jobName) :
      OrthancJob(jobName),
      functionResult_(FunctionResult_Running),
      content_(Json::objectValue),
      factory_(NULL),
      stopping_(false)
    {
    }

    virtual ~SingleFunctionJob()
    {
      std::cout << "Calling ~SingleFunctionJob \n";

      if (worker_.get() != NULL)
      {
        // Last-resort cleanup: At this point, the derived class (which
        // typically implements the factory) is already destroyed
        LOG(ERROR) << "Classes deriving from SingleFunctionJob must "
                   << "explicitly call Finalize() in their destructor";
        Finalize();
      }
    }

    // Cancels the function and joins the worker. Never throws an
    // "OrthancException", which makes it usable from destructors.
    void Finalize()
    {
      try
      {
        Stop(OrthancPluginJobStopReason_Canceled);
      }
      catch (Orthanc::OrthancException&)
      {
      }
    }

    // Starts the worker if needed, then reports its current state. While
    // the function is running, this sleeps 500ms and returns "Continue".
    virtual OrthancPluginJobStepStatus Step() ORTHANC_OVERRIDE
    {
      if (factory_ == NULL)
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_BadSequenceOfCalls);
      }

      FunctionResult result;

      {
        boost::mutex::scoped_lock lock(mutex_);
        result = functionResult_;
      }

      switch (result)
      {
        case FunctionResult_Running:
          StartWorker();
          boost::this_thread::sleep(boost::posix_time::milliseconds(500));
          return OrthancPluginJobStepStatus_Continue;

        case FunctionResult_Done:
          JoinWorker();
          return OrthancPluginJobStepStatus_Success;

        case FunctionResult_Failure:
          JoinWorker();
          return OrthancPluginJobStepStatus_Failure;

        default:
          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError);
      }
    }

    // On pause or cancel: Notifies the factory, then waits for the
    // worker to return. Other stop reasons are ignored.
    virtual void Stop(OrthancPluginJobStopReason reason) ORTHANC_OVERRIDE
    {
      if (factory_ == NULL)
      {
        return;
      }
      else if (reason == OrthancPluginJobStopReason_Paused ||
               reason == OrthancPluginJobStopReason_Canceled)
      {
        {
          // "stopping_" is read by the worker under the same mutex. The
          // lock must be released before joining, as the worker needs
          // the mutex in order to terminate.
          boost::mutex::scoped_lock lock(mutex_);
          stopping_ = true;
        }

        if (reason == OrthancPluginJobStopReason_Paused)
        {
          factory_->PauseFunction();
        }
        else
        {
          factory_->CancelFunction();
        }

        JoinWorker();

        {
          // Be ready for the next possible call to "Step()" that will resume the function
          boost::mutex::scoped_lock lock(mutex_);
          functionResult_ = FunctionResult_Running;
        }
      }
    }

    // Puts the job back into its initial state (no worker must exist)
    virtual void Reset() ORTHANC_OVERRIDE
    {
      boost::mutex::scoped_lock lock(mutex_);

      assert(worker_.get() == NULL);
      functionResult_ = FunctionResult_Running;
      content_ = Json::objectValue;
      ClearContent();
    }
  };


  /**
   * Job that uploads a list of Orthanc instances to "url" within one
   * single HTTP POST request, whose body is a "multipart/form-data"
   * stream that is generated chunk by chunk (one part per instance).
   *
   * The job acts as its own function factory.
   **/
  class MyClientJob :
    public SingleFunctionJob,
    private SingleFunctionJob::IFunctionFactory
  {
  private:
    enum Action
    {
      Action_None,
      Action_Pause,
      Action_Cancel
    };

    // This mutex is distinct from the private mutex of "SingleFunctionJob"
    boost::mutex                              mutex_;
    std::string                               url_;
    std::string                               studyInstanceUid_;
    std::vector<std::string>                  instances_;
    OrthancPlugins::HttpClient::HttpHeaders   headers_;
    std::string                               boundary_;
    size_t                                    position_;     // Index of the next instance to be sent
    Action                                    action_;       // Pending pause/cancel request
    size_t                                    networkSize_;  // Total size of the DICOM payloads read so far
    bool                                      debug_;        // If true, adds a delay of 100ms per instance


    // Reads the DICOM file of the next instance, possibly after its
    // anonymization if "ANONYMIZED" is set. Instances that cannot be
    // read are skipped. Returns "false" if a pause/cancel is pending,
    // or if no instance is left.
    bool ReadNextInstance(std::string& instanceId,
                          std::string& dicom,
                          JobContext& context)
    {
      boost::mutex::scoped_lock lock(mutex_);

      if (action_ != Action_None)
      {
        return false;
      }

      while (position_ < instances_.size())
      {
        context.SetProgress(static_cast<unsigned int>(position_),
                            static_cast<unsigned int>(instances_.size()));

        const size_t i = position_++;

        if (debug_)
        {
          boost::this_thread::sleep(boost::posix_time::milliseconds(100));
        }

        instanceId = instances_[i];

        bool success;

        if (ANONYMIZED)
        {
          // Anonymize on-the-fly, keeping the study/series identifiers
          Json::Value body = Json::objectValue;
          body["Keep"].append("StudyInstanceUID");
          body["Keep"].append("SeriesInstanceUID");
          body["Force"] = true;

          OrthancPlugins::MemoryBuffer answer;
          success = answer.RestApiPost("/instances/" + instanceId + "/anonymize", body, false);

          if (success)
          {
            answer.ToString(dicom);
          }
        }
        else
        {
          success = OrthancPlugins::RestApiGetString(dicom, "/instances/" + instanceId + "/file", false);
        }

        if (success)
        {
          networkSize_ += dicom.size();
          context.SetContent("NetworkSizeMB", boost::lexical_cast<std::string>
                             (networkSize_ / static_cast<uint64_t>(1024 * 1024)));
          return true;
        }
      }

      return false;
    }


    // Body of the HTTP request: Each chunk is one part of the
    // multipart stream, the last chunk being the closing boundary
    class RequestBody : public OrthancPlugins::HttpClient::IRequestBody
    {
    private:
      MyClientJob&  that_;
      JobContext&   context_;
      std::string   boundary_;
      bool          done_;           // True once the closing boundary has been produced
      size_t        processedSize_;  // Total size of the chunks produced so far

    public:
      RequestBody(MyClientJob& that,
                  JobContext& context) :
        that_(that),
        context_(context),
        boundary_(that.boundary_),
        done_(false),
        processedSize_(0)
      {
      }

      // Produces the next chunk. Once no instance can be read anymore
      // (which includes a pending pause/cancel), the closing boundary
      // is produced, and the subsequent call returns "false".
      virtual bool ReadNextChunk(std::string& chunk) ORTHANC_OVERRIDE
      {
        if (done_)
        {
          context_.SetProgress(1, 1);
          return false;
        }
        else
        {
          std::string instanceId;
          std::string dicom;

          if (that_.ReadNextInstance(instanceId, dicom, context_))
          {
            chunk = ("--" + boundary_ + "\r\n" +
                     "Content-Disposition: form-data; name=\"file\"; filename=\"" + instanceId + "\"" + "\r\n" +
                     "Content-Type: application/dicom\r\n\r\n");
            chunk += (dicom + "\r\n");
          }
          else
          {
            done_ = true;
            chunk = ("--" + boundary_ + "--");
          }

          //boost::this_thread::sleep(boost::posix_time::seconds(1));

          processedSize_ += chunk.size();
          return true;
        }
      }

      size_t GetProcessedSize() const
      {
        return processedSize_;
      }
    };


    // The function that is run by the worker: Issues the HTTP POST
    class F : public IFunction
    {
    private:
      MyClientJob& that_;

    public:
      explicit F(MyClientJob& that) :
        that_(that)
      {
      }

      virtual void Execute(JobContext& context) ORTHANC_OVERRIDE
      {
        // The lifetime of "body" should be larger than "client"
        std::unique_ptr<RequestBody> body;
        std::unique_ptr<OrthancPlugins::HttpClient> client;

        {
          boost::mutex::scoped_lock lock(that_.mutex_);
          context.SetContent("InstancesCount", boost::lexical_cast<std::string>(that_.instances_.size()));
          context.SetContent("StudyInstanceUid", that_.studyInstanceUid_);

          body.reset(new RequestBody(that_, context));

          client.reset(new OrthancPlugins::HttpClient);
          client->SetUrl(that_.url_);
          client->SetMethod(OrthancPluginHttpMethod_Post);
          client->AddHeaders(that_.headers_);
          client->SetTimeout(HTTP_TIMEOUT);
        }

        OrthancPlugins::HttpClient::HttpHeaders answerHeaders;
        std::string answerBody;

        // The request runs without holding the mutex, as the body
        // locks it by itself in "ReadNextInstance()"
        assert(client.get() != NULL);
        client->SetBody(*body);
        client->Execute(answerHeaders, answerBody);

        {
          boost::mutex::scoped_lock lock(that_.mutex_);

          if (that_.action_ == Action_Cancel)
          {
            // After a cancel, a later run restarts from the first
            // instance (a pause keeps the current position)
            that_.position_ = 0;
          }
        }

        context.SetContent("Response", answerBody);
      }
    };


    virtual void CancelFunction() ORTHANC_OVERRIDE
    {
      boost::mutex::scoped_lock lock(mutex_);
      action_ = Action_Cancel;
    }

    virtual void PauseFunction() ORTHANC_OVERRIDE
    {
      boost::mutex::scoped_lock lock(mutex_);
      action_ = Action_Pause;
    }

    virtual IFunction* CreateFunction() ORTHANC_OVERRIDE
    {
      {
        // This runs in the worker thread, whereas "CancelFunction()"
        // and "PauseFunction()" are called from another thread
        boost::mutex::scoped_lock lock(mutex_);
        action_ = Action_None;
      }

      return new F(*this);
    }


  public:
    // "instances" contains the Orthanc identifiers of the instances to
    // be sent. Throws "ErrorCode_InternalError" if no UUID can be
    // generated for the multipart boundary.
    MyClientJob(const std::string& url,
                const std::string& studyInstanceUid,
                const std::list<std::string>& instances) :
      SingleFunctionJob("MyClientJob " + studyInstanceUid),
      url_(url),
      studyInstanceUid_(studyInstanceUid),
      instances_(instances.begin(), instances.end()),
      position_(0),
      action_(Action_None),
      networkSize_(0),
      debug_(false)
    {
      SetFactory(*this);

      {
        OrthancPlugins::OrthancString tmp;
        tmp.Assign(OrthancPluginGenerateUuid(OrthancPlugins::GetGlobalContext()));

        if (tmp.GetContent() == NULL)
        {
          throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError,
                                          "Cannot generate a UUID");
        }

        tmp.ToString(boundary_);
      }

      boundary_ = (boundary_ + "-" + boundary_);  // Make the boundary longer

      // NOTE(review): "X_API_KEY" is not defined in this file; it is
      // presumably provided by the build system - confirm
      headers_["x-api-key"] = X_API_KEY;
      headers_["Content-Type"] = "multipart/form-data; boundary=" + boundary_;
    }

    void SetDebug(bool debug)
    {
      debug_ = debug;
    }

    virtual ~MyClientJob()
    {
      std::cout << "Calling ~MyClientJob\n";

      // Mandatory: The worker thread calls back into this object (it is
      // the factory, and "F" holds a reference to it), so the worker
      // must be stopped before the members of this class get destroyed
      SingleFunctionJob::Finalize();
    }
  };


  /**
   * REST callback: Collects all the instances of the Orthanc study
   * whose identifier is the first group of the URI, submits a
   * "MyClientJob" that uploads them to "URL", and answers with a JSON
   * object whose "ID" field is the identifier of the job.
   *
   * Throws "ErrorCode_UnknownResource" if the study or its series
   * cannot be read through the REST API.
   **/
  void SendToClient(OrthancPluginRestOutput *output,
                    const char *url,
                    const OrthancPluginHttpRequest *request)
  {
    const std::string studyUri = "/studies/" + std::string(request->groups[0]);

    std::string studyInstanceUid;

    {
      Json::Value study;
      if (!OrthancPlugins::RestApiGet(study, studyUri, false))
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
      }

      // Accessing through a constant reference never inserts missing members
      const Json::Value& constStudy = study;
      const Json::Value& uid = constStudy["MainDicomTags"]["StudyInstanceUID"];

      if (uid.type() != Json::stringValue)
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_InternalError,
                                        "Study without StudyInstanceUID");
      }

      studyInstanceUid = uid.asString();
    }

    std::list<std::string> instances;

    {
      Json::Value seriesArray;
      if (!OrthancPlugins::RestApiGet(seriesArray, studyUri + "/series", false))
      {
        throw Orthanc::OrthancException(Orthanc::ErrorCode_UnknownResource);
      }

      for (const auto& series : seriesArray)
      {
        for (const auto& resourceId : series["Instances"])
        {
          instances.push_back(resourceId.asString());
        }
      }
    }

    // The ownership of the job is transferred by "Submit()"
    std::unique_ptr<MyClientJob> job(new MyClientJob(URL, studyInstanceUid, instances));
    const std::string jobId = OrthancPlugins::OrthancJob::Submit(job.release(), PRIORITY);

    Json::Value answer = Json::objectValue;
    answer["ID"] = jobId;

    const std::string s = answer.toStyledString();
    OrthancPluginAnswerBuffer(OrthancPlugins::GetGlobalContext(), output,
                              s.c_str(), s.size(), "application/json");
  }
}  // namespace


extern "C"
{
  // Entry point of the plugin: Checks the version of Orthanc, reads the
  // "MyConfig" configuration section and, if "Enable" is true, registers
  // the "/mytest/{study}" REST route. Returns -1 if the version of the
  // Orthanc core is not supported, 0 otherwise.
  ORTHANC_PLUGINS_API int32_t OrthancPluginInitialize(OrthancPluginContext* context)
  {
    OrthancPlugins::SetGlobalContext(context);
    Orthanc::Logging::InitializePluginContext(context);
    Orthanc::Logging::EnableInfoLevel(false);

    /* Check the version of the Orthanc core */
    if (OrthancPluginCheckVersion(context) == 0)
    {
      OrthancPlugins::ReportMinimalOrthancVersion(ORTHANC_PLUGINS_MINIMAL_MAJOR_NUMBER,
                                                  ORTHANC_PLUGINS_MINIMAL_MINOR_NUMBER,
                                                  ORTHANC_PLUGINS_MINIMAL_REVISION_NUMBER);
      return -1;
    }

    OrthancPluginSetDescription(context, "Orthanc Relayer forwards c-find/ c-move to internal storage");

    OrthancPlugins::OrthancConfiguration configuration;
    OrthancPlugins::OrthancConfiguration myconfig;
    configuration.GetSection(myconfig, "MyConfig");

    const bool enabled = myconfig.GetBooleanValue("Enable", false);
    if (enabled)
    {
      URL = myconfig.GetStringValue("Url", "");
      HTTP_TIMEOUT = myconfig.GetIntegerValue("Timeout", 120);
      PRIORITY = myconfig.GetIntegerValue("Priority", 0);

      // Anonymization is on by default once the plugin is enabled (the
      // built-in value "false" only applies while it is disabled)
      ANONYMIZED = myconfig.GetBooleanValue("Anonymized", true);

      OrthancPlugins::RegisterRestCallback<SendToClient>("/mytest/([^/]*)", true);
    }
    else
    {
      OrthancPlugins::LogWarning("MyOrthancPlugin is disabled");
    }

    return 0;
  }


  ORTHANC_PLUGINS_API void OrthancPluginFinalize()
  {
    OrthancPlugins::LogWarning("My plugin is finalizing");
  }


  ORTHANC_PLUGINS_API const char* OrthancPluginGetName()
  {
    return "orthanc-myplugin";
  }


  ORTHANC_PLUGINS_API const char* OrthancPluginGetVersion()
  {
    return ORTHANC_PLUGIN_VERSION;
  }
}