130 lines
4.5 KiB
Python
130 lines
4.5 KiB
Python
"""Fonctions utilitaires pour l'application"""
|
|
from typing import Dict, Any
|
|
from fastapi import HTTPException
|
|
from nc_py_api import Nextcloud, NextcloudException
|
|
from config import settings
|
|
|
|
|
|
def get_nextcloud_client() -> Nextcloud:
|
|
"""Crée et retourne un client Nextcloud"""
|
|
try:
|
|
nc = Nextcloud(
|
|
nextcloud_url=settings.nextcloud_url,
|
|
nc_auth_user=settings.nextcloud_username,
|
|
nc_auth_pass=settings.nextcloud_password
|
|
)
|
|
return nc
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Erreur de connexion à Nextcloud: {str(e)}"
|
|
)
|
|
|
|
|
|
def normalize_upload_paths(path: str, filename: str) -> tuple[str, str]:
|
|
"""
|
|
Normalise les chemins d'upload pour gérer les cas où le path contient déjà le nom du fichier
|
|
|
|
Args:
|
|
path: Chemin fourni par l'utilisateur (peut contenir ou non le nom du fichier)
|
|
filename: Nom du fichier
|
|
|
|
Returns:
|
|
tuple: (directory_path, destination_path)
|
|
"""
|
|
if path == "/" or path == "":
|
|
return "/", f"/{filename}"
|
|
|
|
# Nettoyer le chemin
|
|
clean_path = path.strip("/")
|
|
|
|
# Si le path se termine par le même nom que le filename, on l'enlève
|
|
# Ex: /GED/Mouvements/symbol.svg avec filename=symbol.svg -> /GED/Mouvements
|
|
path_parts = clean_path.split("/")
|
|
last_part = path_parts[-1]
|
|
|
|
# Vérifier si la dernière partie du path est le même que le filename
|
|
# ou si elle a une extension de fichier (contient un point dans les 5 derniers caractères)
|
|
if last_part == filename or (len(last_part) > 4 and "." in last_part[-5:]):
|
|
# Enlever la dernière partie qui est probablement un nom de fichier
|
|
if len(path_parts) > 1:
|
|
directory_path = "/" + "/".join(path_parts[:-1])
|
|
else:
|
|
directory_path = "/"
|
|
else:
|
|
directory_path = "/" + clean_path
|
|
|
|
# Construire le chemin de destination final
|
|
if directory_path == "/":
|
|
destination_path = f"/{filename}"
|
|
else:
|
|
destination_path = f"{directory_path}/{filename}"
|
|
|
|
return directory_path, destination_path
|
|
|
|
|
|
def create_directory_recursive(nc: Nextcloud, directory_path: str) -> None:
|
|
"""
|
|
Crée un dossier et tous ses parents récursivement si nécessaire
|
|
|
|
Args:
|
|
nc: Client Nextcloud
|
|
directory_path: Chemin complet du dossier à créer (ex: /GED/Mouvements/Octobre 2025)
|
|
"""
|
|
if directory_path == "/" or directory_path == "":
|
|
return
|
|
|
|
# Vérifier si le dossier existe déjà
|
|
try:
|
|
nc.files.by_path(directory_path)
|
|
return # Le dossier existe déjà, rien à faire
|
|
except NextcloudException:
|
|
pass # Le dossier n'existe pas, on continue
|
|
|
|
# Créer le dossier parent d'abord (récursif)
|
|
parent_path = "/".join(directory_path.rstrip("/").split("/")[:-1])
|
|
if parent_path and parent_path != "/":
|
|
create_directory_recursive(nc, parent_path)
|
|
|
|
# Créer le dossier actuel
|
|
try:
|
|
nc.files.mkdir(directory_path)
|
|
print(f"📁 Dossier créé : {directory_path}")
|
|
except NextcloudException as e:
|
|
# Ignorer l'erreur 405 (Method Not Allowed) qui signifie que le dossier existe déjà
|
|
if "405" not in str(e):
|
|
raise
|
|
|
|
|
|
def format_file_info(file_info) -> Dict[str, Any]:
|
|
"""Formate les informations d'un fichier/dossier"""
|
|
result = {
|
|
"name": getattr(file_info, 'name', ''),
|
|
"path": getattr(file_info, 'user_path', getattr(file_info, 'path', '')),
|
|
"is_dir": getattr(file_info, 'is_dir', False),
|
|
}
|
|
|
|
# Attributs optionnels
|
|
if hasattr(file_info, 'info'):
|
|
info = file_info.info
|
|
result["size"] = getattr(info, 'size', 0)
|
|
result["content_type"] = getattr(info, 'mimetype', None)
|
|
if hasattr(info, 'last_modified'):
|
|
result["last_modified"] = info.last_modified.isoformat() if info.last_modified else None
|
|
else:
|
|
result["last_modified"] = None
|
|
result["etag"] = getattr(info, 'etag', None)
|
|
else:
|
|
result["size"] = getattr(file_info, 'size', 0)
|
|
result["content_type"] = getattr(file_info, 'mimetype', getattr(file_info, 'content_type', None))
|
|
if hasattr(file_info, 'last_modified') and file_info.last_modified:
|
|
result["last_modified"] = file_info.last_modified.isoformat()
|
|
else:
|
|
result["last_modified"] = None
|
|
result["etag"] = getattr(file_info, 'etag', None)
|
|
|
|
result["file_id"] = getattr(file_info, 'file_id', None)
|
|
|
|
return result
|
|
|