Tạo evenstream thời tiết Bình Dương mỗi 10p với Fabric Notebook và Evenstream

 


Phần 1:Tạo evenstream và kiểm tra ghi data

1. Tạo workspace+ evenstream với Source "Custom endpoint", đặt tên source bất kỳ .





2. Sau khi tạo xong, sẽ có dạng như thế này, bấm "Publish" để lấy các key bảo mật, chưa cần quan tâm đến Destination.

=>Generate và Copy các key này để thay thế vào Pyspark .Lưu ý, không chia sẻ key vì nếu lộ, ai cũng có thể ghi data vào evenstream này




3. Tạo notebook trong cùng workspace, copy code này vào, tùy chỉnh để phù hợp với nhu cầu:

!pip install azure-eventhub

import requests
import json
import concurrent.futures
from datetime import datetime
from azure.eventhub import EventHubProducerClient, EventData

# copy evenhub name+connection string(trừ EntityPath)
EVENTHUB_NAME = "es_aecdb77a-a5bb-48e7-a5ea-bc3c0f5198dd"
CONNECTION_STR = "Endpoint=sb://esehsg3dpbjx2inibh451c.servicebus.windows.net/;SharedAccessKeyName=key_be5cb020-050c-4cc3-925a-69de9657d275;SharedAccessKey=zWPgSPZBrJ3+K+GBLK1FHNaUmTIT2Pu9L+AEhI42FDI="

# Danh sách tọa độ các khu vực trong Bình Dương
locations = {
"Thủ Dầu Một": (10.9804, 106.6519),
"Dĩ An": (10.9021, 106.7691),
"Thuận An": (10.9025, 106.7173),
"Bến Cát": (11.0715, 106.6223),
"Tân Uyên": (11.0487, 106.8225),
"Bàu Bàng": (11.2004, 106.6332),
"Dầu Tiếng": (11.2797, 106.4406),
"Phú Giáo": (11.2953, 106.8653)
}

# Hàm lấy dữ liệu thời tiết
def get_weather(location, lat, lon):
url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&current_weather=true&hourly=relative_humidity_2m,precipitation"
response = requests.get(url)
data = response.json()

try:
current = data["current_weather"]
humidity = data["hourly"]["relative_humidity_2m"][0] # Độ ẩm hiện tại
rain_forecast = [
data["hourly"]["precipitation"][1], # Mưa trong 1 giờ tới
data["hourly"]["precipitation"][2], # Mưa trong 2 giờ tới
data["hourly"]["precipitation"][3] # Mưa trong 3 giờ tới
]

return {
"timestamp": datetime.utcnow().isoformat() + "Z",
"location": location,
"temperature": current["temperature"],
"windspeed": current["windspeed"],
"humidity": humidity,
"rain_forecast": rain_forecast
}
except KeyError:
return {"location": location, "error": "Không lấy được dữ liệu"}

# Chạy API song song để lấy dữ liệu nhanh
weather_data = []
with concurrent.futures.ThreadPoolExecutor() as executor:
results = executor.map(lambda loc: get_weather(loc, *locations[loc]), locations.keys())

for result in results:
if "error" not in result:
weather_data.append(result) # Chỉ thêm dữ liệu hợp lệ

# Gửi dữ liệu vào Azure Event Hub
def send_to_event_hub(data_list):
try:
producer = EventHubProducerClient.from_connection_string(CONNECTION_STR, eventhub_name=EVENTHUB_NAME)
event_data_batch = producer.create_batch()

for data in data_list:
event_data_batch.add(EventData(json.dumps(data))) # Chuyển thành JSON và gửi

producer.send_batch(event_data_batch)
producer.close()
print(f"✅ Đã gửi {len(data_list)} bản ghi vào Event Hub!")
except Exception as e:
print("🚨 Gửi thất bại:", e)

# Gửi dữ liệu thời tiết vào Event Hub
send_to_event_hub(weather_data)

4. Run code xem evenstream có nhận dc data ko.






Nếu suôn sẻ, set Schedule cho Notebook chạy

Sau khi set schedule, cần kiểm tra run có thành công không, data có vào evenstream không:


Phần 2: Tạo destination

1. Tại workspace, tạo evenhouse (nếu chưa có)



2.Chọn get data from existing evenstream.



3. Chọn "new table", đặt tên cho table và chọn source evenstream



4. Chờ mapping, bấm finish




5. Sau khi tạo xong, trong evenstream sẽ xuất hiện thêm destination.



Table mới tạo chưa có data, sau khi Notebook refresh, data sẽ bắt đầu đổ về.
Nếu muốn đổ cả data cũ , hãy tạo table tại evenhouse trước, sau đó vào Evenstream tạo destination, nhưng như thế dễ dẫn tới lỗi mismatch datatype.




Table với data đã đổ về:





Phần 3. Transform data, tạo live power BI dashboard/Real-time dashboard

1. Dùng KQL để tranform data theo nhu cầu, ví dụ:
stream1
| project
timestamp = format_datetime(datetime_add("hour", 7, timestamp), "yyyy-MM-dd HH:mm"),
temperature,
humidity,
windspeed,
location,
rain_forecast_1 = toint(rain_forecast[0]),
rain_forecast_2 = toint(rain_forecast[1]),
rain_forecast_3 = toint(rain_forecast[2])
| order by timestamp desc
| take 48

2. Sau khi transform data dc như ý, bấm Pin to Dashboard hoặc Create Power Bi. Real-time report từ 2 phương pháp này sẽ ko thể chia sẻ publish.



3. Với Real-time dashboard:
Vào chế độ editing, bấm icon edit để sửa lại KQL code, hoặc add thêm visual.





Cài đặt auto refresh để bảng tự update data:



4. Với Power BI: Click Save để tạo dataset và report. Sau đó vào workspace lưu trữ report để mở báo cáo lên tùy chỉnh.
Lưu ý, không như Real-time dashboard, Dataset không cho phép chỉnh code dễ dàng nên hãy hoàn chỉnh phần transform data với KQL trước khi tạo dataset.



Có thể click vào report đã tạo để tùy chỉnh visual hoặc tạo các báo cáo mới từ dataset đã tạo.
Lưu ý:không như real-time dashboard cho phép auto refresh theo cài đặt, power BI chỉ có thể update data nếu user tương tác như bấm refresh, load lại trang.



Nhận xét