Skip to content

msk_cdm.dremio

DremioAPI

Bases: object

Object to simplify reading from Dremio (CDSI's SQL engine).

Source code in msk_cdm/dremio/_dremio_api.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
class DremioAPI(object):
    """Object to simplify reading from Dremio (CDSI's SQL engine).

    On construction the object loads credentials from an environment file,
    authenticates against the Dremio Arrow Flight endpoint and keeps the
    resulting client and bearer token. ``query_data`` then runs SQL and
    caches the resulting dataframe, which ``return_data`` hands back.
    """

    def __init__(
        self,
        *,
        fname_env: str,
        env_key_user: Optional[str] = "USER",
        env_key_pw: Optional[str] = "PW",
        scheme: Optional[str] = "grpc+tcp",
        hostname: Optional[str] = "tlvidreamcord1",
        flightport: Optional[int] = 32010,
    ):
        """Load credentials and authenticate against Dremio.

        Args:
            fname_env: Environment file with username and pw
            env_key_user: Key term to identify the username in fname_env
            env_key_pw: Key term to identify the password in fname_env
            scheme: The connection scheme used
            hostname: Server hostname
            flightport: Port number

        Raises:
            ValueError: If the username or password cannot be found in the
                environment after loading ``fname_env``.

        """

        self._df = None
        self._scheme = scheme
        self._hostname = hostname
        self._flightport = flightport

        # NOTE(review): python-dotenv's load_dotenv does not, by default,
        # override variables that already exist in the process environment,
        # so a pre-existing "USER" variable may win over the value in
        # fname_env -- confirm this is the intended behaviour.
        load_dotenv(fname_env)
        self._authenticate(user=os.getenv(env_key_user), pw=os.getenv(env_key_pw))

    def return_data(self):
        """Return data queried from Dremio in a Pandas dataframe

        Returns:
            The dataframe stored by the last ``query_data`` call, or ``None``
            if no query has been run yet.

        """
        return self._df

    def _authenticate(self, user, pw):
        """Open a Flight client and obtain a bearer token for later calls.

        Args:
            user: Username for basic authentication
            pw: Password for basic authentication

        Raises:
            ValueError: If ``user`` or ``pw`` is ``None``.

        """
        # Fail early with an actionable message instead of handing None to
        # the Flight client when the environment keys were not found.
        if user is None or pw is None:
            raise ValueError(
                "Dremio credentials are missing: the username or password "
                "key was not found in the environment"
            )

        scheme = self._scheme
        hostname = self._hostname
        flightport = self._flightport
        connection_args = {}
        # Two WLM settings can be provided upon initial authentication
        # with the Dremio Server Flight Endpoint:
        # - routing-tag
        # - routing queue
        initial_options = flight.FlightCallOptions(
            headers=[
                (b"routing-tag", b"test-routing-tag"),
                (b"routing-queue", b"Low Cost User Queries"),
            ]
        )
        client_auth_middleware = DremioClientAuthMiddlewareFactory()
        client = flight.FlightClient(
            f"{scheme}://{hostname}:{flightport}",
            middleware=[client_auth_middleware],
            **connection_args,
        )
        bearer_token = client.authenticate_basic_token(user, pw, initial_options)
        print("[INFO] Authentication was successful")

        self._client = client
        self._bearer_token = bearer_token

    def query_data(self, sql):
        """Query Dremio with SQL string

        The result is also stored on the instance so that ``return_data``
        can hand it back later.

        Args:
            sql: SQL string used to query Dremio

        Returns:
            df_output

        """
        client = self._client
        # Every call after authentication must carry the bearer token.
        options = flight.FlightCallOptions(headers=[self._bearer_token])

        # One descriptor is enough; the former separate get_schema request
        # fetched a schema that was never used and cost an extra round trip.
        flight_desc = flight.FlightDescriptor.for_command(sql)
        flight_info = client.get_flight_info(flight_desc, options)

        # Only the first endpoint of the flight is read.
        reader = client.do_get(flight_info.endpoints[0].ticket, options)
        df_output = reader.read_pandas()

        self._df = df_output

        return df_output

__init__(*, fname_env, env_key_user='USER', env_key_pw='PW', scheme='grpc+tcp', hostname='tlvidreamcord1', flightport=32010)

Initialization

Parameters:

Name Type Description Default
fname_env str

Environment file with username and pw

required
env_key_user Optional[str]

Key term to identify the username in fname_env

'USER'
env_key_pw Optional[str]

Key term to identify the password in fname_env

'PW'
scheme Optional[str]

The connection scheme used

'grpc+tcp'
hostname Optional[str]

Server hostname

'tlvidreamcord1'
flightport Optional[int]

Port number

32010
Source code in msk_cdm/dremio/_dremio_api.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(
    self,
    *,
    fname_env: str,
    env_key_user: Optional[str] = "USER",
    env_key_pw: Optional[str] = "PW",
    scheme: Optional[str] = "grpc+tcp",
    hostname: Optional[str] = "tlvidreamcord1",
    flightport: Optional[int] = 32010,
):
    """Store the connection settings, load credentials and authenticate.

    Args:
        fname_env: Environment file with username and pw
        env_key_user: Key term to identify the username in fname_env
        env_key_pw: Key term to identify the password in fname_env
        scheme: The connection scheme used
        hostname: Server hostname
        flightport: Port number

    """
    # Connection settings, kept on the instance for the authentication step.
    self._scheme, self._hostname, self._flightport = scheme, hostname, flightport
    # No query has been run yet, so there is no cached dataframe.
    self._df = None

    # Bring the variables from the environment file into the process
    # environment, then read the two credentials by their key names.
    load_dotenv(fname_env)
    username = os.getenv(env_key_user)
    password = os.getenv(env_key_pw)
    self._authenticate(user=username, pw=password)

query_data(sql)

Query Dremio with SQL string

Parameters:

Name Type Description Default
sql

SQL string used to query Dremio

required

Returns:

Type Description

df_output

Source code in msk_cdm/dremio/_dremio_api.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def query_data(self, sql):
    """Query Dremio with SQL string

    The result is also stored in ``self._df``.

    Args:
        sql: SQL string used to query Dremio

    Returns:
        df_output

    """
    client = self._client
    # Every call must carry the bearer token stored on the instance.
    options = flight.FlightCallOptions(headers=[self._bearer_token])

    # One descriptor is enough; the former separate get_schema request
    # fetched a schema that was never used and cost an extra round trip.
    flight_desc = flight.FlightDescriptor.for_command(sql)
    flight_info = client.get_flight_info(flight_desc, options)

    # Only the first endpoint of the flight is read.
    reader = client.do_get(flight_info.endpoints[0].ticket, options)
    df_output = reader.read_pandas()

    self._df = df_output

    return df_output

return_data()

Return data queried from Dremio in a Pandas dataframe

Returns:

Type Description

df

Source code in msk_cdm/dremio/_dremio_api.py
42
43
44
45
46
47
48
49
50
51
def return_data(self):
    """Hand back the data currently stored on the instance.

    Returns:
        The object held in ``self._df``.

    """
    return self._df

DremioClientAuthMiddleware

Bases: ClientMiddleware

A ClientMiddleware that extracts the bearer token from the authorization header returned by the Dremio Flight Server Endpoint.

Parameters:

factory (DremioClientAuthMiddlewareFactory): The factory on which to set the call credentials if an authorization header with a bearer token is returned by the Dremio server.

Source code in msk_cdm/dremio/_dremio_api.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
class DremioClientAuthMiddleware(flight.ClientMiddleware):
    """
    A ClientMiddleware that extracts the bearer token from
    the authorization header returned by the Dremio
    Flight Server Endpoint.
    Parameters
    ----------
    factory : DremioClientAuthMiddlewareFactory
        The factory to set call credentials if an
        authorization header with bearer token is
        returned by the Dremio server.
    """

    def __init__(self, factory):
        self.factory = factory

    def received_headers(self, headers):
        """Store the authorization header value on the factory, if present.

        Parameters
        ----------
        headers : dict
            Response headers; each key maps to a list of values.
        """
        auth_header_key = "authorization"
        authorization_header = []
        for key in headers:
            if key.lower() == auth_header_key:
                # Look the value up under the key exactly as received; the
                # match above is case-insensitive, so the lowercase name may
                # not be the key that is actually present.
                authorization_header = headers.get(key)

        # A response without an authorization header leaves the stored
        # credential untouched instead of failing on an empty list.
        if not authorization_header:
            return

        token = authorization_header[0]
        # Accept a value that is already bytes as well as a str.
        if isinstance(token, str):
            token = token.encode("utf-8")
        self.factory.set_call_credential([b"authorization", token])

DremioClientAuthMiddlewareFactory

Bases: ClientMiddlewareFactory

A factory that creates DremioClientAuthMiddleware(s).

Source code in msk_cdm/dremio/_dremio_api.py
109
110
111
112
113
114
115
116
117
118
119
class DremioClientAuthMiddlewareFactory(flight.ClientMiddlewareFactory):
    """Factory producing a DremioClientAuthMiddleware for each call.

    The factory also holds the most recently set call credential in
    ``call_credential``.
    """

    def __init__(self):
        # Empty until a middleware instance reports a credential.
        self.call_credential = []

    def start_call(self, info):
        """Return a new middleware bound to this factory.

        ``info`` is accepted but not used.
        """
        return DremioClientAuthMiddleware(self)

    def set_call_credential(self, call_credential):
        """Replace the stored call credential with ``call_credential``."""
        self.call_credential = call_credential