
NATS: NATS Streaming Persistence

qiangdada / 1182 reads

Abstract: A durable subscription makes the server track the client's last acknowledged message sequence number together with the durable name. Running the memory-based persistence example produces the output below, which shows that persistence defaults to memory.

Preface

A recent project needed a message queue, mainly to make some existing operations asynchronous. Based on my use case and familiarity, I chose NATS Streaming, for three reasons. First, when selecting middleware I prefer components written in a language I know well, which makes troubleshooting and digging deeper much easier. Second, I work on Kubernetes and cloud native, so I lean toward projects governed by the CNCF, since they consider deployment on Kubernetes from the start. Third, I evaluated whether the component would keep meeting our needs as the project evolves.

Use Cases for Message Queues

To explain the "why", it helps to go over typical message queue use cases. I came across some answers on Zhihu that I largely agree with, and I borrow them here because they express the idea vividly. Thanks to ScienJus for the excellent explanation.

The defining feature of a message queue is asynchronous processing; the main goals are reducing request response time and decoupling. So the primary use case is taking operations that are slow and do not need to return a result synchronously, and putting them on the queue as messages. And because a queue sits in between, as long as the message format stays stable, producers and consumers never need to know about or depend on each other; that is the decoupling.

As a concrete example:

Suppose a user registers in your application. After receiving the registration request, the server does the following:

Validate the username and other fields, and if everything checks out, insert a user record into the database

If they registered with an email address, send a confirmation email; for a phone number, send an SMS

Analyze the user's profile so you can later recommend like-minded people to them, or recommend them to others

Send the user a system notification with a getting-started guide, and so on...

From the user's point of view, though, registration really only needs step one: once the server has saved the account to the database, they can log in and get on with whatever they came to do. Do the remaining steps really have to happen inside this one request? Is it worth making the user wait while you handle things that don't matter to them? So in practice, once step one completes, the server pushes the other operations onto the appropriate message queues and returns a result to the user immediately, letting the queue carry them out asynchronously.

There is another scenario: a burst of users registering at the same time. Under high concurrency the registration path starts to strain; the email gateway can't keep up, or the profile analysis pegs the CPU. User records still land in the database quickly, but requests stall on sending email or running analysis, response times balloon, and some requests even time out. That is a poor trade. The usual answer is again to push those operations onto a message queue (the producer-consumer model): the queue works through them at its own pace, registration requests complete quickly, and the rest of the application is unaffected.
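The registration flow above can be sketched with a simple in-process queue: step 1 (storing the user) runs synchronously, the slow side tasks are enqueued, and a consumer goroutine drains them asynchronously. This is only a minimal illustration of the pattern, not a real NATS client; the `task` type and `runRegistration` helper are made up for the sketch:

```go
package main

import "fmt"

// task is a side job that should not block the registration request.
type task struct {
	kind string // e.g. "email", "analyze", "notify"
	user string
}

// runRegistration simulates the request handler: storing the user is
// synchronous; the slow steps are enqueued and handled by a consumer
// goroutine. It returns the kinds processed, in order.
func runRegistration(user string) []string {
	queue := make(chan task, 100) // stand-in for the message queue
	done := make(chan []string)

	// Consumer: drains the queue asynchronously.
	go func() {
		var processed []string
		for t := range queue {
			processed = append(processed, t.kind)
		}
		done <- processed
	}()

	// Producer: the handler enqueues the side tasks and returns.
	// (A real handler would respond to the user at this point,
	// before any of the tasks have been consumed.)
	queue <- task{"email", user}
	queue <- task{"analyze", user}
	queue <- task{"notify", user}
	close(queue)
	return <-done
}

func main() {
	fmt.Println(runRegistration("alice"))
}
```

A buffered channel plays the role of the queue here; swapping it for a real broker such as NATS Streaming changes the transport, not the shape of the design.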

So during normal feature development you shouldn't go hunting for places to use a message queue. Instead, when a performance bottleneck appears, look for slow operations in the business logic that could run asynchronously, and introduce a queue if you find them. Using a message queue blindly tends to add development and maintenance cost without a meaningful performance win, which is a net loss.

To sum up, a message queue buys you two things:

Peak shaving. Think of it as a reservoir. Kafka in an ELK log pipeline is a good example: during log spikes it trades some latency for keeping the whole system safe.

Making a synchronous flow asynchronous. Out of the many steps in one synchronous operation, those that don't affect the main path can be moved onto a queue. In e-commerce, for example, after an order completes, besides returning a success message to the customer you also notify billing to charge the account, notify inventory to adjust stock, notify logistics to ship, notify the membership system to award loyalty points, and so on.

Introduction to NATS Streaming

NATS Streaming is a data streaming system powered by NATS and written in Go. The NATS Streaming server executable is named nats-streaming-server. NATS Streaming embeds, extends, and interoperates seamlessly with the core NATS platform. The server is available as open source under the Apache-2.0 license, and Synadia actively maintains and supports it.

Features

On top of the core NATS platform, NATS Streaming adds the following:

Enhanced message protocol

NATS Streaming implements its own enhanced message format using Google Protocol Buffers. These messages travel as binary streams over the core NATS platform, so no change to the basic NATS protocol is needed. A NATS Streaming message contains the following fields:

  - Sequence - a globally ordered sequence number for the subject's channel
  - Subject - the NATS Streaming delivery subject
  - Reply - the corresponding "reply-to" subject
  - Data - the actual message payload
  - Timestamp - the receive timestamp, in nanoseconds
  - Redelivered - a flag marking whether the server needs to resend this message
  - CRC32 - an optional cyclic redundancy checksum, the usual error-detection technique in storage and communications; the IEEE CRC32 algorithm is used here
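The CRC32 field uses the IEEE polynomial, which is the same algorithm Go's standard `hash/crc32` package exposes, so you can reproduce the checksum for a payload yourself. A small sketch under that assumption:

```go
package main

import (
	"fmt"
	"hash/crc32"
)

// checksum returns the IEEE CRC-32 of a message payload, the same
// algorithm named in the NATS Streaming message's CRC32 field.
func checksum(payload []byte) uint32 {
	return crc32.ChecksumIEEE(payload)
}

func main() {
	// "123456789" is the standard CRC-32 check input; the IEEE
	// polynomial yields the well-known check value 0xCBF43926.
	fmt.Printf("%08x\n", checksum([]byte("123456789")))
}
```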

 - Message/event persistence
  NATS Streaming offers configurable message persistence: the durable target can be memory or file. The storage subsystem also sits behind a common interface, so we can write a custom implementation to persist messages however we like.

 - At-least-once delivery
  NATS Streaming provides message acknowledgments between publisher and server (for publish operations) and between subscriber and server (to confirm delivery). Messages are kept in server memory or secondary storage (or another external store) so they can be redelivered to subscribers that need to receive them again.

 - Publisher rate limiting
  NATS Streaming provides a connection option called MaxPubAcksInFlight that effectively caps how many unacknowledged messages a publisher can have outstanding at any time. When the cap is reached, asynchronous publish calls block until the number of unacknowledged messages drops back below it.

- Per-subscriber rate matching/limiting
  NATS Streaming allows a subscription to set a MaxInFlight parameter: the maximum amount of data that can be delivered but not yet acknowledged. When the limit is hit, NATS Streaming pauses delivery to that subscriber until the unacknowledged amount falls back below the configured value.

Historical message replay by subject

  A new subscription can specify a start position within the messages already stored for its subject's channel. With this option, delivery can begin from:

  1. The earliest message stored for the subject
  2. The most recently stored message for the subject, often thought of as the "last value" or "initial value" cache
  3. A specific date/time, in nanoseconds
  4. A start position in history relative to the current server time, e.g. the last 30 seconds
  5. A specific message sequence number

Durable subscriptions

  A subscription can also specify a "durable name" that survives client restarts. A durable subscription makes the server track the last acknowledged message sequence number for that client ID and durable name. When the client restarts or resubscribes with the same client ID and durable name, the server resumes delivery from the earliest unacknowledged message.
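Server-side, this amounts to remembering the last acknowledged sequence per (client ID, durable name) pair and resuming delivery at the next one. A minimal sketch of that bookkeeping, not the actual server data structures:

```go
package main

import "fmt"

// durables maps clientID+durableName to the last acknowledged
// sequence, which is the essence of what the server persists for
// a durable subscription.
type durables map[string]uint64

func key(clientID, durableName string) string {
	return clientID + ":" + durableName
}

// ack records that the client confirmed the message at seq.
func (d durables) ack(clientID, durableName string, seq uint64) {
	if seq > d[key(clientID, durableName)] {
		d[key(clientID, durableName)] = seq
	}
}

// resume returns the sequence to restart delivery from when the
// client reconnects with the same client ID and durable name.
func (d durables) resume(clientID, durableName string) uint64 {
	return d[key(clientID, durableName)] + 1
}

func main() {
	d := durables{}
	d.ack("client-1", "orders", 41)
	d.ack("client-1", "orders", 42)
	// Client restarts and subscribes again with the same IDs:
	fmt.Println(d.resume("client-1", "orders")) // picks up at 43
}
```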

Running NATS Streaming with Docker

As discussed above, the feature NATS Streaming adds over plain NATS is persistence, so the demos below focus on it.

Run the memory-based persistence example:
docker run -ti -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0

You will see output like this:

[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
[4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:13:01.770581 [INF] Server is ready
[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:13:02.052601 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:13:02.052613 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:13:02.052624 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:13:02.052635 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:13:02.052649 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------

As the output shows, the default is memory-based persistence.

Run the file-based persistence example:
docker run -ti -v /Users/gao/test/mq:/datastore  -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0  -store file --dir /datastore -m 8222

You will see output like this:

[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
[5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:16:07.643932 [INF] Server is ready
[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:16:07.933711 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:16:07.933749 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:16:07.933793 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:16:07.933837 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:16:07.933857 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------
PS

When deploying in Kubernetes, use file-based persistence backed by a mounted block volume, such as an AWS EBS volume or a Ceph RBD image, to keep the data safe.

Port 4222 is for client connections; 8222 is the monitoring port.

Once started, visit localhost:8222 to view the monitoring page.

Startup options
Streaming Server Options:
    -cid, --cluster_id           Cluster ID (default: test-cluster)
    -st,  --store                Store type: MEMORY|FILE|SQL (default: MEMORY)
          --dir                  For FILE store type, this is the root directory
    -mc,  --max_channels            Max number of channels (0 for unlimited)
    -msu, --max_subs                Max number of subscriptions per channel (0 for unlimited)
    -mm,  --max_msgs                Max number of messages per channel (0 for unlimited)
    -mb,  --max_bytes              Max messages total size per channel (0 for unlimited)
    -ma,  --max_age            Max duration a message can be stored ("0s" for unlimited)
    -mi,  --max_inactivity     Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns,  --nats_server          Connect to this external NATS Server URL (embedded otherwise)
    -sc,  --stan_config          Streaming server configuration file
    -hbi, --hb_interval        Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout         How long server waits for a heartbeat response
    -hbf, --hb_fail_count           Number of failed heartbeats before server closes the client connection
          --ft_group             Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl,  --signal [=]      Send signal to nats-streaming-server process (stop, quit, reopen)
          --encrypt                Specify if server should use encryption at rest
          --encryption_cipher    Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
          --encryption_key        Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered                    Run the server in a clustered configuration (default: false)
    --cluster_node_id            ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap            Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers              List of cluster peer node IDs to bootstrap cluster state.
    --cluster_log_path           Directory to store log replication data
    --cluster_log_cache_size        Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots         Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs         Number of log entries to leave after a snapshot and compaction
    --cluster_sync                 Do a file sync after every write to the replication log and message store
    --cluster_raft_logging         Enable logging from the Raft library (disabled by default)

Streaming Server File Store Options:
    --file_compact_enabled         Enable file compaction
    --file_compact_frag             File fragmentation threshold for compaction
    --file_compact_interval         Minimum interval (in seconds) between file compactions
    --file_compact_min_size        Minimum file size for compaction
    --file_buffer_size             File buffer size (in bytes)
    --file_crc                     Enable file CRC-32 checksum
    --file_crc_poly                 Polynomial used to make the table used for CRC-32 checksum
    --file_sync                    Enable File.Sync on Flush
    --file_slice_max_msgs           Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes         Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age       Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script  Path to script to use if you want to archive a file slice being removed
    --file_fds_limit                Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery        On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof        Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

Streaming Server SQL Store Options:
    --sql_driver             Name of the SQL Driver ("mysql" or "postgres")
    --sql_source             Datasource used when opening an SQL connection to the database
    --sql_no_caching           Enable/Disable caching for improved performance
    --sql_max_open_conns        Maximum number of opened connections to the database

Streaming Server TLS Options:
    -secure                    Use a TLS connection to the NATS server without
                                     verification; weaker than specifying certificates.
    -tls_client_key          Client key for the streaming server
    -tls_client_cert         Client certificate for the streaming server
    -tls_client_cacert       Client certificate CA for the streaming server

Streaming Server Logging Options:
    -SD, --stan_debug=         Enable STAN debugging output
    -SV, --stan_trace=         Trace the raw STAN protocol
    -SDV                             Debug and trace STAN
         --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)

Embedded NATS Server Options:
    -a, --addr               Bind to host address (default: 0.0.0.0)
    -p, --port                  Use port for clients (default: 4222)
    -P, --pid                File to store PID
    -m, --http_port             Use port for http monitoring
    -ms,--https_port            Use port for https monitoring
    -c, --config             Configuration file

Logging Options:
    -l, --log                File to redirect log output
    -T, --logtime=             Timestamp log entries (default: true)
    -s, --syslog             Enable syslog as log method
    -r, --remote_syslog      Syslog server addr (udp://localhost:514)
    -D, --debug=               Enable debugging output
    -V, --trace=               Trace the raw protocol
    -DV                              Debug and trace

Authorization Options:
        --user               User required for connections
        --pass               Password required for connections
        --auth               Authorization token required for connections

TLS Options:
        --tls=                 Enable TLS, do not verify clients (default: false)
        --tlscert            Server certificate file
        --tlskey             Private key for server certificate
        --tlsverify=           Enable TLS, verify client certificates
        --tlscacert          Client certificate CA for verification

NATS Clustering Options:
        --routes        Routes to solicit and connect
        --cluster            Cluster URL for solicited routes

Common Options:
    -h, --help                       Show this message
    -v, --version                    Show version
        --help_tls                   TLS help.
A Quick Look at Persistence in the NATS Streaming Source

NATS Streaming currently supports four persistence modes:

MEMORY

FILE

SQL

RAFT

Reading the source shows that the NATS Streaming store is built on an interface, so it is easy to extend with more persistence backends. The interface looks like this:

// Store is the storage interface for NATS Streaming servers.
//
// If an implementation has a Store constructor with StoreLimits, it should be
// noted that the limits don't apply to any state being recovered, for Store
// implementations supporting recovery.
//
type Store interface {
    // GetExclusiveLock is an advisory lock to prevent concurrent
    // access to the store from multiple instances.
    // This is not to protect individual API calls, instead, it
    // is meant to protect the store for the entire duration the
    // store is being used. This is why there is no `Unlock` API.
    // The lock should be released when the store is closed.
    //
    // If an exclusive lock can be immediately acquired (that is,
    // it should not block waiting for the lock to be acquired),
    // this call will return `true` with no error. Once a store
    // instance has acquired an exclusive lock, calling this
    // function has no effect and `true` with no error will again
    // be returned.
    //
    // If the lock cannot be acquired, this call will return
    // `false` with no error: the caller can try again later.
    //
    // If, however, the lock cannot be acquired due to a fatal
    // error, this call should return `false` and the error.
    //
    // It is important to note that the implementation should
    // make an effort to distinguish error conditions deemed
    // fatal (and therefore trying again would invariably result
    // in the same error) and those deemed transient, in which
    // case no error should be returned to indicate that the
    // caller could try later.
    //
    // Implementations that do not support exclusive locks should
    // return `false` and `ErrNotSupported`.
    GetExclusiveLock() (bool, error)

    // Init can be used to initialize the store with server's information.
    Init(info *spb.ServerInfo) error

    // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
    Name() string

    // Recover returns the recovered state.
    // Implementations that do not persist state and therefore cannot
    // recover from a previous run MUST return nil, not an error.
    // However, an error must be returned for implementations that are
    // attempting to recover the state but fail to do so.
    Recover() (*RecoveredState, error)

    // SetLimits sets limits for this store. The action is not expected
    // to be retroactive.
    // The store implementation should make a deep copy as to not change
    // the content of the structure passed by the caller.
    // This call may return an error due to limits validation errors.
    SetLimits(limits *StoreLimits) error

    // GetChannelLimits returns the limit for this channel. If the channel
    // does not exist, returns nil.
    GetChannelLimits(name string) *ChannelLimits

    // CreateChannel creates a Channel.
    // Implementations should return ErrAlreadyExists if the channel was
    // already created.
    // Limits defined for this channel in StoreLimits.PerChannel map, if present,
    // will apply. Otherwise, the global limits in StoreLimits will apply.
    CreateChannel(channel string) (*Channel, error)

    // DeleteChannel deletes a Channel.
    // Implementations should make sure that if no error is returned, the
    // channel would not be recovered after a restart, unless CreateChannel()
    // with the same channel is invoked.
    // If processing is expecting to be time consuming, work should be done
    // in the background as long as the above condition is guaranteed.
    // It is also acceptable for an implementation to have CreateChannel()
    // return an error if background deletion is still happening for a
    // channel of the same name.
    DeleteChannel(channel string) error

    // AddClient stores information about the client identified by `clientID`.
    AddClient(info *spb.ClientInfo) (*Client, error)

    // DeleteClient removes the client identified by `clientID` from the store.
    DeleteClient(clientID string) error

    // Close closes this store (including all MsgStore and SubStore).
    // If an exclusive lock was acquired, the lock shall be released.
    Close() error
}

Official schemas are provided for both MySQL and PostgreSQL:

postgres.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
CREATE INDEX Idx_ChannelsName ON Channels (name(256));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);

-- Updates for 0.10.0
ALTER TABLE Clients ADD proto BYTEA;

mysql.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);

# Updates for 0.10.0
ALTER TABLE Clients ADD proto BLOB;
Summary

Follow-up posts will dig into the code in more detail and cover cluster deployment, including, of course, how to run a highly available cluster on Kubernetes.

Further reading:

NATS Streaming詳解
