Skip to content

Google Drive API v3: transferring ownership keeps stating that I don't have permission #2741

@syx-tar

Description

@syx-tar

Hi. I want to quickly transfer files to my other account using Python, but whenever I try, it always returns:

2026-04-17 17:22:45,421 INFO file_id=...<file-id>....- status=failed name=Graph.JPG message=<HttpError 403 when requesting https://www.googleapis.com/drive/v3/files/....<file-id>..../permissions/04862162013125427771?supportsAllDrives=true&fields=id%2CemailAddress%2Crole%2CpendingOwner&alt=json returned "The target user cannot be a pending owner because the target user does not have a writer role for the file.". Details: "[{'message': 'The target user cannot be a pending owner because the target user does not have a writer role for the file.', 'domain': 'global', 'reason': 'pendingOwnerWriterRequired'}]">

But the other account clearly does have that role: I have already given it editor permission.
Here is my code. Note: this code was generated by AI.

import argparse
import json
import logging
import os
import random
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence
from urllib.parse import parse_qs

from google.auth.transport.requests import Request
from google.oauth2 import service_account
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError


# OAuth scope requested for both the user-consent and service-account flows.
SCOPES = ["https://www.googleapis.com/auth/drive"]
# MIME type Drive reports for folders; used to decide whether to descend.
FOLDER_MIME = "application/vnd.google-apps.folder"
# Error "reason" values that are transient and worth retrying with backoff.
RETRYABLE_REASONS = {
    "rateLimitExceeded",
    "userRateLimitExceeded",
    "sharingRateLimitExceeded",
    "backendError",
    "internalError",
}
# Error "reason" values that must never be retried, whatever the HTTP status.
NON_RETRYABLE_REASONS = {"pendingOwnerWriterRequired"}
# HTTP statuses that are retried regardless of the reported reason.
# 403 is deliberately NOT listed: Drive uses 403 both for rate limiting and for
# permanent permission errors. Rate-limit 403s are still retried because their
# reason is in RETRYABLE_REASONS; every other 403 now fails fast instead of
# being retried with exponential backoff.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the ownership-transfer script.

    Options that name a file (config, token, log, results) default to files
    located in the same directory as this script.

    Returns:
        The namespace produced by ``argparse`` for the current ``sys.argv``.
    """
    script_dir = Path(__file__).resolve().parent

    def beside_script(filename: str) -> str:
        # Default location for helper files: right next to this script.
        return str(script_dir / filename)

    cli = argparse.ArgumentParser()
    add = cli.add_argument

    # Authentication and identity.
    add("--auth-mode", choices=["oauth", "service-account"], default="oauth")
    add("--credentials-file")
    add("--config-file", default=beside_script("request_config.json"))
    add("--token-file", default=beside_script("request_token.json"))
    add("--oauth-redirect-uri")
    add("--subject-email")

    # What to transfer and to whom.
    add("--folder-id")
    add("--target-owner-email")

    # Paging, batching and retry tuning.
    add("--page-size", type=int, default=200)
    add("--batch-size", type=int, default=100)
    add("--max-files", type=int)
    add("--max-retries", type=int, default=8)
    add("--min-backoff", type=float, default=1.0)
    add("--max-backoff", type=float, default=32.0)

    # Notification e-mail is on by default; the --no- variant switches it off.
    add("--send-notification-email", action="store_true", default=True)
    add("--no-send-notification-email", dest="send_notification_email", action="store_false")

    # Output files and dry-run switch.
    add("--log-file", default=beside_script("request.log"))
    add("--result-file", default=beside_script("request_results.jsonl"))
    add("--dry-run", action="store_true")

    return cli.parse_args()


def guess_credentials_file(auth_mode: str) -> Optional[str]:
    """Look next to this script for a plausible credentials file.

    Glob patterns are tried in priority order; for the first pattern that
    matches anything, the alphabetically first match is returned.

    Args:
        auth_mode: ``"service-account"`` selects the service-account patterns;
            any other value selects the OAuth client patterns.

    Returns:
        The path of the first match as a string, or ``None`` if nothing matched.
    """
    service_account_globs = (
        "service-account.json",
        "*service-account*.json",
        "*.service-account.json",
    )
    oauth_client_globs = (
        "cred.json",
        "oauth-client.json",
        "credentials.json",
        "client_secret*.json",
        "*oauth*.json",
    )
    globs = service_account_globs if auth_mode == "service-account" else oauth_client_globs

    script_dir = Path(__file__).resolve().parent
    for glob_pattern in globs:
        hits = sorted(script_dir.glob(glob_pattern))
        if hits:
            return str(hits[0])
    return None


def prompt_value(label: str, current_value: Optional[str], default: Optional[str] = None) -> str:
    """Return a setting, asking the user on stdin only when it is missing.

    Args:
        label: Text shown in the prompt.
        current_value: Value already known; returned as-is when truthy.
        default: Value offered in brackets and used when the user just
            presses Enter.

    Returns:
        ``current_value`` if set; otherwise the stripped user input, or
        ``default`` on empty input. Without a default the prompt repeats until
        something non-empty is entered.
    """
    if current_value:
        return current_value

    if not default:
        # Nothing to fall back on: keep asking until we get an answer.
        answer = ""
        while not answer:
            answer = input(f"{label}: ").strip()
        return answer

    answer = input(f"{label} [{default}]: ").strip()
    return answer if answer else default


def load_json_file(path: str) -> Dict[str, Any]:
    """Read a JSON object from ``path`` on a best-effort basis.

    Returns:
        The decoded object when the file holds a JSON object; an empty dict
        when the file cannot be read or parsed, or holds any other JSON value.
    """
    try:
        raw_text = Path(path).read_text(encoding="utf-8")
        parsed = json.loads(raw_text)
    except Exception:
        # Missing or malformed file: treated as "nothing saved".
        return {}
    if isinstance(parsed, dict):
        return parsed
    return {}


def save_json_file(path: str, payload: Dict[str, Any]) -> None:
    """Write ``payload`` to ``path`` as ASCII-only JSON with 2-space indent.

    Missing parent directories are created first; an existing file is
    overwritten.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    serialized = json.dumps(payload, ensure_ascii=True, indent=2)
    destination.write_text(serialized, encoding="utf-8")


