Skip to content

Commit

Permalink
Tool sync assets
Browse files Browse the repository at this point in the history
  • Loading branch information
josimar-bazam committed Aug 2, 2024
1 parent 1a21252 commit 185e9b4
Show file tree
Hide file tree
Showing 2 changed files with 310 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# Script de Sincronização de Ativos

## Visão Geral

Este script é projetado para interagir com a API da Conviso AppSec para buscar e sincronizar ativos com base em vários critérios. Ele suporta buscar ativos por tags, obter o número total de ativos com tags específicas e identificar ativos que não foram sincronizados nos últimos 30 dias.

## Recursos

- Buscar e listar ativos por tags específicas.
- Obter o número total de ativos associados a tags específicas.
- Identificar ativos que não foram sincronizados nos últimos 30 dias.
- Sincronizar ativos com integrações específicas (FORTIFY e DEPENDENCY_TRACK).

## Requisitos

- Python 3.x
- Biblioteca `requests`
- Biblioteca `python-dotenv`

## Instalação

1. Clone o repositório ou baixe o script.

2. Instale as bibliotecas necessárias usando pip:

```bash
pip install requests python-dotenv
```

3. Crie um arquivo `.env` no diretório raiz do projeto e adicione sua chave de API:

```env
API_KEY=sua_chave_de_api_aqui
```



## Company ID

Na query é necessário adicionar CompanyID, exemplo.

`assets(companyId: "439", page: 1, limit: 100)`

## Define as tags específicas que você deseja considerar

No código adicione as tags que a API deve consultar na CP da Conviso, exemplo.

`specific_tags = ['AURA', 'Lab-IA', '4P Microservicos', '4P FAST DATA', '4P APIS', '4P BIG DATA', 'MLOps']`

## Uso
1. Execute o script:

```bash
python sync.py
```

2. Siga os prompts para selecionar uma operação:
- **1**: Buscar o volume de ativos por tag.
- **2**: Buscar o volume total de ativos por tags.
- **3**: Buscar ativos que não foram sincronizados nos últimos 30 dias.

3. Se solicitado, insira a tag ou o número da operação conforme necessário.

4. O script exibirá os resultados no terminal e pedirá para sincronizar os ativos. Se você escolher sincronizar, ele processará os ativos e fornecerá o status da sincronização.

## Explicação do Código

- **Carregamento de Variáveis de Ambiente**:

O script usa `python-dotenv` para carregar chaves de API e outras configurações a partir de um arquivo `.env`.

- **Busca de Ativos**:

A função `fetch_all_assets` recupera todos os ativos da API paginando através dos resultados.

- **Sincronização de Ativos**:

A função `sync_asset` envia solicitações de sincronização para a API para as integrações especificadas.

- **Operações**:

- **Operação 1**: Lista ativos filtrados pela tag selecionada.
- **Operação 2**: Conta ativos associados a cada tag especificada.
- **Operação 3**: Lista ativos que não foram sincronizados nos últimos 30 dias.

## Tratamento de Erros

Se houver um problema com a sincronização, o script imprimirá mensagens de erro no terminal. Certifique-se de que sua chave de API está correta e que você tem acesso adequado na CP.


220 changes: 220 additions & 0 deletions Sync Assets/AssetSynchronizationScript/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import os
from dotenv import load_dotenv
import requests
from datetime import datetime, timedelta
from collections import defaultdict

# Carrega as variáveis de ambiente do arquivo .env
load_dotenv()

# Obtém a chave API da variável de ambiente
api_key = os.getenv("API_KEY")

# Verifica se a chave API foi carregada corretamente
if not api_key:
raise ValueError("API_KEY não encontrada. Verifique se a variável de ambiente está definida corretamente.")

# Define o cabeçalho com a chave API obtida da variável de ambiente
headers = {
"x-api-key": api_key,
"Content-Type": "application/json"
}

# Define a URL da API
url = "https://app.convisoappsec.com/graphql"

# Define a query para obter os ativos com o campo updatedAt
query = """
{
assets(companyId: "439", page: 1, limit: 100) {
collection {
id
name
assetsTagList
updatedAt
}
metadata {
currentPage
totalPages
}
}
}
"""

# Define as tags específicas que você deseja considerar
specific_tags = ['AURA', 'Lab-IA', '4P Microservicos', '4P FAST DATA', '4P APIS', '4P BIG DATA', 'MLOps']

# Define as integrações específicas que você deseja considerar
integration_tags = {
'FORTIFY': 'FORTIFY',
'DEPENDENCY_TRACK': 'DEPENDENCY_TRACK'
}

# Pergunta ao usuário qual informação ele deseja validar
print("Selecione a operação desejada:")
print("1: Puxar o volume de ativos por tag")
print("2: Puxar o volume total de ativos por tags")
print("3: Puxar ativos que não foram sincronizados nos últimos 30 dias")
operation = input("Digite o número da operação desejada: ")

if operation == '1':
print("Selecione uma das tags:")
for i, tag in enumerate(specific_tags, 1):
print(f"{i}: {tag}")
tag_index = int(input("Digite o número da tag desejada: ")) - 1
selected_tag = specific_tags[tag_index]
elif operation == '2':
selected_tag = None
elif operation == '3':
selected_tag = None
else:
raise ValueError("Operação inválida selecionada.")

# Função para fazer a requisição e obter todos os ativos
def fetch_all_assets(url, headers, query):
all_assets = []
page = 1
while True:
response = requests.post(url, headers=headers, json={"query": query.replace("page: 1", f"page: {page}")})
if response.status_code == 200:
data = response.json()
assets = data['data']['assets']['collection']
all_assets.extend(assets)
metadata = data['data']['assets']['metadata']
if metadata['currentPage'] >= metadata['totalPages']:
break
page += 1
else:
print(f"Erro na requisição: {response.status_code}")
break
return all_assets

