a972celia/crypto-etl-pipeline-data-engineering-project

crypto-etl-pipeline-data-engineering-project

Project Overview

This crypto ETL pipeline is a data engineering project that gives cryptocurrency investors real-time insights based on price movements and news sentiment analysis. It helps users track their preferred cryptocurrencies and make informed decisions by understanding market sentiment and trends.

Producer

A producer sends data to a Kafka topic. In this project, the producer does three things:

  1. Data Collection: Use the Binance API to get real-time crypto prices.
  2. Serialization: Transform this data into a format suitable for streaming, like converting it into a JSON string.
  3. Sending Data: Send this serialized data to a Kafka topic.

Here's a basic example in Python:

from kafka import KafkaProducer
import requests
import json
import time

# Serialize each Python object to UTF-8 encoded JSON before sending
producer = KafkaProducer(bootstrap_servers='your_kafka_bootstrap_server',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

while True:
    # Replace with your Binance API endpoint
    response = requests.get('your_binance_api_endpoint', timeout=10)
    response.raise_for_status()  # stop on HTTP errors instead of publishing an error body
    data = response.json()
    producer.send('crypto_prices', data)
    time.sleep(1)  # wait one second between polls
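
The serialization step can also be pulled out of the loop so it can be checked without a running broker. The following is a minimal sketch, assuming the price endpoint returns an object with a `symbol` field and a string `price` field; the helper names `build_price_message` and `serialize` and the `fetched_at` field are hypothetical and not part of this project.

```python
import json
import time


def build_price_message(ticker, fetched_at=None):
    """Normalize a ticker payload into the record sent to Kafka.

    Assumes `ticker` looks like {"symbol": "BTCUSDT", "price": "43000.12"}.
    """
    return {
        "symbol": ticker["symbol"],
        # Convert the string price to a number so consumers can do arithmetic
        "price": float(ticker["price"]),
        # Record when the price was fetched (seconds since the epoch)
        "fetched_at": time.time() if fetched_at is None else fetched_at,
    }


def serialize(record):
    """Encode a record as UTF-8 JSON bytes, matching the producer's value_serializer."""
    return json.dumps(record).encode("utf-8")
```

With these helpers, the loop could send `build_price_message(response.json())` and pass `serialize` as the `value_serializer`.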

Consumer

A consumer reads data from a Kafka topic.

  1. Receiving Data: Read data from the Kafka topic.
  2. Deserialization: Parse the incoming data, e.g., converting a JSON string back into a Python object.
  3. Data Processing: Process the data as per your use case. For instance, you might store it in a database, analyze it, or trigger some action based on it.
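
The deserialization step is a natural place to guard against malformed messages, since an exception raised inside the deserializer would otherwise interrupt the consumer loop. Here is a hedged sketch of a tolerant deserializer; the function name `safe_json_deserializer` is an illustration, and returning `None` for bad input is one possible policy among several.

```python
import json


def safe_json_deserializer(raw):
    """Decode UTF-8 JSON bytes; return None instead of raising on bad input."""
    if raw is None:
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Malformed bytes or invalid JSON: signal "skip this message"
        return None
```

It can be passed as `value_deserializer=safe_json_deserializer` when constructing the `KafkaConsumer`, with the loop skipping any message whose value is `None`.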

Example of a simple consumer:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('crypto_prices',
                         bootstrap_servers='your_kafka_bootstrap_server',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))

for message in consumer:
    data = message.value
    # Now, you can process the data - store it, analyze it, etc.
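
As one illustration of the processing step, the sketch below keeps a short rolling window of prices per symbol and flags a message when the latest price deviates from the window average. It assumes each message carries `symbol` and `price` fields; the `PriceTracker` class, the `process` function, the window size, and the 2% threshold are all hypothetical choices, not this project's actual logic.

```python
from collections import deque


class PriceTracker:
    """Keep the last `window` prices for each symbol."""

    def __init__(self, window=5):
        self.window = window
        self.prices = {}

    def update(self, symbol, price):
        """Record a price and return the rolling average, current price included."""
        history = self.prices.setdefault(symbol, deque(maxlen=self.window))
        history.append(price)
        return sum(history) / len(history)


def process(tracker, data, threshold=0.02):
    """Return a summary with an alert flag for large moves away from the average."""
    price = float(data["price"])
    average = tracker.update(data["symbol"], price)
    deviation = (price - average) / average
    return {
        "symbol": data["symbol"],
        "price": price,
        "average": average,
        "alert": abs(deviation) >= threshold,
    }
```

Inside the consumer loop, this would be called as `process(tracker, message.value)` with a single `tracker` created before the loop starts.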
