Documentation Index Fetch the complete documentation index at: https://benzinga-2-mrrancy-patch-1.mintlify.app/llms.txt
Use this file to discover all available pages before exploring further.
GitHub リポジトリ ソースコードを閲覧し、開発に貢献する
Python 2.6 以降および Python 3 と互換性あり
外部ライブラリへの依存なし
大容量メッセージをサポート
指数バックオフ付きの再試行ロジックを設定可能
setup.py を使ってライブラリをインストールします:
git clone https://github.com/Benzinga/python-bztcp.git
cd python-bztcp
python setup.py install
組み込みのデモを使用してクライアントをテストします:
Python 3 / 2.7+
リトライ設定あり
Python 2.6
python -m bztcp USERNAME API_KEY
python -m bztcp USERNAME API_KEY RETRIES DELAY BACKOFF
python -m bztcp.__main__ USERNAME API_KEY
bztcp.client.Client クラスが接続とストリーミングを処理します。
from __future__ import print_function
from bztcp.client import Client
client = Client( username = 'USERNAME' , key = 'API_KEY' )
for content in client.content_items():
title = content.get( 'title' , None )
print (title)
指数バックオフ方式のリトライ動作を設定します:
from bztcp.client import Client
client = Client(
username = 'USERNAME' ,
key = 'API_KEY' ,
retries = 5 , # 最大リトライ回数
delay = 90 , # 初期遅延(秒)
backoff = 2 # バックオフ倍率
)
for content in client.content_items():
title = content.get( 'title' , None )
print (title)
パラメータ 説明 デフォルト usernameBenzinga TCP のユーザー名 必須 keyAPI アクセスキー 必須 retriesリトライ試行回数の上限 - delayリトライ間の初期間隔(秒) - backoff指数バックオフに使用する倍率 -
接続ステータスや個々のメッセージを詳細に制御するには:
from bztcp.client import Client, STATUS_STREAM
from bztcp.exceptions import BzException
client = Client( username = 'USERNAME' , key = 'API_KEY' )
while True :
try :
msg = client.next_msg()
if msg.status == STATUS_STREAM :
print ( f "Content item: { msg.data } " )
else :
print ( f "Status: { msg.status } " )
except KeyboardInterrupt :
print ( "Cancelled, disconnecting." )
client.disconnect()
break
except BzException as bze:
print ( f "BZ Error: { bze } " )
break
Status 説明 STATUS_STREAM通常のストリーミングメッセージ
Method Description content_items()content の辞書を逐次返すジェネレーター next_msg()次の生のメッセージオブジェクトを返す disconnect()サーバーとの接続を正常に終了する
Benzinga 固有のエラーが発生した場合、このライブラリは BzException をスローします。
from bztcp.exceptions import BzException
try :
for content in client.content_items():
process(content)
except BzException as e:
print ( f "Benzinga error: { e } " )
except Exception as e:
print ( f "Unexpected error: { e } " )
#!/usr/bin/env python
from __future__ import print_function
import json
from bztcp.client import Client
def main ():
client = Client(
username = 'YOUR_USERNAME' ,
key = 'YOUR_API_KEY' ,
retries = 5 ,
delay = 30 ,
backoff = 2
)
print ( "Starting Benzinga TCP stream..." )
for content in client.content_items():
# 主要なフィールドを抽出
content_id = content.get( 'id' )
title = content.get( 'title' , 'No title' )
channels = content.get( 'channels' , [])
tickers = [t[ 'name' ] for t in content.get( 'tickers' , [])]
# 概要を出力
print ( f "[ { content_id } ] { title } " )
if channels:
print ( f " Channels: { ', ' .join(channels) } " )
if tickers:
print ( f " Tickers: { ', ' .join(tickers) } " )
print ()
if __name__ == '__main__' :
main()