def normalize_args(args: argparse.Namespace) -> argparse.Namespace:
    """Fill in missing settings and save the resolved ones to the config file.

    Each required value is taken from the command line when given; otherwise
    the user is prompted, with the previously saved value (or, for the
    credentials file, a guessed one) offered as the default. The subject
    e-mail is only requested in service-account mode. The OAuth redirect URI
    is never prompted for; it falls back to the saved value.

    Args:
        args: Parsed command-line namespace; updated in place.

    Returns:
        The same ``args`` object, after the resolved settings have been
        written to ``args.config_file``.
    """
    saved = load_json_file(args.config_file)
    guessed = guess_credentials_file(args.auth_mode)

    args.credentials_file = prompt_value(
        "Path credentials file",
        args.credentials_file,
        saved.get("credentials_file") or guessed,
    )
    if args.auth_mode == "service-account":
        args.subject_email = prompt_value(
            "Email akun sumber untuk impersonation",
            args.subject_email,
            saved.get("subject_email"),
        )
    args.folder_id = prompt_value("Folder ID sumber", args.folder_id, saved.get("folder_id"))
    args.target_owner_email = prompt_value(
        "Email akun tujuan",
        args.target_owner_email,
        saved.get("target_owner_email"),
    )
    args.oauth_redirect_uri = args.oauth_redirect_uri or saved.get("oauth_redirect_uri")

    # Persist the settings so the next run can offer them as defaults.
    persisted_keys = (
        "auth_mode",
        "credentials_file",
        "oauth_redirect_uri",
        "subject_email",
        "folder_id",
        "target_owner_email",
    )
    snapshot = {key: getattr(args, key) for key in persisted_keys}
    save_json_file(args.config_file, snapshot)
    return args


