Update: Adding the pHash as a private DICOM tag posed challenges, so I’ve decided to store the pHash value for each InstanceID, along with PatientName, PatientID, and InstitutionName, in an SQLite database. The current plugin setup fulfills my requirements, and this may be the last update in this thread.
Phasher.py
import sys
import orthanc
from PIL import Image
import requests
import os
import io
import pydicom
import imagehash
import sqlite3
# Location of the SQLite database that stores the perceptual hashes.
# A raw string keeps the Windows backslashes literal (no escape processing).
DB_PATH = r"D:\CLIP\Phasher.db"
def get_instances():
    """
    Fetches the list of instances from the Orthanc server.

    Sends an authenticated GET request to the local Orthanc REST API
    (/instances/) and decodes the JSON response.

    Returns:
        list: The decoded JSON response (the list of instances), or an
        empty list if the request or the JSON decoding fails.
    """
    api_url = "http://localhost:8042/instances/"
    orthanc_auth = ("admin", "admin")
    try:
        # Without a timeout, requests waits forever on a stalled server,
        # which would block the calling Orthanc callback indefinitely.
        # (connect timeout, read timeout) in seconds.
        response = requests.get(api_url, auth=orthanc_auth, timeout=(5, 120))
        response.raise_for_status()
        return response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose
        # JSON decode error is not a RequestException.
        print(f"Error fetching instances: {e}")
        return []
def check_instances_in_db(conn, instance_uuids):
    """
    Checks which instance UUIDs are already in the database.

    The lookup is done in batches because SQLite limits the number of
    bound parameters in a single statement; an archive with thousands of
    instances would otherwise fail with "too many SQL variables".

    Args:
        conn (sqlite3.Connection): SQLite database connection.
        instance_uuids (list): List of instance UUIDs to check.

    Returns:
        list: The instance UUIDs from instance_uuids that exist in Table1.
        Empty when instance_uuids is empty.
    """
    # Kept well below the historical default limit of 999 bound parameters.
    batch_size = 500
    # Drop repeated UUIDs (keeping first-seen order) so a UUID that would
    # land in two batches is not reported twice.
    uuids = list(dict.fromkeys(instance_uuids))

    existing_instances = []
    cursor = conn.cursor()
    for start in range(0, len(uuids), batch_size):
        batch = uuids[start:start + batch_size]
        placeholders = ",".join("?" * len(batch))
        cursor.execute(
            f"SELECT InstanceID FROM Table1 WHERE InstanceID IN ({placeholders})",
            batch,
        )
        existing_instances.extend(row[0] for row in cursor.fetchall())
    return existing_instances
def create_database():
    """
    Creates the database and table if they don't exist.

    Opens (and thereby creates, if necessary) the SQLite file at DB_PATH
    and makes sure Table1 exists with the columns InstanceID (primary
    key), Phash, PatientName, PatientID and InstitutionName.

    Returns:
        sqlite3.Connection: Open SQLite database connection. The caller
        is responsible for closing it.

    Raises:
        sqlite3.Error: If the table cannot be created; the connection is
        closed before the error is re-raised.
    """
    conn = sqlite3.connect(DB_PATH)
    try:
        conn.execute('''
            CREATE TABLE IF NOT EXISTS Table1 (
                InstanceID TEXT PRIMARY KEY,
                Phash TEXT,
                PatientName TEXT,
                PatientID TEXT,
                InstitutionName TEXT
            )
        ''')
        conn.commit()
    except sqlite3.Error:
        # Do not hand back (or leak) a connection to a database we could
        # not initialise.
        conn.close()
        raise
    return conn
