Running the Agent in Play mode
How to run the infini-agent in playmode and interact with it from the NATS CLI
First install the agent for your platform.
Running infini-agent
Section titled “Running infini-agent”infini-agent playThe play command will start an agent on your host computer, signup for a temporary account,
create a virtual group and store the objects in memory.
Create a stream
Section titled “Create a stream”Let’s create a stream with the NATS CLI (we will bundle the CLI into agent in near future).
Since we are running the InfiniAgent on the localhost, we will use that for -s option.
Let’s create a stream.
nats stream add testing? Subjects testing.*? Storage file? Replication 1? Retention Policy Limits? Discard Policy Old? Stream Messages Limit -1? Per Subject Messages Limit -1? Total Stream Size -1? Message TTL -1? Max Message Size -1? Duplicate tracking time window 2m0s? Allow message Roll-ups No? Allow message deletion Yes? Allow purging subjects or the entire stream YesStream testing was created
Information for Stream testing created ...
Subjects: foo Replicas: 1 Storage: File
Options:
Retention: Limits Acknowledgments: true Discard Policy: Old Duplicate Window: 2m0s Direct Get: true Allows Msg Delete: true Allows Purge: true Allows Rollups: false
Limits:
Maximum Messages: unlimited Maximum Per Subject: unlimited Maximum Bytes: unlimited Maximum Age: unlimited Maximum Message Size: unlimited Maximum Consumers: unlimited
State:
Messages: 0 Bytes: 0 B First Sequence: 0 Last Sequence: 0 Active Consumers: 0or you can pass it from a file
cat <<EOF > ./newStream; ./nats -s nats://localhost:4224 stream add testing --config=./newStream{ "name": "testing", "subjects": [ "testing.*" ], "retention": "limits", "max_consumers": -1, "max_msgs_per_subject": -1, "max_msgs": -1, "max_bytes": -1, "max_age": 0, "max_msg_size": -1, "storage": "file", "discard": "old", "num_replicas": 1, "duplicate_window": 120000000000, "sealed": false, "deny_delete": false, "deny_purge": false, "allow_rollup_hdrs": false, "allow_direct": false, "mirror_direct": false}EOFand check it
./nats -s nats://localhost:4222 stream info testinglets publish a few messages to this stream
./nats pub testing.foo --count=10 --sleep 1s "publication #{{.Count}} @ {{.TimeStamp}}"Create consumer
Section titled “Create consumer”We can administratively create a consumer using the nats consumer add command, in this example we will name the consumer “pull_consumer”,
and we will leave the delivery subject to ‘nothing’ (i.e. just hit return at the prompt) because we are creating a ‘pull consumer’ and select all
for the start policy, you can then just use the defaults and hit return for all the other prompts.
The stream the consumer is created on should be the stream ‘testing’ we just created above
nats consumer add? Consumer name pull_consumer? Delivery target (empty for Pull Consumers)? Start policy (all, new, last, subject, 1h, msg sequence) all? Acknowledgment policy explicit? Replay policy instant? Filter Stream by subjects (blank for all)? Maximum Allowed Deliveries -1? Maximum Acknowledgments Pending 0? Deliver headers only without bodies No? Add a Retry Backoff Policy No? Select a Stream testingInformation for Consumer testing > pull_consumer created
Configuration:
Name: pull_consumer Pull Mode: true Deliver Policy: All Ack Policy: Explicit Ack Wait: 30.00s Replay Policy: Instant Max Ack Pending: 1,000 Max Waiting Pulls: 512
State:
Last Delivered Message: Consumer sequence: 0 Stream sequence: 0 Acknowledgment Floor: Consumer sequence: 0 Stream sequence: 0 Outstanding Acks: 0 out of maximum 1,000 Redelivered Messages: 0 Unprocessed Messages: 74 Waiting Pulls: 0 of maximum 512You can check on the status of any consumer at any time using nats consumer info or
view the messages in the stream using nats stream view testing or nats stream get testing,
or even remove individual messages from the stream using nats stream rmm.
Or with a script like this:
cat << EOF > ./c11; ./nats -s nats://localhost:4224 consumer add testing --config c11{ "ack_policy": "explicit", "deliver_policy": "all", "durable_name": "pull_consumer", "max_deliver": -1, "replay_policy": "original", "num_replicas": 0}EOFcheck stream for consumers
./nats -s nats://localhost:4224 consumer list testingSubscribing from the consumer
Section titled “Subscribing from the consumer”Now that the consumer has been created and since there are messages in the stream we can now start subscribing to the consumer:
nats consumer next testing pull_consumer --count 10This will print out all the messages in the stream starting with the first message (which was published in the past) and continuing with new messages as they are published until the count is reached.
Note that in this example we are creating a pull consumer with a ‘durable’ name, this means that the consumer can be shared between as many consuming processes as you want. For example instead of running a single nats consumer next with a count of 10 messages you could have started two instances of nats consumer each with a message count of 5 and you would see the consumption of the messages from the consumer distributed between those instances of nats.
Replaying the messages again
Section titled “Replaying the messages again”Once you have iterated over all the messages in the stream with the consumer,
you can get them again by simply creating a new consumer or by deleting that consumer (nats consumer rm) and
re-creating it (nats consumer add).
Cleaning up
Section titled “Cleaning up”You can clean up a stream (and release the resources associated with it (e.g. the messages stored in the stream))
using nats stream purge
You can also delete a stream (which will also automatically delete all of the consumers that may be defined on that stream) using nats stream rm