Single Click Login

I am getting the following error while attempting single-click login: {"s": "error", "code": -410, "message": "The requested resource is no longer available. It appears you are using a deprecated endpoint. Please migrate to APIv3 for continued support."} Could anyone please help?

class fyers_broker:
    """Automated ("single click") login helper for a Fyers trading account.

    Credentials and endpoint base URLs are read from ``data/credentials.json``
    (section ``"fyers"``). ``connect()`` then chains the individual login
    steps: send OTP -> generate TOTP -> verify TOTP -> verify PIN -> obtain
    auth code -> exchange auth code for an access token.

    Every step method returns a two-item list ``[status, payload]`` where
    ``status`` is ``SUCCESS`` or ``ERROR``; on error ``payload`` is either the
    raw response text or the caught exception.

    NOTE(review): the endpoint URLs are built from ``base_url`` / ``base_url2``
    in credentials.json. A ``-410 ... deprecated endpoint ... migrate to APIv3``
    response from the broker presumably means those base URLs (and possibly the
    request payloads below) still target a retired API version -- confirm
    against the current Fyers API documentation.
    """

    SUCCESS = 1
    ERROR = -1

    # Seconds to wait for each HTTP call; without a timeout ``requests``
    # blocks indefinitely when the server stops responding.
    REQUEST_TIMEOUT = 30

    # (instance attribute, key in the "fyers" section of credentials.json).
    # Every entry is required, including the two base URLs, because all
    # endpoint URLs are derived from them.
    _CREDENTIAL_FIELDS = (
        ("CLIENT_ID", "client_id"),
        ("APP_ID", "app_id"),
        ("APP_TYPE", "app_type"),
        ("SECRET_KEY", "secret_key"),
        ("ID", "id"),
        ("APP_ID_TYPE", "app_id_type"),
        ("TOTP_KEY", "totp_key"),
        ("PIN", "pin"),
        ("REDIRECT_URI", "redirect_uri"),
        ("APP_ID_HASH", "app_id_hash"),
        ("BASE_URL", "base_url"),
        ("BASE_URL_2", "base_url2"),
    )

    def __init__(self):
        """Create the broker wrapper and load credentials immediately."""
        # Set by connect() once a session has been validated.
        self.fyers = None
        self.load_credentials()

    @classmethod
    def _missing_credentials(cls, creds):
        """Return the credentials.json keys that are absent or empty in *creds*."""
        return [key for _, key in cls._CREDENTIAL_FIELDS if not creds.get(key)]

    @staticmethod
    def _extract_auth_code(url):
        """Return the ``auth_code`` query parameter of *url*.

        Raises:
            ValueError: if the URL carries no ``auth_code`` parameter.
        """
        codes = parse.parse_qs(parse.urlparse(url).query).get("auth_code")
        if not codes:
            # The URL itself is not echoed; it may contain other tokens.
            raise ValueError("auth_code missing from redirect URL")
        return codes[0]

    def _post_json(self, url, payload, expected_status=200, **request_kwargs):
        """POST *payload* as JSON and decode the JSON reply.

        Returns:
            ``(True, parsed_body)`` when the status code equals
            *expected_status* (the parsed body is also kept in ``self.result``),
            otherwise ``(False, raw_response_text)``.
        """
        response = requests.post(
            url=url, json=payload, timeout=self.REQUEST_TIMEOUT, **request_kwargs
        )
        if response.status_code != expected_status:
            return False, response.text
        self.result = json.loads(response.text)
        return True, self.result

    def load_credentials(self):
        """Load credentials from credentials.json in the data folder.

        Populates the credential attributes (``CLIENT_ID``, ``PIN``, ...) and,
        when all required values are present, the endpoint URL attributes.
        If required values are missing the problem is logged and the method
        returns without building the URLs.

        Raises:
            Exception: any error while locating, reading or parsing the file
                is logged and re-raised.
        """
        try:
            credentials_path = self.get_data_path('credentials.json')
            if not os.path.exists(credentials_path):
                raise FileNotFoundError(f"Credentials file not found at {credentials_path}")
            with open(credentials_path, 'r', encoding="utf-8") as file:
                credentials = json.load(file)

            fyers_creds = credentials.get("fyers", {})
            for attr_name, json_key in self._CREDENTIAL_FIELDS:
                setattr(self, attr_name, fyers_creds.get(json_key, ""))

            # Report key names only -- never the secret values themselves.
            missing = self._missing_credentials(fyers_creds)
            if missing:
                self.handle_error(
                    "fyers", "load_credentials",
                    "Missing required credentials: " + ", ".join(missing),
                )
                return None

            # Tolerate a trailing slash in the configured base URLs.
            login_base = self.BASE_URL.rstrip("/")
            api_base = self.BASE_URL_2.rstrip("/")
            self.URL_SEND_LOGIN_OTP = login_base + "/send_login_otp"
            self.URL_VERIFY_TOTP = login_base + "/verify_otp"
            self.URL_VERIFY_PIN = login_base + "/verify_pin"
            self.URL_TOKEN = api_base + "/token"
            self.URL_VALIDATE_AUTH_CODE = api_base + "/validate-authcode"

        except Exception as e:
            self.handle_error("fyers", "load_credentials", e)
            raise

    def send_login_otp(self, fy_id, app_id):
        """Start the login flow; payload on success is the ``request_key``."""
        try:
            ok, body = self._post_json(
                self.URL_SEND_LOGIN_OTP, {"fy_id": fy_id, "app_id": app_id}
            )
            if not ok:
                return [self.ERROR, body]
            return [self.SUCCESS, body["request_key"]]
        except Exception as e:
            self.handle_error("fyers", "send_login_otp", e)
            return [self.ERROR, e]

    def generate_totp(self, secret):
        """Generate the current time-based OTP for *secret* via ``pyotp``."""
        try:
            return [self.SUCCESS, pyotp.TOTP(secret).now()]
        except Exception as e:
            self.handle_error("fyers", "generate_totp", e)
            return [self.ERROR, e]

    def verify_totp(self, request_key, totp):
        """Submit the TOTP; payload on success is the next ``request_key``."""
        try:
            ok, body = self._post_json(
                self.URL_VERIFY_TOTP, {"request_key": request_key, "otp": totp}
            )
            if not ok:
                return [self.ERROR, body]
            return [self.SUCCESS, body["request_key"]]
        except Exception as e:
            self.handle_error("fyers", "verify_totp", e)
            return [self.ERROR, e]

    def verify_PIN(self, request_key, pin):
        """Submit the PIN; payload on success is ``data.access_token``."""
        try:
            payload = {
                "request_key": request_key,
                "identity_type": "pin",
                "identifier": pin,
            }
            ok, body = self._post_json(self.URL_VERIFY_PIN, payload)
            if not ok:
                return [self.ERROR, body]
            return [self.SUCCESS, body["data"]["access_token"]]
        except Exception as e:
            self.handle_error("fyers", "verify_PIN", e)
            return [self.ERROR, e]

    def token(self, fy_id, app_id, redirect_uri, app_type, access_token):
        """Request an auth code using the access token from ``verify_PIN``.

        The endpoint is expected to answer with HTTP 308 and a JSON body whose
        ``Url`` field carries ``auth_code`` as a query parameter; that code is
        the payload on success.
        """
        try:
            payload = {
                "fyers_id": fy_id,
                "app_id": app_id,
                "redirect_uri": redirect_uri,
                "appType": app_type,
                "code_challenge": "",
                "state": "sample_state",
                "scope": "",
                "nonce": "",
                "response_type": "code",
                "create_cookie": True,
            }
            headers = {'Authorization': f'Bearer {access_token}'}
            # The 308 response itself is what we need, so never let requests
            # follow it to the redirect target (it would if a Location header
            # were present).
            ok, body = self._post_json(
                self.URL_TOKEN, payload, expected_status=308,
                headers=headers, allow_redirects=False,
            )
            if not ok:
                return [self.ERROR, body]
            return [self.SUCCESS, self._extract_auth_code(body["Url"])]
        except Exception as e:
            self.handle_error("fyers", "token", e)
            return [self.ERROR, e]

    def validate_authcode(self, app_id_hash, auth_code):
        """Exchange *auth_code* for the API ``access_token`` (payload on success)."""
        try:
            payload = {
                "grant_type": "authorization_code",
                "appIdHash": app_id_hash,
                "code": auth_code,
            }
            ok, body = self._post_json(self.URL_VALIDATE_AUTH_CODE, payload)
            if not ok:
                return [self.ERROR, body]
            return [self.SUCCESS, body["access_token"]]
        except Exception as e:
            self.handle_error("fyers", "validate_authcode", e)
            return [self.ERROR, e]

    def connect(self):
        """Run the full login flow and return a validated ``FyersModel``.

        Returns:
            The ``FyersModel`` instance (also stored in ``self.fyers``) when
            every step succeeds and the profile lookup returns a name;
            ``None`` otherwise. Failures are logged through ``handle_error``.
        """
        try:
            # Step 1 - Retrieve request_key from send_login_otp API
            send_otp_result = self.send_login_otp(fy_id=self.ID, app_id=self.APP_ID_TYPE)
            if send_otp_result[0] != self.SUCCESS:
                self.handle_error("fyers", "send_login_otp", f"Failed to send login OTP: {send_otp_result[1]}")
                return None

            # Step 2 - Generate totp
            generate_totp_result = self.generate_totp(secret=self.TOTP_KEY)
            if generate_totp_result[0] != self.SUCCESS:
                self.handle_error("fyers", "generate_totp", f"Failed to generate TOTP: {generate_totp_result[1]}")
                return None

            # Step 3 - Verify totp and get request key from verify_otp API
            verify_totp_result = self.verify_totp(
                request_key=send_otp_result[1], totp=generate_totp_result[1]
            )
            if verify_totp_result[0] != self.SUCCESS:
                self.handle_error("fyers", "verify_totp", f"Failed to verify TOTP: {verify_totp_result[1]}")
                return None

            # Step 4 - Verify pin and send back access token
            verify_pin_result = self.verify_PIN(request_key=verify_totp_result[1], pin=self.PIN)
            if verify_pin_result[0] != self.SUCCESS:
                self.handle_error("fyers", "verify_PIN", f"Failed to verify PIN: {verify_pin_result[1]}")
                return None

            # Step 5 - Get auth code for the API app from the trade access token
            token_result = self.token(
                fy_id=self.ID, app_id=self.APP_ID, redirect_uri=self.REDIRECT_URI,
                app_type=self.APP_TYPE, access_token=verify_pin_result[1],
            )
            if token_result[0] != self.SUCCESS:
                self.handle_error("fyers", "token", f"Failed to get token: {token_result[1]}")
                return None

            # Step 6 - Get the API access token by validating the auth code
            validate_authcode_result = self.validate_authcode(
                app_id_hash=self.APP_ID_HASH, auth_code=token_result[1]
            )
            if validate_authcode_result[0] != self.SUCCESS:
                self.handle_error("fyers", "validate_authcode", f"Failed to validate auth code: {validate_authcode_result[1]}")
                return None
            access_token = validate_authcode_result[1]

            client = fyersModel.FyersModel(client_id=self.CLIENT_ID,
                                           token=access_token,
                                           log_path=r"data")
            # Test the connection by fetching the profile; only publish the
            # client on self.fyers once the session is known to work.
            self.profile = client.get_profile()
            if "data" in self.profile and "name" in self.profile["data"]:
                self.fyers = client
                return self.fyers

            self.handle_error("fyers", "connect", f"Profile check failed: {self.profile}")
            return None
        except Exception as e:
            self.handle_error("fyers", "connect", e)
            return None

    def handle_error(self, broker_name, context, error):
        """Log *error* for *context*.

        When called while an exception is being handled, the line number
        recorded on that exception's traceback is included in the message.
        """
        exc_tb = sys.exc_info()[2]
        tb_lineno = exc_tb.tb_lineno if exc_tb else None
        if tb_lineno:
            error_msg = f"Error in {context} for {broker_name} (Line {tb_lineno}): {error}"
        else:
            error_msg = f"Error in {context} for {broker_name}: {error}"
        logger.error(error_msg)

    def get_data_path(self, filename):
        """Return the path of *filename* inside the ``data`` folder.

        For a frozen executable (``sys.frozen`` set) the folder sits next to
        the executable; otherwise it sits one directory above this module's
        own directory.
        """
        if getattr(sys, 'frozen', False):
            base_path = os.path.dirname(sys.executable)  # Directory containing the .exe
        else:
            base_path = os.path.dirname(os.path.dirname(__file__))

        return os.path.join(base_path, "data", filename)
1
1 reply