def update_phash_value(conn, instance_id, new_hash_value, PatientName, PatientID, InstitutionName):
    """
    Updates the Phash value along with Patient Name, ID and Institution Name in the database.

    Only an existing row is modified; no row is inserted. SQLite errors
    are reported on stdout and not re-raised.

    Args:
        conn (sqlite3.Connection): SQLite database connection.
        instance_id (str): Instance UUID.
        new_hash_value (str): Image Phash value.
        PatientName (str): Patient name.
        PatientID (str): Patient ID.
        InstitutionName (str): Institution name.
    """
    try:
        cursor = conn.cursor()
        cursor.execute(
            "UPDATE Table1 SET Phash = ?, PatientName = ?, PatientID = ?, InstitutionName = ? WHERE InstanceID = ?",
            (new_hash_value, PatientName, PatientID, InstitutionName, instance_id),
        )
        conn.commit()
    except sqlite3.Error as e:
        print(f"Error updating Phash value: {e}")
        return

    # An UPDATE that matches no row is not an error for SQLite, so check
    # rowcount instead of claiming success unconditionally.
    if cursor.rowcount:
        print(f"Phash value updated successfully for InstanceID {instance_id}")
    else:
        print(f"No database row found for InstanceID {instance_id}; Phash value not stored")
def DecodeInstance(instance):
    """
    Decodes the instance and updates the Phash value in the database.

    Skips instances that already have a non-empty Phash stored. Otherwise
    the DICOM file is fetched from Orthanc, one image frame is selected,
    its perceptual hash is computed and stored together with the patient
    name, patient ID and institution name.

    Frame selection:
        - more than 5 frames: the middle frame,
        - 2 to 5 frames: the first frame,
        - single frame: the whole image.

    Args:
        instance (str): Instance UUID.
    """
    instance_id = instance
    conn = sqlite3.connect(DB_PATH)
    # try/finally so the connection is closed on every exit path,
    # including exceptions raised while fetching or decoding the DICOM.
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT Phash FROM Table1 WHERE InstanceID = ?", (instance_id,))
        existing_phash = cursor.fetchone()
        if existing_phash is not None and existing_phash[0]:
            print(f"Phash value already exists for InstanceID {instance_id}. Skipping processing.")
            return

        f = orthanc.GetDicomForInstance(instance_id)
        dicom = pydicom.dcmread(io.BytesIO(f))
        # No Rows attribute is taken to mean there is no image to hash.
        if not hasattr(dicom, 'Rows'):
            return

        # NumberOfFrames may be absent or empty; treat both as one frame.
        frame_count = int(getattr(dicom, 'NumberOfFrames', 1) or 1)
        pixels = dicom.pixel_array
        if frame_count > 5:
            pixel_data = pixels[round(frame_count / 2)]
        elif frame_count > 1:
            pixel_data = pixels[0]
        else:
            # pydicom returns a single-frame image without a leading frame
            # axis, so indexing [0] here would select only the first pixel
            # row instead of the image.
            pixel_data = pixels

        # These attributes are optional in practice; fall back to an empty
        # string instead of raising AttributeError.
        PatientName = str(getattr(dicom, 'PatientName', ''))
        PatientID = str(getattr(dicom, 'PatientID', ''))
        InstitutionName = str(getattr(dicom, 'InstitutionName', ''))

        new_hash_value = str(imagehash.phash(Image.fromarray(pixel_data), hash_size=30))
        update_phash_value(conn, instance_id, new_hash_value, PatientName, PatientID, InstitutionName)
    finally:
        conn.close()
def manageDatabase():
    """
    Adds a database row for every Orthanc instance that is not yet recorded.

    Fetches the instance list from Orthanc, creates the database/table if
    needed and inserts one row (InstanceID only) per instance that is not
    already present. Existing rows are left untouched. Called only at
    start of the Orthanc server.
    """
    instances = get_instances()
    if not instances:
        print("No instances found.")
        return

    conn = create_database()
    try:
        # A set makes the membership test O(1) per instance instead of
        # scanning a list for each one.
        existing_instances = set(check_instances_in_db(conn, instances))
        new_rows = [
            (instance,) for instance in instances
            if instance not in existing_instances
        ]
        conn.executemany("INSERT INTO Table1 (InstanceID) VALUES (?)", new_rows)
        conn.commit()
    finally:
        # Release the connection even if the lookup or insert fails.
        conn.close()
