Skip to content

msk_cdm.minio

MinioAPI

Bases: object

Object to simplify reading/writing to/from Minio.

Source code in msk_cdm/minio/_minio_api.py
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
class MinioAPI(object):
    """Object to simplify reading/writing to/from Minio.

    Wraps a `Minio` client and remembers an optional default bucket. Every
    method that takes a bucket argument uses the explicitly passed value
    first and only falls back to the default bucket when none is given.
    """

    def __init__(
        self,
        *,
        ACCESS_KEY: Optional[str] = None,
        SECRET_KEY: Optional[str] = None,
        ca_certs: Optional[str] = None,
        url_port: Optional[str] = "pllimsksparky3:9000",
        fname_minio_env: Optional[Union[Path, str]] = None,
        bucket: Optional[str] = None,
    ):
        """Store the connection settings and connect to Minio.

        Args:
            ACCESS_KEY: Minio access key. Optional if `fname_minio_env` is
                passed, in which case it may be present in the env file.
            SECRET_KEY: Minio secret key. Optional if `fname_minio_env` is
                passed, in which case it may be present in the env file.
            ca_certs: Optional filename of a CA certificate bundle for
                `urllib3`. Only specify if not passing `fname_minio_env`.
            url_port: Minio endpoint given as "host:port".
            fname_minio_env: A filename with KEY=value lines with values for
                keys `CA_CERTS`, `URL_PORT`, `BUCKET`.
            bucket: Optional default minio bucket to use for operations. Can
                also be given as `BUCKET` in the env file.
        """
        self._ACCESS_KEY = ACCESS_KEY
        self._SECRET_KEY = SECRET_KEY
        self._ca_certs = ca_certs
        self._url_port = url_port

        self._bucket = bucket
        self._client = None
        self._httpClient = None

        if fname_minio_env is not None:
            self._process_env(fname_minio_env)
        self._connect()

    def load_obj(
        self,
        path_object: str,
        bucket_name: Optional[str] = None,
    ) -> "urllib3.response.HTTPResponse":
        """Read an object from minio.

        Errors raised by the underlying Minio client propagate to the caller.
        The caller owns the returned response and should call `close()` and
        `release_conn()` on it once the data has been consumed.

        Args:
            path_object: Object file to read from minio.
            bucket_name: Optional bucket name, otherwise defaults to the
                BUCKET configured on the constructor.

        Returns:
            The response object returned by the Minio client's `get_object`.
        """
        # An explicitly passed bucket wins; the default is only a fallback.
        if bucket_name is None:
            bucket_name = self._bucket

        return self._client.get_object(bucket_name, path_object)

    def save_obj(
        self,
        df,
        path_object: str,
        sep: Optional[str] = ",",
        bucket_name: Optional[str] = None,
    ):
        """Save a dataframe to minio as delimited text.

        Args:
            df: Pandas dataframe to be saved to Minio.
            path_object: Object filename for `df`.
            sep: Separator when saving the Pandas dataframe.
            bucket_name: Optional bucket name, otherwise defaults to the
                BUCKET configured on the constructor.
        """
        # An explicitly passed bucket wins; the default is only a fallback.
        if bucket_name is None:
            bucket_name = self._bucket

        # Serialize without the index and upload the UTF-8 bytes in one call.
        csv_bytes = df.to_csv(index=False, sep=sep).encode("utf-8")

        self._client.put_object(
            bucket_name=bucket_name,
            object_name=path_object,
            data=BytesIO(csv_bytes),
            length=len(csv_bytes),
            content_type="application/csv",
        )

        return None

    def load_df(
        self,
        fname,
        sep: Optional[str] = "\t",
        dtype=object,
        bucket_name: Optional[str] = None,
    ):
        """Read a delimited-text object from minio into a Pandas dataframe.

        Args:
            fname: Object file to read from minio.
            sep: Separator passed to `pd.read_csv`.
            dtype: Column dtype passed to `pd.read_csv`.
            bucket_name: Optional bucket name, otherwise defaults to the
                BUCKET configured on the constructor.

        Returns:
            The dataframe parsed from the object.
        """
        obj = self.load_obj(path_object=fname, bucket_name=bucket_name)
        try:
            return pd.read_csv(obj, dtype=dtype, sep=sep)
        finally:
            # The response holds a pooled connection; give it back even if
            # parsing fails.
            obj.close()
            obj.release_conn()

    def save_df(
        self,
        df,
        fname,
        sep: Optional[str] = "\t",
        bucket_name: Optional[str] = None,
    ):
        """Save a Pandas dataframe to minio and report the object name.

        Args:
            df: Pandas dataframe to be saved to Minio.
            fname: Object filename for `df`.
            sep: Separator when saving the Pandas dataframe.
            bucket_name: Optional bucket name, otherwise defaults to the
                BUCKET configured on the constructor.
        """
        self.save_obj(df=df, path_object=fname, sep=sep, bucket_name=bucket_name)
        print(f'Saved data to: {fname}')

    def print_list_objects(
        self,
        bucket_name: Optional[str] = None,
        prefix: Optional[str] = None,
        recursive: Optional[bool] = True,
    ):
        """Create a Python list of objects in a specified minio bucket.

        Args:
            bucket_name: Optional bucket name, otherwise defaults to the
                BUCKET configured on the constructor.
            prefix: Optional string; only objects whose names start with
                this prefix are listed.
            recursive: Passed through to the Minio client's `list_objects`.

        Returns:
            obj_list: List of strings containing path locations in minio bucket.
        """
        # An explicitly passed bucket wins; the default is only a fallback.
        if bucket_name is None:
            bucket_name = self._bucket

        objs = self._client.list_objects(
            bucket_name=bucket_name,
            recursive=recursive,
            prefix=prefix,
        )
        return [obj.object_name for obj in objs]

    def remove_obj(self, path_object: str, bucket_name: Optional[str] = None):
        """Remove an object from minio.

        Args:
            path_object: Object file to be removed from minio.
            bucket_name: Optional bucket name, otherwise defaults to the
                BUCKET configured on the constructor.
        """
        # Fall back to the default bucket, as every other method does.
        if bucket_name is None:
            bucket_name = self._bucket

        self._client.remove_object(bucket_name=bucket_name, object_name=path_object)
        print("Object removed. Bucket: %s, Object: %s" % (bucket_name, path_object))

        return None

    def copy_obj(
        self,
        source_path_object: str,
        dest_path_object: str,
        source_bucket: Optional[str] = None,
        dest_bucket: Optional[str] = None,
    ):
        """Copy an object in minio.

        Objects can be copied across different BUCKETS.
        Warning: objects with greater than 1GB may fail using this.
        Instead, use `load_obj` and `save_obj` in combination.

        Args:
            source_path_object: Object file to be copied.
            dest_path_object: Object filename that `source_path_object` will
                be copied to.
            source_bucket: Optional bucket holding the source object,
                otherwise defaults to the BUCKET configured on the
                constructor.
            dest_bucket: Optional bucket receiving the copy, otherwise
                defaults to the BUCKET configured on the constructor.

        Returns:
            output: Object name and version ID of object
        """
        # Resolve each side independently so a default bucket does not
        # prevent copying between two different buckets.
        if source_bucket is None:
            source_bucket = self._bucket
        if dest_bucket is None:
            dest_bucket = self._bucket

        result = self._client.copy_object(
            dest_bucket,
            dest_path_object,
            CopySource(source_bucket, source_path_object),
        )

        return [result.object_name, result.version_id]

    def _process_env(self, fname_minio_env):
        """Fill in unset connection settings from an env file.

        Values passed to the constructor take precedence over the file.
        `ACCESS_KEY` / `SECRET_KEY` found in the process environment replace
        the corresponding values read from the file.

        Args:
            fname_minio_env: Filename with KEY=value lines.
        """
        print("Minio environment file: %s" % fname_minio_env)
        dict_config = dotenv_values(fname_minio_env)

        # Credentials exported in the process environment override the file.
        for key in ("ACCESS_KEY", "SECRET_KEY"):
            env_value = os.getenv(key)
            if env_value:
                dict_config[key] = env_value

        if not self._ACCESS_KEY:
            self._ACCESS_KEY = dict_config.get("ACCESS_KEY", None)
        if not self._SECRET_KEY:
            self._SECRET_KEY = dict_config.get("SECRET_KEY", None)
        if not self._ca_certs:
            self._ca_certs = dict_config.get("CA_CERTS", None)
        # NOTE(review): `url_port` has a non-empty constructor default, so
        # URL_PORT from the env file is only used when the caller explicitly
        # passes url_port=None or "" -- confirm this is intended.
        if not self._url_port:
            self._url_port = dict_config.get("URL_PORT", None)
        if not self._bucket:
            self._bucket = dict_config.get("BUCKET", None)

        return None

    def _connect(self):
        """Build the HTTP pool and the Minio client and store both on self."""
        # required for self-signed certs
        httpClient = urllib3.PoolManager(
            cert_reqs="CERT_REQUIRED", ca_certs=self._ca_certs
        )

        # Create secure client with access key and secret key
        client = Minio(
            endpoint=self._url_port,
            access_key=self._ACCESS_KEY,
            secret_key=self._SECRET_KEY,
            secure=True,
            http_client=httpClient,
        )

        self._client = client
        self._httpClient = httpClient

        return None

__init__(*, ACCESS_KEY=None, SECRET_KEY=None, ca_certs=None, url_port='pllimsksparky3:9000', fname_minio_env=None, bucket=None)

Initialization

Parameters:

Name Type Description Default
ACCESS_KEY Optional[str]

Minio access key. Optional if fname_minio_env is passed, in which case it may be present in the env file picked up by .env

None
SECRET_KEY Optional[str]

Minio secret key. Optional if fname_minio_env is passed, in which case it may be present in the env file picked up by .env

None
ca_certs Optional[str]

optional filename pointer to ca_cert bundle for urllib3. Only specify if not passing fname_minio_env.

None
url_port Optional[str]

Minio endpoint given as "host:port".

'pllimsksparky3:9000'
fname_minio_env Optional[Union[Path, str]]

A filename with KEY=value lines with values for keys CA_CERTS, URL_PORT, BUCKET.

None
bucket Optional[str]

optional default minio bucket to use for operations. Can also be specified as environment variable $BUCKET.

None
Source code in msk_cdm/minio/_minio_api.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def __init__(
    self,
    *,
    ACCESS_KEY: Optional[str] = None,
    SECRET_KEY: Optional[str] = None,
    ca_certs: Optional[str] = None,
    url_port: Optional[str] = "pllimsksparky3:9000",
    fname_minio_env: Optional[Union[Path, str]] = None,
    bucket: Optional[str] = None,
):
    """Store the connection settings and connect to Minio.

    Args:
        ACCESS_KEY: Minio access key. Optional if `fname_minio_env` is
            passed, in which case it may be present in the env file.
        SECRET_KEY: Minio secret key. Optional if `fname_minio_env` is
            passed, in which case it may be present in the env file.
        ca_certs: Optional filename of a CA certificate bundle for
            `urllib3`. Only specify if not passing `fname_minio_env`.
        url_port: Minio endpoint given as "host:port".
        fname_minio_env: A filename with KEY=value lines with values for
            keys `CA_CERTS`, `URL_PORT`, `BUCKET`.
        bucket: Optional default minio bucket to use for operations. Can
            also be given as `BUCKET` in the env file.
    """
    # Client handles start empty; `_connect()` fills them in below.
    self._client = None
    self._httpClient = None

    # Settings exactly as passed in; the env file may fill the unset ones.
    self._ACCESS_KEY, self._SECRET_KEY = ACCESS_KEY, SECRET_KEY
    self._ca_certs, self._url_port = ca_certs, url_port
    self._bucket = bucket

    env_file_given = fname_minio_env is not None
    if env_file_given:
        self._process_env(fname_minio_env)
    self._connect()

copy_obj(source_path_object, dest_path_object, source_bucket=None, dest_bucket=None)

Copy an object in minio.

Objects can be copied across different BUCKETS. Warning: objects with greater than 1GB may fail using this. Instead, use load_obj and save_obj in combination.

Parameters:

Name Type Description Default
source_path_object str

Object file to be copied

required
dest_path_object str

Object filename that source_path_object will be copied to

required
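source_bucket Optional[str]

Optional bucket holding the source object, otherwise defaults to the BUCKET passed via the minio env file to the constructor.

None
dest_bucket Optional[str]

Optional bucket receiving the copy, otherwise defaults to the BUCKET passed via the minio env file to the constructor.

None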

Returns:

Name Type Description
output

Object name and version ID of object

Source code in msk_cdm/minio/_minio_api.py
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
def copy_obj(
    self,
    source_path_object: str,
    dest_path_object: str,
    source_bucket: Optional[str] = None,
    dest_bucket: Optional[str] = None,
):
    """Copy an object in minio.

    Objects can be copied across different BUCKETS.
    Warning: objects with greater than 1GB may fail using this.
    Instead, use `load_obj` and `save_obj` in combination.

    Args:
        source_path_object: Object file to be copied.
        dest_path_object: Object filename that `source_path_object` will
            be copied to.
        source_bucket: Optional bucket holding the source object,
            otherwise defaults to the BUCKET configured on the
            constructor.
        dest_bucket: Optional bucket receiving the copy, otherwise
            defaults to the BUCKET configured on the constructor.

    Returns:
        output: Object name and version ID of object
    """
    # Resolve each side independently so a default bucket does not
    # prevent copying between two different buckets.
    if source_bucket is None:
        source_bucket = self._bucket
    if dest_bucket is None:
        dest_bucket = self._bucket

    result = self._client.copy_object(
        dest_bucket,
        dest_path_object,
        CopySource(source_bucket, source_path_object),
    )

    return [result.object_name, result.version_id]

load_obj(path_object, bucket_name=None)

Read an object from minio

Raises urllib3.exceptions.HTTPError if request is unsuccessful.

Parameters:

Name Type Description Default
path_object str

Object file to read from minio.

required
bucket_name Optional[str]

Optional bucket name, otherwise defaults to the BUCKET passed via the minio env file to the constructor.

None

Returns:

Type Description
HTTPResponse

urllib3.response.HTTPResponse

Source code in msk_cdm/minio/_minio_api.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def load_obj(
    self,
    path_object: str,
    bucket_name: Optional[str] = None,
) -> "urllib3.response.HTTPResponse":
    """Read an object from minio.

    Errors raised by the underlying Minio client propagate to the caller.
    The caller owns the returned response and should call `close()` and
    `release_conn()` on it once the data has been consumed.

    Args:
        path_object: Object file to read from minio.
        bucket_name: Optional bucket name, otherwise defaults to the
            BUCKET configured on the constructor.

    Returns:
        The response object returned by the Minio client's `get_object`.
    """
    # An explicitly passed bucket wins; the default is only a fallback.
    if bucket_name is None:
        bucket_name = self._bucket

    return self._client.get_object(bucket_name, path_object)

print_list_objects(bucket_name=None, prefix=None, recursive=True)

Create a Python list of objects in a specified minio bucket

Parameters:

Name Type Description Default
bucket_name Optional[str]

Optional bucket name, otherwise defaults to BUCKET passed via minio env fname to constructor

None
prefix Optional[str]

Optional string; only objects whose names start with this prefix are listed.

None

Returns:

Name Type Description
obj_list

List of strings containing path locations in minio bucket.

Source code in msk_cdm/minio/_minio_api.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def print_list_objects(
    self,
    bucket_name: Optional[str] = None,
    prefix: Optional[str] = None,
    recursive: Optional[bool] = True,
):
    """Create a Python list of objects in a specified minio bucket.

    Args:
        bucket_name: Optional bucket name, otherwise defaults to the
            BUCKET configured on the constructor.
        prefix: Optional string; only objects whose names start with
            this prefix are listed.
        recursive: Passed through to the Minio client's `list_objects`.

    Returns:
        obj_list: List of strings containing path locations in minio bucket.
    """
    # An explicitly passed bucket wins; the default is only a fallback.
    if bucket_name is None:
        bucket_name = self._bucket

    objs = self._client.list_objects(
        bucket_name=bucket_name,
        recursive=recursive,
        prefix=prefix,
    )
    return [obj.object_name for obj in objs]

remove_obj(path_object, bucket_name=None)

Remove an object from minio

Parameters:

Name Type Description Default
path_object str

Object file to be removed from minio

required
bucket_name Optional[str]

Optional bucket name, otherwise defaults to BUCKET passed via minio env fname to constructor

None
Source code in msk_cdm/minio/_minio_api.py
180
181
182
183
184
185
186
187
188
189
190
191
192
def remove_obj(self, path_object: str, bucket_name: Optional[str] = None):
    """Remove an object from minio.

    Args:
        path_object: Object file to be removed from minio.
        bucket_name: Optional bucket name, otherwise defaults to the
            BUCKET configured on the constructor.
    """
    # Fall back to the default bucket, as the docstring promises.
    if bucket_name is None:
        bucket_name = self._bucket

    self._client.remove_object(bucket_name=bucket_name, object_name=path_object)
    print("Object removed. Bucket: %s, Object: %s" % (bucket_name, path_object))

    return None

save_obj(df, path_object, sep=',', bucket_name=None)

Save an object to minio

Parameters:

Name Type Description Default
df

Pandas dataframe to be saved to Minio

required
path_object str

Object filename for df

required
sep Optional[str]

Separator when saving the Pandas dataframe

','
bucket_name Optional[str]

Optional bucket name, otherwise defaults to the BUCKET passed via the minio env file to the constructor.

None
Source code in msk_cdm/minio/_minio_api.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def save_obj(
    self,
    df,
    path_object: str,
    sep: Optional[str] = ",",
    bucket_name: Optional[str] = None,
):
    """Save a dataframe to minio as delimited text.

    Args:
        df: Pandas dataframe to be saved to Minio.
        path_object: Object filename for `df`.
        sep: Separator when saving the Pandas dataframe.
        bucket_name: Optional bucket name, otherwise defaults to the
            BUCKET configured on the constructor.
    """
    # An explicitly passed bucket wins; the default is only a fallback.
    if bucket_name is None:
        bucket_name = self._bucket

    # Serialize without the index and upload the UTF-8 bytes in one call.
    csv_bytes = df.to_csv(index=False, sep=sep).encode("utf-8")

    self._client.put_object(
        bucket_name=bucket_name,
        object_name=path_object,
        data=BytesIO(csv_bytes),
        length=len(csv_bytes),
        content_type="application/csv",
    )

    return None