"""Download every asset attached to a GitHub repository's Releases."""
import sys
import concurrent.futures
from pathlib import Path
from urllib.parse import urlparse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def create_session(retries=5, backoff_factor=0.3, status_forcelist=(500, 502, 504)):
    """Create and configure a requests session for automatic retries."""
    session = requests.Session()
    retry_strategy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    # Apply the retry policy to both HTTP and HTTPS requests.
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session

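# Usage sketch (illustrative only; the URL below is a placeholder, not part of
# this script): every request made through the returned session is retried
# automatically on connection errors and on the status codes listed above.
#
#     session = create_session()
#     response = session.get("https://example.com/resource")
#     response.raise_for_status()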

def extract_user_repo(github_clone_url):
    """Extract username and repository name from a GitHub clone URL."""
    path = urlparse(github_clone_url).path
    parts = path.strip('/').split('/')
    user, repo = parts[0], parts[1]
    repo = repo[:-4] if repo.endswith('.git') else repo
    return user, repo

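# Illustration (hypothetical input): a clone URL such as
# "https://github.com/octocat/Hello-World.git" yields ("octocat", "Hello-World");
# a trailing ".git" suffix, if present, is stripped.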

def download_asset(session, asset_url, file_path):
    """Download a single Release asset."""
    print(f"Downloading {file_path.name}...")
    try:
        response = session.get(asset_url, stream=True)
        response.raise_for_status()
        # Stream the asset to disk in chunks to avoid loading it all into memory.
        with file_path.open('wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
        print(f"Downloaded {file_path.name}")
    except requests.RequestException as e:
        print(f"Failed to download {file_path.name}: {e}")


def download_releases(github_clone_url):
    """Download all Release assets for the specified repository."""
    session = create_session()
    user, repo = extract_user_repo(github_clone_url)
    releases_url = f"https://api.github.com/repos/{user}/{repo}/releases"
    response = session.get(releases_url)
    response.raise_for_status()
    releases = response.json()
    directory = Path.cwd() / repo
    directory.mkdir(exist_ok=True)
    print(f"Total assets to download: {sum(len(release['assets']) for release in releases)}")
    # Fetch assets concurrently; each worker reuses the retry-enabled session.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [
            executor.submit(download_asset, session, asset['browser_download_url'], directory / asset['name'])
            for release in releases for asset in release['assets']
        ]
        concurrent.futures.wait(futures)


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python script.py <github_repo_url>")
        sys.exit(1)
    download_releases(sys.argv[1])
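
# Example invocation (the repository URL is a placeholder):
#
#     python script.py https://github.com/octocat/Hello-World.git
#
# All release assets are saved into a directory named after the repository
# (here "Hello-World") inside the current working directory.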