I'm building a system where the web socket will publish the recieved data into a queue and the consumer will deal with its processing and trading decisions.
But I'm looking for a mechanism in which I can update the symbols list that I have initially subscribed in the onopen()
(like modifying a watchlist). I'm thinking of setting up an API (flask or fastapi) that can recieve the request to update and then I want to update it in the `FyersDataSocket` . But if I start the thread once ( fyers.keep_running()
) , can I update or unsubscribe from the main thread where the api service is running ?
In the documentation of market data unsubscribe in web socket, I saw that the fyers.unsubscribe
is written inside the onmessage()
callback function, I'm wondering if that can be done outside (in the main thread) also.
I don’t have much experience with multithreading, but this looks interesting and want to learn more about it, any help or ideas is appreciated.
here is a rough version of my idea.from fastapi import FastAPI
from fyers_apiv3.FyersWebsocket import data_ws
def onmessage(message):
# publish the message to the queue
pass
def onerror(message):
pass
def onclose(message):
pass
def onopen():
data_type = "SymbolUpdate"
symbols = ['NSE:SBIN-EQ', 'NSE:ADANIENT-EQ']
fyers.subscribe(symbols=symbols, data_type=data_type)
fyers.keep_running()
access_token = "XC4XXXXXXM-100:eXXXXXXXXXXXXfZNSBoLo"
fyers = data_ws.FyersDataSocket(
access_token=access_token, # Access token in the format "appid:accesstoken"
log_path="", # Path to save logs. Leave empty to auto-create logs in the current directory.
litemode=False, # Lite mode disabled. Set to True if you want a lite response.
write_to_file=False, # Save response in a log file instead of printing it.
reconnect=True, # Enable auto-reconnection to WebSocket on disconnection.
on_connect=onopen, # Callback function to subscribe to data upon connection.
on_close=onclose, # Callback function to handle WebSocket connection close events.
on_error=onerror, # Callback function to handle WebSocket errors.
on_message=onmessage # Callback function to handle incoming messages from the WebSocket.
)
app = FastAPI()
@app.get("/start")
def start():
fyers.connect()
return {"message": "Connection established"}
@app.post("/add_symbol")
def update(symbol:str):
data_type = "SymbolUpdate"
symbols = [symbol]
fyers.subscribe(symbols=symbols, data_type=data_type)
return {"message": "Subscribed to symbol"}
@app.post("/remove_symbol")
def remove(symbol:str):
data_type = "SymbolUpdate"
symbols_to_unsubscribe = [symbol]
fyers.unsubscribe(symbols=symbols_to_unsubscribe, data_type=data_type)
return {"message": "Unsubscribed from symbol"}