Code pull data chứng khoán chạy trên Pyspark Python

 

!pip install tradingview_ta
!pip install azure-eventhub
from tradingview_ta import TA_Handler, Interval
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import expr, col
from datetime import datetime
from azure.eventhub import EventHubProducerClient, EventData
import json

# Danh sách 20 mã chứng khoán phổ biến trên HOSE
stock_symbols = ["VNM", "TCB", "MSN", "VCB", "HPG", "SSI", "MBB", "FPT", "STB", "VPB",
                 "BID", "CTG", "GAS", "REE", "HDB", "BVH", "VIC", "VHM", "PLX", "PNJ"]

# Kết nối Event Hub-sửa lại theo hub
EVENTHUB_CONNECTION_STRING = "Endpoint=sb://esehsgnsybaabfufku2g5y.servicebus.windows.net/;SharedAccessKeyName=key_c88cda35-03d2-4f7e-91f3-7c282f778bc1;SharedAccessKey=EeztJwaVeexAZs1sIOQ5KTVek23by6vuf+AEhFWY0r4="
EVENTHUB_NAME = "es_92d3411d-cec4-4633-84bd-b0c5d857aa8d"

# Hàm lấy dữ liệu chứng khoán từ TradingView
def get_stock_data(symbol):
    try:
        analysis = TA_Handler(
            symbol=symbol,
            exchange="HOSE",
            screener="Vietnam",
            interval=Interval.INTERVAL_1_DAY
        )
        result = analysis.get_analysis()
        indicators = result.indicators

        return {
            "symbol": symbol,
            "price": float(indicators.get("close", 0.0)),
            "volume": float(indicators.get("volume", 0.0)),
            "RSI": float(indicators.get("RSI", 0.0)),
            "MACD": float(indicators.get("MACD.macd", 0.0)),
            "EMA_50": float(indicators.get("EMA50", 0.0)),
            "EMA_200": float(indicators.get("EMA200", 0.0)),
            "SMA_20": float(indicators.get("SMA20", 0.0)),
            "high": float(indicators.get("high", 0.0)),
            "low": float(indicators.get("low", 0.0)),
            "ADX": float(indicators.get("ADX", 0.0))
        }
    except Exception as e:
        print(f"Lỗi lấy dữ liệu {symbol}: {e}")
        return None

# Hàm gửi dữ liệu vào Event Hub
def send_to_eventhub(data):
    producer = EventHubProducerClient.from_connection_string(conn_str=EVENTHUB_CONNECTION_STRING, eventhub_name=EVENTHUB_NAME)
    with producer:
        event_data_batch = producer.create_batch()
        for record in data:
            if record:  # Chỉ gửi dữ liệu hợp lệ
                event_data_batch.add(EventData(json.dumps(record)))
        producer.send_batch(event_data_batch)
    print("✅ Gửi dữ liệu thành công vào Event Hub")

# Khởi tạo Spark Session
spark = SparkSession.builder.appName("TradingViewToEventHub").getOrCreate()

# Lấy dữ liệu chứng khoán
data = [get_stock_data(symbol) for symbol in stock_symbols]
data = [d for d in data if d]  # Lọc bỏ dữ liệu lỗi hoặc None

# Định nghĩa schema cho DataFrame
schema = StructType([
    StructField("symbol", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("volume", DoubleType(), True),
    StructField("RSI", DoubleType(), True),
    StructField("MACD", DoubleType(), True),
    StructField("EMA_50", DoubleType(), True),
    StructField("EMA_200", DoubleType(), True),
    StructField("SMA_20", DoubleType(), True),
    StructField("high", DoubleType(), True),
    StructField("low", DoubleType(), True),
    StructField("ADX", DoubleType(), True),
    StructField("Timestamp_UTC7", TimestampType(), True)  # Thêm cột timestamp vào schema
])

# Tạo DataFrame Spark
df = spark.createDataFrame(data, schema=schema)

# Thêm cột timestamp (UTC+7)
df = df.withColumn("Timestamp_UTC7", expr("current_timestamp() + INTERVAL 7 HOURS"))

# Chuyển đổi DataFrame Spark sang JSON với timestamp dưới dạng string
json_data = [
    {key: (value.strftime("%Y-%m-%d %H:%M:%S") if isinstance(value, datetime) else value)
     for key, value in row.asDict().items()}
    for row in df.collect()
]

# Gửi dữ liệu vào Event Hub (sau khi đã có timestamp chuẩn)
send_to_eventhub(json_data)

# Hiển thị dữ liệu
df.show()

Nhận xét