Documentation Index Fetch the complete documentation index at: https://benzinga-2-mrrancy-patch-1.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
GitHub 저장소 소스 코드를 확인하고 기여해 보세요
Python 2.6+ 및 Python 3와 호환
외부 종속성 없음
대용량 메시지 지원
지수 백오프를 사용하는 재시도 로직 구성 가능
setup.py를 사용하여 라이브러리를 설치합니다:
git clone https://github.com/Benzinga/python-bztcp.git
cd python-bztcp
python setup.py install
내장 데모를 사용하여 클라이언트를 테스트하세요:
Python 3 / 2.7+
With Retry Config
Python 2.6
python -m bztcp USERNAME API_KEY
python -m bztcp USERNAME API_KEY RETRIES DELAY BACKOFF
python -m bztcp.__main__ USERNAME API_KEY
bztcp.client.Client 클래스가 연결 및 스트리밍을 담당합니다.
from __future__ import print_function
from bztcp.client import Client
client = Client( username = 'USERNAME' , key = 'API_KEY' )
for content in client.content_items():
title = content.get( 'title' , None )
print (title)
지수 백오프를 사용해 재시도 동작을 구성하세요:
from bztcp.client import Client
client = Client(
username = 'USERNAME' ,
key = 'API_KEY' ,
retries = 5 , # 최대 재시도 횟수
delay = 90 , # 초 단위 초기 지연 시간
backoff = 2 # 백오프 배수
)
for content in client.content_items():
title = content.get( 'title' , None )
print (title)
Parameter Description Default usernameBenzinga TCP 사용자명 필수 keyAPI 액세스 키 필수 retries최대 재시도 횟수 - delay재시도 간 초기 지연 시간(초 단위) - backoff지수 백오프에 사용할 곱셈 계수 -
연결 상태와 개별 메시지를 보다 세밀하게 제어하려면:
from bztcp.client import Client, STATUS_STREAM
from bztcp.exceptions import BzException
client = Client( username = 'USERNAME' , key = 'API_KEY' )
while True :
try :
msg = client.next_msg()
if msg.status == STATUS_STREAM :
print ( f "Content item: { msg.data } " )
else :
print ( f "Status: { msg.status } " )
except KeyboardInterrupt :
print ( "취소됨, 연결을 해제합니다." )
client.disconnect()
break
except BzException as bze:
print ( f "BZ Error: { bze } " )
break
상태 설명 STATUS_STREAM일반적인 스트리밍 콘텐츠 메시지
Method Description content_items()content 딕셔너리를 순차적으로 반환하는 제너레이터 next_msg()다음 원시 메시지 객체를 반환합니다 disconnect()서버와의 연결을 정상적으로 종료합니다
이 라이브러리는 Benzinga 관련 오류에 대해 BzException 예외를 던집니다.
from bztcp.exceptions import BzException
try :
for content in client.content_items():
process(content)
except BzException as e:
print ( f "Benzinga 오류: { e } " )
except Exception as e:
print ( f "예기치 않은 오류: { e } " )
#!/usr/bin/env python
from __future__ import print_function
import json
from bztcp.client import Client
def main ():
client = Client(
username = 'YOUR_USERNAME' ,
key = 'YOUR_API_KEY' ,
retries = 5 ,
delay = 30 ,
backoff = 2
)
print ( "Starting Benzinga TCP stream..." )
for content in client.content_items():
# 주요 필드 추출
content_id = content.get( 'id' )
title = content.get( 'title' , 'No title' )
channels = content.get( 'channels' , [])
tickers = [t[ 'name' ] for t in content.get( 'tickers' , [])]
# 요약 정보 출력
print ( f "[ { content_id } ] { title } " )
if channels:
print ( f " Channels: { ', ' .join(channels) } " )
if tickers:
print ( f " Tickers: { ', ' .join(tickers) } " )
print ()
if __name__ == '__main__' :
main()