def resolve_oauth_redirect_uri(credentials_file: str, current_value: Optional[str]) -> str:
    """Pick the OAuth redirect URI to use for the consent flow.

    Args:
        credentials_file: Path to the OAuth client-secrets JSON file.
        current_value: Explicitly configured redirect URI, if any.

    Returns:
        ``current_value`` when set; otherwise the first entry of
        ``redirect_uris`` in the ``installed`` or ``web`` section of the
        client-secrets file; otherwise ``"http://localhost"``.

    The file lookup is best-effort: an unreadable file, invalid JSON, or JSON
    of an unexpected shape all fall through to the default instead of raising.
    """
    if current_value:
        return current_value
    try:
        payload = json.loads(Path(credentials_file).read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers both invalid JSON and undecodable bytes.
        payload = {}
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (e.g. a list) has no sections.
        payload = {}
    for key in ("installed", "web"):
        config = payload.get(key)
        if not isinstance(config, dict):
            continue
        redirect_uris = config.get("redirect_uris")
        # Only accept a real list; indexing a bare string would return one character.
        if isinstance(redirect_uris, (list, tuple)) and redirect_uris:
            first = redirect_uris[0]
            if isinstance(first, str) and first:
                return first
    return "http://localhost"


def configure_oauth_transport(redirect_uri: str) -> None:
    """Set ``OAUTHLIB_INSECURE_TRANSPORT=1`` for plain-HTTP loopback redirects.

    The environment variable is only set when ``redirect_uri`` starts with
    ``http://localhost`` or ``http://127.0.0.1``; otherwise nothing changes.
    """
    loopback_prefixes = ("http://localhost", "http://127.0.0.1")
    if not redirect_uri.startswith(loopback_prefixes):
        return
    os.environ["OAUTHLIB_INSECURE_TRANSPORT"] = "1"


def setup_logging(log_file: str) -> None:
    """Configure logging at INFO level to both ``log_file`` and stdout.

    The log file's parent directory is created if it does not exist.
    """
    destination = Path(log_file)
    destination.parent.mkdir(parents=True, exist_ok=True)

    file_handler = logging.FileHandler(destination, encoding="utf-8")
    console_handler = logging.StreamHandler(sys.stdout)

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
        handlers=[file_handler, console_handler],
    )


def utc_now() -> str:
    """Return the current time in UTC as an ISO 8601 string."""
    moment = datetime.now(tz=timezone.utc)
    return moment.isoformat()


def write_result(path: str, record: Dict[str, Any]) -> None:
    """Append ``record`` to ``path`` as one ASCII-only JSON line.

    Missing parent directories are created; the file is opened in append mode
    so earlier results are kept.
    """
    destination = Path(path)
    destination.parent.mkdir(parents=True, exist_ok=True)
    with destination.open("a", encoding="utf-8") as sink:
        encoded = json.dumps(record, ensure_ascii=True)
        sink.write(f"{encoded}\n")


def _store_token(token_path: Path, creds) -> None:
    """Save OAuth user credentials as JSON, creating the parent directory if needed."""
    token_path.parent.mkdir(parents=True, exist_ok=True)
    token_path.write_text(creds.to_json(), encoding="utf-8")


def load_credentials(args: argparse.Namespace):
    """Return Google credentials for the configured authentication mode.

    Service-account mode loads the key file and, when ``args.subject_email``
    is set, returns credentials delegated to that user.

    OAuth mode reuses the token stored in ``args.token_file`` when possible:
    an expired token with a refresh token is refreshed and saved again, and a
    still-valid token is returned unchanged. If the refresh is rejected (for
    example because the refresh token was revoked or has expired), the stored
    token is discarded and the interactive login below is used instead of
    aborting. The interactive login prints the authorization URL, asks the
    user to paste the redirected URL (or the bare authorization code), and
    saves the new token.

    Args:
        args: Parsed settings. ``args.oauth_redirect_uri`` is updated in place
            with the redirect URI that was actually used.

    Returns:
        A credentials object usable with ``googleapiclient.discovery.build``.
    """
    if args.auth_mode == "service-account":
        creds = service_account.Credentials.from_service_account_file(
            args.credentials_file,
            scopes=SCOPES,
        )
        if args.subject_email:
            creds = creds.with_subject(args.subject_email)
        return creds

    token_path = Path(args.token_file)
    creds = None
    if token_path.exists():
        creds = Credentials.from_authorized_user_file(str(token_path), SCOPES)
    if creds and creds.expired and creds.refresh_token:
        # Imported here so the file's import block does not need to change.
        from google.auth.exceptions import RefreshError

        try:
            creds.refresh(Request())
        except RefreshError as error:
            # A dead refresh token cannot be repaired; fall back to a new login.
            logging.warning("stored token could not be refreshed, starting a new login: %s", error)
            creds = None
        else:
            _store_token(token_path, creds)
            return creds
    if creds and creds.valid:
        return creds

    # No usable stored token: run the manual copy/paste consent flow.
    args.oauth_redirect_uri = resolve_oauth_redirect_uri(args.credentials_file, args.oauth_redirect_uri)
    configure_oauth_transport(args.oauth_redirect_uri)
    flow = InstalledAppFlow.from_client_secrets_file(args.credentials_file, SCOPES)
    flow.redirect_uri = args.oauth_redirect_uri
    auth_url, _ = flow.authorization_url(access_type="offline", prompt="consent")
    print("Buka URL berikut di browser untuk login:")
    print(auth_url)
    print("")
    redirected = input("Setelah redirect ke localhost, paste URL lengkapnya di sini lalu Enter: ").strip()
    if redirected.startswith("http://") or redirected.startswith("https://"):
        flow.fetch_token(authorization_response=redirected)
    else:
        # Accept either a query string containing "code=..." or the bare code.
        parsed = parse_qs(redirected)
        code = (parsed.get("code") or [redirected])[0]
        flow.fetch_token(code=code)
    creds = flow.credentials
    _store_token(token_path, creds)
    return creds


