Skip to content

Commit

Permalink
cloudvision: Transition cvlib to open-source
Browse files Browse the repository at this point in the history
Change-Id: I58f8fee220d60ab3ce3c0ff323a71f922615a149
  • Loading branch information
cianmcgrath committed Jun 2, 2022
1 parent 71160c9 commit 03afaa5
Show file tree
Hide file tree
Showing 15 changed files with 1,094 additions and 1 deletion.
17 changes: 17 additions & 0 deletions cloudvision/cvlib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Copyright (c) 2022 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

from .action import Action, ActionContext
from .changecontrol import ChangeControl
from .connections import AuthAndEndpoints
from .context import Context
from .device import Device, Interface
from .execution import Execution
from .exceptions import *
from .logger import Logger
from .studio import Studio
from .topology import Connection, Topology
from .user import User

__version__ = "1.4.0"
63 changes: 63 additions & 0 deletions cloudvision/cvlib/action.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) 2022 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

from enum import Enum
from typing import Dict, Optional

from cloudvision.Connector.grpc_client import GRPCClient

from .utils import queryCCStartTime


class ActionContext(Enum):
'''
Enum class used to store the various contexts in which actions are executed in
'''
Unknown = 0
ChangeControl = 1


class Action:
'''
Object to store common change control action script arguments:
- name: Name of the action currently running
- context: Enum for the context in which the action is running,
e.g. Action that is running is a change control action
- actionId: ID of the action currently running
- args: String -> String dictionary of the args associated with the action
- ccId: ID of the change control, if applicable
- stageId: ID of the current stage of the change control, if applicable
'''

def __init__(self, name: str,
context: ActionContext = ActionContext.Unknown,
actionId: Optional[str] = None,
args: Optional[Dict[str, str]] = None,
ccId: Optional[str] = None,
stageId: Optional[str] = None):
self.name = name
self.context = context
self.id = actionId
self.args = args
self.ccId = ccId
self.stageId = stageId

# Fields used in some execution contexts
self.__ccStartTime: Optional[int] = None

def getCCStartTime(self, cvClient: GRPCClient):
'''
Queries the cloudvision database for the change control start time
:param cvClient: context.getCvClient() client
:return: nanosecond start timestamp of the change control
'''
if self.context != ActionContext.ChangeControl or not self.ccId:
return None

if self.__ccStartTime:
return self.__ccStartTime

ccStartTs = queryCCStartTime(cvClient, self.ccId)
self.__ccStartTime = int(ccStartTs)
return self.__ccStartTime
44 changes: 44 additions & 0 deletions cloudvision/cvlib/changecontrol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Copyright (c) 2022 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

from typing import Dict, Optional

from cloudvision.Connector.grpc_client import GRPCClient

from .utils import queryCCStartTime


class ChangeControl:
'''
(Deprecated) Please use ctx.action instead for various field information
Object to store common change control action script arguments:
- ccId: ID of the change control, if applicable
- stageId: ID of the current stage of the change control, if applicable
- args: Dict of user-defined/script defined arguments that are passed in
'''

def __init__(self,
ccId: Optional[str] = None,
stageId: Optional[str] = None,
args: Optional[Dict[str, str]] = None):
self.ccId = ccId
self.stageId = stageId
self.args = args
self.__ccStartTime: Optional[int] = None

def getStartTime(self, cvClient: GRPCClient):
'''
Queries the cloudvision database for the change control start time
:param cvClient: context.getCvClient() client
:return: nanosecond start timestamp of the change control
'''
if self.ccId is None:
return None

if self.__ccStartTime:
return self.__ccStartTime

ccStartTs = queryCCStartTime(cvClient, self.ccId)
self.__ccStartTime = int(ccStartTs)
return self.__ccStartTime
131 changes: 131 additions & 0 deletions cloudvision/cvlib/connections.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Copyright (c) 2022 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the COPYING file.

import collections
import grpc
from typing import Dict, Optional


class AuthAndEndpoints:
'''
Object to store auth and endpoint information for use in the context object
- apiserverAddr: Address of the CloudVision apiserver
- serviceAddr: Address of the CloudVision service proxy server, e.g. ambassador address
- cacert: Path to local CA Cert, used for establishing CV grpc connections
- commandEndpoint: Service endpoint where command requests are posted to
- logEndpoint: Service endpoint where log requests are posted to
- connectionTimeout: Timeout value for connections to endpoints in seconds
- cliTimeout: Timeout value for cli commands invoked by connections
- testAddresses: Api addresses to use when execution is in a test context
'''

def __init__(self,
apiserverAddr: Optional[str] = None,
serviceAddr: Optional[str] = None,
serviceCACert: Optional[str] = None,
aerisCACert: Optional[str] = None,
commandEndpoint: Optional[str] = None,
logEndpoint: Optional[str] = None,
connectionTimeout: Optional[int] = 250,
cliTimeout: Optional[int] = 200,
testAddresses: Optional[Dict[str, str]] = None):
self.apiserverAddr = apiserverAddr
self.serviceAddr = serviceAddr
self.serviceCACert = serviceCACert
self.aerisCACert = aerisCACert
self.commandEndpoint = commandEndpoint
self.logEndpoint = logEndpoint
self.connectionTimeout = connectionTimeout
self.cliTimeout = cliTimeout
self.testAddresses = testAddresses


# The following code is sourced from:
# https://github.com/grpc/grpc/blob/master/examples/python/interceptors/headers/generic_client_interceptor.py

# ------------------------------------------------------------------------
# Copyright 2017 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


class _GenericClientInterceptor(grpc.UnaryUnaryClientInterceptor,
grpc.UnaryStreamClientInterceptor,
grpc.StreamUnaryClientInterceptor,
grpc.StreamStreamClientInterceptor):
"""Base class for interceptors that operate on all RPC types."""

def __init__(self, interceptor_function):
self._fn = interceptor_function

def intercept_unary_unary(self, continuation, client_call_details, request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, False)
response = continuation(new_details, next(new_request_iterator))
return postprocess(response) if postprocess else response

def intercept_unary_stream(self, continuation, client_call_details,
request):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, iter((request,)), False, True)
response_it = continuation(new_details, next(new_request_iterator))
return postprocess(response_it) if postprocess else response_it

def intercept_stream_unary(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, False)
response = continuation(new_details, new_request_iterator)
return postprocess(response) if postprocess else response

def intercept_stream_stream(self, continuation, client_call_details,
request_iterator):
new_details, new_request_iterator, postprocess = self._fn(
client_call_details, request_iterator, True, True)
response_it = continuation(new_details, new_request_iterator)
return postprocess(response_it) if postprocess else response_it


def create(intercept_call):
return _GenericClientInterceptor(intercept_call)

# ------------------------------------------------------------------------

# Code to add an interceptor that sets the header, based on example code from
# https://github.com/grpc/grpc/tree/master/examples/python/interceptors/headers


class _ClientCallDetails(
collections.namedtuple(
'_ClientCallDetails',
('method', 'timeout', 'metadata', 'credentials')),
grpc.ClientCallDetails):
pass


def addHeaderInterceptor(header, value):
def intercept_call(client_call_details, request_iterator, request_streaming, response_streaming):
metadata = []
if client_call_details.metadata is not None:
metadata = list(client_call_details.metadata)
metadata.append((
header,
value,
))
client_call_details = _ClientCallDetails(
client_call_details.method, client_call_details.timeout, metadata,
client_call_details.credentials)
return client_call_details, request_iterator, None

return create(intercept_call)
Loading

0 comments on commit 03afaa5

Please sign in to comment.