-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgcp_storage_services.py
139 lines (110 loc) · 4.76 KB
/
gcp_storage_services.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from google.oauth2 import service_account
from google.cloud import storage
import settings
from io import BytesIO
from tqdm import tqdm
import os
import logging
import json
from datetime import datetime
class ProgressBytesIO(BytesIO):
def __init__(self, bytes_io, progress_bar):
self._bytes_io = bytes_io
self._progress_bar = progress_bar
# Ensure we're starting from the beginning of the BytesIO stream
self._bytes_io.seek(0)
def read(self, size=-1):
# Update the progress bar with the number of bytes read
chunk = self._bytes_io.read(size)
self._progress_bar.update(len(chunk))
return chunk
def seek(self, offset, whence=0):
return self._bytes_io.seek(offset, whence)
def tell(self):
return self._bytes_io.tell()
class GCPStorageServices:
creds = service_account.Credentials.from_service_account_file(settings.service_account_path)
client = storage.Client(credentials=creds)
def __init__(self):
self.logs = {'raw_success': 0,
'raw_failure': 0,
'processed_success': 0,
'processed_failure': 0,
'zip_success': 0,
'zip_failure': 0,
'raw_details': [],
'processed_details': [],
'zip_details': [],
'bucket_create_failure': []}
def upload_file_to_gcs(self, source_file_name, destination_path, gcp_bucket):
try:
# # Get the total file size
bucket = self.client.bucket(gcp_bucket)
blob = bucket.blob(destination_path)
# Wrap your BytesIO object with ProgressBytesIO
file_size = os.path.getsize(source_file_name)
pbar = tqdm(total=file_size, unit='B', unit_scale=True, desc='Uploading')
with open(source_file_name, "rb") as fh:
progress_io = ProgressBytesIO(fh, pbar)
blob.upload_from_file(progress_io, num_retries=3, timeout=600)
pbar.close()
msg = f"{source_file_name} Upload Completed To {gcp_bucket}/{destination_path}."
success = True
except Exception as e:
msg = f"{source_file_name} Upload Failed To {gcp_bucket}/{destination_path}. {e}"
success = False
print(msg)
return msg, success
def upload_dict_to_gcs(self, data: dict, bucket_name, filename):
try:
# Reference the specified bucket
bucket = self.client.bucket(bucket_name)
# Convert the dictionary to JSON
json_data = json.dumps(data)
# Create a blob object in the specified bucket
blob = bucket.blob(filename)
# Upload the JSON data
blob.upload_from_string(json_data, content_type='application/json')
msg = f"{filename} has been saved to {bucket_name}."
except Exception as e:
msg = f"{filename} failed to be saved to {bucket_name}."
print(msg)
return msg
@staticmethod
def read_in_chunks(file_object, chunk_size=1024):
"""Lazy function to read a file piece by piece."""
while True:
data = file_object.read(chunk_size)
if not data:
break
yield data
def create_gcs_buckets(self, bucket_name, location='US'):
try:
# Initialize the bucket object with desired properties
bucket = self.client.bucket(bucket_name)
bucket.storage_class = "STANDARD"
bucket.iam_configuration.uniform_bucket_level_access_enabled = True
bucket.iam_configuration.public_access_prevention = 'enforced'
# Create the new bucket
new_bucket = self.client.create_bucket(bucket, location=location)
print(f"Bucket {new_bucket.name} created.")
except Exception as e:
msg = f"Failed to create bucket {bucket_name}. Reason: {e}"
self.logs['bucket_create_failure'].append(msg)
def list_gcs_buckets(self):
# List all buckets
buckets = self.client.list_buckets()
# Extract and print bucket names
bucket_names = [bucket.name for bucket in buckets]
return bucket_names
def read_all_names_from_gcs_bucket(self, bucket_name):
file_names = []
try:
# Get the bucket
bucket = self.client.bucket(bucket_name)
# List all objects in the bucket and get their names
blobs = bucket.list_blobs()
file_names = [blob.name for blob in blobs]
except Exception as e:
print("Error in read_all_names_from_gcs_bucket bucket '{}': {}".format(bucket_name, e))
return file_names