Select Git revision
dagreader.go
dagreader.go 6.93 KiB
package io
import (
"bytes"
"errors"
"fmt"
"io"
"os"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
mdag "github.com/ipfs/go-ipfs/merkledag"
ft "github.com/ipfs/go-ipfs/unixfs"
ftpb "github.com/ipfs/go-ipfs/unixfs/pb"
)
// Sentinel errors returned by NewDagReader for node types that cannot be read.
var (
	// ErrIsDir is returned when the node to read is a directory.
	ErrIsDir = errors.New("this dag node is a directory")

	// ErrCantReadSymlinks is returned when the node to read is a symlink.
	ErrCantReadSymlinks = errors.New("cannot currently read symlinks")
)
// DagReader provides a way to easily read the data contained in a dag.
// It serves the data stored in the root node first and then the data of
// each child link in order, loading children one at a time as needed.
type DagReader struct {
// serv is the DAGService handed to child readers created for linked nodes.
serv mdag.DAGService
// the node being read
node *mdag.Node
// cached protobuf structure from node.Data; Size and Seek read the file
// size, inline data and block sizes from it
pbdata *ftpb.Data
// the current data buffer to be read from
// will either be a bytes.Reader or a child DagReader
buf ReadSeekCloser
// NodeGetters returned by serv.GetDAG for the node
// (presumably one per child link, in link order; verify against DAGService)
promises []mdag.NodeGetter
// index into promises of the next child link to load; it is advanced
// past a child as soon as that child has been fetched
linkPosition int
// current offset for the read head within the 'file'
offset int64
// Our context, derived from the caller's context with cancellation
ctx context.Context
// context cancel for children; invoked by Close
cancel func()
}
// ReadSeekCloser groups the io.Reader, io.Seeker, io.Closer and io.WriterTo
// interfaces. It is the type of the buffer a DagReader currently reads from.
type ReadSeekCloser interface {
io.Reader
io.Seeker
io.Closer
io.WriterTo
}
// NewDagReader creates a new reader object that reads the data represented by
// the given node, using the passed in DAGService for data retrieval.
//
// The node's Data is decoded as a unixfs protobuf and dispatched on its type:
// raw and file nodes yield a reader, metadata nodes are resolved through their
// first link, and directories, symlinks and unknown types are rejected with an
// error.
func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*DagReader, error) {
	meta := new(ftpb.Data)
	err := proto.Unmarshal(n.Data, meta)
	if err != nil {
		return nil, err
	}

	kind := meta.GetType()
	switch kind {
	case ftpb.Data_Raw, ftpb.Data_File:
		return NewDataFileReader(ctx, n, meta, serv), nil

	case ftpb.Data_Metadata:
		// The reader is built from the node behind the first link.
		if len(n.Links) == 0 {
			return nil, errors.New("incorrectly formatted metadata object")
		}
		target, err := n.Links[0].GetNode(ctx, serv)
		if err != nil {
			return nil, err
		}
		return NewDagReader(ctx, target, serv)

	case ftpb.Data_Directory:
		// Reading a directory is not allowed.
		return nil, ErrIsDir

	case ftpb.Data_Symlink:
		return nil, ErrCantReadSymlinks
	}

	return nil, ft.ErrUnrecognizedType
}
// NewDataFileReader builds a DagReader for node n whose Data has already been
// decoded into pb. The reader gets its own cancellable context derived from
// ctx, starts out reading the data held in pb, and asks serv for the node's
// DAG so that child links can be read afterwards.
func NewDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
	childCtx, stop := context.WithCancel(ctx)

	dr := new(DagReader)
	dr.ctx = childCtx
	dr.cancel = stop
	dr.node = n
	dr.serv = serv
	dr.pbdata = pb
	dr.promises = serv.GetDAG(childCtx, n)
	dr.buf = NewRSNCFromBytes(pb.GetData())
	return dr
}
// precalcNextBuf closes the current buffer, fetches the next child link via
// its NodeGetter and makes that child the buffer to read from. It returns
// io.EOF once every link has been consumed, and an error if the child cannot
// be fetched, cannot be decoded, or has a type that may not appear in a file.
func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
	dr.buf.Close() // Just to make sure

	idx := dr.linkPosition
	if idx >= len(dr.promises) {
		return io.EOF
	}

	child, err := dr.promises[idx].Get(ctx)
	if err != nil {
		return err
	}
	// The link counts as consumed as soon as it has been fetched.
	dr.linkPosition = idx + 1

	meta := new(ftpb.Data)
	if uerr := proto.Unmarshal(child.Data, meta); uerr != nil {
		return fmt.Errorf("incorrectly formatted protobuf: %s", uerr)
	}

	switch meta.GetType() {
	case ftpb.Data_File:
		// A file child may have links of its own, so it gets a nested reader.
		dr.buf = NewDataFileReader(dr.ctx, child, meta, dr.serv)
		return nil
	case ftpb.Data_Raw:
		dr.buf = NewRSNCFromBytes(meta.GetData())
		return nil
	case ftpb.Data_Directory:
		// A directory should not exist within a file
		return ft.ErrInvalidDirLocation
	case ftpb.Data_Metadata:
		return errors.New("Shouldnt have had metadata object inside file")
	case ftpb.Data_Symlink:
		return errors.New("shouldnt have had symlink inside file")
	}
	return ft.ErrUnrecognizedType
}
// Size returns the total length of the data from the DAG structured file, as
// recorded in the Filesize field of the node's cached protobuf.
func (dr *DagReader) Size() uint64 {
	total := dr.pbdata.GetFilesize()
	return total
}
// Read reads data from the DAG structured file into b. It delegates to
// CtxReadFull using the reader's own context.
func (dr *DagReader) Read(b []byte) (int, error) {
	n, err := dr.CtxReadFull(dr.ctx, b)
	return n, err
}
// CtxReadFull reads data from the DAG structured file into b, moving on to
// the next child block whenever the current buffer runs dry. It returns when
// b is full or when an error occurs; running out of links is reported as
// io.EOF together with the number of bytes read so far. ctx is used when
// loading the next block.
func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	filled := 0
	for {
		// Drain whatever the current buffer still holds.
		got, rerr := dr.buf.Read(b[filled:])
		filled += got
		dr.offset += int64(got)

		// EOF only means this buffer is exhausted; anything else is fatal.
		if rerr != nil && rerr != io.EOF {
			return filled, rerr
		}

		if filled == len(b) {
			return filled, nil
		}

		// Still room in b: switch to the next block and keep going.
		if nerr := dr.precalcNextBuf(ctx); nerr != nil {
			return filled, nerr
		}
	}
}
// WriteTo writes the remaining data of the DAG structured file to w, one
// buffer at a time, and returns the number of bytes written. Running out of
// links ends the copy with a nil error; any other error is returned together
// with the count written so far.
func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
	var written int64
	for {
		// Flush the current buffer into w.
		n, werr := dr.buf.WriteTo(w)
		written += n
		dr.offset += n
		if werr != nil && werr != io.EOF {
			return written, werr
		}

		// Move on to the next block; io.EOF here means the file is done.
		switch nerr := dr.precalcNextBuf(dr.ctx); nerr {
		case nil:
			continue
		case io.EOF:
			return written, nil
		default:
			return written, nerr
		}
	}
}
// Close cancels the reader's context and always returns nil.
func (dr *DagReader) Close() error {
	stop := dr.cancel
	stop()
	return nil
}
// Seek implements io.Seeker, and will seek to a given offset in the file.
//
// whence is one of os.SEEK_SET, os.SEEK_CUR or os.SEEK_END; any other value
// yields an error. On success the new absolute offset is returned. A negative
// absolute offset is rejected. Seeking to or past the end of the data is
// allowed and leaves the reader positioned at EOF, so the next Read reports
// io.EOF.
//
// NOTE(review): for os.SEEK_END the target is computed as Filesize - offset,
// whereas io.Seeker defines it as end + offset. This is left unchanged because
// existing callers may rely on it; confirm before changing.
//
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case os.SEEK_SET:
		if offset < 0 {
			return -1, errors.New("Invalid offset")
		}

		// Grab cached protobuf object (solely to make code look cleaner)
		pb := dr.pbdata

		// Target lies within the data stored directly in the root node.
		if int64(len(pb.Data)) >= offset {
			// Close current buf to close potential child dagreader
			dr.buf.Close()
			dr.buf = NewRSNCFromBytes(pb.GetData()[offset:])

			// start reading links from the beginning
			dr.linkPosition = 0
			dr.offset = offset
			return offset, nil
		}

		// left is the number of bytes still to skip once the root block
		// data has been passed.
		left := offset - int64(len(pb.Data))

		// iterate through links and find where we need to be
		found := false
		for i := 0; i < len(pb.Blocksizes); i++ {
			if pb.Blocksizes[i] > uint64(left) {
				dr.linkPosition = i
				found = true
				break
			}
			left -= int64(pb.Blocksizes[i])
		}

		if !found {
			// The offset is at or beyond the end of the last block. Without
			// this branch linkPosition would keep its previous value and an
			// unrelated block would be loaded below. Park the reader at EOF
			// instead.
			dr.buf.Close()
			dr.buf = NewRSNCFromBytes(nil)
			dr.linkPosition = len(dr.promises)
			dr.offset = offset
			return offset, nil
		}

		// start sub-block request
		err := dr.precalcNextBuf(dr.ctx)
		if err != nil {
			return 0, err
		}

		// set proper offset within child readseeker
		n, err := dr.buf.Seek(left, os.SEEK_SET)
		if err != nil {
			return -1, err
		}

		// sanity: the child must have landed exactly where we asked
		left -= n
		if left != 0 {
			return -1, errors.New("failed to seek properly")
		}
		dr.offset = offset
		return offset, nil

	case os.SEEK_CUR:
		// TODO: be smarter here
		noffset := dr.offset + offset
		return dr.Seek(noffset, os.SEEK_SET)

	case os.SEEK_END:
		noffset := int64(dr.pbdata.GetFilesize()) - offset
		return dr.Seek(noffset, os.SEEK_SET)

	default:
		return 0, errors.New("invalid whence")
	}
}
// readSeekNopCloser adapts a *bytes.Reader to ReadSeekCloser by adding a
// Close method that does nothing.
type readSeekNopCloser struct {
	*bytes.Reader
}

// NewRSNCFromBytes returns a ReadSeekCloser that serves the contents of b.
func NewRSNCFromBytes(b []byte) ReadSeekCloser {
	rd := bytes.NewReader(b)
	return &readSeekNopCloser{Reader: rd}
}

// Close is a no-op and always returns nil.
func (r *readSeekNopCloser) Close() error {
	return nil
}