@@ -0,0 +1,82 @@
# Chapter. Queue
TensorFlow is capable of handling multiple threads, and queues are a powerful mechanism for asynchronous computation. With large datasets, this can significantly speed up the training process of our models. This functionality is especially handy when reading, pre-processing and extracting our training data in mini-batches. The secret to professional, high-performance training of a model is understanding TensorFlow's queuing operations. TensorFlow implements 4 types of queue: **FIFOQueue**, **PaddingFIFOQueue**, **PriorityQueue** and **RandomShuffleQueue**.

Like everything in TensorFlow, a queue is a node in a computation graph. It's a stateful node, like a variable: other nodes can modify its content. In particular, nodes can enqueue new items into the queue, or dequeue existing items from the queue.
To get started with queues, let's consider a simple example. We will create a "first in, first out" queue (FIFOQueue) and fill it with numbers. Then we'll construct a graph that takes an item off the queue, adds one to that item, and puts it back on the end of the queue.
```csharp
[TestMethod]
public void FIFOQueue()
{
    // create a first-in, first-out queue with capacity up to 2
    // and data type set as int32
    var queue = tf.FIFOQueue(2, tf.int32);
    // init queue, push 2 elements into queue.
    var init = queue.enqueue_many(new[] { 10, 20 });
    // pop out the first element
    var x = queue.dequeue();
    // add 1
    var y = x + 1;
    // push back into queue
    var inc = queue.enqueue(y);

    using (var sess = tf.Session())
    {
        // init queue
        init.run();
        // pop out first element and push back calculated y
        (int dequeued, _) = sess.run((x, inc));
        Assert.AreEqual(10, dequeued);
        (dequeued, _) = sess.run((x, inc));
        Assert.AreEqual(20, dequeued);
        (dequeued, _) = sess.run((x, inc));
        Assert.AreEqual(11, dequeued);
        (dequeued, _) = sess.run((x, inc));
        Assert.AreEqual(21, dequeued);
        // the thread will hang or block if you run sess.run(x) again,
        // until the queue has more elements.
    }
}
```
`enqueue`, `enqueue_many` and `dequeue` are special nodes. They take a pointer to the queue instead of a normal value, which allows them to mutate it. We first create a FIFOQueue *queue* with a capacity of 2 and enqueue two values into it. We then immediately dequeue a value from it into *x*, compute *y* by simply adding 1 to the dequeued value, and enqueue *y* back onto the queue. Next, we start up a *session* and run. After we've run this operation a few times the queue will be empty - if we try to run the operation again, the main thread of the program will hang or block, because it will be waiting for another operation to be run to put more values in the queue.
#### FIFOQueue
Creates a queue that dequeues elements in a first-in, first-out order. A `FIFOQueue` has bounded capacity; supports multiple concurrent producers and consumers; and provides exactly-once delivery. A `FIFOQueue` holds a list of up to `capacity` elements. Each element is a fixed-length tuple of tensors whose dtypes are described by `dtypes`, and whose shapes are optionally described by the `shapes` argument.
#### PaddingFIFOQueue
A FIFOQueue that supports batching variable-sized tensors by padding. A `PaddingFIFOQueue` may contain components with dynamic shape, while also supporting `dequeue_many`. A `PaddingFIFOQueue` holds a list of up to `capacity` elements. Each element is a fixed-length tuple of tensors whose dtypes are described by `dtypes`, and whose shapes are described by the `shapes` argument.
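The unit tests added later in this change construct a `PaddingFIFOQueue` with the new single-dtype overload. As a sketch (the session/feed plumbing is omitted here), building such a graph looks like:

```csharp
// a queue of up to 10 int32 vectors; TensorShape(-1) marks the
// single dimension as dynamic, so elements may have different lengths
var numbers = tf.placeholder(tf.int32);
var queue = tf.PaddingFIFOQueue(10, tf.int32, new TensorShape(-1));
var enqueue = queue.enqueue(numbers);
// dequeue_many(3) batches 3 elements, padding each with zeros
// up to the length of the longest element in the batch
var dequeue_many = queue.dequeue_many(n: 3);
```

Zero-padding up to the longest element is what lets `dequeue_many` stack variable-sized tensors into one rectangular batch.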
#### PriorityQueue
A queue implementation that dequeues elements in prioritized order. A `PriorityQueue` has bounded capacity; supports multiple concurrent producers and consumers; and provides exactly-once delivery. A `PriorityQueue` holds a list of up to `capacity` elements. Each element is a fixed-length tuple of tensors whose dtypes are described by `types`, and whose shapes are optionally described by the `shapes` argument.
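This change does not add a `PriorityQueue` factory to TensorFlow.NET; in TensorFlow's own API the first component of every element is an `int64` priority, and the smallest priority value is dequeued first. A hypothetical sketch, assuming a `tf.PriorityQueue` overload shaped like the `tf.FIFOQueue` factory above:

```csharp
// hypothetical: assumes a tf.PriorityQueue factory analogous to tf.FIFOQueue.
// Each enqueued element carries an int64 priority as its first component;
// dequeue returns the element with the smallest priority value first.
var queue = tf.PriorityQueue(10, tf.@string);
var init = queue.enqueue_many(new[] { 2L, 4L, 3L }, new[] { "medium", "low", "high" });
var x = queue.dequeue();
// dequeues would come back in priority order: 2, then 3, then 4
```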
#### RandomShuffleQueue
A queue implementation that dequeues elements in a random order. A `RandomShuffleQueue` has bounded capacity; supports multiple concurrent producers and consumers; and provides exactly-once delivery. A `RandomShuffleQueue` holds a list of up to `capacity` elements. Each element is a fixed-length tuple of tensors whose dtypes are described by `dtypes`, and whose shapes are optionally described by the `shapes` argument.
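No `RandomShuffleQueue` factory is part of this change either; a hypothetical sketch, assuming an overload analogous to `tf.FIFOQueue` plus TensorFlow's `min_after_dequeue` argument, which keeps enough elements buffered for a meaningful shuffle:

```csharp
// hypothetical: assumes a tf.RandomShuffleQueue factory mirroring tf.FIFOQueue.
// min_after_dequeue is the minimum number of elements left after a dequeue,
// so there is always a pool of elements to shuffle over
var queue = tf.RandomShuffleQueue(capacity: 10, min_after_dequeue: 2, dtype: tf.int32);
var init = queue.enqueue_many(new[] { 1, 2, 3, 4, 5 });
var x = queue.dequeue();  // returns one of the queued elements, chosen at random
```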
Queue methods must run on the same device as the queue. `FIFOQueue` and `RandomShuffleQueue` are important TensorFlow objects for computing tensors asynchronously in a graph. For example, a typical input architecture is to use a `RandomShuffleQueue` to prepare inputs for training a model:
* Multiple threads prepare training examples and push them into the queue.
* A training thread executes a training op that dequeues mini-batches from the queue.

This architecture simplifies the construction of input pipelines.
In the first example above, once the output reaches that point you'll actually have to terminate the program, as it is blocked. Now, this isn't very useful. What we really want is for our little program to reload or enqueue more values whenever the queue is empty or about to become empty. We could fix this by explicitly running our *enqueue* op again in the code above to reload the queue with values. However, for larger, more realistic programs, this becomes unwieldy. Thankfully, TensorFlow has a solution.
TensorFlow provides two classes to help with multi-threaded tasks: `tf.Coordinator` and `tf.QueueRunner`. These two classes are designed to be used together. The `Coordinator` class helps multiple threads stop together and report exceptions to a main thread. The `QueueRunner` class creates a number of threads that cooperate to enqueue tensors into the same queue.
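A minimal sketch of how these two classes typically cooperate, assuming TensorFlow.NET mirrors the Python `tf.train.QueueRunner`/`tf.train.Coordinator` API (the class and method names here are assumptions, not part of this change):

```csharp
// hypothetical sketch: API names assumed from the Python tf.train API
var queue = tf.FIFOQueue(100, tf.int32);
var enqueue_op = queue.enqueue_many(new[] { 10, 20, 30 });
// four producer threads, each repeatedly running enqueue_op
var qr = tf.train.QueueRunner(queue, new[] { enqueue_op, enqueue_op, enqueue_op, enqueue_op });
using (var sess = tf.Session())
{
    var coord = tf.train.Coordinator();
    var threads = qr.create_threads(sess, coord: coord, start: true);
    var x = queue.dequeue();
    for (int step = 0; step < 10; step++)
        sess.run(x);        // the consumer never blocks; producers keep the queue filled
    coord.request_stop();   // ask all producer threads to stop...
    coord.join(threads);    // ...and wait for them to finish
}
```

The key design point is that the producers and the consumer only share the queue node, so the training loop never needs to know how its inputs are produced.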
@@ -22,6 +22,7 @@ Welcome to TensorFlow.NET's documentation!
   Graph
   Session
   Operation
   Queue
   Gradient
   Train
   EagerMode
@@ -43,5 +43,52 @@ namespace Tensorflow
                names,
                shared_name: shared_name,
                name: name);

        public PaddingFIFOQueue PaddingFIFOQueue(int capacity,
            TF_DataType dtype,
            TensorShape shape,
            string shared_name = null,
            string name = "padding_fifo_queue")
            => new PaddingFIFOQueue(capacity,
                new[] { dtype },
                new[] { shape },
                new[] { name },
                shared_name: shared_name,
                name: name);

        /// <summary>
        /// A queue implementation that dequeues elements in first-in first-out order.
        /// </summary>
        /// <param name="capacity"></param>
        /// <param name="dtypes"></param>
        /// <param name="shapes"></param>
        /// <param name="names"></param>
        /// <param name="shared_name"></param>
        /// <param name="name"></param>
        /// <returns></returns>
        public FIFOQueue FIFOQueue(int capacity,
            TF_DataType[] dtypes,
            TensorShape[] shapes = null,
            string[] names = null,
            string shared_name = null,
            string name = "fifo_queue")
            => new FIFOQueue(capacity,
                dtypes,
                shapes,
                names,
                shared_name: shared_name,
                name: name);

        public FIFOQueue FIFOQueue(int capacity,
            TF_DataType dtype,
            TensorShape shape = null,
            string shared_name = null,
            string name = "fifo_queue")
            => new FIFOQueue(capacity,
                new[] { dtype },
                new[] { shape ?? new TensorShape() },
                new[] { name },
                shared_name: shared_name,
                name: name);
    }
}
@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Tensorflow.Queues
{
    public class FIFOQueue : QueueBase
    {
        public FIFOQueue(int capacity,
            TF_DataType[] dtypes,
            TensorShape[] shapes,
            string[] names = null,
            string shared_name = null,
            string name = "fifo_queue")
            : base(dtypes: dtypes, shapes: shapes, names: names)
        {
            _queue_ref = gen_data_flow_ops.fifo_queue_v2(
                component_types: dtypes,
                shapes: shapes,
                capacity: capacity,
                shared_name: shared_name,
                name: name);

            _name = _queue_ref.op.name.Split('/').Last();
        }
    }
}
@@ -33,6 +33,59 @@ namespace Tensorflow.Queues
            });
        }

        public Operation enqueue_many<T>(T[] vals, string name = null)
        {
            return tf_with(ops.name_scope(name, $"{_name}_EnqueueMany", vals), scope =>
            {
                var vals_tensor = _check_enqueue_dtypes(vals);
                return gen_data_flow_ops.queue_enqueue_many_v2(_queue_ref, vals_tensor, name: scope);
            });
        }

        private Tensor[] _check_enqueue_dtypes(object vals)
        {
            var tensors = new List<Tensor>();
            switch (vals)
            {
                case int[][] vals1:
                    {
                        int i = 0;
                        foreach (var (val, dtype) in zip(vals1, _dtypes))
                            tensors.Add(ops.convert_to_tensor(val, dtype: dtype, name: $"component_{i++}"));
                    }
                    break;
                case int[] vals1:
                    tensors.Add(ops.convert_to_tensor(vals1, dtype: _dtypes[0], name: $"component_0"));
                    break;
                default:
                    throw new NotImplementedException("");
            }

            return tensors.ToArray();
        }

        /// <summary>
        /// Dequeues one element from this queue.
        /// </summary>
        /// <param name="name"></param>
        /// <returns></returns>
        public Tensor dequeue(string name = null)
        {
            Tensor ret;
            if (name == null)
                name = $"{_name}_Dequeue";

            if (_queue_ref.dtype == TF_DataType.TF_RESOURCE)
                ret = gen_data_flow_ops.queue_dequeue_v2(_queue_ref, _dtypes, name: name)[0];
            else
                ret = gen_data_flow_ops.queue_dequeue(_queue_ref, _dtypes, name: name)[0];

            return ret;
        }

        public Tensor[] dequeue_many(int n, string name = null)
        {
            if (name == null)
@@ -61,6 +61,22 @@ namespace Tensorflow
            return _op.output;
        }

        public static Tensor fifo_queue_v2(TF_DataType[] component_types, TensorShape[] shapes,
            int capacity = -1, string container = "", string shared_name = "",
            string name = null)
        {
            var _op = _op_def_lib._apply_op_helper("FIFOQueueV2", name, new
            {
                component_types,
                shapes,
                capacity,
                container,
                shared_name
            });

            return _op.output;
        }

        public static Operation queue_enqueue(Tensor handle, Tensor[] components, int timeout_ms = -1, string name = null)
        {
            var _op = _op_def_lib._apply_op_helper("QueueEnqueue", name, new
@@ -85,6 +101,42 @@ namespace Tensorflow
            return _op;
        }

        public static Tensor[] queue_dequeue_v2(Tensor handle, TF_DataType[] component_types, int timeout_ms = -1, string name = null)
        {
            var _op = _op_def_lib._apply_op_helper("QueueDequeueV2", name, new
            {
                handle,
                component_types,
                timeout_ms
            });

            return _op.outputs;
        }

        public static Tensor[] queue_dequeue(Tensor handle, TF_DataType[] component_types, int timeout_ms = -1, string name = null)
        {
            var _op = _op_def_lib._apply_op_helper("QueueDequeue", name, new
            {
                handle,
                component_types,
                timeout_ms
            });

            return _op.outputs;
        }

        public static Operation queue_enqueue_many_v2(Tensor handle, Tensor[] components, int timeout_ms = -1, string name = null)
        {
            var _op = _op_def_lib._apply_op_helper("QueueEnqueueManyV2", name, new
            {
                handle,
                components,
                timeout_ms
            });

            return _op;
        }

        public static Tensor[] queue_dequeue_many_v2(Tensor handle, int n, TF_DataType[] component_types, int timeout_ms = -1, string name = null)
        {
            var _op = _op_def_lib._apply_op_helper("QueueDequeueManyV2", name, new
@@ -73,17 +73,17 @@ namespace Tensorflow
        public Session Session()
        {
            return new Session();
            return new Session().as_default();
        }

        public Session Session(Graph graph, SessionOptions opts = null)
        {
            return new Session(graph, opts: opts);
            return new Session(graph, opts: opts).as_default();
        }

        public Session Session(SessionOptions opts)
        {
            return new Session(null, opts);
            return new Session(null, opts).as_default();
        }

        public void __init__()
@@ -15,7 +15,7 @@ namespace TensorFlowNET.UnitTest
        public void PaddingFIFOQueue()
        {
            var numbers = tf.placeholder(tf.int32);
            var queue = tf.PaddingFIFOQueue(capacity: 10, dtypes: new[] { tf.int32 }, shapes: new[] { new TensorShape(-1) });
            var queue = tf.PaddingFIFOQueue(10, tf.int32, new TensorShape(-1));
            var enqueue = queue.enqueue(numbers);
            var dequeue_many = queue.dequeue_many(n: 3);
@@ -32,5 +32,43 @@ namespace TensorFlowNET.UnitTest
            Assert.IsTrue(Enumerable.SequenceEqual(new int[] { 3, 4, 5 }, result[2].ToArray<int>()));
            }
        }

        [TestMethod]
        public void FIFOQueue()
        {
            // create a first-in, first-out queue with capacity up to 2
            // and data type set as int32
            var queue = tf.FIFOQueue(2, tf.int32);
            // init queue, push 2 elements into queue.
            var init = queue.enqueue_many(new[] { 10, 20 });
            // pop out the first element
            var x = queue.dequeue();
            // add 1
            var y = x + 1;
            // push back into queue
            var inc = queue.enqueue(y);

            using (var sess = tf.Session())
            {
                // init queue
                init.run();
                // pop out first element and push back calculated y
                (int dequeued, _) = sess.run((x, inc));
                Assert.AreEqual(10, dequeued);
                (dequeued, _) = sess.run((x, inc));
                Assert.AreEqual(20, dequeued);
                (dequeued, _) = sess.run((x, inc));
                Assert.AreEqual(11, dequeued);
                (dequeued, _) = sess.run((x, inc));
                Assert.AreEqual(21, dequeued);
                // the thread will hang or block if you run sess.run(x) again,
                // until the queue has more elements.
            }
        }
    }
}