def build_service(args: argparse.Namespace):
    """Create a Drive v3 API client using the credentials selected by ``args``.

    Discovery-document caching is disabled.
    """
    return build(
        "drive",
        "v3",
        credentials=load_credentials(args),
        cache_discovery=False,
    )


def extract_error(error: HttpError) -> Dict[str, Any]:
    """Summarise an ``HttpError`` as ``status``, ``reason`` and ``message``.

    The response body is parsed as JSON when possible. ``message`` comes from
    the body's ``error.message`` and ``reason`` from the first entry of
    ``error.errors``. If the body cannot be parsed or is not an object,
    ``message`` is ``str(error)`` and ``reason`` is ``None``.

    Returns:
        A dict with the keys ``"status"``, ``"reason"`` and ``"message"``.
    """
    response = getattr(error, "resp", None)
    info: Dict[str, Any] = {
        "status": getattr(response, "status", None),
        "reason": None,
        "message": str(error),
    }

    try:
        body = json.loads(error.content.decode("utf-8"))
    except Exception:
        body = None
    if not isinstance(body, dict):
        return info

    error_block = body.get("error") or {}
    info["message"] = error_block.get("message", info["message"])
    entries = error_block.get("errors") or []
    if entries:
        # Only the first detail entry is consulted for the reason code.
        info["reason"] = entries[0].get("reason")
    return info


def should_retry(error: Exception) -> bool:
    """Decide whether a failed request is worth retrying.

    For an ``HttpError``: never retry when the reason is listed in
    ``NON_RETRYABLE_REASONS``; otherwise retry when the HTTP status is in
    ``RETRYABLE_STATUS`` or the reason is in ``RETRYABLE_REASONS``.
    For any other exception: retry only ``TimeoutError`` and
    ``ConnectionError``.
    """
    if isinstance(error, HttpError):
        info = extract_error(error)
        reason = info["reason"]
        if reason in NON_RETRYABLE_REASONS:
            return False
        return info["status"] in RETRYABLE_STATUS or reason in RETRYABLE_REASONS
    return isinstance(error, (TimeoutError, ConnectionError))


def execute_with_retry(
    request_factory: Callable[[], Any],
    args: argparse.Namespace,
    action: str,
    file_id: Optional[str] = None,
) -> Any:
    """Run a request, retrying retryable failures with exponential backoff.

    Args:
        request_factory: Zero-argument callable that builds a fresh request
            object exposing ``execute()``; it is called again on every attempt.
        args: Supplies ``max_retries``, ``min_backoff`` and ``max_backoff``.
        action: Label used in the retry log line.
        file_id: Optional file identifier used in the retry log line.

    Returns:
        Whatever ``execute()`` returns on the first successful attempt.

    Raises:
        The last exception, once ``args.max_retries`` retries have been used
        up or ``should_retry`` rejects the error.
    """
    failures = 1  # ordinal of the failure currently being handled
    while True:
        try:
            request = request_factory()
            return request.execute()
        except Exception as error:
            if failures > args.max_retries or not should_retry(error):
                raise
            # Capped exponential delay plus up to one second of random jitter.
            capped_delay = min(args.max_backoff, args.min_backoff * (2 ** (failures - 1)))
            sleep_for = capped_delay + random.random()
            logging.warning(
                "retrying action=%s file_id=%s attempt=%s wait=%.2fs error=%s",
                action,
                file_id,
                failures,
                sleep_for,
                error,
            )
            time.sleep(sleep_for)
            failures += 1