# Função para sincronizar um ativo
def sync_asset(asset_id, integration):
mutation = """
mutation SyncAsset($input: SyncAssetInput!) {
syncAsset(input: $input) {
assetId
clientMutationId
failureReason
integration
}
}
"""
variables = {
"input": {
"assetId": asset_id,
"integration": integration
}
}
response = requests.post(url, headers=headers, json={"query": mutation, "variables": variables})
return response.json()

# Obtém todos os ativos
print("Buscando todos os ativos...")
assets = fetch_all_assets(url, headers, query)

# Filtra os ativos conforme a operação selecionada
filtered_assets = []
tag_groups = defaultdict(list)

if operation == '1':
for asset in assets:
if selected_tag in asset['assetsTagList']:
filtered_assets.append(asset)
tag_groups[selected_tag].append(asset)
elif operation == '2':
for asset in assets:
for tag in asset['assetsTagList']:
if tag in specific_tags:
filtered_assets.append(asset)
tag_groups[tag].append(asset)
elif operation == '3':
days_threshold = 30
now = datetime.now().astimezone()
for asset in assets:
updated_at = datetime.fromisoformat(asset['updatedAt'])
if now - updated_at > timedelta(days=days_threshold):
filtered_assets.append(asset)
for tag in asset['assetsTagList']:
if tag in specific_tags:
tag_groups[tag].append(asset)

# Cria a mensagem para exibir no terminal
message_lines = []
if operation == '1':
message_lines.append(f"TAG: {selected_tag}, Ativos:")
for asset in tag_groups[selected_tag]:
message_lines.append(f" - Asset ID: {asset['id']}, Nome: {asset['name']}, Última Sincronização: {asset['updatedAt']}")
message_lines.append(f"Total de ativos para a TAG {selected_tag}: {len(tag_groups[selected_tag])}")
elif operation == '2':
for tag, assets in tag_groups.items():
message_lines.append(f"TAG: {tag}, Quantidade de Ativos: {len(assets)}")
total_assets_with_tags = len(filtered_assets)
message_lines.append(f"\nTotal de todos os ativos que contêm as tags específicas: {total_assets_with_tags}")
elif operation == '3':
for tag, assets in tag_groups.items():
message_lines.append(f"TAG: {tag}, Ativos que não foram sincronizados nos últimos {days_threshold} dias:")
for asset in assets:
message_lines.append(f" - Asset ID: {asset['id']}, Nome: {asset['name']}, Última Sincronização: {asset['updatedAt']}")
message_lines.append(f"Total de ativos para a TAG {tag}: {len(assets)}")
total_assets_with_tags = len(filtered_assets)
message_lines.append(f"\nTotal de todos os ativos que não foram sincronizados nos últimos {days_threshold} dias: {total_assets_with_tags}")

message = "\n".join(message_lines)

# Verifica a mensagem criada
print("Mensagem criada:")
print(message)

# Variáveis para contar os ativos sincronizados por integração
fortify_count = 0
dependency_track_count = 0

# Pergunta ao usuário se deseja sincronizar os ativos
if operation in ['1', '2', '3']:
user_input = input("Deseja sincronizar os ativos que estão vinculados nas tags selecionadas? (s/n): ").strip().lower()
if user_input == 's':
for asset in filtered_assets:
for tag in asset['assetsTagList']:
if 'FORTIFY' in integration_tags:
sync_response = sync_asset(asset['id'], integration_tags['FORTIFY'])
print(f"Verificando se o ativo com ID {asset['id']} tem integração com FORTIFY...")

# Verifica se a resposta contém o campo esperado
sync_asset_data = sync_response.get('data', {}).get('syncAsset')
if sync_asset_data:
if sync_asset_data.get('failureReason'):
print(f"Ativo com ID {asset['id']} não tem integração com FORTIFY: {sync_asset_data['failureReason']}")
else:
errors = sync_response.get('errors', [])
if errors:
error_messages = [error.get('message', 'Erro desconhecido') for error in errors]
print(f"Erro ao sincronizar ativo {asset['id']} com FORTIFY: {', '.join(error_messages)}")
else:
fortify_count += 1
else:
print(f"Ativo com ID {asset['id']} sincronizado com FORTIFY.")

if 'DEPENDENCY_TRACK' in integration_tags:
sync_response = sync_asset(asset['id'], integration_tags['DEPENDENCY_TRACK'])
print(f"Verificando se o ativo com ID {asset['id']} tem integração DEPENDENCY_TRACK...")

# Verifica se a resposta contém o campo esperado
sync_asset_data = sync_response.get('data', {}).get('syncAsset')
if sync_asset_data:
if sync_asset_data.get('failureReason'):
print(f"Ativo com ID {asset['id']} não tem integração com DEPENDENCY_TRACK: {sync_asset_data['failureReason']}")
else:
errors = sync_response.get('errors', [])
if errors:
error_messages = [error.get('message', 'Erro desconhecido') for error in errors]
print(f"Erro ao sincronizar ativo {asset['id']} com DEPENDENCY_TRACK: {', '.join(error_messages)}")
else:
dependency_track_count += 1
else:
print(f"Ativo com ID {asset['id']} sincronizado com DEPENDENCY_TRACK.")

# Exibe a contagem de ativos sincronizados por integração
print(f"Sincronização concluída. Total de ativos sincronizados com FORTIFY: {fortify_count}")
print(f"Total de ativos sincronizados com DEPENDENCY_TRACK: {dependency_track_count}")

0 comments on commit 185e9b4

Please sign in to comment.