-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathingest.py
49 lines (42 loc) · 1.64 KB
/
ingest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import argparse
import os
import sys
from time import sleep
from dotenv import load_dotenv
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
import itertools
if sys.version_info >= (3, 12):
    # itertools.batched is available natively from Python 3.12 onward.
    batched = itertools.batched
else:
    def batched(iterable, n):
        """Yield successive tuples of up to *n* items from *iterable*.

        Backport of ``itertools.batched`` (added in Python 3.12): the last
        batch may be shorter than *n*.

        Raises:
            ValueError: if *n* is less than one (matches the 3.12 builtin,
                which rejects non-positive batch sizes instead of silently
                yielding nothing).
        """
        if n < 1:
            raise ValueError('n must be at least one')
        iter_ = iter(iterable)
        while True:
            batch = tuple(itertools.islice(iter_, n))
            if not batch:
                break
            yield batch
# ---- CLI arguments -------------------------------------------------------
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('file', type=str, help='File to ingest')
parser.add_argument('--precision', type=str, help='Precision of the data', default='ns')
args = parser.parse_args()

# ---- Connection settings (from the environment / a .env file) ------------
load_dotenv()
url = os.environ["GREPTIME_HOST"].rstrip('/')
url = f"{url}/v1/influxdb/"
# GreptimeDB authenticates through the token field as "username:password".
token = os.environ["GREPTIME_USERNAME"] + ":" + os.environ["GREPTIME_PASSWORD"]
org = "my-org" # GreptimeDB doesn't use orgs
bucket = os.environ["GREPTIME_DATABASE"]

# ---- Ingest: stream the file in 1000-line batches ------------------------
client = InfluxDBClient(url=url, token=token, org=org)
try:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    with open(args.file) as f:
        for batch_lines in batched(f, 1000):
            # Each batch is a tuple of raw line-protocol strings.
            write_api.write(bucket=bucket, write_precision=args.precision, record=batch_lines)
            print(f'Wrote {len(batch_lines)} lines')
            sleep(1) # sleep for 1 second avoid rate limit
finally:
    # Always release the underlying HTTP session, even if a write fails
    # part-way through the file (the original leaked the client).
    client.close()
# NOTE - Write data points directly is possible:
#
# import time
# from influxdb_client import Point, WritePrecision
# p = Point("my_measurement").tag("location", "Prague").field("temperature", 25.3).time(time.time_ns(), WritePrecision.NS)
# write_api.write(bucket=bucket, record=p)
print('Done')