def get_me(service, args: argparse.Namespace) -> Dict[str, Any]:
    """Fetch the authenticated user's e-mail address and display name.

    Issues ``about.get`` through ``execute_with_retry``.
    """

    def make_request():
        return service.about().get(fields="user(emailAddress,displayName)")

    return execute_with_retry(make_request, args, "about.get")


def get_file(service, args: argparse.Namespace, file_id: str) -> Dict[str, Any]:
    """Fetch one file's metadata via ``files.get`` with retries.

    The response is limited to id, name, MIME type, owner e-mail addresses,
    trashed flag and drive id.
    """
    wanted_fields = "id,name,mimeType,owners(emailAddress),trashed,driveId"

    def make_request():
        return service.files().get(
            fileId=file_id,
            fields=wanted_fields,
            supportsAllDrives=True,
        )

    return execute_with_retry(make_request, args, "files.get", file_id=file_id)


def list_children(
    service,
    args: argparse.Namespace,
    folder_id: str,
    page_token: Optional[str] = None,
) -> Dict[str, Any]:
    """Fetch one page of non-trashed children of ``folder_id``.

    Args:
        service: Drive API client.
        args: Supplies ``page_size`` and the retry settings.
        folder_id: Folder whose direct children are listed.
        page_token: Token of the page to fetch; ``None`` for the first page.

    Returns:
        The raw ``files.list`` response (``files`` and, when more pages
        exist, ``nextPageToken``).
    """
    query = f"'{folder_id}' in parents and trashed = false"
    wanted_fields = "nextPageToken,files(id,name,mimeType,owners(emailAddress),trashed,driveId)"

    def make_request():
        return service.files().list(
            q=query,
            fields=wanted_fields,
            pageSize=args.page_size,
            pageToken=page_token,
            includeItemsFromAllDrives=True,
            supportsAllDrives=True,
        )

    return execute_with_retry(make_request, args, "files.list", file_id=folder_id)


def iter_tree(
    service,
    args: argparse.Namespace,
    root_folder_id: str,
) -> Iterable[Dict[str, Any]]:
    """Yield the root item and everything below it, breadth-first.

    The root itself is yielded first. Each item is yielded at most once, even
    if it is reachable through several parents. Iteration stops as soon as
    ``args.max_files`` items have been yielded (when that option is set).
    """
    pending = [get_file(service, args, root_folder_id)]
    visited = {root_folder_id}
    emitted = 0

    while pending:
        node = pending.pop(0)
        yield node
        emitted += 1
        if args.max_files and emitted >= args.max_files:
            return
        if node.get("mimeType") != FOLDER_MIME:
            # Only folders have children to enumerate.
            continue

        # Walk every result page of this folder.
        token = None
        while True:
            page = list_children(service, args, node["id"], page_token=token)
            for child in page.get("files", []):
                if child["id"] not in visited:
                    visited.add(child["id"])
                    pending.append(child)
            token = page.get("nextPageToken")
            if not token:
                break


def list_permissions(service, args: argparse.Namespace, file_id: str) -> List[Dict[str, Any]]:
    """Return the permissions of ``file_id`` from a single ``permissions.list`` call.

    Returns:
        The ``permissions`` list of the response, or an empty list when the
        response has none.
    """
    wanted_fields = (
        "permissions(id,emailAddress,role,type,pendingOwner,deleted,"
        "permissionDetails(inherited,inheritedFrom,role,permissionType))"
    )

    def make_request():
        return service.permissions().list(
            fileId=file_id,
            fields=wanted_fields,
            supportsAllDrives=True,
        )

    listing = execute_with_retry(make_request, args, "permissions.list", file_id=file_id)
    return listing.get("permissions", [])


def is_owned_by(item: Dict[str, Any], owner_email: str) -> bool:
    """Return True if ``owner_email`` is among the item's owners.

    E-mail addresses are compared case-insensitively; an item without an
    ``owners`` entry has no owners.
    """
    owner_entries = item.get("owners") or []
    wanted = owner_email.lower()
    for entry in owner_entries:
        if (entry.get("emailAddress") or "").lower() == wanted:
            return True
    return False


