!pip install tradingview_ta
!pip install azure-eventhub
from tradingview_ta import TA_Handler, Interval
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql.functions import expr, col
from datetime import datetime
from azure.eventhub import EventHubProducerClient, EventData
import json
# Danh sách 20 mã chứng khoán phổ biến trên HOSE
stock_symbols = ["VNM", "TCB", "MSN", "VCB", "HPG", "SSI", "MBB", "FPT", "STB", "VPB",
"BID", "CTG", "GAS", "REE", "HDB", "BVH", "VIC", "VHM", "PLX", "PNJ"]
# Kết nối Event Hub-sửa lại theo hub
EVENTHUB_CONNECTION_STRING = "Endpoint=sb://esehsgnsybaabfufku2g5y.servicebus.windows.net/;SharedAccessKeyName=key_c88cda35-03d2-4f7e-91f3-7c282f778bc1;SharedAccessKey=EeztJwaVeexAZs1sIOQ5KTVek23by6vuf+AEhFWY0r4="
EVENTHUB_NAME = "es_92d3411d-cec4-4633-84bd-b0c5d857aa8d"
# Hàm lấy dữ liệu chứng khoán từ TradingView
def get_stock_data(symbol):
try:
analysis = TA_Handler(
symbol=symbol,
exchange="HOSE",
screener="Vietnam",
interval=Interval.INTERVAL_1_DAY
)
result = analysis.get_analysis()
indicators = result.indicators
return {
"symbol": symbol,
"price": float(indicators.get("close", 0.0)),
"volume": float(indicators.get("volume", 0.0)),
"RSI": float(indicators.get("RSI", 0.0)),
"MACD": float(indicators.get("MACD.macd", 0.0)),
"EMA_50": float(indicators.get("EMA50", 0.0)),
"EMA_200": float(indicators.get("EMA200", 0.0)),
"SMA_20": float(indicators.get("SMA20", 0.0)),
"high": float(indicators.get("high", 0.0)),
"low": float(indicators.get("low", 0.0)),
"ADX": float(indicators.get("ADX", 0.0))
}
except Exception as e:
print(f"Lỗi lấy dữ liệu {symbol}: {e}")
return None
# Hàm gửi dữ liệu vào Event Hub
def send_to_eventhub(data):
producer = EventHubProducerClient.from_connection_string(conn_str=EVENTHUB_CONNECTION_STRING, eventhub_name=EVENTHUB_NAME)
with producer:
event_data_batch = producer.create_batch()
for record in data:
if record: # Chỉ gửi dữ liệu hợp lệ
event_data_batch.add(EventData(json.dumps(record)))
producer.send_batch(event_data_batch)
print("✅ Gửi dữ liệu thành công vào Event Hub")
# Khởi tạo Spark Session
spark = SparkSession.builder.appName("TradingViewToEventHub").getOrCreate()
# Lấy dữ liệu chứng khoán
data = [get_stock_data(symbol) for symbol in stock_symbols]
data = [d for d in data if d] # Lọc bỏ dữ liệu lỗi hoặc None
# Định nghĩa schema cho DataFrame
schema = StructType([
StructField("symbol", StringType(), True),
StructField("price", DoubleType(), True),
StructField("volume", DoubleType(), True),
StructField("RSI", DoubleType(), True),
StructField("MACD", DoubleType(), True),
StructField("EMA_50", DoubleType(), True),
StructField("EMA_200", DoubleType(), True),
StructField("SMA_20", DoubleType(), True),
StructField("high", DoubleType(), True),
StructField("low", DoubleType(), True),
StructField("ADX", DoubleType(), True),
StructField("Timestamp_UTC7", TimestampType(), True) # Thêm cột timestamp vào schema
])
# Tạo DataFrame Spark
df = spark.createDataFrame(data, schema=schema)
# Thêm cột timestamp (UTC+7)
df = df.withColumn("Timestamp_UTC7", expr("current_timestamp() + INTERVAL 7 HOURS"))
# Chuyển đổi DataFrame Spark sang JSON với timestamp dưới dạng string
json_data = [
{key: (value.strftime("%Y-%m-%d %H:%M:%S") if isinstance(value, datetime) else value)
for key, value in row.asDict().items()}
for row in df.collect()
]
# Gửi dữ liệu vào Event Hub (sau khi đã có timestamp chuẩn)
send_to_eventhub(json_data)
# Hiển thị dữ liệu
df.show()
Nhận xét
Đăng nhận xét