def OnChange(changeType, level, resource):
    """
    Callback function to handle Orthanc server changes.

    Only the ORTHANC_STARTED change is acted upon: the database is
    synchronised with the instance list and every recorded instance is
    passed to DecodeInstance. All other change types are ignored.

    Args:
        changeType: Change type reported by Orthanc.
        level: Unused.
        resource: Unused.
    """
    if changeType != orthanc.ChangeType.ORTHANC_STARTED:
        return

    manageDatabase()
    # manageDatabase returns without creating the file when Orthanc
    # reports no instances, so the database may legitimately not exist.
    if not os.path.exists(DB_PATH):
        return

    conn = sqlite3.connect(DB_PATH)
    try:
        rows = conn.execute("SELECT InstanceID FROM Table1").fetchall()
    finally:
        # Close before the per-instance work; DecodeInstance opens its own
        # connection.
        conn.close()

    for (instance_id,) in rows:
        DecodeInstance(instance_id)
# Register OnChange with Orthanc as the plugin's change callback.
orthanc.RegisterOnChangeCallback(OnChange)
Adding an API endpoint to retrieve a list of visually similar (duplicate) DICOM images would be ideal, but for now I’m just using the following script to read the database directly and save the results to a text file.
import sqlite3
import sys
def find_duplicates(db_path, output_file):
    """
    Finds instances with duplicate Phash values and writes them to a file.

    Part of the FindDuplicateInstances.py script. For every Phash value
    shared by more than one row of Table1, the group size is written,
    followed by one line per row with the patient name, patient ID and
    institution name. Rows without a Phash (NULL) are ignored.

    Rows are read individually rather than through GROUP_CONCAT, so values
    that contain commas (e.g. "Clinic, North") are reported intact.

    Database errors are reported on stdout and not re-raised; in that case
    the output file is not written.

    Parameters:
    - db_path (str): The path to the SQLite database file.
    - output_file (str): The path to the file where the results will be saved.
    """
    query = """
        SELECT Phash, PatientName, PatientID, InstitutionName
        FROM Table1
        WHERE Phash IN (
            SELECT Phash
            FROM Table1
            WHERE Phash IS NOT NULL
            GROUP BY Phash
            HAVING COUNT(*) > 1
        )
        ORDER BY Phash, InstanceID
    """

    # Initialised up front so the finally clause is safe even when
    # sqlite3.connect itself raises.
    conn = None
    try:
        conn = sqlite3.connect(db_path)
        rows = conn.execute(query).fetchall()
    except sqlite3.Error as e:
        print(f"Error accessing the database: {e}")
        return
    finally:
        if conn is not None:
            conn.close()

    # Group rows by hash; dicts keep insertion order, so groups stay in
    # the ORDER BY order of the query.
    groups = {}
    for phash, name, patient_id, institution in rows:
        groups.setdefault(phash, []).append((name, patient_id, institution))

    # Write with print(file=...) instead of swapping sys.stdout, so stdout
    # can never be left redirected if writing fails. UTF-8 keeps non-ASCII
    # patient names writable regardless of the platform default encoding.
    with open(output_file, 'w', encoding='utf-8') as result_file:
        print("Duplicates:", file=result_file)
        for members in groups.values():
            print(f"{len(members)}", file=result_file)
            for name, patient_id, institution in members:
                # NULL columns are written as empty strings.
                name = "" if name is None else name
                patient_id = "" if patient_id is None else patient_id
                institution = "" if institution is None else institution
                print(
                    f"Patient Name: {name}, Patient ID: {patient_id}, Institution Name: {institution}",
                    file=result_file,
                )
    print(f"Results saved in {output_file}")
if __name__ == "__main__":
    # Script entry point: scan the Phasher database and write the duplicate
    # report next to it.
    output_file = "D:\\CLIP\\result.txt"
    db_path = "D:\\CLIP\\phasher.db"
    find_duplicates(db_path=db_path, output_file=output_file)