def find_target_permission(permissions: Sequence[Dict[str, Any]], target_email: str) -> Optional[Dict[str, Any]]:
    """Return the first live user permission for ``target_email``.

    Permissions flagged ``deleted`` and permissions whose type is not
    ``"user"`` are ignored; e-mail addresses are compared case-insensitively.

    Returns:
        The matching permission dict, or ``None`` when there is none.
    """
    wanted = target_email.lower()
    matches = (
        entry
        for entry in permissions
        if not entry.get("deleted")
        and entry.get("type") == "user"
        and (entry.get("emailAddress") or "").lower() == wanted
    )
    return next(matches, None)


def refresh_target_permission(
    service,
    args: argparse.Namespace,
    file_id: str,
    target_email: str,
) -> Optional[Dict[str, Any]]:
    """Re-read the file's permissions and return the target user's entry.

    Returns:
        The target user's permission dict, or ``None`` if the user has no
        live user-type permission on the file.
    """
    return find_target_permission(
        list_permissions(service, args, file_id),
        target_email,
    )


def ensure_direct_writer_permission(
    service,
    args: argparse.Namespace,
    item: Dict[str, Any],
) -> Dict[str, Any]:
    """Make sure the target account holds at least the writer role on ``item``.

    An existing owner or writer permission is returned untouched. Any other
    existing permission is updated to writer; if the target has no permission
    at all, a writer permission is created (sending the notification e-mail
    according to ``args.send_notification_email``). After either change the
    function waits two seconds before returning.

    Returns:
        The existing permission, or the API response of the update/create call.
    """
    file_id = item["id"]
    existing = refresh_target_permission(service, args, file_id, args.target_owner_email)

    if existing:
        if existing.get("role") in ("owner", "writer"):
            return existing

        def promote_to_writer():
            return service.permissions().update(
                fileId=file_id,
                permissionId=existing["id"],
                body={"role": "writer"},
                supportsAllDrives=True,
                fields="id,emailAddress,role,pendingOwner",
            )

        outcome = execute_with_retry(
            promote_to_writer,
            args,
            "permissions.update.writer",
            file_id=file_id,
        )
    else:

        def share_as_writer():
            return service.permissions().create(
                fileId=file_id,
                body={
                    "type": "user",
                    "role": "writer",
                    "emailAddress": args.target_owner_email,
                },
                supportsAllDrives=True,
                sendNotificationEmail=args.send_notification_email,
                fields="id,emailAddress,role,pendingOwner",
            )

        outcome = execute_with_retry(
            share_as_writer,
            args,
            "permissions.create.writer",
            file_id=file_id,
        )

    # Short pause after changing the permission, before the caller continues.
    time.sleep(2)
    return outcome


def set_pending_owner(
    service,
    args: argparse.Namespace,
    item: Dict[str, Any],
) -> Dict[str, Any]:
    """Mark the target account as pending owner of ``item``.

    The target is first given the writer role if needed; after a five-second
    wait its permission is looked up again and updated with
    ``pendingOwner=True`` while keeping the writer role.

    Returns:
        The API response of the pending-owner update.

    Raises:
        RuntimeError: If the target's permission cannot be found after the wait.
    """
    file_id = item["id"]
    ensure_direct_writer_permission(service, args, item)
    time.sleep(5)

    current = refresh_target_permission(service, args, file_id, args.target_owner_email)
    if not current:
        raise RuntimeError("Target account permission was not found after writer propagation")

    def mark_pending_owner():
        return service.permissions().update(
            fileId=file_id,
            permissionId=current["id"],
            body={"role": "writer", "pendingOwner": True},
            supportsAllDrives=True,
            fields="id,emailAddress,role,pendingOwner",
        )

    return execute_with_retry(
        mark_pending_owner,
        args,
        "permissions.update.pendingOwner",
        file_id=file_id,
    )


def skip_record(item: Dict[str, Any], status: str, message: str) -> Dict[str, Any]:
    """Build a timestamped result record for an item with the given status.

    The record carries the item's id, name and MIME type (``None`` when the
    item lacks them) together with ``status`` and ``message``.
    """
    record: Dict[str, Any] = {"timestamp": utc_now()}
    # (key in the Drive item, key in the result record)
    for item_key, record_key in (("id", "fileId"), ("name", "name"), ("mimeType", "mimeType")):
        record[record_key] = item.get(item_key)
    record["status"] = status
    record["message"] = message
    return record


def request_transfer(
    service,
    args: argparse.Namespace,
    item: Dict[str, Any],
    source_email: str,
) -> Dict[str, Any]:
    """Request an ownership transfer for one item and describe the outcome.

    Items are skipped, in this order, when they are trashed, belong to a
    shared drive, or are not owned by ``source_email``. In dry-run mode
    nothing is sent. If the target already owns the item that is reported;
    otherwise the target is made pending owner.

    Returns:
        A result record whose ``status`` is one of ``skipped_trashed``,
        ``skipped_shared_drive``, ``skipped_not_owned``, ``dry_run``,
        ``already_owner`` or ``requested``.
    """
    record: Dict[str, Any] = {
        "timestamp": utc_now(),
        "fileId": item["id"],
        "name": item.get("name"),
        "mimeType": item.get("mimeType"),
        "targetOwnerEmail": args.target_owner_email,
    }

    def finish(status: str, message: str) -> Dict[str, Any]:
        # Stamp the outcome onto the record and hand it back.
        record.update(status=status, message=message)
        return record

    if item.get("trashed"):
        return finish("skipped_trashed", "File is trashed")

    if item.get("driveId"):
        return finish(
            "skipped_shared_drive",
            "Ownership transfer is not supported for shared drive items",
        )

    if not is_owned_by(item, source_email):
        return finish(
            "skipped_not_owned",
            f"Authenticated account does not own this file as {source_email}",
        )

    if args.dry_run:
        return finish("dry_run", "No permission change was sent")

    existing = refresh_target_permission(service, args, item["id"], args.target_owner_email)
    if existing and existing.get("role") == "owner":
        return finish("already_owner", "Target account is already the owner")

    response = set_pending_owner(service, args, item)
    record.update(
        status="requested",
        permissionId=response.get("id"),
        pendingOwner=response.get("pendingOwner"),
        role=response.get("role"),
        message="Prepared direct writer permission and set pending owner",
    )
    return record


def chunks(items: Sequence[Dict[str, Any]], size: int) -> Iterable[Sequence[Dict[str, Any]]]:
    """Yield consecutive slices of ``items`` holding at most ``size`` elements.

    The last slice may be shorter; an empty sequence yields nothing.
    """
    yield from (items[start : start + size] for start in range(0, len(items), size))


def main() -> int:
    """Run the ownership-transfer request for every item under the folder.

    Resolves settings, configures logging, authenticates, walks the folder
    tree and processes the items batch by batch. Every outcome is appended to
    the result file and logged; a summary of per-status counts is logged last.

    Returns:
        ``1`` if the authenticated e-mail address cannot be determined,
        ``2`` if at least one item failed, ``0`` otherwise.
    """
    args = normalize_args(parse_args())
    setup_logging(args.log_file)
    service = build_service(args)

    profile = get_me(service, args)
    source_email = ((profile.get("user") or {}).get("emailAddress") or "").strip()
    if not source_email:
        logging.error("Unable to determine authenticated email address")
        return 1
    logging.info("authenticated email=%s target=%s", source_email, args.target_owner_email)

    items = list(iter_tree(service, args, args.folder_id))
    logging.info("discovered items=%s folder_id=%s", len(items), args.folder_id)

    counts = dict.fromkeys(
        (
            "requested",
            "already_owner",
            "dry_run",
            "skipped_not_owned",
            "skipped_shared_drive",
            "skipped_trashed",
            "failed",
        ),
        0,
    )

    batches = chunks(items, args.batch_size)
    for batch_number, batch in enumerate(batches, start=1):
        logging.info("processing batch=%s size=%s", batch_number, len(batch))
        for item in batch:
            try:
                outcome = request_transfer(service, args, item, source_email)
            except Exception as error:
                # One bad item must not stop the run; record it as failed.
                outcome = skip_record(item, "failed", str(error))
            status = outcome["status"]
            counts[status] = counts.get(status, 0) + 1
            write_result(args.result_file, outcome)
            logging.info(
                "file_id=%s status=%s name=%s message=%s",
                outcome.get("fileId"),
                status,
                outcome.get("name"),
                outcome.get("message"),
            )

    logging.info("summary=%s", json.dumps(counts, ensure_ascii=True, sort_keys=True))
    if counts.get("failed", 0) == 0:
        return 0
    return 2


# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    sys.exit(main())

My permission is already editor, so I don't know what is wrong with this. I just want to transfer the files quickly.
I have already searched for this problem on Google, but no one else seems to have this issue.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions